File: | home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc |
Warning: | line 43, column 5 Called C++ object pointer is null |
[?] Use j/k keys for keyboard navigation
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. | |||
2 | // This source code is licensed under both the GPLv2 (found in the | |||
3 | // COPYING file in the root directory) and Apache 2.0 License | |||
4 | // (found in the LICENSE.Apache file in the root directory). | |||
5 | // | |||
6 | #include "db/memtable_list.h" | |||
7 | ||||
8 | #ifndef __STDC_FORMAT_MACROS | |||
9 | #define __STDC_FORMAT_MACROS | |||
10 | #endif | |||
11 | ||||
12 | #include <inttypes.h> | |||
13 | #include <limits> | |||
14 | #include <queue> | |||
15 | #include <string> | |||
16 | #include "db/db_impl.h" | |||
17 | #include "db/memtable.h" | |||
18 | #include "db/range_tombstone_fragmenter.h" | |||
19 | #include "db/version_set.h" | |||
20 | #include "monitoring/thread_status_util.h" | |||
21 | #include "rocksdb/db.h" | |||
22 | #include "rocksdb/env.h" | |||
23 | #include "rocksdb/iterator.h" | |||
24 | #include "table/merging_iterator.h" | |||
25 | #include "util/coding.h" | |||
26 | #include "util/log_buffer.h" | |||
27 | #include "util/sync_point.h" | |||
28 | ||||
29 | namespace rocksdb { | |||
30 | ||||
// Forward declarations.
class VersionSet;
class Mutex;
class InternalKeyComparator;
34 | ||||
35 | void MemTableListVersion::AddMemTable(MemTable* m) { | |||
36 | memlist_.push_front(m); | |||
37 | *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage(); | |||
38 | } | |||
39 | ||||
40 | void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete, | |||
41 | MemTable* m) { | |||
42 | if (m->Unref()) { | |||
43 | to_delete->push_back(m); | |||
| ||||
44 | assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage())(static_cast<void> (0)); | |||
45 | *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage(); | |||
46 | } | |||
47 | } | |||
48 | ||||
49 | MemTableListVersion::MemTableListVersion( | |||
50 | size_t* parent_memtable_list_memory_usage, MemTableListVersion* old) | |||
51 | : max_write_buffer_number_to_maintain_( | |||
52 | old->max_write_buffer_number_to_maintain_), | |||
53 | parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) { | |||
54 | if (old != nullptr) { | |||
55 | memlist_ = old->memlist_; | |||
56 | for (auto& m : memlist_) { | |||
57 | m->Ref(); | |||
58 | } | |||
59 | ||||
60 | memlist_history_ = old->memlist_history_; | |||
61 | for (auto& m : memlist_history_) { | |||
62 | m->Ref(); | |||
63 | } | |||
64 | } | |||
65 | } | |||
66 | ||||
67 | MemTableListVersion::MemTableListVersion( | |||
68 | size_t* parent_memtable_list_memory_usage, | |||
69 | int max_write_buffer_number_to_maintain) | |||
70 | : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain), | |||
71 | parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {} | |||
72 | ||||
73 | void MemTableListVersion::Ref() { ++refs_; } | |||
74 | ||||
75 | // called by superversion::clean() | |||
76 | void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) { | |||
77 | assert(refs_ >= 1)(static_cast<void> (0)); | |||
78 | --refs_; | |||
79 | if (refs_ == 0) { | |||
80 | // if to_delete is equal to nullptr it means we're confident | |||
81 | // that refs_ will not be zero | |||
82 | assert(to_delete != nullptr)(static_cast<void> (0)); | |||
83 | for (const auto& m : memlist_) { | |||
84 | UnrefMemTable(to_delete, m); | |||
85 | } | |||
86 | for (const auto& m : memlist_history_) { | |||
87 | UnrefMemTable(to_delete, m); | |||
88 | } | |||
89 | delete this; | |||
90 | } | |||
91 | } | |||
92 | ||||
93 | int MemTableList::NumNotFlushed() const { | |||
94 | int size = static_cast<int>(current_->memlist_.size()); | |||
95 | assert(num_flush_not_started_ <= size)(static_cast<void> (0)); | |||
96 | return size; | |||
97 | } | |||
98 | ||||
99 | int MemTableList::NumFlushed() const { | |||
100 | return static_cast<int>(current_->memlist_history_.size()); | |||
101 | } | |||
102 | ||||
103 | // Search all the memtables starting from the most recent one. | |||
104 | // Return the most recent value found, if any. | |||
105 | // Operands stores the list of merge operations to apply, so far. | |||
106 | bool MemTableListVersion::Get(const LookupKey& key, std::string* value, | |||
107 | Status* s, MergeContext* merge_context, | |||
108 | SequenceNumber* max_covering_tombstone_seq, | |||
109 | SequenceNumber* seq, const ReadOptions& read_opts, | |||
110 | ReadCallback* callback, bool* is_blob_index) { | |||
111 | return GetFromList(&memlist_, key, value, s, merge_context, | |||
112 | max_covering_tombstone_seq, seq, read_opts, callback, | |||
113 | is_blob_index); | |||
114 | } | |||
115 | ||||
116 | bool MemTableListVersion::GetFromHistory( | |||
117 | const LookupKey& key, std::string* value, Status* s, | |||
118 | MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, | |||
119 | SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) { | |||
120 | return GetFromList(&memlist_history_, key, value, s, merge_context, | |||
121 | max_covering_tombstone_seq, seq, read_opts, | |||
122 | nullptr /*read_callback*/, is_blob_index); | |||
123 | } | |||
124 | ||||
125 | bool MemTableListVersion::GetFromList( | |||
126 | std::list<MemTable*>* list, const LookupKey& key, std::string* value, | |||
127 | Status* s, MergeContext* merge_context, | |||
128 | SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, | |||
129 | const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) { | |||
130 | *seq = kMaxSequenceNumber; | |||
131 | ||||
132 | for (auto& memtable : *list) { | |||
133 | SequenceNumber current_seq = kMaxSequenceNumber; | |||
134 | ||||
135 | bool done = | |||
136 | memtable->Get(key, value, s, merge_context, max_covering_tombstone_seq, | |||
137 | ¤t_seq, read_opts, callback, is_blob_index); | |||
138 | if (*seq == kMaxSequenceNumber) { | |||
139 | // Store the most recent sequence number of any operation on this key. | |||
140 | // Since we only care about the most recent change, we only need to | |||
141 | // return the first operation found when searching memtables in | |||
142 | // reverse-chronological order. | |||
143 | // current_seq would be equal to kMaxSequenceNumber if the value was to be | |||
144 | // skipped. This allows seq to be assigned again when the next value is | |||
145 | // read. | |||
146 | *seq = current_seq; | |||
147 | } | |||
148 | ||||
149 | if (done) { | |||
150 | assert(*seq != kMaxSequenceNumber || s->IsNotFound())(static_cast<void> (0)); | |||
151 | return true; | |||
152 | } | |||
153 | if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) { | |||
154 | return false; | |||
155 | } | |||
156 | } | |||
157 | return false; | |||
158 | } | |||
159 | ||||
160 | Status MemTableListVersion::AddRangeTombstoneIterators( | |||
161 | const ReadOptions& read_opts, Arena* /*arena*/, | |||
162 | RangeDelAggregator* range_del_agg) { | |||
163 | assert(range_del_agg != nullptr)(static_cast<void> (0)); | |||
164 | for (auto& m : memlist_) { | |||
165 | // Using kMaxSequenceNumber is OK because these are immutable memtables. | |||
166 | std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter( | |||
167 | m->NewRangeTombstoneIterator(read_opts, | |||
168 | kMaxSequenceNumber /* read_seq */)); | |||
169 | range_del_agg->AddTombstones(std::move(range_del_iter)); | |||
170 | } | |||
171 | return Status::OK(); | |||
172 | } | |||
173 | ||||
174 | void MemTableListVersion::AddIterators( | |||
175 | const ReadOptions& options, std::vector<InternalIterator*>* iterator_list, | |||
176 | Arena* arena) { | |||
177 | for (auto& m : memlist_) { | |||
178 | iterator_list->push_back(m->NewIterator(options, arena)); | |||
179 | } | |||
180 | } | |||
181 | ||||
182 | void MemTableListVersion::AddIterators( | |||
183 | const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) { | |||
184 | for (auto& m : memlist_) { | |||
185 | merge_iter_builder->AddIterator( | |||
186 | m->NewIterator(options, merge_iter_builder->GetArena())); | |||
187 | } | |||
188 | } | |||
189 | ||||
190 | uint64_t MemTableListVersion::GetTotalNumEntries() const { | |||
191 | uint64_t total_num = 0; | |||
192 | for (auto& m : memlist_) { | |||
193 | total_num += m->num_entries(); | |||
194 | } | |||
195 | return total_num; | |||
196 | } | |||
197 | ||||
198 | MemTable::MemTableStats MemTableListVersion::ApproximateStats( | |||
199 | const Slice& start_ikey, const Slice& end_ikey) { | |||
200 | MemTable::MemTableStats total_stats = {0, 0}; | |||
201 | for (auto& m : memlist_) { | |||
202 | auto mStats = m->ApproximateStats(start_ikey, end_ikey); | |||
203 | total_stats.size += mStats.size; | |||
204 | total_stats.count += mStats.count; | |||
205 | } | |||
206 | return total_stats; | |||
207 | } | |||
208 | ||||
209 | uint64_t MemTableListVersion::GetTotalNumDeletes() const { | |||
210 | uint64_t total_num = 0; | |||
211 | for (auto& m : memlist_) { | |||
212 | total_num += m->num_deletes(); | |||
213 | } | |||
214 | return total_num; | |||
215 | } | |||
216 | ||||
217 | SequenceNumber MemTableListVersion::GetEarliestSequenceNumber( | |||
218 | bool include_history) const { | |||
219 | if (include_history && !memlist_history_.empty()) { | |||
220 | return memlist_history_.back()->GetEarliestSequenceNumber(); | |||
221 | } else if (!memlist_.empty()) { | |||
222 | return memlist_.back()->GetEarliestSequenceNumber(); | |||
223 | } else { | |||
224 | return kMaxSequenceNumber; | |||
225 | } | |||
226 | } | |||
227 | ||||
228 | // caller is responsible for referencing m | |||
229 | void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) { | |||
230 | assert(refs_ == 1)(static_cast<void> (0)); // only when refs_ == 1 is MemTableListVersion mutable | |||
231 | AddMemTable(m); | |||
232 | ||||
233 | TrimHistory(to_delete); | |||
234 | } | |||
235 | ||||
236 | // Removes m from list of memtables not flushed. Caller should NOT Unref m. | |||
237 | void MemTableListVersion::Remove(MemTable* m, | |||
238 | autovector<MemTable*>* to_delete) { | |||
239 | assert(refs_ == 1)(static_cast<void> (0)); // only when refs_ == 1 is MemTableListVersion mutable | |||
240 | memlist_.remove(m); | |||
241 | ||||
242 | m->MarkFlushed(); | |||
243 | if (max_write_buffer_number_to_maintain_ > 0) { | |||
244 | memlist_history_.push_front(m); | |||
245 | TrimHistory(to_delete); | |||
246 | } else { | |||
247 | UnrefMemTable(to_delete, m); | |||
248 | } | |||
249 | } | |||
250 | ||||
251 | // Make sure we don't use up too much space in history | |||
252 | void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) { | |||
253 | while (memlist_.size() + memlist_history_.size() > | |||
254 | static_cast<size_t>(max_write_buffer_number_to_maintain_) && | |||
255 | !memlist_history_.empty()) { | |||
256 | MemTable* x = memlist_history_.back(); | |||
257 | memlist_history_.pop_back(); | |||
258 | ||||
259 | UnrefMemTable(to_delete, x); | |||
260 | } | |||
261 | } | |||
262 | ||||
263 | // Returns true if there is at least one memtable on which flush has | |||
264 | // not yet started. | |||
265 | bool MemTableList::IsFlushPending() const { | |||
266 | if ((flush_requested_ && num_flush_not_started_ > 0) || | |||
267 | (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { | |||
268 | assert(imm_flush_needed.load(std::memory_order_relaxed))(static_cast<void> (0)); | |||
269 | return true; | |||
270 | } | |||
271 | return false; | |||
272 | } | |||
273 | ||||
274 | // Returns the memtables that need to be flushed. | |||
275 | void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id, | |||
276 | autovector<MemTable*>* ret) { | |||
277 | AutoThreadOperationStageUpdater stage_updater( | |||
278 | ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); | |||
279 | const auto& memlist = current_->memlist_; | |||
280 | for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { | |||
281 | MemTable* m = *it; | |||
282 | if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) { | |||
283 | break; | |||
284 | } | |||
285 | if (!m->flush_in_progress_) { | |||
286 | assert(!m->flush_completed_)(static_cast<void> (0)); | |||
287 | num_flush_not_started_--; | |||
288 | if (num_flush_not_started_ == 0) { | |||
289 | imm_flush_needed.store(false, std::memory_order_release); | |||
290 | } | |||
291 | m->flush_in_progress_ = true; // flushing will start very soon | |||
292 | ret->push_back(m); | |||
293 | } | |||
294 | } | |||
295 | flush_requested_ = false; // start-flush request is complete | |||
296 | } | |||
297 | ||||
298 | void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems, | |||
299 | uint64_t /*file_number*/) { | |||
300 | AutoThreadOperationStageUpdater stage_updater( | |||
301 | ThreadStatus::STAGE_MEMTABLE_ROLLBACK); | |||
302 | assert(!mems.empty())(static_cast<void> (0)); | |||
303 | ||||
304 | // If the flush was not successful, then just reset state. | |||
305 | // Maybe a succeeding attempt to flush will be successful. | |||
306 | for (MemTable* m : mems) { | |||
307 | assert(m->flush_in_progress_)(static_cast<void> (0)); | |||
308 | assert(m->file_number_ == 0)(static_cast<void> (0)); | |||
309 | ||||
310 | m->flush_in_progress_ = false; | |||
311 | m->flush_completed_ = false; | |||
312 | m->edit_.Clear(); | |||
313 | num_flush_not_started_++; | |||
314 | } | |||
315 | imm_flush_needed.store(true, std::memory_order_release); | |||
316 | } | |||
317 | ||||
318 | // Try record a successful flush in the manifest file. It might just return | |||
319 | // Status::OK letting a concurrent flush to do actual the recording.. | |||
320 | Status MemTableList::TryInstallMemtableFlushResults( | |||
321 | ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, | |||
322 | const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker, | |||
323 | VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, | |||
324 | autovector<MemTable*>* to_delete, Directory* db_directory, | |||
325 | LogBuffer* log_buffer) { | |||
326 | AutoThreadOperationStageUpdater stage_updater( | |||
327 | ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); | |||
328 | mu->AssertHeld(); | |||
329 | ||||
330 | // Flush was successful | |||
331 | // Record the status on the memtable object. Either this call or a call by a | |||
332 | // concurrent flush thread will read the status and write it to manifest. | |||
333 | for (size_t i = 0; i < mems.size(); ++i) { | |||
334 | // All the edits are associated with the first memtable of this batch. | |||
335 | assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0)(static_cast<void> (0)); | |||
336 | ||||
337 | mems[i]->flush_completed_ = true; | |||
338 | mems[i]->file_number_ = file_number; | |||
339 | } | |||
340 | ||||
341 | // if some other thread is already committing, then return | |||
342 | Status s; | |||
343 | if (commit_in_progress_) { | |||
344 | TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress"); | |||
345 | return s; | |||
346 | } | |||
347 | ||||
348 | // Only a single thread can be executing this piece of code | |||
349 | commit_in_progress_ = true; | |||
350 | ||||
351 | // Retry until all completed flushes are committed. New flushes can finish | |||
352 | // while the current thread is writing manifest where mutex is released. | |||
353 | while (s.ok()) { | |||
354 | auto& memlist = current_->memlist_; | |||
355 | // The back is the oldest; if flush_completed_ is not set to it, it means | |||
356 | // that we were assigned a more recent memtable. The memtables' flushes must | |||
357 | // be recorded in manifest in order. A concurrent flush thread, who is | |||
358 | // assigned to flush the oldest memtable, will later wake up and does all | |||
359 | // the pending writes to manifest, in order. | |||
360 | if (memlist.empty() || !memlist.back()->flush_completed_) { | |||
361 | break; | |||
362 | } | |||
363 | // scan all memtables from the earliest, and commit those | |||
364 | // (in that order) that have finished flushing. Memtables | |||
365 | // are always committed in the order that they were created. | |||
366 | uint64_t batch_file_number = 0; | |||
367 | size_t batch_count = 0; | |||
368 | autovector<VersionEdit*> edit_list; | |||
369 | autovector<MemTable*> memtables_to_flush; | |||
370 | // enumerate from the last (earliest) element to see how many batch finished | |||
371 | for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { | |||
372 | MemTable* m = *it; | |||
373 | if (!m->flush_completed_) { | |||
374 | break; | |||
375 | } | |||
376 | if (it == memlist.rbegin() || batch_file_number != m->file_number_) { | |||
377 | batch_file_number = m->file_number_; | |||
378 | ROCKS_LOG_BUFFER(log_buffer,rocksdb::LogToBuffer(log_buffer, ("[%s:" "380" "] " "[%s] Level-0 commit table #%" "l" "u" " started"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfd->GetName().c_str(), m->file_number_) | |||
379 | "[%s] Level-0 commit table #%" PRIu64 " started",rocksdb::LogToBuffer(log_buffer, ("[%s:" "380" "] " "[%s] Level-0 commit table #%" "l" "u" " started"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfd->GetName().c_str(), m->file_number_) | |||
380 | cfd->GetName().c_str(), m->file_number_)rocksdb::LogToBuffer(log_buffer, ("[%s:" "380" "] " "[%s] Level-0 commit table #%" "l" "u" " started"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfd->GetName().c_str(), m->file_number_); | |||
381 | edit_list.push_back(&m->edit_); | |||
382 | memtables_to_flush.push_back(m); | |||
383 | } | |||
384 | batch_count++; | |||
385 | } | |||
386 | ||||
387 | // TODO(myabandeh): Not sure how batch_count could be 0 here. | |||
388 | if (batch_count > 0) { | |||
389 | if (vset->db_options()->allow_2pc) { | |||
390 | assert(edit_list.size() > 0)(static_cast<void> (0)); | |||
391 | // We piggyback the information of earliest log file to keep in the | |||
392 | // manifest entry for the last file flushed. | |||
393 | edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep( | |||
394 | vset, *cfd, edit_list, memtables_to_flush, prep_tracker)); | |||
395 | } | |||
396 | ||||
397 | // this can release and reacquire the mutex. | |||
398 | s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, | |||
399 | db_directory); | |||
400 | ||||
401 | // we will be changing the version in the next code path, | |||
402 | // so we better create a new one, since versions are immutable | |||
403 | InstallNewVersion(); | |||
404 | ||||
405 | // All the later memtables that have the same filenum | |||
406 | // are part of the same batch. They can be committed now. | |||
407 | uint64_t mem_id = 1; // how many memtables have been flushed. | |||
408 | ||||
409 | // commit new state only if the column family is NOT dropped. | |||
410 | // The reason is as follows (refer to | |||
411 | // ColumnFamilyTest.FlushAndDropRaceCondition). | |||
412 | // If the column family is dropped, then according to LogAndApply, its | |||
413 | // corresponding flush operation is NOT written to the MANIFEST. This | |||
414 | // means the DB is not aware of the L0 files generated from the flush. | |||
415 | // By committing the new state, we remove the memtable from the memtable | |||
416 | // list. Creating an iterator on this column family will not be able to | |||
417 | // read full data since the memtable is removed, and the DB is not aware | |||
418 | // of the L0 files, causing MergingIterator unable to build child | |||
419 | // iterators. RocksDB contract requires that the iterator can be created | |||
420 | // on a dropped column family, and we must be able to | |||
421 | // read full data as long as column family handle is not deleted, even if | |||
422 | // the column family is dropped. | |||
423 | if (s.ok() && !cfd->IsDropped()) { // commit new state | |||
424 | while (batch_count-- > 0) { | |||
425 | MemTable* m = current_->memlist_.back(); | |||
426 | ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64rocksdb::LogToBuffer(log_buffer, ("[%s:" "428" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " done"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfd->GetName().c_str(), m->file_number_, mem_id) | |||
427 | ": memtable #%" PRIu64 " done",rocksdb::LogToBuffer(log_buffer, ("[%s:" "428" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " done"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfd->GetName().c_str(), m->file_number_, mem_id) | |||
428 | cfd->GetName().c_str(), m->file_number_, mem_id)rocksdb::LogToBuffer(log_buffer, ("[%s:" "428" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " done"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfd->GetName().c_str(), m->file_number_, mem_id); | |||
429 | assert(m->file_number_ > 0)(static_cast<void> (0)); | |||
430 | current_->Remove(m, to_delete); | |||
431 | ++mem_id; | |||
432 | } | |||
433 | } else { | |||
434 | for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) { | |||
435 | MemTable* m = *it; | |||
436 | // commit failed. setup state so that we can flush again. | |||
437 | ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64rocksdb::LogToBuffer(log_buffer, ("[%s:" "439" "] " "Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " failed"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), m->file_number_, mem_id) | |||
438 | ": memtable #%" PRIu64 " failed",rocksdb::LogToBuffer(log_buffer, ("[%s:" "439" "] " "Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " failed"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), m->file_number_, mem_id) | |||
439 | m->file_number_, mem_id)rocksdb::LogToBuffer(log_buffer, ("[%s:" "439" "] " "Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " failed"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), m->file_number_, mem_id); | |||
440 | m->flush_completed_ = false; | |||
441 | m->flush_in_progress_ = false; | |||
442 | m->edit_.Clear(); | |||
443 | num_flush_not_started_++; | |||
444 | m->file_number_ = 0; | |||
445 | imm_flush_needed.store(true, std::memory_order_release); | |||
446 | ++mem_id; | |||
447 | } | |||
448 | } | |||
449 | } | |||
450 | } | |||
451 | commit_in_progress_ = false; | |||
452 | return s; | |||
453 | } | |||
454 | ||||
455 | // New memtables are inserted at the front of the list. | |||
456 | void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) { | |||
457 | assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_)(static_cast<void> (0)); | |||
458 | InstallNewVersion(); | |||
| ||||
459 | // this method is used to move mutable memtable into an immutable list. | |||
460 | // since mutable memtable is already refcounted by the DBImpl, | |||
461 | // and when moving to the imutable list we don't unref it, | |||
462 | // we don't have to ref the memtable here. we just take over the | |||
463 | // reference from the DBImpl. | |||
464 | current_->Add(m, to_delete); | |||
465 | m->MarkImmutable(); | |||
466 | num_flush_not_started_++; | |||
467 | if (num_flush_not_started_ == 1) { | |||
468 | imm_flush_needed.store(true, std::memory_order_release); | |||
469 | } | |||
470 | } | |||
471 | ||||
472 | // Returns an estimate of the number of bytes of data in use. | |||
473 | size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() { | |||
474 | size_t total_size = 0; | |||
475 | for (auto& memtable : current_->memlist_) { | |||
476 | total_size += memtable->ApproximateMemoryUsage(); | |||
477 | } | |||
478 | return total_size; | |||
479 | } | |||
480 | ||||
481 | size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; } | |||
482 | ||||
483 | uint64_t MemTableList::ApproximateOldestKeyTime() const { | |||
484 | if (!current_->memlist_.empty()) { | |||
485 | return current_->memlist_.back()->ApproximateOldestKeyTime(); | |||
486 | } | |||
487 | return std::numeric_limits<uint64_t>::max(); | |||
488 | } | |||
489 | ||||
490 | void MemTableList::InstallNewVersion() { | |||
491 | if (current_->refs_ == 1) { | |||
492 | // we're the only one using the version, just keep using it | |||
493 | } else { | |||
494 | // somebody else holds the current version, we need to create new one | |||
495 | MemTableListVersion* version = current_; | |||
496 | current_ = new MemTableListVersion(¤t_memory_usage_, current_); | |||
497 | current_->Ref(); | |||
498 | version->Unref(); | |||
499 | } | |||
500 | } | |||
501 | ||||
502 | uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( | |||
503 | const autovector<MemTable*>& memtables_to_flush) { | |||
504 | uint64_t min_log = 0; | |||
505 | ||||
506 | for (auto& m : current_->memlist_) { | |||
507 | // Assume the list is very short, we can live with O(m*n). We can optimize | |||
508 | // if the performance has some problem. | |||
509 | bool should_skip = false; | |||
510 | for (MemTable* m_to_flush : memtables_to_flush) { | |||
511 | if (m == m_to_flush) { | |||
512 | should_skip = true; | |||
513 | break; | |||
514 | } | |||
515 | } | |||
516 | if (should_skip) { | |||
517 | continue; | |||
518 | } | |||
519 | ||||
520 | auto log = m->GetMinLogContainingPrepSection(); | |||
521 | ||||
522 | if (log > 0 && (min_log == 0 || log < min_log)) { | |||
523 | min_log = log; | |||
524 | } | |||
525 | } | |||
526 | ||||
527 | return min_log; | |||
528 | } | |||
529 | ||||
530 | // Commit a successful atomic flush in the manifest file. | |||
531 | Status InstallMemtableAtomicFlushResults( | |||
532 | const autovector<MemTableList*>* imm_lists, | |||
533 | const autovector<ColumnFamilyData*>& cfds, | |||
534 | const autovector<const MutableCFOptions*>& mutable_cf_options_list, | |||
535 | const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset, | |||
536 | InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas, | |||
537 | autovector<MemTable*>* to_delete, Directory* db_directory, | |||
538 | LogBuffer* log_buffer) { | |||
539 | AutoThreadOperationStageUpdater stage_updater( | |||
540 | ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); | |||
541 | mu->AssertHeld(); | |||
542 | ||||
543 | size_t num = mems_list.size(); | |||
544 | assert(cfds.size() == num)(static_cast<void> (0)); | |||
545 | if (imm_lists != nullptr) { | |||
546 | assert(imm_lists->size() == num)(static_cast<void> (0)); | |||
547 | } | |||
548 | for (size_t k = 0; k != num; ++k) { | |||
549 | #ifndef NDEBUG1 | |||
550 | const auto* imm = | |||
551 | (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k); | |||
552 | if (!mems_list[k]->empty()) { | |||
553 | assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID())(static_cast<void> (0)); | |||
554 | } | |||
555 | #endif | |||
556 | assert(nullptr != file_metas[k])(static_cast<void> (0)); | |||
557 | for (size_t i = 0; i != mems_list[k]->size(); ++i) { | |||
558 | assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0)(static_cast<void> (0)); | |||
559 | (*mems_list[k])[i]->SetFlushCompleted(true); | |||
560 | (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber()); | |||
561 | } | |||
562 | } | |||
563 | ||||
564 | Status s; | |||
565 | ||||
566 | autovector<autovector<VersionEdit*>> edit_lists; | |||
567 | uint32_t num_entries = 0; | |||
568 | for (const auto mems : mems_list) { | |||
569 | assert(mems != nullptr)(static_cast<void> (0)); | |||
570 | autovector<VersionEdit*> edits; | |||
571 | assert(!mems->empty())(static_cast<void> (0)); | |||
572 | edits.emplace_back((*mems)[0]->GetEdits()); | |||
573 | ++num_entries; | |||
574 | edit_lists.emplace_back(edits); | |||
575 | } | |||
576 | // Mark the version edits as an atomic group if the number of version edits | |||
577 | // exceeds 1. | |||
578 | if (cfds.size() > 1) { | |||
579 | for (auto& edits : edit_lists) { | |||
580 | assert(edits.size() == 1)(static_cast<void> (0)); | |||
581 | edits[0]->MarkAtomicGroup(--num_entries); | |||
582 | } | |||
583 | assert(0 == num_entries)(static_cast<void> (0)); | |||
584 | } | |||
585 | ||||
586 | // this can release and reacquire the mutex. | |||
587 | s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu, | |||
588 | db_directory); | |||
589 | ||||
590 | for (size_t k = 0; k != cfds.size(); ++k) { | |||
591 | auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k); | |||
592 | imm->InstallNewVersion(); | |||
593 | } | |||
594 | ||||
595 | if (s.ok() || s.IsShutdownInProgress()) { | |||
596 | for (size_t i = 0; i != cfds.size(); ++i) { | |||
597 | if (cfds[i]->IsDropped()) { | |||
598 | continue; | |||
599 | } | |||
600 | auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i); | |||
601 | for (auto m : *mems_list[i]) { | |||
602 | assert(m->GetFileNumber() > 0)(static_cast<void> (0)); | |||
603 | uint64_t mem_id = m->GetID(); | |||
604 | ROCKS_LOG_BUFFER(log_buffer,rocksdb::LogToBuffer(log_buffer, ("[%s:" "608" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " done"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ) | |||
605 | "[%s] Level-0 commit table #%" PRIu64rocksdb::LogToBuffer(log_buffer, ("[%s:" "608" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " done"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ) | |||
606 | ": memtable #%" PRIu64 " done",rocksdb::LogToBuffer(log_buffer, ("[%s:" "608" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " done"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ) | |||
607 | cfds[i]->GetName().c_str(), m->GetFileNumber(),rocksdb::LogToBuffer(log_buffer, ("[%s:" "608" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " done"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ) | |||
608 | mem_id)rocksdb::LogToBuffer(log_buffer, ("[%s:" "608" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " done"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ); | |||
609 | imm->current_->Remove(m, to_delete); | |||
610 | } | |||
611 | } | |||
612 | } else { | |||
613 | for (size_t i = 0; i != cfds.size(); ++i) { | |||
614 | auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i); | |||
615 | for (auto m : *mems_list[i]) { | |||
616 | uint64_t mem_id = m->GetID(); | |||
617 | ROCKS_LOG_BUFFER(log_buffer,rocksdb::LogToBuffer(log_buffer, ("[%s:" "621" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " failed"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ) | |||
618 | "[%s] Level-0 commit table #%" PRIu64rocksdb::LogToBuffer(log_buffer, ("[%s:" "621" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " failed"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ) | |||
619 | ": memtable #%" PRIu64 " failed",rocksdb::LogToBuffer(log_buffer, ("[%s:" "621" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " failed"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ) | |||
620 | cfds[i]->GetName().c_str(), m->GetFileNumber(),rocksdb::LogToBuffer(log_buffer, ("[%s:" "621" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " failed"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ) | |||
621 | mem_id)rocksdb::LogToBuffer(log_buffer, ("[%s:" "621" "] " "[%s] Level-0 commit table #%" "l" "u" ": memtable #%" "l" "u" " failed"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/db/memtable_list.cc" ), cfds[i]->GetName().c_str(), m->GetFileNumber(), mem_id ); | |||
622 | m->SetFlushCompleted(false); | |||
623 | m->SetFlushInProgress(false); | |||
624 | m->GetEdits()->Clear(); | |||
625 | m->SetFileNumber(0); | |||
626 | imm->num_flush_not_started_++; | |||
627 | } | |||
628 | imm->imm_flush_needed.store(true, std::memory_order_release); | |||
629 | } | |||
630 | } | |||
631 | ||||
632 | return s; | |||
633 | } | |||
634 | ||||
635 | } // namespace rocksdb |