1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <set>
5 #include <map>
6 #include <string>
7 #include <memory>
8 #include <errno.h>
9 #include <unistd.h>
10 #include <sys/types.h>
11 #include <sys/stat.h>
12
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
21
22 using std::string;
23 #include "common/perf_counters.h"
24 #include "common/PriorityCache.h"
25 #include "include/str_list.h"
26 #include "include/stringify.h"
27 #include "include/str_map.h"
28 #include "KeyValueDB.h"
29 #include "RocksDBStore.h"
30
31 #include "common/debug.h"
32
33 #define dout_context cct
34 #define dout_subsys ceph_subsys_rocksdb
35 #undef dout_prefix
36 #define dout_prefix *_dout << "rocksdb: "
37
38 static bufferlist to_bufferlist(rocksdb::Slice in) {
39 bufferlist bl;
40 bl.append(bufferptr(in.data(), in.size()));
41 return bl;
42 }
43
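// Build a rocksdb::SliceParts view over a (possibly fragmented) bufferlist.
// The caller pre-sizes *slices to the number of bufferlist segments (see
// put_bat() and merge() below); no data is copied, so the bufferlist must
// outlive the returned SliceParts.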
44 static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
45 vector<rocksdb::Slice> *slices)
46 {
47 unsigned n = 0;
48 for (auto& buf : bl.buffers()) {
49 (*slices)[n].data_ = buf.c_str();
50 (*slices)[n].size_ = buf.length();
51 n++;
52 }
53 return rocksdb::SliceParts(slices->data(), slices->size());
54 }
55
56
57 //
58 // One of these for the default rocksdb column family, routing each prefix
59 // to the appropriate MergeOperator.
60 //
61 class RocksDBStore::MergeOperatorRouter
62 : public rocksdb::AssociativeMergeOperator
63 {
64 RocksDBStore& store;
65 public:
66 const char *Name() const override {
67 // Construct a name that RocksDB will validate against. We want to
68 // do this in a way that doesn't constrain the ordering of calls
69 // to set_merge_operator, so sort the merge operators and then
70 // construct a name from all of those parts.
71 store.assoc_name.clear();
72 map<std::string,std::string> names;
73
74 for (auto& p : store.merge_ops) {
75 names[p.first] = p.second->name();
76 }
77 for (auto& p : store.cf_handles) {
78 names.erase(p.first);
79 }
80 for (auto& p : names) {
81 store.assoc_name += '.';
82 store.assoc_name += p.first;
83 store.assoc_name += ':';
84 store.assoc_name += p.second;
85 }
86 return store.assoc_name.c_str();
87 }
88
89 explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
90
91 bool Merge(const rocksdb::Slice& key,
92 const rocksdb::Slice* existing_value,
93 const rocksdb::Slice& value,
94 std::string* new_value,
95 rocksdb::Logger* logger) const override {
96 // for default column family
97 // extract prefix from key and compare against each registered merge op;
98 // even though merge operator for explicit CF is included in merge_ops,
99 // it won't be picked up, since it won't match.
100 for (auto& p : store.merge_ops) {
101 if (p.first.compare(0, p.first.length(),
102 key.data(), p.first.length()) == 0 &&
103 key.data()[p.first.length()] == 0) {
104 if (existing_value) {
105 p.second->merge(existing_value->data(), existing_value->size(),
106 value.data(), value.size(),
107 new_value);
108 } else {
109 p.second->merge_nonexistent(value.data(), value.size(), new_value);
110 }
111 break;
112 }
113 }
114 return true; // OK :)
115 }
116 };
117
118 //
119 // One of these per non-default column family, linked directly to the
120 // merge operator for that CF/prefix (if any).
121 //
122 class RocksDBStore::MergeOperatorLinker
123 : public rocksdb::AssociativeMergeOperator
124 {
125 private:
126 std::shared_ptr<KeyValueDB::MergeOperator> mop;
127 public:
128 explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}
129
130 const char *Name() const override {
131 return mop->name();
132 }
133
134 bool Merge(const rocksdb::Slice& key,
135 const rocksdb::Slice* existing_value,
136 const rocksdb::Slice& value,
137 std::string* new_value,
138 rocksdb::Logger* logger) const override {
139 if (existing_value) {
140 mop->merge(existing_value->data(), existing_value->size(),
141 value.data(), value.size(),
142 new_value);
143 } else {
144 mop->merge_nonexistent(value.data(), value.size(), new_value);
145 }
146 return true;
147 }
148 };
149
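// Illustrative usage (hypothetical caller and operator type, for
// illustration only): merge operators must be registered before the
// database is opened, e.g.
//   db->set_merge_operator("p", std::make_shared<SomeMergeOperator>());
//   db->init(options_str);
//   db->create_and_open(std::cout, {});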
150 int RocksDBStore::set_merge_operator(
151 const string& prefix,
152 std::shared_ptr<KeyValueDB::MergeOperator> mop)
153 {
154 // If you fail here, it's because you can't do this on an open database
155 ceph_assert(db == nullptr);
156 merge_ops.push_back(std::make_pair(prefix,mop));
157 return 0;
158 }
159
160 class CephRocksdbLogger : public rocksdb::Logger {
161 CephContext *cct;
162 public:
163 explicit CephRocksdbLogger(CephContext *c) : cct(c) {
164 cct->get();
165 }
166 ~CephRocksdbLogger() override {
167 cct->put();
168 }
169
170 // Write an entry to the log file with the specified format.
171 void Logv(const char* format, va_list ap) override {
172 Logv(rocksdb::INFO_LEVEL, format, ap);
173 }
174
175 // Write an entry to the log file with the specified log level
176 // and format. Any log with level under the internal log level
177 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
178 // printed.
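// RocksDB log levels increase with severity (DEBUG_LEVEL == 0), while Ceph
// debug levels decrease with severity, so the mapping below inverts the
// level: more severe RocksDB messages get a lower (more likely emitted)
// Ceph debug level.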
179 void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
180 va_list ap) override {
181 int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
182 dout(ceph::dout::need_dynamic(v));
183 char buf[65536];
184 vsnprintf(buf, sizeof(buf), format, ap);
185 *_dout << buf << dendl;
186 }
187 };
188
189 rocksdb::Logger *create_rocksdb_ceph_logger()
190 {
191 return new CephRocksdbLogger(g_ceph_context);
192 }
193
194 static int string2bool(const string &val, bool &b_val)
195 {
196 if (strcasecmp(val.c_str(), "false") == 0) {
197 b_val = false;
198 return 0;
199 } else if (strcasecmp(val.c_str(), "true") == 0) {
200 b_val = true;
201 return 0;
202 } else {
203 std::string err;
204 int b = strict_strtol(val.c_str(), 10, &err);
205 if (!err.empty())
206 return -EINVAL;
207 b_val = !!b;
208 return 0;
209 }
210 }
211
212 int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
213 {
214 if (key == "compaction_threads") {
215 std::string err;
216 int f = strict_iecstrtoll(val.c_str(), &err);
217 if (!err.empty())
218 return -EINVAL;
219 //Low priority threadpool is used for compaction
220 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
221 } else if (key == "flusher_threads") {
222 std::string err;
223 int f = strict_iecstrtoll(val.c_str(), &err);
224 if (!err.empty())
225 return -EINVAL;
226 //High priority threadpool is used for flusher
227 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
228 } else if (key == "compact_on_mount") {
229 int ret = string2bool(val, compact_on_mount);
230 if (ret != 0)
231 return ret;
232 } else if (key == "disableWAL") {
233 int ret = string2bool(val, disableWAL);
234 if (ret != 0)
235 return ret;
236 } else {
237 //unrecognized config option.
238 return -EINVAL;
239 }
240 return 0;
241 }
242
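// The option string is a list of key=value pairs separated by ',', ';' or
// newlines (see get_str_map below). Each pair is first handed to RocksDB's
// GetOptionsFromString(); anything RocksDB does not recognize falls back to
// tryInterpret() above. An illustrative (made-up) example:
//   "compaction_threads=4;disableWAL=false;max_background_jobs=4"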
243 int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
244 {
245 map<string, string> str_map;
246 int r = get_str_map(opt_str, &str_map, ",\n;");
247 if (r < 0)
248 return r;
249 map<string, string>::iterator it;
250 for(it = str_map.begin(); it != str_map.end(); ++it) {
251 string this_opt = it->first + "=" + it->second;
252 rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt);
253 if (!status.ok()) {
254 //unrecognized by rocksdb, try to interpret by ourselves.
255 r = tryInterpret(it->first, it->second, opt);
256 if (r < 0) {
257 derr << status.ToString() << dendl;
258 return -EINVAL;
259 }
260 }
261 lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
262 << " = " << it->second << dendl;
263 }
264 return 0;
265 }
266
267 int RocksDBStore::init(string _options_str)
268 {
269 options_str = _options_str;
270 rocksdb::Options opt;
271 //try parse options
272 if (options_str.length()) {
273 int r = ParseOptionsFromString(options_str, opt);
274 if (r != 0) {
275 return -EINVAL;
276 }
277 }
278 return 0;
279 }
280
281 int RocksDBStore::create_db_dir()
282 {
283 if (env) {
284 unique_ptr<rocksdb::Directory> dir;
285 env->NewDirectory(path, &dir);
286 } else {
287 int r = ::mkdir(path.c_str(), 0755);
288 if (r < 0)
289 r = -errno;
290 if (r < 0 && r != -EEXIST) {
291 derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
292 << dendl;
293 return r;
294 }
295 }
296 return 0;
297 }
298
299 int RocksDBStore::install_cf_mergeop(
300 const string &cf_name,
301 rocksdb::ColumnFamilyOptions *cf_opt)
302 {
303 ceph_assert(cf_opt != nullptr);
304 cf_opt->merge_operator.reset();
305 for (auto& i : merge_ops) {
306 if (i.first == cf_name) {
307 cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
308 }
309 }
310 return 0;
311 }
312
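// Illustrative usage (hypothetical names), assuming ColumnFamily pairs a CF
// name with a RocksDB option string for that CF:
//   std::vector<KeyValueDB::ColumnFamily> cfs = {{"meta", "write_buffer_size=4194304"}};
//   db->create_and_open(std::cout, cfs);
// With an empty vector only the default column family is created.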
313 int RocksDBStore::create_and_open(ostream &out,
314 const vector<ColumnFamily>& cfs)
315 {
316 int r = create_db_dir();
317 if (r < 0)
318 return r;
319 if (cfs.empty()) {
320 return do_open(out, true, false, nullptr);
321 } else {
322 return do_open(out, true, false, &cfs);
323 }
324 }
325
326 int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
327 {
328 rocksdb::Status status;
329
330 if (options_str.length()) {
331 int r = ParseOptionsFromString(options_str, opt);
332 if (r != 0) {
333 return -EINVAL;
334 }
335 }
336
337 if (g_conf()->rocksdb_perf) {
338 dbstats = rocksdb::CreateDBStatistics();
339 opt.statistics = dbstats;
340 }
341
342 opt.create_if_missing = create_if_missing;
343 if (kv_options.count("separate_wal_dir")) {
344 opt.wal_dir = path + ".wal";
345 }
346
347 // Since ceph::for_each_substr doesn't return a value and
348 // std::stoull does throw, we may as well just catch everything here.
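// kv_options["db_paths"] is a ';'-, space- or tab-separated list of
// "path,size" entries, where size is the target byte budget for that path;
// an illustrative (made-up) example:
//   "/var/lib/foo/db,10000000;/var/lib/foo/db.slow,100000000"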
349 try {
350 if (kv_options.count("db_paths")) {
351 list<string> paths;
352 get_str_list(kv_options["db_paths"], "; \t", paths);
353 for (auto& p : paths) {
354 size_t pos = p.find(',');
355 if (pos == std::string::npos) {
356 derr << __func__ << " invalid db path item " << p << " in "
357 << kv_options["db_paths"] << dendl;
358 return -EINVAL;
359 }
360 string path = p.substr(0, pos);
361 string size_str = p.substr(pos + 1);
362 uint64_t size = atoll(size_str.c_str());
363 if (!size) {
364 derr << __func__ << " invalid db path item " << p << " in "
365 << kv_options["db_paths"] << dendl;
366 return -EINVAL;
367 }
368 opt.db_paths.push_back(rocksdb::DbPath(path, size));
369 dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
370 }
371 }
372 } catch (const std::system_error& e) {
373 return -e.code().value();
374 }
375
376 if (g_conf()->rocksdb_log_to_ceph_log) {
377 opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
378 }
379
380 if (priv) {
381 dout(10) << __func__ << " using custom Env " << priv << dendl;
382 opt.env = static_cast<rocksdb::Env*>(priv);
383 }
384
385 opt.env->SetAllowNonOwnerAccess(false);
386
387 // caches
388 if (!set_cache_flag) {
389 cache_size = g_conf()->rocksdb_cache_size;
390 }
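// rocksdb_cache_row_ratio carves the row cache out of the total cache
// budget; whatever remains is given to the block cache configured below.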
391 uint64_t row_cache_size = cache_size * g_conf()->rocksdb_cache_row_ratio;
392 uint64_t block_cache_size = cache_size - row_cache_size;
393
394 if (g_conf()->rocksdb_cache_type == "binned_lru") {
395 bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache(
396 cct,
397 block_cache_size,
398 g_conf()->rocksdb_cache_shard_bits);
399 } else if (g_conf()->rocksdb_cache_type == "lru") {
400 bbt_opts.block_cache = rocksdb::NewLRUCache(
401 block_cache_size,
402 g_conf()->rocksdb_cache_shard_bits);
403 } else if (g_conf()->rocksdb_cache_type == "clock") {
404 bbt_opts.block_cache = rocksdb::NewClockCache(
405 block_cache_size,
406 g_conf()->rocksdb_cache_shard_bits);
407 if (!bbt_opts.block_cache) {
408 derr << "rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
409 << "' chosen, but RocksDB not compiled with LibTBB. "
410 << dendl;
411 return -EINVAL;
412 }
413 } else {
414 derr << "unrecognized rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
415 << "'" << dendl;
416 return -EINVAL;
417 }
418 bbt_opts.block_size = g_conf()->rocksdb_block_size;
419
420 if (row_cache_size > 0)
421 opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
422 g_conf()->rocksdb_cache_shard_bits);
423 uint64_t bloom_bits = g_conf().get_val<uint64_t>("rocksdb_bloom_bits_per_key");
424 if (bloom_bits > 0) {
425 dout(10) << __func__ << " set bloom filter bits per key to "
426 << bloom_bits << dendl;
427 bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
428 }
429 using std::placeholders::_1;
430 if (g_conf().with_val<std::string>("rocksdb_index_type",
431 std::bind(std::equal_to<std::string>(), _1,
432 "binary_search")))
433 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
434 if (g_conf().with_val<std::string>("rocksdb_index_type",
435 std::bind(std::equal_to<std::string>(), _1,
436 "hash_search")))
437 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
438 if (g_conf().with_val<std::string>("rocksdb_index_type",
439 std::bind(std::equal_to<std::string>(), _1,
440 "two_level")))
441 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
442 if (!bbt_opts.no_block_cache) {
443 bbt_opts.cache_index_and_filter_blocks =
444 g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks");
445 bbt_opts.cache_index_and_filter_blocks_with_high_priority =
446 g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
447 bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
448 g_conf().get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
449 }
450 bbt_opts.partition_filters = g_conf().get_val<bool>("rocksdb_partition_filters");
451 if (g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
452 bbt_opts.metadata_block_size = g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size");
453
454 opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
455 dout(10) << __func__ << " block size " << g_conf()->rocksdb_block_size
456 << ", block_cache size " << byte_u_t(block_cache_size)
457 << ", row_cache size " << byte_u_t(row_cache_size)
458 << "; shards "
459 << (1 << g_conf()->rocksdb_cache_shard_bits)
460 << ", type " << g_conf()->rocksdb_cache_type
461 << dendl;
462
463 opt.merge_operator.reset(new MergeOperatorRouter(*this));
464
465 return 0;
466 }
467
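// Open (or create) the database. With create_if_missing the requested column
// families are created from scratch; otherwise the column families already
// present on disk are enumerated and opened, applying any matching options
// from 'cfs' and attaching per-CF merge operators.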
468 int RocksDBStore::do_open(ostream &out,
469 bool create_if_missing,
470 bool open_readonly,
471 const vector<ColumnFamily>* cfs)
472 {
473 ceph_assert(!(create_if_missing && open_readonly));
474 rocksdb::Options opt;
475 int r = load_rocksdb_options(create_if_missing, opt);
476 if (r) {
477 dout(1) << __func__ << " load rocksdb options failed" << dendl;
478 return r;
479 }
480 rocksdb::Status status;
481 if (create_if_missing) {
482 status = rocksdb::DB::Open(opt, path, &db);
483 if (!status.ok()) {
484 derr << status.ToString() << dendl;
485 return -EINVAL;
486 }
487 // create and open column families
488 if (cfs) {
489 for (auto& p : *cfs) {
490 // copy default CF settings, block cache, merge operators as
491 // the base for new CF
492 rocksdb::ColumnFamilyOptions cf_opt(opt);
493 // user input options will override the base options
494 status = rocksdb::GetColumnFamilyOptionsFromString(
495 cf_opt, p.option, &cf_opt);
496 if (!status.ok()) {
497 derr << __func__ << " invalid db column family option string for CF: "
498 << p.name << dendl;
499 return -EINVAL;
500 }
501 install_cf_mergeop(p.name, &cf_opt);
502 rocksdb::ColumnFamilyHandle *cf;
503 status = db->CreateColumnFamily(cf_opt, p.name, &cf);
504 if (!status.ok()) {
505 derr << __func__ << " Failed to create rocksdb column family: "
506 << p.name << dendl;
507 return -EINVAL;
508 }
509 // store the new CF handle
510 add_column_family(p.name, static_cast<void*>(cf));
511 }
512 }
513 default_cf = db->DefaultColumnFamily();
514 } else {
515 std::vector<string> existing_cfs;
516 status = rocksdb::DB::ListColumnFamilies(
517 rocksdb::DBOptions(opt),
518 path,
519 &existing_cfs);
520 dout(1) << __func__ << " column families: " << existing_cfs << dendl;
521 if (existing_cfs.empty()) {
522 // no column families
523 if (open_readonly) {
524 status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
525 } else {
526 status = rocksdb::DB::Open(opt, path, &db);
527 }
528 if (!status.ok()) {
529 derr << status.ToString() << dendl;
530 return -EINVAL;
531 }
532 default_cf = db->DefaultColumnFamily();
533 } else {
534 // we cannot change column families for a created database. so, map
535 // what options we are given to whatever cf's already exist.
536 std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
537 for (auto& n : existing_cfs) {
538 // copy default CF settings, block cache, merge operators as
539 // the base for new CF
540 rocksdb::ColumnFamilyOptions cf_opt(opt);
541 bool found = false;
542 if (cfs) {
543 for (auto& i : *cfs) {
544 if (i.name == n) {
545 found = true;
546 status = rocksdb::GetColumnFamilyOptionsFromString(
547 cf_opt, i.option, &cf_opt);
548 if (!status.ok()) {
549 derr << __func__ << " invalid db column family options for CF '"
550 << i.name << "': " << i.option << dendl;
551 return -EINVAL;
552 }
553 }
554 }
555 }
556 if (n != rocksdb::kDefaultColumnFamilyName) {
557 install_cf_mergeop(n, &cf_opt);
558 }
559 column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
560 if (!found && n != rocksdb::kDefaultColumnFamilyName) {
561 dout(1) << __func__ << " column family '" << n
562 << "' exists but not expected" << dendl;
563 }
564 }
565 std::vector<rocksdb::ColumnFamilyHandle*> handles;
566 if (open_readonly) {
567 status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
568 path, column_families,
569 &handles, &db);
570 } else {
571 status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
572 path, column_families, &handles, &db);
573 }
574 if (!status.ok()) {
575 derr << status.ToString() << dendl;
576 return -EINVAL;
577 }
578 for (unsigned i = 0; i < existing_cfs.size(); ++i) {
579 if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
580 default_cf = handles[i];
581 must_close_default_cf = true;
582 } else {
583 add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
584 }
585 }
586 }
587 }
588 ceph_assert(default_cf != nullptr);
589
590 PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
591 plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
592 plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
593 plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
594 plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
595 plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
596 plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
597 plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
598 plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
599 plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
600 plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
601 plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
602 plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
603 plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
604 plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
605 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
606 logger = plb.create_perf_counters();
607 cct->get_perfcounters_collection()->add(logger);
608
609 if (compact_on_mount) {
610 derr << "Compacting rocksdb store..." << dendl;
611 compact();
612 derr << "Finished compacting rocksdb store" << dendl;
613 }
614 return 0;
615 }
616
617 int RocksDBStore::_test_init(const string& dir)
618 {
619 rocksdb::Options options;
620 options.create_if_missing = true;
621 rocksdb::DB *db;
622 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
623 delete db;
624 db = nullptr;
625 return status.ok() ? 0 : -EIO;
626 }
627
628 RocksDBStore::~RocksDBStore()
629 {
630 close();
631 delete logger;
632
633 // Ensure db is destroyed before dependent db_cache and filterpolicy
634 for (auto& p : cf_handles) {
635 db->DestroyColumnFamilyHandle(
636 static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
637 p.second = nullptr;
638 }
639 if (must_close_default_cf) {
640 db->DestroyColumnFamilyHandle(default_cf);
641 must_close_default_cf = false;
642 }
643 default_cf = nullptr;
644 delete db;
645 db = nullptr;
646
647 if (priv) {
648 delete static_cast<rocksdb::Env*>(priv);
649 }
650 }
651
652 void RocksDBStore::close()
653 {
654 // stop compaction thread
655 compact_queue_lock.lock();
656 if (compact_thread.is_started()) {
657 compact_queue_stop = true;
658 compact_queue_cond.notify_all();
659 compact_queue_lock.unlock();
660 compact_thread.join();
661 } else {
662 compact_queue_lock.unlock();
663 }
664
665 if (logger)
666 cct->get_perfcounters_collection()->remove(logger);
667 }
668
669 int RocksDBStore::repair(std::ostream &out)
670 {
671 rocksdb::Options opt;
672 int r = load_rocksdb_options(false, opt);
673 if (r) {
674 dout(1) << __func__ << " load rocksdb options failed" << dendl;
675 out << "load rocksdb options failed" << std::endl;
676 return r;
677 }
678 rocksdb::Status status = rocksdb::RepairDB(path, opt);
679 if (status.ok()) {
680 return 0;
681 } else {
682 out << "repair rocksdb failed : " << status.ToString() << std::endl;
683 return 1;
684 }
685 }
686
687 void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
688 std::stringstream ss;
689 ss.str(s);
690 std::string item;
691 while (std::getline(ss, item, delim)) {
692 elems.push_back(item);
693 }
694 }
695
696 bool RocksDBStore::get_property(
697 const std::string &property,
698 uint64_t *out)
699 {
700 return db->GetIntProperty(property, out);
701 }
702
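// Rough on-disk size estimate for keys under key_prefix: asks RocksDB for
// the approximate size of the key range in SST files only (memtables are
// deliberately excluded, see the flags below).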
703 int64_t RocksDBStore::estimate_prefix_size(const string& prefix,
704 const string& key_prefix)
705 {
706 auto cf = get_cf_handle(prefix);
707 uint64_t size = 0;
708 uint8_t flags =
709 //rocksdb::DB::INCLUDE_MEMTABLES | // do not include memtables...
710 rocksdb::DB::INCLUDE_FILES;
711 if (cf) {
712 string start = key_prefix + string(1, '\x00');
713 string limit = key_prefix + string("\xff\xff\xff\xff");
714 rocksdb::Range r(start, limit);
715 db->GetApproximateSizes(cf, &r, 1, &size, flags);
716 } else {
717 string start = combine_strings(prefix , key_prefix);
718 string limit = combine_strings(prefix , key_prefix + "\xff\xff\xff\xff");
719 rocksdb::Range r(start, limit);
720 db->GetApproximateSizes(default_cf, &r, 1, &size, flags);
721 }
722 return size;
723 }
724
725 void RocksDBStore::get_statistics(Formatter *f)
726 {
727 if (!g_conf()->rocksdb_perf) {
728 dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
729 << dendl;
730 return;
731 }
732
733 if (g_conf()->rocksdb_collect_compaction_stats) {
734 std::string stat_str;
735 bool status = db->GetProperty("rocksdb.stats", &stat_str);
736 if (status) {
737 f->open_object_section("rocksdb_statistics");
738 f->dump_string("rocksdb_compaction_statistics", "");
739 vector<string> stats;
740 split_stats(stat_str, '\n', stats);
741 for (const auto& st : stats) {
742 f->dump_string("", st);
743 }
744 f->close_section();
745 }
746 }
747 if (g_conf()->rocksdb_collect_extended_stats) {
748 if (dbstats) {
749 f->open_object_section("rocksdb_extended_statistics");
750 string stat_str = dbstats->ToString();
751 vector<string> stats;
752 split_stats(stat_str, '\n', stats);
753 f->dump_string("rocksdb_extended_statistics", "");
754 for (const auto& st : stats) {
755 f->dump_string(".", st);
756 }
757 f->close_section();
758 }
759 f->open_object_section("rocksdbstore_perf_counters");
760 logger->dump_formatted(f,0);
761 f->close_section();
762 }
763 if (g_conf()->rocksdb_collect_memory_stats) {
764 f->open_object_section("rocksdb_memtable_statistics");
765 std::string str;
766 if (!bbt_opts.no_block_cache) {
767 str.append(stringify(bbt_opts.block_cache->GetUsage()));
768 f->dump_string("block_cache_usage", str.data());
769 str.clear();
770 str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
771 f->dump_string("block_cache_pinned_blocks_usage", str);
772 str.clear();
773 }
774 db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
775 f->dump_string("rocksdb_memtable_usage", str);
776 str.clear();
777 db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
778 f->dump_string("rocksdb_index_filter_blocks_usage", str);
779 f->close_section();
780 }
781 }
782
783 int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t)
784 {
785 // enable rocksdb breakdown
786 // considering performance overhead, default is disabled
787 if (g_conf()->rocksdb_perf) {
788 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
789 rocksdb::get_perf_context()->Reset();
790 }
791
792 RocksDBTransactionImpl * _t =
793 static_cast<RocksDBTransactionImpl *>(t.get());
794 woptions.disableWAL = disableWAL;
795 lgeneric_subdout(cct, rocksdb, 30) << __func__;
796 RocksWBHandler bat_txc;
797 _t->bat.Iterate(&bat_txc);
798 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
799
800 rocksdb::Status s = db->Write(woptions, &_t->bat);
801 if (!s.ok()) {
802 RocksWBHandler rocks_txc;
803 _t->bat.Iterate(&rocks_txc);
804 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
805 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
806 }
807
808 if (g_conf()->rocksdb_perf) {
809 utime_t write_memtable_time;
810 utime_t write_delay_time;
811 utime_t write_wal_time;
812 utime_t write_pre_and_post_process_time;
813 write_wal_time.set_from_double(
814 static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
815 write_memtable_time.set_from_double(
816 static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
817 write_delay_time.set_from_double(
818 static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
819 write_pre_and_post_process_time.set_from_double(
820 static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
821 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
822 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
823 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
824 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
825 }
826
827 return s.ok() ? 0 : -1;
828 }
829
830 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
831 {
832 utime_t start = ceph_clock_now();
833 rocksdb::WriteOptions woptions;
834 woptions.sync = false;
835
836 int result = submit_common(woptions, t);
837
838 utime_t lat = ceph_clock_now() - start;
839 logger->inc(l_rocksdb_txns);
840 logger->tinc(l_rocksdb_submit_latency, lat);
841
842 return result;
843 }
844
845 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
846 {
847 utime_t start = ceph_clock_now();
848 rocksdb::WriteOptions woptions;
849 // if the WAL is disabled, a sync write cannot be requested
850 woptions.sync = !disableWAL;
851
852 int result = submit_common(woptions, t);
853
854 utime_t lat = ceph_clock_now() - start;
855 logger->inc(l_rocksdb_txns_sync);
856 logger->tinc(l_rocksdb_submit_sync_latency, lat);
857
858 return result;
859 }
860
861 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
862 {
863 db = _db;
864 }
865
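// Append a Put to the write batch. A contiguous bufferlist is passed as a
// single Slice; a fragmented one is passed as SliceParts so its segments are
// handed to RocksDB without being flattened first.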
866 void RocksDBStore::RocksDBTransactionImpl::put_bat(
867 rocksdb::WriteBatch& bat,
868 rocksdb::ColumnFamilyHandle *cf,
869 const string &key,
870 const bufferlist &to_set_bl)
871 {
872 // bufferlist::c_str() is non-constant, so we can't call c_str()
873 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
874 bat.Put(cf,
875 rocksdb::Slice(key),
876 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
877 to_set_bl.length()));
878 } else {
879 rocksdb::Slice key_slice(key);
880 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
881 bat.Put(cf,
882 rocksdb::SliceParts(&key_slice, 1),
883 prepare_sliceparts(to_set_bl, &value_slices));
884 }
885 }
886
887 void RocksDBStore::RocksDBTransactionImpl::set(
888 const string &prefix,
889 const string &k,
890 const bufferlist &to_set_bl)
891 {
892 auto cf = db->get_cf_handle(prefix);
893 if (cf) {
894 put_bat(bat, cf, k, to_set_bl);
895 } else {
896 string key = combine_strings(prefix, k);
897 put_bat(bat, db->default_cf, key, to_set_bl);
898 }
899 }
900
901 void RocksDBStore::RocksDBTransactionImpl::set(
902 const string &prefix,
903 const char *k, size_t keylen,
904 const bufferlist &to_set_bl)
905 {
906 auto cf = db->get_cf_handle(prefix);
907 if (cf) {
908 string key(k, keylen); // fixme?
909 put_bat(bat, cf, key, to_set_bl);
910 } else {
911 string key;
912 combine_strings(prefix, k, keylen, &key);
913 put_bat(bat, db->default_cf, key, to_set_bl);
914 }
915 }
916
917 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
918 const string &k)
919 {
920 auto cf = db->get_cf_handle(prefix);
921 if (cf) {
922 bat.Delete(cf, rocksdb::Slice(k));
923 } else {
924 bat.Delete(db->default_cf, combine_strings(prefix, k));
925 }
926 }
927
928 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
929 const char *k,
930 size_t keylen)
931 {
932 auto cf = db->get_cf_handle(prefix);
933 if (cf) {
934 bat.Delete(cf, rocksdb::Slice(k, keylen));
935 } else {
936 string key;
937 combine_strings(prefix, k, keylen, &key);
938 bat.Delete(db->default_cf, rocksdb::Slice(key));
939 }
940 }
941
942 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
943 const string &k)
944 {
945 auto cf = db->get_cf_handle(prefix);
946 if (cf) {
947 bat.SingleDelete(cf, k);
948 } else {
949 bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
950 }
951 }
952
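// Delete every key stored under 'prefix'. For a dedicated column family the
// whole CF key space belongs to the prefix, so the range is ["", "\xff\xff\xff\xff").
// In the default CF keys are encoded as "prefix\0key", so the range runs from
// combine_strings(prefix, "") up to combine_strings(prefix + '\x01', "").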
953 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
954 {
955 auto cf = db->get_cf_handle(prefix);
956 if (cf) {
957 string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating...
958 bat.DeleteRange(cf, string(), endprefix);
959 } else {
960 string endprefix = prefix;
961 endprefix.push_back('\x01');
962 bat.DeleteRange(db->default_cf,
963 combine_strings(prefix, string()),
964 combine_strings(endprefix, string()));
965 }
966 }
967
968 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
969 const string &start,
970 const string &end)
971 {
972 auto cf = db->get_cf_handle(prefix);
973 if (cf) {
974 bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
975 } else {
976 bat.DeleteRange(
977 db->default_cf,
978 rocksdb::Slice(combine_strings(prefix, start)),
979 rocksdb::Slice(combine_strings(prefix, end)));
980 }
981 }
982
983 void RocksDBStore::RocksDBTransactionImpl::merge(
984 const string &prefix,
985 const string &k,
986 const bufferlist &to_set_bl)
987 {
988 auto cf = db->get_cf_handle(prefix);
989 if (cf) {
990 // bufferlist::c_str() is non-constant, so we can't call c_str()
991 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
992 bat.Merge(
993 cf,
994 rocksdb::Slice(k),
995 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
996 } else {
997 // make a copy
998 rocksdb::Slice key_slice(k);
999 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
1000 bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
1001 prepare_sliceparts(to_set_bl, &value_slices));
1002 }
1003 } else {
1004 string key = combine_strings(prefix, k);
1005 // bufferlist::c_str() is non-constant, so we can't call c_str()
1006 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1007 bat.Merge(
1008 db->default_cf,
1009 rocksdb::Slice(key),
1010 rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1011 } else {
1012 // make a copy
1013 rocksdb::Slice key_slice(key);
1014 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
1015 bat.Merge(
1016 db->default_cf,
1017 rocksdb::SliceParts(&key_slice, 1),
1018 prepare_sliceparts(to_set_bl, &value_slices));
1019 }
1020 }
1021 }
1022
1023 int RocksDBStore::get(
1024 const string &prefix,
1025 const std::set<string> &keys,
1026 std::map<string, bufferlist> *out)
1027 {
1028 utime_t start = ceph_clock_now();
1029 auto cf = get_cf_handle(prefix);
1030 if (cf) {
1031 for (auto& key : keys) {
1032 std::string value;
1033 auto status = db->Get(rocksdb::ReadOptions(),
1034 cf,
1035 rocksdb::Slice(key),
1036 &value);
1037 if (status.ok()) {
1038 (*out)[key].append(value);
1039 } else if (status.IsIOError()) {
1040 ceph_abort_msg(status.getState());
1041 }
1042 }
1043 } else {
1044 for (auto& key : keys) {
1045 std::string value;
1046 string k = combine_strings(prefix, key);
1047 auto status = db->Get(rocksdb::ReadOptions(),
1048 default_cf,
1049 rocksdb::Slice(k),
1050 &value);
1051 if (status.ok()) {
1052 (*out)[key].append(value);
1053 } else if (status.IsIOError()) {
1054 ceph_abort_msg(status.getState());
1055 }
1056 }
1057 }
1058 utime_t lat = ceph_clock_now() - start;
1059 logger->inc(l_rocksdb_gets);
1060 logger->tinc(l_rocksdb_get_latency, lat);
1061 return 0;
1062 }
1063
1064 int RocksDBStore::get(
1065 const string &prefix,
1066 const string &key,
1067 bufferlist *out)
1068 {
1069 ceph_assert(out && (out->length() == 0));
1070 utime_t start = ceph_clock_now();
1071 int r = 0;
1072 string value;
1073 rocksdb::Status s;
1074 auto cf = get_cf_handle(prefix);
1075 if (cf) {
1076 s = db->Get(rocksdb::ReadOptions(),
1077 cf,
1078 rocksdb::Slice(key),
1079 &value);
1080 } else {
1081 string k = combine_strings(prefix, key);
1082 s = db->Get(rocksdb::ReadOptions(),
1083 default_cf,
1084 rocksdb::Slice(k),
1085 &value);
1086 }
1087 if (s.ok()) {
1088 out->append(value);
1089 } else if (s.IsNotFound()) {
1090 r = -ENOENT;
1091 } else {
1092 ceph_abort_msg(s.getState());
1093 }
1094 utime_t lat = ceph_clock_now() - start;
1095 logger->inc(l_rocksdb_gets);
1096 logger->tinc(l_rocksdb_get_latency, lat);
1097 return r;
1098 }
1099
1100 int RocksDBStore::get(
1101 const string& prefix,
1102 const char *key,
1103 size_t keylen,
1104 bufferlist *out)
1105 {
1106 ceph_assert(out && (out->length() == 0));
1107 utime_t start = ceph_clock_now();
1108 int r = 0;
1109 string value;
1110 rocksdb::Status s;
1111 auto cf = get_cf_handle(prefix);
1112 if (cf) {
1113 s = db->Get(rocksdb::ReadOptions(),
1114 cf,
1115 rocksdb::Slice(key, keylen),
1116 &value);
1117 } else {
1118 string k;
1119 combine_strings(prefix, key, keylen, &k);
1120 s = db->Get(rocksdb::ReadOptions(),
1121 default_cf,
1122 rocksdb::Slice(k),
1123 &value);
1124 }
1125 if (s.ok()) {
1126 out->append(value);
1127 } else if (s.IsNotFound()) {
1128 r = -ENOENT;
1129 } else {
1130 ceph_abort_msg(s.getState());
1131 }
1132 utime_t lat = ceph_clock_now() - start;
1133 logger->inc(l_rocksdb_gets);
1134 logger->tinc(l_rocksdb_get_latency, lat);
1135 return r;
1136 }
1137
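// Keys in the default column family are stored as "prefix\0key";
// split_key() locates the NUL separator and returns the two halves.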
1138 int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
1139 {
1140 size_t prefix_len = 0;
1141
1142 // Find separator inside Slice
1143 char* separator = (char*) memchr(in.data(), 0, in.size());
1144 if (separator == NULL)
1145 return -EINVAL;
1146 prefix_len = size_t(separator - in.data());
1147 if (prefix_len >= in.size())
1148 return -EINVAL;
1149
1150 // Fetch prefix and/or key directly from Slice
1151 if (prefix)
1152 *prefix = string(in.data(), prefix_len);
1153 if (key)
1154 *key = string(separator+1, in.size()-prefix_len-1);
1155 return 0;
1156 }
1157
1158 void RocksDBStore::compact()
1159 {
1160 logger->inc(l_rocksdb_compact);
1161 rocksdb::CompactRangeOptions options;
1162 db->CompactRange(options, default_cf, nullptr, nullptr);
1163 for (auto cf : cf_handles) {
1164 db->CompactRange(
1165 options,
1166 static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
1167 nullptr, nullptr);
1168 }
1169 }
1170
1171
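// Background compaction worker: pops queued ranges while holding
// compact_queue_lock, drops the lock for the actual compaction, and treats
// an empty (start, end) pair as a request to compact the whole store.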
1172 void RocksDBStore::compact_thread_entry()
1173 {
1174 std::unique_lock l{compact_queue_lock};
1175 while (!compact_queue_stop) {
1176 while (!compact_queue.empty()) {
1177 pair<string,string> range = compact_queue.front();
1178 compact_queue.pop_front();
1179 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
1180 l.unlock();
1181 logger->inc(l_rocksdb_compact_range);
1182 if (range.first.empty() && range.second.empty()) {
1183 compact();
1184 } else {
1185 compact_range(range.first, range.second);
1186 }
1187 l.lock();
1188 continue;
1189 }
1190 compact_queue_cond.wait(l);
1191 }
1192 }
1193
1194 void RocksDBStore::compact_range_async(const string& start, const string& end)
1195 {
1196 std::lock_guard l(compact_queue_lock);
1197
1198 // try to merge adjacent ranges. this is O(n), but the queue should
1199 // be short. note that we do not cover all overlap cases and merge
1200 // opportunities here, but we capture the ones we currently need.
1201 bool merged = false;
1202 auto p = compact_queue.begin();
1203 while (p != compact_queue.end()) {
1204 if (p->first == start && p->second == end) {
1205 return; // dup; no-op
1206 }
1207 if (start <= p->first && p->first <= end) {
1208 // new region crosses start of existing range; keep the bigger right bound
1209 compact_queue.push_back(make_pair(start, end > p->second ? end : p->second));
1210 compact_queue.erase(p); // invalidates p; do not touch it again
1211 logger->inc(l_rocksdb_compact_queue_merge);
1212 merged = true;
1213 break;
1214 }
1215 if (start <= p->second && p->second <= end) {
1216 // new region crosses end of existing range
1217 // (start > p->first here, or the branch above would have fired)
1218 compact_queue.push_back(make_pair(p->first, end));
1219 compact_queue.erase(p); // invalidates p; do not touch it again
1220 logger->inc(l_rocksdb_compact_queue_merge);
1221 merged = true;
1222 break;
1223 }
1224 ++p;
1225 }
1226 if (!merged) {
1227 // no merge, new entry.
1228 compact_queue.push_back(make_pair(start, end));
1229 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
1230 }
1231 compact_queue_cond.notify_all();
1232 if (!compact_thread.is_started()) {
1233 compact_thread.create("rstore_compact");
1234 }
1235 }
1236 bool RocksDBStore::check_omap_dir(string &omap_dir)
1237 {
1238 rocksdb::Options options;
1239 options.create_if_missing = true;
1240 rocksdb::DB *db;
1241 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
1242 delete db;
1243 db = nullptr;
1244 return status.ok();
1245 }
1246 void RocksDBStore::compact_range(const string& start, const string& end)
1247 {
1248 rocksdb::CompactRangeOptions options;
1249 rocksdb::Slice cstart(start);
1250 rocksdb::Slice cend(end);
1251 db->CompactRange(options, &cstart, &cend);
1252 }
1253
1254 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
1255 {
1256 delete dbiter;
1257 }
1258 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
1259 {
1260 dbiter->SeekToFirst();
1261 ceph_assert(!dbiter->status().IsIOError());
1262 return dbiter->status().ok() ? 0 : -1;
1263 }
1264 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
1265 {
1266 rocksdb::Slice slice_prefix(prefix);
1267 dbiter->Seek(slice_prefix);
1268 ceph_assert(!dbiter->status().IsIOError());
1269 return dbiter->status().ok() ? 0 : -1;
1270 }
1271 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
1272 {
1273 dbiter->SeekToLast();
1274 ceph_assert(!dbiter->status().IsIOError());
1275 return dbiter->status().ok() ? 0 : -1;
1276 }
1277 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
1278 {
1279 string limit = past_prefix(prefix);
1280 rocksdb::Slice slice_limit(limit);
1281 dbiter->Seek(slice_limit);
1282
1283 if (!dbiter->Valid()) {
1284 dbiter->SeekToLast();
1285 } else {
1286 dbiter->Prev();
1287 }
1288 return dbiter->status().ok() ? 0 : -1;
1289 }
1290 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
1291 {
1292 lower_bound(prefix, after);
1293 if (valid()) {
1294 pair<string,string> key = raw_key();
1295 if (key.first == prefix && key.second == after)
1296 next();
1297 }
1298 return dbiter->status().ok() ? 0 : -1;
1299 }
1300 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
1301 {
1302 string bound = combine_strings(prefix, to);
1303 rocksdb::Slice slice_bound(bound);
1304 dbiter->Seek(slice_bound);
1305 return dbiter->status().ok() ? 0 : -1;
1306 }
1307 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
1308 {
1309 return dbiter->Valid();
1310 }
1311 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
1312 {
1313 if (valid()) {
1314 dbiter->Next();
1315 }
1316 ceph_assert(!dbiter->status().IsIOError());
1317 return dbiter->status().ok() ? 0 : -1;
1318 }
1319 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
1320 {
1321 if (valid()) {
1322 dbiter->Prev();
1323 }
1324 ceph_assert(!dbiter->status().IsIOError());
1325 return dbiter->status().ok() ? 0 : -1;
1326 }
1327 string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
1328 {
1329 string out_key;
1330 split_key(dbiter->key(), nullptr, &out_key);
1331 return out_key;
1332 }
1333 pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
1334 {
1335 string prefix, key;
1336 split_key(dbiter->key(), &prefix, &key);
1337 return make_pair(prefix, key);
1338 }
1339
1340 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
1341 // Look for "prefix\0" right in the rocksdb::Slice
1342 rocksdb::Slice key = dbiter->key();
1343 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
1344 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
1345 } else {
1346 return false;
1347 }
1348 }
1349
1350 bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
1351 {
1352 return to_bufferlist(dbiter->value());
1353 }
1354
1355 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
1356 {
1357 return dbiter->key().size();
1358 }
1359
1360 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1361 {
1362 return dbiter->value().size();
1363 }
1364
1365 bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1366 {
1367 rocksdb::Slice val = dbiter->value();
1368 return bufferptr(val.data(), val.size());
1369 }
1370
1371 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1372 {
1373 return dbiter->status().ok() ? 0 : -1;
1374 }
1375
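// past_prefix() returns prefix + '\x01', which sorts after every
// "prefix\0key" entry produced by combine_strings() and therefore bounds the
// prefix's key range in the default column family.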
1376 string RocksDBStore::past_prefix(const string &prefix)
1377 {
1378 string limit = prefix;
1379 limit.push_back(1);
1380 return limit;
1381 }
1382
1383 RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
1384 {
1385 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
1386 db->NewIterator(rocksdb::ReadOptions(), default_cf));
1387 }
1388
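// Per-prefix iterators: if the prefix has its own column family we iterate
// that CF directly (its keys are stored without the prefix); otherwise we
// fall back to KeyValueDB's generic prefix iterator built on the wholespace
// iterator.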
1389 class CFIteratorImpl : public KeyValueDB::IteratorImpl {
1390 protected:
1391 string prefix;
1392 rocksdb::Iterator *dbiter;
1393 public:
1394 explicit CFIteratorImpl(const std::string& p,
1395 rocksdb::Iterator *iter)
1396 : prefix(p), dbiter(iter) { }
1397 ~CFIteratorImpl() {
1398 delete dbiter;
1399 }
1400
1401 int seek_to_first() override {
1402 dbiter->SeekToFirst();
1403 return dbiter->status().ok() ? 0 : -1;
1404 }
1405 int seek_to_last() override {
1406 dbiter->SeekToLast();
1407 return dbiter->status().ok() ? 0 : -1;
1408 }
1409 int upper_bound(const string &after) override {
1410 lower_bound(after);
1411 if (valid() && (key() == after)) {
1412 next();
1413 }
1414 return dbiter->status().ok() ? 0 : -1;
1415 }
1416 int lower_bound(const string &to) override {
1417 rocksdb::Slice slice_bound(to);
1418 dbiter->Seek(slice_bound);
1419 return dbiter->status().ok() ? 0 : -1;
1420 }
1421 int next() override {
1422 if (valid()) {
1423 dbiter->Next();
1424 }
1425 return dbiter->status().ok() ? 0 : -1;
1426 }
1427 int prev() override {
1428 if (valid()) {
1429 dbiter->Prev();
1430 }
1431 return dbiter->status().ok() ? 0 : -1;
1432 }
1433 bool valid() override {
1434 return dbiter->Valid();
1435 }
1436 string key() override {
1437 return dbiter->key().ToString();
1438 }
1439 std::pair<std::string, std::string> raw_key() override {
1440 return make_pair(prefix, key());
1441 }
1442 bufferlist value() override {
1443 return to_bufferlist(dbiter->value());
1444 }
1445 bufferptr value_as_ptr() override {
1446 rocksdb::Slice val = dbiter->value();
1447 return bufferptr(val.data(), val.size());
1448 }
1449 int status() override {
1450 return dbiter->status().ok() ? 0 : -1;
1451 }
1452 };
1453
1454 KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
1455 {
1456 rocksdb::ColumnFamilyHandle *cf_handle =
1457 static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
1458 if (cf_handle) {
1459 return std::make_shared<CFIteratorImpl>(
1460 prefix,
1461 db->NewIterator(rocksdb::ReadOptions(), cf_handle));
1462 } else {
1463 return KeyValueDB::get_iterator(prefix);
1464 }
1465 }
1466