File: | home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc |
Warning: | line 2072, column 5 Potential leak of memory pointed to by 'block.value' |
[?] Use j/k keys for keyboard navigation
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. | |||||
2 | // This source code is licensed under both the GPLv2 (found in the | |||||
3 | // COPYING file in the root directory) and Apache 2.0 License | |||||
4 | // (found in the LICENSE.Apache file in the root directory). | |||||
5 | // | |||||
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |||||
7 | // Use of this source code is governed by a BSD-style license that can be | |||||
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |||||
9 | #include "table/block_based_table_reader.h" | |||||
10 | ||||||
11 | #include <algorithm> | |||||
12 | #include <array> | |||||
13 | #include <limits> | |||||
14 | #include <string> | |||||
15 | #include <utility> | |||||
16 | #include <vector> | |||||
17 | ||||||
18 | #include "db/dbformat.h" | |||||
19 | #include "db/pinned_iterators_manager.h" | |||||
20 | ||||||
21 | #include "rocksdb/cache.h" | |||||
22 | #include "rocksdb/comparator.h" | |||||
23 | #include "rocksdb/env.h" | |||||
24 | #include "rocksdb/filter_policy.h" | |||||
25 | #include "rocksdb/iterator.h" | |||||
26 | #include "rocksdb/options.h" | |||||
27 | #include "rocksdb/statistics.h" | |||||
28 | #include "rocksdb/table.h" | |||||
29 | #include "rocksdb/table_properties.h" | |||||
30 | ||||||
31 | #include "table/block.h" | |||||
32 | #include "table/block_based_filter_block.h" | |||||
33 | #include "table/block_based_table_factory.h" | |||||
34 | #include "table/block_fetcher.h" | |||||
35 | #include "table/block_prefix_index.h" | |||||
36 | #include "table/filter_block.h" | |||||
37 | #include "table/format.h" | |||||
38 | #include "table/full_filter_block.h" | |||||
39 | #include "table/get_context.h" | |||||
40 | #include "table/internal_iterator.h" | |||||
41 | #include "table/meta_blocks.h" | |||||
42 | #include "table/partitioned_filter_block.h" | |||||
43 | #include "table/persistent_cache_helper.h" | |||||
44 | #include "table/sst_file_writer_collectors.h" | |||||
45 | #include "table/two_level_iterator.h" | |||||
46 | ||||||
47 | #include "monitoring/perf_context_imp.h" | |||||
48 | #include "util/coding.h" | |||||
49 | #include "util/crc32c.h" | |||||
50 | #include "util/file_reader_writer.h" | |||||
51 | #include "util/stop_watch.h" | |||||
52 | #include "util/string_util.h" | |||||
53 | #include "util/sync_point.h" | |||||
54 | #include "util/xxhash.h" | |||||
55 | ||||||
56 | namespace rocksdb { | |||||
57 | ||||||
// Magic number and hash-index meta-block names; defined elsewhere in the
// library (not in this chunk).
extern const uint64_t kBlockBasedTableMagicNumber;
extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;

// Local shorthand for the nested reader interface used throughout this file.
typedef BlockBasedTable::IndexReader IndexReader;
63 | ||||||
// Run Close() first (its cleanup responsibilities are defined elsewhere in
// this file) before freeing the heap-allocated representation object.
BlockBasedTable::~BlockBasedTable() {
  Close();
  delete rep_;
}
68 | ||||||
// Process-wide monotonic id counter; consumers are outside this chunk
// (name suggests it feeds unique cache-key generation).
std::atomic<uint64_t> BlockBasedTable::next_cache_key_id_(0);
70 | ||||||
71 | namespace { | |||||
72 | // Read the block identified by "handle" from "file". | |||||
73 | // The only relevant option is options.verify_checksums for now. | |||||
74 | // On failure return non-OK. | |||||
75 | // On success fill *result and return OK - caller owns *result | |||||
76 | // @param uncompression_dict Data for presetting the compression library's | |||||
77 | // dictionary. | |||||
78 | Status ReadBlockFromFile( | |||||
79 | RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, | |||||
80 | const Footer& footer, const ReadOptions& options, const BlockHandle& handle, | |||||
81 | std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions, | |||||
82 | bool do_uncompress, bool maybe_compressed, | |||||
83 | const UncompressionDict& uncompression_dict, | |||||
84 | const PersistentCacheOptions& cache_options, SequenceNumber global_seqno, | |||||
85 | size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator) { | |||||
86 | BlockContents contents; | |||||
87 | BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle, | |||||
88 | &contents, ioptions, do_uncompress, | |||||
89 | maybe_compressed, uncompression_dict, | |||||
90 | cache_options, memory_allocator); | |||||
91 | Status s = block_fetcher.ReadBlockContents(); | |||||
92 | if (s.ok()) { | |||||
93 | result->reset(new Block(std::move(contents), global_seqno, | |||||
94 | read_amp_bytes_per_bit, ioptions.statistics)); | |||||
95 | } | |||||
96 | ||||||
97 | return s; | |||||
98 | } | |||||
99 | ||||||
100 | inline MemoryAllocator* GetMemoryAllocator( | |||||
101 | const BlockBasedTableOptions& table_options) { | |||||
102 | return table_options.block_cache.get() | |||||
103 | ? table_options.block_cache->memory_allocator() | |||||
104 | : nullptr; | |||||
105 | } | |||||
106 | ||||||
107 | inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock( | |||||
108 | const BlockBasedTableOptions& table_options) { | |||||
109 | return table_options.block_cache_compressed.get() | |||||
110 | ? table_options.block_cache_compressed->memory_allocator() | |||||
111 | : nullptr; | |||||
112 | } | |||||
113 | ||||||
// Cleanup callback for iterators: destroys the resource registered as
// `arg`. The second callback slot is unused.
template <class ResourceType>
void DeleteHeldResource(void* arg, void* /*ignored*/) {
  // static_cast is the idiomatic (and sufficient) named cast back from the
  // void* payload; reinterpret_cast is unnecessary here.
  delete static_cast<ResourceType*>(arg);
}
119 | ||||||
120 | // Delete the entry resided in the cache. | |||||
121 | template <class Entry> | |||||
122 | void DeleteCachedEntry(const Slice& /*key*/, void* value) { | |||||
123 | auto entry = reinterpret_cast<Entry*>(value); | |||||
124 | delete entry; | |||||
125 | } | |||||
126 | ||||||
// Type-specific cache deleters; definitions are not in this chunk.
void DeleteCachedFilterEntry(const Slice& key, void* value);
void DeleteCachedIndexEntry(const Slice& key, void* value);
void DeleteCachedUncompressionDictEntry(const Slice& key, void* value);
130 | ||||||
131 | // Release the cached entry and decrement its ref count. | |||||
132 | void ReleaseCachedEntry(void* arg, void* h) { | |||||
133 | Cache* cache = reinterpret_cast<Cache*>(arg); | |||||
134 | Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h); | |||||
135 | cache->Release(handle); | |||||
136 | } | |||||
137 | ||||||
138 | // Release the cached entry and decrement its ref count. | |||||
139 | void ForceReleaseCachedEntry(void* arg, void* h) { | |||||
140 | Cache* cache = reinterpret_cast<Cache*>(arg); | |||||
141 | Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h); | |||||
142 | cache->Release(handle, true /* force_erase */); | |||||
143 | } | |||||
144 | ||||||
145 | Slice GetCacheKeyFromOffset(const char* cache_key_prefix, | |||||
146 | size_t cache_key_prefix_size, uint64_t offset, | |||||
147 | char* cache_key) { | |||||
148 | assert(cache_key != nullptr)(static_cast<void> (0)); | |||||
149 | assert(cache_key_prefix_size != 0)(static_cast<void> (0)); | |||||
150 | assert(cache_key_prefix_size <= BlockBasedTable::kMaxCacheKeyPrefixSize)(static_cast<void> (0)); | |||||
151 | memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); | |||||
152 | char* end = EncodeVarint64(cache_key + cache_key_prefix_size, offset); | |||||
153 | return Slice(cache_key, static_cast<size_t>(end - cache_key)); | |||||
154 | } | |||||
155 | ||||||
156 | Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, | |||||
157 | int level, Tickers block_cache_miss_ticker, | |||||
158 | Tickers block_cache_hit_ticker, | |||||
159 | uint64_t* block_cache_miss_stats, | |||||
160 | uint64_t* block_cache_hit_stats, | |||||
161 | Statistics* statistics, | |||||
162 | GetContext* get_context) { | |||||
163 | auto cache_handle = block_cache->Lookup(key, statistics); | |||||
164 | if (cache_handle != nullptr) { | |||||
165 | PERF_COUNTER_ADD(block_cache_hit_count, 1)if (perf_level >= PerfLevel::kEnableCount) { perf_context. block_cache_hit_count += 1; }; | |||||
166 | PERF_COUNTER_BY_LEVEL_ADD(block_cache_hit_count, 1,if (perf_level >= PerfLevel::kEnableCount && perf_context .per_level_perf_context_enabled && perf_context.level_to_perf_context ) { if ((*(perf_context.level_to_perf_context)).find(static_cast <uint32_t>(level)) != (*(perf_context.level_to_perf_context )).end()) { (*(perf_context.level_to_perf_context))[static_cast <uint32_t>(level)].block_cache_hit_count += 1; } else { PerfContextByLevel empty_context; (*(perf_context.level_to_perf_context ))[static_cast<uint32_t>(level)] = empty_context; (*(perf_context .level_to_perf_context))[static_cast<uint32_t>(level)]. block_cache_hit_count += 1; } } | |||||
167 | static_cast<uint32_t>(level))if (perf_level >= PerfLevel::kEnableCount && perf_context .per_level_perf_context_enabled && perf_context.level_to_perf_context ) { if ((*(perf_context.level_to_perf_context)).find(static_cast <uint32_t>(level)) != (*(perf_context.level_to_perf_context )).end()) { (*(perf_context.level_to_perf_context))[static_cast <uint32_t>(level)].block_cache_hit_count += 1; } else { PerfContextByLevel empty_context; (*(perf_context.level_to_perf_context ))[static_cast<uint32_t>(level)] = empty_context; (*(perf_context .level_to_perf_context))[static_cast<uint32_t>(level)]. block_cache_hit_count += 1; } }; | |||||
168 | if (get_context != nullptr) { | |||||
169 | // overall cache hit | |||||
170 | get_context->get_context_stats_.num_cache_hit++; | |||||
171 | // total bytes read from cache | |||||
172 | get_context->get_context_stats_.num_cache_bytes_read += | |||||
173 | block_cache->GetUsage(cache_handle); | |||||
174 | // block-type specific cache hit | |||||
175 | (*block_cache_hit_stats)++; | |||||
176 | } else { | |||||
177 | // overall cache hit | |||||
178 | RecordTick(statistics, BLOCK_CACHE_HIT); | |||||
179 | // total bytes read from cache | |||||
180 | RecordTick(statistics, BLOCK_CACHE_BYTES_READ, | |||||
181 | block_cache->GetUsage(cache_handle)); | |||||
182 | RecordTick(statistics, block_cache_hit_ticker); | |||||
183 | } | |||||
184 | } else { | |||||
185 | PERF_COUNTER_BY_LEVEL_ADD(block_cache_miss_count, 1,if (perf_level >= PerfLevel::kEnableCount && perf_context .per_level_perf_context_enabled && perf_context.level_to_perf_context ) { if ((*(perf_context.level_to_perf_context)).find(static_cast <uint32_t>(level)) != (*(perf_context.level_to_perf_context )).end()) { (*(perf_context.level_to_perf_context))[static_cast <uint32_t>(level)].block_cache_miss_count += 1; } else { PerfContextByLevel empty_context; (*(perf_context.level_to_perf_context ))[static_cast<uint32_t>(level)] = empty_context; (*(perf_context .level_to_perf_context))[static_cast<uint32_t>(level)]. block_cache_miss_count += 1; } } | |||||
186 | static_cast<uint32_t>(level))if (perf_level >= PerfLevel::kEnableCount && perf_context .per_level_perf_context_enabled && perf_context.level_to_perf_context ) { if ((*(perf_context.level_to_perf_context)).find(static_cast <uint32_t>(level)) != (*(perf_context.level_to_perf_context )).end()) { (*(perf_context.level_to_perf_context))[static_cast <uint32_t>(level)].block_cache_miss_count += 1; } else { PerfContextByLevel empty_context; (*(perf_context.level_to_perf_context ))[static_cast<uint32_t>(level)] = empty_context; (*(perf_context .level_to_perf_context))[static_cast<uint32_t>(level)]. block_cache_miss_count += 1; } }; | |||||
187 | if (get_context != nullptr) { | |||||
188 | // overall cache miss | |||||
189 | get_context->get_context_stats_.num_cache_miss++; | |||||
190 | // block-type specific cache miss | |||||
191 | (*block_cache_miss_stats)++; | |||||
192 | } else { | |||||
193 | RecordTick(statistics, BLOCK_CACHE_MISS); | |||||
194 | RecordTick(statistics, block_cache_miss_ticker); | |||||
195 | } | |||||
196 | } | |||||
197 | ||||||
198 | return cache_handle; | |||||
199 | } | |||||
200 | ||||||
201 | // For hash based index, return true if prefix_extractor and | |||||
202 | // prefix_extractor_block mismatch, false otherwise. This flag will be used | |||||
203 | // as total_order_seek via NewIndexIterator | |||||
204 | bool PrefixExtractorChanged(const TableProperties* table_properties, | |||||
205 | const SliceTransform* prefix_extractor) { | |||||
206 | // BlockBasedTableOptions::kHashSearch requires prefix_extractor to be set. | |||||
207 | // Turn off hash index in prefix_extractor is not set; if prefix_extractor | |||||
208 | // is set but prefix_extractor_block is not set, also disable hash index | |||||
209 | if (prefix_extractor == nullptr || table_properties == nullptr || | |||||
210 | table_properties->prefix_extractor_name.empty()) { | |||||
211 | return true; | |||||
212 | } | |||||
213 | ||||||
214 | // prefix_extractor and prefix_extractor_block are both non-empty | |||||
215 | if (table_properties->prefix_extractor_name.compare( | |||||
216 | prefix_extractor->Name()) != 0) { | |||||
217 | return true; | |||||
218 | } else { | |||||
219 | return false; | |||||
220 | } | |||||
221 | } | |||||
222 | ||||||
223 | } // namespace | |||||
224 | ||||||
// Index that allows binary search lookup in a two-level index structure:
// a top-level index block whose entries point at index partitions.
class PartitionIndexReader : public IndexReader, public Cleanable {
 public:
  // Read the partition index from the file and create an instance for
  // `PartitionIndexReader`.
  // On success, index_reader will be populated; otherwise it will remain
  // unmodified.
  static Status Create(BlockBasedTable* table, RandomAccessFileReader* file,
                       FilePrefetchBuffer* prefetch_buffer,
                       const Footer& footer, const BlockHandle& index_handle,
                       const ImmutableCFOptions& ioptions,
                       const InternalKeyComparator* icomparator,
                       IndexReader** index_reader,
                       const PersistentCacheOptions& cache_options,
                       const int level, const bool index_key_includes_seq,
                       const bool index_value_is_full,
                       MemoryAllocator* memory_allocator) {
    std::unique_ptr<Block> index_block;
    auto s = ReadBlockFromFile(
        file, prefetch_buffer, footer, ReadOptions(), index_handle,
        &index_block, ioptions, true /* decompress */,
        true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
        cache_options, kDisableGlobalSequenceNumber,
        0 /* read_amp_bytes_per_bit */, memory_allocator);

    if (s.ok()) {
      *index_reader = new PartitionIndexReader(
          table, icomparator, std::move(index_block), ioptions.statistics,
          level, index_key_includes_seq, index_value_is_full);
    }

    return s;
  }

  // return a two-level iterator: first level is on the partition index
  InternalIteratorBase<BlockHandle>* NewIterator(
      IndexBlockIter* /*iter*/ = nullptr, bool /*dont_care*/ = true,
      bool fill_cache = true) override {
    Statistics* kNullStats = nullptr;
    // Filters are already checked before seeking the index
    if (!partition_map_.empty()) {
      // Partitions were pinned by CacheDependencies(true): serve the
      // second level directly from the pinned map.
      // We don't return pinned data from index blocks, so no need
      // to set `block_contents_pinned`.
      return NewTwoLevelIterator(
          new BlockBasedTable::PartitionedIndexIteratorState(
              table_, &partition_map_, index_key_includes_seq_,
              index_value_is_full_),
          index_block_->NewIterator<IndexBlockIter>(
              icomparator_, icomparator_->user_comparator(), nullptr,
              kNullStats, true, index_key_includes_seq_, index_value_is_full_));
    } else {
      auto ro = ReadOptions();
      ro.fill_cache = fill_cache;
      bool kIsIndex = true;
      // We don't return pinned data from index blocks, so no need
      // to set `block_contents_pinned`.
      return new BlockBasedTableIterator<IndexBlockIter, BlockHandle>(
          table_, ro, *icomparator_,
          index_block_->NewIterator<IndexBlockIter>(
              icomparator_, icomparator_->user_comparator(), nullptr,
              kNullStats, true, index_key_includes_seq_, index_value_is_full_),
          false, true, /* prefix_extractor */ nullptr, kIsIndex,
          index_key_includes_seq_, index_value_is_full_);
    }
    // TODO(myabandeh): Update TwoLevelIterator to be able to make use of
    // on-stack BlockIter while the state is on heap. Currently it assumes
    // the first level iter is always on heap and will attempt to delete it
    // in its destructor.
  }

  // Prefetch all index partitions with a single read; when `pin` is true,
  // cached partitions are additionally kept in partition_map_ and their
  // cache handles are released only when this reader is cleaned up.
  void CacheDependencies(bool pin) override {
    // Before read partitions, prefetch them to avoid lots of IOs
    auto rep = table_->rep_;
    IndexBlockIter biter;
    BlockHandle handle;
    Statistics* kNullStats = nullptr;
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    index_block_->NewIterator<IndexBlockIter>(
        icomparator_, icomparator_->user_comparator(), &biter, kNullStats, true,
        index_key_includes_seq_, index_value_is_full_);
    // Index partitions are assumed to be consecutive. Prefetch them all.
    // Read the first block offset
    biter.SeekToFirst();
    if (!biter.Valid()) {
      // Empty index.
      return;
    }
    handle = biter.value();
    uint64_t prefetch_off = handle.offset();

    // Read the last block's offset
    biter.SeekToLast();
    if (!biter.Valid()) {
      // Empty index.
      return;
    }
    handle = biter.value();
    uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
    uint64_t prefetch_len = last_off - prefetch_off;
    std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
    auto& file = table_->rep_->file;
    prefetch_buffer.reset(new FilePrefetchBuffer());
    // NOTE(review): the Prefetch status is not examined before the loop
    // below; a failed prefetch appears to merely degrade to per-partition
    // reads — confirm this best-effort behavior is intentional.
    Status s = prefetch_buffer->Prefetch(file.get(), prefetch_off,
                                         static_cast<size_t>(prefetch_len));

    // After prefetch, read the partitions one by one
    biter.SeekToFirst();
    auto ro = ReadOptions();
    Cache* block_cache = rep->table_options.block_cache.get();
    for (; biter.Valid(); biter.Next()) {
      handle = biter.value();
      BlockBasedTable::CachableEntry<Block> block;
      const bool is_index = true;
      // TODO: Support counter batch update for partitioned index and
      // filter blocks
      s = table_->MaybeReadBlockAndLoadToCache(
          prefetch_buffer.get(), rep, ro, handle,
          UncompressionDict::GetEmptyDict(), &block, is_index,
          nullptr /* get_context */);

      assert(s.ok() || block.value == nullptr);
      if (s.ok() && block.value != nullptr) {
        if (block.cache_handle != nullptr) {
          if (pin) {
            // Keep the partition pinned; its cache handle is released by
            // the cleanup registered below when this reader is destroyed.
            partition_map_[handle.offset()] = block;
            RegisterCleanup(&ReleaseCachedEntry, block_cache,
                            block.cache_handle);
          } else {
            block_cache->Release(block.cache_handle);
          }
        } else {
          // The block was not handed to the cache, so we own it: free it
          // here to avoid leaking block.value.
          delete block.value;
        }
      }
    }
  }

  size_t size() const override { return index_block_->size(); }
  size_t usable_size() const override { return index_block_->usable_size(); }

  // Approximate heap usage of this reader plus its top-level index block.
  size_t ApproximateMemoryUsage() const override {
    assert(index_block_);
    size_t usage = index_block_->ApproximateMemoryUsage();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
    usage += malloc_usable_size((void*)this);
#else
    usage += sizeof(*this);
#endif  // ROCKSDB_MALLOC_USABLE_SIZE
    // TODO(myabandeh): more accurate estimate of partition_map_ mem usage
    return usage;
  }

 private:
  PartitionIndexReader(BlockBasedTable* table,
                       const InternalKeyComparator* icomparator,
                       std::unique_ptr<Block>&& index_block, Statistics* stats,
                       const int /*level*/, const bool index_key_includes_seq,
                       const bool index_value_is_full)
      : IndexReader(icomparator, stats),
        table_(table),
        index_block_(std::move(index_block)),
        index_key_includes_seq_(index_key_includes_seq),
        index_value_is_full_(index_value_is_full) {
    assert(index_block_ != nullptr);
  }
  BlockBasedTable* table_;
  std::unique_ptr<Block> index_block_;
  // Partitions pinned by CacheDependencies(true), keyed by file offset.
  std::unordered_map<uint64_t, BlockBasedTable::CachableEntry<Block>>
      partition_map_;
  const bool index_key_includes_seq_;
  const bool index_value_is_full_;
};
398 | ||||||
// Index that allows binary search lookup for the first key of each block.
// This class can be viewed as a thin wrapper for `Block` class which already
// supports binary search.
class BinarySearchIndexReader : public IndexReader {
 public:
  // Read index from the file and create an instance for
  // `BinarySearchIndexReader`.
  // On success, index_reader will be populated; otherwise it will remain
  // unmodified.
  static Status Create(RandomAccessFileReader* file,
                       FilePrefetchBuffer* prefetch_buffer,
                       const Footer& footer, const BlockHandle& index_handle,
                       const ImmutableCFOptions& ioptions,
                       const InternalKeyComparator* icomparator,
                       IndexReader** index_reader,
                       const PersistentCacheOptions& cache_options,
                       const bool index_key_includes_seq,
                       const bool index_value_is_full,
                       MemoryAllocator* memory_allocator) {
    std::unique_ptr<Block> index_block;
    auto s = ReadBlockFromFile(
        file, prefetch_buffer, footer, ReadOptions(), index_handle,
        &index_block, ioptions, true /* decompress */,
        true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
        cache_options, kDisableGlobalSequenceNumber,
        0 /* read_amp_bytes_per_bit */, memory_allocator);

    if (s.ok()) {
      *index_reader = new BinarySearchIndexReader(
          icomparator, std::move(index_block), ioptions.statistics,
          index_key_includes_seq, index_value_is_full);
    }

    return s;
  }

  // Returns an iterator over the index block. `iter`, when non-null, is
  // presumably reused in place rather than allocating — matches
  // Block::NewIterator's convention; confirm against block.h.
  InternalIteratorBase<BlockHandle>* NewIterator(
      IndexBlockIter* iter = nullptr, bool /*dont_care*/ = true,
      bool /*dont_care*/ = true) override {
    Statistics* kNullStats = nullptr;
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    return index_block_->NewIterator<IndexBlockIter>(
        icomparator_, icomparator_->user_comparator(), iter, kNullStats, true,
        index_key_includes_seq_, index_value_is_full_);
  }

  size_t size() const override { return index_block_->size(); }
  size_t usable_size() const override { return index_block_->usable_size(); }

  // Approximate heap usage of this reader plus its index block.
  size_t ApproximateMemoryUsage() const override {
    assert(index_block_);
    size_t usage = index_block_->ApproximateMemoryUsage();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
    usage += malloc_usable_size((void*)this);
#else
    usage += sizeof(*this);
#endif  // ROCKSDB_MALLOC_USABLE_SIZE
    return usage;
  }

