| File: | home/bhubbard/working/src/ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.h | 
| Warning: | line 282, column 22 Dereference of null pointer (loaded from variable 'snap_released') | 
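The reported warning concerns the `*snap_released = true;` store at line 282 of the listing below. `IsInSnapshot()` defaults its `snap_released` parameter to `nullptr`, and the branches that handle an already-released snapshot only guard the store with an `assert(snap_released)`, which is compiled out under `NDEBUG`; the analyzer therefore sees a feasible path that writes through a null pointer. The sketch below is a hypothetical illustration (the helper name and the `db`/sequence-number parameters are invented for the example) of a call that relies on the default argument, contrasted with the safer pattern of passing an initialized flag:

```cpp
#include "utilities/transactions/write_prepared_txn_db.h"

// Hypothetical helper (not part of the file): contrasts a call that relies on
// the default snap_released == nullptr with one that passes a real flag.
bool IsVisibleExample(rocksdb::WritePreparedTxnDB* db,
                      rocksdb::SequenceNumber prep_seq,
                      rocksdb::SequenceNumber snapshot_seq) {
  // Risky form: if snapshot_seq was already released (it is at or below
  // max_evicted_seq_) and no overlapping commit was recorded, IsInSnapshot()
  // reaches `*snap_released = true;` with snap_released == nullptr, since the
  // assert() guarding it vanishes under NDEBUG.
  // bool visible = db->IsInSnapshot(prep_seq, snapshot_seq);

  // Safer form: pass an initialized flag so the store has a valid target and
  // the caller can detect that the snapshot is no longer valid.
  bool snap_released = false;
  bool visible = db->IsInSnapshot(prep_seq, snapshot_seq,
                                  rocksdb::kMinUnCommittedSeq, &snap_released);
  return visible && !snap_released;
}
```

Whether the default-null path is actually reachable for a given caller depends on whether that caller's snapshot can be released concurrently; passing the flag sidesteps the question.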
| 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. | |||
| 2 | // This source code is licensed under both the GPLv2 (found in the | |||
| 3 | // COPYING file in the root directory) and Apache 2.0 License | |||
| 4 | // (found in the LICENSE.Apache file in the root directory). | |||
| 5 | ||||
| 6 | #pragma once | |||
| 7 | #ifndef ROCKSDB_LITE | |||
| 8 | ||||
| 9 | #ifndef __STDC_FORMAT_MACROS | |||
| 10 | #define __STDC_FORMAT_MACROS | |||
| 11 | #endif | |||
| 12 | ||||
| 13 | #include <inttypes.h> | |||
| 14 | #include <mutex> | |||
| 15 | #include <queue> | |||
| 16 | #include <set> | |||
| 17 | #include <string> | |||
| 18 | #include <unordered_map> | |||
| 19 | #include <vector> | |||
| 20 | ||||
| 21 | #include "db/db_iter.h" | |||
| 22 | #include "db/pre_release_callback.h" | |||
| 23 | #include "db/read_callback.h" | |||
| 24 | #include "db/snapshot_checker.h" | |||
| 25 | #include "rocksdb/db.h" | |||
| 26 | #include "rocksdb/options.h" | |||
| 27 | #include "rocksdb/utilities/transaction_db.h" | |||
| 28 | #include "util/set_comparator.h" | |||
| 29 | #include "util/string_util.h" | |||
| 30 | #include "utilities/transactions/pessimistic_transaction.h" | |||
| 31 | #include "utilities/transactions/pessimistic_transaction_db.h" | |||
| 32 | #include "utilities/transactions/transaction_lock_mgr.h" | |||
| 33 | #include "utilities/transactions/write_prepared_txn.h" | |||
| 34 | ||||
| 35 | namespace rocksdb { | |||
| 36 | ||||
| 37 | // A PessimisticTransactionDB that writes data to the DB after the prepare phase of 2PC. | |||
| 38 | // As a result, some data in the DB might not be committed yet. The DB provides | |||
| 39 | // mechanisms to tell such data apart from committed data. | |||
| 40 | class WritePreparedTxnDB : public PessimisticTransactionDB { | |||
| 41 | public: | |||
| 42 | explicit WritePreparedTxnDB(DB* db, | |||
| 43 | const TransactionDBOptions& txn_db_options) | |||
| 44 | : PessimisticTransactionDB(db, txn_db_options), | |||
| 45 | SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits), | |||
| 46 | SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)), | |||
| 47 | COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits), | |||
| 48 | COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)), | |||
| 49 | FORMAT(COMMIT_CACHE_BITS) { | |||
| 50 | Init(txn_db_options); | |||
| 51 | } | |||
| 52 | ||||
| 53 | explicit WritePreparedTxnDB(StackableDB* db, | |||
| 54 | const TransactionDBOptions& txn_db_options) | |||
| 55 | : PessimisticTransactionDB(db, txn_db_options), | |||
| 56 | SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits), | |||
| 57 | SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)), | |||
| 58 | COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits), | |||
| 59 | COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)), | |||
| 60 | FORMAT(COMMIT_CACHE_BITS) { | |||
| 61 | Init(txn_db_options); | |||
| 62 | } | |||
| 63 | ||||
| 64 | virtual ~WritePreparedTxnDB(); | |||
| 65 | ||||
| 66 | virtual Status Initialize( | |||
| 67 | const std::vector<size_t>& compaction_enabled_cf_indices, | |||
| 68 | const std::vector<ColumnFamilyHandle*>& handles) override; | |||
| 69 | ||||
| 70 | Transaction* BeginTransaction(const WriteOptions& write_options, | |||
| 71 | const TransactionOptions& txn_options, | |||
| 72 | Transaction* old_txn) override; | |||
| 73 | ||||
| 74 | // Optimized version of ::Write that receives more optimization requests, such | |||
| 75 | // as skip_concurrency_control. | |||
| 76 | using PessimisticTransactionDB::Write; | |||
| 77 | Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&, | |||
| 78 | WriteBatch* updates) override; | |||
| 79 | ||||
| 80 | // Write the batch to the underlying DB and mark it as committed. Can be | |||
| 81 | // used either directly from the TxnDB or through a transaction. | |||
| 82 | Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch, | |||
| 83 | size_t batch_cnt, WritePreparedTxn* txn); | |||
| 84 | ||||
| 85 | using DB::Get; | |||
| 86 | virtual Status Get(const ReadOptions& options, | |||
| 87 | ColumnFamilyHandle* column_family, const Slice& key, | |||
| 88 | PinnableSlice* value) override; | |||
| 89 | ||||
| 90 | using DB::MultiGet; | |||
| 91 | virtual std::vector<Status> MultiGet( | |||
| 92 | const ReadOptions& options, | |||
| 93 | const std::vector<ColumnFamilyHandle*>& column_family, | |||
| 94 | const std::vector<Slice>& keys, | |||
| 95 | std::vector<std::string>* values) override; | |||
| 96 | ||||
| 97 | using DB::NewIterator; | |||
| 98 | virtual Iterator* NewIterator(const ReadOptions& options, | |||
| 99 | ColumnFamilyHandle* column_family) override; | |||
| 100 | ||||
| 101 | using DB::NewIterators; | |||
| 102 | virtual Status NewIterators( | |||
| 103 | const ReadOptions& options, | |||
| 104 | const std::vector<ColumnFamilyHandle*>& column_families, | |||
| 105 | std::vector<Iterator*>* iterators) override; | |||
| 106 | ||||
| 107 | // Check whether the transaction that wrote the value with sequence number seq | |||
| 108 | // is visible to the snapshot with sequence number snapshot_seq. | |||
| 109 | // Returns true if commit_seq <= snapshot_seq | |||
| 110 | // If the snapshot_seq is already released and snapshot_seq <= max, sets | |||
| 111 | // *snap_released to true and returns true as well. | |||
| 112 | inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq, | |||
| 113 | uint64_t min_uncommitted = kMinUnCommittedSeq, | |||
| 114 | bool* snap_released = nullptr) const { | |||
| 115 | ROCKS_LOG_DETAILS(info_log_, | |||
| 116 | "IsInSnapshot %" PRIu64 " in %" PRIu64 | |||
| 117 | " min_uncommitted %" PRIu64, | |||
| 118 | prep_seq, snapshot_seq, min_uncommitted); | |||
| 119 | assert(min_uncommitted >= kMinUnCommittedSeq); | |||
| 120 | // Caller is responsible to initialize snap_released. | |||
| 121 | assert(snap_released == nullptr || *snap_released == false); | |||
| 122 | // Here we try to infer the return value without looking into prepare list. | |||
| 123 | // This would help avoiding synchronization over a shared map. | |||
| 124 | // TODO(myabandeh): optimize this. This sequence of checks must be correct | |||
| 125 | // but not necessarily efficient | |||
| 126 | if (prep_seq == 0) { | |||
| 127 | // Compaction will output keys to bottom-level with sequence number 0 if | |||
| 128 | // it is visible to the earliest snapshot. | |||
| 129 | ROCKS_LOG_DETAILS( | |||
| 130 | info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, | |||
| 131 | prep_seq, snapshot_seq, 1); | |||
| 132 | return true; | |||
| 133 | } | |||
| 134 | if (snapshot_seq < prep_seq) { | |||
| 135 | // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq | |||
| 136 | ROCKS_LOG_DETAILS( | |||
| 137 | info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, | |||
| 138 | prep_seq, snapshot_seq, 0); | |||
| 139 | return false; | |||
| 140 | } | |||
| 141 | if (prep_seq < min_uncommitted) { | |||
| 142 | ROCKS_LOG_DETAILS(info_log_, | |||
| 143 | "IsInSnapshot %" PRIu64 " in %" PRIu64 | |||
| 144 | " returns %" PRId32 | |||
| 145 | " because of min_uncommitted %" PRIu64, | |||
| 146 | prep_seq, snapshot_seq, 1, min_uncommitted); | |||
| 147 | return true; | |||
| 148 | } | |||
| 149 | // Commit of a delayed prepared txn has two non-atomic steps: add to the | |||
| 150 | // commit cache, then remove from delayed_prepared_. Our reads of these two | |||
| 151 | // are also non-atomic, so by looking into the commit cache first we might | |||
| 152 | // find the prep_seq neither in the commit cache nor in delayed_prepared_. | |||
| 153 | // To fix that, i) we check whether there were any delayed prepared txns | |||
| 154 | // BEFORE looking into the commit cache, and ii) if there were, the search | |||
| 155 | // steps become: commit cache, delayed prepared, then commit cache again. | |||
| 156 | // This way, if the first query to the commit cache missed the commit, the second will catch it. | |||
| 157 | bool was_empty; | |||
| 158 | SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub; | |||
| 159 | CommitEntry64b dont_care; | |||
| 160 | auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE; | |||
| 161 | size_t repeats = 0; | |||
| 162 | do { | |||
| 163 | repeats++; | |||
| 164 | assert(repeats < 100); | |||
| 165 | if (UNLIKELY(repeats >= 100)) { | |||
| 166 | throw std::runtime_error( | |||
| 167 | "The read was interrupted 100 times by updates to max_evicted_seq_. " | |||
| 168 | "This is unexpected in all setups"); | |||
| 169 | } | |||
| 170 | max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire); | |||
| 171 | TEST_SYNC_POINT( | |||
| 172 | "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause"); | |||
| 173 | TEST_SYNC_POINT( | |||
| 174 | "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"); | |||
| 175 | was_empty = delayed_prepared_empty_.load(std::memory_order_acquire); | |||
| 176 | TEST_SYNC_POINT( | |||
| 177 | "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause"); | |||
| 178 | TEST_SYNC_POINT( | |||
| 179 | "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"); | |||
| 180 | CommitEntry cached; | |||
| 181 | bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); | |||
| 182 | TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause"); | |||
| 183 | TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"); | |||
| 184 | if (exist && prep_seq == cached.prep_seq) { | |||
| 185 | // It is committed and also not evicted from commit cache | |||
| 186 | ROCKS_LOG_DETAILS( | |||
| 187 | info_log_, | |||
| 188 | "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, | |||
| 189 | prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq); | |||
| 190 | return cached.commit_seq <= snapshot_seq; | |||
| 191 | } | |||
| 192 | // else it could be committed but not inserted in the map which could | |||
| 193 | // happen after recovery, or it could be committed and evicted by another | |||
| 194 | // commit, or never committed. | |||
| 195 | ||||
| 196 | // At this point we don't know if it was committed or it is still prepared | |||
| 197 | max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire); | |||
| 198 | if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) { | |||
| 199 | continue; | |||
| 200 | } | |||
| 201 | // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub | |||
| 202 | if (max_evicted_seq_ub < prep_seq) { | |||
| 203 | // Not evicted from cache and also not present, so must be still | |||
| 204 | // prepared | |||
| 205 | ROCKS_LOG_DETAILS(info_log_, | |||
| 206 | "IsInSnapshot %" PRIu64 " in %" PRIu64 | |||
| 207 | " returns %" PRId32, | |||
| 208 | prep_seq, snapshot_seq, 0); | |||
| 209 | return false; | |||
| 210 | } | |||
| 211 | TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause"); | |||
| 212 | TEST_SYNC_POINT( | |||
| 213 | "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"); | |||
| 214 | if (!was_empty) { | |||
| 215 | // We should not normally reach here | |||
| 216 | WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD); | |||
| 217 | ReadLock rl(&prepared_mutex_); | |||
| 218 | ROCKS_LOG_WARN( | |||
| 219 | info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64, | |||
| 220 | static_cast<uint64_t>(delayed_prepared_.size()), prep_seq); | |||
| 221 | if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { | |||
| 222 | // This is the order: 1) delayed_prepared_commits_ update, 2) publish | |||
| 223 | // 3) delayed_prepared_ clean up. So check if it is the case of a late | |||
| 224 | // cleanup. | |||
| 225 | auto it = delayed_prepared_commits_.find(prep_seq); | |||
| 226 | if (it == delayed_prepared_commits_.end()) { | |||
| 227 | // Then it is not committed yet | |||
| 228 | ROCKS_LOG_DETAILS(info_log_, | |||
| 229 | "IsInSnapshot %" PRIu64 " in %" PRIu64 | |||
| 230 | " returns %" PRId32, | |||
| 231 | prep_seq, snapshot_seq, 0); | |||
| 232 | return false; | |||
| 233 | } else { | |||
| 234 | ROCKS_LOG_DETAILS(info_log_, | |||
| 235 | "IsInSnapshot %" PRIu64 " in %" PRIu64 | |||
| 236 | " commit: %" PRIu64 " returns %" PRId32, | |||
| 237 | prep_seq, snapshot_seq, it->second, | |||
| 238 | it->second <= snapshot_seq); | |||
| 239 | return it->second <= snapshot_seq; | |||
| 240 | } | |||
| 241 | } else { | |||
| 242 | // 2nd query to commit cache. Refer to was_empty comment above. | |||
| 243 | exist = GetCommitEntry(indexed_seq, &dont_care, &cached); | |||
| 244 | if (exist && prep_seq == cached.prep_seq) { | |||
| 245 | ROCKS_LOG_DETAILS( | |||
| 246 | info_log_, | |||
| 247 | "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, | |||
| 248 | prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq); | |||
| 249 | return cached.commit_seq <= snapshot_seq; | |||
| 250 | } | |||
| 251 | max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire); | |||
| 252 | } | |||
| 253 | } | |||
| 254 | } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)); | |||
| 255 | // When advancing max_evicted_seq_, we move older entries from the prepared list to | |||
| 256 | // delayed_prepared_. Also we move evicted entries from commit cache to | |||
| 257 | // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <= | |||
| 258 | // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in | |||
| 259 | // old_commit_map_, iii) committed with no conflict with any snapshot. Case | |||
| 260 | // (i) delayed_prepared_ is checked above | |||
| 261 | if (max_evicted_seq_ub < snapshot_seq) { // then (ii) cannot be the case | |||
| 262 | // only (iii) is the case: committed | |||
| 263 | // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq < | |||
| 264 | // snapshot_seq | |||
| 265 | ROCKS_LOG_DETAILS( | |||
| 266 | info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, | |||
| 267 | prep_seq, snapshot_seq, 1); | |||
| 268 | return true; | |||
| 269 | } | |||
| 270 | // else (ii) might be the case: check the commit data saved for this | |||
| 271 | // snapshot. If there was no overlapping commit entry, then it is committed | |||
| 272 | // with a commit_seq lower than any live snapshot, including snapshot_seq. | |||
| 273 | if (old_commit_map_empty_.load(std::memory_order_acquire)) { | |||
| 274 | ROCKS_LOG_DETAILS(info_log_, | |||
| 275 | "IsInSnapshot %" PRIu64 " in %" PRIu64 | |||
| 276 | " returns %" PRId32 " released=1", | |||
| 277 | prep_seq, snapshot_seq, 0); | |||
| 278 | assert(snap_released); | |||
| 279 | // This snapshot is not valid anymore. We cannot tell if prep_seq is | |||
| 280 | // committed before or after the snapshot. Return true but also set | |||
| 281 | // snap_released to true. | |||
| 282 | *snap_released = true; | |||
|  | <-- Dereference of null pointer (loaded from variable 'snap_released') | |||
| 283 | return true; | |||
| 284 | } | |||
| 285 | { | |||
| 286 | // We should not normally reach here unless snapshot_seq is old. This is a | |||
| 287 | // rare case and it is ok to pay the cost of mutex ReadLock for such old, | |||
| 288 | // reading transactions. | |||
| 289 | WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD); | |||
| 290 | ReadLock rl(&old_commit_map_mutex_); | |||
| 291 | auto prep_set_entry = old_commit_map_.find(snapshot_seq); | |||
| 292 | bool found = prep_set_entry != old_commit_map_.end(); | |||
| 293 | if (found) { | |||
| 294 | auto& vec = prep_set_entry->second; | |||
| 295 | found = std::binary_search(vec.begin(), vec.end(), prep_seq); | |||
| 296 | } else { | |||
| 297 | // coming from compaction | |||
| 298 | ROCKS_LOG_DETAILS(info_log_, | |||
| 299 | "IsInSnapshot %" PRIu64 " in %" PRIu64 | |||
| 300 | " returns %" PRId32 " released=1", | |||
| 301 | prep_seq, snapshot_seq, 0); | |||
| 302 | // This snapshot is not valid anymore. We cannot tell if prep_seq is | |||
| 303 | // committed before or after the snapshot. Return true but also set | |||
| 304 | // snap_released to true. | |||
| 305 | assert(snap_released); | |||
| 306 | *snap_released = true; | |||
| 307 | return true; | |||
| 308 | } | |||
| 309 | ||||
| 310 | if (!found) { | |||
| 311 | ROCKS_LOG_DETAILS(info_log_, | |||
| 312 | "IsInSnapshot %" PRIu64 " in %" PRIu64 | |||
| 313 | " returns %" PRId32, | |||
| 314 | prep_seq, snapshot_seq, 1); | |||
| 315 | return true; | |||
| 316 | } | |||
| 317 | } | |||
| 318 | // (ii) is the case: it is committed but after the snapshot_seq | |||
| 319 | ROCKS_LOG_DETAILS( | |||
| 320 | info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, | |||
| 321 | prep_seq, snapshot_seq, 0); | |||
| 322 | return false; | |||
| 323 | } | |||
| 324 | ||||
| 325 | // Add the transaction with prepare sequence seq to the prepared list. | |||
| 326 | // Note: must be called serially with increasing seq on each call. | |||
| 327 | void AddPrepared(uint64_t seq); | |||
| 328 | // Check if any of the prepared txns are less than new max_evicted_seq_. Must | |||
| 329 | // be called with prepared_mutex_ write locked. | |||
| 330 | void CheckPreparedAgainstMax(SequenceNumber new_max); | |||
| 331 | // Remove the transaction with prepare sequence seq from the prepared list | |||
| 332 | void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1); | |||
| 333 | // Add the transaction with prepare sequence prepare_seq and commit sequence | |||
| 334 | // commit_seq to the commit map. loop_cnt is to detect infinite loops. | |||
| 335 | // Note: must be called serially. | |||
| 336 | void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, | |||
| 337 | uint8_t loop_cnt = 0); | |||
| 338 | ||||
| 339 | struct CommitEntry { | |||
| 340 | uint64_t prep_seq; | |||
| 341 | uint64_t commit_seq; | |||
| 342 | CommitEntry() : prep_seq(0), commit_seq(0) {} | |||
| 343 | CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {} | |||
| 344 | bool operator==(const CommitEntry& rhs) const { | |||
| 345 | return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq; | |||
| 346 | } | |||
| 347 | }; | |||
| 348 | ||||
| 349 | struct CommitEntry64bFormat { | |||
| 350 | explicit CommitEntry64bFormat(size_t index_bits) | |||
| 351 | : INDEX_BITS(index_bits), | |||
| 352 | PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)), | |||
| 353 | COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)), | |||
| 354 | COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)), | |||
| 355 | DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {} | |||
| 356 | // Number of higher bits of a sequence number that are not used. They are | |||
| 357 | // used to encode the value type, ... | |||
| 358 | const size_t PAD_BITS = static_cast<size_t>(8); | |||
| 359 | // Number of lower bits from prepare seq that can be skipped as they are | |||
| 360 | // implied by the index of the entry in the array | |||
| 361 | const size_t INDEX_BITS; | |||
| 362 | // Number of bits we use to encode the prepare seq | |||
| 363 | const size_t PREP_BITS; | |||
| 364 | // Number of bits we use to encode the commit seq. | |||
| 365 | const size_t COMMIT_BITS; | |||
| 366 | // Filter to encode/decode commit seq | |||
| 367 | const uint64_t COMMIT_FILTER; | |||
| 368 | // The value of commit_seq - prepare_seq + 1 must be less than this bound | |||
| 369 | const uint64_t DELTA_UPPERBOUND; | |||
| 370 | }; | |||
| 371 | ||||
| 372 | // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX | |||
| 373 | // Delta Seq (64 bits)   = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA | |||
| 374 | // Encoded Value         = PREP PREP .... PREP PREP DELTA DELTA ... DELTA DELTA | |||
| 375 | // PAD: first bits of a seq that are reserved for tagging and hence ignored | |||
| 376 | // PREP/INDEX: the used bits in a prepare seq number | |||
| 377 | // INDEX: the bits that do not have to be encoded (will be provided externally) | |||
| 378 | // DELTA: commit seq - prepare seq + 1 | |||
| 379 | // Number of DELTA bits should be equal to number of index bits + PADs | |||
| 380 | struct CommitEntry64b { | |||
| 381 | constexpr CommitEntry64b() noexcept : rep_(0) {} | |||
| 382 | ||||
| 383 | CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format) | |||
| 384 | : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {} | |||
| 385 | ||||
| 386 | CommitEntry64b(const uint64_t ps, const uint64_t cs, | |||
| 387 | const CommitEntry64bFormat& format) { | |||
| 388 | assert(ps < static_cast<uint64_t>( | |||
| 389 | (1ull << (format.PREP_BITS + format.INDEX_BITS)))); | |||
| 390 | assert(ps <= cs); | |||
| 391 | uint64_t delta = cs - ps + 1; // make initialized delta always >= 1 | |||
| 392 | // zero is reserved for uninitialized entries | |||
| 393 | assert(0 < delta); | |||
| 394 | assert(delta < format.DELTA_UPPERBOUND); | |||
| 395 | if (delta >= format.DELTA_UPPERBOUND) { | |||
| 396 | throw std::runtime_error( | |||
| 397 | "commit_seq >> prepare_seq. The allowed distance is " + | |||
| 398 | ToString(format.DELTA_UPPERBOUND) + " commit_seq is " + | |||
| 399 | ToString(cs) + " prepare_seq is " + ToString(ps)); | |||
| 400 | } | |||
| 401 | rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER; | |||
| 402 | rep_ = rep_ | delta; | |||
| 403 | } | |||
| 404 | ||||
| 405 | // Return false if the entry is empty | |||
| 406 | bool Parse(const uint64_t indexed_seq, CommitEntry* entry, | |||
| 407 | const CommitEntry64bFormat& format) { | |||
| 408 | uint64_t delta = rep_ & format.COMMIT_FILTER; | |||
| 409 | // zero is reserved for uninitialized entries | |||
| 410 | assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS))); | |||
| 411 | if (delta == 0) { | |||
| 412 | return false; // initialized entry would have non-zero delta | |||
| 413 | } | |||
| 414 | ||||
| 415 | assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS))); | |||
| 416 | uint64_t prep_up = rep_ & ~format.COMMIT_FILTER; | |||
| 417 | prep_up >>= format.PAD_BITS; | |||
| 418 | const uint64_t& prep_low = indexed_seq; | |||
| 419 | entry->prep_seq = prep_up | prep_low; | |||
| 420 | ||||
| 421 | entry->commit_seq = entry->prep_seq + delta - 1; | |||
| 422 | return true; | |||
| 423 | } | |||
| 424 | ||||
| 425 | private: | |||
| 426 | uint64_t rep_; | |||
| 427 | }; | |||
| 428 | ||||
| 429 | // Struct to hold ownership of snapshot and read callback for cleanup. | |||
| 430 | struct IteratorState; | |||
| 431 | ||||
| 432 | std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() { | |||
| 433 | return cf_map_; | |||
| 434 | } | |||
| 435 | std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() { | |||
| 436 | return handle_map_; | |||
| 437 | } | |||
| 438 | void UpdateCFComparatorMap( | |||
| 439 | const std::vector<ColumnFamilyHandle*>& handles) override; | |||
| 440 | void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override; | |||
| 441 | ||||
| 442 | virtual const Snapshot* GetSnapshot() override; | |||
| 443 | SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check); | |||
| 444 | ||||
| 445 | protected: | |||
| 446 | virtual Status VerifyCFOptions( | |||
| 447 | const ColumnFamilyOptions& cf_options) override; | |||
| 448 | ||||
| 449 | private: | |||
| 450 | friend class PreparedHeap_BasicsTest_Test; | |||
| 451 | friend class PreparedHeap_Concurrent_Test; | |||
| 452 | friend class PreparedHeap_EmptyAtTheEnd_Test; | |||
| 453 | friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test; | |||
| 454 | friend class WritePreparedCommitEntryPreReleaseCallback; | |||
| 455 | friend class WritePreparedTransactionTestBase; | |||
| 456 | friend class WritePreparedTxn; | |||
| 457 | friend class WritePreparedTxnDBMock; | |||
| 458 | friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test; | |||
| 459 | friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; | |||
| 460 | friend class | |||
| 461 | WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test; | |||
| 462 | friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test; | |||
| 463 | friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; | |||
| 464 | friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; | |||
| 465 | friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test; | |||
| 466 | friend class | |||
| 467 | WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test; | |||
| 468 | friend class WritePreparedTransactionTest_CommitMapTest_Test; | |||
| 469 | friend class WritePreparedTransactionTest_DoubleSnapshot_Test; | |||
| 470 | friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; | |||
| 471 | friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test; | |||
| 472 | friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; | |||
| 473 | friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test; | |||
| 474 | friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test; | |||
| 475 | friend class | |||
| 476 | WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test; | |||
| 477 | friend class | |||
| 478 | WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test; | |||
| 479 | friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test; | |||
| 480 | friend class WritePreparedTransactionTest_OldCommitMapGC_Test; | |||
| 481 | friend class WritePreparedTransactionTest_RollbackTest_Test; | |||
| 482 | friend class WriteUnpreparedTxnDB; | |||
| 483 | friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; | |||
| 484 | ||||
| 485 | void Init(const TransactionDBOptions& /* unused */); | |||
| 486 | ||||
| 487 | void WPRecordTick(uint32_t ticker_type) const { | |||
| 488 | RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type); | |||
| 489 | } | |||
| 490 | ||||
| 491 | // A heap with the amortized O(1) complexity for erase. It uses one extra heap | |||
| 492 | // to keep track of erased entries that are not yet on top of the main heap. | |||
| 493 | class PreparedHeap { | |||
| 494 | std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> | |||
| 495 | heap_; | |||
| 496 | std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> | |||
| 497 | erased_heap_; | |||
| 498 | // True when testing crash recovery | |||
| 499 | bool TEST_CRASH_ = false; | |||
| 500 | friend class WritePreparedTxnDB; | |||
| 501 | ||||
| 502 | public: | |||
| 503 | ~PreparedHeap() { | |||
| 504 | if (!TEST_CRASH_) { | |||
| 505 | assert(heap_.empty()); | |||
| 506 | assert(erased_heap_.empty()); | |||
| 507 | } | |||
| 508 | } | |||
| 509 | bool empty() { return heap_.empty(); } | |||
| 510 | uint64_t top() { return heap_.top(); } | |||
| 511 | void push(uint64_t v) { heap_.push(v); } | |||
| 512 | void pop() { | |||
| 513 | heap_.pop(); | |||
| 514 | while (!heap_.empty() && !erased_heap_.empty() && | |||
| 515 | // heap_.top() > erased_heap_.top() could happen if we have erased | |||
| 516 | // a non-existent entry. Ideally the user should not do that but we | |||
| 517 | // should be resilient against it. | |||
| 518 | heap_.top() >= erased_heap_.top()) { | |||
| 519 | if (heap_.top() == erased_heap_.top()) { | |||
| 520 | heap_.pop(); | |||
| 521 | } | |||
| 522 | uint64_t erased __attribute__((__unused__)); | |||
| 523 | erased = erased_heap_.top(); | |||
| 524 | erased_heap_.pop(); | |||
| 525 | // No duplicate prepare sequence numbers | |||
| 526 | assert(erased_heap_.empty() || erased_heap_.top() != erased); | |||
| 527 | } | |||
| 528 | while (heap_.empty() && !erased_heap_.empty()) { | |||
| 529 | erased_heap_.pop(); | |||
| 530 | } | |||
| 531 | } | |||
| 532 | void erase(uint64_t seq) { | |||
| 533 | if (!heap_.empty()) { | |||
| 534 | if (seq < heap_.top()) { | |||
| 535 | // Already popped, ignore it. | |||
| 536 | } else if (heap_.top() == seq) { | |||
| 537 | pop(); | |||
| 538 | assert(heap_.empty() || heap_.top() != seq); | |||
| 539 | } else { // (heap_.top() > seq) | |||
| 540 | // Down the heap, remember to pop it later | |||
| 541 | erased_heap_.push(seq); | |||
| 542 | } | |||
| 543 | } | |||
| 544 | } | |||
| 545 | }; | |||
| 546 | ||||
| 547 | void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; } | |||
| 548 | ||||
| 549 | // Get the commit entry with index indexed_seq from the commit table. It | |||
| 550 | // returns true if such entry exists. | |||
| 551 | bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b, | |||
| 552 | CommitEntry* entry) const; | |||
| 553 | ||||
| 554 | // Rewrite the entry with the index indexed_seq in the commit table with the | |||
| 555 | // commit entry <prep_seq, commit_seq>. If the rewrite results into eviction, | |||
| 556 | // sets the evicted_entry and returns true. | |||
| 557 | bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry, | |||
| 558 | CommitEntry* evicted_entry); | |||
| 559 | ||||
| 560 | // Rewrite the entry with the index indexed_seq in the commit table with the | |||
| 561 | // commit entry new_entry only if the existing entry matches the | |||
| 562 | // expected_entry. Returns false otherwise. | |||
| 563 | bool ExchangeCommitEntry(const uint64_t indexed_seq, | |||
| 564 | CommitEntry64b& expected_entry, | |||
| 565 | const CommitEntry& new_entry); | |||
| 566 | ||||
| 567 | // Increase max_evicted_seq_ from the previous value prev_max to the new | |||
| 568 | // value. This also involves taking care of prepared txns that are not | |||
| 569 | // committed before new_max, as well as updating the list of live snapshots at | |||
| 570 | // the time of updating the max. Thread-safety: this function can be called | |||
| 571 | // concurrently. The concurrent invocations of this function are equivalent to | |||
| 572 | // a serial invocation in which the last invocation is the one with the | |||
| 573 | // largest new_max value. | |||
| 574 | void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, | |||
| 575 | const SequenceNumber& new_max); | |||
| 576 | ||||
| 577 | inline SequenceNumber SmallestUnCommittedSeq() { | |||
| 578 | // Since we update the prepare_heap always from the main write queue via | |||
| 579 | // PreReleaseCallback, the prepared_txns_.top() indicates the smallest | |||
| 580 | // prepared data in 2pc transactions. For non-2pc transactions that are | |||
| 581 | // written in two steps, we also update prepared_txns_ at the first step | |||
| 582 | // (via the same mechanism) so that their uncommitted data is reflected in | |||
| 583 | // SmallestUnCommittedSeq. | |||
| 584 | ReadLock rl(&prepared_mutex_); | |||
| 585 | // Since we are holding the mutex, and GetLatestSequenceNumber is updated | |||
| 586 | // after prepared_txns_ are, the value of GetLatestSequenceNumber would | |||
| 587 | // reflect any uncommitted data that is not added to prepared_txns_ yet. | |||
| 588 | // Otherwise, if there is no concurrent txn, this value simply reflects the | |||
| 589 | // latest value in the memtable. | |||
| 590 | if (!delayed_prepared_.empty()) { | |||
| 591 | assert(!delayed_prepared_empty_.load()); | |||
| 592 | return *delayed_prepared_.begin(); | |||
| 593 | } | |||
| 594 | if (prepared_txns_.empty()) { | |||
| 595 | return db_impl_->GetLatestSequenceNumber() + 1; | |||
| 596 | } else { | |||
| 597 | return std::min(prepared_txns_.top(), | |||
| 598 | db_impl_->GetLatestSequenceNumber() + 1); | |||
| 599 | } | |||
| 600 | } | |||
| 601 | // Enhance the snapshot object by recording in it the smallest uncommitted seq | |||
| 602 | inline void EnhanceSnapshot(SnapshotImpl* snapshot, | |||
| 603 | SequenceNumber min_uncommitted) { | |||
| 604 | assert(snapshot); | |||
| 605 | snapshot->min_uncommitted_ = min_uncommitted; | |||
| 606 | } | |||
| 607 | ||||
| 608 | virtual const std::vector<SequenceNumber> GetSnapshotListFromDB( | |||
| 609 | SequenceNumber max); | |||
| 610 | ||||
| 611 | // Will be called by the public ReleaseSnapshot method. Does the maintenance | |||
| 612 | // internal to WritePreparedTxnDB | |||
| 613 | void ReleaseSnapshotInternal(const SequenceNumber snap_seq); | |||
| 614 | ||||
| 615 | // Update the list of snapshots corresponding to the soon-to-be-updated | |||
| 616 | // max_evicted_seq_. Thread-safety: this function can be called concurrently. | |||
| 617 | // The concurrent invocations of this function are equivalent to a serial | |||
| 618 | // invocation in which the last invocation is the one with the largest | |||
| 619 | // version value. | |||
| 620 | void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots, | |||
| 621 | const SequenceNumber& version); | |||
| 622 | // Check the new list of new snapshots against the old one to see if any of | |||
| 623 | // the snapshots are released and to do the cleanup for the released snapshot. | |||
| 624 | void CleanupReleasedSnapshots( | |||
| 625 | const std::vector<SequenceNumber>& new_snapshots, | |||
| 626 | const std::vector<SequenceNumber>& old_snapshots); | |||
| 627 | ||||
| 628 | // Check an evicted entry against live snapshots to see if it should be kept | |||
| 629 | // around or it can be safely discarded (and hence assume committed for all | |||
| 630 | // snapshots). Thread-safety: this function can be called concurrently. If it | |||
| 631 | // is called concurrently with multiple UpdateSnapshots, the result is the | |||
| 632 | // same as checking the intersection of the snapshot list before updates with | |||
| 633 | // the snapshot list of all the concurrent updates. | |||
| 634 | void CheckAgainstSnapshots(const CommitEntry& evicted); | |||
| 635 | ||||
| 636 | // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq < | |||
| 637 | // commit_seq. Return false if checking the next snapshot(s) is not needed. | |||
| 638 | // This is the case if none of the next snapshots could satisfy the condition. | |||
| 639 | // next_is_larger: the next snapshot will be a larger value | |||
| 640 | bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq, | |||
| 641 | const uint64_t& commit_seq, | |||
| 642 | const uint64_t& snapshot_seq, | |||
| 643 | const bool next_is_larger); | |||
| 644 | ||||
| 645 | // A trick to increase the last visible sequence number by one and also wait | |||
| 646 | // for the in-flight commits to be visible. | |||
| 647 | void AdvanceSeqByOne(); | |||
| 648 | ||||
| 649 | // The list of live snapshots at the last time that max_evicted_seq_ advanced. | |||
| 650 | // The list is stored in two data structures: in snapshot_cache_, which is | |||
| 651 | // efficient for concurrent reads, and in snapshots_ if the data does not fit | |||
| 652 | // into snapshot_cache_. The total number of snapshots in the two lists | |||
| 653 | std::atomic<size_t> snapshots_total_ = {}; | |||
| 654 | // The list is sorted in ascending order. Thread-safety for writes is provided | |||
| 655 | // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for | |||
| 656 | // each entry. In x86_64 architecture such reads are compiled to simple read | |||
| 657 | // instructions. | |||
| 658 | const size_t SNAPSHOT_CACHE_BITS; | |||
| 659 | const size_t SNAPSHOT_CACHE_SIZE; | |||
| 660 | std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_; | |||
| 661 | // 2nd list for storing snapshots. The list is sorted in ascending order. | |||
| 662 | // Thread-safety is provided with snapshots_mutex_. | |||
| 663 | std::vector<SequenceNumber> snapshots_; | |||
| 664 | // The list of all snapshots: snapshots_ + snapshot_cache_. This list, although | |||
| 665 | // redundant, simplifies the CleanupReleasedSnapshots implementation. | |||
| 666 | // Thread-safety is provided with snapshots_mutex_. | |||
| 667 | std::vector<SequenceNumber> snapshots_all_; | |||
| 668 | // The version of the latest list of snapshots. This can be used to avoid | |||
| 669 | // rewriting a list that is concurrently updated with a more recent version. | |||
| 670 | SequenceNumber snapshots_version_ = 0; | |||
| 671 | ||||
| 672 | // A heap of prepared transactions. Thread-safety is provided with | |||
| 673 | // prepared_mutex_. | |||
| 674 | PreparedHeap prepared_txns_; | |||
| 675 | const size_t COMMIT_CACHE_BITS; | |||
| 676 | const size_t COMMIT_CACHE_SIZE; | |||
| 677 | const CommitEntry64bFormat FORMAT; | |||
| 678 | // commit_cache_ must be initialized to zero to tell apart an empty index from | |||
| 679 | // a filled one. Thread-safety is provided with commit_cache_mutex_. | |||
| 680 | std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_; | |||
| 681 | // The largest evicted *commit* sequence number from the commit_cache_. If a | |||
| 682 | // seq is smaller than max_evicted_seq_ it might or might not be present in | |||
| 683 | // commit_cache_. So commit_cache_ must first be checked before consulting | |||
| 684 | // with max_evicted_seq_. | |||
| 685 | std::atomic<uint64_t> max_evicted_seq_ = {}; | |||
| 686 | // Order: 1) update future_max_evicted_seq_ = new_max, 2) | |||
| 687 | // GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since | |||
| 688 | // GetSnapshotInternal guarantees that the snapshot seq is larger than | |||
| 689 | // future_max_evicted_seq_, this guarantees that if a snapshot is not larger | |||
| 690 | // than max, it has already been looked at via a GetSnapshotListFromDB(new_max). | |||
| 691 | std::atomic<uint64_t> future_max_evicted_seq_ = {}; | |||
| 692 | // Advance max_evicted_seq_ by this value each time it needs an update. The | |||
| 693 | // larger the value, the less frequent advances we would have. We do not want | |||
| 694 | // it to be too large either as it would cause stalls by doing too much | |||
| 695 | // maintenance work under the lock. | |||
| 696 | size_t INC_STEP_FOR_MAX_EVICTED = 1; | |||
| 697 | // A map from old snapshots (expected to be used by a few read-only txns) to | |||
| 698 | // prepared sequence numbers of the evicted entries from commit_cache_ that | |||
| 699 | // overlap with such a snapshot. These are the prepared sequence numbers that | |||
| 700 | // the snapshot, to which they are mapped, cannot assume to be committed just | |||
| 701 | // because it is no longer in the commit_cache_. The vector must be sorted | |||
| 702 | // after each update. | |||
| 703 | // Thread-safety is provided with old_commit_map_mutex_. | |||
| 704 | std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_; | |||
| 705 | // A set of long-running prepared transactions that are not finished by the | |||
| 706 | // time max_evicted_seq_ advances their sequence number. This is expected to | |||
| 707 | // be empty normally. Thread-safety is provided with prepared_mutex_. | |||
| 708 | std::set<uint64_t> delayed_prepared_; | |||
| 709 | // Commit of a delayed prepared: 1) update commit cache, 2) update | |||
| 710 | // delayed_prepared_commits_, 3) publish seq, 4) clean up delayed_prepared_. | |||
| 711 | // delayed_prepared_commits_ will help us tell apart the unprepared txns from | |||
| 712 | // the ones that are committed but not cleaned up yet. | |||
| 713 | std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_; | |||
| 714 | // Updated when delayed_prepared_.empty() changes. Expected to be true | |||
| 715 | // normally. | |||
| 716 | std::atomic<bool> delayed_prepared_empty_ = {true}; | |||
| 717 | // Updated when old_commit_map_.empty() changes. Expected to be true normally. | |||
| 718 | std::atomic<bool> old_commit_map_empty_ = {true}; | |||
| 719 | mutable port::RWMutex prepared_mutex_; | |||
| 720 | mutable port::RWMutex old_commit_map_mutex_; | |||
| 721 | mutable port::RWMutex commit_cache_mutex_; | |||
| 722 | mutable port::RWMutex snapshots_mutex_; | |||
| 723 | // A cache of the cf comparators | |||
| 724 | // Thread safety: since it is const, it is safe to read it concurrently | |||
| 725 | std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_; | |||
| 726 | // A cache of the cf handles | |||
| 727 | // Thread safety: since the handle is a read-only const object, it is | |||
| 728 | // safe to read it concurrently | |||
| 729 | std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_; | |||
| 730 | }; | |||
| 731 | ||||
| 732 | class WritePreparedTxnReadCallback : public ReadCallback { | |||
| 733 | public: | |||
| 734 | WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) | |||
| 735 | : ReadCallback(snapshot), db_(db) {} | |||
| 736 | WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, | |||
| 737 | SequenceNumber min_uncommitted) | |||
| 738 | : ReadCallback(snapshot, min_uncommitted), db_(db) {} | |||
| 739 | ||||
| 740 | // Will be called to see if the seq number is visible; if not, it moves on to | |||
| 741 | // the next seq number. | |||
| 742 | inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override { | |||
| 743 | auto snapshot = max_visible_seq_; | |||
| 744 | return db_->IsInSnapshot(seq, snapshot, min_uncommitted_); | |||
| 745 | } | |||
| 746 | ||||
| 747 | // TODO(myabandeh): override Refresh when Iterator::Refresh is supported | |||
| 748 | private: | |||
| 749 | WritePreparedTxnDB* db_; | |||
| 750 | }; | |||
| 751 | ||||
| 752 | class AddPreparedCallback : public PreReleaseCallback { | |||
| 753 | public: | |||
| 754 | AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl, | |||
| 755 | size_t sub_batch_cnt, bool two_write_queues, | |||
| 756 | bool first_prepare_batch) | |||
| 757 | : db_(db), | |||
| 758 | db_impl_(db_impl), | |||
| 759 | sub_batch_cnt_(sub_batch_cnt), | |||
| 760 | two_write_queues_(two_write_queues), | |||
| 761 | first_prepare_batch_(first_prepare_batch) { | |||
| 762 | (void)two_write_queues_; // to silence unused private field warning | |||
| 763 | } | |||
| 764 | virtual Status Callback(SequenceNumber prepare_seq, | |||
| 765 | bool is_mem_disabled __attribute__((__unused__)), | |||
| 766 | uint64_t log_number) override { | |||
| 767 | // Always Prepare from the main queue | |||
| 768 | assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue | |||
| 769 | for (size_t i = 0; i < sub_batch_cnt_; i++) { | |||
| 770 | db_->AddPrepared(prepare_seq + i); | |||
| 771 | } | |||
| 772 | if (first_prepare_batch_) { | |||
| 773 | assert(log_number != 0); | |||
| 774 | db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( | |||
| 775 | log_number); | |||
| 776 | } | |||
| 777 | return Status::OK(); | |||
| 778 | } | |||
| 779 | ||||
| 780 | private: | |||
| 781 | WritePreparedTxnDB* db_; | |||
| 782 | DBImpl* db_impl_; | |||
| 783 | size_t sub_batch_cnt_; | |||
| 784 | bool two_write_queues_; | |||
| 785 | // It is 2PC and this is the first prepare batch. Always the case in 2PC | |||
| 786 | // unless it is WriteUnPrepared. | |||
| 787 | bool first_prepare_batch_; | |||
| 788 | }; | |||
| 789 | ||||
| 790 | class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { | |||
| 791 | public: | |||
| 792 | // includes_data indicates that the commit also writes non-empty | |||
| 793 | // CommitTimeWriteBatch to memtable, which needs to be committed separately. | |||
| 794 | WritePreparedCommitEntryPreReleaseCallback( | |||
| 795 | WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq, | |||
| 796 | size_t prep_batch_cnt, size_t data_batch_cnt = 0, | |||
| 797 | SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0) | |||
| 798 | : db_(db), | |||
| 799 | db_impl_(db_impl), | |||
| 800 | prep_seq_(prep_seq), | |||
| 801 | prep_batch_cnt_(prep_batch_cnt), | |||
| 802 | data_batch_cnt_(data_batch_cnt), | |||
| 803 | includes_data_(data_batch_cnt_ > 0), | |||
| 804 | aux_seq_(aux_seq), | |||
| 805 | aux_batch_cnt_(aux_batch_cnt), | |||
| 806 | includes_aux_batch_(aux_batch_cnt > 0) { | |||
| 807 | assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor | |||
| 808 | assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0); | |||
| 809 | assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber)); // xor | |||
| 810 | } | |||
| 811 | ||||
| 812 | virtual Status Callback(SequenceNumber commit_seq, | |||
| 813 | bool is_mem_disabled __attribute__((__unused__)), | |||
| 814 | uint64_t) override { | |||
| 815 | // Always commit from the 2nd queue | |||
| 816 | assert(!db_impl_->immutable_db_options().two_write_queues || | |||
| 817 | is_mem_disabled); | |||
| 818 | assert(includes_data_ || prep_seq_ != kMaxSequenceNumber); | |||
| 819 | // The data batch is what accompanies the commit marker and affects the | |||
| 820 | // last seq in the commit batch. | |||
| 821 | const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1) | |||
| 822 | ? commit_seq | |||
| 823 | : commit_seq + data_batch_cnt_ - 1; | |||
| 824 | if (prep_seq_ != kMaxSequenceNumber) { | |||
| 825 | for (size_t i = 0; i < prep_batch_cnt_; i++) { | |||
| 826 | db_->AddCommitted(prep_seq_ + i, last_commit_seq); | |||
| 827 | } | |||
| 828 | } // else there was no prepare phase | |||
| 829 | if (includes_aux_batch_) { | |||
| 830 | for (size_t i = 0; i < aux_batch_cnt_; i++) { | |||
| 831 | db_->AddCommitted(aux_seq_ + i, last_commit_seq); | |||
| 832 | } | |||
| 833 | } | |||
| 834 | if (includes_data_) { | |||
| 835 | assert(data_batch_cnt_); | |||
| 836 | // Commit the data that is accompanied with the commit request | |||
| 837 | for (size_t i = 0; i < data_batch_cnt_; i++) { | |||
| 838 | // For commit seq of each batch use the commit seq of the last batch. | |||
| 839 | // This would make debugging easier by having all the batches having | |||
| 840 | // the same sequence number. | |||
| 841 | db_->AddCommitted(commit_seq + i, last_commit_seq); | |||
| 842 | } | |||
| 843 | } | |||
| 844 | if (db_impl_->immutable_db_options().two_write_queues) { | |||
| 845 | assert(is_mem_disabled); // implies the 2nd queue | |||
| 846 | // Publish the sequence number. We can do that here assuming the callback | |||
| 847 | // is invoked only from one write queue, which would guarantee that the | |||
| 848 | // publish sequence numbers will be in order, i.e., once a seq is | |||
| 849 | // published all the seq prior to that are also publishable. | |||
| 850 | db_impl_->SetLastPublishedSequence(last_commit_seq); | |||
| 851 | } | |||
| 852 | // else SequenceNumber that is updated as part of the write already does the | |||
| 853 | // publishing | |||
| 854 | return Status::OK(); | |||
| 855 | } | |||
| 856 | ||||
| 857 | private: | |||
| 858 | WritePreparedTxnDB* db_; | |||
| 859 | DBImpl* db_impl_; | |||
| 860 | // kMaxSequenceNumber if there was no prepare phase | |||
| 861 | SequenceNumber prep_seq_; | |||
| 862 | size_t prep_batch_cnt_; | |||
| 863 | size_t data_batch_cnt_; | |||
| 864 | // Data here is the batch that is written with the commit marker, either | |||
| 865 | // because it is commit without prepare or commit has a CommitTimeWriteBatch. | |||
| 866 | bool includes_data_; | |||
| 867 | // Auxiliary batch (if there is any) is a batch that is written before, but | |||
| 868 | // gets the same commit seq as prepare batch or data batch. This is used in | |||
| 869 | // two write queues where the CommitTimeWriteBatch becomes the aux batch and | |||
| 870 | // we do a separate write to actually commit everything. | |||
| 871 | SequenceNumber aux_seq_; | |||
| 872 | size_t aux_batch_cnt_; | |||
| 873 | bool includes_aux_batch_; | |||
| 874 | }; | |||
| 875 | ||||
| 876 | // For two_write_queues, commit both the aborted batch and the cleanup batch, | |||
| 877 | // and then publish the seq | |||
| 878 | class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback { | |||
| 879 | public: | |||
| 880 | WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db, | |||
| 881 | DBImpl* db_impl, | |||
| 882 | SequenceNumber prep_seq, | |||
| 883 | SequenceNumber rollback_seq, | |||
| 884 | size_t prep_batch_cnt) | |||
| 885 | : db_(db), | |||
| 886 | db_impl_(db_impl), | |||
| 887 | prep_seq_(prep_seq), | |||
| 888 | rollback_seq_(rollback_seq), | |||
| 889 | prep_batch_cnt_(prep_batch_cnt) { | |||
| 890 | assert(prep_seq != kMaxSequenceNumber); | |||
| 891 | assert(rollback_seq != kMaxSequenceNumber); | |||
| 892 | assert(prep_batch_cnt_ > 0); | |||
| 893 | } | |||
| 894 | ||||
| 895 | Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, | |||
| 896 | uint64_t) override { | |||
| 897 | // Always commit from the 2nd queue | |||
| 898 | assert(is_mem_disabled); // implies the 2nd queue | |||
| 899 | assert(db_impl_->immutable_db_options().two_write_queues); | |||
| 900 | #ifdef NDEBUG | |||
| 901 | (void)is_mem_disabled; | |||
| 902 | #endif | |||
| 903 | const uint64_t last_commit_seq = commit_seq; | |||
| 904 | db_->AddCommitted(rollback_seq_, last_commit_seq); | |||
| 905 | for (size_t i = 0; i < prep_batch_cnt_; i++) { | |||
| 906 | db_->AddCommitted(prep_seq_ + i, last_commit_seq); | |||
| 907 | } | |||
| 908 | db_impl_->SetLastPublishedSequence(last_commit_seq); | |||
| 909 | return Status::OK(); | |||
| 910 | } | |||
| 911 | ||||
| 912 | private: | |||
| 913 | WritePreparedTxnDB* db_; | |||
| 914 | DBImpl* db_impl_; | |||
| 915 | SequenceNumber prep_seq_; | |||
| 916 | SequenceNumber rollback_seq_; | |||
| 917 | size_t prep_batch_cnt_; | |||
| 918 | }; | |||
| 919 | ||||
| 920 | // Count the number of sub-batches inside a batch. A sub-batch does not have | |||
| 921 | // duplicate keys. | |||
| 922 | struct SubBatchCounter : public WriteBatch::Handler { | |||
| 923 | explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators) | |||
| 924 | : comparators_(comparators), batches_(1) {} | |||
| 925 | std::map<uint32_t, const Comparator*>& comparators_; | |||
| 926 | using CFKeys = std::set<Slice, SetComparator>; | |||
| 927 | std::map<uint32_t, CFKeys> keys_; | |||
| 928 | size_t batches_; | |||
| 929 | size_t BatchCount() { return batches_; } | |||
| 930 | void AddKey(const uint32_t cf, const Slice& key); | |||
| 931 | void InitWithComp(const uint32_t cf); | |||
| 932 | Status MarkNoop(bool) override { return Status::OK(); } | |||
| 933 | Status MarkEndPrepare(const Slice&) override { return Status::OK(); } | |||
| 934 | Status MarkCommit(const Slice&) override { return Status::OK(); } | |||
| 935 | Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { | |||
| 936 | AddKey(cf, key); | |||
| 937 | return Status::OK(); | |||
| 938 | } | |||
| 939 | Status DeleteCF(uint32_t cf, const Slice& key) override { | |||
| 940 | AddKey(cf, key); | |||
| 941 | return Status::OK(); | |||
| 942 | } | |||
| 943 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { | |||
| 944 | AddKey(cf, key); | |||
| 945 | return Status::OK(); | |||
| 946 | } | |||
| 947 | Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { | |||
| 948 | AddKey(cf, key); | |||
| 949 | return Status::OK(); | |||
| 950 | } | |||
| 951 | Status MarkBeginPrepare(bool) override { return Status::OK(); } | |||
| 952 | Status MarkRollback(const Slice&) override { return Status::OK(); } | |||
| 953 | bool WriteAfterCommit() const override { return false; } | |||
| 954 | }; | |||
| 955 | ||||
| 956 | } // namespace rocksdb | |||
| 957 | #endif // ROCKSDB_LITE |
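For reference, the delta encoding implemented by `CommitEntry64b` above can be reproduced in isolation. The standalone sketch below keeps the header's `PAD_BITS = 8` but assumes an illustrative `INDEX_BITS = 23` (a 2^23-entry commit cache; an assumption made only for this example), and round-trips one prepare/commit pair through the same bit layout:

```cpp
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>

// Standalone re-implementation of the CommitEntry64b bit layout, for
// illustration only. INDEX_BITS = 23 is an assumption; PAD_BITS = 8 matches
// the header above.
int main() {
  const uint64_t PAD_BITS = 8;
  const uint64_t INDEX_BITS = 23;
  const uint64_t PREP_BITS = 64 - PAD_BITS - INDEX_BITS;  // 33
  const uint64_t COMMIT_BITS = 64 - PREP_BITS;            // 31
  const uint64_t COMMIT_FILTER = (1ull << COMMIT_BITS) - 1;

  const uint64_t ps = 0x0123456789ull;  // prepare seq (made-up value)
  const uint64_t cs = ps + 41;          // commit seq

  // Encode: the upper bits hold the prepare seq without its low INDEX_BITS
  // (those are implied by the slot index); the lower bits hold delta = cs-ps+1.
  uint64_t delta = cs - ps + 1;
  uint64_t rep = ((ps << PAD_BITS) & ~COMMIT_FILTER) | delta;

  // Decode: the slot index supplies the low INDEX_BITS of the prepare seq.
  uint64_t indexed_seq = ps % (1ull << INDEX_BITS);
  uint64_t prep_up = (rep & ~COMMIT_FILTER) >> PAD_BITS;
  uint64_t decoded_prep = prep_up | indexed_seq;
  uint64_t decoded_commit = decoded_prep + (rep & COMMIT_FILTER) - 1;

  assert(decoded_prep == ps && decoded_commit == cs);
  std::printf("prep=%" PRIu64 " commit=%" PRIu64 " round-trip ok\n",
              decoded_prep, decoded_commit);
  return 0;
}
```

Storing only the delta lets a single 64-bit slot carry both halves of a commit entry, at the cost of the `DELTA_UPPERBOUND` check seen in the `CommitEntry64b` constructor.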