1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	#include "LevelDBStore.h"
4    	
5    	#include <set>
6    	#include <map>
7    	#include <string>
8    	#include <cerrno>
9    	
10   	using std::string;
11   	
12   	#include "common/debug.h"
13   	#include "common/perf_counters.h"
14   	
15   	// re-include our assert to clobber the system one; fix dout:
16   	#include "include/ceph_assert.h"
17   	
18   	#define dout_context cct
19   	#define dout_subsys ceph_subsys_leveldb
20   	#undef dout_prefix
21   	#define dout_prefix *_dout << "leveldb: "
22   	
// Adapter that forwards leveldb's informational log messages into the ceph
// log (subsystem ceph_subsys_leveldb, prefix "leveldb: ", per the dout_*
// macros defined above).
class CephLevelDBLogger : public leveldb::Logger {
  CephContext *cct;
public:
  // Takes a reference on the context (get()/put() pair) so cct stays valid
  // for as long as this logger exists.
  // NOTE(review): no copy constructor is declared here; a copy would share
  // cct without an extra get() -- presumably this type is never copied.
  explicit CephLevelDBLogger(CephContext *c) : cct(c) {
    cct->get();
  }
  ~CephLevelDBLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  // NOTE: dout(1) and dendl appear to be ceph macros that open and close a
  // log-entry scope; `*_dout` is only usable between them, which is why the
  // formatting is done in between rather than before dout(1). Messages
  // longer than the 64 KiB stack buffer are truncated by vsnprintf.
  void Logv(const char* format, va_list ap) override {
    dout(1);
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
41   	
42   	leveldb::Logger *create_leveldb_ceph_logger()
43   	{
44   	  return new CephLevelDBLogger(g_ceph_context);
45   	}
46   	
47   	int LevelDBStore::init(string option_str)
48   	{
49   	  // init defaults.  caller can override these if they want
50   	  // prior to calling open.
51   	  options.write_buffer_size = g_conf()->leveldb_write_buffer_size;
52   	  options.cache_size = g_conf()->leveldb_cache_size;
53   	  options.block_size = g_conf()->leveldb_block_size;
54   	  options.bloom_size = g_conf()->leveldb_bloom_size;
55   	  options.compression_enabled = g_conf()->leveldb_compression;
56   	  options.paranoid_checks = g_conf()->leveldb_paranoid;
57   	  options.max_open_files = g_conf()->leveldb_max_open_files;
58   	  options.log_file = g_conf()->leveldb_log;
59   	  return 0;
60   	}
61   	
62   	int LevelDBStore::open(ostream &out, const vector<ColumnFamily>& cfs)  {
63   	  if (!cfs.empty()) {
64   	    ceph_abort_msg("Not implemented");
65   	  }
66   	  return do_open(out, false);
67   	}
68   	
69   	int LevelDBStore::create_and_open(ostream &out, const vector<ColumnFamily>& cfs) {
70   	  if (!cfs.empty()) {
71   	    ceph_abort_msg("Not implemented");
72   	  }
73   	  return do_open(out, true);
74   	}
75   	
76   	int LevelDBStore::load_leveldb_options(bool create_if_missing, leveldb::Options &ldoptions)
77   	{
78   	  if (options.write_buffer_size)
79   	    ldoptions.write_buffer_size = options.write_buffer_size;
80   	  if (options.max_open_files)
81   	    ldoptions.max_open_files = options.max_open_files;
82   	  if (options.cache_size) {
83   	    leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size);
84   	    db_cache.reset(_db_cache);
85   	    ldoptions.block_cache = db_cache.get();
86   	  }
87   	  if (options.block_size)
88   	    ldoptions.block_size = options.block_size;
89   	  if (options.bloom_size) {
90   	#ifdef HAVE_LEVELDB_FILTER_POLICY
91   	    const leveldb::FilterPolicy *_filterpolicy =
92   		leveldb::NewBloomFilterPolicy(options.bloom_size);
93   	    filterpolicy.reset(_filterpolicy);
94   	    ldoptions.filter_policy = filterpolicy.get();
95   	#else
96   	    ceph_abort_msg(0 == "bloom size set but installed leveldb doesn't support bloom filters");
97   	#endif
98   	  }
99   	  if (options.compression_enabled)
100  	    ldoptions.compression = leveldb::kSnappyCompression;
101  	  else
102  	    ldoptions.compression = leveldb::kNoCompression;
103  	  if (options.block_restart_interval)
104  	    ldoptions.block_restart_interval = options.block_restart_interval;
105  	
106  	  ldoptions.error_if_exists = options.error_if_exists;
107  	  ldoptions.paranoid_checks = options.paranoid_checks;
108  	  ldoptions.create_if_missing = create_if_missing;
109  	
110  	  if (g_conf()->leveldb_log_to_ceph_log) {
111  	    ceph_logger = new CephLevelDBLogger(g_ceph_context);
112  	    ldoptions.info_log = ceph_logger;
113  	  }
114  	  
115  	  if (options.log_file.length()) {
116  	    leveldb::Env *env = leveldb::Env::Default();
117  	    env->NewLogger(options.log_file, &ldoptions.info_log);
118  	  }
119  	  return 0;
120  	}
121  	
122  	int LevelDBStore::do_open(ostream &out, bool create_if_missing)
123  	{
124  	  leveldb::Options ldoptions;
125  	  int r = load_leveldb_options(create_if_missing, ldoptions);
126  	  if (r) {
127  	    dout(1) << "load leveldb options failed" << dendl;
128  	    return r;
129  	  }
130  	
131  	  leveldb::DB *_db;
132  	  leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db);
133  	  db.reset(_db);
134  	  if (!status.ok()) {
135  	    out << status.ToString() << std::endl;
136  	    return -EINVAL;
137  	  }
138  	
139  	  PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last);
140  	  plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets");
141  	  plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions");
142  	  plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency");
143  	  plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency");
144  	  plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency");
145  	  plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions");
146  	  plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range");
147  	  plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
148  	  plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue");
149  	  logger = plb.create_perf_counters();
150  	  cct->get_perfcounters_collection()->add(logger);
151  	
152  	  if (g_conf()->leveldb_compact_on_mount) {
153  	    derr << "Compacting leveldb store..." << dendl;
154  	    compact();
155  	    derr << "Finished compacting leveldb store" << dendl;
156  	  }
157  	  return 0;
158  	}
159  	
160  	int LevelDBStore::_test_init(const string& dir)
161  	{
162  	  leveldb::Options options;
163  	  options.create_if_missing = true;
164  	  leveldb::DB *db;
165  	  leveldb::Status status = leveldb::DB::Open(options, dir, &db);
166  	  delete db;
167  	  return status.ok() ? 0 : -EIO;
168  	}
169  	
// Tear down the store: stop background compaction and unregister perf
// counters, then destroy the DB before the objects it borrows.
LevelDBStore::~LevelDBStore()
{
  // close() joins the compaction thread and unregisters `logger`.
  close();
  delete logger;

  // Ensure db is destroyed before dependent db_cache and filterpolicy
  db.reset();
  // ceph_logger may have been installed as the DB's info_log, so it is
  // only freed after the DB is gone.
  delete ceph_logger;
}
179  	
180  	void LevelDBStore::close()
181  	{
182  	  // stop compaction thread
183  	  compact_queue_lock.lock();
184  	  if (compact_thread.is_started()) {
185  	    compact_queue_stop = true;
186  	    compact_queue_cond.notify_all();
187  	    compact_queue_lock.unlock();
188  	    compact_thread.join();
189  	  } else {
190  	    compact_queue_lock.unlock();
191  	  }
192  	
193  	  if (logger)
194  	    cct->get_perfcounters_collection()->remove(logger);
195  	}
196  	
197  	int LevelDBStore::repair(std::ostream &out)
198  	{
199  	  leveldb::Options ldoptions;
200  	  int r = load_leveldb_options(false, ldoptions);
201  	  if (r) {
202  	    dout(1) << "load leveldb options failed" << dendl;
203  	    out << "load leveldb options failed" << std::endl;
204  	    return r;
205  	  }
206  	  leveldb::Status status = leveldb::RepairDB(path, ldoptions);
207  	  if (status.ok()) {
208  	    return 0;
209  	  } else {
210  	    out << "repair leveldb failed : " << status.ToString() << std::endl;
211  	    return 1;
212  	  }
213  	}
214  	
215  	int LevelDBStore::submit_transaction(KeyValueDB::Transaction t)
216  	{
217  	  utime_t start = ceph_clock_now();
218  	  LevelDBTransactionImpl * _t =
219  	    static_cast<LevelDBTransactionImpl *>(t.get());
220  	  leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat));
221  	  utime_t lat = ceph_clock_now() - start;
222  	  logger->inc(l_leveldb_txns);
223  	  logger->tinc(l_leveldb_submit_latency, lat);
224  	  return s.ok() ? 0 : -1;
225  	}
226  	
227  	int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
228  	{
229  	  utime_t start = ceph_clock_now();
230  	  LevelDBTransactionImpl * _t =
231  	    static_cast<LevelDBTransactionImpl *>(t.get());
232  	  leveldb::WriteOptions options;
233  	  options.sync = true;
234  	  leveldb::Status s = db->Write(options, &(_t->bat));
235  	  utime_t lat = ceph_clock_now() - start;
236  	  logger->inc(l_leveldb_txns);
237  	  logger->tinc(l_leveldb_submit_sync_latency, lat);
238  	  return s.ok() ? 0 : -1;
239  	}
240  	
241  	void LevelDBStore::LevelDBTransactionImpl::set(
242  	  const string &prefix,
243  	  const string &k,
244  	  const bufferlist &to_set_bl)
245  	{
246  	  string key = combine_strings(prefix, k);
247  	  size_t bllen = to_set_bl.length();
248  	  // bufferlist::c_str() is non-constant, so we can't call c_str()
249  	  if (to_set_bl.is_contiguous() && bllen > 0) {
250  	    // bufferlist contains just one ptr or they're contiguous
251  	    bat.Put(leveldb::Slice(key), leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen));
252  	  } else if ((bllen <= 32 * 1024) && (bllen > 0)) {
253  	    // 2+ bufferptrs that are not contiguopus
254  	    // allocate buffer on stack and copy bl contents to that buffer
255  	    // make sure the buffer isn't too large or we might crash here...    
256  	    char* slicebuf = (char*) alloca(bllen);
257  	    leveldb::Slice newslice(slicebuf, bllen);
258  	    for (const auto& node : to_set_bl.buffers()) {
259  	      const size_t ptrlen = node.length();
260  	      memcpy(static_cast<void*>(slicebuf), node.c_str(), ptrlen);
261  	      slicebuf += ptrlen;
262  	    } 
263  	    bat.Put(leveldb::Slice(key), newslice);
264  	  } else {
265  	    // 2+ bufferptrs that are not contiguous, and enormous in size
266  	    bufferlist val = to_set_bl;
267  	    bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length()));
268  	  }
269  	}
270  	
271  	void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix,
272  						         const string &k)
273  	{
274  	  string key = combine_strings(prefix, k);
275  	  bat.Delete(leveldb::Slice(key));
276  	}
277  	
278  	void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
279  	{
280  	  KeyValueDB::Iterator it = db->get_iterator(prefix);
281  	  for (it->seek_to_first();
282  	       it->valid();
283  	       it->next()) {
284  	    bat.Delete(leveldb::Slice(combine_strings(prefix, it->key())));
285  	  }
286  	}
287  	
288  	void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
289  	{
290  	  KeyValueDB::Iterator it = db->get_iterator(prefix);
291  	  it->lower_bound(start);
292  	  while (it->valid()) {
293  	    if (it->key() >= end) {
294  	      break;
295  	    }
296  	    bat.Delete(combine_strings(prefix, it->key()));
297  	    it->next();
298  	  }
299  	}
300  	
301  	int LevelDBStore::get(
302  	    const string &prefix,
303  	    const std::set<string> &keys,
304  	    std::map<string, bufferlist> *out)
305  	{
306  	  utime_t start = ceph_clock_now();
307  	  for (std::set<string>::const_iterator i = keys.begin();
308  	       i != keys.end(); ++i) {
309  	    std::string value;
310  	    std::string bound = combine_strings(prefix, *i);
311  	    auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value);
312  	    if (status.ok())
313  	      (*out)[*i].append(value);
314  	  }
315  	  utime_t lat = ceph_clock_now() - start;
316  	  logger->inc(l_leveldb_gets);
317  	  logger->tinc(l_leveldb_get_latency, lat);
318  	  return 0;
319  	}
320  	
321  	int LevelDBStore::get(const string &prefix, 
322  	      const string &key,
323  	      bufferlist *out)
324  	{
325  	  ceph_assert(out && (out->length() == 0));
326  	  utime_t start = ceph_clock_now();
327  	  int r = 0;
328  	  string value, k;
329  	  leveldb::Status s;
330  	  k = combine_strings(prefix, key);
331  	  s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value);
332  	  if (s.ok()) {
333  	    out->append(value);
334  	  } else {
335  	    r = -ENOENT;
336  	  }
337  	  utime_t lat = ceph_clock_now() - start;
338  	  logger->inc(l_leveldb_gets);
339  	  logger->tinc(l_leveldb_get_latency, lat);
340  	  return r;
341  	}
342  	
343  	string LevelDBStore::combine_strings(const string &prefix, const string &value)
344  	{
345  	  string out = prefix;
346  	  out.push_back(0);
347  	  out.append(value);
348  	  return out;
349  	}
350  	
351  	bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in)
352  	{
353  	  bufferlist bl;
354  	  bl.append(bufferptr(in.data(), in.size()));
355  	  return bl;
356  	}
357  	
358  	int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key)
359  	{
360  	  size_t prefix_len = 0;
361  	  
362  	  // Find separator inside Slice
363  	  char* separator = (char*) memchr(in.data(), 0, in.size());
364  	  if (separator == NULL)
365  	     return -EINVAL;
366  	  prefix_len = size_t(separator - in.data());
367  	  if (prefix_len >= in.size())
368  	    return -EINVAL;
369  	
370  	  if (prefix)
371  	    *prefix = string(in.data(), prefix_len);
372  	  if (key)
373  	    *key = string(separator+1, in.size() - prefix_len - 1);
374  	   return 0;
375  	}
376  	
377  	void LevelDBStore::compact()
378  	{
379  	  logger->inc(l_leveldb_compact);
380  	  db->CompactRange(NULL, NULL);
381  	}
382  	
383  	
// Body of the background compaction thread: drain compact_queue, running
// each compaction with compact_queue_lock released, then sleep on
// compact_queue_cond until more work arrives or compact_queue_stop is set.
void LevelDBStore::compact_thread_entry()
{
  std::unique_lock l{compact_queue_lock};
  while (!compact_queue_stop) {
    // compact_queue_stop is not checked inside this drain loop, so every
    // entry present once draining starts is processed.
    while (!compact_queue.empty()) {
      pair<string,string> range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_leveldb_compact_queue_len, compact_queue.size());
      // Compact without holding the lock so producers are not blocked.
      l.unlock();
      logger->inc(l_leveldb_compact_range);
      if (range.first.empty() && range.second.empty()) {
        // ("", "") is the sentinel for a whole-store compaction.
        compact();
      } else {
        compact_range(range.first, range.second);
      }
      l.lock();
      continue;
    }
    if (compact_queue_stop)
      break;
    // Woken by notify_all() in compact_range_async() or close(); the outer
    // loop re-checks both the stop flag and the queue after waking.
    compact_queue_cond.wait(l);
  }
}
407  	
408  	void LevelDBStore::compact_range_async(const string& start, const string& end)
409  	{
410  	  std::lock_guard l(compact_queue_lock);
411  	
412  	  // try to merge adjacent ranges.  this is O(n), but the queue should
413  	  // be short.  note that we do not cover all overlap cases and merge
414  	  // opportunities here, but we capture the ones we currently need.
415  	  list< pair<string,string> >::iterator p = compact_queue.begin();
(1) Event cond_true: Condition "p != this->compact_queue.end()", taking true branch.
416  	  while (p != compact_queue.end()) {
(2) Event cond_true: Condition "p->first == start", taking true branch.
(3) Event cond_false: Condition "p->second == end", taking false branch.
417  	    if (p->first == start && p->second == end) {
418  	      // dup; no-op
419  	      return;
(4) Event if_end: End of if statement.
420  	    }
(5) Event cond_true: Condition "p->first <= end", taking true branch.
(6) Event cond_false: Condition "p->first > start", taking false branch.
421  	    if (p->first <= end && p->first > start) {
422  	      // merge with existing range to the right
423  	      compact_queue.push_back(make_pair(start, p->second));
424  	      compact_queue.erase(p);
425  	      logger->inc(l_leveldb_compact_queue_merge);
426  	      break;
(7) Event if_end: End of if statement.
427  	    }
(8) Event cond_true: Condition "p->second >= start", taking true branch.
(9) Event cond_true: Condition "p->second < end", taking true branch.
428  	    if (p->second >= start && p->second < end) {
429  	      // merge with existing range to the left
430  	      compact_queue.push_back(make_pair(p->first, end));
(10) Event erase_iterator: "erase" invalidates iterator "p".
Also see events: [use_iterator]
431  	      compact_queue.erase(p);
432  	      logger->inc(l_leveldb_compact_queue_merge);
(11) Event break: Breaking from loop.
433  	      break;
434  	    }
435  	    ++p;
(12) Event loop_end: Reached end of loop.
436  	  }
(13) Event use_iterator: Using invalid iterator "p".
Also see events: [erase_iterator]
437  	  if (p == compact_queue.end()) {
438  	    // no merge, new entry.
439  	    compact_queue.push_back(make_pair(start, end));
440  	    logger->set(l_leveldb_compact_queue_len, compact_queue.size());
441  	  }
442  	  compact_queue_cond.notify_all();
443  	  if (!compact_thread.is_started()) {
444  	    compact_thread.create("levdbst_compact");
445  	  }
446  	}
447