File: /home/bhubbard/working/src/ceph/src/rocksdb/utilities/transactions/write_unprepared_txn.cc
Warning: line 306, column 24: Dereference of null pointer (loaded from variable 'snap_released')
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 | // This source code is licensed under both the GPLv2 (found in the
3 | // COPYING file in the root directory) and Apache 2.0 License
4 | // (found in the LICENSE.Apache file in the root directory).
5 |
6 | #ifndef ROCKSDB_LITE
7 |
8 | #include "utilities/transactions/write_unprepared_txn.h"
9 | #include "db/db_impl.h"
10 | #include "util/cast_util.h"
11 | #include "utilities/transactions/write_unprepared_txn_db.h"
12 |
13 | #ifndef __STDC_FORMAT_MACROS
14 | #define __STDC_FORMAT_MACROS
15 | #endif
16 |
17 | namespace rocksdb {
18 |
19 | bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
20 |   auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers();
21 |
22 |   // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
23 |   // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
24 |   // the prepare_batch_cnt seq nums after it.
25 |   //
26 |   // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
27 |   // large.
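   |   // For example, with unprep_seqs = {{10, 2}, {15, 3}} the unprepared
   |   // seqs are {10, 11, 15, 16, 17}: seq 16 matches the loop below, while
   |   // seq 12 falls through to the IsInSnapshot check.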
28 |   for (const auto& it : unprep_seqs) {
29 |     if (it.first <= seq && seq < it.first + it.second) {
30 |       return true;
31 |     }
32 |   }
33 |
34 |   return db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_);
   |          // ← note: 'snap_released' is left at its nullptr default in this call
35 | }
36 |
37 | SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber(
38 |     WriteUnpreparedTxn* txn) {
39 |   auto unprep_seqs = txn->GetUnpreparedSequenceNumbers();
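   |   // The map is ordered by prep_seq, so rbegin() names the last unprepared
   |   // batch; e.g. {{10, 2}, {15, 3}} yields 15 + 3 - 1 = 17.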
40 |   if (unprep_seqs.size()) {
41 |     return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
42 |   }
43 |   return 0;
44 | }
45 |
46 | WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
47 |                                        const WriteOptions& write_options,
48 |                                        const TransactionOptions& txn_options)
49 |     : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) {
50 |   max_write_batch_size_ = txn_options.max_write_batch_size;
51 |   // We set max bytes to zero so that we don't get a memory limit error.
52 |   // Instead of trying to keep the write batch strictly under the size limit,
53 |   // write unprepared just flushes to the DB when the limit is exceeded, to
54 |   // avoid retry logic. This also allows very big key-value pairs that exceed
55 |   // max bytes to succeed.
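   |   // Illustrative example: with txn_options.max_write_batch_size = 1 << 20,
   |   // MaybeFlushWriteBatchToDB flushes the batch once GetDataSize() exceeds
   |   // 1 MiB; a single 2 MiB value still succeeds, it just flushes right after.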
56 |   write_batch_.SetMaxBytes(0);
57 | }
58 |
59 | WriteUnpreparedTxn::~WriteUnpreparedTxn() {
60 |   if (!unprep_seqs_.empty()) {
61 |     assert(log_number_ > 0);
62 |     assert(GetId() > 0);
63 |     assert(!name_.empty());
64 |
65 |     // We should rollback regardless of GetState, but some unit tests that
66 |     // test crash recovery run the destructor assuming that rollback does not
67 |     // happen, so that rollback during recovery can be exercised.
68 |     if (GetState() == STARTED) {
69 |       auto s __attribute__((__unused__)) = RollbackInternal();
70 |       // TODO(lth): Better error handling.
71 |       assert(s.ok());
72 |       dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
73 |           log_number_);
74 |     }
75 |   }
76 | }
77 |
78 | void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
79 |   PessimisticTransaction::Initialize(txn_options);
80 |   max_write_batch_size_ = txn_options.max_write_batch_size;
81 |   write_batch_.SetMaxBytes(0);
82 |   unprep_seqs_.clear();
83 |   write_set_keys_.clear();
84 | }
85 |
86 | Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
87 |                                const Slice& key, const Slice& value,
88 |                                const bool assume_tracked) {
89 |   Status s = MaybeFlushWriteBatchToDB();
90 |   if (!s.ok()) {
91 |     return s;
92 |   }
93 |   return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
94 | }
95 |
96 | Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
97 |                                const SliceParts& key, const SliceParts& value,
98 |                                const bool assume_tracked) {
99 |   Status s = MaybeFlushWriteBatchToDB();
100 |   if (!s.ok()) {
101 |     return s;
102 |   }
103 |   return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
104 | }
105 |
106 | Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
107 |                                  const Slice& key, const Slice& value,
108 |                                  const bool assume_tracked) {
109 |   Status s = MaybeFlushWriteBatchToDB();
110 |   if (!s.ok()) {
111 |     return s;
112 |   }
113 |   return TransactionBaseImpl::Merge(column_family, key, value, assume_tracked);
114 | }
115 |
116 | Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
117 |                                   const Slice& key, const bool assume_tracked) {
118 |   Status s = MaybeFlushWriteBatchToDB();
119 |   if (!s.ok()) {
120 |     return s;
121 |   }
122 |   return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
123 | }
124 |
125 | Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
126 |                                   const SliceParts& key,
127 |                                   const bool assume_tracked) {
128 |   Status s = MaybeFlushWriteBatchToDB();
129 |   if (!s.ok()) {
130 |     return s;
131 |   }
132 |   return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
133 | }
134 |
135 | Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
136 |                                         const Slice& key,
137 |                                         const bool assume_tracked) {
138 |   Status s = MaybeFlushWriteBatchToDB();
139 |   if (!s.ok()) {
140 |     return s;
141 |   }
142 |   return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
143 | }
144 |
145 | Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
146 |                                         const SliceParts& key,
147 |                                         const bool assume_tracked) {
148 |   Status s = MaybeFlushWriteBatchToDB();
149 |   if (!s.ok()) {
150 |     return s;
151 |   }
152 |   return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
153 | }
154 |
155 | Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
156 |   const bool kPrepared = true;
157 |   Status s;
158 |   if (max_write_batch_size_ != 0 &&
159 |       write_batch_.GetDataSize() > max_write_batch_size_) {
160 |     assert(GetState() != PREPARED);
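    |     // Passing !kPrepared flushes this as an *unprepared* batch:
    |     // FlushWriteBatchToDB forwards the flag to MarkEndPrepare, which then
    |     // emits the unprepared end marker rather than the prepare marker.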
161 |     s = FlushWriteBatchToDB(!kPrepared);
162 |   }
163 |   return s;
164 | }
165 |
166 | void WriteUnpreparedTxn::UpdateWriteKeySet(uint32_t cfid, const Slice& key) {
167 |   // TODO(lth): write_set_keys_ can just be a std::string instead of a vector.
168 |   write_set_keys_[cfid].push_back(key.ToString());
169 | }
170 |
171 | Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
172 |   if (name_.empty()) {
173 |     return Status::InvalidArgument("Cannot write to DB without SetName.");
174 |   }
175 |
176 |   // Update write_set_keys_ for rollback purposes.
177 |   KeySetBuilder keyset_handler(
178 |       this, wupt_db_->txn_db_options_.rollback_merge_operands);
179 |   auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&keyset_handler);
180 |   assert(s.ok());
181 |   if (!s.ok()) {
182 |     return s;
183 |   }
184 |
185 |   // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
186 |   WriteOptions write_options = write_options_;
187 |   write_options.disableWAL = false;
188 |   const bool WRITE_AFTER_COMMIT = true;
189 |   const bool first_prepare_batch = log_number_ == 0;
190 |   // MarkEndPrepare will change the Noop marker to the appropriate marker.
191 |   WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
192 |                                      !WRITE_AFTER_COMMIT, !prepared);
193 |   // For each duplicate key we account for a new sub-batch.
194 |   prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
195 |   // AddPrepared is better called in the pre-release callback; otherwise there
196 |   // is a non-zero chance of max advancing past prepare_seq, with readers then
197 |   // assuming the data is committed.
198 |   // Also, having it in the PreReleaseCallback allows in-order addition of
199 |   // prepared entries to PreparedHeap and hence enables an optimization. Refer
200 |   // to SmallestUnCommittedSeq for more details.
201 |   AddPreparedCallback add_prepared_callback(
202 |       wpt_db_, db_impl_, prepare_batch_cnt_,
203 |       db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
204 |   const bool DISABLE_MEMTABLE = true;
205 |   uint64_t seq_used = kMaxSequenceNumber;
206 |   // log_number_ should refer to the oldest log containing uncommitted data
207 |   // from the current transaction. This means that if log_number_ is set,
208 |   // WriteImpl should not overwrite that value, so set log_used to nullptr if
209 |   // log_number_ is already set.
210 |   uint64_t* log_used = log_number_ ? nullptr : &log_number_;
211 |   s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
212 |                           /*callback*/ nullptr, log_used, /*log ref*/
213 |                           0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
214 |                           &add_prepared_callback);
215 |   assert(!s.ok() || seq_used != kMaxSequenceNumber);
216 |   auto prepare_seq = seq_used;
217 |
218 |   // Only call SetId if it hasn't been set yet.
219 |   if (GetId() == 0) {
220 |     SetId(prepare_seq);
221 |   }
222 |   // unprep_seqs_ will also contain prepared seqnos, since they are treated
223 |   // the same way in the prepare/commit callbacks. See the comment on the
224 |   // definition of unprep_seqs_.
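    |   // e.g. after two unprepared flushes and a final prepare, unprep_seqs_
    |   // might hold {100: 1, 150: 2, 200: 1}; all entries are removed together
    |   // at commit or rollback time.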
225 |   unprep_seqs_[prepare_seq] = prepare_batch_cnt_;
226 |
227 |   // Reset transaction state.
228 |   if (!prepared) {
229 |     prepare_batch_cnt_ = 0;
230 |     write_batch_.Clear();
231 |     WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
232 |   }
233 |
234 |   return s;
235 | }
236 |
237 | Status WriteUnpreparedTxn::PrepareInternal() {
238 |   const bool kPrepared = true;
239 |   return FlushWriteBatchToDB(kPrepared);
240 | }
241 |
242 | Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
243 |   if (unprep_seqs_.empty()) {
244 |     assert(log_number_ == 0);
245 |     assert(GetId() == 0);
246 |     return WritePreparedTxn::CommitWithoutPrepareInternal();
247 |   }
248 |
249 |   // TODO(lth): We should optimize commit without prepare to not perform
250 |   // a prepare under the hood.
251 |   auto s = PrepareInternal();
252 |   if (!s.ok()) {
253 |     return s;
254 |   }
255 |   return CommitInternal();
256 | }
257 |
258 | Status WriteUnpreparedTxn::CommitInternal() {
259 |   // TODO(lth): Reduce duplicate code with WritePrepared commit logic.
260 |
261 |   // We take the commit-time batch and append the Commit marker. The memtable
262 |   // will ignore the Commit marker in non-recovery mode.
263 |   WriteBatch* working_batch = GetCommitTimeWriteBatch();
264 |   const bool empty = working_batch->Count() == 0;
265 |   WriteBatchInternal::MarkCommit(working_batch, name_);
266 |
267 |   const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
268 |   if (!empty && for_recovery) {
269 |     // When not writing to memtable, we can still cache the latest write batch.
270 |     // The cached batch will be written to memtable in WriteRecoverableState
271 |     // during FlushMemTable.
272 |     WriteBatchInternal::SetAsLastestPersistentState(working_batch);
273 |   }
274 |
275 |   const bool includes_data = !empty && !for_recovery;
276 |   size_t commit_batch_cnt = 0;
277 |   if (UNLIKELY(includes_data)) {
278 |     ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
279 |                    "Duplicate key overhead");
280 |     SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
281 |     auto s = working_batch->Iterate(&counter);
282 |     assert(s.ok());
283 |     commit_batch_cnt = counter.BatchCount();
284 |   }
285 |   const bool disable_memtable = !includes_data;
286 |   const bool do_one_write =
287 |       !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
288 |   const bool publish_seq = do_one_write;
289 |   // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
290 |   // DB in one shot. min_uncommitted still works since it requires capturing
291 |   // data that is written to DB but not yet committed, while
292 |   // CommitTimeWriteBatch commits with PreReleaseCallback.
293 |   WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
294 |       wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt, publish_seq);
295 |   uint64_t seq_used = kMaxSequenceNumber;
296 |   // Since the prepared batch is directly written to memtable, there is already
297 |   // a connection between the memtable and its WAL, so there is no need to
298 |   // redundantly reference the log that contains the prepared data.
299 |   const uint64_t zero_log_number = 0ull;
300 |   size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
301 |   auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
302 |                                zero_log_number, disable_memtable, &seq_used,
303 |                                batch_cnt, &update_commit_map);
304 |   assert(!s.ok() || seq_used != kMaxSequenceNumber);
305 |   if (LIKELY(do_one_write || !s.ok())) {
306 |     if (LIKELY(s.ok())) {
307 |       // Note: RemovePrepared should be called after the WriteImpl that
308 |       // published the seq; otherwise the SmallestUnCommittedSeq optimization breaks.
309 |       for (const auto& seq : unprep_seqs_) {
310 |         wpt_db_->RemovePrepared(seq.first, seq.second);
311 |       }
312 |     }
313 |     unprep_seqs_.clear();
314 |     write_set_keys_.clear();
315 |     return s;
316 |   }  // else do the 2nd write to publish seq
317 |   // Note: the 2nd write comes with a performance penalty. So if we have too
318 |   // many commits accompanied by a CommitTimeWriteBatch and yet cannot enable
319 |   // the use_only_the_last_commit_time_batch_for_recovery_ optimization,
320 |   // two_write_queues should be disabled to avoid many additional writes here.
321 |   class PublishSeqPreReleaseCallback : public PreReleaseCallback {
322 |    public:
323 |     explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
324 |         : db_impl_(db_impl) {}
325 |     Status Callback(SequenceNumber seq,
326 |                     bool is_mem_disabled __attribute__((__unused__)),
327 |                     uint64_t) override {
328 |       assert(is_mem_disabled);
329 |       assert(db_impl_->immutable_db_options().two_write_queues);
330 |       db_impl_->SetLastPublishedSequence(seq);
331 |       return Status::OK();
332 |     }
333 |
334 |    private:
335 |     DBImpl* db_impl_;
336 |   } publish_seq_callback(db_impl_);
337 |   WriteBatch empty_batch;
338 |   empty_batch.PutLogData(Slice());
339 |   // In the absence of Prepare markers, use Noop as a batch separator.
340 |   WriteBatchInternal::InsertNoop(&empty_batch);
341 |   const bool DISABLE_MEMTABLE = true;
342 |   const size_t ONE_BATCH = 1;
343 |   const uint64_t NO_REF_LOG = 0;
344 |   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
345 |                           NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
346 |                           &publish_seq_callback);
347 |   assert(!s.ok() || seq_used != kMaxSequenceNumber);
348 |   // Note: RemovePrepared should be called after the WriteImpl that published
349 |   // the seq; otherwise the SmallestUnCommittedSeq optimization breaks.
350 |   for (const auto& seq : unprep_seqs_) {
351 |     wpt_db_->RemovePrepared(seq.first, seq.second);
352 |   }
353 |   unprep_seqs_.clear();
354 |   write_set_keys_.clear();
355 |   return s;
356 | }
357 |
358 | Status WriteUnpreparedTxn::RollbackInternal() {
359 |   // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
360 |   WriteBatchWithIndex rollback_batch(
361 |       wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
362 |   assert(GetId() != kMaxSequenceNumber);
363 |   assert(GetId() > 0);
364 |   const auto& cf_map = *wupt_db_->GetCFHandleMap();
365 |   auto read_at_seq = kMaxSequenceNumber;
366 |   Status s;
367 |
368 |   ReadOptions roptions;
369 |   // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
370 |   // need to read our own writes when reading prior versions of the key for
371 |   // rollback.
372 |   WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
373 |   for (const auto& cfkey : write_set_keys_) {
374 |     const auto cfid = cfkey.first;
375 |     const auto& keys = cfkey.second;
376 |     for (const auto& key : keys) {
377 |       const auto& cf_handle = cf_map.at(cfid);
378 |       PinnableSlice pinnable_val;
379 |       bool not_used;
380 |       s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
381 |                             &callback);
382 |
383 |       if (s.ok()) {
384 |         s = rollback_batch.Put(cf_handle, key, pinnable_val);
385 |         assert(s.ok());
386 |       } else if (s.IsNotFound()) {
387 |         s = rollback_batch.Delete(cf_handle, key);
388 |         assert(s.ok());
389 |       } else {
390 |         return s;
391 |       }
392 |     }
393 |   }
394 |
395 |   // The Rollback marker will be used as a batch separator.
396 |   WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
397 |   bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
398 |   const bool DISABLE_MEMTABLE = true;
399 |   const uint64_t NO_REF_LOG = 0;
400 |   uint64_t seq_used = kMaxSequenceNumber;
401 |   // TODO(lth): We write the rollback batch all in a single batch here, but it
402 |   // should be subdivided into multiple batches as well. In phase 2, when key
403 |   // sets are read from the WAL, this will happen naturally.
404 |   const size_t ONE_BATCH = 1;
405 |   // We commit the rolled back prepared batches. Although this is
406 |   // counter-intuitive, i) it is safe to do so, since the prepared batches are
407 |   // already canceled out by the rollback batch, ii) adding the commit entry to
408 |   // CommitCache will allow us to benefit from the existing mechanism in
409 |   // CommitCache that keeps an entry evicted due to max advance and yet overlaps
410 |   // with a live snapshot around, so that the live snapshot properly skips the
411 |   // entry even if its prepare seq is lower than max_evicted_seq_.
412 |   WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
413 |       wpt_db_, db_impl_, unprep_seqs_, ONE_BATCH);
414 |   // Note: the rollback batch does not need AddPrepared since it is written to
415 |   // DB in one shot. min_uncommitted still works since it requires capturing
416 |   // data that is written to DB but not yet committed, while the rollback
417 |   // batch commits with PreReleaseCallback.
418 |   s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
419 |                           nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
420 |                           &seq_used, rollback_batch.SubBatchCnt(),
421 |                           do_one_write ? &update_commit_map : nullptr);
422 |   assert(!s.ok() || seq_used != kMaxSequenceNumber);
423 |   if (!s.ok()) {
424 |     return s;
425 |   }
426 |   if (do_one_write) {
427 |     for (const auto& seq : unprep_seqs_) {
428 |       wpt_db_->RemovePrepared(seq.first, seq.second);
429 |     }
430 |     unprep_seqs_.clear();
431 |     write_set_keys_.clear();
432 |     return s;
433 |   }  // else do the 2nd write for commit
434 |   uint64_t& prepare_seq = seq_used;
435 |   ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
436 |                     "RollbackInternal 2nd write prepare_seq: %" PRIu64,
437 |                     prepare_seq);
438 |   // Commit the batch by writing an empty batch to the queue that will release
439 |   // the commit sequence number to readers.
440 |   WriteUnpreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
441 |       wpt_db_, db_impl_, unprep_seqs_, prepare_seq);
442 |   WriteBatch empty_batch;
443 |   empty_batch.PutLogData(Slice());
444 |   // In the absence of Prepare markers, use Noop as a batch separator.
445 |   WriteBatchInternal::InsertNoop(&empty_batch);
446 |   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
447 |                           NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
448 |                           &update_commit_map_with_prepare);
449 |   assert(!s.ok() || seq_used != kMaxSequenceNumber);
450 |   // Mark the txn as rolled back.
451 |   if (s.ok()) {
452 |     for (const auto& seq : unprep_seqs_) {
453 |       wpt_db_->RemovePrepared(seq.first, seq.second);
454 |     }
455 |   }
456 |
457 |   unprep_seqs_.clear();
458 |   write_set_keys_.clear();
459 |   return s;
460 | }
461 |
462 | Status WriteUnpreparedTxn::Get(const ReadOptions& options,
463 |                                ColumnFamilyHandle* column_family,
464 |                                const Slice& key, PinnableSlice* value) {
465 |   auto snapshot = options.snapshot;
466 |   auto snap_seq =
467 |       snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
468 |   SequenceNumber min_uncommitted =
469 |       kMinUnCommittedSeq;  // by default disable the optimization
470 |   if (snapshot != nullptr) {
471 |     min_uncommitted =
472 |         static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
473 |             ->min_uncommitted_;
474 |   }
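    |   // min_uncommitted lets IsInSnapshot return early: anything with a seq
    |   // below it was already committed when the snapshot was taken, so the
    |   // commit cache need not be consulted for those reads.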
475 |
476 |   WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
477 |                                           this);
478 |   return write_batch_.GetFromBatchAndDB(db_, options, column_family, key, value,
479 |                                         &callback);
480 | }
481 |
482 | Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
483 |   return GetIterator(options, wupt_db_->DefaultColumnFamily());
484 | }
485 |
486 | Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
487 |                                           ColumnFamilyHandle* column_family) {
488 |   // Make sure to get the iterator from WriteUnpreparedTxnDB, not the root db.
489 |   Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
490 |   assert(db_iter);
491 |
492 |   return write_batch_.NewIteratorWithBase(column_family, db_iter);
493 | }
494 |
495 | const std::map<SequenceNumber, size_t>&
496 | WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
497 |   return unprep_seqs_;
498 | }
499 |
500 | }  // namespace rocksdb
501 |
502 | #endif  // ROCKSDB_LITE
File: /home/bhubbard/working/src/ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.h

1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 | // This source code is licensed under both the GPLv2 (found in the
3 | // COPYING file in the root directory) and Apache 2.0 License
4 | // (found in the LICENSE.Apache file in the root directory).
5 |
6 | #pragma once
7 | #ifndef ROCKSDB_LITE
8 |
9 | #ifndef __STDC_FORMAT_MACROS
10 | #define __STDC_FORMAT_MACROS
11 | #endif
12 |
13 | #include <inttypes.h>
14 | #include <mutex>
15 | #include <queue>
16 | #include <set>
17 | #include <string>
18 | #include <unordered_map>
19 | #include <vector>
20 |
21 | #include "db/db_iter.h"
22 | #include "db/pre_release_callback.h"
23 | #include "db/read_callback.h"
24 | #include "db/snapshot_checker.h"
25 | #include "rocksdb/db.h"
26 | #include "rocksdb/options.h"
27 | #include "rocksdb/utilities/transaction_db.h"
28 | #include "util/set_comparator.h"
29 | #include "util/string_util.h"
30 | #include "utilities/transactions/pessimistic_transaction.h"
31 | #include "utilities/transactions/pessimistic_transaction_db.h"
32 | #include "utilities/transactions/transaction_lock_mgr.h"
33 | #include "utilities/transactions/write_prepared_txn.h"
34 |
35 | namespace rocksdb {
36 |
37 | // A PessimisticTransactionDB that writes data to the DB after the prepare
38 | // phase of 2PC. In this way some data in the DB might not be committed. The
39 | // DB provides mechanisms to tell such data apart from committed data.
40 | class WritePreparedTxnDB : public PessimisticTransactionDB {
41 |  public:
42 |   explicit WritePreparedTxnDB(DB* db,
43 |                               const TransactionDBOptions& txn_db_options)
44 |       : PessimisticTransactionDB(db, txn_db_options),
45 |         SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
46 |         SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
47 |         COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
48 |         COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
49 |         FORMAT(COMMIT_CACHE_BITS) {
50 |     Init(txn_db_options);
51 |   }
52 |
53 |   explicit WritePreparedTxnDB(StackableDB* db,
54 |                               const TransactionDBOptions& txn_db_options)
55 |       : PessimisticTransactionDB(db, txn_db_options),
56 |         SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
57 |         SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
58 |         COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
59 |         COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
60 |         FORMAT(COMMIT_CACHE_BITS) {
61 |     Init(txn_db_options);
62 |   }
63 |
64 |   virtual ~WritePreparedTxnDB();
65 |
66 |   virtual Status Initialize(
67 |       const std::vector<size_t>& compaction_enabled_cf_indices,
68 |       const std::vector<ColumnFamilyHandle*>& handles) override;
69 |
70 |   Transaction* BeginTransaction(const WriteOptions& write_options,
71 |                                 const TransactionOptions& txn_options,
72 |                                 Transaction* old_txn) override;
73 |
74 |   // Optimized version of ::Write that accepts further optimization requests,
75 |   // such as skip_concurrency_control.
76 |   using PessimisticTransactionDB::Write;
77 |   Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
78 |                WriteBatch* updates) override;
79 |
80 |   // Write the batch to the underlying DB and mark it as committed. Can be
81 |   // used both directly from the TxnDB and through a transaction.
82 |   Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
83 |                        size_t batch_cnt, WritePreparedTxn* txn);
84 |
85 |   using DB::Get;
86 |   virtual Status Get(const ReadOptions& options,
87 |                      ColumnFamilyHandle* column_family, const Slice& key,
88 |                      PinnableSlice* value) override;
89 |
90 |   using DB::MultiGet;
91 |   virtual std::vector<Status> MultiGet(
92 |       const ReadOptions& options,
93 |       const std::vector<ColumnFamilyHandle*>& column_family,
94 |       const std::vector<Slice>& keys,
95 |       std::vector<std::string>* values) override;
96 |
97 |   using DB::NewIterator;
98 |   virtual Iterator* NewIterator(const ReadOptions& options,
99 |                                 ColumnFamilyHandle* column_family) override;
100 |
101 |   using DB::NewIterators;
102 |   virtual Status NewIterators(
103 |       const ReadOptions& options,
104 |       const std::vector<ColumnFamilyHandle*>& column_families,
105 |       std::vector<Iterator*>* iterators) override;
106 |
107 |   // Check whether the transaction that wrote the value with sequence number
108 |   // prep_seq is visible to the snapshot with sequence number snapshot_seq.
109 |   // Returns true if commit_seq <= snapshot_seq.
110 |   // If the snapshot_seq is already released and snapshot_seq <= max, sets
111 |   // *snap_released to true and returns true as well.
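    |   // (Callers that may pass a released snapshot must supply a non-null
    |   // snap_released: the assert(snap_released) checks below compile away in
    |   // release builds, so the `*snap_released = true` stores would otherwise
    |   // dereference the nullptr default. This is what the warning above flags.)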
112 |   inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
113 |                            uint64_t min_uncommitted = kMinUnCommittedSeq,
114 |                            bool* snap_released = nullptr) const {
115 |     ROCKS_LOG_DETAILS(info_log_,
116 |                       "IsInSnapshot %" PRIu64 " in %" PRIu64
117 |                       " min_uncommitted %" PRIu64,
118 |                       prep_seq, snapshot_seq, min_uncommitted);
119 |     assert(min_uncommitted >= kMinUnCommittedSeq);
120 |     // Caller is responsible to initialize snap_released.
121 |     assert(snap_released == nullptr || *snap_released == false);
122 |     // Here we try to infer the return value without looking into prepare list.
123 |     // This would help avoid synchronization over a shared map.
124 |     // TODO(myabandeh): optimize this. This sequence of checks must be correct
125 |     // but is not necessarily efficient.
126 |     if (prep_seq == 0) {
127 |       // Compaction will output keys to bottom-level with sequence number 0 if
128 |       // it is visible to the earliest snapshot.
129 |       ROCKS_LOG_DETAILS(
130 |           info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
131 |           prep_seq, snapshot_seq, 1);
132 |       return true;
133 |     }
134 |     if (snapshot_seq < prep_seq) {
135 |       // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
136 |       ROCKS_LOG_DETAILS(
137 |           info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
138 |           prep_seq, snapshot_seq, 0);
139 |       return false;
140 |     }
141 |     if (prep_seq < min_uncommitted) {
142 |       ROCKS_LOG_DETAILS(info_log_,
143 |                         "IsInSnapshot %" PRIu64 " in %" PRIu64
144 |                         " returns %" PRId32
145 |                         " because of min_uncommitted %" PRIu64,
146 |                         prep_seq, snapshot_seq, 1, min_uncommitted);
147 |       return true;
148 |     }
149 |     // Commit of a delayed prepared has two non-atomic steps: add to commit
150 |     // cache, remove from delayed_prepared_. Our reads of these two are also
151 |     // non-atomic, so by looking into the commit cache first we might find
152 |     // prep_seq in neither the commit cache nor delayed_prepared_. To fix that,
153 |     // i) we check if there was any delayed prepared BEFORE looking into the
154 |     // commit cache; ii) if there was, we complete the search steps as: i)
155 |     // commit cache, ii) delayed prepared, iii) commit cache again. This way,
156 |     // if the first query to the commit cache missed the commit, the 2nd catches it.
157 |     bool was_empty;
158 |     SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
159 |     CommitEntry64b dont_care;
160 |     auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
161 |     size_t repeats = 0;
162 |     do {
163 |       repeats++;
164 |       assert(repeats < 100);
165 |       if (UNLIKELY(repeats >= 100)) {
166 |         throw std::runtime_error(
167 |             "The read was interrupted 100 times by updates to "
168 |             "max_evicted_seq_. This is unexpected in all setups");
169 |       }
170 |       max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
171 |       TEST_SYNC_POINT(
172 |           "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
173 |       TEST_SYNC_POINT(
174 |           "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
175 |       was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
176 |       TEST_SYNC_POINT(
177 |           "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
178 |       TEST_SYNC_POINT(
179 |           "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
180 |       CommitEntry cached;
181 |       bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
182 |       TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
183 |       TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
184 |       if (exist && prep_seq == cached.prep_seq) {
185 |         // It is committed and also not evicted from the commit cache
186 |         ROCKS_LOG_DETAILS(
187 |             info_log_,
188 |             "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
189 |             prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
190 |         return cached.commit_seq <= snapshot_seq;
191 |       }
192 |       // else it could be committed but not inserted into the map, which could
193 |       // happen after recovery, or it could be committed and evicted by another
194 |       // commit, or never committed.
195 |
196 |       // At this point we don't know if it was committed or is still prepared.
197 |       max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
198 |       if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
199 |         continue;
200 |       }
201 |       // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
202 |       if (max_evicted_seq_ub < prep_seq) {
203 |         // Not evicted from the cache and also not present, so it must still be
204 |         // prepared.
205 |         ROCKS_LOG_DETAILS(info_log_,
206 |                           "IsInSnapshot %" PRIu64 " in %" PRIu64
207 |                           " returns %" PRId32,
208 |                           prep_seq, snapshot_seq, 0);
209 |         return false;
210 |       }
211 |       TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
212 |       TEST_SYNC_POINT(
213 |           "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
214 |       if (!was_empty) {
215 |         // We should not normally reach here
216 |         WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
217 |         ReadLock rl(&prepared_mutex_);
218 |         ROCKS_LOG_WARN(
219 |             info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
220 |             static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
221 |         if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
222 |           // This is the order: 1) delayed_prepared_commits_ update, 2) publish,
223 |           // 3) delayed_prepared_ cleanup. So check if it is the case of a late
224 |           // cleanup.
225 |           auto it = delayed_prepared_commits_.find(prep_seq);
226 |           if (it == delayed_prepared_commits_.end()) {
227 |             // Then it is not committed yet
228 |             ROCKS_LOG_DETAILS(info_log_,
229 |                               "IsInSnapshot %" PRIu64 " in %" PRIu64
230 |                               " returns %" PRId32,
231 |                               prep_seq, snapshot_seq, 0);
232 |             return false;
233 |           } else {
234 |             ROCKS_LOG_DETAILS(info_log_,
235 |                               "IsInSnapshot %" PRIu64 " in %" PRIu64
236 |                               " commit: %" PRIu64 " returns %" PRId32,
237 |                               prep_seq, snapshot_seq, it->second,
238 |                               it->second <= snapshot_seq);
239 |             return it->second <= snapshot_seq;
240 |           }
241 |         } else {
242 |           // 2nd query to the commit cache. Refer to the was_empty comment above.
243 |           exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
244 |           if (exist && prep_seq == cached.prep_seq) {
245 |             ROCKS_LOG_DETAILS(
246 |                 info_log_,
247 |                 "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
248 |                 prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
249 |             return cached.commit_seq <= snapshot_seq;
250 |           }
251 |           max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
252 |         }
253 |       }
254 |     } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
255 |     // When advancing max_evicted_seq_, we move older entries from the prepared
256 |     // list to delayed_prepared_. Also we move evicted entries from the commit
257 |     // cache to old_commit_map_ if they overlap with any snapshot. Since
258 |     // prep_seq <= max_evicted_seq_, we have three cases: i) in
259 |     // delayed_prepared_, ii) in old_commit_map_, iii) committed with no
260 |     // conflict with any snapshot. Case (i), delayed_prepared_, is checked above.
261 |     if (max_evicted_seq_ub < snapshot_seq) {  // then (ii) cannot be the case
262 |       // only (iii) is the case: committed
263 |       // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
264 |       // snapshot_seq
265 |       ROCKS_LOG_DETAILS(
266 |           info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
267 |           prep_seq, snapshot_seq, 1);
268 |       return true;
269 |     }
270 |     // else (ii) might be the case: check the commit data saved for this
271 |     // snapshot. If there was no overlapping commit entry, then it is committed
272 |     // with a commit_seq lower than any live snapshot, including snapshot_seq.
273 |     if (old_commit_map_empty_.load(std::memory_order_acquire)) {
274 |       ROCKS_LOG_DETAILS(info_log_,
275 |                         "IsInSnapshot %" PRIu64 " in %" PRIu64
276 |                         " returns %" PRId32 " released=1",
277 |                         prep_seq, snapshot_seq, 0);
278 |       assert(snap_released);
279 |       // This snapshot is not valid anymore. We cannot tell if prep_seq is
280 |       // committed before or after the snapshot. Return true but also set
281 |       // snap_released to true.
282 |       *snap_released = true;
283 |       return true;
284 |     }
285 |     {
286 |       // We should not normally reach here unless snapshot_seq is old. This is
287 |       // a rare case and it is ok to pay the cost of a mutex ReadLock for such
288 |       // old, reading transactions.
289 |       WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
290 |       ReadLock rl(&old_commit_map_mutex_);
291 |       auto prep_set_entry = old_commit_map_.find(snapshot_seq);
292 |       bool found = prep_set_entry != old_commit_map_.end();
293 |       if (found) {
294 |         auto& vec = prep_set_entry->second;
295 |         found = std::binary_search(vec.begin(), vec.end(), prep_seq);
296 |       } else {
297 |         // coming from compaction
298 |         ROCKS_LOG_DETAILS(info_log_,
299 |                           "IsInSnapshot %" PRIu64 " in %" PRIu64
300 |                           " returns %" PRId32 " released=1",
301 |                           prep_seq, snapshot_seq, 0);
302 |         // This snapshot is not valid anymore. We cannot tell if prep_seq is
303 |         // committed before or after the snapshot. Return true but also set
304 |         // snap_released to true.
305 |         assert(snap_released);
306 |         *snap_released = true;
    |         // ← warning: Dereference of null pointer (loaded from variable
    |         // 'snap_released'); the assert above is a no-op in release builds
307 |         return true;
308 |       }
309 |
310 |       if (!found) {
311 |         ROCKS_LOG_DETAILS(info_log_,
312 |                           "IsInSnapshot %" PRIu64 " in %" PRIu64
313 |                           " returns %" PRId32,
314 |                           prep_seq, snapshot_seq, 1);
315 |         return true;
316 |       }
317 |     }
318 |     // (ii) is the case: it is committed, but after snapshot_seq
319 |     ROCKS_LOG_DETAILS(
320 |         info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
321 |         prep_seq, snapshot_seq, 0);
322 |     return false;
323 |   }
324 |
325 |   // Add the transaction with prepare sequence seq to the prepared list.
326 |   // Note: must be called serially with increasing seq on each call.
327 |   void AddPrepared(uint64_t seq);
328 |   // Check if any of the prepared txns are less than the new max_evicted_seq_.
329 |   // Must be called with prepared_mutex_ write locked.
330 |   void CheckPreparedAgainstMax(SequenceNumber new_max);
331 |   // Remove the transaction with prepare sequence seq from the prepared list
332 |   void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
333 |   // Add the transaction with prepare sequence prepare_seq and commit sequence
334 |   // commit_seq to the commit map. loop_cnt is to detect infinite loops.
335 |   // Note: must be called serially.
336 |   void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
337 |                     uint8_t loop_cnt = 0);
338 |
339 |   struct CommitEntry {
340 |     uint64_t prep_seq;
341 |     uint64_t commit_seq;
342 |     CommitEntry() : prep_seq(0), commit_seq(0) {}
343 |     CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
344 |     bool operator==(const CommitEntry& rhs) const {
345 |       return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
346 |     }
347 |   };
348 |
349 |   struct CommitEntry64bFormat {
350 |     explicit CommitEntry64bFormat(size_t index_bits)
351 |         : INDEX_BITS(index_bits),
352 |           PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
353 |           COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
354 |           COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
355 |           DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
356 |     // Number of higher bits of a sequence number that are not used. They are
357 |     // used to encode the value type, ...
358 |     const size_t PAD_BITS = static_cast<size_t>(8);
359 |     // Number of lower bits from the prepare seq that can be skipped, as they
360 |     // are implied by the index of the entry in the array
361 |     const size_t INDEX_BITS;
362 |     // Number of bits we use to encode the prepare seq
363 |     const size_t PREP_BITS;
364 |     // Number of bits we use to encode the commit seq.
365 |     const size_t COMMIT_BITS;
366 |     // Filter to encode/decode commit seq
367 |     const uint64_t COMMIT_FILTER;
368 |     // The value of commit_seq - prepare_seq + 1 must be less than this bound
369 |     const uint64_t DELTA_UPPERBOUND;
370 |   };
371 |
372 |   // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX
373 |   // Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA
374 |   // Encoded Value         = PREP PREP .... PREP PREP DELTA DELTA ... DELTA DELTA
375 |   // PAD: first bits of a seq that are reserved for tagging and hence ignored
376 |   // PREP/INDEX: the used bits in a prepare seq number
377 |   // INDEX: the bits that do not have to be encoded (will be provided externally)
378 |   // DELTA: commit seq - prepare seq + 1
379 |   // Number of DELTA bits should be equal to number of index bits + PADs
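    |   // Illustration (derived from the constructor above, assuming
    |   // INDEX_BITS = 23): PAD_BITS = 8, PREP_BITS = 64 - 8 - 23 = 33, and
    |   // COMMIT_BITS = 64 - 33 = 31, so an entry can encode a commit that trails
    |   // its prepare by at most DELTA_UPPERBOUND - 2 = 2^31 - 2 sequence numbers.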
380 |   struct CommitEntry64b {
381 |     constexpr CommitEntry64b() noexcept : rep_(0) {}
382 |
383 |     CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
384 |         : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
385 |
386 |     CommitEntry64b(const uint64_t ps, const uint64_t cs,
387 |                    const CommitEntry64bFormat& format) {
388 |       assert(ps < static_cast<uint64_t>(
389 |                       1ull << (format.PREP_BITS + format.INDEX_BITS)));
390 |       assert(ps <= cs);
391 |       uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
392 |       // zero is reserved for uninitialized entries
393 |       assert(0 < delta);
394 |       assert(delta < format.DELTA_UPPERBOUND);
395 |       if (delta >= format.DELTA_UPPERBOUND) {
396 |         throw std::runtime_error(
397 |             "commit_seq >> prepare_seq. The allowed distance is " +
398 |             ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
399 |             ToString(cs) + " prepare_seq is " + ToString(ps));
400 |       }
401 |       rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
402 |       rep_ = rep_ | delta;
403 |     }
404 |
405 |     // Return false if the entry is empty
406 |     bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
407 |                const CommitEntry64bFormat& format) {
408 |       uint64_t delta = rep_ & format.COMMIT_FILTER;
409 |       // zero is reserved for uninitialized entries
410 |       assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
411 |       if (delta == 0) {
412 |         return false;  // initialized entry would have non-zero delta
413 |       }
414 |
415 |       assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
416 |       uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
417 |       prep_up >>= format.PAD_BITS;
418 |       const uint64_t& prep_low = indexed_seq;
419 |       entry->prep_seq = prep_up | prep_low;
420 |
421 |       entry->commit_seq = entry->prep_seq + delta - 1;
422 |       return true;
423 |     }
424 |
425 |    private:
426 |     uint64_t rep_;
427 |   };
428 |
429 |   // Struct to hold ownership of snapshot and read callback for cleanup.
430 |   struct IteratorState;
431 |
432 |   std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
433 |     return cf_map_;
434 |   }
435 |   std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
436 |     return handle_map_;
437 |   }
438 |   void UpdateCFComparatorMap(
439 |       const std::vector<ColumnFamilyHandle*>& handles) override;
440 |   void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
441 |
442 |   virtual const Snapshot* GetSnapshot() override;
443 |   SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
444 |
445 |  protected:
446 |   virtual Status VerifyCFOptions(
447 |       const ColumnFamilyOptions& cf_options) override;
448 |
449 |  private:
450 |   friend class PreparedHeap_BasicsTest_Test;
451 |   friend class PreparedHeap_Concurrent_Test;
452 |   friend class PreparedHeap_EmptyAtTheEnd_Test;
453 |   friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
454 |   friend class WritePreparedCommitEntryPreReleaseCallback;
455 |   friend class WritePreparedTransactionTestBase;
456 |   friend class WritePreparedTxn;
457 |   friend class WritePreparedTxnDBMock;
458 |   friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
459 |   friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
460 |   friend class
461 |       WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
462 |   friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
463 |   friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
464 |   friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
465 |   friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
466 |   friend class
467 |       WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
468 |   friend class WritePreparedTransactionTest_CommitMapTest_Test;
469 |   friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
470 |   friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
471 |   friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
472 |   friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
473 |   friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
474 |   friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
475 |   friend class
476 |       WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
477 |   friend class
478 |       WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
479 |   friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
480 |   friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
481 |   friend class WritePreparedTransactionTest_RollbackTest_Test;
482 |   friend class WriteUnpreparedTxnDB;
483 |   friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
484 |
485 |   void Init(const TransactionDBOptions& /* unused */);
486 |
487 |   void WPRecordTick(uint32_t ticker_type) const {
488 |     RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
489 |   }
490 |
491 |   // A heap with amortized O(1) complexity for erase. It uses one extra heap
492 |   // to keep track of erased entries that are not yet on top of the main heap.
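    |   // e.g. push(3); push(5); erase(5) parks 5 in erased_heap_ because it is
    |   // not at the top; the later pop() that removes 3 then discards the
    |   // pending 5 as well, once it surfaces.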
493 |   class PreparedHeap {
494 |     std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
495 |         heap_;
496 |     std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
497 |         erased_heap_;
498 |     // True when testing crash recovery
499 |     bool TEST_CRASH_ = false;
500 |     friend class WritePreparedTxnDB;
501 |
502 |    public:
503 |     ~PreparedHeap() {
504 |       if (!TEST_CRASH_) {
505 |         assert(heap_.empty());
506 |         assert(erased_heap_.empty());
507 |       }
508 |     }
509 |     bool empty() { return heap_.empty(); }
510 |     uint64_t top() { return heap_.top(); }
511 |     void push(uint64_t v) { heap_.push(v); }
512 |     void pop() {
513 |       heap_.pop();
514 |       while (!heap_.empty() && !erased_heap_.empty() &&
515 |              // heap_.top() > erased_heap_.top() could happen if we have erased
516 |              // a non-existent entry. Ideally the user should not do that but we
517 |              // should be resilient against it.
518 |              heap_.top() >= erased_heap_.top()) {
519 |         if (heap_.top() == erased_heap_.top()) {
520 |           heap_.pop();
521 |         }
522 |         uint64_t erased __attribute__((__unused__));
523 |         erased = erased_heap_.top();
524 |         erased_heap_.pop();
525 |         // No duplicate prepare sequence numbers
526 |         assert(erased_heap_.empty() || erased_heap_.top() != erased);
527 |       }
528 |       while (heap_.empty() && !erased_heap_.empty()) {
529 |         erased_heap_.pop();
530 |       }
531 |     }
532 |     void erase(uint64_t seq) {
533 |       if (!heap_.empty()) {
534 |         if (seq < heap_.top()) {
535 |           // Already popped, ignore it.
536 |         } else if (heap_.top() == seq) {
537 |           pop();
538 |           assert(heap_.empty() || heap_.top() != seq);
539 |         } else {  // (heap_.top() > seq)
540 |           // Down in the heap, remember to pop it later
541 |           erased_heap_.push(seq);
542 |         }
543 |       }
544 |     }
545 |   };

  void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }

  // Get the commit entry with index indexed_seq from the commit table. It
  // returns true if such an entry exists.
  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
                      CommitEntry* entry) const;

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry <prep_seq, commit_seq>. If the rewrite results in an
  // eviction, sets evicted_entry and returns true.
  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
                      CommitEntry* evicted_entry);

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry new_entry only if the existing entry matches
  // expected_entry. Returns false otherwise.
  bool ExchangeCommitEntry(const uint64_t indexed_seq,
                           CommitEntry64b& expected_entry,
                           const CommitEntry& new_entry);

  // Increase max_evicted_seq_ from the previous value prev_max to the new
  // value. This also involves taking care of prepared txns that are not
  // committed before new_max, as well as updating the list of live snapshots
  // at the time of updating the max. Thread-safety: this function can be
  // called concurrently. Concurrent invocations of this function are
  // equivalent to a serial invocation in which the last invocation is the
  // one with the largest new_max value.
  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                            const SequenceNumber& new_max);

  inline SequenceNumber SmallestUnCommittedSeq() {
    // Since we always update prepared_txns_ from the main write queue via
    // PreReleaseCallback, prepared_txns_.top() indicates the smallest
    // prepared data in 2pc transactions. For non-2pc transactions that are
    // written in two steps, we also update prepared_txns_ at the first step
    // (via the same mechanism) so that their uncommitted data is reflected in
    // SmallestUnCommittedSeq.
    ReadLock rl(&prepared_mutex_);
    // Since we are holding the mutex, and GetLatestSequenceNumber is updated
    // after prepared_txns_ is, the value of GetLatestSequenceNumber reflects
    // any uncommitted data that is not yet added to prepared_txns_.
    // Otherwise, if there is no concurrent txn, this value simply reflects
    // the latest value in the memtable.
    if (!delayed_prepared_.empty()) {
      assert(!delayed_prepared_empty_.load());
      return *delayed_prepared_.begin();
    }
    if (prepared_txns_.empty()) {
      return db_impl_->GetLatestSequenceNumber() + 1;
    } else {
      return std::min(prepared_txns_.top(),
                      db_impl_->GetLatestSequenceNumber() + 1);
    }
  }
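
  // Worked example (hypothetical numbers): with delayed_prepared_ = {5},
  // SmallestUnCommittedSeq() returns 5 regardless of the heap. With
  // delayed_prepared_ empty, prepared_txns_ = {9, 12}, and
  // GetLatestSequenceNumber() == 20, it returns min(9, 21) == 9; with an
  // empty heap as well it returns 21, i.e., one past the latest seq.
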
  // Enhance the snapshot object by recording in it the smallest uncommitted
  // seq.
  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
                              SequenceNumber min_uncommitted) {
    assert(snapshot);
    snapshot->min_uncommitted_ = min_uncommitted;
  }

  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);

  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB.
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);

  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called
  // concurrently. Concurrent invocations of this function are equivalent to
  // a serial invocation in which the last invocation is the one with the
  // largest version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);
  // Check the new list of snapshots against the old one to see if any of the
  // snapshots are released, and do the cleanup for the released snapshots.
  void CleanupReleasedSnapshots(
      const std::vector<SequenceNumber>& new_snapshots,
      const std::vector<SequenceNumber>& old_snapshots);

  // Check an evicted entry against live snapshots to see if it should be kept
  // around or whether it can be safely discarded (and hence assumed committed
  // for all snapshots). Thread-safety: this function can be called
  // concurrently. If it is called concurrently with multiple UpdateSnapshots,
  // the result is the same as checking the intersection of the snapshot list
  // before the updates with the snapshot lists of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);

  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed,
  // which is the case if none of the next snapshots could satisfy the
  // condition. next_is_larger: the next snapshot will be a larger value.
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);
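
  // Worked example (hypothetical numbers): for an evicted entry
  // <prep_seq=10, commit_seq=20>, a snapshot at 15 satisfies 10 <= 15 < 20,
  // so 10 is recorded under snapshot 15 in old_commit_map_: that snapshot
  // must not assume seq 10 committed merely because it left the commit
  // cache. A snapshot at 25 fails the check, and with next_is_larger == true
  // no later snapshot can satisfy it either, so false is returned to stop
  // the scan.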

  // A trick to increase the last visible sequence number by one and also wait
  // for the in-flight commits to be visible.
  void AdvanceSeqByOne();

  // The list of live snapshots at the last time that max_evicted_seq_
  // advanced. The list is stored in two data structures: in snapshot_cache_,
  // which is efficient for concurrent reads, and in snapshots_ if the data
  // does not fit into snapshot_cache_. snapshots_total_ is the total number
  // of snapshots in the two lists.
  std::atomic<size_t> snapshots_total_ = {};
  // The list sorted in ascending order. Thread-safety for writes is provided
  // with snapshots_mutex_, and concurrent reads are safe due to std::atomic
  // for each entry. On the x86_64 architecture such reads are compiled to
  // simple read instructions.
  const size_t SNAPSHOT_CACHE_BITS;
  const size_t SNAPSHOT_CACHE_SIZE;
  std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  // 2nd list for storing snapshots. The list is sorted in ascending order.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_;
  // The list of all snapshots: snapshots_ + snapshot_cache_. This list,
  // although redundant, simplifies the CleanupOldSnapshots implementation.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_all_;
  // The version of the latest list of snapshots. This can be used to avoid
  // rewriting a list that is concurrently updated with a more recent version.
  SequenceNumber snapshots_version_ = 0;

  // A heap of prepared transactions. Thread-safety is provided with
  // prepared_mutex_.
  PreparedHeap prepared_txns_;
  const size_t COMMIT_CACHE_BITS;
  const size_t COMMIT_CACHE_SIZE;
  const CommitEntry64bFormat FORMAT;
  // commit_cache_ must be initialized to zero to tell apart an empty index
  // from a filled one. Thread-safety is provided with commit_cache_mutex_.
  std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
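
  // Indexing sketch (values hypothetical): the cache is a fixed-size array
  // indexed by the low bits of the prepare seq, e.g. with
  // COMMIT_CACHE_BITS == 23, COMMIT_CACHE_SIZE == (1ul << 23) and
  //   indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
  // so two entries whose prepare seqs differ by a multiple of
  // COMMIT_CACHE_SIZE collide in the same slot, and the older one is evicted.
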
  // The largest evicted *commit* sequence number from the commit_cache_. If a
  // seq is smaller than max_evicted_seq_ it might or might not be present in
  // commit_cache_. So commit_cache_ must first be checked before consulting
  // max_evicted_seq_.
  std::atomic<uint64_t> max_evicted_seq_ = {};
  // Order: 1) update future_max_evicted_seq_ = new_max, 2)
  // GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since
  // GetSnapshotInternal guarantees that the snapshot seq is larger than
  // future_max_evicted_seq_, this guarantees that any snapshot not larger
  // than max has already been looked at via a GetSnapshotListFromDB(new_max).
  std::atomic<uint64_t> future_max_evicted_seq_ = {};
  // Advance max_evicted_seq_ by this value each time it needs an update. The
  // larger the value, the less frequent advances we would have. We do not
  // want it to be too large either, as it would cause stalls by doing too
  // much maintenance work under the lock.
  size_t INC_STEP_FOR_MAX_EVICTED = 1;
  // A map from old snapshots (expected to be used by a few read-only txns) to
  // the prepared sequence numbers of the entries evicted from commit_cache_
  // that overlap with such a snapshot. These are the prepared sequence
  // numbers that the snapshot, to which they are mapped, cannot assume to be
  // committed just because they are no longer in the commit_cache_. The
  // vector must be sorted after each update.
  // Thread-safety is provided with old_commit_map_mutex_.
  std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
  // A set of long-running prepared transactions that are not finished by the
  // time max_evicted_seq_ advances past their sequence numbers. This is
  // expected to be empty normally. Thread-safety is provided with
  // prepared_mutex_.
  std::set<uint64_t> delayed_prepared_;
  // Commit of a delayed prepared: 1) update the commit cache, 2) update
  // delayed_prepared_commits_, 3) publish the seq, 4) clean up
  // delayed_prepared_. delayed_prepared_commits_ will help us tell apart the
  // unprepared txns from the ones that are committed but not cleaned up yet.
  std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
  // Updated when delayed_prepared_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> delayed_prepared_empty_ = {true};
  // Updated when old_commit_map_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> old_commit_map_empty_ = {true};
  mutable port::RWMutex prepared_mutex_;
  mutable port::RWMutex old_commit_map_mutex_;
  mutable port::RWMutex commit_cache_mutex_;
  mutable port::RWMutex snapshots_mutex_;
  // A cache of the cf comparators.
  // Thread safety: since it is const, it is safe to read it concurrently.
  std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  // A cache of the cf handles.
  // Thread safety: since the handle is a read-only object and it is const,
  // it is safe to read it concurrently.
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
};
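
// The members above cooperate in a visibility check: the commit cache is
// probed first, and only then max_evicted_seq_ and old_commit_map_ are
// consulted. A simplified standalone sketch of that decision order follows
// (hypothetical helper; the real IsInSnapshot also handles delayed_prepared_
// and concurrent updates, which are ignored here; assumes <algorithm> is
// available for std::binary_search).
inline bool IsInSnapshotSketch(
    uint64_t seq, uint64_t snapshot_seq, uint64_t max_evicted_seq,
    const std::map<uint64_t, uint64_t>& commit_cache,
    const std::map<uint64_t, std::vector<uint64_t>>& old_commit_map) {
  auto it = commit_cache.find(seq);
  if (it != commit_cache.end()) {
    // A commit entry exists: visible iff it committed at or before the
    // snapshot.
    return it->second <= snapshot_seq;
  }
  if (seq > max_evicted_seq) {
    // Not evicted and no commit entry: still uncommitted.
    return false;
  }
  // Evicted: assumed committed for this snapshot unless old_commit_map
  // recorded an overlap of this prepare seq with the snapshot.
  auto snap_it = old_commit_map.find(snapshot_seq);
  if (snap_it == old_commit_map.end()) {
    return true;
  }
  const std::vector<uint64_t>& overlaps = snap_it->second;  // kept sorted
  return !std::binary_search(overlaps.begin(), overlaps.end(), seq);
}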

class WritePreparedTxnReadCallback : public ReadCallback {
 public:
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
      : ReadCallback(snapshot), db_(db) {}
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
                               SequenceNumber min_uncommitted)
      : ReadCallback(snapshot, min_uncommitted), db_(db) {}

  // Will be called to see if the seq number is visible; if not, the reader
  // moves on to the next seq number.
  inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
    auto snapshot = max_visible_seq_;
    return db_->IsInSnapshot(seq, snapshot, min_uncommitted_);
  }

  // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
 private:
  WritePreparedTxnDB* db_;
};
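
// Usage sketch (hypothetical helper): a reader walks the versions of a key
// from newest to oldest and asks the callback whether each one is visible,
// assuming ReadCallback::IsVisible, which short-circuits on max_visible_seq_
// before falling back to IsVisibleFullCheck. The version list is
// illustrative only.
inline SequenceNumber NewestVisibleVersionSketch(
    ReadCallback* callback, const std::vector<SequenceNumber>& versions) {
  // versions is assumed sorted in descending order, as in an SST lookup.
  for (SequenceNumber seq : versions) {
    if (callback->IsVisible(seq)) {
      return seq;  // first visible version wins
    }
  }
  return 0;  // no visible version for this snapshot
}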

class AddPreparedCallback : public PreReleaseCallback {
 public:
  AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
                      size_t sub_batch_cnt, bool two_write_queues,
                      bool first_prepare_batch)
      : db_(db),
        db_impl_(db_impl),
        sub_batch_cnt_(sub_batch_cnt),
        two_write_queues_(two_write_queues),
        first_prepare_batch_(first_prepare_batch) {
    (void)two_write_queues_;  // to silence unused private field warning
  }
  virtual Status Callback(SequenceNumber prepare_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t log_number) override {
    // Always Prepare from the main queue
    assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
    for (size_t i = 0; i < sub_batch_cnt_; i++) {
      db_->AddPrepared(prepare_seq + i);
    }
    if (first_prepare_batch_) {
      assert(log_number != 0);
      db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
          log_number);
    }
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  size_t sub_batch_cnt_;
  bool two_write_queues_;
  // It is 2PC and this is the first prepare batch. Always the case in 2PC
  // unless it is WriteUnPrepared.
  bool first_prepare_batch_;
};
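
// Worked example (hypothetical numbers): a prepare batch at
// prepare_seq == 100 with sub_batch_cnt == 3 (e.g., due to duplicate keys)
// registers seqs 100, 101, and 102 as prepared, so reads treat all three as
// uncommitted until matching commit entries are added.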

class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
 public:
  // includes_data indicates that the commit also writes a non-empty
  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  WritePreparedCommitEntryPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
      size_t prep_batch_cnt, size_t data_batch_cnt = 0,
      SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        prep_batch_cnt_(prep_batch_cnt),
        data_batch_cnt_(data_batch_cnt),
        includes_data_(data_batch_cnt_ > 0),
        aux_seq_(aux_seq),
        aux_batch_cnt_(aux_batch_cnt),
        includes_aux_batch_(aux_batch_cnt > 0) {
    assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
    assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
    assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
  }

  virtual Status Callback(SequenceNumber commit_seq,
                          bool is_mem_disabled __attribute__((__unused__)),
                          uint64_t) override {
    // Always commit from the 2nd queue
    assert(!db_impl_->immutable_db_options().two_write_queues ||
           is_mem_disabled);
    assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
    // The data batch is what accompanies the commit marker and affects the
    // last seq in the commit batch.
    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                         ? commit_seq
                                         : commit_seq + data_batch_cnt_ - 1;
    if (prep_seq_ != kMaxSequenceNumber) {
      for (size_t i = 0; i < prep_batch_cnt_; i++) {
        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
      }
    }  // else there was no prepare phase
    if (includes_aux_batch_) {
      for (size_t i = 0; i < aux_batch_cnt_; i++) {
        db_->AddCommitted(aux_seq_ + i, last_commit_seq);
      }
    }
    if (includes_data_) {
      assert(data_batch_cnt_);
      // Commit the data that accompanies the commit request.
      for (size_t i = 0; i < data_batch_cnt_; i++) {
        // For the commit seq of each batch use the commit seq of the last
        // batch. This makes debugging easier by having all the batches carry
        // the same sequence number.
        db_->AddCommitted(commit_seq + i, last_commit_seq);
      }
    }
    if (db_impl_->immutable_db_options().two_write_queues) {
      assert(is_mem_disabled);  // implies the 2nd queue
      // Publish the sequence number. We can do that here assuming the
      // callback is invoked only from one write queue, which would guarantee
      // that the published sequence numbers are in order, i.e., once a seq
      // is published all the seqs prior to it are also publishable.
      db_impl_->SetLastPublishedSequence(last_commit_seq);
    }
    // Otherwise the sequence number that is updated as part of the write
    // already does the publishing.
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  // kMaxSequenceNumber if there was no prepare phase
  SequenceNumber prep_seq_;
  size_t prep_batch_cnt_;
  size_t data_batch_cnt_;
  // Data here is the batch that is written with the commit marker, either
  // because it is a commit without prepare or because the commit has a
  // CommitTimeWriteBatch.
  bool includes_data_;
  // The auxiliary batch (if there is any) is a batch that is written before
  // but gets the same commit seq as the prepare batch or data batch. This is
  // used in two write queues, where the CommitTimeWriteBatch becomes the aux
  // batch and we do a separate write to actually commit everything.
  SequenceNumber aux_seq_;
  size_t aux_batch_cnt_;
  bool includes_aux_batch_;
};
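
// Worked example (hypothetical numbers): for a commit with prep_seq_ == 100,
// prep_batch_cnt_ == 2, and a CommitTimeWriteBatch of data_batch_cnt_ == 3
// landing at commit_seq == 200, last_commit_seq is 200 + 3 - 1 == 202, and
// the entries <100, 202>, <101, 202>, <200, 202>, <201, 202>, and <202, 202>
// are added to the commit cache.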

// For two_write_queues, commit both the aborted batch and the cleanup batch
// and then publish the seq.
class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
 public:
  WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
                                          DBImpl* db_impl,
                                          SequenceNumber prep_seq,
                                          SequenceNumber rollback_seq,
                                          size_t prep_batch_cnt)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        rollback_seq_(rollback_seq),
        prep_batch_cnt_(prep_batch_cnt) {
    assert(prep_seq != kMaxSequenceNumber);
    assert(rollback_seq != kMaxSequenceNumber);
    assert(prep_batch_cnt_ > 0);
  }

  Status Callback(SequenceNumber commit_seq, bool is_mem_disabled,
                  uint64_t) override {
    // Always commit from the 2nd queue
    assert(is_mem_disabled);  // implies the 2nd queue
    assert(db_impl_->immutable_db_options().two_write_queues);
#ifdef NDEBUG
    (void)is_mem_disabled;
#endif
    const uint64_t last_commit_seq = commit_seq;
    db_->AddCommitted(rollback_seq_, last_commit_seq);
    for (size_t i = 0; i < prep_batch_cnt_; i++) {
      db_->AddCommitted(prep_seq_ + i, last_commit_seq);
    }
    db_impl_->SetLastPublishedSequence(last_commit_seq);
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  SequenceNumber prep_seq_;
  SequenceNumber rollback_seq_;
  size_t prep_batch_cnt_;
};
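
// Worked example (hypothetical numbers): for a prepare at seq 100 with
// prep_batch_cnt_ == 1 and a rollback batch at rollback_seq == 150, a
// cleanup write landing at commit_seq == 151 adds <150, 151> and <100, 151>
// to the commit cache and publishes 151, making both batches visible
// atomically.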

// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {
  explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
      : comparators_(comparators), batches_(1) {}
  std::map<uint32_t, const Comparator*>& comparators_;
  using CFKeys = std::set<Slice, SetComparator>;
  std::map<uint32_t, CFKeys> keys_;
  size_t batches_;
  size_t BatchCount() { return batches_; }
  void AddKey(const uint32_t cf, const Slice& key);
  void InitWithComp(const uint32_t cf);
  Status MarkNoop(bool) override { return Status::OK(); }
  Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
  Status MarkCommit(const Slice&) override { return Status::OK(); }
  Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status DeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MarkBeginPrepare(bool) override { return Status::OK(); }
  Status MarkRollback(const Slice&) override { return Status::OK(); }
  bool WriteAfterCommit() const override { return false; }
};
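
// Usage sketch (hypothetical helper; assumes AddKey starts a new sub-batch
// when it sees a key that already appeared for the cf). A batch with a
// duplicate key counts as two sub-batches, since a memtable insert cannot
// hold two versions of the same key under one sequence number.
inline size_t CountSubBatchesSketch() {
  WriteBatch batch;
  batch.Put(Slice("k1"), Slice("v1"));
  batch.Put(Slice("k2"), Slice("v2"));
  batch.Put(Slice("k1"), Slice("v3"));  // duplicate of k1 -> new sub-batch
  std::map<uint32_t, const Comparator*> comparators;
  comparators[0] = BytewiseComparator();  // default cf comparator
  SubBatchCounter counter(comparators);
  batch.Iterate(&counter);  // visits each Put via PutCF -> AddKey
  return counter.BatchCount();  // == 2 for this batch
}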

}  // namespace rocksdb
#endif  // ROCKSDB_LITE