Bug Summary

File: /home/bhubbard/working/src/ceph/src/rocksdb/utilities/transactions/write_unprepared_txn.cc
Warning: line 306, column 24
Dereference of null pointer (loaded from variable 'snap_released')
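In short: WriteUnpreparedTxnReadCallback::IsVisibleFullCheck (write_unprepared_txn.cc, line 34) calls WritePreparedTxnDB::IsInSnapshot without the optional fourth argument, so snap_released keeps its nullptr default, and the analyzer then follows a path through IsInSnapshot on which that pointer is written unconditionally (write_prepared_txn_db.h, line 306). The following is a minimal sketch of the same pattern, with hypothetical names rather than the RocksDB API:

#include <cstdint>

// Hypothetical stand-in for the reported pattern: an optional out-parameter
// defaults to nullptr and one branch writes through it without a null check.
bool IsInSnapshotLike(uint64_t prep_seq, uint64_t snapshot_seq,
                      bool* snap_released = nullptr) {
  if (snapshot_seq >= prep_seq) {
    return true;  // common path, never touches snap_released
  }
  // Rare path (released snapshot in the real code): callers are expected to
  // have supplied a pointer, but nothing enforces it.
  *snap_released = true;  // null dereference if the default was used
  return true;
}

int main() {
  bool released = false;
  IsInSnapshotLike(5, 10, &released);  // fine: the flag was supplied
  // IsInSnapshotLike(10, 5);          // would crash: snap_released == nullptr
  return released ? 1 : 0;
}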

Annotated Source Code


/home/bhubbard/working/src/ceph/src/rocksdb/utilities/transactions/write_unprepared_txn.cc

1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#ifndef ROCKSDB_LITE
7
8#include "utilities/transactions/write_unprepared_txn.h"
9#include "db/db_impl.h"
10#include "util/cast_util.h"
11#include "utilities/transactions/write_unprepared_txn_db.h"
12
13#ifndef __STDC_FORMAT_MACROS
14#define __STDC_FORMAT_MACROS
15#endif
16
17namespace rocksdb {
18
19bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
20 auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers();
21
22 // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
23 // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
24 // the prepare_batch_cnt seq nums after it.
25 //
26 // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
27 // large.
28 for (const auto& it : unprep_seqs) {
29 if (it.first <= seq && seq < it.first + it.second) {
30 return true;
31 }
32 }
33
34 return db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_);
1. Passing null pointer value via 4th parameter 'snap_released'
2. Calling 'WritePreparedTxnDB::IsInSnapshot'
35}
36
37SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber(
38 WriteUnpreparedTxn* txn) {
39 auto unprep_seqs = txn->GetUnpreparedSequenceNumbers();
40 if (unprep_seqs.size()) {
41 return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
42 }
43 return 0;
44}
45
46WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
47 const WriteOptions& write_options,
48 const TransactionOptions& txn_options)
49 : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) {
50 max_write_batch_size_ = txn_options.max_write_batch_size;
51 // We set max bytes to zero so that we don't get a memory limit error.
52 // Instead of trying to keep write batch strictly under the size limit, we
53 // just flush to DB when the limit is exceeded in write unprepared, to avoid
54 // having retry logic. This also allows very big key-value pairs that exceed
55 // max bytes to succeed.
56 write_batch_.SetMaxBytes(0);
57}
58
59WriteUnpreparedTxn::~WriteUnpreparedTxn() {
60 if (!unprep_seqs_.empty()) {
61 assert(log_number_ > 0);
62 assert(GetId() > 0);
63 assert(!name_.empty());
64
65 // We should rollback regardless of GetState, but some unit tests that
66 // test crash recovery run the destructor assuming that rollback does not
67 // happen, so that rollback during recovery can be exercised.
68 if (GetState() == STARTED) {
69 auto s __attribute__((__unused__)) = RollbackInternal();
70 // TODO(lth): Better error handling.
71 assert(s.ok());
72 dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
73 log_number_);
74 }
75 }
76}
77
78void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
79 PessimisticTransaction::Initialize(txn_options);
80 max_write_batch_size_ = txn_options.max_write_batch_size;
81 write_batch_.SetMaxBytes(0);
82 unprep_seqs_.clear();
83 write_set_keys_.clear();
84}
85
86Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
87 const Slice& key, const Slice& value,
88 const bool assume_tracked) {
89 Status s = MaybeFlushWriteBatchToDB();
90 if (!s.ok()) {
91 return s;
92 }
93 return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
94}
95
96Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
97 const SliceParts& key, const SliceParts& value,
98 const bool assume_tracked) {
99 Status s = MaybeFlushWriteBatchToDB();
100 if (!s.ok()) {
101 return s;
102 }
103 return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
104}
105
106Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
107 const Slice& key, const Slice& value,
108 const bool assume_tracked) {
109 Status s = MaybeFlushWriteBatchToDB();
110 if (!s.ok()) {
111 return s;
112 }
113 return TransactionBaseImpl::Merge(column_family, key, value, assume_tracked);
114}
115
116Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
117 const Slice& key, const bool assume_tracked) {
118 Status s = MaybeFlushWriteBatchToDB();
119 if (!s.ok()) {
120 return s;
121 }
122 return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
123}
124
125Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
126 const SliceParts& key,
127 const bool assume_tracked) {
128 Status s = MaybeFlushWriteBatchToDB();
129 if (!s.ok()) {
130 return s;
131 }
132 return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
133}
134
135Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
136 const Slice& key,
137 const bool assume_tracked) {
138 Status s = MaybeFlushWriteBatchToDB();
139 if (!s.ok()) {
140 return s;
141 }
142 return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
143}
144
145Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
146 const SliceParts& key,
147 const bool assume_tracked) {
148 Status s = MaybeFlushWriteBatchToDB();
149 if (!s.ok()) {
150 return s;
151 }
152 return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
153}
154
155Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
156 const bool kPrepared = true;
157 Status s;
158 if (max_write_batch_size_ != 0 &&
159 write_batch_.GetDataSize() > max_write_batch_size_) {
160 assert(GetState() != PREPARED);
161 s = FlushWriteBatchToDB(!kPrepared);
162 }
163 return s;
164}
165
166void WriteUnpreparedTxn::UpdateWriteKeySet(uint32_t cfid, const Slice& key) {
167 // TODO(lth): write_set_keys_ can just be a std::string instead of a vector.
168 write_set_keys_[cfid].push_back(key.ToString());
169}
170
171Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
172 if (name_.empty()) {
173 return Status::InvalidArgument("Cannot write to DB without SetName.");
174 }
175
176 // Update write_key_set_ for rollback purposes.
177 KeySetBuilder keyset_handler(
178 this, wupt_db_->txn_db_options_.rollback_merge_operands);
179 auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&keyset_handler);
180 assert(s.ok());
181 if (!s.ok()) {
182 return s;
183 }
184
185 // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
186 WriteOptions write_options = write_options_;
187 write_options.disableWAL = false;
188 const bool WRITE_AFTER_COMMIT = true;
189 const bool first_prepare_batch = log_number_ == 0;
190 // MarkEndPrepare will change Noop marker to the appropriate marker.
191 WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
192 !WRITE_AFTER_COMMIT, !prepared);
193 // For each duplicate key we account for a new sub-batch
194 prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
195 // AddPrepared better to be called in the pre-release callback otherwise there
196 // is a non-zero chance of max advancing prepare_seq and readers assume the
197 // data as committed.
198 // Also having it in the PreReleaseCallback allows in-order addition of
199 // prepared entries to PreparedHeap and hence enables an optimization. Refer
200 // to SmallestUnCommittedSeq for more details.
201 AddPreparedCallback add_prepared_callback(
202 wpt_db_, db_impl_, prepare_batch_cnt_,
203 db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
204 const bool DISABLE_MEMTABLE = true;
205 uint64_t seq_used = kMaxSequenceNumber;
206 // log_number_ should refer to the oldest log containing uncommitted data
207 // from the current transaction. This means that if log_number_ is set,
208 // WriteImpl should not overwrite that value, so set log_used to nullptr if
209 // log_number_ is already set.
210 uint64_t* log_used = log_number_ ? nullptr : &log_number_;
211 s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
212 /*callback*/ nullptr, log_used, /*log ref*/
213 0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
214 &add_prepared_callback);
215 assert(!s.ok() || seq_used != kMaxSequenceNumber);
216 auto prepare_seq = seq_used;
217
218 // Only call SetId if it hasn't been set yet.
219 if (GetId() == 0) {
220 SetId(prepare_seq);
221 }
222 // unprep_seqs_ will also contain prepared seqnos since they are treated in
223 // the same way in the prepare/commit callbacks. See the comment on the
224 // definition of unprep_seqs_.
225 unprep_seqs_[prepare_seq] = prepare_batch_cnt_;
226
227 // Reset transaction state.
228 if (!prepared) {
229 prepare_batch_cnt_ = 0;
230 write_batch_.Clear();
231 WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
232 }
233
234 return s;
235}
236
237Status WriteUnpreparedTxn::PrepareInternal() {
238 const bool kPrepared = true;
239 return FlushWriteBatchToDB(kPrepared);
240}
241
242Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
243 if (unprep_seqs_.empty()) {
244 assert(log_number_ == 0);
245 assert(GetId() == 0);
246 return WritePreparedTxn::CommitWithoutPrepareInternal();
247 }
248
249 // TODO(lth): We should optimize commit without prepare to not perform
250 // a prepare under the hood.
251 auto s = PrepareInternal();
252 if (!s.ok()) {
253 return s;
254 }
255 return CommitInternal();
256}
257
258Status WriteUnpreparedTxn::CommitInternal() {
259 // TODO(lth): Reduce duplicate code with WritePrepared commit logic.
260
261 // We take the commit-time batch and append the Commit marker. The Memtable
262 // will ignore the Commit marker in non-recovery mode
263 WriteBatch* working_batch = GetCommitTimeWriteBatch();
264 const bool empty = working_batch->Count() == 0;
265 WriteBatchInternal::MarkCommit(working_batch, name_);
266
267 const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
268 if (!empty && for_recovery) {
269 // When not writing to memtable, we can still cache the latest write batch.
270 // The cached batch will be written to memtable in WriteRecoverableState
271 // during FlushMemTable
272 WriteBatchInternal::SetAsLastestPersistentState(working_batch);
273 }
274
275 const bool includes_data = !empty && !for_recovery;
276 size_t commit_batch_cnt = 0;
277 if (UNLIKELY(includes_data)) {
278 ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
279 "Duplicate key overhead");
280 SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
281 auto s = working_batch->Iterate(&counter);
282 assert(s.ok());
283 commit_batch_cnt = counter.BatchCount();
284 }
285 const bool disable_memtable = !includes_data;
286 const bool do_one_write =
287 !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
288 const bool publish_seq = do_one_write;
289 // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
290 // DB in one shot. min_uncommitted still works since it requires capturing
291 // data that is written to DB but not yet committed, while
292 // CommitTimeWriteBatch commits with PreReleaseCallback.
293 WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
294 wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt, publish_seq);
295 uint64_t seq_used = kMaxSequenceNumber;
296 // Since the prepared batch is directly written to memtable, there is already
297 // a connection between the memtable and its WAL, so there is no need to
298 // redundantly reference the log that contains the prepared data.
299 const uint64_t zero_log_number = 0ull;
300 size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
301 auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
302 zero_log_number, disable_memtable, &seq_used,
303 batch_cnt, &update_commit_map);
304 assert(!s.ok() || seq_used != kMaxSequenceNumber);
305 if (LIKELY(do_one_write || !s.ok())) {
306 if (LIKELY(s.ok())) {
307 // Note RemovePrepared should be called after WriteImpl that published
308 // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
309 for (const auto& seq : unprep_seqs_) {
310 wpt_db_->RemovePrepared(seq.first, seq.second);
311 }
312 }
313 unprep_seqs_.clear();
314 write_set_keys_.clear();
315 return s;
316 } // else do the 2nd write to publish seq
317 // Note: the 2nd write comes with a performance penalty. So if we have too
318 // many commits accompanied with CommitTimeWriteBatch and yet we cannot
319 // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
320 // two_write_queues should be disabled to avoid many additional writes here.
321 class PublishSeqPreReleaseCallback : public PreReleaseCallback {
322 public:
323 explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
324 : db_impl_(db_impl) {}
325 Status Callback(SequenceNumber seq,
326 bool is_mem_disabled __attribute__((__unused__)),
327 uint64_t) override {
328 assert(is_mem_disabled);
329 assert(db_impl_->immutable_db_options().two_write_queues);
330 db_impl_->SetLastPublishedSequence(seq);
331 return Status::OK();
332 }
333
334 private:
335 DBImpl* db_impl_;
336 } publish_seq_callback(db_impl_);
337 WriteBatch empty_batch;
338 empty_batch.PutLogData(Slice());
339 // In the absence of Prepare markers, use Noop as a batch separator
340 WriteBatchInternal::InsertNoop(&empty_batch);
341 const bool DISABLE_MEMTABLE = true;
342 const size_t ONE_BATCH = 1;
343 const uint64_t NO_REF_LOG = 0;
344 s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
345 NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
346 &publish_seq_callback);
347 assert(!s.ok() || seq_used != kMaxSequenceNumber);
348 // Note RemovePrepared should be called after WriteImpl that published the
349 // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
350 for (const auto& seq : unprep_seqs_) {
351 wpt_db_->RemovePrepared(seq.first, seq.second);
352 }
353 unprep_seqs_.clear();
354 write_set_keys_.clear();
355 return s;
356}
357
358Status WriteUnpreparedTxn::RollbackInternal() {
359 // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
360 WriteBatchWithIndex rollback_batch(
361 wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0);
362 assert(GetId() != kMaxSequenceNumber);
363 assert(GetId() > 0);
364 const auto& cf_map = *wupt_db_->GetCFHandleMap();
365 auto read_at_seq = kMaxSequenceNumber;
366 Status s;
367
368 ReadOptions roptions;
369 // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
370 // need to read our own writes when reading prior versions of the key for
371 // rollback.
372 WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
373 for (const auto& cfkey : write_set_keys_) {
374 const auto cfid = cfkey.first;
375 const auto& keys = cfkey.second;
376 for (const auto& key : keys) {
377 const auto& cf_handle = cf_map.at(cfid);
378 PinnableSlice pinnable_val;
379 bool not_used;
380 s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
381 &callback);
382
383 if (s.ok()) {
384 s = rollback_batch.Put(cf_handle, key, pinnable_val);
385 assert(s.ok());
386 } else if (s.IsNotFound()) {
387 s = rollback_batch.Delete(cf_handle, key);
388 assert(s.ok());
389 } else {
390 return s;
391 }
392 }
393 }
394
395 // The Rollback marker will be used as a batch separator
396 WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
397 bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
398 const bool DISABLE_MEMTABLE = true;
399 const uint64_t NO_REF_LOG = 0;
400 uint64_t seq_used = kMaxSequenceNumber;
401 // TODO(lth): We write rollback batch all in a single batch here, but this
402 // should be subdivided into multiple batches as well. In phase 2, when key
403 // sets are read from WAL, this will happen naturally.
404 const size_t ONE_BATCH = 1;
405 // We commit the rolled back prepared batches. Although this is
406 // counter-intuitive, i) it is safe to do so, since the prepared batches are
407 // already canceled out by the rollback batch, ii) adding the commit entry to
408 // CommitCache will allow us to benefit from the existing mechanism in
409 // CommitCache that keeps an entry evicted due to max advance and yet overlaps
410 // with a live snapshot around so that the live snapshot properly skips the
411 // entry even if its prepare seq is lower than max_evicted_seq_.
412 WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
413 wpt_db_, db_impl_, unprep_seqs_, ONE_BATCH);
414 // Note: the rollback batch does not need AddPrepared since it is written to
415 // DB in one shot. min_uncommitted still works since it requires capturing
416 // data that is written to DB but not yet committed, while the rollback
417 // batch commits with PreReleaseCallback.
418 s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
419 nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
420 &seq_used, rollback_batch.SubBatchCnt(),
421 do_one_write ? &update_commit_map : nullptr);
422 assert(!s.ok() || seq_used != kMaxSequenceNumber);
423 if (!s.ok()) {
424 return s;
425 }
426 if (do_one_write) {
427 for (const auto& seq : unprep_seqs_) {
428 wpt_db_->RemovePrepared(seq.first, seq.second);
429 }
430 unprep_seqs_.clear();
431 write_set_keys_.clear();
432 return s;
433 } // else do the 2nd write for commit
434 uint64_t& prepare_seq = seq_used;
435 ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
436 "RollbackInternal 2nd write prepare_seq: %" PRIu64,
437 prepare_seq);
438 // Commit the batch by writing an empty batch to the queue that will release
439 // the commit sequence number to readers.
440 WriteUnpreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
441 wpt_db_, db_impl_, unprep_seqs_, prepare_seq);
442 WriteBatch empty_batch;
443 empty_batch.PutLogData(Slice());
444 // In the absence of Prepare markers, use Noop as a batch separator
445 WriteBatchInternal::InsertNoop(&empty_batch);
446 s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
447 NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
448 &update_commit_map_with_prepare);
449 assert(!s.ok() || seq_used != kMaxSequenceNumber);
450 // Mark the txn as rolled back
451 if (s.ok()) {
452 for (const auto& seq : unprep_seqs_) {
453 wpt_db_->RemovePrepared(seq.first, seq.second);
454 }
455 }
456
457 unprep_seqs_.clear();
458 write_set_keys_.clear();
459 return s;
460}
461
462Status WriteUnpreparedTxn::Get(const ReadOptions& options,
463 ColumnFamilyHandle* column_family,
464 const Slice& key, PinnableSlice* value) {
465 auto snapshot = options.snapshot;
466 auto snap_seq =
467 snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
468 SequenceNumber min_uncommitted =
469 kMinUnCommittedSeq; // by default disable the optimization
470 if (snapshot != nullptr) {
471 min_uncommitted =
472 static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
473 ->min_uncommitted_;
474 }
475
476 WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
477 this);
478 return write_batch_.GetFromBatchAndDB(db_, options, column_family, key, value,
479 &callback);
480}
481
482Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
483 return GetIterator(options, wupt_db_->DefaultColumnFamily());
484}
485
486Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
487 ColumnFamilyHandle* column_family) {
488 // Make sure to get iterator from WriteUnprepareTxnDB, not the root db.
489 Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
490 assert(db_iter);
491
492 return write_batch_.NewIteratorWithBase(column_family, db_iter);
493}
494
495const std::map<SequenceNumber, size_t>&
496WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
497 return unprep_seqs_;
498}
499
500} // namespace rocksdb
501
502#endif // ROCKSDB_LITE
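The call that begins the path above is the one at line 34: only three arguments are passed, so the fourth parameter snap_released takes its nullptr default (step 1). Below is a sketch of the two usual ways this class of warning is addressed, again with hypothetical names; it is not necessarily the change that was made upstream.

#include <cassert>
#include <cstdint>

// (a) Callee-side guard: only write through the pointer when one was given.
bool CheckSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
                   bool* snap_released = nullptr) {
  if (snapshot_seq >= prep_seq) {
    return true;
  }
  if (snap_released != nullptr) {
    *snap_released = true;  // report the released snapshot when possible
  }
  return true;
}

// (b) Caller-side contract: always pass a real flag and act on it.
bool VisibleUnderSnapshot(uint64_t seq, uint64_t snapshot_seq) {
  bool snap_released = false;
  const bool visible = CheckSnapshot(seq, snapshot_seq, &snap_released);
  assert(!snap_released);  // this caller never expects a released snapshot
  return visible;
}

int main() {
  return VisibleUnderSnapshot(5, 10) ? 0 : 1;
}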

/home/bhubbard/working/src/ceph/src/rocksdb/utilities/transactions/write_prepared_txn_db.h

1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#pragma once
7#ifndef ROCKSDB_LITE
8
9#ifndef __STDC_FORMAT_MACROS
10#define __STDC_FORMAT_MACROS
11#endif
12
13#include <inttypes.h>
14#include <mutex>
15#include <queue>
16#include <set>
17#include <string>
18#include <unordered_map>
19#include <vector>
20
21#include "db/db_iter.h"
22#include "db/pre_release_callback.h"
23#include "db/read_callback.h"
24#include "db/snapshot_checker.h"
25#include "rocksdb/db.h"
26#include "rocksdb/options.h"
27#include "rocksdb/utilities/transaction_db.h"
28#include "util/set_comparator.h"
29#include "util/string_util.h"
30#include "utilities/transactions/pessimistic_transaction.h"
31#include "utilities/transactions/pessimistic_transaction_db.h"
32#include "utilities/transactions/transaction_lock_mgr.h"
33#include "utilities/transactions/write_prepared_txn.h"
34
35namespace rocksdb {
36
37// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
38// In this way some data in the DB might not be committed. The DB provides
39// mechanisms to tell such data apart from committed data.
40class WritePreparedTxnDB : public PessimisticTransactionDB {
41 public:
42 explicit WritePreparedTxnDB(DB* db,
43 const TransactionDBOptions& txn_db_options)
44 : PessimisticTransactionDB(db, txn_db_options),
45 SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
46 SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
47 COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
48 COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
49 FORMAT(COMMIT_CACHE_BITS) {
50 Init(txn_db_options);
51 }
52
53 explicit WritePreparedTxnDB(StackableDB* db,
54 const TransactionDBOptions& txn_db_options)
55 : PessimisticTransactionDB(db, txn_db_options),
56 SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
57 SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
58 COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
59 COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
60 FORMAT(COMMIT_CACHE_BITS) {
61 Init(txn_db_options);
62 }
63
64 virtual ~WritePreparedTxnDB();
65
66 virtual Status Initialize(
67 const std::vector<size_t>& compaction_enabled_cf_indices,
68 const std::vector<ColumnFamilyHandle*>& handles) override;
69
70 Transaction* BeginTransaction(const WriteOptions& write_options,
71 const TransactionOptions& txn_options,
72 Transaction* old_txn) override;
73
74 // Optimized version of ::Write that receives more optimization request such
75 // as skip_concurrency_control.
76 using PessimisticTransactionDB::Write;
77 Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
78 WriteBatch* updates) override;
79
80 // Write the batch to the underlying DB and mark it as committed. Could be
81 // used by both directly from TxnDB or through a transaction.
82 Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
83 size_t batch_cnt, WritePreparedTxn* txn);
84
85 using DB::Get;
86 virtual Status Get(const ReadOptions& options,
87 ColumnFamilyHandle* column_family, const Slice& key,
88 PinnableSlice* value) override;
89
90 using DB::MultiGet;
91 virtual std::vector<Status> MultiGet(
92 const ReadOptions& options,
93 const std::vector<ColumnFamilyHandle*>& column_family,
94 const std::vector<Slice>& keys,
95 std::vector<std::string>* values) override;
96
97 using DB::NewIterator;
98 virtual Iterator* NewIterator(const ReadOptions& options,
99 ColumnFamilyHandle* column_family) override;
100
101 using DB::NewIterators;
102 virtual Status NewIterators(
103 const ReadOptions& options,
104 const std::vector<ColumnFamilyHandle*>& column_families,
105 std::vector<Iterator*>* iterators) override;
106
107 // Check whether the transaction that wrote the value with sequence number seq
108 // is visible to the snapshot with sequence number snapshot_seq.
109 // Returns true if commit_seq <= snapshot_seq
110 // If the snapshot_seq is already released and snapshot_seq <= max, sets
111 // *snap_released to true and returns true as well.
112 inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
113 uint64_t min_uncommitted = kMinUnCommittedSeq,
114 bool* snap_released = nullptr) const {
115 ROCKS_LOG_DETAILS(info_log_,
116 "IsInSnapshot %" PRIu64 " in %" PRIu64
117 " min_uncommitted %" PRIu64,
118 prep_seq, snapshot_seq, min_uncommitted);
119 assert(min_uncommitted >= kMinUnCommittedSeq);
120 // Caller is responsible to initialize snap_released.
121 assert(snap_released == nullptr || *snap_released == false);
122 // Here we try to infer the return value without looking into prepare list.
123 // This would help avoiding synchronization over a shared map.
124 // TODO(myabandeh): optimize this. This sequence of checks must be correct
125 // but not necessarily efficient
126 if (prep_seq == 0) {
3. Assuming 'prep_seq' is not equal to 0
4. Taking false branch
127 // Compaction will output keys to bottom-level with sequence number 0 if
128 // it is visible to the earliest snapshot.
129 ROCKS_LOG_DETAILS(
130 info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
131 prep_seq, snapshot_seq, 1);
132 return true;
133 }
134 if (snapshot_seq < prep_seq) {
5. Taking false branch
135 // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
136 ROCKS_LOG_DETAILS(
137 info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
138 prep_seq, snapshot_seq, 0);
139 return false;
140 }
141 if (prep_seq < min_uncommitted) {
6. Taking false branch
142 ROCKS_LOG_DETAILS(info_log_,
143 "IsInSnapshot %" PRIu64 " in %" PRIu64
144 " returns %" PRId32
145 " because of min_uncommitted %" PRIu64,
146 prep_seq, snapshot_seq, 1, min_uncommitted);
147 return true;
148 }
149 // Commit of delayed prepared has two non-atomic steps: add to commit cache,
150 // remove from delayed prepared. Our reads from these two are also
151 // non-atomic. Thus, by looking into the commit cache first, we might find
152 // the prep_seq neither in the commit cache nor in delayed_prepared_. To fix that i)
153 // we check if there was any delayed prepared BEFORE looking into commit
154 // cache, ii) if there was, we complete the search steps to be these: i)
155 // commit cache, ii) delayed prepared, commit cache again. In this way if
156 // the first query to commit cache missed the commit, the 2nd will catch it.
157 bool was_empty;
158 SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
159 CommitEntry64b dont_care;
160 auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
161 size_t repeats = 0;
162 do {
11. Loop condition is false. Exiting loop
163 repeats++;
164 assert(repeats < 100);
165 if (UNLIKELY(repeats >= 100)) {
7. Taking false branch
166 throw std::runtime_error(
167 "The read was intrupted 100 times by update to max_evicted_seq_. "
168 "This is unexpected in all setups");
169 }
170 max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
171 TEST_SYNC_POINT(
172 "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
173 TEST_SYNC_POINT(
174 "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
175 was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
176 TEST_SYNC_POINT(
177 "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
178 TEST_SYNC_POINT(
179 "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
180 CommitEntry cached;
181 bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
182 TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
183 TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
184 if (exist && prep_seq == cached.prep_seq) {
8. Assuming 'exist' is 0
185 // It is committed and also not evicted from commit cache
186 ROCKS_LOG_DETAILS(
187 info_log_,
188 "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
189 prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
190 return cached.commit_seq <= snapshot_seq;
191 }
192 // else it could be committed but not inserted in the map which could
193 // happen after recovery, or it could be committed and evicted by another
194 // commit, or never committed.
195
196 // At this point we dont know if it was committed or it is still prepared
197 max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
198 if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
9. Taking true branch
199 continue;
10. Execution continues on line 254
200 }
201 // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
202 if (max_evicted_seq_ub < prep_seq) {
203 // Not evicted from cache and also not present, so must be still
204 // prepared
205 ROCKS_LOG_DETAILS(info_log_,
206 "IsInSnapshot %" PRIu64 " in %" PRIu64
207 " returns %" PRId32,
208 prep_seq, snapshot_seq, 0);
209 return false;
210 }
211 TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
212 TEST_SYNC_POINT(
213 "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
214 if (!was_empty) {
215 // We should not normally reach here
216 WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
217 ReadLock rl(&prepared_mutex_);
218 ROCKS_LOG_WARN(
219 info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
220 static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
221 if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
222 // This is the order: 1) delayed_prepared_commits_ update, 2) publish
223 // 3) delayed_prepared_ clean up. So check if it is the case of a late
224 // cleanup.
225 auto it = delayed_prepared_commits_.find(prep_seq);
226 if (it == delayed_prepared_commits_.end()) {
227 // Then it is not committed yet
228 ROCKS_LOG_DETAILS(info_log_,
229 "IsInSnapshot %" PRIu64 " in %" PRIu64
230 " returns %" PRId32,
231 prep_seq, snapshot_seq, 0);
232 return false;
233 } else {
234 ROCKS_LOG_DETAILS(info_log_,
235 "IsInSnapshot %" PRIu64 " in %" PRIu64
236 " commit: %" PRIu64 " returns %" PRId32,
237 prep_seq, snapshot_seq, it->second,
238 snapshot_seq <= it->second);
239 return it->second <= snapshot_seq;
240 }
241 } else {
242 // 2nd query to commit cache. Refer to was_empty comment above.
243 exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
244 if (exist && prep_seq == cached.prep_seq) {
245 ROCKS_LOG_DETAILS(
246 info_log_,
247 "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
248 prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
249 return cached.commit_seq <= snapshot_seq;
250 }
251 max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
252 }
253 }
254 } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
255 // When advancing max_evicted_seq_, we move older entries from prepared to
256 // delayed_prepared_. Also we move evicted entries from commit cache to
257 // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
258 // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
259 // old_commit_map_, iii) committed with no conflict with any snapshot. Case
260 // (i) delayed_prepared_ is checked above
261 if (max_evicted_seq_ub < snapshot_seq) { // then (ii) cannot be the case
12. Taking false branch
262 // only (iii) is the case: committed
263 // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
264 // snapshot_seq
265 ROCKS_LOG_DETAILS(
266 info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
267 prep_seq, snapshot_seq, 1);
268 return true;
269 }
270 // else (ii) might be the case: check the commit data saved for this
271 // snapshot. If there was no overlapping commit entry, then it is committed
272 // with a commit_seq lower than any live snapshot, including snapshot_seq.
273 if (old_commit_map_empty_.load(std::memory_order_acquire)) {
13. Taking false branch
274 ROCKS_LOG_DETAILS(info_log_,
275 "IsInSnapshot %" PRIu64 " in %" PRIu64
276 " returns %" PRId32 " released=1",
277 prep_seq, snapshot_seq, 0);
278 assert(snap_released);
279 // This snapshot is not valid anymore. We cannot tell if prep_seq is
280 // committed before or after the snapshot. Return true but also set
281 // snap_released to true.
282 *snap_released = true;
283 return true;
284 }
285 {
286 // We should not normally reach here unless snapshot_seq is old. This is a
287 // rare case and it is ok to pay the cost of mutex ReadLock for such old,
288 // reading transactions.
289 WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
290 ReadLock rl(&old_commit_map_mutex_);
291 auto prep_set_entry = old_commit_map_.find(snapshot_seq);
292 bool found = prep_set_entry != old_commit_map_.end();
293 if (found) {
14. Assuming 'found' is 0
15. Taking false branch
294 auto& vec = prep_set_entry->second;
295 found = std::binary_search(vec.begin(), vec.end(), prep_seq);
296 } else {
297 // coming from compaction
298 ROCKS_LOG_DETAILS(info_log_,
299 "IsInSnapshot %" PRIu64 " in %" PRIu64
300 " returns %" PRId32 " released=1",
301 prep_seq, snapshot_seq, 0);
302 // This snapshot is not valid anymore. We cannot tell if prep_seq is
303 // committed before or after the snapshot. Return true but also set
304 // snap_released to true.
305 assert(snap_released);
306 *snap_released = true;
16. Dereference of null pointer (loaded from variable 'snap_released')
307 return true;
308 }
309
310 if (!found) {
311 ROCKS_LOG_DETAILS(info_log_,
312 "IsInSnapshot %" PRIu64 " in %" PRIu64
313 " returns %" PRId32,
314 prep_seq, snapshot_seq, 1);
315 return true;
316 }
317 }
318 // (ii) is the case: it is committed but after the snapshot_seq
319 ROCKS_LOG_DETAILS(
320 info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
321 prep_seq, snapshot_seq, 0);
322 return false;
323 }
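Note that the write at line 306 (and the similar one at line 282) is preceded by assert(snap_released), but that does not cut off the path: in the configuration analyzed here assert expands to a no-op (NDEBUG), so neither the compiled code nor the analyzer treats it as a guarantee that the pointer is non-null. A minimal illustration, not taken from RocksDB:

#include <cassert>

// With NDEBUG defined, assert() expands to a no-op, so the write below is
// still reachable with flag == nullptr, which is exactly the path the
// analyzer reports.
void SetFlag(bool* flag) {
  assert(flag != nullptr);  // compiled out under NDEBUG
  *flag = true;             // still reachable with a null pointer
}

int main() {
  bool b = false;
  SetFlag(&b);
  return b ? 0 : 1;
}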
324
325 // Add the transaction with prepare sequence seq to the prepared list.
326 // Note: must be called serially with increasing seq on each call.
327 void AddPrepared(uint64_t seq);
328 // Check if any of the prepared txns are less than new max_evicted_seq_. Must
329 // be called with prepared_mutex_ write locked.
330 void CheckPreparedAgainstMax(SequenceNumber new_max);
331 // Remove the transaction with prepare sequence seq from the prepared list
332 void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
333 // Add the transaction with prepare sequence prepare_seq and commit sequence
334 // commit_seq to the commit map. loop_cnt is to detect infinite loops.
335 // Note: must be called serially.
336 void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
337 uint8_t loop_cnt = 0);
338
339 struct CommitEntry {
340 uint64_t prep_seq;
341 uint64_t commit_seq;
342 CommitEntry() : prep_seq(0), commit_seq(0) {}
343 CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
344 bool operator==(const CommitEntry& rhs) const {
345 return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
346 }
347 };
348
349 struct CommitEntry64bFormat {
350 explicit CommitEntry64bFormat(size_t index_bits)
351 : INDEX_BITS(index_bits),
352 PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
353 COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
354 COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
355 DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
356 // Number of higher bits of a sequence number that is not used. They are
357 // used to encode the value type, ...
358 const size_t PAD_BITS = static_cast<size_t>(8);
359 // Number of lower bits from prepare seq that can be skipped as they are
360 // implied by the index of the entry in the array
361 const size_t INDEX_BITS;
362 // Number of bits we use to encode the prepare seq
363 const size_t PREP_BITS;
364 // Number of bits we use to encode the commit seq.
365 const size_t COMMIT_BITS;
366 // Filter to encode/decode commit seq
367 const uint64_t COMMIT_FILTER;
368 // The value of commit_seq - prepare_seq + 1 must be less than this bound
369 const uint64_t DELTA_UPPERBOUND;
370 };
371
372 // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ...
373 // INDEX Delta Seq (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ...
374 // DELTA DELTA Encoded Value = PREP PREP .... PREP PREP DELTA DELTA
375 // ... DELTA DELTA PAD: first bits of a seq that is reserved for tagging and
376 // hence ignored PREP/INDEX: the used bits in a prepare seq number INDEX: the
377 // bits that do not have to be encoded (will be provided externally) DELTA:
378 // prep seq - commit seq + 1 Number of DELTA bits should be equal to number of
379 // index bits + PADs
380 struct CommitEntry64b {
381 constexpr CommitEntry64b() noexcept : rep_(0) {}
382
383 CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
384 : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}
385
386 CommitEntry64b(const uint64_t ps, const uint64_t cs,
387 const CommitEntry64bFormat& format) {
388 assert(ps < static_cast<uint64_t>(
389 (1ull << (format.PREP_BITS + format.INDEX_BITS))));
390 assert(ps <= cs);
391 uint64_t delta = cs - ps + 1; // make initialized delta always >= 1
392 // zero is reserved for uninitialized entries
393 assert(0 < delta);
394 assert(delta < format.DELTA_UPPERBOUND);
395 if (delta >= format.DELTA_UPPERBOUND) {
396 throw std::runtime_error(
397 "commit_seq >> prepare_seq. The allowed distance is " +
398 ToString(format.DELTA_UPPERBOUND) + " commit_seq is " +
399 ToString(cs) + " prepare_seq is " + ToString(ps));
400 }
401 rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
402 rep_ = rep_ | delta;
403 }
404
405 // Return false if the entry is empty
406 bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
407 const CommitEntry64bFormat& format) {
408 uint64_t delta = rep_ & format.COMMIT_FILTER;
409 // zero is reserved for uninitialized entries
410 assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
411 if (delta == 0) {
412 return false; // initialized entry would have non-zero delta
413 }
414
415 assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
416 uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
417 prep_up >>= format.PAD_BITS;
418 const uint64_t& prep_low = indexed_seq;
419 entry->prep_seq = prep_up | prep_low;
420
421 entry->commit_seq = entry->prep_seq + delta - 1;
422 return true;
423 }
424
425 private:
426 uint64_t rep_;
427 };
428
429 // Struct to hold ownership of snapshot and read callback for cleanup.
430 struct IteratorState;
431
432 std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
433 return cf_map_;
434 }
435 std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
436 return handle_map_;
437 }
438 void UpdateCFComparatorMap(
439 const std::vector<ColumnFamilyHandle*>& handles) override;
440 void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
441
442 virtual const Snapshot* GetSnapshot() override;
443 SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);
444
445 protected:
446 virtual Status VerifyCFOptions(
447 const ColumnFamilyOptions& cf_options) override;
448
449 private:
450 friend class PreparedHeap_BasicsTest_Test;
451 friend class PreparedHeap_Concurrent_Test;
452 friend class PreparedHeap_EmptyAtTheEnd_Test;
453 friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
454 friend class WritePreparedCommitEntryPreReleaseCallback;
455 friend class WritePreparedTransactionTestBase;
456 friend class WritePreparedTxn;
457 friend class WritePreparedTxnDBMock;
458 friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
459 friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
460 friend class
461 WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
462 friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
463 friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
464 friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
465 friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
466 friend class
467 WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
468 friend class WritePreparedTransactionTest_CommitMapTest_Test;
469 friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
470 friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
471 friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
472 friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
473 friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
474 friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
475 friend class
476 WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
477 friend class
478 WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
479 friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
480 friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
481 friend class WritePreparedTransactionTest_RollbackTest_Test;
482 friend class WriteUnpreparedTxnDB;
483 friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
484
485 void Init(const TransactionDBOptions& /* unused */);
486
487 void WPRecordTick(uint32_t ticker_type) const {
488 RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
489 }
490
491 // A heap with the amortized O(1) complexity for erase. It uses one extra heap
492 // to keep track of erased entries that are not yet on top of the main heap.
493 class PreparedHeap {
494 std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
495 heap_;
496 std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
497 erased_heap_;
498 // True when testing crash recovery
499 bool TEST_CRASH_ = false;
500 friend class WritePreparedTxnDB;
501
502 public:
503 ~PreparedHeap() {
504 if (!TEST_CRASH_) {
505 assert(heap_.empty());
506 assert(erased_heap_.empty());
507 }
508 }
509 bool empty() { return heap_.empty(); }
510 uint64_t top() { return heap_.top(); }
511 void push(uint64_t v) { heap_.push(v); }
512 void pop() {
513 heap_.pop();
514 while (!heap_.empty() && !erased_heap_.empty() &&
515 // heap_.top() > erased_heap_.top() could happen if we have erased
516 // a non-existent entry. Ideally the user should not do that but we
517 // should be resilient against it.
518 heap_.top() >= erased_heap_.top()) {
519 if (heap_.top() == erased_heap_.top()) {
520 heap_.pop();
521 }
522 uint64_t erased __attribute__((__unused__));
523 erased = erased_heap_.top();
524 erased_heap_.pop();
525 // No duplicate prepare sequence numbers
526 assert(erased_heap_.empty() || erased_heap_.top() != erased);
527 }
528 while (heap_.empty() && !erased_heap_.empty()) {
529 erased_heap_.pop();
530 }
531 }
532 void erase(uint64_t seq) {
533 if (!heap_.empty()) {
534 if (seq < heap_.top()) {
535 // Already popped, ignore it.
536 } else if (heap_.top() == seq) {
537 pop();
538 assert(heap_.empty() || heap_.top() != seq);
539 } else { // (heap_.top() > seq)
540 // Down the heap, remember to pop it later
541 erased_heap_.push(seq);
542 }
543 }
544 }
545 };
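The PreparedHeap above keeps erase amortized O(1) by parking erased values that are not yet at the top in a second min-heap and discarding them lazily as they surface. A stand-alone sketch of the same idea (a simplified, hypothetical class, not the RocksDB implementation):

#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

class LazyEraseMinHeap {
  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
      heap_, erased_;

 public:
  bool empty() const { return heap_.empty(); }
  uint64_t top() const { return heap_.top(); }
  void push(uint64_t v) { heap_.push(v); }
  void pop() {
    heap_.pop();
    // Drop any previously erased values that have now reached the top.
    while (!heap_.empty() && !erased_.empty() &&
           heap_.top() == erased_.top()) {
      heap_.pop();
      erased_.pop();
    }
  }
  void erase(uint64_t v) {
    if (heap_.empty() || v < heap_.top()) {
      // Already popped (or never present); nothing to remember.
    } else if (v == heap_.top()) {
      pop();            // cheap case: erase the current minimum
    } else {
      erased_.push(v);  // defer: drop it when it becomes the minimum
    }
  }
};

int main() {
  LazyEraseMinHeap h;
  h.push(3); h.push(1); h.push(2);
  h.erase(2);               // deferred, 2 is below the top
  assert(h.top() == 1);
  h.pop();                  // removes 1, then lazily drops the erased 2
  assert(h.top() == 3);
  return 0;
}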
546
547 void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }
548
549 // Get the commit entry with index indexed_seq from the commit table. It
550 // returns true if such entry exists.
551 bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
552 CommitEntry* entry) const;
553
554 // Rewrite the entry with the index indexed_seq in the commit table with the
555 // commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
556 // sets the evicted_entry and returns true.
557 bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
558 CommitEntry* evicted_entry);
559
560 // Rewrite the entry with the index indexed_seq in the commit table with the
561 // commit entry new_entry only if the existing entry matches the
562 // expected_entry. Returns false otherwise.
563 bool ExchangeCommitEntry(const uint64_t indexed_seq,
564 CommitEntry64b& expected_entry,
565 const CommitEntry& new_entry);
566
567 // Increase max_evicted_seq_ from the previous value prev_max to the new
568 // value. This also involves taking care of prepared txns that are not
569 // committed before new_max, as well as updating the list of live snapshots at
570 // the time of updating the max. Thread-safety: this function can be called
571 // concurrently. The concurrent invocations of this function is equivalent to
572 // a serial invocation in which the last invocation is the one with the
573 // largest new_max value.
574 void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
575 const SequenceNumber& new_max);
576
577 inline SequenceNumber SmallestUnCommittedSeq() {
578 // Since we update the prepare_heap always from the main write queue via
579 // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
580 // prepared data in 2pc transactions. For non-2pc transactions that are
581 // written in two steps, we also update prepared_txns_ at the first step
582 // (via the same mechanism) so that their uncommitted data is reflected in
583 // SmallestUnCommittedSeq.
584 ReadLock rl(&prepared_mutex_);
585 // Since we are holding the mutex, and GetLatestSequenceNumber is updated
586 // after prepared_txns_ are, the value of GetLatestSequenceNumber would
587 // reflect any uncommitted data that is not added to prepared_txns_ yet.
588 // Otherwise, if there is no concurrent txn, this value simply reflects that
589 // latest value in the memtable.
590 if (!delayed_prepared_.empty()) {
591 assert(!delayed_prepared_empty_.load());
592 return *delayed_prepared_.begin();
593 }
594 if (prepared_txns_.empty()) {
595 return db_impl_->GetLatestSequenceNumber() + 1;
596 } else {
597 return std::min(prepared_txns_.top(),
598 db_impl_->GetLatestSequenceNumber() + 1);
599 }
600 }
601 // Enhance the snapshot object by recording in it the smallest uncommitted seq
602 inline void EnhanceSnapshot(SnapshotImpl* snapshot,
603 SequenceNumber min_uncommitted) {
604 assert(snapshot);
605 snapshot->min_uncommitted_ = min_uncommitted;
606 }
607
608 virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
609 SequenceNumber max);
610
611 // Will be called by the public ReleaseSnapshot method. Does the maintenance
612 // internal to WritePreparedTxnDB
613 void ReleaseSnapshotInternal(const SequenceNumber snap_seq);
614
615 // Update the list of snapshots corresponding to the soon-to-be-updated
616 // max_evicted_seq_. Thread-safety: this function can be called concurrently.
617 // The concurrent invocations of this function is equivalent to a serial
618 // invocation in which the last invocation is the one with the largest
619 // version value.
620 void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
621 const SequenceNumber& version);
622 // Check the new list of new snapshots against the old one to see if any of
623 // the snapshots are released and to do the cleanup for the released snapshot.
624 void CleanupReleasedSnapshots(
625 const std::vector<SequenceNumber>& new_snapshots,
626 const std::vector<SequenceNumber>& old_snapshots);
627
628 // Check an evicted entry against live snapshots to see if it should be kept
629 // around or it can be safely discarded (and hence assume committed for all
630 // snapshots). Thread-safety: this function can be called concurrently. If it
631 // is called concurrently with multiple UpdateSnapshots, the result is the
632 // same as checking the intersection of the snapshot list before updates with
633 // the snapshot list of all the concurrent updates.
634 void CheckAgainstSnapshots(const CommitEntry& evicted);
635
636 // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
637 // commit_seq. Return false if checking the next snapshot(s) is not needed.
638 // This is the case if none of the next snapshots could satisfy the condition.
639 // next_is_larger: the next snapshot will be a larger value
640 bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
641 const uint64_t& commit_seq,
642 const uint64_t& snapshot_seq,
643 const bool next_is_larger);
644
645 // A trick to increase the last visible sequence number by one and also wait
646 // for the in-flight commits to be visible.
647 void AdvanceSeqByOne();
648
649 // The list of live snapshots at the last time that max_evicted_seq_ advanced.
650 // The list stored into two data structures: in snapshot_cache_ that is
651 // efficient for concurrent reads, and in snapshots_ if the data does not fit
652 // into snapshot_cache_. The total number of snapshots in the two lists
653 std::atomic<size_t> snapshots_total_ = {};
654 // The list sorted in ascending order. Thread-safety for writes is provided
655 // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
656 // each entry. In x86_64 architecture such reads are compiled to simple read
657 // instructions.
658 const size_t SNAPSHOT_CACHE_BITS;
659 const size_t SNAPSHOT_CACHE_SIZE;
660 std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
661 // 2nd list for storing snapshots. The list sorted in ascending order.
662 // Thread-safety is provided with snapshots_mutex_.
663 std::vector<SequenceNumber> snapshots_;
664 // The list of all snapshots: snapshots_ + snapshot_cache_. This list, although
665 // redundant, simplifies the CleanupOldSnapshots implementation.
666 // Thread-safety is provided with snapshots_mutex_.
667 std::vector<SequenceNumber> snapshots_all_;
668 // The version of the latest list of snapshots. This can be used to avoid
669 // rewriting a list that is concurrently updated with a more recent version.
670 SequenceNumber snapshots_version_ = 0;
671
672 // A heap of prepared transactions. Thread-safety is provided with
673 // prepared_mutex_.
674 PreparedHeap prepared_txns_;
675 const size_t COMMIT_CACHE_BITS;
676 const size_t COMMIT_CACHE_SIZE;
677 const CommitEntry64bFormat FORMAT;
678 // commit_cache_ must be initialized to zero to tell apart an empty index from
679 // a filled one. Thread-safety is provided with commit_cache_mutex_.
680 std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
681 // The largest evicted *commit* sequence number from the commit_cache_. If a
682 // seq is smaller than max_evicted_seq_ it might or might not be present in
683 // commit_cache_. So commit_cache_ must first be checked before consulting
684 // with max_evicted_seq_.
685 std::atomic<uint64_t> max_evicted_seq_ = {};
686 // Order: 1) update future_max_evicted_seq_ = new_max, 2)
687 // GetSnapshotListFromDB(new_max), max_evicted_seq_ = new_max. Since
688 // GetSnapshotInternal guarantees that the snapshot seq is larger than
689 // future_max_evicted_seq_, this guarantees that a snapshot that is not larger
690 // than max has already been looked at via a GetSnapshotListFromDB(new_max).
691 std::atomic<uint64_t> future_max_evicted_seq_ = {};
692 // Advance max_evicted_seq_ by this value each time it needs an update. The
693 // larger the value, the less frequent advances we would have. We do not want
694 // it to be too large either as it would cause stalls by doing too much
695 // maintenance work under the lock.
696 size_t INC_STEP_FOR_MAX_EVICTED = 1;
697 // A map from old snapshots (expected to be used by a few read-only txns) to
698 // prepared sequence number of the evicted entries from commit_cache_ that
699 // overlaps with such snapshot. These are the prepared sequence numbers that
700 // the snapshot, to which they are mapped, cannot assume to be committed just
701 // because it is no longer in the commit_cache_. The vector must be sorted
702 // after each update.
703 // Thread-safety is provided with old_commit_map_mutex_.
704 std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
705 // A set of long-running prepared transactions that are not finished by the
706 // time max_evicted_seq_ advances their sequence number. This is expected to
707 // be empty normally. Thread-safety is provided with prepared_mutex_.
708 std::set<uint64_t> delayed_prepared_;
709 // Commit of a delayed prepared: 1) update commit cache, 2) update
710 // delayed_prepared_commits_, 3) publish seq, 3) clean up delayed_prepared_.
711 // delayed_prepared_commits_ will help us tell apart the unprepared txns from
712 // the ones that are committed but not cleaned up yet.
713 std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
714 // Update when delayed_prepared_.empty() changes. Expected to be true
715 // normally.
716 std::atomic<bool> delayed_prepared_empty_ = {true};
717 // Update when old_commit_map_.empty() changes. Expected to be true normally.
718 std::atomic<bool> old_commit_map_empty_ = {true};
719 mutable port::RWMutex prepared_mutex_;
720 mutable port::RWMutex old_commit_map_mutex_;
721 mutable port::RWMutex commit_cache_mutex_;
722 mutable port::RWMutex snapshots_mutex_;
723 // A cache of the cf comparators
724 // Thread safety: since it is a const it is safe to read it concurrently
725 std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
726 // A cache of the cf handles
727 // Thread safety: since the handle is a read-only const object, it is
728 // safe to read it concurrently
729 std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
730};
731
732class WritePreparedTxnReadCallback : public ReadCallback {
733 public:
734 WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
735 : ReadCallback(snapshot), db_(db) {}
736 WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
737 SequenceNumber min_uncommitted)
738 : ReadCallback(snapshot, min_uncommitted), db_(db) {}
739
740 // Will be called to see if the seq number visible; if not it moves on to
741 // the next seq number.
742 inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
743 auto snapshot = max_visible_seq_;
744 return db_->IsInSnapshot(seq, snapshot, min_uncommitted_);
745 }
746
747 // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
748 private:
749 WritePreparedTxnDB* db_;
750};
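As a usage illustration only: the callback is built from a WritePreparedTxnDB, the snapshot sequence, and the snapshot's min_uncommitted bound, and is then queried per candidate sequence number. The helper ProbeVisibility and the printf wiring are assumptions for this sketch, not RocksDB code; wp_db is assumed to be a live WritePreparedTxnDB* and snap_seq/min_uncommitted to have come from one of its snapshots.

#include <cstdio>

#include "utilities/transactions/write_prepared_txn_db.h"

void ProbeVisibility(rocksdb::WritePreparedTxnDB* wp_db,
                     rocksdb::SequenceNumber snap_seq,
                     rocksdb::SequenceNumber min_uncommitted,
                     rocksdb::SequenceNumber candidate_seq) {
  rocksdb::WritePreparedTxnReadCallback callback(wp_db, snap_seq,
                                                 min_uncommitted);
  // IsVisibleFullCheck answers: is candidate_seq committed as of the snapshot
  // this callback was built with?
  const bool visible = callback.IsVisibleFullCheck(candidate_seq);
  std::printf("seq %llu %s visible at snapshot %llu\n",
              static_cast<unsigned long long>(candidate_seq),
              visible ? "is" : "is not",
              static_cast<unsigned long long>(snap_seq));
}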
751
752class AddPreparedCallback : public PreReleaseCallback {
753 public:
754 AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
755 size_t sub_batch_cnt, bool two_write_queues,
756 bool first_prepare_batch)
757 : db_(db),
758 db_impl_(db_impl),
759 sub_batch_cnt_(sub_batch_cnt),
760 two_write_queues_(two_write_queues),
761 first_prepare_batch_(first_prepare_batch) {
762 (void)two_write_queues_; // to silence unused private field warning
763 }
764 virtual Status Callback(SequenceNumber prepare_seq,
765 bool is_mem_disabled __attribute__((__unused__)),
766 uint64_t log_number) override {
767 // Always Prepare from the main queue
768 assert(!two_write_queues_ || !is_mem_disabled)(static_cast<void> (0)); // implies the 1st queue
769 for (size_t i = 0; i < sub_batch_cnt_; i++) {
770 db_->AddPrepared(prepare_seq + i);
771 }
772 if (first_prepare_batch_) {
773 assert(log_number != 0)(static_cast<void> (0));
774 db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
775 log_number);
776 }
777 return Status::OK();
778 }
779
780 private:
781 WritePreparedTxnDB* db_;
782 DBImpl* db_impl_;
783 size_t sub_batch_cnt_;
784 bool two_write_queues_;
785 // It is 2PC and this is the first prepare batch. Always the case in 2PC
786 // unless it is WriteUnPrepared.
787 bool first_prepare_batch_;
788};
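To make the loop in Callback() concrete: a prepare batch that was split into sub_batch_cnt sub-batches occupies that many consecutive sequence numbers, and every one of them is registered via AddPrepared. A toy helper (not RocksDB code) that simply enumerates that range:

#include <cstddef>
#include <cstdint>
#include <vector>

std::vector<uint64_t> PreparedSeqsFor(uint64_t prepare_seq,
                                      size_t sub_batch_cnt) {
  std::vector<uint64_t> seqs;
  seqs.reserve(sub_batch_cnt);
  for (size_t i = 0; i < sub_batch_cnt; i++) {
    seqs.push_back(prepare_seq + i);  // e.g. 100, 101, 102 for cnt == 3
  }
  return seqs;
}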
789
790class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
791 public:
792 // includes_data indicates that the commit also writes a non-empty
793 // CommitTimeWriteBatch to memtable, which needs to be committed separately.
794 WritePreparedCommitEntryPreReleaseCallback(
795 WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
796 size_t prep_batch_cnt, size_t data_batch_cnt = 0,
797 SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
798 : db_(db),
799 db_impl_(db_impl),
800 prep_seq_(prep_seq),
801 prep_batch_cnt_(prep_batch_cnt),
802 data_batch_cnt_(data_batch_cnt),
803 includes_data_(data_batch_cnt_ > 0),
804 aux_seq_(aux_seq),
805 aux_batch_cnt_(aux_batch_cnt),
806 includes_aux_batch_(aux_batch_cnt > 0) {
807 assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber))(static_cast<void> (0)); // xor
808 assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0)(static_cast<void> (0));
809 assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber))(static_cast<void> (0)); // xor
810 }
811
812 virtual Status Callback(SequenceNumber commit_seq,
813 bool is_mem_disabled __attribute__((__unused__)),
814 uint64_t) override {
815 // Always commit from the 2nd queue
816 assert(!db_impl_->immutable_db_options().two_write_queues ||
817 is_mem_disabled)(static_cast<void> (0));
818 assert(includes_data_ || prep_seq_ != kMaxSequenceNumber)(static_cast<void> (0));
819 // The data batch is the batch written with the commit marker; it affects
820 // the last seq in the commit batch.
821 const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)(__builtin_expect((data_batch_cnt_ <= 1), 1))
822 ? commit_seq
823 : commit_seq + data_batch_cnt_ - 1;
824 if (prep_seq_ != kMaxSequenceNumber) {
825 for (size_t i = 0; i < prep_batch_cnt_; i++) {
826 db_->AddCommitted(prep_seq_ + i, last_commit_seq);
827 }
828 } // else there was no prepare phase
829 if (includes_aux_batch_) {
830 for (size_t i = 0; i < aux_batch_cnt_; i++) {
831 db_->AddCommitted(aux_seq_ + i, last_commit_seq);
832 }
833 }
834 if (includes_data_) {
835 assert(data_batch_cnt_)(static_cast<void> (0));
836 // Commit the data that accompanies the commit request
837 for (size_t i = 0; i < data_batch_cnt_; i++) {
838 // For the commit seq of each batch, use the commit seq of the last batch.
839 // This makes debugging easier by giving all the batches the same
840 // sequence number.
841 db_->AddCommitted(commit_seq + i, last_commit_seq);
842 }
843 }
844 if (db_impl_->immutable_db_options().two_write_queues) {
845 assert(is_mem_disabled)(static_cast<void> (0)); // implies the 2nd queue
846 // Publish the sequence number. We can do that here assuming the callback
847 // is invoked only from one write queue, which guarantees that the
848 // published sequence numbers will be in order, i.e., once a seq is
849 // published, all the seqs prior to it are also publishable.
850 db_impl_->SetLastPublishedSequence(last_commit_seq);
851 }
852 // else the SequenceNumber that is updated as part of the write already
853 // does the publishing
854 return Status::OK();
855 }
856
857 private:
858 WritePreparedTxnDB* db_;
859 DBImpl* db_impl_;
860 // kMaxSequenceNumber if there was no prepare phase
861 SequenceNumber prep_seq_;
862 size_t prep_batch_cnt_;
863 size_t data_batch_cnt_;
864 // Data here is the batch written with the commit marker, either because
865 // it is a commit without prepare or the commit has a CommitTimeWriteBatch.
866 bool includes_data_;
867 // The auxiliary batch (if any) is a batch that is written earlier but gets
868 // the same commit seq as the prepare batch or data batch. This is used with
869 // two write queues, where the CommitTimeWriteBatch becomes the aux batch
870 // and we do a separate write to actually commit everything.
871 SequenceNumber aux_seq_;
872 size_t aux_batch_cnt_;
873 bool includes_aux_batch_;
874};
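The arithmetic in Callback() above is easiest to check with concrete numbers: with prep_seq = 100, prep_batch_cnt = 2, commit_seq = 110 and data_batch_cnt = 3, last_commit_seq is 112, and the entries 100, 101 (prepared) and 110, 111, 112 (commit-time data) are all recorded as committed at 112. A self-contained toy that reproduces this mapping; ToyCommitMapping is a hypothetical name and the aux batch is omitted.

#include <cstddef>
#include <cstdint>
#include <map>

std::map<uint64_t, uint64_t> ToyCommitMapping(uint64_t prep_seq,
                                              size_t prep_batch_cnt,
                                              uint64_t commit_seq,
                                              size_t data_batch_cnt) {
  std::map<uint64_t, uint64_t> committed;  // prepare/data seq -> commit seq
  const uint64_t last_commit_seq =
      data_batch_cnt <= 1 ? commit_seq : commit_seq + data_batch_cnt - 1;
  for (size_t i = 0; i < prep_batch_cnt; i++) {
    committed[prep_seq + i] = last_commit_seq;  // the prepared sub-batches
  }
  for (size_t i = 0; i < data_batch_cnt; i++) {
    committed[commit_seq + i] = last_commit_seq;  // the commit-time data batch
  }
  return committed;
}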
875
876// For two_write_queues, commit both the aborted batch and the cleanup batch
877// and then publish the seq.
878class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
879 public:
880 WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
881 DBImpl* db_impl,
882 SequenceNumber prep_seq,
883 SequenceNumber rollback_seq,
884 size_t prep_batch_cnt)
885 : db_(db),
886 db_impl_(db_impl),
887 prep_seq_(prep_seq),
888 rollback_seq_(rollback_seq),
889 prep_batch_cnt_(prep_batch_cnt) {
890 assert(prep_seq != kMaxSequenceNumber)(static_cast<void> (0));
891 assert(rollback_seq != kMaxSequenceNumber)(static_cast<void> (0));
892 assert(prep_batch_cnt_ > 0)(static_cast<void> (0));
893 }
894
895 Status Callback(SequenceNumber commit_seq, bool is_mem_disabled,
896 uint64_t) override {
897 // Always commit from the 2nd queue
898 assert(is_mem_disabled)(static_cast<void> (0)); // implies the 2nd queue
899 assert(db_impl_->immutable_db_options().two_write_queues)(static_cast<void> (0));
900#ifdef NDEBUG
901 (void)is_mem_disabled;
902#endif
903 const uint64_t last_commit_seq = commit_seq;
904 db_->AddCommitted(rollback_seq_, last_commit_seq);
905 for (size_t i = 0; i < prep_batch_cnt_; i++) {
906 db_->AddCommitted(prep_seq_ + i, last_commit_seq);
907 }
908 db_impl_->SetLastPublishedSequence(last_commit_seq);
909 return Status::OK();
910 }
911
912 private:
913 WritePreparedTxnDB* db_;
914 DBImpl* db_impl_;
915 SequenceNumber prep_seq_;
916 SequenceNumber rollback_seq_;
917 size_t prep_batch_cnt_;
918};
919
920// Count the number of sub-batches inside a batch. A sub-batch does not have
921// duplicate keys.
922struct SubBatchCounter : public WriteBatch::Handler {
923 explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
924 : comparators_(comparators), batches_(1) {}
925 std::map<uint32_t, const Comparator*>& comparators_;
926 using CFKeys = std::set<Slice, SetComparator>;
927 std::map<uint32_t, CFKeys> keys_;
928 size_t batches_;
929 size_t BatchCount() { return batches_; }
930 void AddKey(const uint32_t cf, const Slice& key);
931 void InitWithComp(const uint32_t cf);
932 Status MarkNoop(bool) override { return Status::OK(); }
933 Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
934 Status MarkCommit(const Slice&) override { return Status::OK(); }
935 Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
936 AddKey(cf, key);
937 return Status::OK();
938 }
939 Status DeleteCF(uint32_t cf, const Slice& key) override {
940 AddKey(cf, key);
941 return Status::OK();
942 }
943 Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
944 AddKey(cf, key);
945 return Status::OK();
946 }
947 Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
948 AddKey(cf, key);
949 return Status::OK();
950 }
951 Status MarkBeginPrepare(bool) override { return Status::OK(); }
952 Status MarkRollback(const Slice&) override { return Status::OK(); }
953 bool WriteAfterCommit() const override { return false; }
954};
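A minimal usage sketch for SubBatchCounter. It assumes AddKey() (implemented in the corresponding .cc file) starts a new sub-batch whenever it sees a key already present in the current one, and it maps column-family id 0 to BytewiseComparator() for the default column family; the helper name CountSubBatches is hypothetical.

#include <cstddef>
#include <cstdint>
#include <map>

#include "rocksdb/comparator.h"
#include "rocksdb/write_batch.h"
#include "utilities/transactions/write_prepared_txn_db.h"

size_t CountSubBatches(rocksdb::WriteBatch* batch) {
  std::map<uint32_t, const rocksdb::Comparator*> comparators;
  comparators[0] = rocksdb::BytewiseComparator();  // default column family
  rocksdb::SubBatchCounter counter(comparators);
  // Replay the batch through the handler; a duplicate key forces a new
  // sub-batch, so BatchCount() is at least 1 for a non-empty batch.
  rocksdb::Status s = batch->Iterate(&counter);
  return s.ok() ? counter.BatchCount() : 0;
}

// For example, a batch that writes the same key twice needs two sub-batches:
//   rocksdb::WriteBatch b;
//   b.Put("k", "v1");
//   b.Put("k", "v2");
//   CountSubBatches(&b);  // expected to return 2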
955
956} // namespace rocksdb
957#endif // ROCKSDB_LITE