1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include <set>
5    	#include <map>
6    	#include <string>
7    	#include <memory>
8    	#include <errno.h>
9    	#include <unistd.h>
10   	#include <sys/types.h>
11   	#include <sys/stat.h>
12   	
13   	#include "rocksdb/db.h"
14   	#include "rocksdb/table.h"
15   	#include "rocksdb/env.h"
16   	#include "rocksdb/slice.h"
17   	#include "rocksdb/cache.h"
18   	#include "rocksdb/filter_policy.h"
19   	#include "rocksdb/utilities/convenience.h"
20   	#include "rocksdb/merge_operator.h"
21   	
22   	using std::string;
23   	#include "common/perf_counters.h"
24   	#include "common/PriorityCache.h"
25   	#include "include/str_list.h"
26   	#include "include/stringify.h"
27   	#include "include/str_map.h"
28   	#include "KeyValueDB.h"
29   	#include "RocksDBStore.h"
30   	
31   	#include "common/debug.h"
32   	
33   	#define dout_context cct
34   	#define dout_subsys ceph_subsys_rocksdb
35   	#undef dout_prefix
36   	#define dout_prefix *_dout << "rocksdb: "
37   	
38   	static bufferlist to_bufferlist(rocksdb::Slice in) {
39   	  bufferlist bl;
40   	  bl.append(bufferptr(in.data(), in.size()));
41   	  return bl;
42   	}
43   	
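// Fill *slices with one rocksdb::Slice per buffer in bl (referencing the
// buffers in place, no copy) and wrap them in a SliceParts for WriteBatch.
// The caller must size *slices to the number of buffers in bl.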
44   	static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
45   						      vector<rocksdb::Slice> *slices)
46   	{
47   	  unsigned n = 0;
48   	  for (auto& buf : bl.buffers()) {
49   	    (*slices)[n].data_ = buf.c_str();
50   	    (*slices)[n].size_ = buf.length();
51   	    n++;
52   	  }
53   	  return rocksdb::SliceParts(slices->data(), slices->size());
54   	}
55   	
56   	
57   	//
58   	// One of these for the default rocksdb column family, routing each prefix
59   	// to the appropriate MergeOperator.
60   	//
61   	class RocksDBStore::MergeOperatorRouter
62   	  : public rocksdb::AssociativeMergeOperator
63   	{
64   	  RocksDBStore& store;
65   	public:
66   	  const char *Name() const override {
67   	    // Construct a name that RocksDB will validate against. We want to
68   	    // do this in a way that doesn't constrain the ordering of calls
69   	    // to set_merge_operator, so sort the merge operators and then
70   	    // construct a name from all of those parts.
71   	    store.assoc_name.clear();
72   	    map<std::string,std::string> names;
73   	
74   	    for (auto& p : store.merge_ops) {
75   	      names[p.first] = p.second->name();
76   	    }
77   	    for (auto& p : store.cf_handles) {
78   	      names.erase(p.first);
79   	    }
80   	    for (auto& p : names) {
81   	      store.assoc_name += '.';
82   	      store.assoc_name += p.first;
83   	      store.assoc_name += ':';
84   	      store.assoc_name += p.second;
85   	    }
86   	    return store.assoc_name.c_str();
87   	  }
88   	
89   	  explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
90   	
91   	  bool Merge(const rocksdb::Slice& key,
92   		     const rocksdb::Slice* existing_value,
93   		     const rocksdb::Slice& value,
94   		     std::string* new_value,
95   		     rocksdb::Logger* logger) const override {
96   	    // This path only serves the default column family.
97   	    // Extract the prefix from the key and compare it against each registered
98   	    // merge op; merge operators registered for explicit CFs also appear in
99   	    // merge_ops, but their prefixes never match keys stored here.
100  	    for (auto& p : store.merge_ops) {
101  	      if (p.first.compare(0, p.first.length(),
102  				  key.data(), p.first.length()) == 0 &&
103  		  key.data()[p.first.length()] == 0) {
104  		if (existing_value) {
105  		  p.second->merge(existing_value->data(), existing_value->size(),
106  				  value.data(), value.size(),
107  				  new_value);
108  		} else {
109  		  p.second->merge_nonexistent(value.data(), value.size(), new_value);
110  		}
111  		break;
112  	      }
113  	    }
114  	    return true; // OK :)
115  	  }
116  	};
117  	
118  	//
119  	// One of these per non-default column family, linked directly to the
120  	// merge operator for that CF/prefix (if any).
121  	//
122  	class RocksDBStore::MergeOperatorLinker
123  	  : public rocksdb::AssociativeMergeOperator
124  	{
125  	private:
126  	  std::shared_ptr<KeyValueDB::MergeOperator> mop;
127  	public:
128  	  explicit MergeOperatorLinker(const std::shared_ptr<KeyValueDB::MergeOperator> &o) : mop(o) {}
129  	
130  	  const char *Name() const override {
131  	    return mop->name();
132  	  }
133  	
134  	  bool Merge(const rocksdb::Slice& key,
135  		     const rocksdb::Slice* existing_value,
136  		     const rocksdb::Slice& value,
137  		     std::string* new_value,
138  		     rocksdb::Logger* logger) const override {
139  	    if (existing_value) {
140  	      mop->merge(existing_value->data(), existing_value->size(),
141  			 value.data(), value.size(),
142  			 new_value);
143  	    } else {
144  	      mop->merge_nonexistent(value.data(), value.size(), new_value);
145  	    }
146  	    return true;
147  	  }
148  	};
149  	
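// Register a merge operator for a prefix.  Must be called before the
// database is opened; the operators are attached when the database is opened.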
150  	int RocksDBStore::set_merge_operator(
151  	  const string& prefix,
152  	  std::shared_ptr<KeyValueDB::MergeOperator> mop)
153  	{
154  	  // If you fail here, it's because you can't do this on an open database
155  	  ceph_assert(db == nullptr);
156  	  merge_ops.push_back(std::make_pair(prefix,mop));
157  	  return 0;
158  	}
159  	
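// rocksdb::Logger implementation that forwards RocksDB's internal log
// messages into the Ceph log, mapping RocksDB log levels onto dout levels.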
160  	class CephRocksdbLogger : public rocksdb::Logger {
161  	  CephContext *cct;
162  	public:
163  	  explicit CephRocksdbLogger(CephContext *c) : cct(c) {
164  	    cct->get();
165  	  }
166  	  ~CephRocksdbLogger() override {
167  	    cct->put();
168  	  }
169  	
170  	  // Write an entry to the log file with the specified format.
171  	  void Logv(const char* format, va_list ap) override {
172  	    Logv(rocksdb::INFO_LEVEL, format, ap);
173  	  }
174  	
175  	  // Write an entry to the log file with the specified log level
176  	  // and format.  Any log with level under the internal log level
177  	  // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
178  	  // printed.
179  	  void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
180  		    va_list ap) override {
181  	    int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
182  	    dout(ceph::dout::need_dynamic(v));
    // heap-allocate the scratch buffer: a 64 KiB stack array was flagged by
    // static analysis for excessive single-frame stack use
183  	    std::unique_ptr<char[]> buf(new char[65536]);
184  	    vsnprintf(buf.get(), 65536, format, ap);
185  	    *_dout << buf.get() << dendl;
186  	  }
187  	};
188  	
189  	rocksdb::Logger *create_rocksdb_ceph_logger()
190  	{
191  	  return new CephRocksdbLogger(g_ceph_context);
192  	}
193  	
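// Parse "true"/"false" (case-insensitive) or an integer into a bool.
// Returns 0 on success, -EINVAL on unparseable input.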
194  	static int string2bool(const string &val, bool &b_val)
195  	{
196  	  if (strcasecmp(val.c_str(), "false") == 0) {
197  	    b_val = false;
198  	    return 0;
199  	  } else if (strcasecmp(val.c_str(), "true") == 0) {
200  	    b_val = true;
201  	    return 0;
202  	  } else {
203  	    std::string err;
204  	    int b = strict_strtol(val.c_str(), 10, &err);
205  	    if (!err.empty())
206  	      return -EINVAL;
207  	    b_val = !!b;
208  	    return 0;
209  	  }
210  	}
211  	  
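// Handle option keys that RocksDB itself does not recognize
// (compaction_threads, flusher_threads, compact_on_mount, disableWAL).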
212  	int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
213  	{
214  	  if (key == "compaction_threads") {
215  	    std::string err;
216  	    int f = strict_iecstrtoll(val.c_str(), &err);
217  	    if (!err.empty())
218  	      return -EINVAL;
219  	    //Low priority threadpool is used for compaction
220  	    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
221  	  } else if (key == "flusher_threads") {
222  	    std::string err;
223  	    int f = strict_iecstrtoll(val.c_str(), &err);
224  	    if (!err.empty())
225  	      return -EINVAL;
226  	    //High priority threadpool is used for flusher
227  	    opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
228  	  } else if (key == "compact_on_mount") {
229  	    int ret = string2bool(val, compact_on_mount);
230  	    if (ret != 0)
231  	      return ret;
232  	  } else if (key == "disableWAL") {
233  	    int ret = string2bool(val, disableWAL);
234  	    if (ret != 0)
235  	      return ret;
236  	  } else {
237  	    // unrecognized config option
238  	    return -EINVAL;
239  	  }
240  	  return 0;
241  	}
242  	
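// Split opt_str on ',', '\n' or ';' into key=value pairs and feed each one
// to RocksDB, falling back to tryInterpret() for keys RocksDB rejects.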
243  	int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
244  	{
245  	  map<string, string> str_map;
246  	  int r = get_str_map(opt_str, &str_map, ",\n;");
247  	  if (r < 0)
248  	    return r;
249  	  map<string, string>::iterator it;
250  	  for(it = str_map.begin(); it != str_map.end(); ++it) {
251  	    string this_opt = it->first + "=" + it->second;
252  	    rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt, &opt);
253  	    if (!status.ok()) {
254  	      // unrecognized by rocksdb; try to interpret it ourselves.
255  	      r = tryInterpret(it->first, it->second, opt);
256  	      if (r < 0) {
257  		derr << status.ToString() << dendl;
258  		return -EINVAL;
259  	      }
260  	    }
261  	    lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
262  				  << " = " << it->second << dendl;
263  	  }
264  	  return 0;
265  	}
266  	
267  	int RocksDBStore::init(string _options_str)
268  	{
269  	  options_str = _options_str;
270  	  rocksdb::Options opt;
271  	  //try parse options
272  	  if (options_str.length()) {
273  	    int r = ParseOptionsFromString(options_str, opt);
274  	    if (r != 0) {
275  	      return -EINVAL;
276  	    }
277  	  }
278  	  return 0;
279  	}
280  	
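// Create the directory backing the store, either through the custom Env
// (if one is configured) or directly on the filesystem; EEXIST is not an error.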
281  	int RocksDBStore::create_db_dir()
282  	{
283  	  if (env) {
284  	    unique_ptr<rocksdb::Directory> dir;
285  	    env->NewDirectory(path, &dir);
286  	  } else {
287  	    int r = ::mkdir(path.c_str(), 0755);
288  	    if (r < 0)
289  	      r = -errno;
290  	    if (r < 0 && r != -EEXIST) {
291  	      derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
292  		   << dendl;
293  	      return r;
294  	    }
295  	  }
296  	  return 0;
297  	}
298  	
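// Attach the merge operator registered for cf_name (if any) to the
// column family options.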
299  	int RocksDBStore::install_cf_mergeop(
300  	  const string &cf_name,
301  	  rocksdb::ColumnFamilyOptions *cf_opt)
302  	{
303  	  ceph_assert(cf_opt != nullptr);
304  	  cf_opt->merge_operator.reset();
305  	  for (auto& i : merge_ops) {
306  	    if (i.first == cf_name) {
307  	      cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
308  	    }
309  	  }
310  	  return 0;
311  	}
312  	
313  	int RocksDBStore::create_and_open(ostream &out,
314  					  const vector<ColumnFamily>& cfs)
315  	{
316  	  int r = create_db_dir();
317  	  if (r < 0)
318  	    return r;
319  	  if (cfs.empty()) {
320  	    return do_open(out, true, false, nullptr);
321  	  } else {
322  	    return do_open(out, true, false, &cfs);
323  	  }
324  	}
325  	
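// Assemble the rocksdb::Options used to open the store: user option string,
// statistics, WAL dir, db_paths, logger, Env, block/row caches, bloom filter,
// index type and the prefix-routing merge operator.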
326  	int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
327  	{
328  	  rocksdb::Status status;
329  	
330  	  if (options_str.length()) {
331  	    int r = ParseOptionsFromString(options_str, opt);
332  	    if (r != 0) {
333  	      return -EINVAL;
334  	    }
335  	  }
336  	
337  	  if (g_conf()->rocksdb_perf)  {
338  	    dbstats = rocksdb::CreateDBStatistics();
339  	    opt.statistics = dbstats;
340  	  }
341  	
342  	  opt.create_if_missing = create_if_missing;
343  	  if (kv_options.count("separate_wal_dir")) {
344  	    opt.wal_dir = path + ".wal";
345  	  }
346  	
347  	  // Since ceph::for_each_substr doesn't return a value and
348  	  // std::stoull does throw, we may as well just catch everything here.
349  	  try {
350  	    if (kv_options.count("db_paths")) {
351  	      list<string> paths;
352  	      get_str_list(kv_options["db_paths"], "; \t", paths);
353  	      for (auto& p : paths) {
354  		size_t pos = p.find(',');
355  		if (pos == std::string::npos) {
356  		  derr << __func__ << " invalid db path item " << p << " in "
357  		       << kv_options["db_paths"] << dendl;
358  		  return -EINVAL;
359  		}
360  		string path = p.substr(0, pos);
361  		string size_str = p.substr(pos + 1);
362  		uint64_t size = atoll(size_str.c_str());
363  		if (!size) {
364  		  derr << __func__ << " invalid db path item " << p << " in "
365  		       << kv_options["db_paths"] << dendl;
366  		  return -EINVAL;
367  		}
368  		opt.db_paths.push_back(rocksdb::DbPath(path, size));
369  		dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
370  	      }
371  	    }
372  	  } catch (const std::system_error& e) {
373  	    return -e.code().value();
374  	  }
375  	
376  	  if (g_conf()->rocksdb_log_to_ceph_log) {
377  	    opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
378  	  }
379  	
380  	  if (priv) {
381  	    dout(10) << __func__ << " using custom Env " << priv << dendl;
382  	    opt.env = static_cast<rocksdb::Env*>(priv);
383  	  }
384  	
385  	  opt.env->SetAllowNonOwnerAccess(false);
386  	
387  	  // caches
388  	  if (!set_cache_flag) {
389  	    cache_size = g_conf()->rocksdb_cache_size;
390  	  }
391  	  uint64_t row_cache_size = cache_size * g_conf()->rocksdb_cache_row_ratio;
392  	  uint64_t block_cache_size = cache_size - row_cache_size;
393  	
394  	  if (g_conf()->rocksdb_cache_type == "binned_lru") {
395  	    bbt_opts.block_cache = rocksdb_cache::NewBinnedLRUCache(
396  	      cct,
397  	      block_cache_size,
398  	      g_conf()->rocksdb_cache_shard_bits);
399  	  } else if (g_conf()->rocksdb_cache_type == "lru") {
400  	    bbt_opts.block_cache = rocksdb::NewLRUCache(
401  	      block_cache_size,
402  	      g_conf()->rocksdb_cache_shard_bits);
403  	  } else if (g_conf()->rocksdb_cache_type == "clock") {
404  	    bbt_opts.block_cache = rocksdb::NewClockCache(
405  	      block_cache_size,
406  	      g_conf()->rocksdb_cache_shard_bits);
407  	    if (!bbt_opts.block_cache) {
408  	      derr << "rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
409  	           << "' chosen, but RocksDB not compiled with LibTBB. "
410  	           << dendl;
411  	      return -EINVAL;
412  	    }
413  	  } else {
414  	    derr << "unrecognized rocksdb_cache_type '" << g_conf()->rocksdb_cache_type
415  	      << "'" << dendl;
416  	    return -EINVAL;
417  	  }
418  	  bbt_opts.block_size = g_conf()->rocksdb_block_size;
419  	
420  	  if (row_cache_size > 0)
421  	    opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
422  					     g_conf()->rocksdb_cache_shard_bits);
423  	  uint64_t bloom_bits = g_conf().get_val<uint64_t>("rocksdb_bloom_bits_per_key");
424  	  if (bloom_bits > 0) {
425  	    dout(10) << __func__ << " set bloom filter bits per key to "
426  		     << bloom_bits << dendl;
427  	    bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
428  	  }
429  	  using std::placeholders::_1;
430  	  if (g_conf().with_val<std::string>("rocksdb_index_type",
431  					    std::bind(std::equal_to<std::string>(), _1,
432  						      "binary_search")))
433  	    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
434  	  if (g_conf().with_val<std::string>("rocksdb_index_type",
435  					    std::bind(std::equal_to<std::string>(), _1,
436  						      "hash_search")))
437  	    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
438  	  if (g_conf().with_val<std::string>("rocksdb_index_type",
439  					    std::bind(std::equal_to<std::string>(), _1,
440  						      "two_level")))
441  	    bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
442  	  if (!bbt_opts.no_block_cache) {
443  	    bbt_opts.cache_index_and_filter_blocks =
444  	        g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks");
445  	    bbt_opts.cache_index_and_filter_blocks_with_high_priority =
446  	        g_conf().get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
447  	    bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
448  	      g_conf().get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
449  	  }
450  	  bbt_opts.partition_filters = g_conf().get_val<bool>("rocksdb_partition_filters");
451  	  if (g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size") > 0)
452  	    bbt_opts.metadata_block_size = g_conf().get_val<Option::size_t>("rocksdb_metadata_block_size");
453  	
454  	  opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
455  	  dout(10) << __func__ << " block size " << g_conf()->rocksdb_block_size
456  	           << ", block_cache size " << byte_u_t(block_cache_size)
457  		   << ", row_cache size " << byte_u_t(row_cache_size)
458  		   << "; shards "
459  		   << (1 << g_conf()->rocksdb_cache_shard_bits)
460  		   << ", type " << g_conf()->rocksdb_cache_type
461  		   << dendl;
462  	
463  	  opt.merge_operator.reset(new MergeOperatorRouter(*this));
464  	
465  	  return 0;
466  	}
467  	
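// Open (or create) the database.  On creation, the requested column families
// are created alongside the default one; on open, the options supplied in
// 'cfs' are mapped onto whatever column families already exist on disk.
// Also registers the rocksdb perf counters and optionally compacts on mount.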
468  	int RocksDBStore::do_open(ostream &out,
469  				  bool create_if_missing,
470  				  bool open_readonly,
471  				  const vector<ColumnFamily>* cfs)
472  	{
473  	  ceph_assert(!(create_if_missing && open_readonly));
474  	  rocksdb::Options opt;
475  	  int r = load_rocksdb_options(create_if_missing, opt);
476  	  if (r) {
477  	    dout(1) << __func__ << " load rocksdb options failed" << dendl;
478  	    return r;
479  	  }
480  	  rocksdb::Status status;
481  	  if (create_if_missing) {
482  	    status = rocksdb::DB::Open(opt, path, &db);
483  	    if (!status.ok()) {
484  	      derr << status.ToString() << dendl;
485  	      return -EINVAL;
486  	    }
487  	    // create and open column families
488  	    if (cfs) {
489  	      for (auto& p : *cfs) {
490  		// copy default CF settings, block cache, merge operators as
491  		// the base for new CF
492  		rocksdb::ColumnFamilyOptions cf_opt(opt);
493  		// user input options will override the base options
494  		status = rocksdb::GetColumnFamilyOptionsFromString(
495  		  cf_opt, p.option, &cf_opt);
496  		if (!status.ok()) {
497  		  derr << __func__ << " invalid db column family option string for CF: "
498  		       << p.name << dendl;
499  		  return -EINVAL;
500  		}
501  		install_cf_mergeop(p.name, &cf_opt);
502  		rocksdb::ColumnFamilyHandle *cf;
503  		status = db->CreateColumnFamily(cf_opt, p.name, &cf);
504  		if (!status.ok()) {
505  		  derr << __func__ << " Failed to create rocksdb column family: "
506  		       << p.name << dendl;
507  		  return -EINVAL;
508  		}
509  		// store the new CF handle
510  		add_column_family(p.name, static_cast<void*>(cf));
511  	      }
512  	    }
513  	    default_cf = db->DefaultColumnFamily();
514  	  } else {
515  	    std::vector<string> existing_cfs;
516  	    status = rocksdb::DB::ListColumnFamilies(
517  	      rocksdb::DBOptions(opt),
518  	      path,
519  	      &existing_cfs);
520  	    dout(1) << __func__ << " column families: " << existing_cfs << dendl;
521  	    if (existing_cfs.empty()) {
522  	      // no column families
523  	      if (open_readonly) {
524  		status = rocksdb::DB::OpenForReadOnly(opt, path, &db);
525  	      } else {
526  		status = rocksdb::DB::Open(opt, path, &db);
527  	      }
528  	      if (!status.ok()) {
529  		derr << status.ToString() << dendl;
530  		return -EINVAL;
531  	      }
532  	      default_cf = db->DefaultColumnFamily();
533  	    } else {
534  	      // we cannot change column families for a created database.  so, map
535  	      // what options we are given to whatever cf's already exist.
536  	      std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
537  	      for (auto& n : existing_cfs) {
538  		// copy default CF settings, block cache, merge operators as
539  		// the base for new CF
540  		rocksdb::ColumnFamilyOptions cf_opt(opt);
541  		bool found = false;
542  		if (cfs) {
543  		  for (auto& i : *cfs) {
544  		    if (i.name == n) {
545  		      found = true;
546  		      status = rocksdb::GetColumnFamilyOptionsFromString(
547  			cf_opt, i.option, &cf_opt);
548  		      if (!status.ok()) {
549  			derr << __func__ << " invalid db column family options for CF '"
550  			     << i.name << "': " << i.option << dendl;
551  			return -EINVAL;
552  		      }
553  		    }
554  		  }
555  		}
556  		if (n != rocksdb::kDefaultColumnFamilyName) {
557  		  install_cf_mergeop(n, &cf_opt);
558  		}
559  		column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
560  		if (!found && n != rocksdb::kDefaultColumnFamilyName) {
561  		  dout(1) << __func__ << " column family '" << n
562  			  << "' exists but not expected" << dendl;
563  		}
564  	      }
565  	      std::vector<rocksdb::ColumnFamilyHandle*> handles;
566  	      if (open_readonly) {
567  	        status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
568  					              path, column_families,
569  						      &handles, &db);
570  	      } else {
571  	        status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
572  					   path, column_families, &handles, &db);
573  	      }
574  	      if (!status.ok()) {
575  		derr << status.ToString() << dendl;
576  		return -EINVAL;
577  	      }
578  	      for (unsigned i = 0; i < existing_cfs.size(); ++i) {
579  		if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
580  		  default_cf = handles[i];
581  		  must_close_default_cf = true;
582  		} else {
583  		  add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
584  		}
585  	      }
586  	    }
587  	  }
588  	  ceph_assert(default_cf != nullptr);
589  	  
590  	  PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
591  	  plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
592  	  plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
593  	  plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
594  	  plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
595  	  plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
596  	  plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
597  	  plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
598  	  plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
599  	  plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
600  	  plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
601  	  plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
602  	  plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
603  	  plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
604  	  plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
605  	      "rocksdb_write_pre_and_post_time", "Time spent pre- and post-processing a write, excluding the WAL and memtable writes");
606  	  logger = plb.create_perf_counters();
607  	  cct->get_perfcounters_collection()->add(logger);
608  	
609  	  if (compact_on_mount) {
610  	    derr << "Compacting rocksdb store..." << dendl;
611  	    compact();
612  	    derr << "Finished compacting rocksdb store" << dendl;
613  	  }
614  	  return 0;
615  	}
616  	
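// Sanity check: open (creating if missing) and immediately close a database
// at 'dir'; returns 0 if RocksDB can operate there, -EIO otherwise.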
617  	int RocksDBStore::_test_init(const string& dir)
618  	{
619  	  rocksdb::Options options;
620  	  options.create_if_missing = true;
621  	  rocksdb::DB *db;
622  	  rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
623  	  delete db;
624  	  db = nullptr;
625  	  return status.ok() ? 0 : -EIO;
626  	}
627  	
628  	RocksDBStore::~RocksDBStore()
629  	{
630  	  close();
631  	  delete logger;
632  	
633  	  // Ensure db is destroyed before dependent db_cache and filterpolicy
634  	  for (auto& p : cf_handles) {
635  	    db->DestroyColumnFamilyHandle(
636  	      static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
637  	    p.second = nullptr;
638  	  }
639  	  if (must_close_default_cf) {
640  	    db->DestroyColumnFamilyHandle(default_cf);
641  	    must_close_default_cf = false;
642  	  }
643  	  default_cf = nullptr;
644  	  delete db;
645  	  db = nullptr;
646  	
647  	  if (priv) {
648  	    delete static_cast<rocksdb::Env*>(priv);
649  	  }
650  	}
651  	
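// Stop the background compaction thread (if running) and unregister the
// perf counters.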
652  	void RocksDBStore::close()
653  	{
654  	  // stop compaction thread
655  	  compact_queue_lock.lock();
656  	  if (compact_thread.is_started()) {
657  	    compact_queue_stop = true;
658  	    compact_queue_cond.notify_all();
659  	    compact_queue_lock.unlock();
660  	    compact_thread.join();
661  	  } else {
662  	    compact_queue_lock.unlock();
663  	  }
664  	
665  	  if (logger)
666  	    cct->get_perfcounters_collection()->remove(logger);
667  	}
668  	
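// Run RocksDB's RepairDB over the store path with the currently configured
// options.  Returns 0 on success, 1 on failure.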
669  	int RocksDBStore::repair(std::ostream &out)
670  	{
671  	  rocksdb::Options opt;
672  	  int r = load_rocksdb_options(false, opt);
673  	  if (r) {
674  	    dout(1) << __func__ << " load rocksdb options failed" << dendl;
675  	    out << "load rocksdb options failed" << std::endl;
676  	    return r;
677  	  }
678  	  rocksdb::Status status = rocksdb::RepairDB(path, opt);
679  	  if (status.ok()) {
680  	    return 0;
681  	  } else {
682  	    out << "repair rocksdb failed : " << status.ToString() << std::endl;
683  	    return 1;
684  	  }
685  	}
686  	
687  	void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
688  	    std::stringstream ss;
689  	    ss.str(s);
690  	    std::string item;
691  	    while (std::getline(ss, item, delim)) {
692  	        elems.push_back(item);
693  	    }
694  	}
695  	
696  	bool RocksDBStore::get_property(
697  	  const std::string &property,
698  	  uint64_t *out)
699  	{
700  	  return db->GetIntProperty(property, out);
701  	}
702  	
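// Approximate the on-disk size (SST files only, memtables excluded) of the
// keys under prefix/key_prefix using GetApproximateSizes().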
703  	int64_t RocksDBStore::estimate_prefix_size(const string& prefix,
704  						   const string& key_prefix)
705  	{
706  	  auto cf = get_cf_handle(prefix);
707  	  uint64_t size = 0;
708  	  uint8_t flags =
709  	    //rocksdb::DB::INCLUDE_MEMTABLES |  // do not include memtables...
710  	    rocksdb::DB::INCLUDE_FILES;
711  	  if (cf) {
712  	    string start = key_prefix + string(1, '\x00');
713  	    string limit = key_prefix + string("\xff\xff\xff\xff");
714  	    rocksdb::Range r(start, limit);
715  	    db->GetApproximateSizes(cf, &r, 1, &size, flags);
716  	  } else {
717  	    string start = combine_strings(prefix , key_prefix);
718  	    string limit = combine_strings(prefix , key_prefix + "\xff\xff\xff\xff");
719  	    rocksdb::Range r(start, limit);
720  	    db->GetApproximateSizes(default_cf, &r, 1, &size, flags);
721  	  }
722  	  return size;
723  	}
724  	
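// Dump compaction stats, extended statistics, perf counters and memory
// usage into the Formatter, gated by the rocksdb_collect_* options.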
725  	void RocksDBStore::get_statistics(Formatter *f)
726  	{
727  	  if (!g_conf()->rocksdb_perf)  {
728  	    dout(20) << __func__ << " RocksDB perf is disabled, can't probe for stats"
729  		     << dendl;
730  	    return;
731  	  }
732  	
733  	  if (g_conf()->rocksdb_collect_compaction_stats) {
734  	    std::string stat_str;
735  	    bool status = db->GetProperty("rocksdb.stats", &stat_str);
736  	    if (status) {
737  	      f->open_object_section("rocksdb_statistics");
738  	      f->dump_string("rocksdb_compaction_statistics", "");
739  	      vector<string> stats;
740  	      split_stats(stat_str, '\n', stats);
741  	      for (auto& st : stats) {
742  	        f->dump_string("", st);
743  	      }
744  	      f->close_section();
745  	    }
746  	  }
747  	  if (g_conf()->rocksdb_collect_extended_stats) {
748  	    if (dbstats) {
749  	      f->open_object_section("rocksdb_extended_statistics");
750  	      string stat_str = dbstats->ToString();
751  	      vector<string> stats;
752  	      split_stats(stat_str, '\n', stats);
753  	      f->dump_string("rocksdb_extended_statistics", "");
754  	      for (auto& st : stats) {
755  	        f->dump_string(".", st);
756  	      }
757  	      f->close_section();
758  	    }
759  	    f->open_object_section("rocksdbstore_perf_counters");
760  	    logger->dump_formatted(f,0);
761  	    f->close_section();
762  	  }
763  	  if (g_conf()->rocksdb_collect_memory_stats) {
764  	    f->open_object_section("rocksdb_memtable_statistics");
765  	    std::string str;
766  	    if (!bbt_opts.no_block_cache) {
767  	      str.append(stringify(bbt_opts.block_cache->GetUsage()));
768  	      f->dump_string("block_cache_usage", str.data());
769  	      str.clear();
770  	      str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
771  	      f->dump_string("block_cache_pinned_blocks_usage", str);
772  	      str.clear();
773  	    }
774  	    db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
775  	    f->dump_string("rocksdb_memtable_usage", str);
776  	    str.clear();
777  	    db->GetProperty("rocksdb.estimate-table-readers-mem", &str);
778  	    f->dump_string("rocksdb_index_filter_blocks_usage", str);
779  	    f->close_section();
780  	  }
781  	}
782  	
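// Common write path for sync and async submits: optionally enables RocksDB
// perf accounting, writes the batch, and feeds the per-stage write timings
// into the perf counters.  Returns 0 on success, -1 on error.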
783  	int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t) 
784  	{
785  	  // enable rocksdb breakdown
786  	  // considering performance overhead, default is disabled
787  	  if (g_conf()->rocksdb_perf) {
788  	    rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
789  	    rocksdb::get_perf_context()->Reset();
790  	  }
791  	
792  	  RocksDBTransactionImpl * _t =
793  	    static_cast<RocksDBTransactionImpl *>(t.get());
794  	  woptions.disableWAL = disableWAL;
795  	  lgeneric_subdout(cct, rocksdb, 30) << __func__;
796  	  RocksWBHandler bat_txc;
797  	  _t->bat.Iterate(&bat_txc);
798  	  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
799  	  
800  	  rocksdb::Status s = db->Write(woptions, &_t->bat);
801  	  if (!s.ok()) {
802  	    RocksWBHandler rocks_txc;
803  	    _t->bat.Iterate(&rocks_txc);
804  	    derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
805  	         << " Rocksdb transaction: " << rocks_txc.seen << dendl;
806  	  }
807  	
808  	  if (g_conf()->rocksdb_perf) {
809  	    utime_t write_memtable_time;
810  	    utime_t write_delay_time;
811  	    utime_t write_wal_time;
812  	    utime_t write_pre_and_post_process_time;
813  	    write_wal_time.set_from_double(
814  		static_cast<double>(rocksdb::get_perf_context()->write_wal_time)/1000000000);
815  	    write_memtable_time.set_from_double(
816  		static_cast<double>(rocksdb::get_perf_context()->write_memtable_time)/1000000000);
817  	    write_delay_time.set_from_double(
818  		static_cast<double>(rocksdb::get_perf_context()->write_delay_time)/1000000000);
819  	    write_pre_and_post_process_time.set_from_double(
820  		static_cast<double>(rocksdb::get_perf_context()->write_pre_and_post_process_time)/1000000000);
821  	    logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
822  	    logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
823  	    logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
824  	    logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
825  	  }
826  	
827  	  return s.ok() ? 0 : -1;
828  	}
829  	
830  	int RocksDBStore::submit_transaction(KeyValueDB::Transaction t) 
831  	{
832  	  utime_t start = ceph_clock_now();
833  	  rocksdb::WriteOptions woptions;
834  	  woptions.sync = false;
835  	
836  	  int result = submit_common(woptions, t);
837  	
838  	  utime_t lat = ceph_clock_now() - start;
839  	  logger->inc(l_rocksdb_txns);
840  	  logger->tinc(l_rocksdb_submit_latency, lat);
841  	  
842  	  return result;
843  	}
844  	
845  	int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
846  	{
847  	  utime_t start = ceph_clock_now();
848  	  rocksdb::WriteOptions woptions;
849  	  // if disableWAL is set, we must not request a sync write
850  	  woptions.sync = !disableWAL;
851  	  
852  	  int result = submit_common(woptions, t);
853  	  
854  	  utime_t lat = ceph_clock_now() - start;
855  	  logger->inc(l_rocksdb_txns_sync);
856  	  logger->tinc(l_rocksdb_submit_sync_latency, lat);
857  	
858  	  return result;
859  	}
860  	
861  	RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
862  	{
863  	  db = _db;
864  	}
865  	
866  	void RocksDBStore::RocksDBTransactionImpl::put_bat(
867  	  rocksdb::WriteBatch& bat,
868  	  rocksdb::ColumnFamilyHandle *cf,
869  	  const string &key,
870  	  const bufferlist &to_set_bl)
871  	{
872  	  // bufferlist::c_str() is non-constant, so we can't call c_str()
873  	  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
874  	    bat.Put(cf,
875  		    rocksdb::Slice(key),
876  		    rocksdb::Slice(to_set_bl.buffers().front().c_str(),
877  				   to_set_bl.length()));
878  	  } else {
879  	    rocksdb::Slice key_slice(key);
880  	    vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
881  	    bat.Put(cf,
882  		    rocksdb::SliceParts(&key_slice, 1),
883  	            prepare_sliceparts(to_set_bl, &value_slices));
884  	  }
885  	}
886  	
887  	void RocksDBStore::RocksDBTransactionImpl::set(
888  	  const string &prefix,
889  	  const string &k,
890  	  const bufferlist &to_set_bl)
891  	{
892  	  auto cf = db->get_cf_handle(prefix);
893  	  if (cf) {
894  	    put_bat(bat, cf, k, to_set_bl);
895  	  } else {
896  	    string key = combine_strings(prefix, k);
897  	    put_bat(bat, db->default_cf, key, to_set_bl);
898  	  }
899  	}
900  	
901  	void RocksDBStore::RocksDBTransactionImpl::set(
902  	  const string &prefix,
903  	  const char *k, size_t keylen,
904  	  const bufferlist &to_set_bl)
905  	{
906  	  auto cf = db->get_cf_handle(prefix);
907  	  if (cf) {
908  	    string key(k, keylen);  // fixme?
909  	    put_bat(bat, cf, key, to_set_bl);
910  	  } else {
911  	    string key;
912  	    combine_strings(prefix, k, keylen, &key);
913  	    put_bat(bat, db->default_cf, key, to_set_bl);
914  	  }
915  	}
916  	
917  	void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
918  						         const string &k)
919  	{
920  	  auto cf = db->get_cf_handle(prefix);
921  	  if (cf) {
922  	    bat.Delete(cf, rocksdb::Slice(k));
923  	  } else {
924  	    bat.Delete(db->default_cf, combine_strings(prefix, k));
925  	  }
926  	}
927  	
928  	void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
929  						         const char *k,
930  							 size_t keylen)
931  	{
932  	  auto cf = db->get_cf_handle(prefix);
933  	  if (cf) {
934  	    bat.Delete(cf, rocksdb::Slice(k, keylen));
935  	  } else {
936  	    string key;
937  	    combine_strings(prefix, k, keylen, &key);
938  	    bat.Delete(db->default_cf, rocksdb::Slice(key));
939  	  }
940  	}
941  	
942  	void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
943  						                 const string &k)
944  	{
945  	  auto cf = db->get_cf_handle(prefix);
946  	  if (cf) {
947  	    bat.SingleDelete(cf, k);
948  	  } else {
949  	    bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
950  	  }
951  	}
952  	
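// Delete every key under the prefix: for a dedicated column family that is
// the whole key range; otherwise a range delete over the combined keys in
// the default column family.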
953  	void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
954  	{
955  	  auto cf = db->get_cf_handle(prefix);
956  	  if (cf) {
957  	    string endprefix("\xff\xff\xff\xff");  // FIXME: this is cheating...
958  	    bat.DeleteRange(cf, string(), endprefix);
959  	  } else {
960  	    string endprefix = prefix;
961  	    endprefix.push_back('\x01');
962  	    bat.DeleteRange(db->default_cf,
963  	      combine_strings(prefix, string()),
964  	      combine_strings(endprefix, string()));
965  	  }
966  	}
967  	
968  	void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
969  	                                                         const string &start,
970  	                                                         const string &end)
971  	{
972  	  auto cf = db->get_cf_handle(prefix);
973  	  if (cf) {
974  	    bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
975  	  } else {
976  	    bat.DeleteRange(
977  	        db->default_cf,
978  	        rocksdb::Slice(combine_strings(prefix, start)),
979  	        rocksdb::Slice(combine_strings(prefix, end)));
980  	  }
981  	}
982  	
983  	void RocksDBStore::RocksDBTransactionImpl::merge(
984  	  const string &prefix,
985  	  const string &k,
986  	  const bufferlist &to_set_bl)
987  	{
988  	  auto cf = db->get_cf_handle(prefix);
989  	  if (cf) {
990  	    // bufferlist::c_str() is non-constant, so we can't call c_str()
991  	    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
992  	      bat.Merge(
993  		cf,
994  		rocksdb::Slice(k),
995  		rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
996  	    } else {
997  	      // make a copy
998  	      rocksdb::Slice key_slice(k);
999  	      vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
1000 	      bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
1001 			prepare_sliceparts(to_set_bl, &value_slices));
1002 	    }
1003 	  } else {
1004 	    string key = combine_strings(prefix, k);
1005 	    // bufferlist::c_str() is non-constant, so we can't call c_str()
1006 	    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
1007 	      bat.Merge(
1008 		db->default_cf,
1009 		rocksdb::Slice(key),
1010 		rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
1011 	    } else {
1012 	      // make a copy
1013 	      rocksdb::Slice key_slice(key);
1014 	      vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
1015 	      bat.Merge(
1016 		db->default_cf,
1017 		rocksdb::SliceParts(&key_slice, 1),
1018 		prepare_sliceparts(to_set_bl, &value_slices));
1019 	    }
1020 	  }
1021 	}
1022 	
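// Fetch the given keys (from the prefix's own column family if it has one,
// otherwise from the default CF using combined keys).  Missing keys are
// simply omitted from *out; I/O errors abort.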
1023 	int RocksDBStore::get(
1024 	    const string &prefix,
1025 	    const std::set<string> &keys,
1026 	    std::map<string, bufferlist> *out)
1027 	{
1028 	  utime_t start = ceph_clock_now();
1029 	  auto cf = get_cf_handle(prefix);
1030 	  if (cf) {
1031 	    for (auto& key : keys) {
1032 	      std::string value;
1033 	      auto status = db->Get(rocksdb::ReadOptions(),
1034 				    cf,
1035 				    rocksdb::Slice(key),
1036 				    &value);
1037 	      if (status.ok()) {
1038 		(*out)[key].append(value);
1039 	      } else if (status.IsIOError()) {
1040 		ceph_abort_msg(status.getState());
1041 	      }
1042 	    }
1043 	  } else {
1044 	    for (auto& key : keys) {
1045 	      std::string value;
1046 	      string k = combine_strings(prefix, key);
1047 	      auto status = db->Get(rocksdb::ReadOptions(),
1048 				    default_cf,
1049 				    rocksdb::Slice(k),
1050 				    &value);
1051 	      if (status.ok()) {
1052 		(*out)[key].append(value);
1053 	      } else if (status.IsIOError()) {
1054 		ceph_abort_msg(status.getState());
1055 	      }
1056 	    }
1057 	  }
1058 	  utime_t lat = ceph_clock_now() - start;
1059 	  logger->inc(l_rocksdb_gets);
1060 	  logger->tinc(l_rocksdb_get_latency, lat);
1061 	  return 0;
1062 	}
1063 	
1064 	int RocksDBStore::get(
1065 	    const string &prefix,
1066 	    const string &key,
1067 	    bufferlist *out)
1068 	{
1069 	  ceph_assert(out && (out->length() == 0));
1070 	  utime_t start = ceph_clock_now();
1071 	  int r = 0;
1072 	  string value;
1073 	  rocksdb::Status s;
1074 	  auto cf = get_cf_handle(prefix);
1075 	  if (cf) {
1076 	    s = db->Get(rocksdb::ReadOptions(),
1077 			cf,
1078 			rocksdb::Slice(key),
1079 			&value);
1080 	  } else {
1081 	    string k = combine_strings(prefix, key);
1082 	    s = db->Get(rocksdb::ReadOptions(),
1083 			default_cf,
1084 			rocksdb::Slice(k),
1085 			&value);
1086 	  }
1087 	  if (s.ok()) {
1088 	    out->append(value);
1089 	  } else if (s.IsNotFound()) {
1090 	    r = -ENOENT;
1091 	  } else {
1092 	    ceph_abort_msg(s.getState());
1093 	  }
1094 	  utime_t lat = ceph_clock_now() - start;
1095 	  logger->inc(l_rocksdb_gets);
1096 	  logger->tinc(l_rocksdb_get_latency, lat);
1097 	  return r;
1098 	}
1099 	
1100 	int RocksDBStore::get(
1101 	  const string& prefix,
1102 	  const char *key,
1103 	  size_t keylen,
1104 	  bufferlist *out)
1105 	{
1106 	  ceph_assert(out && (out->length() == 0));
1107 	  utime_t start = ceph_clock_now();
1108 	  int r = 0;
1109 	  string value;
1110 	  rocksdb::Status s;
1111 	  auto cf = get_cf_handle(prefix);
1112 	  if (cf) {
1113 	    s = db->Get(rocksdb::ReadOptions(),
1114 			cf,
1115 			rocksdb::Slice(key, keylen),
1116 			&value);
1117 	  } else {
1118 	    string k;
1119 	    combine_strings(prefix, key, keylen, &k);
1120 	    s = db->Get(rocksdb::ReadOptions(),
1121 			default_cf,
1122 			rocksdb::Slice(k),
1123 			&value);
1124 	  }
1125 	  if (s.ok()) {
1126 	    out->append(value);
1127 	  } else if (s.IsNotFound()) {
1128 	    r = -ENOENT;
1129 	  } else {
1130 	    ceph_abort_msg(s.getState());
1131 	  }
1132 	  utime_t lat = ceph_clock_now() - start;
1133 	  logger->inc(l_rocksdb_gets);
1134 	  logger->tinc(l_rocksdb_get_latency, lat);
1135 	  return r;
1136 	}
1137 	
1138 	int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
1139 	{
1140 	  size_t prefix_len = 0;
1141 	  
1142 	  // Find separator inside Slice
1143 	  char* separator = (char*) memchr(in.data(), 0, in.size());
1144 	  if (separator == NULL)
1145 	     return -EINVAL;
1146 	  prefix_len = size_t(separator - in.data());
1147 	  if (prefix_len >= in.size())
1148 	    return -EINVAL;
1149 	
1150 	  // Fetch prefix and/or key directly from Slice
1151 	  if (prefix)
1152 	    *prefix = string(in.data(), prefix_len);
1153 	  if (key)
1154 	    *key = string(separator+1, in.size()-prefix_len-1);
1155 	  return 0;
1156 	}
1157 	
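// Synchronously compact the entire key range of the default column family
// and of every registered column family.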
1158 	void RocksDBStore::compact()
1159 	{
1160 	  logger->inc(l_rocksdb_compact);
1161 	  rocksdb::CompactRangeOptions options;
1162 	  db->CompactRange(options, default_cf, nullptr, nullptr);
1163 	  for (auto cf : cf_handles) {
1164 	    db->CompactRange(
1165 	      options,
1166 	      static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
1167 	      nullptr, nullptr);
1168 	  }
1169 	}
1170 	
1171 	
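// Background compaction thread: drain the queue, compacting each queued
// range (an empty range means the whole store), until asked to stop.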
1172 	void RocksDBStore::compact_thread_entry()
1173 	{
1174 	  std::unique_lock l{compact_queue_lock};
1175 	  while (!compact_queue_stop) {
1176 	    while (!compact_queue.empty()) {
1177 	      pair<string,string> range = compact_queue.front();
1178 	      compact_queue.pop_front();
1179 	      logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
1180 	      l.unlock();
1181 	      logger->inc(l_rocksdb_compact_range);
1182 	      if (range.first.empty() && range.second.empty()) {
1183 	        compact();
1184 	      } else {
1185 	        compact_range(range.first, range.second);
1186 	      }
1187 	      l.lock();
1188 	      continue;
1189 	    }
1190 	    compact_queue_cond.wait(l);
1191 	  }
1192 	}
1193 	
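// Queue an asynchronous compaction of [start, end], merging it with
// overlapping queued ranges, and start the compaction thread if needed.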
1194 	void RocksDBStore::compact_range_async(const string& start, const string& end)
1195 	{
1196 	  std::lock_guard l(compact_queue_lock);
1197 	
1198 	  // try to merge adjacent ranges.  this is O(n), but the queue should
1199 	  // be short.  note that we do not cover all overlap cases and merge
1200 	  // opportunities here, but we capture the ones we currently need.
1201 	  list< pair<string,string> >::iterator p = compact_queue.begin();
1202 	  while (p != compact_queue.end()) {
1203 	    if (p->first == start && p->second == end) {
1204 	      // dup; no-op
1205 	      return;
1206 	    }
1207 	    if (start <= p->first && p->first <= end) {
1208 	      // new region crosses start of existing range
1209 	      // select right bound that is bigger
1210 	      compact_queue.push_back(make_pair(start, end > p->second ? end : p->second));
1211 	      compact_queue.erase(p);
1212 	      logger->inc(l_rocksdb_compact_queue_merge);
1213 	      break;
1214 	    }
1215 	    if (start <= p->second && p->second <= end) {
1216 	      // new region crosses end of existing range
1217 	      // p->first < p->second and p->second <= end, so p->first <= end.
1218 	      // The branch above was not taken, so start > p->first.
1219 	      compact_queue.push_back(make_pair(p->first, end));
1220 	      compact_queue.erase(p);
1221 	      logger->inc(l_rocksdb_compact_queue_merge);
1222 	      break;
1223 	    }
1224 	    ++p;
1225 	  }
1226 	  if (p == compact_queue.end()) {
1227 	    // no merge, new entry.
1228 	    compact_queue.push_back(make_pair(start, end));
1229 	    logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
1230 	  }
1231 	  compact_queue_cond.notify_all();
1232 	  if (!compact_thread.is_started()) {
1233 	    compact_thread.create("rstore_compact");
1234 	  }
1235 	}
1236 	bool RocksDBStore::check_omap_dir(string &omap_dir)
1237 	{
1238 	  rocksdb::Options options;
1239 	  options.create_if_missing = true;
1240 	  rocksdb::DB *db;
1241 	  rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
1242 	  delete db;
1243 	  db = nullptr;
1244 	  return status.ok();
1245 	}
1246 	void RocksDBStore::compact_range(const string& start, const string& end)
1247 	{
1248 	  rocksdb::CompactRangeOptions options;
1249 	  rocksdb::Slice cstart(start);
1250 	  rocksdb::Slice cend(end);
1251 	  db->CompactRange(options, &cstart, &cend);
1252 	}
1253 	
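// Whole-space iterator: walks the default column family, where keys are
// stored as "prefix\0key".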
1254 	RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
1255 	{
1256 	  delete dbiter;
1257 	}
1258 	int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
1259 	{
1260 	  dbiter->SeekToFirst();
1261 	  ceph_assert(!dbiter->status().IsIOError());
1262 	  return dbiter->status().ok() ? 0 : -1;
1263 	}
1264 	int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
1265 	{
1266 	  rocksdb::Slice slice_prefix(prefix);
1267 	  dbiter->Seek(slice_prefix);
1268 	  ceph_assert(!dbiter->status().IsIOError());
1269 	  return dbiter->status().ok() ? 0 : -1;
1270 	}
1271 	int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
1272 	{
1273 	  dbiter->SeekToLast();
1274 	  ceph_assert(!dbiter->status().IsIOError());
1275 	  return dbiter->status().ok() ? 0 : -1;
1276 	}
1277 	int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
1278 	{
1279 	  string limit = past_prefix(prefix);
1280 	  rocksdb::Slice slice_limit(limit);
1281 	  dbiter->Seek(slice_limit);
1282 	
1283 	  if (!dbiter->Valid()) {
1284 	    dbiter->SeekToLast();
1285 	  } else {
1286 	    dbiter->Prev();
1287 	  }
1288 	  return dbiter->status().ok() ? 0 : -1;
1289 	}
1290 	int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
1291 	{
1292 	  lower_bound(prefix, after);
1293 	  if (valid()) {
1294 	    pair<string,string> key = raw_key();
1295 	    if (key.first == prefix && key.second == after)
1296 	      next();
1297 	  }
1298 	  return dbiter->status().ok() ? 0 : -1;
1299 	}
1300 	int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
1301 	{
1302 	  string bound = combine_strings(prefix, to);
1303 	  rocksdb::Slice slice_bound(bound);
1304 	  dbiter->Seek(slice_bound);
1305 	  return dbiter->status().ok() ? 0 : -1;
1306 	}
1307 	bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
1308 	{
1309 	  return dbiter->Valid();
1310 	}
1311 	int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
1312 	{
1313 	  if (valid()) {
1314 	    dbiter->Next();
1315 	  }
1316 	  ceph_assert(!dbiter->status().IsIOError());
1317 	  return dbiter->status().ok() ? 0 : -1;
1318 	}
1319 	int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
1320 	{
1321 	  if (valid()) {
1322 	    dbiter->Prev();
1323 	  }
1324 	  ceph_assert(!dbiter->status().IsIOError());
1325 	  return dbiter->status().ok() ? 0 : -1;
1326 	}
1327 	string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
1328 	{
1329 	  string out_key;
1330 	  split_key(dbiter->key(), 0, &out_key);
1331 	  return out_key;
1332 	}
1333 	pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
1334 	{
1335 	  string prefix, key;
1336 	  split_key(dbiter->key(), &prefix, &key);
1337 	  return make_pair(prefix, key);
1338 	}
1339 	
1340 	bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
1341 	  // Look for "prefix\0" right in the rocksdb::Slice
1342 	  rocksdb::Slice key = dbiter->key();
1343 	  if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
1344 	    return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
1345 	  } else {
1346 	    return false;
1347 	  }
1348 	}
1349 	
1350 	bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
1351 	{
1352 	  return to_bufferlist(dbiter->value());
1353 	}
1354 	
1355 	size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
1356 	{
1357 	  return dbiter->key().size();
1358 	}
1359 	
1360 	size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1361 	{
1362 	  return dbiter->value().size();
1363 	}
1364 	
1365 	bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1366 	{
1367 	  rocksdb::Slice val = dbiter->value();
1368 	  return bufferptr(val.data(), val.size());
1369 	}
1370 	
1371 	int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1372 	{
1373 	  return dbiter->status().ok() ? 0 : -1;
1374 	}
1375 	
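// Return the prefix with a 0x01 byte appended: a key that sorts after every
// "prefix\0..." key, used as an exclusive upper bound when seeking.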
1376 	string RocksDBStore::past_prefix(const string &prefix)
1377 	{
1378 	  string limit = prefix;
1379 	  limit.push_back(1);
1380 	  return limit;
1381 	}
1382 	
1383 	RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
1384 	{
1385 	  return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
1386 	    db->NewIterator(rocksdb::ReadOptions(), default_cf));
1387 	}
1388 	
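// Iterator over a dedicated column family.  Keys in a CF are stored without
// the prefix, so raw_key() re-attaches it.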
1389 	class CFIteratorImpl : public KeyValueDB::IteratorImpl {
1390 	protected:
1391 	  string prefix;
1392 	  rocksdb::Iterator *dbiter;
1393 	public:
1394 	  explicit CFIteratorImpl(const std::string& p,
1395 					 rocksdb::Iterator *iter)
1396 	    : prefix(p), dbiter(iter) { }
1397 	  ~CFIteratorImpl() {
1398 	    delete dbiter;
1399 	  }
1400 	
1401 	  int seek_to_first() override {
1402 	    dbiter->SeekToFirst();
1403 	    return dbiter->status().ok() ? 0 : -1;
1404 	  }
1405 	  int seek_to_last() override {
1406 	    dbiter->SeekToLast();
1407 	    return dbiter->status().ok() ? 0 : -1;
1408 	  }
1409 	  int upper_bound(const string &after) override {
1410 	    lower_bound(after);
1411 	    if (valid() && (key() == after)) {
1412 	      next();
1413 	    }
1414 	    return dbiter->status().ok() ? 0 : -1;
1415 	  }
1416 	  int lower_bound(const string &to) override {
1417 	    rocksdb::Slice slice_bound(to);
1418 	    dbiter->Seek(slice_bound);
1419 	    return dbiter->status().ok() ? 0 : -1;
1420 	  }
1421 	  int next() override {
1422 	    if (valid()) {
1423 	      dbiter->Next();
1424 	    }
1425 	    return dbiter->status().ok() ? 0 : -1;
1426 	  }
1427 	  int prev() override {
1428 	    if (valid()) {
1429 	      dbiter->Prev();
1430 	    }
1431 	    return dbiter->status().ok() ? 0 : -1;
1432 	  }
1433 	  bool valid() override {
1434 	    return dbiter->Valid();
1435 	  }
1436 	  string key() override {
1437 	    return dbiter->key().ToString();
1438 	  }
1439 	  std::pair<std::string, std::string> raw_key() override {
1440 	    return make_pair(prefix, key());
1441 	  }
1442 	  bufferlist value() override {
1443 	    return to_bufferlist(dbiter->value());
1444 	  }
1445 	  bufferptr value_as_ptr() override {
1446 	    rocksdb::Slice val = dbiter->value();
1447 	    return bufferptr(val.data(), val.size());
1448 	  }
1449 	  int status() override {
1450 	    return dbiter->status().ok() ? 0 : -1;
1451 	  }
1452 	};
1453 	
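// Return a CF-scoped iterator when the prefix has its own column family,
// otherwise fall back to KeyValueDB's generic prefix iterator built on the
// whole-space iterator.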
1454 	KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
1455 	{
1456 	  rocksdb::ColumnFamilyHandle *cf_handle =
1457 	    static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
1458 	  if (cf_handle) {
1459 	    return std::make_shared<CFIteratorImpl>(
1460 	      prefix,
1461 	      db->NewIterator(rocksdb::ReadOptions(), cf_handle));
1462 	  } else {
1463 	    return KeyValueDB::get_iterator(prefix);
1464 	  }
1465 	}
1466