 private:
  BinarySearchIndexReader(const InternalKeyComparator* icomparator,
                          std::unique_ptr<Block>&& index_block,
                          Statistics* stats, const bool index_key_includes_seq,
                          const bool index_value_is_full)
      : IndexReader(icomparator, stats),
        index_block_(std::move(index_block)),
        index_key_includes_seq_(index_key_includes_seq),
        index_value_is_full_(index_value_is_full) {
    assert(index_block_ != nullptr);
  }
  std::unique_ptr<Block> index_block_;  // owned; never null after ctor
  const bool index_key_includes_seq_;
  const bool index_value_is_full_;
};
475 | ||||||
// Index that leverages an internal hash table to quicken the lookup for a
// given key.
class HashIndexReader : public IndexReader {
 public:
  // Reads the index block plus the prefix meta-blocks and builds the
  // optional BlockPrefixIndex. Failure to build the prefix hash index is
  // deliberately not a hard error: the reader still works as a plain
  // binary-search index (hence the Status::OK() early returns below).
  static Status Create(
      const SliceTransform* hash_key_extractor, const Footer& footer,
      RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
      const ImmutableCFOptions& ioptions,
      const InternalKeyComparator* icomparator, const BlockHandle& index_handle,
      InternalIterator* meta_index_iter, IndexReader** index_reader,
      bool /*hash_index_allow_collision*/,
      const PersistentCacheOptions& cache_options,
      const bool index_key_includes_seq, const bool index_value_is_full,
      MemoryAllocator* memory_allocator) {
    std::unique_ptr<Block> index_block;
    auto s = ReadBlockFromFile(
        file, prefetch_buffer, footer, ReadOptions(), index_handle,
        &index_block, ioptions, true /* decompress */,
        true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
        cache_options, kDisableGlobalSequenceNumber,
        0 /* read_amp_bytes_per_bit */, memory_allocator);

    if (!s.ok()) {
      return s;
    }

    // Note, failure to create prefix hash index does not need to be a
    // hard error. We can still fall back to the original binary search index.
    // So, Create will succeed regardless, from this point on.

    auto new_index_reader = new HashIndexReader(
        icomparator, std::move(index_block), ioptions.statistics,
        index_key_includes_seq, index_value_is_full);
    // Ownership passes to the caller immediately, so the early returns
    // below do not leak the reader.
    *index_reader = new_index_reader;

    // Get prefixes block
    BlockHandle prefixes_handle;
    s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock,
                      &prefixes_handle);
    if (!s.ok()) {
      // TODO: log error
      return Status::OK();
    }

    // Get index metadata block
    BlockHandle prefixes_meta_handle;
    s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesMetadataBlock,
                      &prefixes_meta_handle);
    if (!s.ok()) {
      // TODO: log error
      return Status::OK();
    }

    // Read contents for the blocks
    BlockContents prefixes_contents;
    BlockFetcher prefixes_block_fetcher(
        file, prefetch_buffer, footer, ReadOptions(), prefixes_handle,
        &prefixes_contents, ioptions, true /*decompress*/,
        true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
        cache_options, memory_allocator);
    s = prefixes_block_fetcher.ReadBlockContents();
    if (!s.ok()) {
      return s;
    }
    BlockContents prefixes_meta_contents;
    BlockFetcher prefixes_meta_block_fetcher(
        file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle,
        &prefixes_meta_contents, ioptions, true /*decompress*/,
        true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
        cache_options, memory_allocator);
    s = prefixes_meta_block_fetcher.ReadBlockContents();
    if (!s.ok()) {
      // TODO: log error
      return Status::OK();
    }

    BlockPrefixIndex* prefix_index = nullptr;
    s = BlockPrefixIndex::Create(hash_key_extractor, prefixes_contents.data,
                                 prefixes_meta_contents.data, &prefix_index);
    // TODO: log error
    if (s.ok()) {
      new_index_reader->prefix_index_.reset(prefix_index);
    }

    return Status::OK();
  }

  // Iterator over the index block; uses the prefix hash index (when built)
  // unless total_order_seek forces plain binary search.
  InternalIteratorBase<BlockHandle>* NewIterator(
      IndexBlockIter* iter = nullptr, bool total_order_seek = true,
      bool /*dont_care*/ = true) override {
    Statistics* kNullStats = nullptr;
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    return index_block_->NewIterator<IndexBlockIter>(
        icomparator_, icomparator_->user_comparator(), iter, kNullStats,
        total_order_seek, index_key_includes_seq_, index_value_is_full_,
        false /* block_contents_pinned */, prefix_index_.get());
  }

  size_t size() const override { return index_block_->size(); }
  size_t usable_size() const override { return index_block_->usable_size(); }

  size_t ApproximateMemoryUsage() const override {
    assert(index_block_);
    size_t usage = index_block_->ApproximateMemoryUsage();
    // NOTE(review): prefixes_contents_ is never assigned in the code
    // visible here, so this term is presumably zero — confirm whether the
    // member is a leftover.
    usage += prefixes_contents_.usable_size();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
    usage += malloc_usable_size((void*)this);
#else
    if (prefix_index_) {
      usage += prefix_index_->ApproximateMemoryUsage();
    }
    usage += sizeof(*this);
#endif  // ROCKSDB_MALLOC_USABLE_SIZE
    return usage;
  }

 private:
  HashIndexReader(const InternalKeyComparator* icomparator,
                  std::unique_ptr<Block>&& index_block, Statistics* stats,
                  const bool index_key_includes_seq,
                  const bool index_value_is_full)
      : IndexReader(icomparator, stats),
        index_block_(std::move(index_block)),
        index_key_includes_seq_(index_key_includes_seq),
        index_value_is_full_(index_value_is_full) {
    assert(index_block_ != nullptr);
  }

  ~HashIndexReader() override {}

  std::unique_ptr<Block> index_block_;
  std::unique_ptr<BlockPrefixIndex> prefix_index_;  // may remain null
  BlockContents prefixes_contents_;
  const bool index_key_includes_seq_;
  const bool index_value_is_full_;
};
613 | ||||||
614 | // Helper function to setup the cache key's prefix for the Table. | |||||
615 | void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) { | |||||
616 | assert(kMaxCacheKeyPrefixSize >= 10)(static_cast<void> (0)); | |||||
617 | rep->cache_key_prefix_size = 0; | |||||
618 | rep->compressed_cache_key_prefix_size = 0; | |||||
619 | if (rep->table_options.block_cache != nullptr) { | |||||
620 | GenerateCachePrefix(rep->table_options.block_cache.get(), rep->file->file(), | |||||
621 | &rep->cache_key_prefix[0], &rep->cache_key_prefix_size); | |||||
622 | // Create dummy offset of index reader which is beyond the file size. | |||||
623 | rep->dummy_index_reader_offset = | |||||
624 | file_size + rep->table_options.block_cache->NewId(); | |||||
625 | } | |||||
626 | if (rep->table_options.persistent_cache != nullptr) { | |||||
627 | GenerateCachePrefix(/*cache=*/nullptr, rep->file->file(), | |||||
628 | &rep->persistent_cache_key_prefix[0], | |||||
629 | &rep->persistent_cache_key_prefix_size); | |||||
630 | } | |||||
631 | if (rep->table_options.block_cache_compressed != nullptr) { | |||||
632 | GenerateCachePrefix(rep->table_options.block_cache_compressed.get(), | |||||
633 | rep->file->file(), &rep->compressed_cache_key_prefix[0], | |||||
634 | &rep->compressed_cache_key_prefix_size); | |||||
635 | } | |||||
636 | } | |||||
637 | ||||||
638 | void BlockBasedTable::GenerateCachePrefix(Cache* cc, RandomAccessFile* file, | |||||
639 | char* buffer, size_t* size) { | |||||
640 | // generate an id from the file | |||||
641 | *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize); | |||||
642 | ||||||
643 | // If the prefix wasn't generated or was too long, | |||||
644 | // create one from the cache. | |||||
645 | if (cc && *size == 0) { | |||||
646 | char* end = EncodeVarint64(buffer, cc->NewId()); | |||||
647 | *size = static_cast<size_t>(end - buffer); | |||||
648 | } | |||||
649 | } | |||||
650 | ||||||
651 | void BlockBasedTable::GenerateCachePrefix(Cache* cc, WritableFile* file, | |||||
652 | char* buffer, size_t* size) { | |||||
653 | // generate an id from the file | |||||
654 | *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize); | |||||
655 | ||||||
656 | // If the prefix wasn't generated or was too long, | |||||
657 | // create one from the cache. | |||||
658 | if (*size == 0) { | |||||
659 | char* end = EncodeVarint64(buffer, cc->NewId()); | |||||
660 | *size = static_cast<size_t>(end - buffer); | |||||
661 | } | |||||
662 | } | |||||
663 | ||||||
664 | namespace { | |||||
665 | // Return True if table_properties has `user_prop_name` has a `true` value | |||||
666 | // or it doesn't contain this property (for backward compatible). | |||||
667 | bool IsFeatureSupported(const TableProperties& table_properties, | |||||
668 | const std::string& user_prop_name, Logger* info_log) { | |||||
669 | auto& props = table_properties.user_collected_properties; | |||||
670 | auto pos = props.find(user_prop_name); | |||||
671 | // Older version doesn't have this value set. Skip this check. | |||||
672 | if (pos != props.end()) { | |||||
673 | if (pos->second == kPropFalse) { | |||||
674 | return false; | |||||
675 | } else if (pos->second != kPropTrue) { | |||||
676 | ROCKS_LOG_WARN(info_log, "Property %s has invalidate value %s",rocksdb::Log(InfoLogLevel::WARN_LEVEL, info_log, ("[%s:" "677" "] " "Property %s has invalidate value %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), user_prop_name.c_str(), pos->second.c_str()) | |||||
677 | user_prop_name.c_str(), pos->second.c_str())rocksdb::Log(InfoLogLevel::WARN_LEVEL, info_log, ("[%s:" "677" "] " "Property %s has invalidate value %s"), RocksLogShorterFileName ("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), user_prop_name.c_str(), pos->second.c_str()); | |||||
678 | } | |||||
679 | } | |||||
680 | return true; | |||||
681 | } | |||||
682 | ||||||
683 | // Caller has to ensure seqno is not nullptr. | |||||
684 | Status GetGlobalSequenceNumber(const TableProperties& table_properties, | |||||
685 | SequenceNumber largest_seqno, | |||||
686 | SequenceNumber* seqno) { | |||||
687 | const auto& props = table_properties.user_collected_properties; | |||||
688 | const auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion); | |||||
689 | const auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno); | |||||
690 | ||||||
691 | *seqno = kDisableGlobalSequenceNumber; | |||||
692 | if (version_pos == props.end()) { | |||||
693 | if (seqno_pos != props.end()) { | |||||
694 | std::array<char, 200> msg_buf; | |||||
695 | // This is not an external sst file, global_seqno is not supported. | |||||
696 | snprintf( | |||||
697 | msg_buf.data(), msg_buf.max_size(), | |||||
698 | "A non-external sst file have global seqno property with value %s", | |||||
699 | seqno_pos->second.c_str()); | |||||
700 | return Status::Corruption(msg_buf.data()); | |||||
701 | } | |||||
702 | return Status::OK(); | |||||
703 | } | |||||
704 | ||||||
705 | uint32_t version = DecodeFixed32(version_pos->second.c_str()); | |||||
706 | if (version < 2) { | |||||
707 | if (seqno_pos != props.end() || version != 1) { | |||||
708 | std::array<char, 200> msg_buf; | |||||
709 | // This is a v1 external sst file, global_seqno is not supported. | |||||
710 | snprintf(msg_buf.data(), msg_buf.max_size(), | |||||
711 | "An external sst file with version %u have global seqno " | |||||
712 | "property with value %s", | |||||
713 | version, seqno_pos->second.c_str()); | |||||
714 | return Status::Corruption(msg_buf.data()); | |||||
715 | } | |||||
716 | return Status::OK(); | |||||
717 | } | |||||
718 | ||||||
719 | // Since we have a plan to deprecate global_seqno, we do not return failure | |||||
720 | // if seqno_pos == props.end(). We rely on version_pos to detect whether the | |||||
721 | // SST is external. | |||||
722 | SequenceNumber global_seqno(0); | |||||
723 | if (seqno_pos != props.end()) { | |||||
724 | global_seqno = DecodeFixed64(seqno_pos->second.c_str()); | |||||
725 | } | |||||
726 | // SstTableReader open table reader with kMaxSequenceNumber as largest_seqno | |||||
727 | // to denote it is unknown. | |||||
728 | if (largest_seqno < kMaxSequenceNumber) { | |||||
729 | if (global_seqno == 0) { | |||||
730 | global_seqno = largest_seqno; | |||||
731 | } | |||||
732 | if (global_seqno != largest_seqno) { | |||||
733 | std::array<char, 200> msg_buf; | |||||
734 | snprintf( | |||||
735 | msg_buf.data(), msg_buf.max_size(), | |||||
736 | "An external sst file with version %u have global seqno property " | |||||
737 | "with value %s, while largest seqno in the file is %llu", | |||||
738 | version, seqno_pos->second.c_str(), | |||||
739 | static_cast<unsigned long long>(largest_seqno)); | |||||
740 | return Status::Corruption(msg_buf.data()); | |||||
741 | } | |||||
742 | } | |||||
743 | *seqno = global_seqno; | |||||
744 | ||||||
745 | if (global_seqno > kMaxSequenceNumber) { | |||||
746 | std::array<char, 200> msg_buf; | |||||
747 | snprintf(msg_buf.data(), msg_buf.max_size(), | |||||
748 | "An external sst file with version %u have global seqno property " | |||||
749 | "with value %llu, which is greater than kMaxSequenceNumber", | |||||
750 | version, static_cast<unsigned long long>(global_seqno)); | |||||
751 | return Status::Corruption(msg_buf.data()); | |||||
752 | } | |||||
753 | ||||||
754 | return Status::OK(); | |||||
755 | } | |||||
756 | } // namespace | |||||
757 | ||||||
758 | Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix, | |||||
759 | size_t cache_key_prefix_size, | |||||
760 | const BlockHandle& handle, char* cache_key) { | |||||
761 | assert(cache_key != nullptr)(static_cast<void> (0)); | |||||
762 | assert(cache_key_prefix_size != 0)(static_cast<void> (0)); | |||||
763 | assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize)(static_cast<void> (0)); | |||||
764 | memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); | |||||
765 | char* end = | |||||
766 | EncodeVarint64(cache_key + cache_key_prefix_size, handle.offset()); | |||||
767 | return Slice(cache_key, static_cast<size_t>(end - cache_key)); | |||||
768 | } | |||||
769 | ||||||
770 | Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, | |||||
771 | const EnvOptions& env_options, | |||||
772 | const BlockBasedTableOptions& table_options, | |||||
773 | const InternalKeyComparator& internal_comparator, | |||||
774 | std::unique_ptr<RandomAccessFileReader>&& file, | |||||
775 | uint64_t file_size, | |||||
776 | std::unique_ptr<TableReader>* table_reader, | |||||
777 | const SliceTransform* prefix_extractor, | |||||
778 | const bool prefetch_index_and_filter_in_cache, | |||||
779 | const bool skip_filters, const int level, | |||||
780 | const bool immortal_table, | |||||
781 | const SequenceNumber largest_seqno, | |||||
782 | TailPrefetchStats* tail_prefetch_stats) { | |||||
783 | table_reader->reset(); | |||||
784 | ||||||
785 | Status s; | |||||
786 | Footer footer; | |||||
787 | std::unique_ptr<FilePrefetchBuffer> prefetch_buffer; | |||||
788 | ||||||
789 | // prefetch both index and filters, down to all partitions | |||||
790 | const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0; | |||||
791 | const bool preload_all = !table_options.cache_index_and_filter_blocks; | |||||
792 | ||||||
793 | s = PrefetchTail(file.get(), file_size, tail_prefetch_stats, prefetch_all, | |||||
794 | preload_all, &prefetch_buffer); | |||||
795 | ||||||
796 | // Read in the following order: | |||||
797 | // 1. Footer | |||||
798 | // 2. [metaindex block] | |||||
799 | // 3. [meta block: properties] | |||||
800 | // 4. [meta block: range deletion tombstone] | |||||
801 | // 5. [meta block: compression dictionary] | |||||
802 | // 6. [meta block: index] | |||||
803 | // 7. [meta block: filter] | |||||
804 | s = ReadFooterFromFile(file.get(), prefetch_buffer.get(), file_size, &footer, | |||||
805 | kBlockBasedTableMagicNumber); | |||||
806 | if (!s.ok()) { | |||||
807 | return s; | |||||
808 | } | |||||
809 | if (!BlockBasedTableSupportedVersion(footer.version())) { | |||||
810 | return Status::Corruption( | |||||
811 | "Unknown Footer version. Maybe this file was created with newer " | |||||
812 | "version of RocksDB?"); | |||||
813 | } | |||||
814 | ||||||
815 | // We've successfully read the footer. We are ready to serve requests. | |||||
816 | // Better not mutate rep_ after the creation. eg. internal_prefix_transform | |||||
817 | // raw pointer will be used to create HashIndexReader, whose reset may | |||||
818 | // access a dangling pointer. | |||||
819 | Rep* rep = new BlockBasedTable::Rep(ioptions, env_options, table_options, | |||||
820 | internal_comparator, skip_filters, level, | |||||
821 | immortal_table); | |||||
822 | rep->file = std::move(file); | |||||
823 | rep->footer = footer; | |||||
824 | rep->index_type = table_options.index_type; | |||||
825 | rep->hash_index_allow_collision = table_options.hash_index_allow_collision; | |||||
826 | // We need to wrap data with internal_prefix_transform to make sure it can | |||||
827 | // handle prefix correctly. | |||||
828 | rep->internal_prefix_transform.reset( | |||||
829 | new InternalKeySliceTransform(prefix_extractor)); | |||||
830 | SetupCacheKeyPrefix(rep, file_size); | |||||
831 | std::unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep)); | |||||
832 | ||||||
833 | // page cache options | |||||
834 | rep->persistent_cache_options = | |||||
835 | PersistentCacheOptions(rep->table_options.persistent_cache, | |||||
836 | std::string(rep->persistent_cache_key_prefix, | |||||
837 | rep->persistent_cache_key_prefix_size), | |||||
838 | rep->ioptions.statistics); | |||||
839 | ||||||
840 | // Read metaindex | |||||
841 | std::unique_ptr<Block> meta; | |||||
842 | std::unique_ptr<InternalIterator> meta_iter; | |||||
843 | s = ReadMetaBlock(rep, prefetch_buffer.get(), &meta, &meta_iter); | |||||
844 | if (!s.ok()) { | |||||
845 | return s; | |||||
846 | } | |||||
847 | ||||||
848 | s = ReadPropertiesBlock(rep, prefetch_buffer.get(), meta_iter.get(), | |||||
849 | largest_seqno); | |||||
850 | if (!s.ok()) { | |||||
851 | return s; | |||||
852 | } | |||||
853 | s = ReadRangeDelBlock(rep, prefetch_buffer.get(), meta_iter.get(), | |||||
854 | internal_comparator); | |||||
855 | if (!s.ok()) { | |||||
856 | return s; | |||||
857 | } | |||||
858 | s = PrefetchIndexAndFilterBlocks(rep, prefetch_buffer.get(), meta_iter.get(), | |||||
859 | new_table.get(), prefix_extractor, | |||||
860 | prefetch_all, table_options, level, | |||||
861 | prefetch_index_and_filter_in_cache); | |||||
862 | ||||||
863 | if (s.ok()) { | |||||
864 | // Update tail prefetch stats | |||||
865 | assert(prefetch_buffer.get() != nullptr)(static_cast<void> (0)); | |||||
866 | if (tail_prefetch_stats != nullptr) { | |||||
867 | assert(prefetch_buffer->min_offset_read() < file_size)(static_cast<void> (0)); | |||||
868 | tail_prefetch_stats->RecordEffectiveSize( | |||||
869 | static_cast<size_t>(file_size) - prefetch_buffer->min_offset_read()); | |||||
870 | } | |||||
871 | ||||||
872 | *table_reader = std::move(new_table); | |||||
873 | } | |||||
874 | ||||||
875 | return s; | |||||
876 | } | |||||
877 | ||||||
878 | Status BlockBasedTable::PrefetchTail( | |||||
879 | RandomAccessFileReader* file, uint64_t file_size, | |||||
880 | TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all, | |||||
881 | const bool preload_all, | |||||
882 | std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer) { | |||||
883 | size_t tail_prefetch_size = 0; | |||||
884 | if (tail_prefetch_stats != nullptr) { | |||||
885 | // Multiple threads may get a 0 (no history) when running in parallel, | |||||
886 | // but it will get cleared after the first of them finishes. | |||||
887 | tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize(); | |||||
888 | } | |||||
889 | if (tail_prefetch_size == 0) { | |||||
890 | // Before read footer, readahead backwards to prefetch data. Do more | |||||
891 | // readahead if we're going to read index/filter. | |||||
892 | // TODO: This may incorrectly select small readahead in case partitioned | |||||
893 | // index/filter is enabled and top-level partition pinning is enabled. | |||||
894 | // That's because we need to issue readahead before we read the properties, | |||||
895 | // at which point we don't yet know the index type. | |||||
896 | tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024; | |||||
897 | } | |||||
898 | size_t prefetch_off; | |||||
899 | size_t prefetch_len; | |||||
900 | if (file_size < tail_prefetch_size) { | |||||
901 | prefetch_off = 0; | |||||
902 | prefetch_len = static_cast<size_t>(file_size); | |||||
903 | } else { | |||||
904 | prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size); | |||||
905 | prefetch_len = tail_prefetch_size; | |||||
906 | } | |||||
907 | TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen", | |||||
908 | &tail_prefetch_size); | |||||
909 | Status s; | |||||
910 | // TODO should not have this special logic in the future. | |||||
911 | if (!file->use_direct_io()) { | |||||
912 | prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, false, true)); | |||||
913 | s = file->Prefetch(prefetch_off, prefetch_len); | |||||
914 | } else { | |||||
915 | prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true)); | |||||
916 | s = (*prefetch_buffer)->Prefetch(file, prefetch_off, prefetch_len); | |||||
917 | } | |||||
918 | return s; | |||||
919 | } | |||||
920 | ||||||
921 | Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len, | |||||
922 | uint32_t expected) { | |||||
923 | Status s; | |||||
924 | uint32_t actual = 0; | |||||
925 | switch (type) { | |||||
926 | case kNoChecksum: | |||||
927 | break; | |||||
928 | case kCRC32c: | |||||
929 | expected = crc32c::Unmask(expected); | |||||
930 | actual = crc32c::Value(buf, len); | |||||
931 | break; | |||||
932 | case kxxHash: | |||||
933 | actual = XXH32(buf, static_cast<int>(len), 0); | |||||
934 | break; | |||||
935 | case kxxHash64: | |||||
936 | actual = static_cast<uint32_t>(XXH64(buf, static_cast<int>(len), 0) & | |||||
937 | uint64_t{0xffffffff}); | |||||
938 | break; | |||||
939 | default: | |||||
940 | s = Status::Corruption("unknown checksum type"); | |||||
941 | } | |||||
942 | if (s.ok() && actual != expected) { | |||||
943 | s = Status::Corruption("properties block checksum mismatched"); | |||||
944 | } | |||||
945 | return s; | |||||
946 | } | |||||
947 | ||||||
948 | Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno( | |||||
949 | Rep* rep, FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value, | |||||
950 | TableProperties** table_properties) { | |||||
951 | assert(table_properties != nullptr)(static_cast<void> (0)); | |||||
952 | // If this is an external SST file ingested with write_global_seqno set to | |||||
953 | // true, then we expect the checksum mismatch because checksum was written | |||||
954 | // by SstFileWriter, but its global seqno in the properties block may have | |||||
955 | // been changed during ingestion. In this case, we read the properties | |||||
956 | // block, copy it to a memory buffer, change the global seqno to its | |||||
957 | // original value, i.e. 0, and verify the checksum again. | |||||
958 | BlockHandle props_block_handle; | |||||
959 | CacheAllocationPtr tmp_buf; | |||||
960 | Status s = ReadProperties(handle_value, rep->file.get(), prefetch_buffer, | |||||
961 | rep->footer, rep->ioptions, table_properties, | |||||
962 | false /* verify_checksum */, &props_block_handle, | |||||
963 | &tmp_buf, false /* compression_type_missing */, | |||||
964 | nullptr /* memory_allocator */); | |||||
965 | if (s.ok() && tmp_buf) { | |||||
966 | const auto seqno_pos_iter = | |||||
967 | (*table_properties) | |||||
968 | ->properties_offsets.find( | |||||
969 | ExternalSstFilePropertyNames::kGlobalSeqno); | |||||
970 | size_t block_size = props_block_handle.size(); | |||||
971 | if (seqno_pos_iter != (*table_properties)->properties_offsets.end()) { | |||||
972 | uint64_t global_seqno_offset = seqno_pos_iter->second; | |||||
973 | EncodeFixed64( | |||||
974 | tmp_buf.get() + global_seqno_offset - props_block_handle.offset(), 0); | |||||
975 | } | |||||
976 | uint32_t value = DecodeFixed32(tmp_buf.get() + block_size + 1); | |||||
977 | s = rocksdb::VerifyChecksum(rep->footer.checksum(), tmp_buf.get(), | |||||
978 | block_size + 1, value); | |||||
979 | } | |||||
980 | return s; | |||||
981 | } | |||||
982 | ||||||
983 | Status BlockBasedTable::ReadPropertiesBlock( | |||||
984 | Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, | |||||
985 | const SequenceNumber largest_seqno) { | |||||
986 | bool found_properties_block = true; | |||||
987 | Status s; | |||||
988 | s = SeekToPropertiesBlock(meta_iter, &found_properties_block); | |||||
989 | ||||||
990 | if (!s.ok()) { | |||||
991 | ROCKS_LOG_WARN(rep->ioptions.info_log,rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "993" "] " "Error when seeking to properties block from file: %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
992 | "Error when seeking to properties block from file: %s",rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "993" "] " "Error when seeking to properties block from file: %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
993 | s.ToString().c_str())rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "993" "] " "Error when seeking to properties block from file: %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()); | |||||
994 | } else if (found_properties_block) { | |||||
995 | s = meta_iter->status(); | |||||
996 | TableProperties* table_properties = nullptr; | |||||
997 | if (s.ok()) { | |||||
998 | s = ReadProperties( | |||||
999 | meta_iter->value(), rep->file.get(), prefetch_buffer, rep->footer, | |||||
1000 | rep->ioptions, &table_properties, true /* verify_checksum */, | |||||
1001 | nullptr /* ret_block_handle */, nullptr /* ret_block_contents */, | |||||
1002 | false /* compression_type_missing */, nullptr /* memory_allocator */); | |||||
1003 | } | |||||
1004 | ||||||
1005 | if (s.IsCorruption()) { | |||||
1006 | s = TryReadPropertiesWithGlobalSeqno( | |||||
1007 | rep, prefetch_buffer, meta_iter->value(), &table_properties); | |||||
1008 | } | |||||
1009 | std::unique_ptr<TableProperties> props_guard; | |||||
1010 | if (table_properties != nullptr) { | |||||
1011 | props_guard.reset(table_properties); | |||||
1012 | } | |||||
1013 | ||||||
1014 | if (!s.ok()) { | |||||
1015 | ROCKS_LOG_WARN(rep->ioptions.info_log,rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1018" "] " "Encountered error while reading data from properties " "block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1016 | "Encountered error while reading data from properties "rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1018" "] " "Encountered error while reading data from properties " "block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1017 | "block %s",rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1018" "] " "Encountered error while reading data from properties " "block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1018 | s.ToString().c_str())rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1018" "] " "Encountered error while reading data from properties " "block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()); | |||||
1019 | } else { | |||||
1020 | assert(table_properties != nullptr)(static_cast<void> (0)); | |||||
1021 | rep->table_properties.reset(props_guard.release()); | |||||
1022 | rep->blocks_maybe_compressed = rep->table_properties->compression_name != | |||||
1023 | CompressionTypeToString(kNoCompression); | |||||
1024 | rep->blocks_definitely_zstd_compressed = | |||||
1025 | (rep->table_properties->compression_name == | |||||
1026 | CompressionTypeToString(kZSTD) || | |||||
1027 | rep->table_properties->compression_name == | |||||
1028 | CompressionTypeToString(kZSTDNotFinalCompression)); | |||||
1029 | } | |||||
1030 | } else { | |||||
1031 | ROCKS_LOG_ERROR(rep->ioptions.info_log,rocksdb::Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log , ("[%s:" "1032" "] " "Cannot find Properties block from file." ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" )) | |||||
1032 | "Cannot find Properties block from file.")rocksdb::Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log , ("[%s:" "1032" "] " "Cannot find Properties block from file." ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" )); | |||||
1033 | } | |||||
1034 | #ifndef ROCKSDB_LITE | |||||
1035 | if (rep->table_properties) { | |||||
1036 | ParseSliceTransform(rep->table_properties->prefix_extractor_name, | |||||
1037 | &(rep->table_prefix_extractor)); | |||||
1038 | } | |||||
1039 | #endif // ROCKSDB_LITE | |||||
1040 | ||||||
1041 | // Read the table properties, if provided. | |||||
1042 | if (rep->table_properties) { | |||||
1043 | rep->whole_key_filtering &= | |||||
1044 | IsFeatureSupported(*(rep->table_properties), | |||||
1045 | BlockBasedTablePropertyNames::kWholeKeyFiltering, | |||||
1046 | rep->ioptions.info_log); | |||||
1047 | rep->prefix_filtering &= IsFeatureSupported( | |||||
1048 | *(rep->table_properties), | |||||
1049 | BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log); | |||||
1050 | ||||||
1051 | s = GetGlobalSequenceNumber(*(rep->table_properties), largest_seqno, | |||||
1052 | &(rep->global_seqno)); | |||||
1053 | if (!s.ok()) { | |||||
1054 | ROCKS_LOG_ERROR(rep->ioptions.info_log, "%s", s.ToString().c_str())rocksdb::Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log , ("[%s:" "1054" "] " "%s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()); | |||||
1055 | } | |||||
1056 | } | |||||
1057 | return s; | |||||
1058 | } | |||||
1059 | ||||||
1060 | Status BlockBasedTable::ReadRangeDelBlock( | |||||
1061 | Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, | |||||
1062 | const InternalKeyComparator& internal_comparator) { | |||||
1063 | Status s; | |||||
1064 | bool found_range_del_block; | |||||
1065 | BlockHandle range_del_handle; | |||||
1066 | s = SeekToRangeDelBlock(meta_iter, &found_range_del_block, &range_del_handle); | |||||
1067 | if (!s.ok()) { | |||||
1068 | ROCKS_LOG_WARN(rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1071" "] " "Error when seeking to range delete tombstones block from file: %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1069 | rep->ioptions.info_log,rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1071" "] " "Error when seeking to range delete tombstones block from file: %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1070 | "Error when seeking to range delete tombstones block from file: %s",rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1071" "] " "Error when seeking to range delete tombstones block from file: %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1071 | s.ToString().c_str())rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1071" "] " "Error when seeking to range delete tombstones block from file: %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()); | |||||
1072 | } else if (found_range_del_block && !range_del_handle.IsNull()) { | |||||
1073 | ReadOptions read_options; | |||||
1074 | std::unique_ptr<InternalIterator> iter(NewDataBlockIterator<DataBlockIter>( | |||||
1075 | rep, read_options, range_del_handle, nullptr /* input_iter */, | |||||
1076 | false /* is_index */, true /* key_includes_seq */, | |||||
1077 | true /* index_key_is_full */, nullptr /* get_context */, Status(), | |||||
1078 | prefetch_buffer)); | |||||
1079 | assert(iter != nullptr)(static_cast<void> (0)); | |||||
1080 | s = iter->status(); | |||||
1081 | if (!s.ok()) { | |||||
1082 | ROCKS_LOG_WARN(rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1085" "] " "Encountered error while reading data from range del block %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1083 | rep->ioptions.info_log,rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1085" "] " "Encountered error while reading data from range del block %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1084 | "Encountered error while reading data from range del block %s",rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1085" "] " "Encountered error while reading data from range del block %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1085 | s.ToString().c_str())rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1085" "] " "Encountered error while reading data from range del block %s" ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()); | |||||
1086 | } else { | |||||
1087 | rep->fragmented_range_dels = | |||||
1088 | std::make_shared<FragmentedRangeTombstoneList>(std::move(iter), | |||||
1089 | internal_comparator); | |||||
1090 | } | |||||
1091 | } | |||||
1092 | return s; | |||||
1093 | } | |||||
1094 | ||||||
1095 | Status BlockBasedTable::ReadCompressionDictBlock( | |||||
1096 | Rep* rep, FilePrefetchBuffer* prefetch_buffer, | |||||
1097 | std::unique_ptr<const BlockContents>* compression_dict_block) { | |||||
1098 | assert(compression_dict_block != nullptr)(static_cast<void> (0)); | |||||
1099 | Status s; | |||||
1100 | if (!rep->compression_dict_handle.IsNull()) { | |||||
1101 | std::unique_ptr<BlockContents> compression_dict_cont{new BlockContents()}; | |||||
1102 | PersistentCacheOptions cache_options; | |||||
1103 | ReadOptions read_options; | |||||
1104 | read_options.verify_checksums = true; | |||||
1105 | BlockFetcher compression_block_fetcher( | |||||
1106 | rep->file.get(), prefetch_buffer, rep->footer, read_options, | |||||
1107 | rep->compression_dict_handle, compression_dict_cont.get(), | |||||
1108 | rep->ioptions, false /* decompress */, false /*maybe_compressed*/, | |||||
1109 | UncompressionDict::GetEmptyDict(), cache_options); | |||||
1110 | s = compression_block_fetcher.ReadBlockContents(); | |||||
1111 | ||||||
1112 | if (!s.ok()) { | |||||
1113 | ROCKS_LOG_WARN(rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1117" "] " "Encountered error while reading data from compression dictionary " "block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1114 | rep->ioptions.info_log,rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1117" "] " "Encountered error while reading data from compression dictionary " "block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1115 | "Encountered error while reading data from compression dictionary "rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1117" "] " "Encountered error while reading data from compression dictionary " "block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1116 | "block %s",rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1117" "] " "Encountered error while reading data from compression dictionary " "block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1117 | s.ToString().c_str())rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log , ("[%s:" "1117" "] " "Encountered error while reading data from compression dictionary " "block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()); | |||||
1118 | } else { | |||||
1119 | *compression_dict_block = std::move(compression_dict_cont); | |||||
1120 | } | |||||
1121 | } | |||||
1122 | return s; | |||||
1123 | } | |||||
1124 | ||||||
1125 | Status BlockBasedTable::PrefetchIndexAndFilterBlocks( | |||||
1126 | Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, | |||||
1127 | BlockBasedTable* new_table, const SliceTransform* prefix_extractor, | |||||
1128 | bool prefetch_all, const BlockBasedTableOptions& table_options, | |||||
1129 | const int level, const bool prefetch_index_and_filter_in_cache) { | |||||
1130 | Status s; | |||||
1131 | ||||||
1132 | // Find filter handle and filter type | |||||
1133 | if (rep->filter_policy) { | |||||
1134 | for (auto filter_type : | |||||
1135 | {Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter, | |||||
1136 | Rep::FilterType::kBlockFilter}) { | |||||
1137 | std::string prefix; | |||||
1138 | switch (filter_type) { | |||||
1139 | case Rep::FilterType::kFullFilter: | |||||
1140 | prefix = kFullFilterBlockPrefix; | |||||
1141 | break; | |||||
1142 | case Rep::FilterType::kPartitionedFilter: | |||||
1143 | prefix = kPartitionedFilterBlockPrefix; | |||||
1144 | break; | |||||
1145 | case Rep::FilterType::kBlockFilter: | |||||
1146 | prefix = kFilterBlockPrefix; | |||||
1147 | break; | |||||
1148 | default: | |||||
1149 | assert(0)(static_cast<void> (0)); | |||||
1150 | } | |||||
1151 | std::string filter_block_key = prefix; | |||||
1152 | filter_block_key.append(rep->filter_policy->Name()); | |||||
1153 | if (FindMetaBlock(meta_iter, filter_block_key, &rep->filter_handle) | |||||
1154 | .ok()) { | |||||
1155 | rep->filter_type = filter_type; | |||||
1156 | break; | |||||
1157 | } | |||||
1158 | } | |||||
1159 | } | |||||
1160 | ||||||
1161 | { | |||||
1162 | // Find compression dictionary handle | |||||
1163 | bool found_compression_dict; | |||||
1164 | s = SeekToCompressionDictBlock(meta_iter, &found_compression_dict, | |||||
1165 | &rep->compression_dict_handle); | |||||
1166 | } | |||||
1167 | ||||||
1168 | bool need_upper_bound_check = | |||||
1169 | PrefixExtractorChanged(rep->table_properties.get(), prefix_extractor); | |||||
1170 | ||||||
1171 | BlockBasedTableOptions::IndexType index_type = new_table->UpdateIndexType(); | |||||
1172 | // prefetch the first level of index | |||||
1173 | const bool prefetch_index = | |||||
1174 | prefetch_all || | |||||
1175 | (table_options.pin_top_level_index_and_filter && | |||||
1176 | index_type == BlockBasedTableOptions::kTwoLevelIndexSearch); | |||||
1177 | // prefetch the first level of filter | |||||
1178 | const bool prefetch_filter = | |||||
1179 | prefetch_all || (table_options.pin_top_level_index_and_filter && | |||||
1180 | rep->filter_type == Rep::FilterType::kPartitionedFilter); | |||||
1181 | // Partition fitlers cannot be enabled without partition indexes | |||||
1182 | assert(!prefetch_filter || prefetch_index)(static_cast<void> (0)); | |||||
1183 | // pin both index and filters, down to all partitions | |||||
1184 | const bool pin_all = | |||||
1185 | rep->table_options.pin_l0_filter_and_index_blocks_in_cache && level == 0; | |||||
1186 | // pin the first level of index | |||||
1187 | const bool pin_index = | |||||
1188 | pin_all || (table_options.pin_top_level_index_and_filter && | |||||
1189 | index_type == BlockBasedTableOptions::kTwoLevelIndexSearch); | |||||
1190 | // pin the first level of filter | |||||
1191 | const bool pin_filter = | |||||
1192 | pin_all || (table_options.pin_top_level_index_and_filter && | |||||
1193 | rep->filter_type == Rep::FilterType::kPartitionedFilter); | |||||
1194 | // pre-fetching of blocks is turned on | |||||
1195 | // Will use block cache for meta-blocks access | |||||
1196 | // Always prefetch index and filter for level 0 | |||||
1197 | // TODO(ajkr): also prefetch compression dictionary block | |||||
1198 | if (table_options.cache_index_and_filter_blocks) { | |||||
1199 | assert(table_options.block_cache != nullptr)(static_cast<void> (0)); | |||||
1200 | if (prefetch_index) { | |||||
1201 | // Hack: Call NewIndexIterator() to implicitly add index to the | |||||
1202 | // block_cache | |||||
1203 | CachableEntry<IndexReader> index_entry; | |||||
1204 | // check prefix_extractor match only if hash based index is used | |||||
1205 | bool disable_prefix_seek = | |||||
1206 | rep->index_type == BlockBasedTableOptions::kHashSearch && | |||||
1207 | need_upper_bound_check; | |||||
1208 | if (s.ok()) { | |||||
1209 | std::unique_ptr<InternalIteratorBase<BlockHandle>> iter( | |||||
1210 | new_table->NewIndexIterator(ReadOptions(), disable_prefix_seek, | |||||
1211 | nullptr, &index_entry)); | |||||
1212 | s = iter->status(); | |||||
1213 | } | |||||
1214 | if (s.ok()) { | |||||
1215 | // This is the first call to NewIndexIterator() since we're in Open(). | |||||
1216 | // On success it should give us ownership of the `CachableEntry` by | |||||
1217 | // populating `index_entry`. | |||||
1218 | assert(index_entry.value != nullptr)(static_cast<void> (0)); | |||||
1219 | if (prefetch_all) { | |||||
1220 | index_entry.value->CacheDependencies(pin_all); | |||||
1221 | } | |||||
1222 | if (pin_index) { | |||||
1223 | rep->index_entry = std::move(index_entry); | |||||
1224 | } else { | |||||
1225 | index_entry.Release(table_options.block_cache.get()); | |||||
1226 | } | |||||
1227 | } | |||||
1228 | } | |||||
1229 | if (s.ok() && prefetch_filter) { | |||||
1230 | // Hack: Call GetFilter() to implicitly add filter to the block_cache | |||||
1231 | auto filter_entry = | |||||
1232 | new_table->GetFilter(rep->table_prefix_extractor.get()); | |||||
1233 | if (filter_entry.value != nullptr && prefetch_all) { | |||||
1234 | filter_entry.value->CacheDependencies( | |||||
1235 | pin_all, rep->table_prefix_extractor.get()); | |||||
1236 | } | |||||
1237 | // if pin_filter is true then save it in rep_->filter_entry; it will be | |||||
1238 | // released in the destructor only, hence it will be pinned in the | |||||
1239 | // cache while this reader is alive | |||||
1240 | if (pin_filter) { | |||||
1241 | rep->filter_entry = filter_entry; | |||||
1242 | } else { | |||||
1243 | filter_entry.Release(table_options.block_cache.get()); | |||||
1244 | } | |||||
1245 | } | |||||
1246 | } else { | |||||
1247 | // If we don't use block cache for meta-block access, we'll pre-load these | |||||
1248 | // blocks, which will kept in member variables in Rep and with a same life- | |||||
1249 | // time as this table object. | |||||
1250 | IndexReader* index_reader = nullptr; | |||||
1251 | if (s.ok()) { | |||||
1252 | s = new_table->CreateIndexReader(prefetch_buffer, &index_reader, | |||||
1253 | meta_iter, level); | |||||
1254 | } | |||||
1255 | std::unique_ptr<const BlockContents> compression_dict_block; | |||||
1256 | if (s.ok()) { | |||||
1257 | rep->index_reader.reset(index_reader); | |||||
1258 | // The partitions of partitioned index are always stored in cache. They | |||||
1259 | // are hence follow the configuration for pin and prefetch regardless of | |||||
1260 | // the value of cache_index_and_filter_blocks | |||||
1261 | if (prefetch_index_and_filter_in_cache || level == 0) { | |||||
1262 | rep->index_reader->CacheDependencies(pin_all); | |||||
1263 | } | |||||
1264 | ||||||
1265 | // Set filter block | |||||
1266 | if (rep->filter_policy) { | |||||
1267 | const bool is_a_filter_partition = true; | |||||
1268 | auto filter = new_table->ReadFilter(prefetch_buffer, rep->filter_handle, | |||||
1269 | !is_a_filter_partition, | |||||
1270 | rep->table_prefix_extractor.get()); | |||||
1271 | rep->filter.reset(filter); | |||||
1272 | // Refer to the comment above about paritioned indexes always being | |||||
1273 | // cached | |||||
1274 | if (filter && (prefetch_index_and_filter_in_cache || level == 0)) { | |||||
1275 | filter->CacheDependencies(pin_all, rep->table_prefix_extractor.get()); | |||||
1276 | } | |||||
1277 | } | |||||
1278 | s = ReadCompressionDictBlock(rep, prefetch_buffer, | |||||
1279 | &compression_dict_block); | |||||
1280 | } else { | |||||
1281 | delete index_reader; | |||||
1282 | } | |||||
1283 | if (s.ok() && !rep->compression_dict_handle.IsNull()) { | |||||
1284 | assert(compression_dict_block != nullptr)(static_cast<void> (0)); | |||||
1285 | // TODO(ajkr): find a way to avoid the `compression_dict_block` data copy | |||||
1286 | rep->uncompression_dict.reset(new UncompressionDict( | |||||
1287 | compression_dict_block->data.ToString(), | |||||
1288 | rep->blocks_definitely_zstd_compressed, rep->ioptions.statistics)); | |||||
1289 | } | |||||
1290 | } | |||||
1291 | return s; | |||||
1292 | } | |||||
1293 | ||||||
1294 | void BlockBasedTable::SetupForCompaction() { | |||||
1295 | switch (rep_->ioptions.access_hint_on_compaction_start) { | |||||
1296 | case Options::NONE: | |||||
1297 | break; | |||||
1298 | case Options::NORMAL: | |||||
1299 | rep_->file->file()->Hint(RandomAccessFile::NORMAL); | |||||
1300 | break; | |||||
1301 | case Options::SEQUENTIAL: | |||||
1302 | rep_->file->file()->Hint(RandomAccessFile::SEQUENTIAL); | |||||
1303 | break; | |||||
1304 | case Options::WILLNEED: | |||||
1305 | rep_->file->file()->Hint(RandomAccessFile::WILLNEED); | |||||
1306 | break; | |||||
1307 | default: | |||||
1308 | assert(false)(static_cast<void> (0)); | |||||
1309 | } | |||||
1310 | } | |||||
1311 | ||||||
1312 | std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties() | |||||
1313 | const { | |||||
1314 | return rep_->table_properties; | |||||
1315 | } | |||||
1316 | ||||||
1317 | size_t BlockBasedTable::ApproximateMemoryUsage() const { | |||||
1318 | size_t usage = 0; | |||||
1319 | if (rep_->filter) { | |||||
1320 | usage += rep_->filter->ApproximateMemoryUsage(); | |||||
1321 | } | |||||
1322 | if (rep_->index_reader) { | |||||
1323 | usage += rep_->index_reader->ApproximateMemoryUsage(); | |||||
1324 | } | |||||
1325 | if (rep_->uncompression_dict) { | |||||
1326 | usage += rep_->uncompression_dict->ApproximateMemoryUsage(); | |||||
1327 | } | |||||
1328 | return usage; | |||||
1329 | } | |||||
1330 | ||||||
1331 | // Load the meta-block from the file. On success, return the loaded meta block | |||||
1332 | // and its iterator. | |||||
1333 | Status BlockBasedTable::ReadMetaBlock(Rep* rep, | |||||
1334 | FilePrefetchBuffer* prefetch_buffer, | |||||
1335 | std::unique_ptr<Block>* meta_block, | |||||
1336 | std::unique_ptr<InternalIterator>* iter) { | |||||
1337 | // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates | |||||
1338 | // it is an empty block. | |||||
1339 | std::unique_ptr<Block> meta; | |||||
1340 | Status s = ReadBlockFromFile( | |||||
1341 | rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(), | |||||
1342 | rep->footer.metaindex_handle(), &meta, rep->ioptions, | |||||
1343 | true /* decompress */, true /*maybe_compressed*/, | |||||
1344 | UncompressionDict::GetEmptyDict(), rep->persistent_cache_options, | |||||
1345 | kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, | |||||
1346 | GetMemoryAllocator(rep->table_options)); | |||||
1347 | ||||||
1348 | if (!s.ok()) { | |||||
1349 | ROCKS_LOG_ERROR(rep->ioptions.info_log,rocksdb::Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log , ("[%s:" "1352" "] " "Encountered error while reading data from properties" " block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1350 | "Encountered error while reading data from properties"rocksdb::Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log , ("[%s:" "1352" "] " "Encountered error while reading data from properties" " block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1351 | " block %s",rocksdb::Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log , ("[%s:" "1352" "] " "Encountered error while reading data from properties" " block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()) | |||||
1352 | s.ToString().c_str())rocksdb::Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log , ("[%s:" "1352" "] " "Encountered error while reading data from properties" " block %s"), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" ), s.ToString().c_str()); | |||||
1353 | return s; | |||||
1354 | } | |||||
1355 | ||||||
1356 | *meta_block = std::move(meta); | |||||
1357 | // meta block uses bytewise comparator. | |||||
1358 | iter->reset(meta_block->get()->NewIterator<DataBlockIter>( | |||||
1359 | BytewiseComparator(), BytewiseComparator())); | |||||
1360 | return Status::OK(); | |||||
1361 | } | |||||
1362 | ||||||
1363 | Status BlockBasedTable::GetDataBlockFromCache( | |||||
1364 | const Slice& block_cache_key, const Slice& compressed_block_cache_key, | |||||
1365 | Cache* block_cache, Cache* block_cache_compressed, Rep* rep, | |||||
1366 | const ReadOptions& read_options, | |||||
1367 | BlockBasedTable::CachableEntry<Block>* block, | |||||
1368 | const UncompressionDict& uncompression_dict, size_t read_amp_bytes_per_bit, | |||||
1369 | bool is_index, GetContext* get_context) { | |||||
1370 | Status s; | |||||
1371 | BlockContents* compressed_block = nullptr; | |||||
1372 | Cache::Handle* block_cache_compressed_handle = nullptr; | |||||
1373 | Statistics* statistics = rep->ioptions.statistics; | |||||
1374 | ||||||
1375 | // Lookup uncompressed cache first | |||||
1376 | if (block_cache != nullptr) { | |||||
1377 | block->cache_handle = GetEntryFromCache( | |||||
1378 | block_cache, block_cache_key, rep->level, | |||||
1379 | is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS, | |||||
1380 | is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT, | |||||
1381 | get_context | |||||
1382 | ? (is_index ? &get_context->get_context_stats_.num_cache_index_miss | |||||
1383 | : &get_context->get_context_stats_.num_cache_data_miss) | |||||
1384 | : nullptr, | |||||
1385 | get_context | |||||
1386 | ? (is_index ? &get_context->get_context_stats_.num_cache_index_hit | |||||
1387 | : &get_context->get_context_stats_.num_cache_data_hit) | |||||
1388 | : nullptr, | |||||
1389 | statistics, get_context); | |||||
1390 | if (block->cache_handle != nullptr) { | |||||
1391 | block->value = | |||||
1392 | reinterpret_cast<Block*>(block_cache->Value(block->cache_handle)); | |||||
1393 | return s; | |||||
1394 | } | |||||
1395 | } | |||||
1396 | ||||||
1397 | // If not found, search from the compressed block cache. | |||||
1398 | assert(block->cache_handle == nullptr && block->value == nullptr)(static_cast<void> (0)); | |||||
1399 | ||||||
1400 | if (block_cache_compressed == nullptr) { | |||||
1401 | return s; | |||||
1402 | } | |||||
1403 | ||||||
1404 | assert(!compressed_block_cache_key.empty())(static_cast<void> (0)); | |||||
1405 | block_cache_compressed_handle = | |||||
1406 | block_cache_compressed->Lookup(compressed_block_cache_key); | |||||
1407 | // if we found in the compressed cache, then uncompress and insert into | |||||
1408 | // uncompressed cache | |||||
1409 | if (block_cache_compressed_handle == nullptr) { | |||||
1410 | RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); | |||||
1411 | return s; | |||||
1412 | } | |||||
1413 | ||||||
1414 | // found compressed block | |||||
1415 | RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT); | |||||
1416 | compressed_block = reinterpret_cast<BlockContents*>( | |||||
1417 | block_cache_compressed->Value(block_cache_compressed_handle)); | |||||
1418 | CompressionType compression_type = compressed_block->get_compression_type(); | |||||
1419 | assert(compression_type != kNoCompression)(static_cast<void> (0)); | |||||
1420 | ||||||
1421 | // Retrieve the uncompressed contents into a new buffer | |||||
1422 | BlockContents contents; | |||||
1423 | UncompressionContext context(compression_type); | |||||
1424 | UncompressionInfo info(context, uncompression_dict, compression_type); | |||||
1425 | s = UncompressBlockContents(info, compressed_block->data.data(), | |||||
1426 | compressed_block->data.size(), &contents, | |||||
1427 | rep->table_options.format_version, rep->ioptions, | |||||
1428 | GetMemoryAllocator(rep->table_options)); | |||||
1429 | ||||||
1430 | // Insert uncompressed block into block cache | |||||
1431 | if (s.ok()) { | |||||
1432 | block->value = | |||||
1433 | new Block(std::move(contents), rep->get_global_seqno(is_index), | |||||
1434 | read_amp_bytes_per_bit, | |||||
1435 | statistics); // uncompressed block | |||||
1436 | if (block_cache != nullptr && block->value->own_bytes() && | |||||
1437 | read_options.fill_cache) { | |||||
1438 | size_t charge = block->value->ApproximateMemoryUsage(); | |||||
1439 | s = block_cache->Insert(block_cache_key, block->value, charge, | |||||
1440 | &DeleteCachedEntry<Block>, | |||||
1441 | &(block->cache_handle)); | |||||
1442 | #ifndef NDEBUG1 | |||||
1443 | block_cache->TEST_mark_as_data_block(block_cache_key, charge); | |||||
1444 | #endif // NDEBUG | |||||
1445 | if (s.ok()) { | |||||
1446 | if (get_context != nullptr) { | |||||
1447 | get_context->get_context_stats_.num_cache_add++; | |||||
1448 | get_context->get_context_stats_.num_cache_bytes_write += charge; | |||||
1449 | } else { | |||||
1450 | RecordTick(statistics, BLOCK_CACHE_ADD); | |||||
1451 | RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge); | |||||
1452 | } | |||||
1453 | if (is_index) { | |||||
1454 | if (get_context != nullptr) { | |||||
1455 | get_context->get_context_stats_.num_cache_index_add++; | |||||
1456 | get_context->get_context_stats_.num_cache_index_bytes_insert += | |||||
1457 | charge; | |||||
1458 | } else { | |||||
1459 | RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); | |||||
1460 | RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge); | |||||
1461 | } | |||||
1462 | } else { | |||||
1463 | if (get_context != nullptr) { | |||||
1464 | get_context->get_context_stats_.num_cache_data_add++; | |||||
1465 | get_context->get_context_stats_.num_cache_data_bytes_insert += | |||||
1466 | charge; | |||||
1467 | } else { | |||||
1468 | RecordTick(statistics, BLOCK_CACHE_DATA_ADD); | |||||
1469 | RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, charge); | |||||
1470 | } | |||||
1471 | } | |||||
1472 | } else { | |||||
1473 | RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); | |||||
1474 | delete block->value; | |||||
1475 | block->value = nullptr; | |||||
1476 | } | |||||
1477 | } | |||||
1478 | } | |||||
1479 | ||||||
1480 | // Release hold on compressed cache entry | |||||
1481 | block_cache_compressed->Release(block_cache_compressed_handle); | |||||
1482 | return s; | |||||
1483 | } | |||||
1484 | ||||||
1485 | Status BlockBasedTable::PutDataBlockToCache( | |||||
1486 | const Slice& block_cache_key, const Slice& compressed_block_cache_key, | |||||
1487 | Cache* block_cache, Cache* block_cache_compressed, | |||||
1488 | const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions, | |||||
1489 | CachableEntry<Block>* cached_block, BlockContents* raw_block_contents, | |||||
1490 | CompressionType raw_block_comp_type, uint32_t format_version, | |||||
1491 | const UncompressionDict& uncompression_dict, SequenceNumber seq_no, | |||||
1492 | size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator, | |||||
1493 | bool is_index, Cache::Priority priority, GetContext* get_context) { | |||||
1494 | assert(raw_block_comp_type == kNoCompression ||(static_cast<void> (0)) | |||||
1495 | block_cache_compressed != nullptr)(static_cast<void> (0)); | |||||
1496 | ||||||
1497 | Status s; | |||||
1498 | // Retrieve the uncompressed contents into a new buffer | |||||
1499 | BlockContents uncompressed_block_contents; | |||||
1500 | Statistics* statistics = ioptions.statistics; | |||||
1501 | if (raw_block_comp_type != kNoCompression) { | |||||
1502 | UncompressionContext context(raw_block_comp_type); | |||||
1503 | UncompressionInfo info(context, uncompression_dict, raw_block_comp_type); | |||||
1504 | s = UncompressBlockContents(info, raw_block_contents->data.data(), | |||||
1505 | raw_block_contents->data.size(), | |||||
1506 | &uncompressed_block_contents, format_version, | |||||
1507 | ioptions, memory_allocator); | |||||
1508 | } | |||||
1509 | if (!s.ok()) { | |||||
1510 | return s; | |||||
1511 | } | |||||
1512 | ||||||
1513 | if (raw_block_comp_type != kNoCompression) { | |||||
1514 | cached_block->value = new Block(std::move(uncompressed_block_contents), | |||||
1515 | seq_no, read_amp_bytes_per_bit, | |||||
1516 | statistics); // uncompressed block | |||||
1517 | } else { | |||||
1518 | cached_block->value = | |||||
1519 | new Block(std::move(*raw_block_contents), seq_no, | |||||
1520 | read_amp_bytes_per_bit, ioptions.statistics); | |||||
1521 | } | |||||
1522 | ||||||
1523 | // Insert compressed block into compressed block cache. | |||||
1524 | // Release the hold on the compressed cache entry immediately. | |||||
1525 | if (block_cache_compressed != nullptr && | |||||
1526 | raw_block_comp_type != kNoCompression && raw_block_contents != nullptr && | |||||
1527 | raw_block_contents->own_bytes()) { | |||||
1528 | #ifndef NDEBUG1 | |||||
1529 | assert(raw_block_contents->is_raw_block)(static_cast<void> (0)); | |||||
1530 | #endif // NDEBUG | |||||
1531 | ||||||
1532 | // We cannot directly put raw_block_contents because this could point to | |||||
1533 | // an object in the stack. | |||||
1534 | BlockContents* block_cont_for_comp_cache = | |||||
1535 | new BlockContents(std::move(*raw_block_contents)); | |||||
1536 | s = block_cache_compressed->Insert( | |||||
1537 | compressed_block_cache_key, block_cont_for_comp_cache, | |||||
1538 | block_cont_for_comp_cache->ApproximateMemoryUsage(), | |||||
1539 | &DeleteCachedEntry<BlockContents>); | |||||
1540 | if (s.ok()) { | |||||
1541 | // Avoid the following code to delete this cached block. | |||||
1542 | RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD); | |||||
1543 | } else { | |||||
1544 | RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES); | |||||
1545 | delete block_cont_for_comp_cache; | |||||
1546 | } | |||||
1547 | } | |||||
1548 | ||||||
1549 | // insert into uncompressed block cache | |||||
1550 | if (block_cache != nullptr && cached_block->value->own_bytes()) { | |||||
1551 | size_t charge = cached_block->value->ApproximateMemoryUsage(); | |||||
1552 | s = block_cache->Insert(block_cache_key, cached_block->value, charge, | |||||
1553 | &DeleteCachedEntry<Block>, | |||||
1554 | &(cached_block->cache_handle), priority); | |||||
1555 | #ifndef NDEBUG1 | |||||
1556 | block_cache->TEST_mark_as_data_block(block_cache_key, charge); | |||||
1557 | #endif // NDEBUG | |||||
1558 | if (s.ok()) { | |||||
1559 | assert(cached_block->cache_handle != nullptr)(static_cast<void> (0)); | |||||
1560 | if (get_context != nullptr) { | |||||
1561 | get_context->get_context_stats_.num_cache_add++; | |||||
1562 | get_context->get_context_stats_.num_cache_bytes_write += charge; | |||||
1563 | } else { | |||||
1564 | RecordTick(statistics, BLOCK_CACHE_ADD); | |||||
1565 | RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge); | |||||
1566 | } | |||||
1567 | if (is_index) { | |||||
1568 | if (get_context != nullptr) { | |||||
1569 | get_context->get_context_stats_.num_cache_index_add++; | |||||
1570 | get_context->get_context_stats_.num_cache_index_bytes_insert += | |||||
1571 | charge; | |||||
1572 | } else { | |||||
1573 | RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); | |||||
1574 | RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge); | |||||
1575 | } | |||||
1576 | } else { | |||||
1577 | if (get_context != nullptr) { | |||||
1578 | get_context->get_context_stats_.num_cache_data_add++; | |||||
1579 | get_context->get_context_stats_.num_cache_data_bytes_insert += charge; | |||||
1580 | } else { | |||||
1581 | RecordTick(statistics, BLOCK_CACHE_DATA_ADD); | |||||
1582 | RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, charge); | |||||
1583 | } | |||||
1584 | } | |||||
1585 | assert(reinterpret_cast<Block*>(block_cache->Value((static_cast<void> (0)) | |||||
1586 | cached_block->cache_handle)) == cached_block->value)(static_cast<void> (0)); | |||||
1587 | } else { | |||||
1588 | RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); | |||||
1589 | delete cached_block->value; | |||||
1590 | cached_block->value = nullptr; | |||||
1591 | } | |||||
1592 | } | |||||
1593 | ||||||
1594 | return s; | |||||
1595 | } | |||||
1596 | ||||||
1597 | FilterBlockReader* BlockBasedTable::ReadFilter( | |||||
1598 | FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_handle, | |||||
1599 | const bool is_a_filter_partition, | |||||
1600 | const SliceTransform* prefix_extractor) const { | |||||
1601 | auto& rep = rep_; | |||||
1602 | // TODO: We might want to unify with ReadBlockFromFile() if we start | |||||
1603 | // requiring checksum verification in Table::Open. | |||||
1604 | if (rep->filter_type == Rep::FilterType::kNoFilter) { | |||||
1605 | return nullptr; | |||||
1606 | } | |||||
1607 | BlockContents block; | |||||
1608 | ||||||
1609 | BlockFetcher block_fetcher( | |||||
1610 | rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(), | |||||
1611 | filter_handle, &block, rep->ioptions, false /* decompress */, | |||||
1612 | false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), | |||||
1613 | rep->persistent_cache_options, GetMemoryAllocator(rep->table_options)); | |||||
1614 | Status s = block_fetcher.ReadBlockContents(); | |||||
1615 | ||||||
1616 | if (!s.ok()) { | |||||
1617 | // Error reading the block | |||||
1618 | return nullptr; | |||||
1619 | } | |||||
1620 | ||||||
1621 | assert(rep->filter_policy)(static_cast<void> (0)); | |||||
1622 | ||||||
1623 | auto filter_type = rep->filter_type; | |||||
1624 | if (rep->filter_type == Rep::FilterType::kPartitionedFilter && | |||||
1625 | is_a_filter_partition) { | |||||
1626 | filter_type = Rep::FilterType::kFullFilter; | |||||
1627 | } | |||||
1628 | ||||||
1629 | switch (filter_type) { | |||||
1630 | case Rep::FilterType::kPartitionedFilter: { | |||||
1631 | return new PartitionedFilterBlockReader( | |||||
1632 | rep->prefix_filtering ? prefix_extractor : nullptr, | |||||
1633 | rep->whole_key_filtering, std::move(block), nullptr, | |||||
1634 | rep->ioptions.statistics, rep->internal_comparator, this, | |||||
1635 | rep_->table_properties == nullptr || | |||||
1636 | rep_->table_properties->index_key_is_user_key == 0, | |||||
1637 | rep_->table_properties == nullptr || | |||||
1638 | rep_->table_properties->index_value_is_delta_encoded == 0); | |||||
1639 | } | |||||
1640 | ||||||
1641 | case Rep::FilterType::kBlockFilter: | |||||
1642 | return new BlockBasedFilterBlockReader( | |||||
1643 | rep->prefix_filtering ? prefix_extractor : nullptr, | |||||
1644 | rep->table_options, rep->whole_key_filtering, std::move(block), | |||||
1645 | rep->ioptions.statistics); | |||||
1646 | ||||||
1647 | case Rep::FilterType::kFullFilter: { | |||||
1648 | auto filter_bits_reader = | |||||
1649 | rep->filter_policy->GetFilterBitsReader(block.data); | |||||
1650 | assert(filter_bits_reader != nullptr)(static_cast<void> (0)); | |||||
1651 | return new FullFilterBlockReader( | |||||
1652 | rep->prefix_filtering ? prefix_extractor : nullptr, | |||||
1653 | rep->whole_key_filtering, std::move(block), filter_bits_reader, | |||||
1654 | rep->ioptions.statistics); | |||||
1655 | } | |||||
1656 | ||||||
1657 | default: | |||||
1658 | // filter_type is either kNoFilter (exited the function at the first if), | |||||
1659 | // or it must be covered in this switch block | |||||
1660 | assert(false)(static_cast<void> (0)); | |||||
1661 | return nullptr; | |||||
1662 | } | |||||
1663 | } | |||||
1664 | ||||||
1665 | BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter( | |||||
1666 | const SliceTransform* prefix_extractor, FilePrefetchBuffer* prefetch_buffer, | |||||
1667 | bool no_io, GetContext* get_context) const { | |||||
1668 | const BlockHandle& filter_blk_handle = rep_->filter_handle; | |||||
1669 | const bool is_a_filter_partition = true; | |||||
1670 | return GetFilter(prefetch_buffer, filter_blk_handle, !is_a_filter_partition, | |||||
1671 | no_io, get_context, prefix_extractor); | |||||
1672 | } | |||||
1673 | ||||||
1674 | BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter( | |||||
1675 | FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle, | |||||
1676 | const bool is_a_filter_partition, bool no_io, GetContext* get_context, | |||||
1677 | const SliceTransform* prefix_extractor) const { | |||||
1678 | // If cache_index_and_filter_blocks is false, filter should be pre-populated. | |||||
1679 | // We will return rep_->filter anyway. rep_->filter can be nullptr if filter | |||||
1680 | // read fails at Open() time. We don't want to reload again since it will | |||||
1681 | // most probably fail again. | |||||
1682 | if (!is_a_filter_partition && | |||||
1683 | !rep_->table_options.cache_index_and_filter_blocks) { | |||||
1684 | return {rep_->filter.get(), nullptr /* cache handle */}; | |||||
1685 | } | |||||
1686 | ||||||
1687 | Cache* block_cache = rep_->table_options.block_cache.get(); | |||||
1688 | if (rep_->filter_policy == nullptr /* do not use filter */ || | |||||
1689 | block_cache == nullptr /* no block cache at all */) { | |||||
1690 | return {nullptr /* filter */, nullptr /* cache handle */}; | |||||
1691 | } | |||||
1692 | ||||||
1693 | if (!is_a_filter_partition && rep_->filter_entry.IsSet()) { | |||||
1694 | return rep_->filter_entry; | |||||
1695 | } | |||||
1696 | ||||||
1697 | PERF_TIMER_GUARD(read_filter_block_nanos)PerfStepTimer perf_step_timer_read_filter_block_nanos(&(perf_context .read_filter_block_nanos)); perf_step_timer_read_filter_block_nanos .Start();; | |||||
1698 | ||||||
1699 | // Fetching from the cache | |||||
1700 | char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; | |||||
1701 | auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, | |||||
1702 | filter_blk_handle, cache_key); | |||||
1703 | ||||||
1704 | Statistics* statistics = rep_->ioptions.statistics; | |||||
1705 | auto cache_handle = GetEntryFromCache( | |||||
1706 | block_cache, key, rep_->level, BLOCK_CACHE_FILTER_MISS, | |||||
1707 | BLOCK_CACHE_FILTER_HIT, | |||||
1708 | get_context ? &get_context->get_context_stats_.num_cache_filter_miss | |||||
1709 | : nullptr, | |||||
1710 | get_context ? &get_context->get_context_stats_.num_cache_filter_hit | |||||
1711 | : nullptr, | |||||
1712 | statistics, get_context); | |||||
1713 | ||||||
1714 | FilterBlockReader* filter = nullptr; | |||||
1715 | if (cache_handle != nullptr) { | |||||
1716 | PERF_COUNTER_ADD(block_cache_filter_hit_count, 1)if (perf_level >= PerfLevel::kEnableCount) { perf_context. block_cache_filter_hit_count += 1; }; | |||||
1717 | filter = | |||||
1718 | reinterpret_cast<FilterBlockReader*>(block_cache->Value(cache_handle)); | |||||
1719 | } else if (no_io) { | |||||
1720 | // Do not invoke any io. | |||||
1721 | return CachableEntry<FilterBlockReader>(); | |||||
1722 | } else { | |||||
1723 | filter = ReadFilter(prefetch_buffer, filter_blk_handle, | |||||
1724 | is_a_filter_partition, prefix_extractor); | |||||
1725 | if (filter != nullptr) { | |||||
1726 | size_t usage = filter->ApproximateMemoryUsage(); | |||||
1727 | Status s = block_cache->Insert( | |||||
1728 | key, filter, usage, &DeleteCachedFilterEntry, &cache_handle, | |||||
1729 | rep_->table_options.cache_index_and_filter_blocks_with_high_priority | |||||
1730 | ? Cache::Priority::HIGH | |||||
1731 | : Cache::Priority::LOW); | |||||
1732 | if (s.ok()) { | |||||
1733 | PERF_COUNTER_ADD(filter_block_read_count, 1)if (perf_level >= PerfLevel::kEnableCount) { perf_context. filter_block_read_count += 1; }; | |||||
1734 | if (get_context != nullptr) { | |||||
1735 | get_context->get_context_stats_.num_cache_add++; | |||||
1736 | get_context->get_context_stats_.num_cache_bytes_write += usage; | |||||
1737 | get_context->get_context_stats_.num_cache_filter_add++; | |||||
1738 | get_context->get_context_stats_.num_cache_filter_bytes_insert += | |||||
1739 | usage; | |||||
1740 | } else { | |||||
1741 | RecordTick(statistics, BLOCK_CACHE_ADD); | |||||
1742 | RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usage); | |||||
1743 | RecordTick(statistics, BLOCK_CACHE_FILTER_ADD); | |||||
1744 | RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, usage); | |||||
1745 | } | |||||
1746 | } else { | |||||
1747 | RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); | |||||
1748 | delete filter; | |||||
1749 | return CachableEntry<FilterBlockReader>(); | |||||
1750 | } | |||||
1751 | } | |||||
1752 | } | |||||
1753 | ||||||
1754 | return {filter, cache_handle}; | |||||
1755 | } | |||||
1756 | ||||||
1757 | BlockBasedTable::CachableEntry<UncompressionDict> | |||||
1758 | BlockBasedTable::GetUncompressionDict(Rep* rep, | |||||
1759 | FilePrefetchBuffer* prefetch_buffer, | |||||
1760 | bool no_io, GetContext* get_context) { | |||||
1761 | if (!rep->table_options.cache_index_and_filter_blocks) { | |||||
1762 | // block cache is either disabled or not used for meta-blocks. In either | |||||
1763 | // case, BlockBasedTableReader is the owner of the uncompression dictionary. | |||||
1764 | return {rep->uncompression_dict.get(), nullptr /* cache handle */}; | |||||
1765 | } | |||||
1766 | if (rep->compression_dict_handle.IsNull()) { | |||||
1767 | return {nullptr, nullptr}; | |||||
1768 | } | |||||
1769 | char cache_key_buf[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; | |||||
1770 | auto cache_key = | |||||
1771 | GetCacheKey(rep->cache_key_prefix, rep->cache_key_prefix_size, | |||||
1772 | rep->compression_dict_handle, cache_key_buf); | |||||
1773 | auto cache_handle = GetEntryFromCache( | |||||
1774 | rep->table_options.block_cache.get(), cache_key, rep->level, | |||||
1775 | BLOCK_CACHE_COMPRESSION_DICT_MISS, BLOCK_CACHE_COMPRESSION_DICT_HIT, | |||||
1776 | get_context | |||||
1777 | ? &get_context->get_context_stats_.num_cache_compression_dict_miss | |||||
1778 | : nullptr, | |||||
1779 | get_context | |||||
1780 | ? &get_context->get_context_stats_.num_cache_compression_dict_hit | |||||
1781 | : nullptr, | |||||
1782 | rep->ioptions.statistics, get_context); | |||||
1783 | UncompressionDict* dict = nullptr; | |||||
1784 | if (cache_handle != nullptr) { | |||||
1785 | dict = reinterpret_cast<UncompressionDict*>( | |||||
1786 | rep->table_options.block_cache->Value(cache_handle)); | |||||
1787 | } else if (no_io) { | |||||
1788 | // Do not invoke any io. | |||||
1789 | } else { | |||||
1790 | std::unique_ptr<const BlockContents> compression_dict_block; | |||||
1791 | Status s = | |||||
1792 | ReadCompressionDictBlock(rep, prefetch_buffer, &compression_dict_block); | |||||
1793 | size_t usage = 0; | |||||
1794 | if (s.ok()) { | |||||
1795 | assert(compression_dict_block != nullptr)(static_cast<void> (0)); | |||||
1796 | // TODO(ajkr): find a way to avoid the `compression_dict_block` data copy | |||||
1797 | dict = new UncompressionDict(compression_dict_block->data.ToString(), | |||||
1798 | rep->blocks_definitely_zstd_compressed, | |||||
1799 | rep->ioptions.statistics); | |||||
1800 | usage = dict->ApproximateMemoryUsage(); | |||||
1801 | s = rep->table_options.block_cache->Insert( | |||||
1802 | cache_key, dict, usage, &DeleteCachedUncompressionDictEntry, | |||||
1803 | &cache_handle, | |||||
1804 | rep->table_options.cache_index_and_filter_blocks_with_high_priority | |||||
1805 | ? Cache::Priority::HIGH | |||||
1806 | : Cache::Priority::LOW); | |||||
1807 | } | |||||
1808 | if (s.ok()) { | |||||
1809 | PERF_COUNTER_ADD(compression_dict_block_read_count, 1)if (perf_level >= PerfLevel::kEnableCount) { perf_context. compression_dict_block_read_count += 1; }; | |||||
1810 | if (get_context != nullptr) { | |||||
1811 | get_context->get_context_stats_.num_cache_add++; | |||||
1812 | get_context->get_context_stats_.num_cache_bytes_write += usage; | |||||
1813 | get_context->get_context_stats_.num_cache_compression_dict_add++; | |||||
1814 | get_context->get_context_stats_ | |||||
1815 | .num_cache_compression_dict_bytes_insert += usage; | |||||
1816 | } else { | |||||
1817 | RecordTick(rep->ioptions.statistics, BLOCK_CACHE_ADD); | |||||
1818 | RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_WRITE, usage); | |||||
1819 | RecordTick(rep->ioptions.statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD); | |||||
1820 | RecordTick(rep->ioptions.statistics, | |||||
1821 | BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT, usage); | |||||
1822 | } | |||||
1823 | } else { | |||||
1824 | // There should be no way to get here if block cache insertion succeeded. | |||||
1825 | // Though it is still possible something failed earlier. | |||||
1826 | RecordTick(rep->ioptions.statistics, BLOCK_CACHE_ADD_FAILURES); | |||||
1827 | delete dict; | |||||
1828 | dict = nullptr; | |||||
1829 | assert(cache_handle == nullptr)(static_cast<void> (0)); | |||||
1830 | } | |||||
1831 | } | |||||
1832 | return {dict, cache_handle}; | |||||
1833 | } | |||||
1834 | ||||||
1835 | // disable_prefix_seek should be set to true when prefix_extractor found in SST | |||||
1836 | // differs from the one in mutable_cf_options and index type is HashBasedIndex | |||||
1837 | InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator( | |||||
1838 | const ReadOptions& read_options, bool disable_prefix_seek, | |||||
1839 | IndexBlockIter* input_iter, CachableEntry<IndexReader>* index_entry, | |||||
1840 | GetContext* get_context) { | |||||
1841 | // index reader has already been pre-populated. | |||||
1842 | if (rep_->index_reader) { | |||||
1843 | // We don't return pinned datat from index blocks, so no need | |||||
1844 | // to set `block_contents_pinned`. | |||||
1845 | return rep_->index_reader->NewIterator( | |||||
1846 | input_iter, read_options.total_order_seek || disable_prefix_seek, | |||||
1847 | read_options.fill_cache); | |||||
1848 | } | |||||
1849 | // we have a pinned index block | |||||
1850 | if (rep_->index_entry.IsSet()) { | |||||
1851 | // We don't return pinned datat from index blocks, so no need | |||||
1852 | // to set `block_contents_pinned`. | |||||
1853 | return rep_->index_entry.value->NewIterator( | |||||
1854 | input_iter, read_options.total_order_seek || disable_prefix_seek, | |||||
1855 | read_options.fill_cache); | |||||
1856 | } | |||||
1857 | ||||||
1858 | PERF_TIMER_GUARD(read_index_block_nanos)PerfStepTimer perf_step_timer_read_index_block_nanos(&(perf_context .read_index_block_nanos)); perf_step_timer_read_index_block_nanos .Start();; | |||||
1859 | ||||||
1860 | const bool no_io = read_options.read_tier == kBlockCacheTier; | |||||
1861 | Cache* block_cache = rep_->table_options.block_cache.get(); | |||||
1862 | char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; | |||||
1863 | auto key = | |||||
1864 | GetCacheKeyFromOffset(rep_->cache_key_prefix, rep_->cache_key_prefix_size, | |||||
1865 | rep_->dummy_index_reader_offset, cache_key); | |||||
1866 | Statistics* statistics = rep_->ioptions.statistics; | |||||
1867 | auto cache_handle = GetEntryFromCache( | |||||
1868 | block_cache, key, rep_->level, BLOCK_CACHE_INDEX_MISS, | |||||
1869 | BLOCK_CACHE_INDEX_HIT, | |||||
1870 | get_context ? &get_context->get_context_stats_.num_cache_index_miss | |||||
1871 | : nullptr, | |||||
1872 | get_context ? &get_context->get_context_stats_.num_cache_index_hit | |||||
1873 | : nullptr, | |||||
1874 | statistics, get_context); | |||||
1875 | ||||||
1876 | if (cache_handle == nullptr && no_io) { | |||||
1877 | if (input_iter != nullptr) { | |||||
1878 | input_iter->Invalidate(Status::Incomplete("no blocking io")); | |||||
1879 | return input_iter; | |||||
1880 | } else { | |||||
1881 | return NewErrorInternalIterator<BlockHandle>( | |||||
1882 | Status::Incomplete("no blocking io")); | |||||
1883 | } | |||||
1884 | } | |||||
1885 | ||||||
1886 | IndexReader* index_reader = nullptr; | |||||
1887 | if (cache_handle != nullptr) { | |||||
1888 | PERF_COUNTER_ADD(block_cache_index_hit_count, 1)if (perf_level >= PerfLevel::kEnableCount) { perf_context. block_cache_index_hit_count += 1; }; | |||||
1889 | index_reader = | |||||
1890 | reinterpret_cast<IndexReader*>(block_cache->Value(cache_handle)); | |||||
1891 | } else { | |||||
1892 | // Create index reader and put it in the cache. | |||||
1893 | Status s; | |||||
1894 | TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:2"); | |||||
1895 | s = CreateIndexReader(nullptr /* prefetch_buffer */, &index_reader); | |||||
1896 | TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:1"); | |||||
1897 | TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:3"); | |||||
1898 | TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:4"); | |||||
1899 | size_t charge = 0; | |||||
1900 | if (s.ok()) { | |||||
1901 | assert(index_reader != nullptr)(static_cast<void> (0)); | |||||
1902 | charge = index_reader->ApproximateMemoryUsage(); | |||||
1903 | s = block_cache->Insert( | |||||
1904 | key, index_reader, charge, &DeleteCachedIndexEntry, &cache_handle, | |||||
1905 | rep_->table_options.cache_index_and_filter_blocks_with_high_priority | |||||
1906 | ? Cache::Priority::HIGH | |||||
1907 | : Cache::Priority::LOW); | |||||
1908 | } | |||||
1909 | ||||||
1910 | if (s.ok()) { | |||||
1911 | if (get_context != nullptr) { | |||||
1912 | get_context->get_context_stats_.num_cache_add++; | |||||
1913 | get_context->get_context_stats_.num_cache_bytes_write += charge; | |||||
1914 | } else { | |||||
1915 | RecordTick(statistics, BLOCK_CACHE_ADD); | |||||
1916 | RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge); | |||||
1917 | } | |||||
1918 | PERF_COUNTER_ADD(index_block_read_count, 1)if (perf_level >= PerfLevel::kEnableCount) { perf_context. index_block_read_count += 1; }; | |||||
1919 | RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); | |||||
1920 | RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge); | |||||
1921 | } else { | |||||
1922 | if (index_reader != nullptr) { | |||||
1923 | delete index_reader; | |||||
1924 | } | |||||
1925 | RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); | |||||
1926 | // make sure if something goes wrong, index_reader shall remain intact. | |||||
1927 | if (input_iter != nullptr) { | |||||
1928 | input_iter->Invalidate(s); | |||||
1929 | return input_iter; | |||||
1930 | } else { | |||||
1931 | return NewErrorInternalIterator<BlockHandle>(s); | |||||
1932 | } | |||||
1933 | } | |||||
1934 | } | |||||
1935 | ||||||
1936 | assert(cache_handle)(static_cast<void> (0)); | |||||
1937 | // We don't return pinned datat from index blocks, so no need | |||||
1938 | // to set `block_contents_pinned`. | |||||
1939 | auto* iter = index_reader->NewIterator( | |||||
1940 | input_iter, read_options.total_order_seek || disable_prefix_seek); | |||||
1941 | ||||||
1942 | // the caller would like to take ownership of the index block | |||||
1943 | // don't call RegisterCleanup() in this case, the caller will take care of it | |||||
1944 | if (index_entry != nullptr) { | |||||
1945 | *index_entry = {index_reader, cache_handle}; | |||||
1946 | } else { | |||||
1947 | iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); | |||||
1948 | } | |||||
1949 | ||||||
1950 | return iter; | |||||
1951 | } | |||||
1952 | ||||||
1953 | // Convert an index iterator value (i.e., an encoded BlockHandle) | |||||
1954 | // into an iterator over the contents of the corresponding block. | |||||
1955 | // If input_iter is null, new a iterator | |||||
1956 | // If input_iter is not null, update this iter and return it | |||||
1957 | template <typename TBlockIter> | |||||
1958 | TBlockIter* BlockBasedTable::NewDataBlockIterator( | |||||
1959 | Rep* rep, const ReadOptions& ro, const BlockHandle& handle, | |||||
1960 | TBlockIter* input_iter, bool is_index, bool key_includes_seq, | |||||
1961 | bool index_key_is_full, GetContext* get_context, Status s, | |||||
1962 | FilePrefetchBuffer* prefetch_buffer) { | |||||
1963 | PERF_TIMER_GUARD(new_table_block_iter_nanos)PerfStepTimer perf_step_timer_new_table_block_iter_nanos(& (perf_context.new_table_block_iter_nanos)); perf_step_timer_new_table_block_iter_nanos .Start();; | |||||
1964 | ||||||
1965 | Cache* block_cache = rep->table_options.block_cache.get(); | |||||
1966 | CachableEntry<Block> block; | |||||
1967 | TBlockIter* iter; | |||||
1968 | { | |||||
1969 | const bool no_io = (ro.read_tier == kBlockCacheTier); | |||||
1970 | auto uncompression_dict_storage = | |||||
1971 | GetUncompressionDict(rep, prefetch_buffer, no_io, get_context); | |||||
1972 | const UncompressionDict& uncompression_dict = | |||||
1973 | uncompression_dict_storage.value == nullptr | |||||
1974 | ? UncompressionDict::GetEmptyDict() | |||||
1975 | : *uncompression_dict_storage.value; | |||||
1976 | if (s.ok()) { | |||||
1977 | s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle, | |||||
1978 | uncompression_dict, &block, is_index, | |||||
1979 | get_context); | |||||
1980 | } | |||||
1981 | ||||||
1982 | if (input_iter != nullptr) { | |||||
1983 | iter = input_iter; | |||||
1984 | } else { | |||||
1985 | iter = new TBlockIter; | |||||
1986 | } | |||||
1987 | // Didn't get any data from block caches. | |||||
1988 | if (s.ok() && block.value == nullptr) { | |||||
1989 | if (no_io) { | |||||
1990 | // Could not read from block_cache and can't do IO | |||||
1991 | iter->Invalidate(Status::Incomplete("no blocking io")); | |||||
1992 | return iter; | |||||
1993 | } | |||||
1994 | std::unique_ptr<Block> block_value; | |||||
1995 | { | |||||
1996 | StopWatch sw(rep->ioptions.env, rep->ioptions.statistics, | |||||
1997 | READ_BLOCK_GET_MICROS); | |||||
1998 | s = ReadBlockFromFile( | |||||
1999 | rep->file.get(), prefetch_buffer, rep->footer, ro, handle, | |||||
2000 | &block_value, rep->ioptions, | |||||
2001 | rep->blocks_maybe_compressed /*do_decompress*/, | |||||
2002 | rep->blocks_maybe_compressed, uncompression_dict, | |||||
2003 | rep->persistent_cache_options, | |||||
2004 | is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, | |||||
2005 | rep->table_options.read_amp_bytes_per_bit, | |||||
2006 | GetMemoryAllocator(rep->table_options)); | |||||
2007 | } | |||||
2008 | if (s.ok()) { | |||||
2009 | block.value = block_value.release(); | |||||
2010 | } | |||||
2011 | } | |||||
2012 | // TODO(ajkr): also pin compression dictionary block when | |||||
2013 | // `pin_l0_filter_and_index_blocks_in_cache == true`. | |||||
2014 | uncompression_dict_storage.Release(block_cache); | |||||
2015 | } | |||||
2016 | ||||||
2017 | if (s.ok()) { | |||||
2018 | assert(block.value != nullptr)(static_cast<void> (0)); | |||||
2019 | const bool kTotalOrderSeek = true; | |||||
2020 | // Block contents are pinned and it is still pinned after the iterator | |||||
2021 | // is destroyed as long as cleanup functions are moved to another object, | |||||
2022 | // when: | |||||
2023 | // 1. block cache handle is set to be released in cleanup function, or | |||||
2024 | // 2. it's pointing to immortal source. If own_bytes is true then we are | |||||
2025 | // not reading data from the original source, whether immortal or not. | |||||
2026 | // Otherwise, the block is pinned iff the source is immortal. | |||||
2027 | bool block_contents_pinned = | |||||
2028 | (block.cache_handle != nullptr || | |||||
2029 | (!block.value->own_bytes() && rep->immortal_table)); | |||||
2030 | iter = block.value->NewIterator<TBlockIter>( | |||||
2031 | &rep->internal_comparator, rep->internal_comparator.user_comparator(), | |||||
2032 | iter, rep->ioptions.statistics, kTotalOrderSeek, key_includes_seq, | |||||
2033 | index_key_is_full, block_contents_pinned); | |||||
2034 | if (block.cache_handle != nullptr) { | |||||
2035 | iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, | |||||
2036 | block.cache_handle); | |||||
2037 | } else { | |||||
2038 | if (!ro.fill_cache && rep->cache_key_prefix_size != 0) { | |||||
2039 | // insert a dummy record to block cache to track the memory usage | |||||
2040 | Cache::Handle* cache_handle; | |||||
2041 | // There are two other types of cache keys: 1) SST cache key added in | |||||
2042 | // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in | |||||
2043 | // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate | |||||
2044 | // from SST cache key(31 bytes), and use non-zero prefix to | |||||
2045 | // differentiate from `write_buffer_manager` | |||||
2046 | const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; | |||||
2047 | char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; | |||||
2048 | // Prefix: use rep->cache_key_prefix padded by 0s | |||||
2049 | memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); | |||||
2050 | assert(rep->cache_key_prefix_size != 0)(static_cast<void> (0)); | |||||
2051 | assert(rep->cache_key_prefix_size <= kExtraCacheKeyPrefix)(static_cast<void> (0)); | |||||
2052 | memcpy(cache_key, rep->cache_key_prefix, rep->cache_key_prefix_size); | |||||
2053 | char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, | |||||
2054 | next_cache_key_id_++); | |||||
2055 | assert(end - cache_key <=(static_cast<void> (0)) | |||||
2056 | static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length))(static_cast<void> (0)); | |||||
2057 | Slice unique_key = | |||||
2058 | Slice(cache_key, static_cast<size_t>(end - cache_key)); | |||||
2059 | s = block_cache->Insert(unique_key, nullptr, | |||||
2060 | block.value->ApproximateMemoryUsage(), nullptr, | |||||
2061 | &cache_handle); | |||||
2062 | if (s.ok()) { | |||||
2063 | if (cache_handle != nullptr) { | |||||
2064 | iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, | |||||
2065 | cache_handle); | |||||
2066 | } | |||||
2067 | } | |||||
2068 | } | |||||
2069 | iter->RegisterCleanup(&DeleteHeldResource<Block>, block.value, nullptr); | |||||
2070 | } | |||||
2071 | } else { | |||||
2072 | assert(block.value == nullptr)(static_cast<void> (0)); | |||||
| ||||||
2073 | iter->Invalidate(s); | |||||
2074 | } | |||||
2075 | return iter; | |||||
2076 | } | |||||
2077 | ||||||
2078 | Status BlockBasedTable::MaybeReadBlockAndLoadToCache( | |||||
2079 | FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro, | |||||
2080 | const BlockHandle& handle, const UncompressionDict& uncompression_dict, | |||||
2081 | CachableEntry<Block>* block_entry, bool is_index, GetContext* get_context) { | |||||
2082 | assert(block_entry != nullptr)(static_cast<void> (0)); | |||||
2083 | const bool no_io = (ro.read_tier == kBlockCacheTier); | |||||
2084 | Cache* block_cache = rep->table_options.block_cache.get(); | |||||
2085 | ||||||
2086 | // No point to cache compressed blocks if it never goes away | |||||
2087 | Cache* block_cache_compressed = | |||||
2088 | rep->immortal_table ? nullptr | |||||
2089 | : rep->table_options.block_cache_compressed.get(); | |||||
2090 | ||||||
2091 | // First, try to get the block from the cache | |||||
2092 | // | |||||
2093 | // If either block cache is enabled, we'll try to read from it. | |||||
2094 | Status s; | |||||
2095 | char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; | |||||
2096 | char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; | |||||
2097 | Slice key /* key to the block cache */; | |||||
2098 | Slice ckey /* key to the compressed block cache */; | |||||
2099 | if (block_cache != nullptr || block_cache_compressed != nullptr) { | |||||
2100 | // create key for block cache | |||||
2101 | if (block_cache != nullptr) { | |||||
2102 | key = GetCacheKey(rep->cache_key_prefix, rep->cache_key_prefix_size, | |||||
2103 | handle, cache_key); | |||||
2104 | } | |||||
2105 | ||||||
2106 | if (block_cache_compressed != nullptr) { | |||||
2107 | ckey = GetCacheKey(rep->compressed_cache_key_prefix, | |||||
2108 | rep->compressed_cache_key_prefix_size, handle, | |||||
2109 | compressed_cache_key); | |||||
2110 | } | |||||
2111 | ||||||
2112 | s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, | |||||
2113 | rep, ro, block_entry, uncompression_dict, | |||||
2114 | rep->table_options.read_amp_bytes_per_bit, | |||||
2115 | is_index, get_context); | |||||
2116 | ||||||
2117 | // Can't find the block from the cache. If I/O is allowed, read from the | |||||
2118 | // file. | |||||
2119 | if (block_entry->value == nullptr && !no_io && ro.fill_cache) { | |||||
2120 | Statistics* statistics = rep->ioptions.statistics; | |||||
2121 | bool do_decompress = | |||||
2122 | block_cache_compressed == nullptr && rep->blocks_maybe_compressed; | |||||
2123 | CompressionType raw_block_comp_type; | |||||
2124 | BlockContents raw_block_contents; | |||||
2125 | { | |||||
2126 | StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS); | |||||
2127 | BlockFetcher block_fetcher( | |||||
2128 | rep->file.get(), prefetch_buffer, rep->footer, ro, handle, | |||||
2129 | &raw_block_contents, rep->ioptions, | |||||
2130 | do_decompress /* do uncompress */, rep->blocks_maybe_compressed, | |||||
2131 | uncompression_dict, rep->persistent_cache_options, | |||||
2132 | GetMemoryAllocator(rep->table_options), | |||||
2133 | GetMemoryAllocatorForCompressedBlock(rep->table_options)); | |||||
2134 | s = block_fetcher.ReadBlockContents(); | |||||
2135 | raw_block_comp_type = block_fetcher.get_compression_type(); | |||||
2136 | } | |||||
2137 | ||||||
2138 | if (s.ok()) { | |||||
2139 | SequenceNumber seq_no = rep->get_global_seqno(is_index); | |||||
2140 | // If filling cache is allowed and a cache is configured, try to put the | |||||
2141 | // block to the cache. | |||||
2142 | s = PutDataBlockToCache( | |||||
2143 | key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions, | |||||
2144 | block_entry, &raw_block_contents, raw_block_comp_type, | |||||
2145 | rep->table_options.format_version, uncompression_dict, seq_no, | |||||
2146 | rep->table_options.read_amp_bytes_per_bit, | |||||
2147 | GetMemoryAllocator(rep->table_options), is_index, | |||||
2148 | is_index && rep->table_options | |||||
2149 | .cache_index_and_filter_blocks_with_high_priority | |||||
2150 | ? Cache::Priority::HIGH | |||||
2151 | : Cache::Priority::LOW, | |||||
2152 | get_context); | |||||
2153 | } | |||||
2154 | } | |||||
2155 | } | |||||
2156 | assert(s.ok() || block_entry->value == nullptr)(static_cast<void> (0)); | |||||
2157 | return s; | |||||
2158 | } | |||||
2159 | ||||||
2160 | BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState( | |||||
2161 | BlockBasedTable* table, | |||||
2162 | std::unordered_map<uint64_t, CachableEntry<Block>>* block_map, | |||||
2163 | bool index_key_includes_seq, bool index_key_is_full) | |||||
2164 | : table_(table), | |||||
2165 | block_map_(block_map), | |||||
2166 | index_key_includes_seq_(index_key_includes_seq), | |||||
2167 | index_key_is_full_(index_key_is_full) {} | |||||
2168 | ||||||
2169 | template <class TBlockIter, typename TValue> | |||||
2170 | const size_t BlockBasedTableIterator<TBlockIter, TValue>::kMaxReadaheadSize = | |||||
2171 | 256 * 1024; | |||||
2172 | ||||||
2173 | InternalIteratorBase<BlockHandle>* | |||||
2174 | BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator( | |||||
2175 | const BlockHandle& handle) { | |||||
2176 | // Return a block iterator on the index partition | |||||
2177 | auto rep = table_->get_rep(); | |||||
2178 | auto block = block_map_->find(handle.offset()); | |||||
2179 | // This is a possible scenario since block cache might not have had space | |||||
2180 | // for the partition | |||||
2181 | if (block != block_map_->end()) { | |||||
2182 | PERF_COUNTER_ADD(block_cache_hit_count, 1)if (perf_level >= PerfLevel::kEnableCount) { perf_context. block_cache_hit_count += 1; }; | |||||
2183 | RecordTick(rep->ioptions.statistics, BLOCK_CACHE_INDEX_HIT); | |||||
2184 | RecordTick(rep->ioptions.statistics, BLOCK_CACHE_HIT); | |||||
2185 | Cache* block_cache = rep->table_options.block_cache.get(); | |||||
2186 | assert(block_cache)(static_cast<void> (0)); | |||||
2187 | RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ, | |||||
2188 | block_cache->GetUsage(block->second.cache_handle)); | |||||
2189 | Statistics* kNullStats = nullptr; | |||||
2190 | // We don't return pinned datat from index blocks, so no need | |||||
2191 | // to set `block_contents_pinned`. | |||||
2192 | return block->second.value->NewIterator<IndexBlockIter>( | |||||
2193 | &rep->internal_comparator, rep->internal_comparator.user_comparator(), | |||||
2194 | nullptr, kNullStats, true, index_key_includes_seq_, index_key_is_full_); | |||||
2195 | } | |||||
2196 | // Create an empty iterator | |||||
2197 | return new IndexBlockIter(); | |||||
2198 | } | |||||
2199 | ||||||
2200 | // This will be broken if the user specifies an unusual implementation | |||||
2201 | // of Options.comparator, or if the user specifies an unusual | |||||
2202 | // definition of prefixes in BlockBasedTableOptions.filter_policy. | |||||
2203 | // In particular, we require the following three properties: | |||||
2204 | // | |||||
2205 | // 1) key.starts_with(prefix(key)) | |||||
2206 | // 2) Compare(prefix(key), key) <= 0. | |||||
2207 | // 3) If Compare(key1, key2) <= 0, then Compare(prefix(key1), prefix(key2)) <= 0 | |||||
2208 | // | |||||
2209 | // Otherwise, this method guarantees no I/O will be incurred. | |||||
2210 | // | |||||
2211 | // REQUIRES: this method shouldn't be called while the DB lock is held. | |||||
2212 | bool BlockBasedTable::PrefixMayMatch( | |||||
2213 | const Slice& internal_key, const ReadOptions& read_options, | |||||
2214 | const SliceTransform* options_prefix_extractor, | |||||
2215 | const bool need_upper_bound_check) { | |||||
2216 | if (!rep_->filter_policy) { | |||||
2217 | return true; | |||||
2218 | } | |||||
2219 | ||||||
2220 | const SliceTransform* prefix_extractor; | |||||
2221 | ||||||
2222 | if (rep_->table_prefix_extractor == nullptr) { | |||||
2223 | if (need_upper_bound_check) { | |||||
2224 | return true; | |||||
2225 | } | |||||
2226 | prefix_extractor = options_prefix_extractor; | |||||
2227 | } else { | |||||
2228 | prefix_extractor = rep_->table_prefix_extractor.get(); | |||||
2229 | } | |||||
2230 | auto user_key = ExtractUserKey(internal_key); | |||||
2231 | if (!prefix_extractor->InDomain(user_key)) { | |||||
2232 | return true; | |||||
2233 | } | |||||
2234 | ||||||
2235 | bool may_match = true; | |||||
2236 | Status s; | |||||
2237 | ||||||
2238 | // First, try check with full filter | |||||
2239 | auto filter_entry = GetFilter(prefix_extractor); | |||||
2240 | FilterBlockReader* filter = filter_entry.value; | |||||
2241 | bool filter_checked = true; | |||||
2242 | if (filter != nullptr) { | |||||
2243 | if (!filter->IsBlockBased()) { | |||||
2244 | const Slice* const const_ikey_ptr = &internal_key; | |||||
2245 | may_match = filter->RangeMayExist( | |||||
2246 | read_options.iterate_upper_bound, user_key, prefix_extractor, | |||||
2247 | rep_->internal_comparator.user_comparator(), const_ikey_ptr, | |||||
2248 | &filter_checked, need_upper_bound_check); | |||||
2249 | } else { | |||||
2250 | // if prefix_extractor changed for block based filter, skip filter | |||||
2251 | if (need_upper_bound_check) { | |||||
2252 | if (!rep_->filter_entry.IsSet()) { | |||||
2253 | filter_entry.Release(rep_->table_options.block_cache.get()); | |||||
2254 | } | |||||
2255 | return true; | |||||
2256 | } | |||||
2257 | auto prefix = prefix_extractor->Transform(user_key); | |||||
2258 | InternalKey internal_key_prefix(prefix, kMaxSequenceNumber, kTypeValue); | |||||
2259 | auto internal_prefix = internal_key_prefix.Encode(); | |||||
2260 | ||||||
2261 | // To prevent any io operation in this method, we set `read_tier` to make | |||||
2262 | // sure we always read index or filter only when they have already been | |||||
2263 | // loaded to memory. | |||||
2264 | ReadOptions no_io_read_options; | |||||
2265 | no_io_read_options.read_tier = kBlockCacheTier; | |||||
2266 | ||||||
2267 | // Then, try find it within each block | |||||
2268 | // we already know prefix_extractor and prefix_extractor_name must match | |||||
2269 | // because `CheckPrefixMayMatch` first checks `check_filter_ == true` | |||||
2270 | std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter( | |||||
2271 | NewIndexIterator(no_io_read_options, | |||||
2272 | /* need_upper_bound_check */ false)); | |||||
2273 | iiter->Seek(internal_prefix); | |||||
2274 | ||||||
2275 | if (!iiter->Valid()) { | |||||
2276 | // we're past end of file | |||||
2277 | // if it's incomplete, it means that we avoided I/O | |||||
2278 | // and we're not really sure that we're past the end | |||||
2279 | // of the file | |||||
2280 | may_match = iiter->status().IsIncomplete(); | |||||
2281 | } else if ((rep_->table_properties && | |||||
2282 | rep_->table_properties->index_key_is_user_key | |||||
2283 | ? iiter->key() | |||||
2284 | : ExtractUserKey(iiter->key())) | |||||
2285 | .starts_with(ExtractUserKey(internal_prefix))) { | |||||
2286 | // we need to check for this subtle case because our only | |||||
2287 | // guarantee is that "the key is a string >= last key in that data | |||||
2288 | // block" according to the doc/table_format.txt spec. | |||||
2289 | // | |||||
2290 | // Suppose iiter->key() starts with the desired prefix; it is not | |||||
2291 | // necessarily the case that the corresponding data block will | |||||
2292 | // contain the prefix, since iiter->key() need not be in the | |||||
2293 | // block. However, the next data block may contain the prefix, so | |||||
2294 | // we return true to play it safe. | |||||
2295 | may_match = true; | |||||
2296 | } else if (filter->IsBlockBased()) { | |||||
2297 | // iiter->key() does NOT start with the desired prefix. Because | |||||
2298 | // Seek() finds the first key that is >= the seek target, this | |||||
2299 | // means that iiter->key() > prefix. Thus, any data blocks coming | |||||
2300 | // after the data block corresponding to iiter->key() cannot | |||||
2301 | // possibly contain the key. Thus, the corresponding data block | |||||
2302 | // is the only on could potentially contain the prefix. | |||||
2303 | BlockHandle handle = iiter->value(); | |||||
2304 | may_match = | |||||
2305 | filter->PrefixMayMatch(prefix, prefix_extractor, handle.offset()); | |||||
2306 | } | |||||
2307 | } | |||||
2308 | } | |||||
2309 | ||||||
2310 | if (filter_checked) { | |||||
2311 | Statistics* statistics = rep_->ioptions.statistics; | |||||
2312 | RecordTick(statistics, BLOOM_FILTER_PREFIX_CHECKED); | |||||
2313 | if (!may_match) { | |||||
2314 | RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL); | |||||
2315 | } | |||||
2316 | } | |||||
2317 | ||||||
2318 | // if rep_->filter_entry is not set, we should call Release(); otherwise | |||||
2319 | // don't call, in this case we have a local copy in rep_->filter_entry, | |||||
2320 | // it's pinned to the cache and will be released in the destructor | |||||
2321 | if (!rep_->filter_entry.IsSet()) { | |||||
2322 | filter_entry.Release(rep_->table_options.block_cache.get()); | |||||
2323 | } | |||||
2324 | return may_match; | |||||
2325 | } | |||||
2326 | ||||||
2327 | template <class TBlockIter, typename TValue> | |||||
2328 | void BlockBasedTableIterator<TBlockIter, TValue>::Seek(const Slice& target) { | |||||
2329 | is_out_of_bound_ = false; | |||||
2330 | if (!CheckPrefixMayMatch(target)) { | |||||
2331 | ResetDataIter(); | |||||
2332 | return; | |||||
2333 | } | |||||
2334 | ||||||
2335 | SavePrevIndexValue(); | |||||
2336 | ||||||
2337 | index_iter_->Seek(target); | |||||
2338 | ||||||
2339 | if (!index_iter_->Valid()) { | |||||
2340 | ResetDataIter(); | |||||
2341 | return; | |||||
2342 | } | |||||
2343 | ||||||
2344 | InitDataBlock(); | |||||
2345 | ||||||
2346 | block_iter_.Seek(target); | |||||
2347 | ||||||
2348 | FindKeyForward(); | |||||
2349 | assert((static_cast<void> (0)) | |||||
2350 | !block_iter_.Valid() ||(static_cast<void> (0)) | |||||
2351 | (key_includes_seq_ && icomp_.Compare(target, block_iter_.key()) <= 0) ||(static_cast<void> (0)) | |||||
2352 | (!key_includes_seq_ && user_comparator_.Compare(ExtractUserKey(target),(static_cast<void> (0)) | |||||
2353 | block_iter_.key()) <= 0))(static_cast<void> (0)); | |||||
2354 | } | |||||
2355 | ||||||
2356 | template <class TBlockIter, typename TValue> | |||||
2357 | void BlockBasedTableIterator<TBlockIter, TValue>::SeekForPrev( | |||||
2358 | const Slice& target) { | |||||
2359 | is_out_of_bound_ = false; | |||||
2360 | if (!CheckPrefixMayMatch(target)) { | |||||
2361 | ResetDataIter(); | |||||
2362 | return; | |||||
2363 | } | |||||
2364 | ||||||
2365 | SavePrevIndexValue(); | |||||
2366 | ||||||
2367 | // Call Seek() rather than SeekForPrev() in the index block, because the | |||||
2368 | // target data block will likely to contain the position for `target`, the | |||||
2369 | // same as Seek(), rather than than before. | |||||
2370 | // For example, if we have three data blocks, each containing two keys: | |||||
2371 | // [2, 4] [6, 8] [10, 12] | |||||
2372 | // (the keys in the index block would be [4, 8, 12]) | |||||
2373 | // and the user calls SeekForPrev(7), we need to go to the second block, | |||||
2374 | // just like if they call Seek(7). | |||||
2375 | // The only case where the block is difference is when they seek to a position | |||||
2376 | // in the boundary. For example, if they SeekForPrev(5), we should go to the | |||||
2377 | // first block, rather than the second. However, we don't have the information | |||||
2378 | // to distinguish the two unless we read the second block. In this case, we'll | |||||
2379 | // end up with reading two blocks. | |||||
2380 | index_iter_->Seek(target); | |||||
2381 | ||||||
2382 | if (!index_iter_->Valid()) { | |||||
2383 | index_iter_->SeekToLast(); | |||||
2384 | if (!index_iter_->Valid()) { | |||||
2385 | ResetDataIter(); | |||||
2386 | block_iter_points_to_real_block_ = false; | |||||
2387 | return; | |||||
2388 | } | |||||
2389 | } | |||||
2390 | ||||||
2391 | InitDataBlock(); | |||||
2392 | ||||||
2393 | block_iter_.SeekForPrev(target); | |||||
2394 | ||||||
2395 | FindKeyBackward(); | |||||
2396 | assert(!block_iter_.Valid() ||(static_cast<void> (0)) | |||||
2397 | icomp_.Compare(target, block_iter_.key()) >= 0)(static_cast<void> (0)); | |||||
2398 | } | |||||
2399 | ||||||
2400 | template <class TBlockIter, typename TValue> | |||||
2401 | void BlockBasedTableIterator<TBlockIter, TValue>::SeekToFirst() { | |||||
2402 | is_out_of_bound_ = false; | |||||
2403 | SavePrevIndexValue(); | |||||
2404 | index_iter_->SeekToFirst(); | |||||
2405 | if (!index_iter_->Valid()) { | |||||
2406 | ResetDataIter(); | |||||
2407 | return; | |||||
2408 | } | |||||
2409 | InitDataBlock(); | |||||
2410 | block_iter_.SeekToFirst(); | |||||
2411 | FindKeyForward(); | |||||
2412 | } | |||||
2413 | ||||||
2414 | template <class TBlockIter, typename TValue> | |||||
2415 | void BlockBasedTableIterator<TBlockIter, TValue>::SeekToLast() { | |||||
2416 | is_out_of_bound_ = false; | |||||
2417 | SavePrevIndexValue(); | |||||
2418 | index_iter_->SeekToLast(); | |||||
2419 | if (!index_iter_->Valid()) { | |||||
| ||||||
2420 | ResetDataIter(); | |||||
2421 | return; | |||||
2422 | } | |||||
2423 | InitDataBlock(); | |||||
2424 | block_iter_.SeekToLast(); | |||||
2425 | FindKeyBackward(); | |||||
2426 | } | |||||
2427 | ||||||
2428 | template <class TBlockIter, typename TValue> | |||||
2429 | void BlockBasedTableIterator<TBlockIter, TValue>::Next() { | |||||
2430 | assert(block_iter_points_to_real_block_)(static_cast<void> (0)); | |||||
2431 | block_iter_.Next(); | |||||
2432 | FindKeyForward(); | |||||
2433 | } | |||||
2434 | ||||||
2435 | template <class TBlockIter, typename TValue> | |||||
2436 | void BlockBasedTableIterator<TBlockIter, TValue>::Prev() { | |||||
2437 | assert(block_iter_points_to_real_block_)(static_cast<void> (0)); | |||||
2438 | block_iter_.Prev(); | |||||
2439 | FindKeyBackward(); | |||||
2440 | } | |||||
2441 | ||||||
// Materialize `block_iter_` over the data block the index iterator currently
// points at. A no-op when the iterator is already positioned on that block,
// unless the previous read missed the cache (Incomplete status). Also drives
// the automatic readahead heuristic for sequential scans.
template <class TBlockIter, typename TValue>
void BlockBasedTableIterator<TBlockIter, TValue>::InitDataBlock() {
  BlockHandle data_block_handle = index_iter_->value();
  if (!block_iter_points_to_real_block_ ||
      data_block_handle.offset() != prev_index_value_.offset() ||
      // if previous attempt of reading the block missed cache, try again
      block_iter_.status().IsIncomplete()) {
    if (block_iter_points_to_real_block_) {
      ResetDataIter();
    }
    auto* rep = table_->get_rep();

    // Automatically prefetch additional data when a range scan (iterator) does
    // more than 2 sequential IOs. This is enabled only for user reads and when
    // ReadOptions.readahead_size is 0.
    if (!for_compaction_ && read_options_.readahead_size == 0) {
      num_file_reads_++;
      if (num_file_reads_ > 2) {
        // Prefetch only once the requested block extends past what we have
        // already read ahead (readahead_limit_).
        if (!rep->file->use_direct_io() &&
            (data_block_handle.offset() +
                 static_cast<size_t>(data_block_handle.size()) +
                 kBlockTrailerSize >
             readahead_limit_)) {
          // Buffered I/O
          // Discarding the return status of Prefetch calls intentionally, as we
          // can fallback to reading from disk if Prefetch fails.
          rep->file->Prefetch(data_block_handle.offset(), readahead_size_);
          readahead_limit_ =
              static_cast<size_t>(data_block_handle.offset() + readahead_size_);
          // Keep exponentially increasing readahead size until
          // kMaxReadaheadSize.
          readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ * 2);
        } else if (rep->file->use_direct_io() && !prefetch_buffer_) {
          // Direct I/O
          // Let FilePrefetchBuffer take care of the readahead.
          prefetch_buffer_.reset(new FilePrefetchBuffer(
              rep->file.get(), kInitReadaheadSize, kMaxReadaheadSize));
        }
      }
    }

    // Build the data-block iterator in place; any error is surfaced through
    // block_iter_.status().
    Status s;
    BlockBasedTable::NewDataBlockIterator<TBlockIter>(
        rep, read_options_, data_block_handle, &block_iter_, is_index_,
        key_includes_seq_, index_key_is_full_,
        /* get_context */ nullptr, s, prefetch_buffer_.get());
    block_iter_points_to_real_block_ = true;
  }
}
2491 | ||||||
2492 | template <class TBlockIter, typename TValue> | |||||
2493 | void BlockBasedTableIterator<TBlockIter, TValue>::FindKeyForward() { | |||||
2494 | assert(!is_out_of_bound_)(static_cast<void> (0)); | |||||
2495 | // TODO the while loop inherits from two-level-iterator. We don't know | |||||
2496 | // whether a block can be empty so it can be replaced by an "if". | |||||
2497 | while (!block_iter_.Valid()) { | |||||
2498 | if (!block_iter_.status().ok()) { | |||||
2499 | return; | |||||
2500 | } | |||||
2501 | ResetDataIter(); | |||||
2502 | // We used to check the current index key for upperbound. | |||||
2503 | // It will only save a data reading for a small percentage of use cases, | |||||
2504 | // so for code simplicity, we removed it. We can add it back if there is a | |||||
2505 | // significnat performance regression. | |||||
2506 | index_iter_->Next(); | |||||
2507 | ||||||
2508 | if (index_iter_->Valid()) { | |||||
2509 | InitDataBlock(); | |||||
2510 | block_iter_.SeekToFirst(); | |||||
2511 | } else { | |||||
2512 | return; | |||||
2513 | } | |||||
2514 | } | |||||
2515 | ||||||
2516 | // Check upper bound on the current key | |||||
2517 | bool reached_upper_bound = | |||||
2518 | (read_options_.iterate_upper_bound != nullptr && | |||||
2519 | block_iter_points_to_real_block_ && block_iter_.Valid() && | |||||
2520 | user_comparator_.Compare(ExtractUserKey(block_iter_.key()), | |||||
2521 | *read_options_.iterate_upper_bound) >= 0); | |||||
2522 | TEST_SYNC_POINT_CALLBACK( | |||||
2523 | "BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound", | |||||
2524 | &reached_upper_bound); | |||||
2525 | if (reached_upper_bound) { | |||||
2526 | is_out_of_bound_ = true; | |||||
2527 | return; | |||||
2528 | } | |||||
2529 | } | |||||
2530 | ||||||
2531 | template <class TBlockIter, typename TValue> | |||||
2532 | void BlockBasedTableIterator<TBlockIter, TValue>::FindKeyBackward() { | |||||
2533 | assert(!is_out_of_bound_)(static_cast<void> (0)); | |||||
2534 | while (!block_iter_.Valid()) { | |||||
2535 | if (!block_iter_.status().ok()) { | |||||
2536 | return; | |||||
2537 | } | |||||
2538 | ||||||
2539 | ResetDataIter(); | |||||
2540 | index_iter_->Prev(); | |||||
2541 | ||||||
2542 | if (index_iter_->Valid()) { | |||||
2543 | InitDataBlock(); | |||||
2544 | block_iter_.SeekToLast(); | |||||
2545 | } else { | |||||
2546 | return; | |||||
2547 | } | |||||
2548 | } | |||||
2549 | ||||||
2550 | // We could have check lower bound here too, but we opt not to do it for | |||||
2551 | // code simplicity. | |||||
2552 | } | |||||
2553 | ||||||
// Create a table iterator, either heap-allocated or placement-new'd into the
// caller-supplied arena. The filter is consulted only when the caller did not
// skip filters, total-order seek is off, and a prefix extractor is available.
InternalIterator* BlockBasedTable::NewIterator(
    const ReadOptions& read_options, const SliceTransform* prefix_extractor,
    Arena* arena, bool skip_filters, bool for_compaction) {
  bool need_upper_bound_check =
      PrefixExtractorChanged(rep_->table_properties.get(), prefix_extractor);
  const bool kIsNotIndex = false;
  if (arena == nullptr) {
    return new BlockBasedTableIterator<DataBlockIter>(
        this, read_options, rep_->internal_comparator,
        // Hash-index upper-bound checking is only needed for kHashSearch.
        NewIndexIterator(
            read_options,
            need_upper_bound_check &&
                rep_->index_type == BlockBasedTableOptions::kHashSearch),
        !skip_filters && !read_options.total_order_seek &&
            prefix_extractor != nullptr,
        need_upper_bound_check, prefix_extractor, kIsNotIndex,
        true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction);
  } else {
    auto* mem =
        arena->AllocateAligned(sizeof(BlockBasedTableIterator<DataBlockIter>));
    return new (mem) BlockBasedTableIterator<DataBlockIter>(
        this, read_options, rep_->internal_comparator,
        // NOTE(review): unlike the non-arena branch above, this call does not
        // restrict need_upper_bound_check to the kHashSearch index type --
        // confirm whether the asymmetry is intentional.
        NewIndexIterator(read_options, need_upper_bound_check),
        !skip_filters && !read_options.total_order_seek &&
            prefix_extractor != nullptr,
        need_upper_bound_check, prefix_extractor, kIsNotIndex,
        true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction);
  }
}
2583 | ||||||
2584 | FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator( | |||||
2585 | const ReadOptions& read_options) { | |||||
2586 | if (rep_->fragmented_range_dels == nullptr) { | |||||
2587 | return nullptr; | |||||
2588 | } | |||||
2589 | SequenceNumber snapshot = kMaxSequenceNumber; | |||||
2590 | if (read_options.snapshot != nullptr) { | |||||
2591 | snapshot = read_options.snapshot->GetSequenceNumber(); | |||||
2592 | } | |||||
2593 | return new FragmentedRangeTombstoneIterator( | |||||
2594 | rep_->fragmented_range_dels, rep_->internal_comparator, snapshot); | |||||
2595 | } | |||||
2596 | ||||||
2597 | bool BlockBasedTable::FullFilterKeyMayMatch( | |||||
2598 | const ReadOptions& read_options, FilterBlockReader* filter, | |||||
2599 | const Slice& internal_key, const bool no_io, | |||||
2600 | const SliceTransform* prefix_extractor) const { | |||||
2601 | if (filter == nullptr || filter->IsBlockBased()) { | |||||
2602 | return true; | |||||
2603 | } | |||||
2604 | Slice user_key = ExtractUserKey(internal_key); | |||||
2605 | const Slice* const const_ikey_ptr = &internal_key; | |||||
2606 | bool may_match = true; | |||||
2607 | if (filter->whole_key_filtering()) { | |||||
2608 | may_match = filter->KeyMayMatch(user_key, prefix_extractor, kNotValid, | |||||
2609 | no_io, const_ikey_ptr); | |||||
2610 | } else if (!read_options.total_order_seek && prefix_extractor && | |||||
2611 | rep_->table_properties->prefix_extractor_name.compare( | |||||
2612 | prefix_extractor->Name()) == 0 && | |||||
2613 | prefix_extractor->InDomain(user_key) && | |||||
2614 | !filter->PrefixMayMatch(prefix_extractor->Transform(user_key), | |||||
2615 | prefix_extractor, kNotValid, false, | |||||
2616 | const_ikey_ptr)) { | |||||
2617 | may_match = false; | |||||
2618 | } | |||||
2619 | if (may_match) { | |||||
2620 | RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_POSITIVE); | |||||
2621 | PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_positive, 1, rep_->level)if (perf_level >= PerfLevel::kEnableCount && perf_context .per_level_perf_context_enabled && perf_context.level_to_perf_context ) { if ((*(perf_context.level_to_perf_context)).find(rep_-> level) != (*(perf_context.level_to_perf_context)).end()) { (* (perf_context.level_to_perf_context))[rep_->level].bloom_filter_full_positive += 1; } else { PerfContextByLevel empty_context; (*(perf_context .level_to_perf_context))[rep_->level] = empty_context; (*( perf_context.level_to_perf_context))[rep_->level].bloom_filter_full_positive += 1; } }; | |||||
2622 | } | |||||
2623 | return may_match; | |||||
2624 | } | |||||
2625 | ||||||
2626 | Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, | |||||
2627 | GetContext* get_context, | |||||
2628 | const SliceTransform* prefix_extractor, | |||||
2629 | bool skip_filters) { | |||||
2630 | assert(key.size() >= 8)(static_cast<void> (0)); // key must be internal key | |||||
2631 | Status s; | |||||
2632 | const bool no_io = read_options.read_tier == kBlockCacheTier; | |||||
2633 | CachableEntry<FilterBlockReader> filter_entry; | |||||
2634 | if (!skip_filters) { | |||||
2635 | filter_entry = | |||||
2636 | GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr, | |||||
2637 | read_options.read_tier == kBlockCacheTier, get_context); | |||||
2638 | } | |||||
2639 | FilterBlockReader* filter = filter_entry.value; | |||||
2640 | ||||||
2641 | // First check the full filter | |||||
2642 | // If full filter not useful, Then go into each block | |||||
2643 | if (!FullFilterKeyMayMatch(read_options, filter, key, no_io, | |||||
2644 | prefix_extractor)) { | |||||
2645 | RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL); | |||||
2646 | PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level)if (perf_level >= PerfLevel::kEnableCount && perf_context .per_level_perf_context_enabled && perf_context.level_to_perf_context ) { if ((*(perf_context.level_to_perf_context)).find(rep_-> level) != (*(perf_context.level_to_perf_context)).end()) { (* (perf_context.level_to_perf_context))[rep_->level].bloom_filter_useful += 1; } else { PerfContextByLevel empty_context; (*(perf_context .level_to_perf_context))[rep_->level] = empty_context; (*( perf_context.level_to_perf_context))[rep_->level].bloom_filter_useful += 1; } }; | |||||
2647 | } else { | |||||
2648 | IndexBlockIter iiter_on_stack; | |||||
2649 | // if prefix_extractor found in block differs from options, disable | |||||
2650 | // BlockPrefixIndex. Only do this check when index_type is kHashSearch. | |||||
2651 | bool need_upper_bound_check = false; | |||||
2652 | if (rep_->index_type == BlockBasedTableOptions::kHashSearch) { | |||||
2653 | need_upper_bound_check = PrefixExtractorChanged( | |||||
2654 | rep_->table_properties.get(), prefix_extractor); | |||||
2655 | } | |||||
2656 | auto iiter = | |||||
2657 | NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack, | |||||
2658 | /* index_entry */ nullptr, get_context); | |||||
2659 | std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter_unique_ptr; | |||||
2660 | if (iiter != &iiter_on_stack) { | |||||
2661 | iiter_unique_ptr.reset(iiter); | |||||
2662 | } | |||||
2663 | ||||||
2664 | bool matched = false; // if such user key mathced a key in SST | |||||
2665 | bool done = false; | |||||
2666 | for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { | |||||
2667 | BlockHandle handle = iiter->value(); | |||||
2668 | ||||||
2669 | bool not_exist_in_filter = | |||||
2670 | filter != nullptr && filter->IsBlockBased() == true && | |||||
2671 | !filter->KeyMayMatch(ExtractUserKey(key), prefix_extractor, | |||||
2672 | handle.offset(), no_io); | |||||
2673 | ||||||
2674 | if (not_exist_in_filter) { | |||||
2675 | // Not found | |||||
2676 | // TODO: think about interaction with Merge. If a user key cannot | |||||
2677 | // cross one data block, we should be fine. | |||||
2678 | RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL); | |||||
2679 | PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level)if (perf_level >= PerfLevel::kEnableCount && perf_context .per_level_perf_context_enabled && perf_context.level_to_perf_context ) { if ((*(perf_context.level_to_perf_context)).find(rep_-> level) != (*(perf_context.level_to_perf_context)).end()) { (* (perf_context.level_to_perf_context))[rep_->level].bloom_filter_useful += 1; } else { PerfContextByLevel empty_context; (*(perf_context .level_to_perf_context))[rep_->level] = empty_context; (*( perf_context.level_to_perf_context))[rep_->level].bloom_filter_useful += 1; } }; | |||||
2680 | break; | |||||
2681 | } else { | |||||
2682 | DataBlockIter biter; | |||||
2683 | NewDataBlockIterator<DataBlockIter>( | |||||
2684 | rep_, read_options, iiter->value(), &biter, false, | |||||
2685 | true /* key_includes_seq */, true /* index_key_is_full */, | |||||
2686 | get_context); | |||||
2687 | ||||||
2688 | if (read_options.read_tier == kBlockCacheTier && | |||||
2689 | biter.status().IsIncomplete()) { | |||||
2690 | // couldn't get block from block_cache | |||||
2691 | // Update Saver.state to Found because we are only looking for | |||||
2692 | // whether we can guarantee the key is not there when "no_io" is set | |||||
2693 | get_context->MarkKeyMayExist(); | |||||
2694 | break; | |||||
2695 | } | |||||
2696 | if (!biter.status().ok()) { | |||||
2697 | s = biter.status(); | |||||
2698 | break; | |||||
2699 | } | |||||
2700 | ||||||
2701 | bool may_exist = biter.SeekForGet(key); | |||||
2702 | if (!may_exist) { | |||||
2703 | // HashSeek cannot find the key this block and the the iter is not | |||||
2704 | // the end of the block, i.e. cannot be in the following blocks | |||||
2705 | // either. In this case, the seek_key cannot be found, so we break | |||||
2706 | // from the top level for-loop. | |||||
2707 | break; | |||||
2708 | } | |||||
2709 | ||||||
2710 | // Call the *saver function on each entry/block until it returns false | |||||
2711 | for (; biter.Valid(); biter.Next()) { | |||||
2712 | ParsedInternalKey parsed_key; | |||||
2713 | if (!ParseInternalKey(biter.key(), &parsed_key)) { | |||||
2714 | s = Status::Corruption(Slice()); | |||||
2715 | } | |||||
2716 | ||||||
2717 | if (!get_context->SaveValue( | |||||
2718 | parsed_key, biter.value(), &matched, | |||||
2719 | biter.IsValuePinned() ? &biter : nullptr)) { | |||||
2720 | done = true; | |||||
2721 | break; | |||||
2722 | } | |||||
2723 | } | |||||
2724 | s = biter.status(); | |||||
2725 | } | |||||
2726 | if (done) { | |||||
2727 | // Avoid the extra Next which is expensive in two-level indexes | |||||
2728 | break; | |||||
2729 | } | |||||
2730 | } | |||||
2731 | if (matched && filter != nullptr && !filter->IsBlockBased()) { | |||||
2732 | RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_TRUE_POSITIVE); | |||||
2733 | PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,if (perf_level >= PerfLevel::kEnableCount && perf_context .per_level_perf_context_enabled && perf_context.level_to_perf_context ) { if ((*(perf_context.level_to_perf_context)).find(rep_-> level) != (*(perf_context.level_to_perf_context)).end()) { (* (perf_context.level_to_perf_context))[rep_->level].bloom_filter_full_true_positive += 1; } else { PerfContextByLevel empty_context; (*(perf_context .level_to_perf_context))[rep_->level] = empty_context; (*( perf_context.level_to_perf_context))[rep_->level].bloom_filter_full_true_positive += 1; } } | |||||
2734 | rep_->level)if (perf_level >= PerfLevel::kEnableCount && perf_context .per_level_perf_context_enabled && perf_context.level_to_perf_context ) { if ((*(perf_context.level_to_perf_context)).find(rep_-> level) != (*(perf_context.level_to_perf_context)).end()) { (* (perf_context.level_to_perf_context))[rep_->level].bloom_filter_full_true_positive += 1; } else { PerfContextByLevel empty_context; (*(perf_context .level_to_perf_context))[rep_->level] = empty_context; (*( perf_context.level_to_perf_context))[rep_->level].bloom_filter_full_true_positive += 1; } }; | |||||
2735 | } | |||||
2736 | if (s.ok()) { | |||||
2737 | s = iiter->status(); | |||||
2738 | } | |||||
2739 | } | |||||
2740 | ||||||
2741 | // if rep_->filter_entry is not set, we should call Release(); otherwise | |||||
2742 | // don't call, in this case we have a local copy in rep_->filter_entry, | |||||
2743 | // it's pinned to the cache and will be released in the destructor | |||||
2744 | if (!rep_->filter_entry.IsSet()) { | |||||
2745 | filter_entry.Release(rep_->table_options.block_cache.get()); | |||||
2746 | } | |||||
2747 | return s; | |||||
2748 | } | |||||
2749 | ||||||
2750 | Status BlockBasedTable::Prefetch(const Slice* const begin, | |||||
2751 | const Slice* const end) { | |||||
2752 | auto& comparator = rep_->internal_comparator; | |||||
2753 | auto user_comparator = comparator.user_comparator(); | |||||
2754 | // pre-condition | |||||
2755 | if (begin && end && comparator.Compare(*begin, *end) > 0) { | |||||
2756 | return Status::InvalidArgument(*begin, *end); | |||||
2757 | } | |||||
2758 | ||||||
2759 | IndexBlockIter iiter_on_stack; | |||||
2760 | auto iiter = NewIndexIterator(ReadOptions(), false, &iiter_on_stack); | |||||
2761 | std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter_unique_ptr; | |||||
2762 | if (iiter != &iiter_on_stack) { | |||||
2763 | iiter_unique_ptr = | |||||
2764 | std::unique_ptr<InternalIteratorBase<BlockHandle>>(iiter); | |||||
2765 | } | |||||
2766 | ||||||
2767 | if (!iiter->status().ok()) { | |||||
2768 | // error opening index iterator | |||||
2769 | return iiter->status(); | |||||
2770 | } | |||||
2771 | ||||||
2772 | // indicates if we are on the last page that need to be pre-fetched | |||||
2773 | bool prefetching_boundary_page = false; | |||||
2774 | ||||||
2775 | for (begin ? iiter->Seek(*begin) : iiter->SeekToFirst(); iiter->Valid(); | |||||
2776 | iiter->Next()) { | |||||
2777 | BlockHandle block_handle = iiter->value(); | |||||
2778 | const bool is_user_key = rep_->table_properties && | |||||
2779 | rep_->table_properties->index_key_is_user_key > 0; | |||||
2780 | if (end && | |||||
2781 | ((!is_user_key && comparator.Compare(iiter->key(), *end) >= 0) || | |||||
2782 | (is_user_key && | |||||
2783 | user_comparator->Compare(iiter->key(), ExtractUserKey(*end)) >= 0))) { | |||||
2784 | if (prefetching_boundary_page) { | |||||
2785 | break; | |||||
2786 | } | |||||
2787 | ||||||
2788 | // The index entry represents the last key in the data block. | |||||
2789 | // We should load this page into memory as well, but no more | |||||
2790 | prefetching_boundary_page = true; | |||||
2791 | } | |||||
2792 | ||||||
2793 | // Load the block specified by the block_handle into the block cache | |||||
2794 | DataBlockIter biter; | |||||
2795 | NewDataBlockIterator<DataBlockIter>(rep_, ReadOptions(), block_handle, | |||||
2796 | &biter); | |||||
2797 | ||||||
2798 | if (!biter.status().ok()) { | |||||
2799 | // there was an unexpected error while pre-fetching | |||||
2800 | return biter.status(); | |||||
2801 | } | |||||
2802 | } | |||||
2803 | ||||||
2804 | return Status::OK(); | |||||
2805 | } | |||||
2806 | ||||||
2807 | Status BlockBasedTable::VerifyChecksum() { | |||||
2808 | Status s; | |||||
2809 | // Check Meta blocks | |||||
2810 | std::unique_ptr<Block> meta; | |||||
2811 | std::unique_ptr<InternalIterator> meta_iter; | |||||
2812 | s = ReadMetaBlock(rep_, nullptr /* prefetch buffer */, &meta, &meta_iter); | |||||
2813 | if (s.ok()) { | |||||
2814 | s = VerifyChecksumInMetaBlocks(meta_iter.get()); | |||||
2815 | if (!s.ok()) { | |||||
2816 | return s; | |||||
2817 | } | |||||
2818 | } else { | |||||
2819 | return s; | |||||
2820 | } | |||||
2821 | // Check Data blocks | |||||
2822 | IndexBlockIter iiter_on_stack; | |||||
2823 | InternalIteratorBase<BlockHandle>* iiter = | |||||
2824 | NewIndexIterator(ReadOptions(), false, &iiter_on_stack); | |||||
2825 | std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter_unique_ptr; | |||||
2826 | if (iiter != &iiter_on_stack) { | |||||
2827 | iiter_unique_ptr = | |||||
2828 | std::unique_ptr<InternalIteratorBase<BlockHandle>>(iiter); | |||||
2829 | } | |||||
2830 | if (!iiter->status().ok()) { | |||||
2831 | // error opening index iterator | |||||
2832 | return iiter->status(); | |||||
2833 | } | |||||
2834 | s = VerifyChecksumInBlocks(iiter); | |||||
2835 | return s; | |||||
2836 | } | |||||
2837 | ||||||
// Re-read every data block referenced by `index_iter`; BlockFetcher validates
// the block trailer checksum during ReadBlockContents. Stops at the first
// error (index or fetch) and returns it.
Status BlockBasedTable::VerifyChecksumInBlocks(
    InternalIteratorBase<BlockHandle>* index_iter) {
  Status s;
  for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
    s = index_iter->status();
    if (!s.ok()) {
      break;
    }
    BlockHandle handle = index_iter->value();
    BlockContents contents;
    // Decompression is skipped: the checksum covers the on-disk bytes.
    BlockFetcher block_fetcher(
        rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
        ReadOptions(), handle, &contents, rep_->ioptions,
        false /* decompress */, false /*maybe_compressed*/,
        UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
    s = block_fetcher.ReadBlockContents();
    if (!s.ok()) {
      break;
    }
  }
  return s;
}
2860 | ||||||
2861 | Status BlockBasedTable::VerifyChecksumInMetaBlocks( | |||||
2862 | InternalIteratorBase<Slice>* index_iter) { | |||||
2863 | Status s; | |||||
2864 | for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) { | |||||
2865 | s = index_iter->status(); | |||||
2866 | if (!s.ok()) { | |||||
2867 | break; | |||||
2868 | } | |||||
2869 | BlockHandle handle; | |||||
2870 | Slice input = index_iter->value(); | |||||
2871 | s = handle.DecodeFrom(&input); | |||||
2872 | BlockContents contents; | |||||
2873 | BlockFetcher block_fetcher( | |||||
2874 | rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer, | |||||
2875 | ReadOptions(), handle, &contents, rep_->ioptions, | |||||
2876 | false /* decompress */, false /*maybe_compressed*/, | |||||
2877 | UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options); | |||||
2878 | s = block_fetcher.ReadBlockContents(); | |||||
2879 | if (s.IsCorruption() && index_iter->key() == kPropertiesBlock) { | |||||
2880 | TableProperties* table_properties; | |||||
2881 | s = TryReadPropertiesWithGlobalSeqno(rep_, nullptr /* prefetch_buffer */, | |||||
2882 | index_iter->value(), | |||||
2883 | &table_properties); | |||||
2884 | delete table_properties; | |||||
2885 | } | |||||
2886 | if (!s.ok()) { | |||||
2887 | break; | |||||
2888 | } | |||||
2889 | } | |||||
2890 | return s; | |||||
2891 | } | |||||
2892 | ||||||
2893 | bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, | |||||
2894 | const Slice& key) { | |||||
2895 | std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter( | |||||
2896 | NewIndexIterator(options)); | |||||
2897 | iiter->Seek(key); | |||||
2898 | assert(iiter->Valid())(static_cast<void> (0)); | |||||
2899 | CachableEntry<Block> block; | |||||
2900 | ||||||
2901 | BlockHandle handle = iiter->value(); | |||||
2902 | Cache* block_cache = rep_->table_options.block_cache.get(); | |||||
2903 | assert(block_cache != nullptr)(static_cast<void> (0)); | |||||
2904 | ||||||
2905 | char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; | |||||
2906 | Slice cache_key = | |||||
2907 | GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle, | |||||
2908 | cache_key_storage); | |||||
2909 | Slice ckey; | |||||
2910 | ||||||
2911 | Status s; | |||||
2912 | if (!rep_->compression_dict_handle.IsNull()) { | |||||
2913 | std::unique_ptr<const BlockContents> compression_dict_block; | |||||
2914 | s = ReadCompressionDictBlock(rep_, nullptr /* prefetch_buffer */, | |||||
2915 | &compression_dict_block); | |||||
2916 | if (s.ok()) { | |||||
2917 | assert(compression_dict_block != nullptr)(static_cast<void> (0)); | |||||
2918 | UncompressionDict uncompression_dict( | |||||
2919 | compression_dict_block->data.ToString(), | |||||
2920 | rep_->blocks_definitely_zstd_compressed); | |||||
2921 | s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, rep_, | |||||
2922 | options, &block, uncompression_dict, | |||||
2923 | 0 /* read_amp_bytes_per_bit */); | |||||
2924 | } | |||||
2925 | } else { | |||||
2926 | s = GetDataBlockFromCache( | |||||
2927 | cache_key, ckey, block_cache, nullptr, rep_, options, &block, | |||||
2928 | UncompressionDict::GetEmptyDict(), 0 /* read_amp_bytes_per_bit */); | |||||
2929 | } | |||||
2930 | assert(s.ok())(static_cast<void> (0)); | |||||
2931 | bool in_cache = block.value != nullptr; | |||||
2932 | if (in_cache) { | |||||
2933 | ReleaseCachedEntry(block_cache, block.cache_handle); | |||||
2934 | } | |||||
2935 | return in_cache; | |||||
2936 | } | |||||
2937 | ||||||
2938 | BlockBasedTableOptions::IndexType BlockBasedTable::UpdateIndexType() { | |||||
2939 | // Some old version of block-based tables don't have index type present in | |||||
2940 | // table properties. If that's the case we can safely use the kBinarySearch. | |||||
2941 | BlockBasedTableOptions::IndexType index_type_on_file = | |||||
2942 | BlockBasedTableOptions::kBinarySearch; | |||||
2943 | if (rep_->table_properties) { | |||||
2944 | auto& props = rep_->table_properties->user_collected_properties; | |||||
2945 | auto pos = props.find(BlockBasedTablePropertyNames::kIndexType); | |||||
2946 | if (pos != props.end()) { | |||||
2947 | index_type_on_file = static_cast<BlockBasedTableOptions::IndexType>( | |||||
2948 | DecodeFixed32(pos->second.c_str())); | |||||
2949 | // update index_type with the true type | |||||
2950 | rep_->index_type = index_type_on_file; | |||||
2951 | } | |||||
2952 | } | |||||
2953 | return index_type_on_file; | |||||
2954 | } | |||||
2955 | ||||||
2956 | // REQUIRES: The following fields of rep_ should have already been populated: | |||||
2957 | // 1. file | |||||
2958 | // 2. index_handle, | |||||
2959 | // 3. options | |||||
2960 | // 4. internal_comparator | |||||
2961 | // 5. index_type | |||||
2962 | Status BlockBasedTable::CreateIndexReader( | |||||
2963 | FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader, | |||||
2964 | InternalIterator* preloaded_meta_index_iter, int level) { | |||||
2965 | auto index_type_on_file = UpdateIndexType(); | |||||
2966 | ||||||
2967 | auto file = rep_->file.get(); | |||||
2968 | const InternalKeyComparator* icomparator = &rep_->internal_comparator; | |||||
2969 | const Footer& footer = rep_->footer; | |||||
2970 | ||||||
2971 | // kHashSearch requires non-empty prefix_extractor but bypass checking | |||||
2972 | // prefix_extractor here since we have no access to MutableCFOptions. | |||||
2973 | // Add need_upper_bound_check flag in BlockBasedTable::NewIndexIterator. | |||||
2974 | // If prefix_extractor does not match prefix_extractor_name from table | |||||
2975 | // properties, turn off Hash Index by setting total_order_seek to true | |||||
2976 | ||||||
2977 | switch (index_type_on_file) { | |||||
2978 | case BlockBasedTableOptions::kTwoLevelIndexSearch: { | |||||
2979 | return PartitionIndexReader::Create( | |||||
2980 | this, file, prefetch_buffer, footer, footer.index_handle(), | |||||
2981 | rep_->ioptions, icomparator, index_reader, | |||||
2982 | rep_->persistent_cache_options, level, | |||||
2983 | rep_->table_properties == nullptr || | |||||
2984 | rep_->table_properties->index_key_is_user_key == 0, | |||||
2985 | rep_->table_properties == nullptr || | |||||
2986 | rep_->table_properties->index_value_is_delta_encoded == 0, | |||||
2987 | GetMemoryAllocator(rep_->table_options)); | |||||
2988 | } | |||||
2989 | case BlockBasedTableOptions::kBinarySearch: { | |||||
2990 | return BinarySearchIndexReader::Create( | |||||
2991 | file, prefetch_buffer, footer, footer.index_handle(), rep_->ioptions, | |||||
2992 | icomparator, index_reader, rep_->persistent_cache_options, | |||||
2993 | rep_->table_properties == nullptr || | |||||
2994 | rep_->table_properties->index_key_is_user_key == 0, | |||||
2995 | rep_->table_properties == nullptr || | |||||
2996 | rep_->table_properties->index_value_is_delta_encoded == 0, | |||||
2997 | GetMemoryAllocator(rep_->table_options)); | |||||
2998 | } | |||||
2999 | case BlockBasedTableOptions::kHashSearch: { | |||||
3000 | std::unique_ptr<Block> meta_guard; | |||||
3001 | std::unique_ptr<InternalIterator> meta_iter_guard; | |||||
3002 | auto meta_index_iter = preloaded_meta_index_iter; | |||||
3003 | if (meta_index_iter == nullptr) { | |||||
3004 | auto s = | |||||
3005 | ReadMetaBlock(rep_, prefetch_buffer, &meta_guard, &meta_iter_guard); | |||||
3006 | if (!s.ok()) { | |||||
3007 | // we simply fall back to binary search in case there is any | |||||
3008 | // problem with prefix hash index loading. | |||||
3009 | ROCKS_LOG_WARN(rep_->ioptions.info_log,rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep_->ioptions.info_log , ("[%s:" "3011" "] " "Unable to read the metaindex block." " Fall back to binary search index." ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" )) | |||||
3010 | "Unable to read the metaindex block."rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep_->ioptions.info_log , ("[%s:" "3011" "] " "Unable to read the metaindex block." " Fall back to binary search index." ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" )) | |||||
3011 | " Fall back to binary search index.")rocksdb::Log(InfoLogLevel::WARN_LEVEL, rep_->ioptions.info_log , ("[%s:" "3011" "] " "Unable to read the metaindex block." " Fall back to binary search index." ), RocksLogShorterFileName("/home/bhubbard/working/src/ceph/src/rocksdb/table/block_based_table_reader.cc" )); | |||||
3012 | return BinarySearchIndexReader::Create( | |||||
3013 | file, prefetch_buffer, footer, footer.index_handle(), | |||||
3014 | rep_->ioptions, icomparator, index_reader, | |||||
3015 | rep_->persistent_cache_options, | |||||
3016 | rep_->table_properties == nullptr || | |||||
3017 | rep_->table_properties->index_key_is_user_key == 0, | |||||
3018 | rep_->table_properties == nullptr || | |||||
3019 | rep_->table_properties->index_value_is_delta_encoded == 0, | |||||
3020 | GetMemoryAllocator(rep_->table_options)); | |||||
3021 | } | |||||
3022 | meta_index_iter = meta_iter_guard.get(); | |||||
3023 | } | |||||
3024 | ||||||
3025 | return HashIndexReader::Create( | |||||
3026 | rep_->internal_prefix_transform.get(), footer, file, prefetch_buffer, | |||||
3027 | rep_->ioptions, icomparator, footer.index_handle(), meta_index_iter, | |||||
3028 | index_reader, rep_->hash_index_allow_collision, | |||||
3029 | rep_->persistent_cache_options, | |||||
3030 | rep_->table_properties == nullptr || | |||||
3031 | rep_->table_properties->index_key_is_user_key == 0, | |||||
3032 | rep_->table_properties == nullptr || | |||||
3033 | rep_->table_properties->index_value_is_delta_encoded == 0, | |||||
3034 | GetMemoryAllocator(rep_->table_options)); | |||||
3035 | } | |||||
3036 | default: { | |||||
3037 | std::string error_message = | |||||
3038 | "Unrecognized index type: " + ToString(index_type_on_file); | |||||
3039 | return Status::InvalidArgument(error_message.c_str()); | |||||
3040 | } | |||||
3041 | } | |||||
3042 | } | |||||
3043 | ||||||
3044 | uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) { | |||||
3045 | std::unique_ptr<InternalIteratorBase<BlockHandle>> index_iter( | |||||
3046 | NewIndexIterator(ReadOptions())); | |||||
3047 | ||||||
3048 | index_iter->Seek(key); | |||||
3049 | uint64_t result; | |||||
3050 | if (index_iter->Valid()) { | |||||
3051 | BlockHandle handle = index_iter->value(); | |||||
3052 | result = handle.offset(); | |||||
3053 | } else { | |||||
3054 | // key is past the last key in the file. If table_properties is not | |||||
3055 | // available, approximate the offset by returning the offset of the | |||||
3056 | // metaindex block (which is right near the end of the file). | |||||
3057 | result = 0; | |||||
3058 | if (rep_->table_properties) { | |||||
3059 | result = rep_->table_properties->data_size; | |||||
3060 | } | |||||
3061 | // table_properties is not present in the table. | |||||
3062 | if (result == 0) { | |||||
3063 | result = rep_->footer.metaindex_handle().offset(); | |||||
3064 | } | |||||
3065 | } | |||||
3066 | return result; | |||||
3067 | } | |||||
3068 | ||||||
3069 | bool BlockBasedTable::TEST_filter_block_preloaded() const { | |||||
3070 | return rep_->filter != nullptr; | |||||
3071 | } | |||||
3072 | ||||||
3073 | bool BlockBasedTable::TEST_index_reader_preloaded() const { | |||||
3074 | return rep_->index_reader != nullptr; | |||||
3075 | } | |||||
3076 | ||||||
3077 | Status BlockBasedTable::GetKVPairsFromDataBlocks( | |||||
3078 | std::vector<KVPairBlock>* kv_pair_blocks) { | |||||
3079 | std::unique_ptr<InternalIteratorBase<BlockHandle>> blockhandles_iter( | |||||
3080 | NewIndexIterator(ReadOptions())); | |||||
3081 | ||||||
3082 | Status s = blockhandles_iter->status(); | |||||
3083 | if (!s.ok()) { | |||||
3084 | // Cannot read Index Block | |||||
3085 | return s; | |||||
3086 | } | |||||
3087 | ||||||
3088 | for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid(); | |||||
3089 | blockhandles_iter->Next()) { | |||||
3090 | s = blockhandles_iter->status(); | |||||
3091 | ||||||
3092 | if (!s.ok()) { | |||||
3093 | break; | |||||
3094 | } | |||||
3095 | ||||||
3096 | std::unique_ptr<InternalIterator> datablock_iter; | |||||
3097 | datablock_iter.reset(NewDataBlockIterator<DataBlockIter>( | |||||
3098 | rep_, ReadOptions(), blockhandles_iter->value())); | |||||
3099 | s = datablock_iter->status(); | |||||
3100 | ||||||
3101 | if (!s.ok()) { | |||||
3102 | // Error reading the block - Skipped | |||||
3103 | continue; | |||||
3104 | } | |||||
3105 | ||||||
3106 | KVPairBlock kv_pair_block; | |||||
3107 | for (datablock_iter->SeekToFirst(); datablock_iter->Valid(); | |||||
3108 | datablock_iter->Next()) { | |||||
3109 | s = datablock_iter->status(); | |||||
3110 | if (!s.ok()) { | |||||
3111 | // Error reading the block - Skipped | |||||
3112 | break; | |||||
3113 | } | |||||
3114 | const Slice& key = datablock_iter->key(); | |||||
3115 | const Slice& value = datablock_iter->value(); | |||||
3116 | std::string key_copy = std::string(key.data(), key.size()); | |||||
3117 | std::string value_copy = std::string(value.data(), value.size()); | |||||
3118 | ||||||
3119 | kv_pair_block.push_back( | |||||
3120 | std::make_pair(std::move(key_copy), std::move(value_copy))); | |||||
3121 | } | |||||
3122 | kv_pair_blocks->push_back(std::move(kv_pair_block)); | |||||
3123 | } | |||||
3124 | return Status::OK(); | |||||
3125 | } | |||||
3126 | ||||||
3127 | Status BlockBasedTable::DumpTable(WritableFile* out_file, | |||||
3128 | const SliceTransform* prefix_extractor) { | |||||
3129 | // Output Footer | |||||
3130 | out_file->Append( | |||||
3131 | "Footer Details:\n" | |||||
3132 | "--------------------------------------\n" | |||||
3133 | " "); | |||||
3134 | out_file->Append(rep_->footer.ToString().c_str()); | |||||
3135 | out_file->Append("\n"); | |||||
3136 | ||||||
3137 | // Output MetaIndex | |||||
3138 | out_file->Append( | |||||
3139 | "Metaindex Details:\n" | |||||
3140 | "--------------------------------------\n"); | |||||
3141 | std::unique_ptr<Block> meta; | |||||
3142 | std::unique_ptr<InternalIterator> meta_iter; | |||||
3143 | Status s = | |||||
3144 | ReadMetaBlock(rep_, nullptr /* prefetch_buffer */, &meta, &meta_iter); | |||||
3145 | if (s.ok()) { | |||||
3146 | for (meta_iter->SeekToFirst(); meta_iter->Valid(); meta_iter->Next()) { | |||||
3147 | s = meta_iter->status(); | |||||
3148 | if (!s.ok()) { | |||||
3149 | return s; | |||||
3150 | } | |||||
3151 | if (meta_iter->key() == rocksdb::kPropertiesBlock) { | |||||
3152 | out_file->Append(" Properties block handle: "); | |||||
3153 | out_file->Append(meta_iter->value().ToString(true).c_str()); | |||||
3154 | out_file->Append("\n"); | |||||
3155 | } else if (meta_iter->key() == rocksdb::kCompressionDictBlock) { | |||||
3156 | out_file->Append(" Compression dictionary block handle: "); | |||||
3157 | out_file->Append(meta_iter->value().ToString(true).c_str()); | |||||
3158 | out_file->Append("\n"); | |||||
3159 | } else if (strstr(meta_iter->key().ToString().c_str(), | |||||
3160 | "filter.rocksdb.") != nullptr) { | |||||
3161 | out_file->Append(" Filter block handle: "); | |||||
3162 | out_file->Append(meta_iter->value().ToString(true).c_str()); | |||||
3163 | out_file->Append("\n"); | |||||
3164 | } else if (meta_iter->key() == rocksdb::kRangeDelBlock) { | |||||
3165 | out_file->Append(" Range deletion block handle: "); | |||||
3166 | out_file->Append(meta_iter->value().ToString(true).c_str()); | |||||
3167 | out_file->Append("\n"); | |||||
3168 | } | |||||
3169 | } | |||||
3170 | out_file->Append("\n"); | |||||
3171 | } else { | |||||
3172 | return s; | |||||
3173 | } | |||||
3174 | ||||||
3175 | // Output TableProperties | |||||
3176 | const rocksdb::TableProperties* table_properties; | |||||
3177 | table_properties = rep_->table_properties.get(); | |||||
3178 | ||||||
3179 | if (table_properties != nullptr) { | |||||
3180 | out_file->Append( | |||||
3181 | "Table Properties:\n" | |||||
3182 | "--------------------------------------\n" | |||||
3183 | " "); | |||||
3184 | out_file->Append(table_properties->ToString("\n ", ": ").c_str()); | |||||
3185 | out_file->Append("\n"); | |||||
3186 | ||||||
3187 | // Output Filter blocks | |||||
3188 | if (!rep_->filter && !table_properties->filter_policy_name.empty()) { | |||||
3189 | // Support only BloomFilter as off now | |||||
3190 | rocksdb::BlockBasedTableOptions table_options; | |||||
3191 | table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(1)); | |||||
3192 | if (table_properties->filter_policy_name.compare( | |||||
3193 | table_options.filter_policy->Name()) == 0) { | |||||
3194 | std::string filter_block_key = kFilterBlockPrefix; | |||||
3195 | filter_block_key.append(table_properties->filter_policy_name); | |||||
3196 | BlockHandle handle; | |||||
3197 | if (FindMetaBlock(meta_iter.get(), filter_block_key, &handle).ok()) { | |||||
3198 | BlockContents block; | |||||
3199 | BlockFetcher block_fetcher( | |||||
3200 | rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer, | |||||
3201 | ReadOptions(), handle, &block, rep_->ioptions, | |||||
3202 | false /*decompress*/, false /*maybe_compressed*/, | |||||
3203 | UncompressionDict::GetEmptyDict(), | |||||
3204 | rep_->persistent_cache_options); | |||||
3205 | s = block_fetcher.ReadBlockContents(); | |||||
3206 | if (!s.ok()) { | |||||
3207 | rep_->filter.reset(new BlockBasedFilterBlockReader( | |||||
3208 | prefix_extractor, table_options, | |||||
3209 | table_options.whole_key_filtering, std::move(block), | |||||
3210 | rep_->ioptions.statistics)); | |||||
3211 | } | |||||
3212 | } | |||||
3213 | } | |||||
3214 | } | |||||
3215 | } | |||||
3216 | if (rep_->filter) { | |||||
3217 | out_file->Append( | |||||
3218 | "Filter Details:\n" | |||||
3219 | "--------------------------------------\n" | |||||
3220 | " "); | |||||
3221 | out_file->Append(rep_->filter->ToString().c_str()); | |||||
3222 | out_file->Append("\n"); | |||||
3223 | } | |||||
3224 | ||||||
3225 | // Output Index block | |||||
3226 | s = DumpIndexBlock(out_file); | |||||
3227 | if (!s.ok()) { | |||||
3228 | return s; | |||||
3229 | } | |||||
3230 | ||||||
3231 | // Output compression dictionary | |||||
3232 | if (!rep_->compression_dict_handle.IsNull()) { | |||||
3233 | std::unique_ptr<const BlockContents> compression_dict_block; | |||||
3234 | s = ReadCompressionDictBlock(rep_, nullptr /* prefetch_buffer */, | |||||
3235 | &compression_dict_block); | |||||
3236 | if (!s.ok()) { | |||||
3237 | return s; | |||||
3238 | } | |||||
3239 | assert(compression_dict_block != nullptr)(static_cast<void> (0)); | |||||
3240 | auto compression_dict = compression_dict_block->data; | |||||
3241 | out_file->Append( | |||||
3242 | "Compression Dictionary:\n" | |||||
3243 | "--------------------------------------\n"); | |||||
3244 | out_file->Append(" size (bytes): "); | |||||
3245 | out_file->Append(rocksdb::ToString(compression_dict.size())); | |||||
3246 | out_file->Append("\n\n"); | |||||
3247 | out_file->Append(" HEX "); | |||||
3248 | out_file->Append(compression_dict.ToString(true).c_str()); | |||||
3249 | out_file->Append("\n\n"); | |||||
3250 | } | |||||
3251 | ||||||
3252 | // Output range deletions block | |||||
3253 | auto* range_del_iter = NewRangeTombstoneIterator(ReadOptions()); | |||||
3254 | if (range_del_iter != nullptr) { | |||||
3255 | range_del_iter->SeekToFirst(); | |||||
3256 | if (range_del_iter->Valid()) { | |||||
3257 | out_file->Append( | |||||
3258 | "Range deletions:\n" | |||||
3259 | "--------------------------------------\n" | |||||
3260 | " "); | |||||
3261 | for (; range_del_iter->Valid(); range_del_iter->Next()) { | |||||
3262 | DumpKeyValue(range_del_iter->key(), range_del_iter->value(), out_file); | |||||
3263 | } | |||||
3264 | out_file->Append("\n"); | |||||
3265 | } | |||||
3266 | delete range_del_iter; | |||||
3267 | } | |||||
3268 | // Output Data blocks | |||||
3269 | s = DumpDataBlocks(out_file); | |||||
3270 | ||||||
3271 | return s; | |||||
3272 | } | |||||
3273 | ||||||
3274 | void BlockBasedTable::Close() { | |||||
3275 | if (rep_->closed) { | |||||
3276 | return; | |||||
3277 | } | |||||
3278 | ||||||
3279 | Cache* const cache = rep_->table_options.block_cache.get(); | |||||
3280 | ||||||
3281 | rep_->filter_entry.Release(cache); | |||||
3282 | rep_->index_entry.Release(cache); | |||||
3283 | ||||||
3284 | // cleanup index, filter, and compression dictionary blocks | |||||
3285 | // to avoid accessing dangling pointers | |||||
3286 | if (!rep_->table_options.no_block_cache) { | |||||
3287 | char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; | |||||
3288 | ||||||
3289 | // Get the filter block key | |||||
3290 | auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, | |||||
3291 | rep_->filter_handle, cache_key); | |||||
3292 | cache->Erase(key); | |||||
3293 | ||||||
3294 | // Get the index block key | |||||
3295 | key = GetCacheKeyFromOffset(rep_->cache_key_prefix, | |||||
3296 | rep_->cache_key_prefix_size, | |||||
3297 | rep_->dummy_index_reader_offset, cache_key); | |||||
3298 | cache->Erase(key); | |||||
3299 | ||||||
3300 | if (!rep_->compression_dict_handle.IsNull()) { | |||||
3301 | // Get the compression dictionary block key | |||||
3302 | key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, | |||||
3303 | rep_->compression_dict_handle, cache_key); | |||||
3304 | cache->Erase(key); | |||||
3305 | } | |||||
3306 | } | |||||
3307 | ||||||
3308 | rep_->closed = true; | |||||
3309 | } | |||||
3310 | ||||||
3311 | Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) { | |||||
3312 | out_file->Append( | |||||
3313 | "Index Details:\n" | |||||
3314 | "--------------------------------------\n"); | |||||
3315 | std::unique_ptr<InternalIteratorBase<BlockHandle>> blockhandles_iter( | |||||
3316 | NewIndexIterator(ReadOptions())); | |||||
3317 | Status s = blockhandles_iter->status(); | |||||
3318 | if (!s.ok()) { | |||||
3319 | out_file->Append("Can not read Index Block \n\n"); | |||||
3320 | return s; | |||||
3321 | } | |||||
3322 | ||||||
3323 | out_file->Append(" Block key hex dump: Data block handle\n"); | |||||
3324 | out_file->Append(" Block key ascii\n\n"); | |||||
3325 | for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid(); | |||||
3326 | blockhandles_iter->Next()) { | |||||
3327 | s = blockhandles_iter->status(); | |||||
3328 | if (!s.ok()) { | |||||
3329 | break; | |||||
3330 | } | |||||
3331 | Slice key = blockhandles_iter->key(); | |||||
3332 | Slice user_key; | |||||
3333 | InternalKey ikey; | |||||
3334 | if (rep_->table_properties && | |||||
3335 | rep_->table_properties->index_key_is_user_key != 0) { | |||||
3336 | user_key = key; | |||||
3337 | } else { | |||||
3338 | ikey.DecodeFrom(key); | |||||
3339 | user_key = ikey.user_key(); | |||||
3340 | } | |||||
3341 | ||||||
3342 | out_file->Append(" HEX "); | |||||
3343 | out_file->Append(user_key.ToString(true).c_str()); | |||||
3344 | out_file->Append(": "); | |||||
3345 | out_file->Append(blockhandles_iter->value().ToString(true).c_str()); | |||||
3346 | out_file->Append("\n"); | |||||
3347 | ||||||
3348 | std::string str_key = user_key.ToString(); | |||||
3349 | std::string res_key(""); | |||||
3350 | char cspace = ' '; | |||||
3351 | for (size_t i = 0; i < str_key.size(); i++) { | |||||
3352 | res_key.append(&str_key[i], 1); | |||||
3353 | res_key.append(1, cspace); | |||||
3354 | } | |||||
3355 | out_file->Append(" ASCII "); | |||||
3356 | out_file->Append(res_key.c_str()); | |||||
3357 | out_file->Append("\n ------\n"); | |||||
3358 | } | |||||
3359 | out_file->Append("\n"); | |||||
3360 | return Status::OK(); | |||||
3361 | } | |||||
3362 | ||||||
3363 | Status BlockBasedTable::DumpDataBlocks(WritableFile* out_file) { | |||||
3364 | std::unique_ptr<InternalIteratorBase<BlockHandle>> blockhandles_iter( | |||||
3365 | NewIndexIterator(ReadOptions())); | |||||
3366 | Status s = blockhandles_iter->status(); | |||||
3367 | if (!s.ok()) { | |||||
3368 | out_file->Append("Can not read Index Block \n\n"); | |||||
3369 | return s; | |||||
3370 | } | |||||
3371 | ||||||
3372 | uint64_t datablock_size_min = std::numeric_limits<uint64_t>::max(); | |||||
3373 | uint64_t datablock_size_max = 0; | |||||
3374 | uint64_t datablock_size_sum = 0; | |||||
3375 | ||||||
3376 | size_t block_id = 1; | |||||
3377 | for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid(); | |||||
3378 | block_id++, blockhandles_iter->Next()) { | |||||
3379 | s = blockhandles_iter->status(); | |||||
3380 | if (!s.ok()) { | |||||
3381 | break; | |||||
3382 | } | |||||
3383 | ||||||
3384 | BlockHandle bh = blockhandles_iter->value(); | |||||
3385 | uint64_t datablock_size = bh.size(); | |||||
3386 | datablock_size_min = std::min(datablock_size_min, datablock_size); | |||||
3387 | datablock_size_max = std::max(datablock_size_max, datablock_size); | |||||
3388 | datablock_size_sum += datablock_size; | |||||
3389 | ||||||
3390 | out_file->Append("Data Block # "); | |||||
3391 | out_file->Append(rocksdb::ToString(block_id)); | |||||
3392 | out_file->Append(" @ "); | |||||
3393 | out_file->Append(blockhandles_iter->value().ToString(true).c_str()); | |||||
3394 | out_file->Append("\n"); | |||||
3395 | out_file->Append("--------------------------------------\n"); | |||||
3396 | ||||||
3397 | std::unique_ptr<InternalIterator> datablock_iter; | |||||
3398 | datablock_iter.reset(NewDataBlockIterator<DataBlockIter>( | |||||
3399 | rep_, ReadOptions(), blockhandles_iter->value())); | |||||
3400 | s = datablock_iter->status(); | |||||
3401 | ||||||
3402 | if (!s.ok()) { | |||||
3403 | out_file->Append("Error reading the block - Skipped \n\n"); | |||||
3404 | continue; | |||||
3405 | } | |||||
3406 | ||||||
3407 | for (datablock_iter->SeekToFirst(); datablock_iter->Valid(); | |||||
3408 | datablock_iter->Next()) { | |||||
3409 | s = datablock_iter->status(); | |||||
3410 | if (!s.ok()) { | |||||
3411 | out_file->Append("Error reading the block - Skipped \n"); | |||||
3412 | break; | |||||
3413 | } | |||||
3414 | DumpKeyValue(datablock_iter->key(), datablock_iter->value(), out_file); | |||||
3415 | } | |||||
3416 | out_file->Append("\n"); | |||||
3417 | } | |||||
3418 | ||||||
3419 | uint64_t num_datablocks = block_id - 1; | |||||
3420 | if (num_datablocks) { | |||||
3421 | double datablock_size_avg = | |||||
3422 | static_cast<double>(datablock_size_sum) / num_datablocks; | |||||
3423 | out_file->Append("Data Block Summary:\n"); | |||||
3424 | out_file->Append("--------------------------------------"); | |||||
3425 | out_file->Append("\n # data blocks: "); | |||||
3426 | out_file->Append(rocksdb::ToString(num_datablocks)); | |||||
3427 | out_file->Append("\n min data block size: "); | |||||
3428 | out_file->Append(rocksdb::ToString(datablock_size_min)); | |||||
3429 | out_file->Append("\n max data block size: "); | |||||
3430 | out_file->Append(rocksdb::ToString(datablock_size_max)); | |||||
3431 | out_file->Append("\n avg data block size: "); | |||||
3432 | out_file->Append(rocksdb::ToString(datablock_size_avg)); | |||||
3433 | out_file->Append("\n"); | |||||
3434 | } | |||||
3435 | ||||||
3436 | return Status::OK(); | |||||
3437 | } | |||||
3438 | ||||||
3439 | void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value, | |||||
3440 | WritableFile* out_file) { | |||||
3441 | InternalKey ikey; | |||||
3442 | ikey.DecodeFrom(key); | |||||
3443 | ||||||
3444 | out_file->Append(" HEX "); | |||||
3445 | out_file->Append(ikey.user_key().ToString(true).c_str()); | |||||
3446 | out_file->Append(": "); | |||||
3447 | out_file->Append(value.ToString(true).c_str()); | |||||
3448 | out_file->Append("\n"); | |||||
3449 | ||||||
3450 | std::string str_key = ikey.user_key().ToString(); | |||||
3451 | std::string str_value = value.ToString(); | |||||
3452 | std::string res_key(""), res_value(""); | |||||
3453 | char cspace = ' '; | |||||
3454 | for (size_t i = 0; i < str_key.size(); i++) { | |||||
3455 | if (str_key[i] == '\0') { | |||||
3456 | res_key.append("\\0", 2); | |||||
3457 | } else { | |||||
3458 | res_key.append(&str_key[i], 1); | |||||
3459 | } | |||||
3460 | res_key.append(1, cspace); | |||||
3461 | } | |||||
3462 | for (size_t i = 0; i < str_value.size(); i++) { | |||||
3463 | if (str_value[i] == '\0') { | |||||
3464 | res_value.append("\\0", 2); | |||||
3465 | } else { | |||||
3466 | res_value.append(&str_value[i], 1); | |||||
3467 | } | |||||
3468 | res_value.append(1, cspace); | |||||
3469 | } | |||||
3470 | ||||||
3471 | out_file->Append(" ASCII "); | |||||
3472 | out_file->Append(res_key.c_str()); | |||||
3473 | out_file->Append(": "); | |||||
3474 | out_file->Append(res_value.c_str()); | |||||
3475 | out_file->Append("\n ------\n"); | |||||
3476 | } | |||||
3477 | ||||||
3478 | namespace { | |||||
3479 | ||||||
3480 | void DeleteCachedFilterEntry(const Slice& /*key*/, void* value) { | |||||
3481 | FilterBlockReader* filter = reinterpret_cast<FilterBlockReader*>(value); | |||||
3482 | if (filter->statistics() != nullptr) { | |||||
3483 | RecordTick(filter->statistics(), BLOCK_CACHE_FILTER_BYTES_EVICT, | |||||
3484 | filter->ApproximateMemoryUsage()); | |||||
3485 | } | |||||
3486 | delete filter; | |||||
3487 | } | |||||
3488 | ||||||
3489 | void DeleteCachedIndexEntry(const Slice& /*key*/, void* value) { | |||||
3490 | IndexReader* index_reader = reinterpret_cast<IndexReader*>(value); | |||||
3491 | if (index_reader->statistics() != nullptr) { | |||||
3492 | RecordTick(index_reader->statistics(), BLOCK_CACHE_INDEX_BYTES_EVICT, | |||||
3493 | index_reader->ApproximateMemoryUsage()); | |||||
3494 | } | |||||
3495 | delete index_reader; | |||||
3496 | } | |||||
3497 | ||||||
3498 | void DeleteCachedUncompressionDictEntry(const Slice& /*key*/, void* value) { | |||||
3499 | UncompressionDict* dict = reinterpret_cast<UncompressionDict*>(value); | |||||
3500 | RecordTick(dict->statistics(), BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT, | |||||
3501 | dict->ApproximateMemoryUsage()); | |||||
3502 | delete dict; | |||||
3503 | } | |||||
3504 | ||||||
3505 | } // anonymous namespace | |||||
3506 | ||||||
3507 | } // namespace rocksdb |