1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2014 Red Hat
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software
11   	 * Foundation.  See file COPYING.
12   	 *
13   	 */
14   	
15   	#ifndef CEPH_OSD_BLUESTORE_H
16   	#define CEPH_OSD_BLUESTORE_H
17   	
18   	#include "acconfig.h"
19   	
20   	#include <unistd.h>
21   	
22   	#include <atomic>
23   	#include <chrono>
24   	#include <ratio>
25   	#include <mutex>
26   	#include <condition_variable>
27   	
28   	#include <boost/intrusive/list.hpp>
29   	#include <boost/intrusive/unordered_set.hpp>
30   	#include <boost/intrusive/set.hpp>
31   	#include <boost/functional/hash.hpp>
32   	#include <boost/dynamic_bitset.hpp>
33   	#include <boost/circular_buffer.hpp>
34   	
35   	#include "include/cpp-btree/btree_set.h"
36   	
37   	#include "include/ceph_assert.h"
38   	#include "include/unordered_map.h"
39   	#include "include/mempool.h"
40   	#include "include/hash.h"
41   	#include "common/bloom_filter.hpp"
42   	#include "common/Finisher.h"
43   	#include "common/ceph_mutex.h"
44   	#include "common/Throttle.h"
45   	#include "common/perf_counters.h"
46   	#include "common/PriorityCache.h"
47   	#include "compressor/Compressor.h"
48   	#include "os/ObjectStore.h"
49   	
50   	#include "bluestore_types.h"
51   	#include "BlockDevice.h"
52   	#include "BlueFS.h"
53   	#include "common/EventTrace.h"
54   	
55   	class Allocator;
56   	class FreelistManager;
57   	class BlueStoreRepairer;
58   	
59   	//#define DEBUG_CACHE
60   	//#define DEBUG_DEFERRED
61   	
62   	
63   	
64   	// constants for Buffer::optimize()
65   	#define MAX_BUFFER_SLOP_RATIO_DEN  8  // so actually 1/N
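// e.g. with a denominator of 8, Buffer::maybe_rebuild() below rebuilds the
// bufferlist when the front bufferptr wastes more than length/8 bytes (or
// when the data is spread across more than one bufferptr).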
66   	
67   	
68   	enum {
69   	  l_bluestore_first = 732430,
70   	  l_bluestore_kv_flush_lat,
71   	  l_bluestore_kv_commit_lat,
72   	  l_bluestore_kv_sync_lat,
73   	  l_bluestore_kv_final_lat,
74   	  l_bluestore_state_prepare_lat,
75   	  l_bluestore_state_aio_wait_lat,
76   	  l_bluestore_state_io_done_lat,
77   	  l_bluestore_state_kv_queued_lat,
78   	  l_bluestore_state_kv_committing_lat,
79   	  l_bluestore_state_kv_done_lat,
80   	  l_bluestore_state_deferred_queued_lat,
81   	  l_bluestore_state_deferred_aio_wait_lat,
82   	  l_bluestore_state_deferred_cleanup_lat,
83   	  l_bluestore_state_finishing_lat,
84   	  l_bluestore_state_done_lat,
85   	  l_bluestore_throttle_lat,
86   	  l_bluestore_submit_lat,
87   	  l_bluestore_commit_lat,
88   	  l_bluestore_read_lat,
89   	  l_bluestore_read_onode_meta_lat,
90   	  l_bluestore_read_wait_aio_lat,
91   	  l_bluestore_compress_lat,
92   	  l_bluestore_decompress_lat,
93   	  l_bluestore_csum_lat,
94   	  l_bluestore_compress_success_count,
95   	  l_bluestore_compress_rejected_count,
96   	  l_bluestore_write_pad_bytes,
97   	  l_bluestore_deferred_write_ops,
98   	  l_bluestore_deferred_write_bytes,
99   	  l_bluestore_write_penalty_read_ops,
100  	  l_bluestore_allocated,
101  	  l_bluestore_stored,
102  	  l_bluestore_compressed,
103  	  l_bluestore_compressed_allocated,
104  	  l_bluestore_compressed_original,
105  	  l_bluestore_onodes,
106  	  l_bluestore_onode_hits,
107  	  l_bluestore_onode_misses,
108  	  l_bluestore_onode_shard_hits,
109  	  l_bluestore_onode_shard_misses,
110  	  l_bluestore_extents,
111  	  l_bluestore_blobs,
112  	  l_bluestore_buffers,
113  	  l_bluestore_buffer_bytes,
114  	  l_bluestore_buffer_hit_bytes,
115  	  l_bluestore_buffer_miss_bytes,
116  	  l_bluestore_write_big,
117  	  l_bluestore_write_big_bytes,
118  	  l_bluestore_write_big_blobs,
119  	  l_bluestore_write_small,
120  	  l_bluestore_write_small_bytes,
121  	  l_bluestore_write_small_unused,
122  	  l_bluestore_write_small_deferred,
123  	  l_bluestore_write_small_pre_read,
124  	  l_bluestore_write_small_new,
125  	  l_bluestore_txc,
126  	  l_bluestore_onode_reshard,
127  	  l_bluestore_blob_split,
128  	  l_bluestore_extent_compress,
129  	  l_bluestore_gc_merged,
130  	  l_bluestore_read_eio,
131  	  l_bluestore_reads_with_retries,
132  	  l_bluestore_fragmentation,
133  	  l_bluestore_omap_seek_to_first_lat,
134  	  l_bluestore_omap_upper_bound_lat,
135  	  l_bluestore_omap_lower_bound_lat,
136  	  l_bluestore_omap_next_lat,
137  	  l_bluestore_clist_lat,
138  	  l_bluestore_last
139  	};
140  	
141  	#define META_POOL_ID ((uint64_t)-1ull)
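// i.e. 0xffffffffffffffff; used, for example, as the default
// TransContext::osd_pool_id for work not tied to a specific pool.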
142  	
143  	class BlueStore : public ObjectStore,
144  			  public BlueFSDeviceExpander,
145  			  public md_config_obs_t {
146  	  // -----------------------------------------------------
147  	  // types
148  	public:
149  	  // config observer
150  	  const char** get_tracked_conf_keys() const override;
151  	  void handle_conf_change(const ConfigProxy& conf,
152  				  const std::set<std::string> &changed) override;
153  	
  // handler for discard events
155  	  void handle_discard(interval_set<uint64_t>& to_release);
156  	
157  	  void _set_csum();
158  	  void _set_compression();
159  	  void _set_throttle_params();
160  	  int _set_cache_sizes();
161  	  void _set_max_defer_interval() {
162  	    max_defer_interval =
163  		cct->_conf.get_val<double>("bluestore_max_defer_interval");
164  	  }
165  	
166  	  class TransContext;
167  	
168  	  typedef map<uint64_t, bufferlist> ready_regions_t;
169  	
170  	
171  	  struct BufferSpace;
172  	  struct Collection;
173  	  typedef boost::intrusive_ptr<Collection> CollectionRef;
174  	
175  	  struct AioContext {
176  	    virtual void aio_finish(BlueStore *store) = 0;
177  	    virtual ~AioContext() {}
178  	  };
179  	
180  	  /// cached buffer
181  	  struct Buffer {
182  	    MEMPOOL_CLASS_HELPERS();
183  	
184  	    enum {
185  	      STATE_EMPTY,     ///< empty buffer -- used for cache history
186  	      STATE_CLEAN,     ///< clean data that is up to date
187  	      STATE_WRITING,   ///< data that is being written (io not yet complete)
188  	    };
189  	    static const char *get_state_name(int s) {
190  	      switch (s) {
191  	      case STATE_EMPTY: return "empty";
192  	      case STATE_CLEAN: return "clean";
193  	      case STATE_WRITING: return "writing";
194  	      default: return "???";
195  	      }
196  	    }
197  	    enum {
198  	      FLAG_NOCACHE = 1,  ///< trim when done WRITING (do not become CLEAN)
199  	      // NOTE: fix operator<< when you define a second flag
200  	    };
201  	    static const char *get_flag_name(int s) {
202  	      switch (s) {
203  	      case FLAG_NOCACHE: return "nocache";
204  	      default: return "???";
205  	      }
206  	    }
207  	
208  	    BufferSpace *space;
209  	    uint16_t state;             ///< STATE_*
210  	    uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
211  	    uint32_t flags;             ///< FLAG_*
212  	    uint64_t seq;
213  	    uint32_t offset, length;
214  	    bufferlist data;
215  	
216  	    boost::intrusive::list_member_hook<> lru_item;
217  	    boost::intrusive::list_member_hook<> state_item;
218  	
219  	    Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
220  		   unsigned f = 0)
221  	      : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
222  	    Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
223  		   unsigned f = 0)
224  	      : space(space), state(s), flags(f), seq(q), offset(o),
225  		length(b.length()), data(b) {}
226  	
227  	    bool is_empty() const {
228  	      return state == STATE_EMPTY;
229  	    }
230  	    bool is_clean() const {
231  	      return state == STATE_CLEAN;
232  	    }
233  	    bool is_writing() const {
234  	      return state == STATE_WRITING;
235  	    }
236  	
237  	    uint32_t end() const {
238  	      return offset + length;
239  	    }
240  	
241  	    void truncate(uint32_t newlen) {
242  	      ceph_assert(newlen < length);
243  	      if (data.length()) {
244  		bufferlist t;
245  		t.substr_of(data, 0, newlen);
246  		data.claim(t);
247  	      }
248  	      length = newlen;
249  	    }
250  	    void maybe_rebuild() {
251  	      if (data.length() &&
252  		  (data.get_num_buffers() > 1 ||
253  		   data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
254  		data.rebuild();
255  	      }
256  	    }
257  	
258  	    void dump(Formatter *f) const {
259  	      f->dump_string("state", get_state_name(state));
260  	      f->dump_unsigned("seq", seq);
261  	      f->dump_unsigned("offset", offset);
262  	      f->dump_unsigned("length", length);
263  	      f->dump_unsigned("data_length", data.length());
264  	    }
265  	  };
266  	
267  	  struct BufferCacheShard;
268  	
269  	  /// map logical extent range (object) onto buffers
270  	  struct BufferSpace {
271  	    enum {
272  	      BYPASS_CLEAN_CACHE = 0x1,  // bypass clean cache
273  	    };
274  	
275  	    typedef boost::intrusive::list<
276  	      Buffer,
277  	      boost::intrusive::member_hook<
278  	        Buffer,
279  		boost::intrusive::list_member_hook<>,
280  		&Buffer::state_item> > state_list_t;
281  	
282  	    mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
283  	      buffer_map;
284  	
285  	    // we use a bare intrusive list here instead of std::map because
286  	    // it uses less memory and we expect this to be very small (very
287  	    // few IOs in flight to the same Blob at the same time).
288  	    state_list_t writing;   ///< writing buffers, sorted by seq, ascending
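
    // Rough flow (illustrative; see the .cc for the authoritative logic):
    // write() stages a STATE_WRITING buffer tagged with the transaction's
    // seq; when that IO completes, SharedBlob::finish_write(seq) reaches
    // _finish_write(), which retires the matching writing buffers (they
    // become CLEAN, or are trimmed if FLAG_NOCACHE was set).  did_read()
    // inserts already-clean data fetched from disk.  Both paths _discard()
    // the overlapped range first and inherit the highest cache_private of
    // whatever they replaced.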
289  	
290  	    ~BufferSpace() {
291  	      ceph_assert(buffer_map.empty());
292  	      ceph_assert(writing.empty());
293  	    }
294  	
295  	    void _add_buffer(BufferCacheShard* cache, Buffer *b, int level, Buffer *near) {
296  	      cache->_audit("_add_buffer start");
297  	      buffer_map[b->offset].reset(b);
298  	      if (b->is_writing()) {
299  		b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
300  	        if (writing.empty() || writing.rbegin()->seq <= b->seq) {
301  	          writing.push_back(*b);
302  	        } else {
303  	          auto it = writing.begin();
304  	          while (it->seq < b->seq) {
305  	            ++it;
306  	          }
307  	
308  	          ceph_assert(it->seq >= b->seq);
309  	          // note that this will insert b before it
310  	          // hence the order is maintained
311  	          writing.insert(it, *b);
312  	        }
313  	      } else {
314  		b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
315  		cache->_add(b, level, near);
316  	      }
317  	      cache->_audit("_add_buffer end");
318  	    }
319  	    void _rm_buffer(BufferCacheShard* cache, Buffer *b) {
320  	      _rm_buffer(cache, buffer_map.find(b->offset));
321  	    }
322  	    void _rm_buffer(BufferCacheShard* cache,
323  			    map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
324  	      ceph_assert(p != buffer_map.end());
325  	      cache->_audit("_rm_buffer start");
326  	      if (p->second->is_writing()) {
327  	        writing.erase(writing.iterator_to(*p->second));
328  	      } else {
329  		cache->_rm(p->second.get());
330  	      }
331  	      buffer_map.erase(p);
332  	      cache->_audit("_rm_buffer end");
333  	    }
334  	
335  	    map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
336  	      uint32_t offset) {
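      // e.g. with buffers at offsets 0 (length 4096) and 8192 (length 4096),
      // _data_lower_bound(100) steps back to the buffer at 0 because it still
      // covers offset 100, while _data_lower_bound(5000) lands on the buffer
      // at 8192 since the one at 0 ends before 5000.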
337  	      auto i = buffer_map.lower_bound(offset);
338  	      if (i != buffer_map.begin()) {
339  		--i;
340  		if (i->first + i->second->length <= offset)
341  		  ++i;
342  	      }
343  	      return i;
344  	    }
345  	
346  	    // must be called under protection of the Cache lock
347  	    void _clear(BufferCacheShard* cache);
348  	
349  	    // return value is the highest cache_private of a trimmed buffer, or 0.
350  	    int discard(BufferCacheShard* cache, uint32_t offset, uint32_t length) {
351  	      std::lock_guard l(cache->lock);
352  	      int ret = _discard(cache, offset, length);
353  	      cache->_trim();
354  	      return ret;
355  	    }
356  	    int _discard(BufferCacheShard* cache, uint32_t offset, uint32_t length);
357  	
358  	    void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
359  		       unsigned flags) {
360  	      std::lock_guard l(cache->lock);
361  	      Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
362  				     flags);
363  	      b->cache_private = _discard(cache, offset, bl.length());
364  	      _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
365  	      cache->_trim();
366  	    }
367  	    void _finish_write(BufferCacheShard* cache, uint64_t seq);
368  	    void did_read(BufferCacheShard* cache, uint32_t offset, bufferlist& bl) {
369  	      std::lock_guard l(cache->lock);
370  	      Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
371  	      b->cache_private = _discard(cache, offset, bl.length());
372  	      _add_buffer(cache, b, 1, nullptr);
373  	      cache->_trim();
374  	    }
375  	
376  	    void read(BufferCacheShard* cache, uint32_t offset, uint32_t length,
377  		      BlueStore::ready_regions_t& res,
378  		      interval_set<uint32_t>& res_intervals,
379  		      int flags = 0);
380  	
381  	    void truncate(BufferCacheShard* cache, uint32_t offset) {
382  	      discard(cache, offset, (uint32_t)-1 - offset);
383  	    }
384  	
385  	    void split(BufferCacheShard* cache, size_t pos, BufferSpace &r);
386  	
387  	    void dump(BufferCacheShard* cache, Formatter *f) const {
388  	      std::lock_guard l(cache->lock);
389  	      f->open_array_section("buffers");
390  	      for (auto& i : buffer_map) {
391  		f->open_object_section("buffer");
392  		ceph_assert(i.first == i.second->offset);
393  		i.second->dump(f);
394  		f->close_section();
395  	      }
396  	      f->close_section();
397  	    }
398  	  };
399  	
400  	  struct SharedBlobSet;
401  	
402  	  /// in-memory shared blob state (incl cached buffers)
403  	  struct SharedBlob {
404  	    MEMPOOL_CLASS_HELPERS();
405  	
406  	    std::atomic_int nref = {0}; ///< reference count
407  	    bool loaded = false;
408  	
409  	    CollectionRef coll;
410  	    union {
411  	      uint64_t sbid_unloaded;              ///< sbid if persistent isn't loaded
412  	      bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
413  	    };
414  	    BufferSpace bc;             ///< buffer cache
415  	
416  	    SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
417  	      if (get_cache()) {
418  		get_cache()->add_blob();
419  	      }
420  	    }
421  	    SharedBlob(uint64_t i, Collection *_coll);
422  	    ~SharedBlob();
423  	
424  	    uint64_t get_sbid() const {
425  	      return loaded ? persistent->sbid : sbid_unloaded;
426  	    }
427  	
428  	    friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
429  	    friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
430  	
431  	    void dump(Formatter* f) const;
432  	    friend ostream& operator<<(ostream& out, const SharedBlob& sb);
433  	
434  	    void get() {
435  	      ++nref;
436  	    }
437  	    void put();
438  	
439  	    /// get logical references
440  	    void get_ref(uint64_t offset, uint32_t length);
441  	
442  	    /// put logical references, and get back any released extents
443  	    void put_ref(uint64_t offset, uint32_t length,
444  			 PExtentVector *r, bool *unshare);
445  	
446  	    void finish_write(uint64_t seq);
447  	
448  	    friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
449  	      return l.get_sbid() == r.get_sbid();
450  	    }
451  	    inline BufferCacheShard* get_cache() {
452  	      return coll ? coll->cache : nullptr;
453  	    }
454  	    inline SharedBlobSet* get_parent() {
455  	      return coll ? &(coll->shared_blob_set) : nullptr;
456  	    }
457  	    inline bool is_loaded() const {
458  	      return loaded;
459  	    }
460  	
461  	  };
462  	  typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
463  	
464  	  /// a lookup table of SharedBlobs
465  	  struct SharedBlobSet {
466  	    /// protect lookup, insertion, removal
467  	    ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
468  	
469  	    // we use a bare pointer because we don't want to affect the ref
470  	    // count
471  	    mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
472  	
473  	    SharedBlobRef lookup(uint64_t sbid) {
474  	      std::lock_guard l(lock);
475  	      auto p = sb_map.find(sbid);
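      // treat entries whose refcount has already dropped to zero as absent:
      // such a SharedBlob is presumably racing with its own destruction and
      // must not be handed out again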
476  	      if (p == sb_map.end() ||
477  		  p->second->nref == 0) {
478  	        return nullptr;
479  	      }
480  	      return p->second;
481  	    }
482  	
483  	    void add(Collection* coll, SharedBlob *sb) {
484  	      std::lock_guard l(lock);
485  	      sb_map[sb->get_sbid()] = sb;
486  	      sb->coll = coll;
487  	    }
488  	
489  	    bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
490  	      std::lock_guard l(lock);
491  	      ceph_assert(sb->get_parent() == this);
492  	      if (verify_nref_is_zero && sb->nref != 0) {
493  		return false;
494  	      }
495  	      // only remove if it still points to us
496  	      auto p = sb_map.find(sb->get_sbid());
497  	      if (p != sb_map.end() &&
498  		  p->second == sb) {
499  		sb_map.erase(p);
500  	      }
501  	      return true;
502  	    }
503  	
504  	    bool empty() {
505  	      std::lock_guard l(lock);
506  	      return sb_map.empty();
507  	    }
508  	
509  	    template <int LogLevelV>
510  	    void dump(CephContext *cct);
511  	  };
512  	
513  	//#define CACHE_BLOB_BL  // not sure if this is a win yet or not... :/
514  	
515  	  /// in-memory blob metadata and associated cached buffers (if any)
516  	  struct Blob {
517  	    MEMPOOL_CLASS_HELPERS();
518  	
519  	    std::atomic_int nref = {0};     ///< reference count
520  	    int16_t id = -1;                ///< id, for spanning blobs only, >= 0
521  	    int16_t last_encoded_id = -1;   ///< (ephemeral) used during encoding only
522  	    SharedBlobRef shared_blob;      ///< shared blob state (if any)
523  	
524  	  private:
525  	    mutable bluestore_blob_t blob;  ///< decoded blob metadata
526  	#ifdef CACHE_BLOB_BL
527  	    mutable bufferlist blob_bl;     ///< cached encoded blob, blob is dirty if empty
528  	#endif
529  	    /// refs from this shard.  ephemeral if id<0, persisted if spanning.
530  	    bluestore_blob_use_tracker_t used_in_blob;
531  	
532  	  public:
533  	
534  	    friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
535  	    friend void intrusive_ptr_release(Blob *b) { b->put(); }
536  	
537  	    void dump(Formatter* f) const;
538  	    friend ostream& operator<<(ostream& out, const Blob &b);
539  	
540  	    const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
541  	      return used_in_blob;
542  	    }
543  	    bool is_referenced() const {
544  	      return used_in_blob.is_not_empty();
545  	    }
546  	    uint32_t get_referenced_bytes() const {
547  	      return used_in_blob.get_referenced_bytes();
548  	    }
549  	
550  	    bool is_spanning() const {
551  	      return id >= 0;
552  	    }
553  	
554  	    bool can_split() const {
555  	      std::lock_guard l(shared_blob->get_cache()->lock);
556  	      // splitting a BufferSpace writing list is too hard; don't try.
557  	      return shared_blob->bc.writing.empty() &&
558  	             used_in_blob.can_split() &&
559  	             get_blob().can_split();
560  	    }
561  	
562  	    bool can_split_at(uint32_t blob_offset) const {
563  	      return used_in_blob.can_split_at(blob_offset) &&
564  	             get_blob().can_split_at(blob_offset);
565  	    }
566  	
567  	    bool can_reuse_blob(uint32_t min_alloc_size,
568  				uint32_t target_blob_size,
569  				uint32_t b_offset,
570  				uint32_t *length0);
571  	
572  	    void dup(Blob& o) {
573  	      o.shared_blob = shared_blob;
574  	      o.blob = blob;
575  	#ifdef CACHE_BLOB_BL
576  	      o.blob_bl = blob_bl;
577  	#endif
578  	    }
579  	
580  	    inline const bluestore_blob_t& get_blob() const {
581  	      return blob;
582  	    }
583  	    inline bluestore_blob_t& dirty_blob() {
584  	#ifdef CACHE_BLOB_BL
585  	      blob_bl.clear();
586  	#endif
587  	      return blob;
588  	    }
589  	
590  	    /// discard buffers for unallocated regions
591  	    void discard_unallocated(Collection *coll);
592  	
593  	    /// get logical references
594  	    void get_ref(Collection *coll, uint32_t offset, uint32_t length);
595  	    /// put logical references, and get back any released extents
596  	    bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
597  			 PExtentVector *r);
598  	
599  	    /// split the blob
600  	    void split(Collection *coll, uint32_t blob_offset, Blob *o);
601  	
602  	    void get() {
603  	      ++nref;
604  	    }
605  	    void put() {
606  	      if (--nref == 0)
607  		delete this;
608  	    }
609  	
610  	
611  	#ifdef CACHE_BLOB_BL
612  	    void _encode() const {
613  	      if (blob_bl.length() == 0 ) {
614  		encode(blob, blob_bl);
615  	      } else {
616  		ceph_assert(blob_bl.length());
617  	      }
618  	    }
619  	    void bound_encode(
620  	      size_t& p,
621  	      bool include_ref_map) const {
622  	      _encode();
623  	      p += blob_bl.length();
624  	      if (include_ref_map) {
625  		used_in_blob.bound_encode(p);
626  	      }
627  	    }
628  	    void encode(
629  	      bufferlist::contiguous_appender& p,
630  	      bool include_ref_map) const {
631  	      _encode();
632  	      p.append(blob_bl);
633  	      if (include_ref_map) {
634  		used_in_blob.encode(p);
635  	      }
636  	    }
637  	    void decode(
638  	      Collection */*coll*/,
639  	      bufferptr::const_iterator& p,
640  	      bool include_ref_map) {
641  	      const char *start = p.get_pos();
642  	      denc(blob, p);
643  	      const char *end = p.get_pos();
644  	      blob_bl.clear();
645  	      blob_bl.append(start, end - start);
646  	      if (include_ref_map) {
647  		used_in_blob.decode(p);
648  	      }
649  	    }
650  	#else
651  	    void bound_encode(
652  	      size_t& p,
653  	      uint64_t struct_v,
654  	      uint64_t sbid,
655  	      bool include_ref_map) const {
656  	      denc(blob, p, struct_v);
657  	      if (blob.is_shared()) {
658  	        denc(sbid, p);
659  	      }
660  	      if (include_ref_map) {
661  		used_in_blob.bound_encode(p);
662  	      }
663  	    }
664  	    void encode(
665  	      bufferlist::contiguous_appender& p,
666  	      uint64_t struct_v,
667  	      uint64_t sbid,
668  	      bool include_ref_map) const {
669  	      denc(blob, p, struct_v);
670  	      if (blob.is_shared()) {
671  	        denc(sbid, p);
672  	      }
673  	      if (include_ref_map) {
674  		used_in_blob.encode(p);
675  	      }
676  	    }
677  	    void decode(
678  	      Collection *coll,
679  	      bufferptr::const_iterator& p,
680  	      uint64_t struct_v,
681  	      uint64_t* sbid,
682  	      bool include_ref_map);
683  	#endif
684  	  };
685  	  typedef boost::intrusive_ptr<Blob> BlobRef;
686  	  typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
687  	
688  	  /// a logical extent, pointing to (some portion of) a blob
689  	  typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
690  	  struct Extent : public ExtentBase {
691  	    MEMPOOL_CLASS_HELPERS();
692  	
693  	    uint32_t logical_offset = 0;      ///< logical offset
694  	    uint32_t blob_offset = 0;         ///< blob offset
695  	    uint32_t length = 0;              ///< length
696  	    BlobRef  blob;                    ///< the blob with our data
697  	
698  	    /// ctor for lookup only
699  	    explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
700  	    /// ctor for delayed initialization (see decode_some())
701  	    explicit Extent() : ExtentBase() {
702  	    }
703  	    /// ctor for general usage
704  	    Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
705  	      : ExtentBase(),
706  	        logical_offset(lo), blob_offset(o), length(l) {
707  	      assign_blob(b);
708  	    }
709  	    ~Extent() {
710  	      if (blob) {
711  		blob->shared_blob->get_cache()->rm_extent();
712  	      }
713  	    }
714  	
715  	    void dump(Formatter* f) const;
716  	
717  	    void assign_blob(const BlobRef& b) {
718  	      ceph_assert(!blob);
719  	      blob = b;
720  	      blob->shared_blob->get_cache()->add_extent();
721  	    }
722  	
723  	    // comparators for intrusive_set
724  	    friend bool operator<(const Extent &a, const Extent &b) {
725  	      return a.logical_offset < b.logical_offset;
726  	    }
727  	    friend bool operator>(const Extent &a, const Extent &b) {
728  	      return a.logical_offset > b.logical_offset;
729  	    }
730  	    friend bool operator==(const Extent &a, const Extent &b) {
731  	      return a.logical_offset == b.logical_offset;
732  	    }
733  	
734  	    uint32_t blob_start() const {
735  	      return logical_offset - blob_offset;
736  	    }
737  	
738  	    uint32_t blob_end() const {
739  	      return blob_start() + blob->get_blob().get_logical_length();
740  	    }
741  	
742  	    uint32_t logical_end() const {
743  	      return logical_offset + length;
744  	    }
745  	
    // return true if any piece of the blob is outside
    // the given range [o, o + l).
748  	    bool blob_escapes_range(uint32_t o, uint32_t l) const {
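      // e.g. an Extent with logical_offset 100, blob_offset 100 whose blob
      // has logical_length 8192 spans blob range [0, 8192); for o=100, l=100
      // blob_end() == 8192 > 200, so the blob escapes the range.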
749  	      return blob_start() < o || blob_end() > o + l;
750  	    }
751  	  };
752  	  typedef boost::intrusive::set<Extent> extent_map_t;
753  	
754  	
755  	  friend ostream& operator<<(ostream& out, const Extent& e);
756  	
757  	  struct OldExtent {
758  	    boost::intrusive::list_member_hook<> old_extent_item;
759  	    Extent e;
760  	    PExtentVector r;
    bool blob_empty; // true if removal of this (last) extent leaves the blob
                     // empty - required to update compression stats properly
763  	    OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
764  	      : e(lo, o, l, b), blob_empty(false) {
765  	    }
766  	    static OldExtent* create(CollectionRef c,
767  	                             uint32_t lo,
768  				     uint32_t o,
769  				     uint32_t l,
770  				     BlobRef& b);
771  	  };
772  	  typedef boost::intrusive::list<
773  	      OldExtent,
774  	      boost::intrusive::member_hook<
775  	        OldExtent,
776  	    boost::intrusive::list_member_hook<>,
777  	    &OldExtent::old_extent_item> > old_extent_map_t;
778  	
779  	  struct Onode;
780  	
781  	  /// a sharded extent map, mapping offsets to lextents to blobs
782  	  struct ExtentMap {
783  	    Onode *onode;
784  	    extent_map_t extent_map;        ///< map of Extents to Blobs
785  	    blob_map_t spanning_blob_map;   ///< blobs that span shards
786  	    typedef boost::intrusive_ptr<Onode> OnodeRef;
787  	
788  	    struct Shard {
789  	      bluestore_onode_t::shard_info *shard_info = nullptr;
790  	      unsigned extents = 0;  ///< count extents in this shard
791  	      bool loaded = false;   ///< true if shard is loaded
792  	      bool dirty = false;    ///< true if shard is dirty and needs reencoding
793  	    };
794  	    mempool::bluestore_cache_other::vector<Shard> shards;    ///< shards
795  	
796  	    bufferlist inline_bl;    ///< cached encoded map, if unsharded; empty=>dirty
797  	
798  	    uint32_t needs_reshard_begin = 0;
799  	    uint32_t needs_reshard_end = 0;
800  	
801  	    void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
802  	      uint64_t&, uint64_t&, uint64_t&);
803  	
804  	    bool needs_reshard() const {
805  	      return needs_reshard_end > needs_reshard_begin;
806  	    }
807  	    void clear_needs_reshard() {
808  	      needs_reshard_begin = needs_reshard_end = 0;
809  	    }
810  	    void request_reshard(uint32_t begin, uint32_t end) {
811  	      if (begin < needs_reshard_begin) {
812  		needs_reshard_begin = begin;
813  	      }
814  	      if (end > needs_reshard_end) {
815  		needs_reshard_end = end;
816  	      }
817  	    }
818  	
819  	    struct DeleteDisposer {
820  	      void operator()(Extent *e) { delete e; }
821  	    };
822  	
823  	    ExtentMap(Onode *o);
824  	    ~ExtentMap() {
825  	      extent_map.clear_and_dispose(DeleteDisposer());
826  	    }
827  	
828  	    void clear() {
829  	      extent_map.clear_and_dispose(DeleteDisposer());
830  	      shards.clear();
831  	      inline_bl.clear();
832  	      clear_needs_reshard();
833  	    }
834  	
835  	    void dump(Formatter* f) const;
836  	
837  	    bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
838  			     unsigned *pn);
839  	    unsigned decode_some(bufferlist& bl);
840  	
841  	    void bound_encode_spanning_blobs(size_t& p);
842  	    void encode_spanning_blobs(bufferlist::contiguous_appender& p);
843  	    void decode_spanning_blobs(bufferptr::const_iterator& p);
844  	
845  	    BlobRef get_spanning_blob(int id) {
846  	      auto p = spanning_blob_map.find(id);
847  	      ceph_assert(p != spanning_blob_map.end());
848  	      return p->second;
849  	    }
850  	
851  	    void update(KeyValueDB::Transaction t, bool force);
852  	    decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
853  	    void reshard(
854  	      KeyValueDB *db,
855  	      KeyValueDB::Transaction t);
856  	
857  	    /// initialize Shards from the onode
858  	    void init_shards(bool loaded, bool dirty);
859  	
860  	    /// return index of shard containing offset
861  	    /// or -1 if not found
862  	    int seek_shard(uint32_t offset) {
863  	      size_t end = shards.size();
864  	      size_t mid, left = 0;
      size_t right = end; // one past the right end
866  	
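      // e.g. with shard offsets {0x0, 0x10000, 0x20000}: seek_shard(0x12345)
      // returns 1, seek_shard(0x20000) returns 2, and any offset below
      // shards[0].shard_info->offset returns -1.
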
867  	      while (left < right) {
868  	        mid = left + (right - left) / 2;
869  	        if (offset >= shards[mid].shard_info->offset) {
870  	          size_t next = mid + 1;
871  	          if (next >= end || offset < shards[next].shard_info->offset)
872  	            return mid;
873  	          //continue to search forwards
874  	          left = next;
875  	        } else {
876  	          //continue to search backwards
877  	          right = mid;
878  	        }
879  	      }
880  	
881  	      return -1; // not found
882  	    }
883  	
884  	    /// check if a range spans a shard
885  	    bool spans_shard(uint32_t offset, uint32_t length) {
886  	      if (shards.empty()) {
887  		return false;
888  	      }
889  	      int s = seek_shard(offset);
890  	      ceph_assert(s >= 0);
891  	      if (s == (int)shards.size() - 1) {
892  		return false; // last shard
893  	      }
894  	      if (offset + length <= shards[s+1].shard_info->offset) {
895  		return false;
896  	      }
897  	      return true;
898  	    }
899  	
900  	    /// ensure that a range of the map is loaded
901  	    void fault_range(KeyValueDB *db,
902  			     uint32_t offset, uint32_t length);
903  	
904  	    /// ensure a range of the map is marked dirty
905  	    void dirty_range(uint32_t offset, uint32_t length);
906  	
907  	    /// for seek_lextent test
908  	    extent_map_t::iterator find(uint64_t offset);
909  	
910  	    /// seek to the first lextent including or after offset
911  	    extent_map_t::iterator seek_lextent(uint64_t offset);
912  	    extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
913  	
914  	    /// add a new Extent
915  	    void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
916  	      extent_map.insert(*new Extent(lo, o, l, b));
917  	    }
918  	
919  	    /// remove (and delete) an Extent
920  	    void rm(extent_map_t::iterator p) {
921  	      extent_map.erase_and_dispose(p, DeleteDisposer());
922  	    }
923  	
924  	    bool has_any_lextents(uint64_t offset, uint64_t length);
925  	
926  	    /// consolidate adjacent lextents in extent_map
927  	    int compress_extent_map(uint64_t offset, uint64_t length);
928  	
    /// punch a logical hole.  add the lextents to be dereferenced to the target list.
930  	    void punch_hole(CollectionRef &c,
931  			    uint64_t offset, uint64_t length,
932  			    old_extent_map_t *old_extents);
933  	
934  	    /// put new lextent into lextent_map overwriting existing ones if
935  	    /// any and update references accordingly
936  	    Extent *set_lextent(CollectionRef &c,
937  				uint64_t logical_offset,
938  				uint64_t offset, uint64_t length,
939  	                        BlobRef b,
940  				old_extent_map_t *old_extents);
941  	
942  	    /// split a blob (and referring extents)
943  	    BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
944  	  };
945  	
946  	  /// Compressed Blob Garbage collector
947  	  /*
  The primary idea of the collector is to estimate the difference between the
  allocation units (AUs) currently occupied by compressed blobs and the new
  AUs that would be required to store the same data uncompressed.
  The estimate is made over the protrusive extents within a logical range
  formed by combining the old_extents collection with the specific (current)
  write request.
  old_extents is needed to handle blob reference counts properly: old extents
  still hold blob refs, so we have to traverse the collection to determine
  whether a blob is about to be released.
  Protrusive extents are extents that fall within the blob set in action
  (i.e. within the logical range above) but are not removed entirely by the
  current write.
  E.g. for
  extent1 <loffs = 100, boffs = 100, len  = 100> ->
    blob1<compressed, len_on_disk=4096, logical_len=8192>
  extent2 <loffs = 200, boffs = 200, len  = 100> ->
    blob2<raw, len_on_disk=4096, llen=4096>
  extent3 <loffs = 300, boffs = 300, len  = 100> ->
    blob1<compressed, len_on_disk=4096, llen=8192>
  extent4 <loffs = 4096, boffs = 0, len  = 100>  ->
    blob3<raw, len_on_disk=4096, llen=4096>
  write(300~100)
  the protrusive extents lie within the ranges <0~300, 400~8192-400>.
  Here the existing AUs that might be released by GC (i.e. blob1's) occupy
  2x4K bytes, while no new AUs are expected after GC since extent1 would be
  merged into blob2.  Hence collection is worthwhile.
975  	  */
976  	  class GarbageCollector
977  	  {
978  	  public:
979  	    /// return amount of allocation units that might be saved due to GC
980  	    int64_t estimate(
981  	      uint64_t offset,
982  	      uint64_t length,
983  	      const ExtentMap& extent_map,
984  	      const old_extent_map_t& old_extents,
985  	      uint64_t min_alloc_size);
986  	
987  	    /// return a collection of extents to perform GC on
988  	    const interval_set<uint64_t>& get_extents_to_collect() const {
989  	      return extents_to_collect;
990  	    }
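
    // Hypothetical caller sketch (names and the threshold are illustrative,
    // not part of this interface): estimate the benefit over the range being
    // written plus its old extents and, if enough allocation units would be
    // saved, rewrite the returned ranges uncompressed:
    //   GarbageCollector gc(cct);
    //   if (gc.estimate(offset, length, o->extent_map, old_extents,
    //                   min_alloc_size) > 0) {
    //     for (auto& p : gc.get_extents_to_collect()) {
    //       // re-write [p.first, p.first + p.second) without compression
    //     }
    //   }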
991  	    GarbageCollector(CephContext* _cct) : cct(_cct) {}
992  	
993  	  private:
994  	    struct BlobInfo {
995  	      uint64_t referenced_bytes = 0;    ///< amount of bytes referenced in blob
      int64_t expected_allocations = 0; ///< new alloc units required
                                        ///< if GC is performed
998  	      bool collect_candidate = false;   ///< indicate if blob has any extents 
999  	                                        ///< eligible for GC.
      extent_map_t::const_iterator first_lextent; ///< points to the first
                                                  ///< lextent referring to
                                                  ///< the blob, if any; valid
                                                  ///< only when
                                                  ///< collect_candidate is set
      extent_map_t::const_iterator last_lextent;  ///< points to the last
                                                  ///< lextent referring to
                                                  ///< the blob, if any
1008 	
1009 	      BlobInfo(uint64_t ref_bytes) :
1010 	        referenced_bytes(ref_bytes) {
1011 	      }
1012 	    };
1013 	    CephContext* cct;
1014 	    map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
1015 	                                         ///< copies that are affected by the
1016 	                                         ///< specific write
1017 	
    /// protrusive extents that should be collected if GC takes place
1019 	    interval_set<uint64_t> extents_to_collect;
1020 	
    boost::optional<uint64_t> used_alloc_unit; ///< last processed allocation
                                               ///< unit when traversing
                                               ///< protrusive extents; other
                                               ///< extents mapped to this AU
                                               ///< are ignored (except when an
                                               ///< uncompressed extent follows
                                               ///< a compressed one - see
                                               ///< below).
    BlobInfo* blob_info_counted = nullptr; ///< set if the previous allocation
                                           ///< unit incremented
                                           ///< expected_allocations for this
                                           ///< blob; if an uncompressed extent
                                           ///< follows, that increment needs
                                           ///< to be reverted (decremented)
    int64_t expected_allocations = 0;      ///< new alloc units required
                                           ///< if GC is performed
    int64_t expected_for_release = 0;      ///< alloc units currently used by
                                           ///< compressed blobs that might
                                           ///< be freed after GC
1041 	
1042 	  protected:
1043 	    void process_protrusive_extents(const BlueStore::ExtentMap& extent_map, 
1044 					    uint64_t start_offset,
1045 					    uint64_t end_offset,
1046 					    uint64_t start_touch_offset,
1047 					    uint64_t end_touch_offset,
1048 					    uint64_t min_alloc_size);
1049 	  };
1050 	
1051 	  struct OnodeSpace;
1052 	
1053 	  /// an in-memory object
1054 	  struct Onode {
1055 	    MEMPOOL_CLASS_HELPERS();
1056 	
1057 	    std::atomic_int nref;  ///< reference count
1058 	    Collection *c;
1059 	
1060 	    ghobject_t oid;
1061 	
1062 	    /// key under PREFIX_OBJ where we are stored
1063 	    mempool::bluestore_cache_other::string key;
1064 	
1065 	    boost::intrusive::list_member_hook<> lru_item;
1066 	
1067 	    bluestore_onode_t onode;  ///< metadata stored as value in kv store
1068 	    bool exists;              ///< true if object logically exists
1069 	
1070 	    ExtentMap extent_map;
1071 	
1072 	    // track txc's that have not been committed to kv store (and whose
1073 	    // effects cannot be read via the kvdb read methods)
1074 	    std::atomic<int> flushing_count = {0};
1075 	    std::atomic<int> waiting_count = {0};
1076 	    /// protect flush_txns
1077 	    ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
1078 	    ceph::condition_variable flush_cond;   ///< wait here for uncommitted txns
1079 	
1080 	    Onode(Collection *c, const ghobject_t& o,
1081 		  const mempool::bluestore_cache_other::string& k)
1082 	      : nref(0),
1083 		c(c),
1084 		oid(o),
1085 		key(k),
1086 		exists(false),
1087 		extent_map(this) {
1088 	    }
1089 	    Onode(Collection* c, const ghobject_t& o,
1090 	      const string& k)
1091 	      : nref(0),
1092 	      c(c),
1093 	      oid(o),
1094 	      key(k),
1095 	      exists(false),
1096 	      extent_map(this) {
1097 	    }
1098 	    Onode(Collection* c, const ghobject_t& o,
1099 	      const char* k)
1100 	      : nref(0),
1101 	      c(c),
1102 	      oid(o),
1103 	      key(k),
1104 	      exists(false),
1105 	      extent_map(this) {
1106 	    }
1107 	
1108 	    static Onode* decode(
1109 	      CollectionRef c,
1110 	      const ghobject_t& oid,
1111 	      const string& key,
1112 	      const bufferlist& v);
1113 	
1114 	    void dump(Formatter* f) const;
1115 	
1116 	    void flush();
1117 	    void get() {
1118 	      ++nref;
1119 	    }
1120 	    void put() {
1121 	      if (--nref == 0)
1122 		delete this;
1123 	    }
1124 	
1125 	    const string& get_omap_prefix();
1126 	    void get_omap_header(string *out);
1127 	    void get_omap_key(const string& key, string *out);
1128 	    void rewrite_omap_key(const string& old, string *out);
1129 	    void get_omap_tail(string *out);
1130 	    void decode_omap_key(const string& key, string *user_key);
1131 	  };
1132 	  typedef boost::intrusive_ptr<Onode> OnodeRef;
1133 	
1134 	  /// A generic Cache Shard
1135 	  struct CacheShard {
1136 	    CephContext *cct;
1137 	    PerfCounters *logger;
1138 	
1139 	    /// protect lru and other structures
1140 	    ceph::recursive_mutex lock = {
1141 	      ceph::make_recursive_mutex("BlueStore::CacheShard::lock") };
1142 	
1143 	    std::atomic<uint64_t> max = {0};
1144 	    std::atomic<uint64_t> num = {0};
1145 	
1146 	    CacheShard(CephContext* cct) : cct(cct), logger(nullptr) {}
1147 	    virtual ~CacheShard() {}
1148 	
1149 	    void set_max(uint64_t max_) {
1150 	      max = max_;
1151 	    }
1152 	
1153 	    uint64_t _get_num() {
1154 	      return num;
1155 	    }
1156 	
1157 	    virtual void _trim_to(uint64_t max) = 0;
1158 	    void _trim() {
1159 	      if (cct->_conf->objectstore_blackhole) {
1160 		// do not trim if we are throwing away IOs a layer down
1161 		return;
1162 	      }
1163 	      _trim_to(max);
1164 	    }
1165 	    void trim() {
1166 	      std::lock_guard l(lock);
1167 	      _trim();    
1168 	    }
1169 	    void flush() {
1170 	      std::lock_guard l(lock);
1171 	      // we should not be shutting down after the blackhole is enabled
1172 	      assert(!cct->_conf->objectstore_blackhole);
1173 	      _trim_to(0);
1174 	    }
1175 	
1176 	#ifdef DEBUG_CACHE
1177 	    virtual void _audit(const char *s) = 0;
1178 	#else
1179 	    void _audit(const char *s) { /* no-op */ }
1180 	#endif
1181 	  };
1182 	
1183 	  /// A Generic onode Cache Shard
1184 	  struct OnodeCacheShard : public CacheShard {
1185 	    std::array<std::pair<ghobject_t, mono_clock::time_point>, 64> dumped_onodes;
1186 	  public:
1187 	    OnodeCacheShard(CephContext* cct) : CacheShard(cct) {}
1188 	    static OnodeCacheShard *create(CephContext* cct, string type,
1189 	                                   PerfCounters *logger);
1190 	    virtual void _add(OnodeRef& o, int level) = 0;
1191 	    virtual void _rm(OnodeRef& o) = 0;
1192 	    virtual void _touch(OnodeRef& o) = 0;
1193 	    virtual void add_stats(uint64_t *onodes) = 0;
1194 	
1195 	    bool empty() {
1196 	      return _get_num() == 0;
1197 	    }
1198 	  };
1199 	
1200 	  /// A Generic buffer Cache Shard
1201 	  struct BufferCacheShard : public CacheShard {
1202 	    std::atomic<uint64_t> num_extents = {0};
1203 	    std::atomic<uint64_t> num_blobs = {0};
1204 	    uint64_t buffer_bytes = 0;
1205 	
1206 	  public:
1207 	    BufferCacheShard(CephContext* cct) : CacheShard(cct) {}
1208 	    static BufferCacheShard *create(CephContext* cct, string type, 
1209 	                                    PerfCounters *logger);
1210 	    virtual void _add(Buffer *b, int level, Buffer *near) = 0;
1211 	    virtual void _rm(Buffer *b) = 0;
1212 	    virtual void _move(BufferCacheShard *src, Buffer *b) = 0;
1213 	    virtual void _touch(Buffer *b) = 0;
1214 	    virtual void _adjust_size(Buffer *b, int64_t delta) = 0;
1215 	
1216 	    uint64_t _get_bytes() {
1217 	      return buffer_bytes;
1218 	    }
1219 	
1220 	    void add_extent() {
1221 	      ++num_extents;
1222 	    }
1223 	    void rm_extent() {
1224 	      --num_extents;
1225 	    }
1226 	
1227 	    void add_blob() {
1228 	      ++num_blobs;
1229 	    }
1230 	    void rm_blob() {
1231 	      --num_blobs;
1232 	    }
1233 	
1234 	    virtual void add_stats(uint64_t *extents,
1235 	                           uint64_t *blobs,
1236 	                           uint64_t *buffers,
1237 	                           uint64_t *bytes) = 0;
1238 	
1239 	    bool empty() {
1240 	      std::lock_guard l(lock);
1241 	      return _get_bytes() == 0;
1242 	    }
1243 	  };
1244 	
1245 	  struct OnodeSpace {
1246 	    OnodeCacheShard *cache;
1247 	
1248 	  private:
1249 	    /// forward lookups
1250 	    mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1251 	
1252 	    friend class Collection; // for split_cache()
1253 	
1254 	  public:
1255 	    OnodeSpace(OnodeCacheShard *c) : cache(c) {}
1256 	    ~OnodeSpace() {
1257 	      clear();
1258 	    }
1259 	
1260 	    OnodeRef add(const ghobject_t& oid, OnodeRef o);
1261 	    OnodeRef lookup(const ghobject_t& o);
1262 	    void remove(const ghobject_t& oid) {
1263 	      onode_map.erase(oid);
1264 	    }
1265 	    void rename(OnodeRef& o, const ghobject_t& old_oid,
1266 			const ghobject_t& new_oid,
1267 			const mempool::bluestore_cache_other::string& new_okey);
1268 	    void clear();
1269 	    bool empty();
1270 	
1271 	    template <int LogLevelV>
1272 	    void dump(CephContext *cct);
1273 	
    /// return true if f returns true for any item
1275 	    bool map_any(std::function<bool(OnodeRef)> f);
1276 	  };
1277 	
1278 	  class OpSequencer;
1279 	  using OpSequencerRef = ceph::ref_t<OpSequencer>;
1280 	
1281 	  struct Collection : public CollectionImpl {
1282 	    BlueStore *store;
1283 	    OpSequencerRef osr;
1284 	    BufferCacheShard *cache;       ///< our cache shard
1285 	    bluestore_cnode_t cnode;
1286 	    ceph::shared_mutex lock =
1287 	      ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);
1288 	
1289 	    bool exists;
1290 	
1291 	    SharedBlobSet shared_blob_set;      ///< open SharedBlobs
1292 	
1293 	    // cache onodes on a per-collection basis to avoid lock
1294 	    // contention.
1295 	    OnodeSpace onode_map;
1296 	
1297 	    //pool options
1298 	    pool_opts_t pool_opts;
1299 	    ContextQueue *commit_queue;
1300 	
1301 	    OnodeRef get_onode(const ghobject_t& oid, bool create, bool is_createop=false);
1302 	
1303 	    // the terminology is confusing here, sorry!
1304 	    //
1305 	    //  blob_t     shared_blob_t
1306 	    //  !shared    unused                -> open
1307 	    //  shared     !loaded               -> open + shared
1308 	    //  shared     loaded                -> open + shared + loaded
1309 	    //
1310 	    // i.e.,
1311 	    //  open = SharedBlob is instantiated
1312 	    //  shared = blob_t shared flag is set; SharedBlob is hashed.
1313 	    //  loaded = SharedBlob::shared_blob_t is loaded from kv store
1314 	    void open_shared_blob(uint64_t sbid, BlobRef b);
1315 	    void load_shared_blob(SharedBlobRef sb);
1316 	    void make_blob_shared(uint64_t sbid, BlobRef b);
1317 	    uint64_t make_blob_unshared(SharedBlob *sb);
1318 	
1319 	    BlobRef new_blob() {
1320 	      BlobRef b = new Blob();
1321 	      b->shared_blob = new SharedBlob(this);
1322 	      return b;
1323 	    }
1324 	
1325 	    bool contains(const ghobject_t& oid) {
1326 	      if (cid.is_meta())
1327 		return oid.hobj.pool == -1;
1328 	      spg_t spgid;
1329 	      if (cid.is_pg(&spgid))
1330 		return
1331 		  spgid.pgid.contains(cnode.bits, oid) &&
1332 		  oid.shard_id == spgid.shard;
1333 	      return false;
1334 	    }
1335 	
1336 	    int64_t pool() const {
1337 	      return cid.pool();
1338 	    }
1339 	
1340 	    void split_cache(Collection *dest);
1341 	
1342 	    bool flush_commit(Context *c) override;
1343 	    void flush() override;
1344 	    void flush_all_but_last();
1345 	
1346 	    Collection(BlueStore *ns, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t c);
1347 	  };
1348 	
1349 	  class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1350 	    CollectionRef c;
1351 	    OnodeRef o;
1352 	    KeyValueDB::Iterator it;
1353 	    string head, tail;
1354 	
1355 	    string _stringify() const;
1356 	
1357 	  public:
1358 	    OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1359 	    int seek_to_first() override;
1360 	    int upper_bound(const string &after) override;
1361 	    int lower_bound(const string &to) override;
1362 	    bool valid() override;
1363 	    int next() override;
1364 	    string key() override;
1365 	    bufferlist value() override;
1366 	    std::string tail_key() {
1367 	      return tail;
1368 	    }
1369 	
1370 	    int status() override {
1371 	      return 0;
1372 	    }
1373 	  };
1374 	
1375 	  struct volatile_statfs{
1376 	    enum {
1377 	      STATFS_ALLOCATED = 0,
1378 	      STATFS_STORED,
1379 	      STATFS_COMPRESSED_ORIGINAL,
1380 	      STATFS_COMPRESSED,
1381 	      STATFS_COMPRESSED_ALLOCATED,
1382 	      STATFS_LAST
1383 	    };
1384 	    int64_t values[STATFS_LAST];
1385 	    volatile_statfs() {
1386 	      memset(this, 0, sizeof(volatile_statfs));
1387 	    }
1388 	    void reset() {
1389 	      *this = volatile_statfs();
1390 	    }
1391 	    void publish(store_statfs_t* buf) const {
1392 	      buf->allocated = allocated();
1393 	      buf->data_stored = stored();
1394 	      buf->data_compressed = compressed();
1395 	      buf->data_compressed_original = compressed_original();
1396 	      buf->data_compressed_allocated = compressed_allocated();
1397 	    }
1398 	
1399 	    volatile_statfs& operator+=(const volatile_statfs& other) {
1400 	      for (size_t i = 0; i < STATFS_LAST; ++i) {
1401 		values[i] += other.values[i];
1402 	      }
1403 	      return *this;
1404 	    }
1405 	    int64_t& allocated() {
1406 	      return values[STATFS_ALLOCATED];
1407 	    }
1408 	    int64_t& stored() {
1409 	      return values[STATFS_STORED];
1410 	    }
1411 	    int64_t& compressed_original() {
1412 	      return values[STATFS_COMPRESSED_ORIGINAL];
1413 	    }
1414 	    int64_t& compressed() {
1415 	      return values[STATFS_COMPRESSED];
1416 	    }
1417 	    int64_t& compressed_allocated() {
1418 	      return values[STATFS_COMPRESSED_ALLOCATED];
1419 	    }
1420 	    int64_t allocated() const {
1421 	      return values[STATFS_ALLOCATED];
1422 	    }
1423 	    int64_t stored() const {
1424 	      return values[STATFS_STORED];
1425 	    }
1426 	    int64_t compressed_original() const {
1427 	      return values[STATFS_COMPRESSED_ORIGINAL];
1428 	    }
1429 	    int64_t compressed() const {
1430 	      return values[STATFS_COMPRESSED];
1431 	    }
1432 	    int64_t compressed_allocated() const {
1433 	      return values[STATFS_COMPRESSED_ALLOCATED];
1434 	    }
1435 	    volatile_statfs& operator=(const store_statfs_t& st) {
1436 	      values[STATFS_ALLOCATED] = st.allocated;
1437 	      values[STATFS_STORED] = st.data_stored;
1438 	      values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
1439 	      values[STATFS_COMPRESSED] = st.data_compressed;
1440 	      values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
1441 	      return *this;
1442 	    }
1443 	    bool is_empty() {
1444 	      return values[STATFS_ALLOCATED] == 0 &&
1445 		values[STATFS_STORED] == 0 &&
1446 		values[STATFS_COMPRESSED] == 0 &&
1447 		values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1448 		values[STATFS_COMPRESSED_ALLOCATED] == 0;
1449 	    }
1450 	    void decode(bufferlist::const_iterator& it) {
1451 	      using ceph::decode;
1452 	      for (size_t i = 0; i < STATFS_LAST; i++) {
1453 		decode(values[i], it);
1454 	      }
1455 	    }
1456 	
1457 	    void encode(bufferlist& bl) {
1458 	      using ceph::encode;
1459 	      for (size_t i = 0; i < STATFS_LAST; i++) {
1460 		encode(values[i], bl);
1461 	      }
1462 	    }
1463 	  };
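
  // volatile_statfs accumulates in-memory deltas; e.g. (illustrative only,
  // the member names below are from this header) a write path might do
  //   txc->statfs_delta.allocated() += newly_allocated_bytes;
  //   txc->statfs_delta.stored()    += net_new_user_bytes;
  // and the accumulated delta (see TransContext::statfs_delta below) is
  // later merged into the store-wide statistics.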
1464 	
1465 	  struct TransContext final : public AioContext {
1466 	    MEMPOOL_CLASS_HELPERS();
1467 	
1468 	    typedef enum {
1469 	      STATE_PREPARE,
1470 	      STATE_AIO_WAIT,
1471 	      STATE_IO_DONE,
1472 	      STATE_KV_QUEUED,     // queued for kv_sync_thread submission
1473 	      STATE_KV_SUBMITTED,  // submitted to kv; not yet synced
1474 	      STATE_KV_DONE,
1475 	      STATE_DEFERRED_QUEUED,    // in deferred_queue (pending or running)
1476 	      STATE_DEFERRED_CLEANUP,   // remove deferred kv record
1477 	      STATE_DEFERRED_DONE,
1478 	      STATE_FINISHING,
1479 	      STATE_DONE,
1480 	    } state_t;
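
    // Rough lifecycle of a transaction: PREPARE -> AIO_WAIT -> IO_DONE ->
    // KV_QUEUED -> KV_SUBMITTED -> KV_DONE, then (only for transactions that
    // carry deferred writes) DEFERRED_QUEUED -> DEFERRED_CLEANUP ->
    // DEFERRED_DONE, and finally FINISHING -> DONE.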
1481 	
1482 	    state_t state = STATE_PREPARE;
1483 	
1484 	    const char *get_state_name() {
1485 	      switch (state) {
1486 	      case STATE_PREPARE: return "prepare";
1487 	      case STATE_AIO_WAIT: return "aio_wait";
1488 	      case STATE_IO_DONE: return "io_done";
1489 	      case STATE_KV_QUEUED: return "kv_queued";
1490 	      case STATE_KV_SUBMITTED: return "kv_submitted";
1491 	      case STATE_KV_DONE: return "kv_done";
1492 	      case STATE_DEFERRED_QUEUED: return "deferred_queued";
1493 	      case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1494 	      case STATE_DEFERRED_DONE: return "deferred_done";
1495 	      case STATE_FINISHING: return "finishing";
1496 	      case STATE_DONE: return "done";
1497 	      }
1498 	      return "???";
1499 	    }
1500 	
1501 	#if defined(WITH_LTTNG)
1502 	    const char *get_state_latency_name(int state) {
1503 	      switch (state) {
1504 	      case l_bluestore_state_prepare_lat: return "prepare";
1505 	      case l_bluestore_state_aio_wait_lat: return "aio_wait";
1506 	      case l_bluestore_state_io_done_lat: return "io_done";
1507 	      case l_bluestore_state_kv_queued_lat: return "kv_queued";
1508 	      case l_bluestore_state_kv_committing_lat: return "kv_committing";
1509 	      case l_bluestore_state_kv_done_lat: return "kv_done";
1510 	      case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1511 	      case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1512 	      case l_bluestore_state_finishing_lat: return "finishing";
1513 	      case l_bluestore_state_done_lat: return "done";
1514 	      }
1515 	      return "???";
1516 	    }
1517 	#endif
1518 	
1519 	    CollectionRef ch;
1520 	    OpSequencerRef osr;  // this should be ch->osr
1521 	    boost::intrusive::list_member_hook<> sequencer_item;
1522 	
1523 	    uint64_t bytes = 0, ios = 0, cost = 0;
1524 	
1525 	    set<OnodeRef> onodes;     ///< these need to be updated/written
1526 	    set<OnodeRef> modified_objects;  ///< objects we modified (and need a ref)
1527 	    set<SharedBlobRef> shared_blobs;  ///< these need to be updated/written
1528 	    set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1529 	
1530 	    KeyValueDB::Transaction t; ///< then we will commit this
1531 	    list<Context*> oncommits;  ///< more commit completions
1532 	    list<CollectionRef> removed_collections; ///< colls we removed
1533 	
1534 	    boost::intrusive::list_member_hook<> deferred_queue_item;
1535 	    bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1536 	
1537 	    interval_set<uint64_t> allocated, released;
1538 	    volatile_statfs statfs_delta;	   ///< overall store statistics delta
1539 	    uint64_t osd_pool_id = META_POOL_ID;    ///< osd pool id we're operating on
1540 	    
1541 	    IOContext ioc;
1542 	    bool had_ios = false;  ///< true if we submitted IOs before our kv txn
1543 	
1544 	    uint64_t seq = 0;
1545 	    utime_t start;
1546 	    utime_t last_stamp;
1547 	
1548 	    uint64_t last_nid = 0;     ///< if non-zero, highest new nid we allocated
1549 	    uint64_t last_blobid = 0;  ///< if non-zero, highest new blobid we allocated
1550 	
1551 	#if defined(WITH_LTTNG)
1552 	    bool tracing = false;
1553 	#endif
1554 	
1555 	    explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
1556 				  list<Context*> *on_commits)
1557 	      : ch(c),
1558 		osr(o),
1559 		ioc(cct, this),
1560 		start(ceph_clock_now()) {
1561 	      last_stamp = start;
1562 	      if (on_commits) {
1563 		oncommits.swap(*on_commits);
1564 	      }
1565 	    }
1566 	    ~TransContext() {
1567 	      delete deferred_txn;
1568 	    }
1569 	
1570 	    void write_onode(OnodeRef &o) {
1571 	      onodes.insert(o);
1572 	    }
1573 	    void write_shared_blob(SharedBlobRef &sb) {
1574 	      shared_blobs.insert(sb);
1575 	    }
1576 	    void unshare_blob(SharedBlob *sb) {
1577 	      shared_blobs.erase(sb);
1578 	    }
1579 	
    /// note that we logically modified the object (used when the onode itself is unmodified)
1581 	    void note_modified_object(OnodeRef &o) {
1582 	      // onode itself isn't written, though
1583 	      modified_objects.insert(o);
1584 	    }
1585 	    void note_removed_object(OnodeRef& o) {
1586 	      onodes.erase(o);
1587 	      modified_objects.insert(o);
1588 	    }
1589 	
1590 	    void aio_finish(BlueStore *store) override {
1591 	      store->txc_aio_finish(this);
1592 	    }
1593 	  };
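
  // Illustrative sketch (not part of the source): how a write-path caller is
  // expected to use the TransContext bookkeeping helpers above, assuming txc,
  // o and sb are a TransContext*, an OnodeRef and a SharedBlobRef already in
  // hand:
  //
  //   txc->write_onode(o);          // onode must be rewritten at commit
  //   txc->write_shared_blob(sb);   // shared blob record needs updating too
  //   txc->note_removed_object(o);  // on delete: drop the pending onode
  //                                 // write but keep a ref until commit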
1594 	
1595 	  class BlueStoreThrottle {
1596 	#if defined(WITH_LTTNG)
1597 	    const std::chrono::time_point<mono_clock> time_base = mono_clock::now();
1598 	
1599 	    // Time of last chosen io (microseconds)
1600 	    std::atomic<uint64_t> previous_emitted_tp_time_mono_mcs = {0};
1601 	    std::atomic<uint64_t> ios_started_since_last_traced = {0};
1602 	    std::atomic<uint64_t> ios_completed_since_last_traced = {0};
1603 	
1604 	    std::atomic_uint pending_kv_ios = {0};
1605 	    std::atomic_uint pending_deferred_ios = {0};
1606 	
1607 	    // Min period between trace points (microseconds)
1608 	    std::atomic<uint64_t> trace_period_mcs = {0};
1609 	
1610 	    bool should_trace(
1611 	      uint64_t *started,
1612 	      uint64_t *completed) {
1613 	      uint64_t min_period_mcs = trace_period_mcs.load(
1614 		std::memory_order_relaxed);
1615 	
1616 	      if (min_period_mcs == 0) {
1617 		*started = 1;
1618 		*completed = ios_completed_since_last_traced.exchange(0);
1619 		return true;
1620 	      } else {
1621 		ios_started_since_last_traced++;
1622 		auto now_mcs = ceph::to_microseconds<uint64_t>(
1623 		  mono_clock::now() - time_base);
1624 		uint64_t previous_mcs = previous_emitted_tp_time_mono_mcs;
1625 		uint64_t period_mcs = now_mcs - previous_mcs;
1626 		if (period_mcs > min_period_mcs) {
1627 		  if (previous_emitted_tp_time_mono_mcs.compare_exchange_strong(
1628 			previous_mcs, now_mcs)) {
1629 		    // This would be racy at a sufficiently extreme trace rate, but isn't
1630 		    // worth the overhead of doing it more carefully.
1631 		    *started = ios_started_since_last_traced.exchange(0);
1632 		    *completed = ios_completed_since_last_traced.exchange(0);
1633 		    return true;
1634 		  }
1635 		}
1636 		return false;
1637 	      }
1638 	    }
1639 	#endif
1640 	
1641 	#if defined(WITH_LTTNG)
1642 	    void emit_initial_tracepoint(
1643 	      KeyValueDB &db,
1644 	      TransContext &txc,
1645 	      mono_clock::time_point);
1646 	#else
1647 	    void emit_initial_tracepoint(
1648 	      KeyValueDB &db,
1649 	      TransContext &txc,
1650 	      mono_clock::time_point) {}
1651 	#endif
1652 	
1653 	    Throttle throttle_bytes;           ///< submit to commit
1654 	    Throttle throttle_deferred_bytes;  ///< submit to deferred complete
1655 	
1656 	  public:
1657 	    BlueStoreThrottle(CephContext *cct) :
1658 	      throttle_bytes(cct, "bluestore_throttle_bytes", 0),
1659 	      throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", 0)
1660 	    {
1661 	      reset_throttle(cct->_conf);
1662 	    }
1663 	
1664 	#if defined(WITH_LTTNG)
1665 	    void complete_kv(TransContext &txc);
1666 	    void complete(TransContext &txc);
1667 	#else
1668 	    void complete_kv(TransContext &txc) {}
1669 	    void complete(TransContext &txc) {}
1670 	#endif
1671 	
1672 	    utime_t log_state_latency(
1673 	      TransContext &txc, PerfCounters *logger, int state);
1674 	    bool try_start_transaction(
1675 	      KeyValueDB &db,
1676 	      TransContext &txc,
1677 	      mono_clock::time_point);
1678 	    void finish_start_transaction(
1679 	      KeyValueDB &db,
1680 	      TransContext &txc,
1681 	      mono_clock::time_point);
1682 	    void release_kv_throttle(uint64_t cost) {
1683 	      throttle_bytes.put(cost);
1684 	    }
1685 	    void release_deferred_throttle(uint64_t cost) {
1686 	      throttle_deferred_bytes.put(cost);
1687 	    }
1688 	    bool should_submit_deferred() {
1689 	      return throttle_deferred_bytes.past_midpoint();
1690 	    }
1691 	    void reset_throttle(const ConfigProxy &conf) {
1692 	      throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
1693 	      throttle_deferred_bytes.reset_max(
1694 		conf->bluestore_throttle_bytes +
1695 		conf->bluestore_throttle_deferred_bytes);
1696 	#if defined(WITH_LTTNG)
1697 	      double rate = conf.get_val<double>("bluestore_throttle_trace_rate");
1698 	      trace_period_mcs = rate > 0 ? floor((1/rate) * 1000000.0) : 0;
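      // e.g. (illustrative): a bluestore_throttle_trace_rate of 0.5 yields a
      // minimum period of floor((1/0.5) * 1e6) = 2,000,000 microseconds, i.e.
      // at most one trace point every two seconds.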
1699 	#endif
1700 	    }
1701 	  } throttle;
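
  // Illustrative sketch (an assumption about the call sites, which live in
  // BlueStore.cc, not a definitive description): the throttle brackets each
  // transaction roughly like
  //
  //   if (!throttle.try_start_transaction(*db, *txc, start_time)) {
  //     // wait for capacity, then account for the delayed start
  //     throttle.finish_start_transaction(*db, *txc, start_time);
  //   }
  //   ...
  //   throttle.release_kv_throttle(txc->cost);  // once the kv txn commits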
1702 	
1703 	  typedef boost::intrusive::list<
1704 	    TransContext,
1705 	    boost::intrusive::member_hook<
1706 	      TransContext,
1707 	      boost::intrusive::list_member_hook<>,
1708 	      &TransContext::deferred_queue_item> > deferred_queue_t;
1709 	
1710 	  struct DeferredBatch final : public AioContext {
1711 	    OpSequencer *osr;
1712 	    struct deferred_io {
1713 	      bufferlist bl;    ///< data
1714 	      uint64_t seq;     ///< deferred transaction seq
1715 	    };
1716 	    map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1717 	    deferred_queue_t txcs;           ///< txcs in this batch
1718 	    IOContext ioc;                   ///< our aios
1719 	    /// bytes of pending io for each deferred seq (may be 0)
1720 	    map<uint64_t,int> seq_bytes;
1721 	
1722 	    void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1723 	    void _audit(CephContext *cct);
1724 	
1725 	    DeferredBatch(CephContext *cct, OpSequencer *osr)
1726 	      : osr(osr), ioc(cct, this) {}
1727 	
1728 	    /// prepare a write
1729 	    void prepare_write(CephContext *cct,
1730 			       uint64_t seq, uint64_t offset, uint64_t length,
1731 			       bufferlist::const_iterator& p);
1732 	
1733 	    void aio_finish(BlueStore *store) override {
1734 	      store->_deferred_aio_finish(osr);
1735 	    }
1736 	  };
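
  // Illustrative sketch (assumption): queueing one deferred write into a
  // batch before its aio is submitted, with bl holding the data destined for
  // a given device offset:
  //
  //   bufferlist::const_iterator p = bl.cbegin();
  //   batch->prepare_write(cct, txc->seq, offset, bl.length(), p);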
1737 	
1738 	  class OpSequencer : public RefCountedObject {
1739 	  public:
1740 	    ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
1741 	    ceph::condition_variable qcond;
1742 	    typedef boost::intrusive::list<
1743 	      TransContext,
1744 	      boost::intrusive::member_hook<
1745 	        TransContext,
1746 		boost::intrusive::list_member_hook<>,
1747 		&TransContext::sequencer_item> > q_list_t;
1748 	    q_list_t q;  ///< transactions
1749 	
1750 	    boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1751 	
1752 	    DeferredBatch *deferred_running = nullptr;
1753 	    DeferredBatch *deferred_pending = nullptr;
1754 	
1755 	    BlueStore *store;
1756 	    coll_t cid;
1757 	
1758 	    uint64_t last_seq = 0;
1759 	
1760 	    std::atomic_int txc_with_unstable_io = {0};  ///< num txcs with unstable io
1761 	
1762 	    std::atomic_int kv_committing_serially = {0};
1763 	
1764 	    std::atomic_int kv_submitted_waiters = {0};
1765 	
1766 	    std::atomic_int kv_drain_preceding_waiters = {0};
1767 	
1768 	    std::atomic_bool zombie = {false};    ///< in zombie_osr set (collection going away)
1769 	
1770 	    const uint32_t sequencer_id;
1771 	
1772 	    uint32_t get_sequencer_id() const {
1773 	      return sequencer_id;
1774 	    }
1775 	
1776 	    void queue_new(TransContext *txc) {
1777 	      std::lock_guard l(qlock);
1778 	      txc->seq = ++last_seq;
1779 	      q.push_back(*txc);
1780 	    }
1781 	
1782 	    void drain() {
1783 	      std::unique_lock l(qlock);
1784 	      while (!q.empty())
1785 		qcond.wait(l);
1786 	    }
1787 	
1788 	    void drain_preceding(TransContext *txc) {
1789 	      std::unique_lock l(qlock);
1790 	      while (&q.front() != txc)
1791 		qcond.wait(l);
1792 	    }
1793 	
1794 	    bool _is_all_kv_submitted() {
      // caller must hold qlock; q must not be empty
1796 	      ceph_assert(!q.empty());
1797 	      TransContext *txc = &q.back();
1798 	      if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1799 		return true;
1800 	      }
1801 	      return false;
1802 	    }
1803 	
1804 	    void flush() {
1805 	      std::unique_lock l(qlock);
1806 	      while (true) {
1807 		// set flag before the check because the condition
1808 		// may become true outside qlock, and we need to make
1809 		// sure those threads see waiters and signal qcond.
1810 		++kv_submitted_waiters;
1811 		if (q.empty() || _is_all_kv_submitted()) {
1812 		  --kv_submitted_waiters;
1813 		  return;
1814 		}
1815 		qcond.wait(l);
1816 		--kv_submitted_waiters;
1817 	      }
1818 	    }
1819 	
1820 	    void flush_all_but_last() {
1821 	      std::unique_lock l(qlock);
      ceph_assert(q.size() >= 1);
1823 	      while (true) {
1824 		// set flag before the check because the condition
1825 		// may become true outside qlock, and we need to make
1826 		// sure those threads see waiters and signal qcond.
1827 		++kv_submitted_waiters;
1828 		if (q.size() <= 1) {
1829 		  --kv_submitted_waiters;
1830 		  return;
1831 		} else {
1832 		  auto it = q.rbegin();
1833 		  it++;
1834 		  if (it->state >= TransContext::STATE_KV_SUBMITTED) {
1835 		    --kv_submitted_waiters;
1836 		    return;
1837 	          }
1838 		}
1839 		qcond.wait(l);
1840 		--kv_submitted_waiters;
1841 	      }
1842 	    }
1843 	
1844 	    bool flush_commit(Context *c) {
1845 	      std::lock_guard l(qlock);
1846 	      if (q.empty()) {
1847 		return true;
1848 	      }
1849 	      TransContext *txc = &q.back();
1850 	      if (txc->state >= TransContext::STATE_KV_DONE) {
1851 		return true;
1852 	      }
1853 	      txc->oncommits.push_back(c);
1854 	      return false;
1855 	    }
1856 	  private:
1857 	    FRIEND_MAKE_REF(OpSequencer);
1858 	    OpSequencer(BlueStore *store, uint32_t sequencer_id, const coll_t& c)
1859 	      : RefCountedObject(store->cct),
1860 		store(store), cid(c), sequencer_id(sequencer_id) {
1861 	    }
1862 	    ~OpSequencer() {
1863 	      ceph_assert(q.empty());
1864 	    }
1865 	  };
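
  // Illustrative sketch (assumption): waiting on a sequencer from outside the
  // kv threads:
  //
  //   osr->flush();  // block until the newest txc has reached KV_SUBMITTED
  //   osr->drain();  // or block until the txc queue is completely empty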
1866 	
1867 	  typedef boost::intrusive::list<
1868 	    OpSequencer,
1869 	    boost::intrusive::member_hook<
1870 	      OpSequencer,
1871 	      boost::intrusive::list_member_hook<>,
1872 	      &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1873 	
1874 	  struct KVSyncThread : public Thread {
1875 	    BlueStore *store;
1876 	    explicit KVSyncThread(BlueStore *s) : store(s) {}
1877 	    void *entry() override {
1878 	      store->_kv_sync_thread();
1879 	      return NULL;
1880 	    }
1881 	  };
1882 	  struct KVFinalizeThread : public Thread {
1883 	    BlueStore *store;
1884 	    explicit KVFinalizeThread(BlueStore *s) : store(s) {}
    void *entry() override {
1886 	      store->_kv_finalize_thread();
1887 	      return NULL;
1888 	    }
1889 	  };
1890 	
1891 	  struct DBHistogram {
1892 	    struct value_dist {
1893 	      uint64_t count;
1894 	      uint32_t max_len;
1895 	    };
1896 	
1897 	    struct key_dist {
1898 	      uint64_t count;
1899 	      uint32_t max_len;
      map<int, struct value_dist> val_map; ///< value slab id -> count and max value length
1901 	    };
1902 	
1903 	    map<string, map<int, struct key_dist> > key_hist;
1904 	    map<int, uint64_t> value_hist;
1905 	    int get_key_slab(size_t sz);
1906 	    string get_key_slab_to_range(int slab);
1907 	    int get_value_slab(size_t sz);
1908 	    string get_value_slab_to_range(int slab);
1909 	    void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1910 				  const string &prefix, size_t key_size, size_t value_size);
1911 	    void dump(Formatter *f);
1912 	  };
1913 	
1914 	  // --------------------------------------------------------
1915 	  // members
1916 	private:
1917 	  BlueFS *bluefs = nullptr;
1918 	  bluefs_layout_t bluefs_layout;
1919 	  mono_time bluefs_last_balance;
1920 	  utime_t next_dump_on_bluefs_alloc_failure;
1921 	
1922 	  KeyValueDB *db = nullptr;
1923 	  BlockDevice *bdev = nullptr;
1924 	  std::string freelist_type;
1925 	  FreelistManager *fm = nullptr;
1926 	  Allocator *alloc = nullptr;
1927 	  uuid_d fsid;
1928 	  int path_fd = -1;  ///< open handle to $path
1929 	  int fsid_fd = -1;  ///< open handle (locked) to $path/fsid
1930 	  bool mounted = false;
1931 	
1932 	  ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock");  ///< rwlock to protect coll_map
1933 	  mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1934 	  bool collections_had_errors = false;
1935 	  map<coll_t,CollectionRef> new_coll_map;
1936 	
1937 	  vector<OnodeCacheShard*> onode_cache_shards;
1938 	  vector<BufferCacheShard*> buffer_cache_shards;
1939 	
1940 	  /// protect zombie_osr_set
1941 	  ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
1942 	  uint32_t next_sequencer_id = 0;
1943 	  std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
1944 	
1945 	  std::atomic<uint64_t> nid_last = {0};
1946 	  std::atomic<uint64_t> nid_max = {0};
1947 	  std::atomic<uint64_t> blobid_last = {0};
1948 	  std::atomic<uint64_t> blobid_max = {0};
1949 	
1950 	  interval_set<uint64_t> bluefs_extents;  ///< block extents owned by bluefs
1951 	  interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1952 	
1953 	  ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
1954 	  std::atomic<uint64_t> deferred_seq = {0};
1955 	  deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1956 	  int deferred_queue_size = 0;         ///< num txc's queued across all osrs
1957 	  atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1958 	  Finisher  finisher;
1959 	  utime_t  deferred_last_submitted = utime_t();
1960 	
1961 	  KVSyncThread kv_sync_thread;
1962 	  ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
1963 	  ceph::condition_variable kv_cond;
1964 	  bool _kv_only = false;
1965 	  bool kv_sync_started = false;
1966 	  bool kv_stop = false;
1967 	  bool kv_finalize_started = false;
1968 	  bool kv_finalize_stop = false;
1969 	  deque<TransContext*> kv_queue;             ///< ready, already submitted
1970 	  deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1971 	  deque<TransContext*> kv_committing;        ///< currently syncing
1972 	  deque<DeferredBatch*> deferred_done_queue;   ///< deferred ios done
1973 	  bool kv_sync_in_progress = false;
1974 	
1975 	  KVFinalizeThread kv_finalize_thread;
1976 	  ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
1977 	  ceph::condition_variable kv_finalize_cond;
1978 	  deque<TransContext*> kv_committing_to_finalize;   ///< pending finalization
1979 	  deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
1980 	  bool kv_finalize_in_progress = false;
1981 	
1982 	  PerfCounters *logger = nullptr;
1983 	
1984 	  list<CollectionRef> removed_collections;
1985 	
1986 	  ceph::shared_mutex debug_read_error_lock =
1987 	    ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
1988 	  set<ghobject_t> debug_data_error_objects;
1989 	  set<ghobject_t> debug_mdata_error_objects;
1990 	
1991 	  std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1992 	
1993 	  uint64_t block_size = 0;     ///< block size of block device (power of 2)
1994 	  uint64_t block_mask = 0;     ///< mask to get just the block offset
1995 	  size_t block_size_order = 0; ///< bits to shift to get block size
1996 	
1997 	  uint64_t min_alloc_size; ///< minimum allocation unit (power of 2)
1998 	  ///< bits for min_alloc_size
1999 	  uint8_t min_alloc_size_order = 0;
2000 	  static_assert(std::numeric_limits<uint8_t>::max() >
2001 			std::numeric_limits<decltype(min_alloc_size)>::digits,
2002 			"not enough bits for min_alloc_size");
2003 	
2004 	  bool per_pool_omap = false;
2005 	
2006 	  ///< maximum allocation unit (power of 2)
2007 	  std::atomic<uint64_t> max_alloc_size = {0};
2008 	
2009 	  ///< number threshold for forced deferred writes
2010 	  std::atomic<int> deferred_batch_ops = {0};
2011 	
2012 	  ///< size threshold for forced deferred writes
2013 	  std::atomic<uint64_t> prefer_deferred_size = {0};
2014 	
2015 	  ///< approx cost per io, in bytes
2016 	  std::atomic<uint64_t> throttle_cost_per_io = {0};
2017 	
2018 	  std::atomic<Compressor::CompressionMode> comp_mode =
2019 	    {Compressor::COMP_NONE}; ///< compression mode
2020 	  CompressorRef compressor;
2021 	  std::atomic<uint64_t> comp_min_blob_size = {0};
2022 	  std::atomic<uint64_t> comp_max_blob_size = {0};
2023 	
2024 	  std::atomic<uint64_t> max_blob_size = {0};  ///< maximum blob size
2025 	
2026 	  uint64_t kv_ios = 0;
2027 	  uint64_t kv_throttle_costs = 0;
2028 	
2029 	  // cache trim control
2030 	  uint64_t cache_size = 0;       ///< total cache size
2031 	  double cache_meta_ratio = 0;   ///< cache ratio dedicated to metadata
2032 	  double cache_kv_ratio = 0;     ///< cache ratio dedicated to kv (e.g., rocksdb)
2033 	  double cache_data_ratio = 0;   ///< cache ratio dedicated to object data
2034 	  bool cache_autotune = false;   ///< cache autotune setting
2035 	  double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
2036 	  uint64_t osd_memory_target = 0;   ///< OSD memory target when autotuning cache
2037 	  uint64_t osd_memory_base = 0;     ///< OSD base memory when autotuning cache
2038 	  double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
2039 	  uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
2040 	  double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing 
2041 	  double max_defer_interval = 0; ///< Time to wait between last deferred submit
2042 	  std::atomic<uint32_t> config_changed = {0}; ///< Counter to determine if there is a configuration change.
2043 	
2044 	  typedef map<uint64_t, volatile_statfs> osd_pools_map;
2045 	
2046 	  ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
2047 	  volatile_statfs vstatfs;
2048 	  osd_pools_map osd_pools; // protected by vstatfs_lock as well
2049 	
2050 	  bool per_pool_stat_collection = true;
2051 	
2052 	  struct MempoolThread : public Thread {
2053 	  public:
2054 	    BlueStore *store;
2055 	
2056 	    ceph::condition_variable cond;
2057 	    ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
2058 	    bool stop = false;
2059 	    std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
2060 	    std::shared_ptr<PriorityCache::Manager> pcm = nullptr;
2061 	
2062 	    struct MempoolCache : public PriorityCache::PriCache {
2063 	      BlueStore *store;
2064 	      int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
2065 	      int64_t committed_bytes = 0;
2066 	      double cache_ratio = 0;
2067 	
2068 	      MempoolCache(BlueStore *s) : store(s) {};
2069 	
2070 	      virtual uint64_t _get_used_bytes() const = 0;
2071 	
2072 	      virtual int64_t request_cache_bytes(
2073 	          PriorityCache::Priority pri, uint64_t total_cache) const {
2074 	        int64_t assigned = get_cache_bytes(pri);
2075 	
2076 	        switch (pri) {
2077 	        // All cache items are currently shoved into the PRI1 priority 
2078 	        case PriorityCache::Priority::PRI1:
2079 	          {
2080 	            int64_t request = _get_used_bytes();
            return (request > assigned) ? request - assigned : 0;
2082 	          }
2083 	        default:
2084 	          break;
2085 	        }
2086 	        return -EOPNOTSUPP;
2087 	      }
2088 	 
2089 	      virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
2090 	        return cache_bytes[pri];
2091 	      }
2092 	      virtual int64_t get_cache_bytes() const { 
2093 	        int64_t total = 0;
2094 	
2095 	        for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
2096 	          PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
2097 	          total += get_cache_bytes(pri);
2098 	        }
2099 	        return total;
2100 	      }
2101 	      virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2102 	        cache_bytes[pri] = bytes;
2103 	      }
2104 	      virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2105 	        cache_bytes[pri] += bytes;
2106 	      }
2107 	      virtual int64_t commit_cache_size(uint64_t total_cache) {
2108 	        committed_bytes = PriorityCache::get_chunk(
2109 	            get_cache_bytes(), total_cache);
2110 	        return committed_bytes;
2111 	      }
2112 	      virtual int64_t get_committed_size() const {
2113 	        return committed_bytes;
2114 	      }
2115 	      virtual double get_cache_ratio() const {
2116 	        return cache_ratio;
2117 	      }
2118 	      virtual void set_cache_ratio(double ratio) {
2119 	        cache_ratio = ratio;
2120 	      }
2121 	      virtual string get_cache_name() const = 0;
2122 	    };
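
    // Illustrative example of the PRI1 accounting above: if 64 MiB is
    // currently assigned to PRI1 but _get_used_bytes() reports 80 MiB in use,
    // request_cache_bytes(PRI1, ...) asks for the missing 16 MiB; every other
    // priority yields -EOPNOTSUPP.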
2123 	
2124 	    struct MetaCache : public MempoolCache {
2125 	      MetaCache(BlueStore *s) : MempoolCache(s) {};
2126 	
2127 	      virtual uint64_t _get_used_bytes() const {
2128 	        return mempool::bluestore_cache_other::allocated_bytes() +
2129 	            mempool::bluestore_cache_onode::allocated_bytes();
2130 	      }
2131 	
2132 	      virtual string get_cache_name() const {
2133 	        return "BlueStore Meta Cache";
2134 	      }
2135 	
2136 	      uint64_t _get_num_onodes() const {
2137 	        uint64_t onode_num =
2138 	            mempool::bluestore_cache_onode::allocated_items();
2139 	        return (2 > onode_num) ? 2 : onode_num;
2140 	      }
2141 	
2142 	      double get_bytes_per_onode() const {
2143 	        return (double)_get_used_bytes() / (double)_get_num_onodes();
2144 	      }
2145 	    };
2146 	    std::shared_ptr<MetaCache> meta_cache;
2147 	
2148 	    struct DataCache : public MempoolCache {
2149 	      DataCache(BlueStore *s) : MempoolCache(s) {};
2150 	
2151 	      virtual uint64_t _get_used_bytes() const {
2152 	        uint64_t bytes = 0;
2153 	        for (auto i : store->buffer_cache_shards) {
2154 	          bytes += i->_get_bytes();
2155 	        }
2156 	        return bytes; 
2157 	      }
2158 	      virtual string get_cache_name() const {
2159 	        return "BlueStore Data Cache";
2160 	      }
2161 	    };
2162 	    std::shared_ptr<DataCache> data_cache;
2163 	
2164 	  public:
2165 	    explicit MempoolThread(BlueStore *s)
2166 	      : store(s),
2167 	        meta_cache(new MetaCache(s)),
2168 	        data_cache(new DataCache(s)) {}
2169 	
2170 	    void *entry() override;
2171 	    void init() {
2172 	      ceph_assert(stop == false);
2173 	      create("bstore_mempool");
2174 	    }
2175 	    void shutdown() {
2176 	      lock.lock();
2177 	      stop = true;
2178 	      cond.notify_all();
2179 	      lock.unlock();
2180 	      join();
2181 	    }
2182 	
2183 	  private:
2184 	    void _adjust_cache_settings();
2185 	    void _update_cache_settings();
2186 	    void _resize_shards(bool interval_stats);
2187 	  } mempool_thread;
2188 	
2189 	  // --------------------------------------------------------
2190 	  // private methods
2191 	
2192 	  void _init_logger();
2193 	  void _shutdown_logger();
2194 	  int _reload_logger();
2195 	
2196 	  int _open_path();
2197 	  void _close_path();
2198 	  int _open_fsid(bool create);
2199 	  int _lock_fsid();
2200 	  int _read_fsid(uuid_d *f);
2201 	  int _write_fsid();
2202 	  void _close_fsid();
2203 	  void _set_alloc_sizes();
2204 	  void _set_blob_size();
2205 	  void _set_finisher_num();
2206 	  void _update_osd_memory_options();
2207 	
2208 	  int _open_bdev(bool create);
  // Verifies that disk space is sufficient for the reserved area plus the
  // minimum BlueFS allotment, and adjusts the latter if needed.
  // Depends on min_alloc_size, hence must be called after it is initialized
  // (and outside of _open_bdev).
2213 	  void _validate_bdev();
2214 	  void _close_bdev();
2215 	
2216 	  int _minimal_open_bluefs(bool create);
2217 	  void _minimal_close_bluefs();
2218 	  int _open_bluefs(bool create);
2219 	  void _close_bluefs();
2220 	
2221 	  // Limited (u)mount intended for BlueFS operations only
2222 	  int _mount_for_bluefs();
2223 	  void _umount_for_bluefs();
2224 	
2225 	
2226 	  int _is_bluefs(bool create, bool* ret);
  /*
   * opens the DB and its dependent super_meta, FreelistManager and allocator
   * in the proper order
   */
2231 	  int _open_db_and_around(bool read_only);
2232 	  void _close_db_and_around();
2233 	
  // updates legacy bluefs-related records in the DB to a state valid for
  // downgrades from nautilus.
2236 	  void _sync_bluefs_and_fm();
2237 	
2238 	  /*
   * @warning to_repair_db means that we open this db to repair it and will
   * not hold rocksdb's file lock.
2241 	   */
2242 	  int _open_db(bool create,
2243 		       bool to_repair_db=false,
2244 		       bool read_only = false);
2245 	  void _close_db();
2246 	  int _open_fm(KeyValueDB::Transaction t);
2247 	  void _close_fm();
2248 	  int _open_alloc();
2249 	  void _close_alloc();
2250 	  int _open_collections();
2251 	  void _fsck_collections(int64_t* errors);
2252 	  void _close_collections();
2253 	
2254 	  int _setup_block_symlink_or_file(string name, string path, uint64_t size,
2255 					   bool create);
2256 	
2257 	public:
2258 	  utime_t get_deferred_last_submitted() {
2259 	    std::lock_guard l(deferred_lock);
2260 	    return deferred_last_submitted;
2261 	  }
2262 	
2263 	  static int _write_bdev_label(CephContext* cct,
2264 				       string path, bluestore_bdev_label_t label);
2265 	  static int _read_bdev_label(CephContext* cct, string path,
2266 				      bluestore_bdev_label_t *label);
2267 	private:
2268 	  int _check_or_set_bdev_label(string path, uint64_t size, string desc,
2269 				       bool create);
2270 	
2271 	  int _open_super_meta();
2272 	
2273 	  void _open_statfs();
2274 	  void _get_statfs_overall(struct store_statfs_t *buf);
2275 	
2276 	  void _dump_alloc_on_failure();
2277 	
2278 	  int64_t _get_bluefs_size_delta(uint64_t bluefs_free, uint64_t bluefs_total);
2279 	  int _balance_bluefs_freespace();
2280 	
2281 	  CollectionRef _get_collection(const coll_t& cid);
2282 	  void _queue_reap_collection(CollectionRef& c);
2283 	  void _reap_collections();
2284 	  void _update_cache_logger();
2285 	
2286 	  void _assign_nid(TransContext *txc, OnodeRef o);
2287 	  uint64_t _assign_blobid(TransContext *txc);
2288 	
2289 	  template <int LogLevelV>
2290 	  friend void _dump_onode(CephContext *cct, const Onode& o);
2291 	  template <int LogLevelV>
2292 	  friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
2293 	  template <int LogLevelV>
2294 	  friend void _dump_transaction(CephContext *cct, Transaction *t);
2295 	
2296 	  TransContext *_txc_create(Collection *c, OpSequencer *osr,
2297 				    list<Context*> *on_commits);
2298 	  void _txc_update_store_statfs(TransContext *txc);
2299 	  void _txc_add_transaction(TransContext *txc, Transaction *t);
2300 	  void _txc_calc_cost(TransContext *txc);
2301 	  void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2302 	  void _txc_state_proc(TransContext *txc);
2303 	  void _txc_aio_submit(TransContext *txc);
2304 	public:
2305 	  void txc_aio_finish(void *p) {
2306 	    _txc_state_proc(static_cast<TransContext*>(p));
2307 	  }
2308 	private:
2309 	  void _txc_finish_io(TransContext *txc);
2310 	  void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2311 	  void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
2312 	  void _txc_committed_kv(TransContext *txc);
2313 	  void _txc_finish(TransContext *txc);
2314 	  void _txc_release_alloc(TransContext *txc);
2315 	
2316 	  void _osr_attach(Collection *c);
2317 	  void _osr_register_zombie(OpSequencer *osr);
2318 	  void _osr_drain(OpSequencer *osr);
2319 	  void _osr_drain_preceding(TransContext *txc);
2320 	  void _osr_drain_all();
2321 	
2322 	  void _kv_start();
2323 	  void _kv_stop();
2324 	  void _kv_sync_thread();
2325 	  void _kv_finalize_thread();
2326 	
2327 	  bluestore_deferred_op_t *_get_deferred_op(TransContext *txc);
2328 	  void _deferred_queue(TransContext *txc);
2329 	public:
2330 	  void deferred_try_submit();
2331 	private:
2332 	  void _deferred_submit_unlock(OpSequencer *osr);
2333 	  void _deferred_aio_finish(OpSequencer *osr);
2334 	  int _deferred_replay();
2335 	
2336 	public:
2337 	  using mempool_dynamic_bitset =
2338 	    boost::dynamic_bitset<uint64_t,
2339 				  mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2340 	  using  per_pool_statfs =
2341 	    mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
2342 	
2343 	  enum FSCKDepth {
2344 	    FSCK_REGULAR,
2345 	    FSCK_DEEP,
2346 	    FSCK_SHALLOW
2347 	  };
2348 	
2349 	private:
2350 	  int _fsck_check_extents(
2351 	    const coll_t& cid,
2352 	    const ghobject_t& oid,
2353 	    const PExtentVector& extents,
2354 	    bool compressed,
2355 	    mempool_dynamic_bitset &used_blocks,
2356 	    uint64_t granularity,
2357 	    BlueStoreRepairer* repairer,
2358 	    store_statfs_t& expected_statfs,
2359 	    FSCKDepth depth);
2360 	
2361 	  void _fsck_check_pool_statfs(
2362 	    per_pool_statfs& expected_pool_statfs,
2363 	    int64_t& errors,
2364 	    int64_t &warnings,
2365 	    BlueStoreRepairer* repairer);
2366 	
2367 	  int _fsck(FSCKDepth depth, bool repair);
2368 	  int _fsck_on_open(BlueStore::FSCKDepth depth, bool repair);
2369 	
2370 	  void _buffer_cache_write(
2371 	    TransContext *txc,
2372 	    BlobRef b,
2373 	    uint64_t offset,
2374 	    bufferlist& bl,
2375 	    unsigned flags) {
2376 	    b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2377 				     flags);
2378 	    txc->shared_blobs_written.insert(b->shared_blob);
2379 	  }
2380 	
2381 	  int _collection_list(
2382 	    Collection *c, const ghobject_t& start, const ghobject_t& end,
2383 	    int max, vector<ghobject_t> *ls, ghobject_t *next);
2384 	
2385 	  template <typename T, typename F>
2386 	  T select_option(const std::string& opt_name, T val1, F f) {
    // NB: opt_name reserved for future use
2388 	    boost::optional<T> val2 = f();
2389 	    if (val2) {
2390 	      return *val2;
2391 	    }
2392 	    return val1;
2393 	  }
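
  // Illustrative sketch (assumption): using select_option() to prefer a
  // per-context override over a global default, where default_size and
  // get_override() are hypothetical names:
  //
  //   uint64_t blob_size = select_option(
  //     "max_blob_size", default_size,
  //     [&]() -> boost::optional<uint64_t> { return get_override(); });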
2394 	
2395 	  void _apply_padding(uint64_t head_pad,
2396 			      uint64_t tail_pad,
2397 			      bufferlist& padded);
2398 	
2399 	  void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
2400 	
2401 	  // -- ondisk version ---
2402 	public:
2403 	  const int32_t latest_ondisk_format = 3;        ///< our version
2404 	  const int32_t min_readable_ondisk_format = 1;  ///< what we can read
2405 	  const int32_t min_compat_ondisk_format = 3;    ///< who can read us
2406 	
2407 	private:
2408 	  int32_t ondisk_format = 0;  ///< value detected on mount
2409 	
2410 	  int _upgrade_super();  ///< upgrade (called during open_super)
2411 	  uint64_t _get_ondisk_reserved() const;
2412 	  void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2413 	
2414 	  // --- public interface ---
2415 	public:
2416 	  BlueStore(CephContext *cct, const string& path);
2417 	  BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for UT only
2418 	  ~BlueStore() override;
2419 	
2420 	  string get_type() override {
2421 	    return "bluestore";
2422 	  }
2423 	
  bool needs_journal() override { return false; }
  bool wants_journal() override { return false; }
  bool allows_journal() override { return false; }
2427 	
2428 	  uint64_t get_min_alloc_size() const override {
2429 	    return min_alloc_size;
2430 	  }
2431 	
2432 	  int get_devices(set<string> *ls) override;
2433 	
2434 	  bool is_rotational() override;
2435 	  bool is_journal_rotational() override;
2436 	
2437 	  string get_default_device_class() override {
2438 	    string device_class;
2439 	    map<string, string> metadata;
2440 	    collect_metadata(&metadata);
2441 	    auto it = metadata.find("bluestore_bdev_type");
2442 	    if (it != metadata.end()) {
2443 	      device_class = it->second;
2444 	    }
2445 	    return device_class;
2446 	  }
2447 	
2448 	  int get_numa_node(
2449 	    int *numa_node,
2450 	    set<int> *nodes,
2451 	    set<string> *failed) override;
2452 	
2453 	  static int get_block_device_fsid(CephContext* cct, const string& path,
2454 					   uuid_d *fsid);
2455 	
2456 	  bool test_mount_in_use() override;
2457 	
2458 	private:
2459 	  int _mount(bool kv_only, bool open_db=true);
2460 	public:
2461 	  int mount() override {
2462 	    return _mount(false);
2463 	  }
2464 	  int umount() override;
2465 	
2466 	  int start_kv_only(KeyValueDB **pdb, bool open_db=true) {
2467 	    int r = _mount(true, open_db);
2468 	    if (r < 0)
2469 	      return r;
2470 	    *pdb = db;
2471 	    return 0;
2472 	  }
2473 	
2474 	  int write_meta(const std::string& key, const std::string& value) override;
2475 	  int read_meta(const std::string& key, std::string *value) override;
2476 	
2477 	  int cold_open();
2478 	  int cold_close();
2479 	
2480 	  int fsck(bool deep) override {
2481 	    return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, false);
2482 	  }
2483 	  int repair(bool deep) override {
2484 	    return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, true);
2485 	  }
2486 	  int quick_fix() override {
2487 	    return _fsck(FSCK_SHALLOW, true);
2488 	  }
2489 	
2490 	  void set_cache_shards(unsigned num) override;
2491 	  void dump_cache_stats(Formatter *f) override {
2492 	    int onode_count = 0, buffers_bytes = 0;
2493 	    for (auto i: onode_cache_shards) {
2494 	      onode_count += i->_get_num();
2495 	    }
2496 	    for (auto i: buffer_cache_shards) {
2497 	      buffers_bytes += i->_get_bytes();
2498 	    }
2499 	    f->dump_int("bluestore_onode", onode_count);
2500 	    f->dump_int("bluestore_buffers", buffers_bytes);
2501 	  }
2502 	  void dump_cache_stats(ostream& ss) override {
2503 	    int onode_count = 0, buffers_bytes = 0;
2504 	    for (auto i: onode_cache_shards) {
2505 	      onode_count += i->_get_num();
2506 	    }
2507 	    for (auto i: buffer_cache_shards) {
2508 	      buffers_bytes += i->_get_bytes();
2509 	    }
    ss << "bluestore_onode: " << onode_count
       << ", bluestore_buffers: " << buffers_bytes;
2512 	  }
2513 	
2514 	  int validate_hobject_key(const hobject_t &obj) const override {
2515 	    return 0;
2516 	  }
2517 	  unsigned get_max_attr_name_length() override {
2518 	    return 256;  // arbitrary; there is no real limit internally
2519 	  }
2520 	
2521 	  int mkfs() override;
2522 	  int mkjournal() override {
2523 	    return 0;
2524 	  }
2525 	
2526 	  void get_db_statistics(Formatter *f) override;
2527 	  void generate_db_histogram(Formatter *f) override;
2528 	  void _flush_cache();
2529 	  int flush_cache(ostream *os = NULL) override;
2530 	  void dump_perf_counters(Formatter *f) override {
2531 	    f->open_object_section("perf_counters");
2532 	    logger->dump_formatted(f, false);
2533 	    f->close_section();
2534 	  }
2535 	
2536 	  int add_new_bluefs_device(int id, const string& path);
2537 	  int migrate_to_existing_bluefs_device(const set<int>& devs_source,
2538 	    int id);
2539 	  int migrate_to_new_bluefs_device(const set<int>& devs_source,
2540 	    int id,
2541 	    const string& path);
2542 	  int expand_devices(ostream& out);
2543 	  string get_device_path(unsigned id);
2544 	
2545 	public:
2546 	  int statfs(struct store_statfs_t *buf,
2547 	             osd_alert_list_t* alerts = nullptr) override;
2548 	  int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
2549 			  bool *per_pool_omap) override;
2550 	
2551 	  void collect_metadata(map<string,string> *pm) override;
2552 	
2553 	  bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2554 	  int set_collection_opts(
2555 	    CollectionHandle& c,
2556 	    const pool_opts_t& opts) override;
2557 	  int stat(
2558 	    CollectionHandle &c,
2559 	    const ghobject_t& oid,
2560 	    struct stat *st,
2561 	    bool allow_eio = false) override;
2562 	  int read(
2563 	    CollectionHandle &c,
2564 	    const ghobject_t& oid,
2565 	    uint64_t offset,
2566 	    size_t len,
2567 	    bufferlist& bl,
2568 	    uint32_t op_flags = 0) override;
2569 	
2570 	private:
2571 	
2572 	  // --------------------------------------------------------
2573 	  // intermediate data structures used while reading
2574 	  struct region_t {
2575 	    uint64_t logical_offset;
    uint64_t blob_xoffset;   ///< region offset within the blob
2577 	    uint64_t length;
2578 	
2579 	    // used later in read process
2580 	    uint64_t front = 0;
2581 	
2582 	    region_t(uint64_t offset, uint64_t b_offs, uint64_t len, uint64_t front = 0)
2583 	      : logical_offset(offset),
2584 	      blob_xoffset(b_offs),
2585 	      length(len),
2586 	      front(front){}
2587 	    region_t(const region_t& from)
2588 	      : logical_offset(from.logical_offset),
2589 	      blob_xoffset(from.blob_xoffset),
2590 	      length(from.length),
2591 	      front(from.front){}
2592 	
2593 	    friend ostream& operator<<(ostream& out, const region_t& r) {
2594 	      return out << "0x" << std::hex << r.logical_offset << ":"
2595 	        << r.blob_xoffset << "~" << r.length << std::dec;
2596 	    }
2597 	  };
2598 	
2599 	  // merged blob read request
2600 	  struct read_req_t {
2601 	    uint64_t r_off = 0;
2602 	    uint64_t r_len = 0;
2603 	    bufferlist bl;
2604 	    std::list<region_t> regs; // original read regions
2605 	
2606 	    read_req_t(uint64_t off, uint64_t len) : r_off(off), r_len(len) {}
2607 	
2608 	    friend ostream& operator<<(ostream& out, const read_req_t& r) {
2609 	      out << "{<0x" << std::hex << r.r_off << ", 0x" << r.r_len << "> : [";
2610 	      for (const auto& reg : r.regs)
2611 	        out << reg;
2612 	      return out << "]}" << std::dec;
2613 	    }
2614 	  };
2615 	
2616 	  typedef list<read_req_t> regions2read_t;
2617 	  typedef map<BlueStore::BlobRef, regions2read_t> blobs2read_t;
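
  // Illustrative note (an assumption about usage, not a definitive
  // description): _read_cache() below groups the uncached portions of a read
  // into blobs2read_t, one merged read_req_t per contiguous range of a blob,
  // each keeping its original logical regions so _generate_read_result_bl()
  // can reassemble the caller-visible buffer once the io completes.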
2618 	
2619 	  void _read_cache(
2620 	    OnodeRef o,
2621 	    uint64_t offset,
2622 	    size_t length,
2623 	    int read_cache_policy,
2624 	    ready_regions_t& ready_regions,
2625 	    blobs2read_t& blobs2read);
2626 	
2627 	
2628 	  int _prepare_read_ioc(
2629 	    blobs2read_t& blobs2read,
2630 	    vector<bufferlist>* compressed_blob_bls,
2631 	    IOContext* ioc);
2632 	
2633 	  int _generate_read_result_bl(
2634 	    OnodeRef o,
2635 	    uint64_t offset,
2636 	    size_t length,
2637 	    ready_regions_t& ready_regions,
2638 	    vector<bufferlist>& compressed_blob_bls,
2639 	    blobs2read_t& blobs2read,
2640 	    bool buffered,
2641 	    bool* csum_error,
2642 	    bufferlist& bl);
2643 	
2644 	  int _do_read(
2645 	    Collection *c,
2646 	    OnodeRef o,
2647 	    uint64_t offset,
2648 	    size_t len,
2649 	    bufferlist& bl,
2650 	    uint32_t op_flags = 0,
2651 	    uint64_t retry_count = 0);
2652 	
2653 	  int _do_readv(
2654 	    Collection *c,
2655 	    OnodeRef o,
2656 	    const interval_set<uint64_t>& m,
2657 	    bufferlist& bl,
2658 	    uint32_t op_flags = 0,
2659 	    uint64_t retry_count = 0);
2660 	
2661 	  int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2662 	 	     uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2663 	public:
2664 	  int fiemap(CollectionHandle &c, const ghobject_t& oid,
2665 		     uint64_t offset, size_t len, bufferlist& bl) override;
2666 	  int fiemap(CollectionHandle &c, const ghobject_t& oid,
2667 		     uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2668 	
2669 	  int readv(
2670 	    CollectionHandle &c_,
2671 	    const ghobject_t& oid,
2672 	    interval_set<uint64_t>& m,
2673 	    bufferlist& bl,
2674 	    uint32_t op_flags) override;
2675 	
2676 	  int dump_onode(CollectionHandle &c, const ghobject_t& oid,
2677 	    const string& section_name, Formatter *f) override;
2678 	
2679 	  int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2680 		      bufferptr& value) override;
2681 	
2682 	  int getattrs(CollectionHandle &c, const ghobject_t& oid,
2683 		       map<string,bufferptr>& aset) override;
2684 	
2685 	  int list_collections(vector<coll_t>& ls) override;
2686 	
2687 	  CollectionHandle open_collection(const coll_t &c) override;
2688 	  CollectionHandle create_new_collection(const coll_t& cid) override;
2689 	  void set_collection_commit_queue(const coll_t& cid,
2690 					   ContextQueue *commit_queue) override;
2691 	
2692 	  bool collection_exists(const coll_t& c) override;
2693 	  int collection_empty(CollectionHandle& c, bool *empty) override;
2694 	  int collection_bits(CollectionHandle& c) override;
2695 	
2696 	  int collection_list(CollectionHandle &c,
2697 			      const ghobject_t& start,
2698 			      const ghobject_t& end,
2699 			      int max,
2700 			      vector<ghobject_t> *ls, ghobject_t *next) override;
2701 	
2702 	  int omap_get(
2703 	    CollectionHandle &c,     ///< [in] Collection containing oid
2704 	    const ghobject_t &oid,   ///< [in] Object containing omap
2705 	    bufferlist *header,      ///< [out] omap header
    map<string, bufferlist> *out ///< [out] Key to value map
2707 	    ) override;
2708 	  int _omap_get(
2709 	    Collection *c,     ///< [in] Collection containing oid
2710 	    const ghobject_t &oid,   ///< [in] Object containing omap
2711 	    bufferlist *header,      ///< [out] omap header
    map<string, bufferlist> *out ///< [out] Key to value map
2713 	    );
2714 	
2715 	  /// Get omap header
2716 	  int omap_get_header(
2717 	    CollectionHandle &c,                ///< [in] Collection containing oid
2718 	    const ghobject_t &oid,   ///< [in] Object containing omap
2719 	    bufferlist *header,      ///< [out] omap header
2720 	    bool allow_eio = false ///< [in] don't assert on eio
2721 	    ) override;
2722 	
2723 	  /// Get keys defined on oid
2724 	  int omap_get_keys(
2725 	    CollectionHandle &c,              ///< [in] Collection containing oid
2726 	    const ghobject_t &oid, ///< [in] Object containing omap
2727 	    set<string> *keys      ///< [out] Keys defined on oid
2728 	    ) override;
2729 	
2730 	  /// Get key values
2731 	  int omap_get_values(
2732 	    CollectionHandle &c,         ///< [in] Collection containing oid
2733 	    const ghobject_t &oid,       ///< [in] Object containing omap
2734 	    const set<string> &keys,     ///< [in] Keys to get
2735 	    map<string, bufferlist> *out ///< [out] Returned keys and values
2736 	    ) override;
2737 	
2738 	  /// Filters keys into out which are defined on oid
2739 	  int omap_check_keys(
2740 	    CollectionHandle &c,                ///< [in] Collection containing oid
2741 	    const ghobject_t &oid,   ///< [in] Object containing omap
2742 	    const set<string> &keys, ///< [in] Keys to check
2743 	    set<string> *out         ///< [out] Subset of keys defined on oid
2744 	    ) override;
2745 	
2746 	  ObjectMap::ObjectMapIterator get_omap_iterator(
2747 	    CollectionHandle &c,   ///< [in] collection
2748 	    const ghobject_t &oid  ///< [in] object
2749 	    ) override;
2750 	
2751 	  void set_fsid(uuid_d u) override {
2752 	    fsid = u;
2753 	  }
2754 	  uuid_d get_fsid() override {
2755 	    return fsid;
2756 	  }
2757 	
2758 	  uint64_t estimate_objects_overhead(uint64_t num_objects) override {
    return num_objects * 300; // assuming per-object overhead is 300 bytes
2760 	  }
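
  // e.g. (illustrative): estimate_objects_overhead(1000000) returns
  // 300,000,000, i.e. roughly 300 MB of metadata overhead for one million
  // objects.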
2761 	
2762 	  struct BSPerfTracker {
2763 	    PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
2764 	    PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
2765 	
2766 	    objectstore_perf_stat_t get_cur_stats() const {
2767 	      objectstore_perf_stat_t ret;
2768 	      ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
2769 	      ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
2770 	      return ret;
2771 	    }
2772 	
2773 	    void update_from_perfcounters(PerfCounters &logger);
2774 	  } perf_tracker;
2775 	
2776 	  objectstore_perf_stat_t get_cur_stats() override {
2777 	    perf_tracker.update_from_perfcounters(*logger);
2778 	    return perf_tracker.get_cur_stats();
2779 	  }
2780 	  const PerfCounters* get_perf_counters() const override {
2781 	    return logger;
2782 	  }
2783 	
2784 	  int queue_transactions(
2785 	    CollectionHandle& ch,
2786 	    vector<Transaction>& tls,
2787 	    TrackedOpRef op = TrackedOpRef(),
2788 	    ThreadPool::TPHandle *handle = NULL) override;
2789 	
2790 	  // error injection
2791 	  void inject_data_error(const ghobject_t& o) override {
2792 	    std::unique_lock l(debug_read_error_lock);
2793 	    debug_data_error_objects.insert(o);
2794 	  }
2795 	  void inject_mdata_error(const ghobject_t& o) override {
2796 	    std::unique_lock l(debug_read_error_lock);
2797 	    debug_mdata_error_objects.insert(o);
2798 	  }
2799 	
2800 	  /// methods to inject various errors fsck can repair
2801 	  void inject_broken_shared_blob_key(const string& key,
2802 				 const bufferlist& bl);
2803 	  void inject_leaked(uint64_t len);
2804 	  void inject_false_free(coll_t cid, ghobject_t oid);
2805 	  void inject_statfs(const string& key, const store_statfs_t& new_statfs);
2806 	  void inject_global_statfs(const store_statfs_t& new_statfs);
2807 	  void inject_misreference(coll_t cid1, ghobject_t oid1,
2808 				   coll_t cid2, ghobject_t oid2,
2809 				   uint64_t offset);
2810 	
2811 	  void compact() override {
2812 	    ceph_assert(db);
2813 	    db->compact();
2814 	  }
2815 	  bool has_builtin_csum() const override {
2816 	    return true;
2817 	  }
2818 	
  /*
   * Allocates space for BlueFS from the slow device.
   * Either applies the allocated extents to the underlying BlueFS
   * automatically (extents == nullptr) or just returns them via the provided
   * non-null extents vector.
   */
2824 	  int allocate_bluefs_freespace(
2825 	    uint64_t min_size,
2826 	    uint64_t size,
2827 	    PExtentVector* extents);
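
  // Illustrative sketch (assumption): asking for at least 1 GiB and letting
  // BlueStore apply the extents to BlueFS itself:
  //
  //   int r = allocate_bluefs_freespace(1ull << 30, 1ull << 30, nullptr);
  //
  // or collecting the extents for the caller to hand over instead:
  //
  //   PExtentVector extents;
  //   int r = allocate_bluefs_freespace(min_size, size, &extents);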
2828 	
2829 	  inline void log_latency(const char* name,
2830 	    int idx,
2831 	    const ceph::timespan& lat,
2832 	    double lat_threshold,
2833 	    const char* info = "") const;
2834 	
2835 	  inline void log_latency_fn(const char* name,
2836 	    int idx,
2837 	    const ceph::timespan& lat,
2838 	    double lat_threshold,
2839 	    std::function<string (const ceph::timespan& lat)> fn) const;
2840 	
2841 	private:
2842 	  bool _debug_data_eio(const ghobject_t& o) {
2843 	    if (!cct->_conf->bluestore_debug_inject_read_err) {
2844 	      return false;
2845 	    }
2846 	    std::shared_lock l(debug_read_error_lock);
2847 	    return debug_data_error_objects.count(o);
2848 	  }
2849 	  bool _debug_mdata_eio(const ghobject_t& o) {
2850 	    if (!cct->_conf->bluestore_debug_inject_read_err) {
2851 	      return false;
2852 	    }
2853 	    std::shared_lock l(debug_read_error_lock);
2854 	    return debug_mdata_error_objects.count(o);
2855 	  }
2856 	  void _debug_obj_on_delete(const ghobject_t& o) {
2857 	    if (cct->_conf->bluestore_debug_inject_read_err) {
2858 	      std::unique_lock l(debug_read_error_lock);
2859 	      debug_data_error_objects.erase(o);
2860 	      debug_mdata_error_objects.erase(o);
2861 	    }
2862 	  }
2863 	private:
2864 	  ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
2865 	  string failed_cmode;
2866 	  set<string> failed_compressors;
2867 	  string spillover_alert;
2868 	  string legacy_statfs_alert;
2869 	  string no_per_pool_omap_alert;
2870 	  string disk_size_mismatch_alert;
2871 	
2872 	  void _log_alerts(osd_alert_list_t& alerts);
2873 	  bool _set_compression_alert(bool cmode, const char* s) {
2874 	    std::lock_guard l(qlock);
2875 	    if (cmode) {
2876 	      bool ret = failed_cmode.empty();
2877 	      failed_cmode = s;
2878 	      return ret;
2879 	    }
2880 	    return failed_compressors.emplace(s).second;
2881 	  }
2882 	  void _clear_compression_alert() {
2883 	    std::lock_guard l(qlock);
2884 	    failed_compressors.clear();
2885 	    failed_cmode.clear();
2886 	  }
2887 	
2888 	  void _set_spillover_alert(const string& s) {
2889 	    std::lock_guard l(qlock);
2890 	    spillover_alert = s;
2891 	  }
2892 	  void _clear_spillover_alert() {
2893 	    std::lock_guard l(qlock);
2894 	    spillover_alert.clear();
2895 	  }
2896 	
2897 	  void _check_legacy_statfs_alert();
2898 	  void _check_no_per_pool_omap_alert();
2899 	  void _set_disk_size_mismatch_alert(const string& s) {
2900 	    std::lock_guard l(qlock);
2901 	    disk_size_mismatch_alert = s;
2902 	  }
2903 	
2904 	private:
2905 	
2906 	  // --------------------------------------------------------
2907 	  // read processing internal methods
2908 	  int _verify_csum(
2909 	    OnodeRef& o,
2910 	    const bluestore_blob_t* blob,
2911 	    uint64_t blob_xoffset,
2912 	    const bufferlist& bl,
2913 	    uint64_t logical_offset) const;
2914 	  int _decompress(bufferlist& source, bufferlist* result);
2915 	
2916 	
2917 	  // --------------------------------------------------------
2918 	  // write ops
2919 	
2920 	  struct WriteContext {
2921 	    bool buffered = false;          ///< buffered write
2922 	    bool compress = false;          ///< compressed write
2923 	    uint64_t target_blob_size = 0;  ///< target (max) blob size
2924 	    unsigned csum_order = 0;        ///< target checksum chunk order
2925 	
2926 	    old_extent_map_t old_extents;   ///< must deref these blobs
2927 	    interval_set<uint64_t> extents_to_gc; ///< extents for garbage collection
2928 	
2929 	    struct write_item {
2930 	      uint64_t logical_offset;      ///< write logical offset
2931 	      BlobRef b;
2932 	      uint64_t blob_length;
2933 	      uint64_t b_off;
2934 	      bufferlist bl;
2935 	      uint64_t b_off0; ///< original offset in a blob prior to padding
2936 	      uint64_t length0; ///< original data length prior to padding
2937 	
2938 	      bool mark_unused;
2939 	      bool new_blob; ///< whether new blob was created
2940 	
2941 	      bool compressed = false;
2942 	      bufferlist compressed_bl;
2943 	      size_t compressed_len = 0;
2944 	
2945 	      write_item(
2946 		uint64_t logical_offs,
2947 	        BlobRef b,
2948 	        uint64_t blob_len,
2949 	        uint64_t o,
2950 	        bufferlist& bl,
2951 	        uint64_t o0,
2952 	        uint64_t l0,
2953 	        bool _mark_unused,
2954 		bool _new_blob)
2955 	       :
2956 	         logical_offset(logical_offs),
2957 	         b(b),
2958 	         blob_length(blob_len),
2959 	         b_off(o),
2960 	         bl(bl),
2961 	         b_off0(o0),
2962 	         length0(l0),
2963 	         mark_unused(_mark_unused),
2964 		 new_blob(_new_blob) {}
2965 	    };
2966 	    vector<write_item> writes;                 ///< blobs we're writing
2967 	
2968 	    /// partial clone of the context
2969 	    void fork(const WriteContext& other) {
2970 	      buffered = other.buffered;
2971 	      compress = other.compress;
2972 	      target_blob_size = other.target_blob_size;
2973 	      csum_order = other.csum_order;
2974 	    }
2975 	    void write(
2976 	      uint64_t loffs,
2977 	      BlobRef b,
2978 	      uint64_t blob_len,
2979 	      uint64_t o,
2980 	      bufferlist& bl,
2981 	      uint64_t o0,
2982 	      uint64_t len0,
2983 	      bool _mark_unused,
2984 	      bool _new_blob) {
2985 	      writes.emplace_back(loffs,
2986 	                          b,
2987 	                          blob_len,
2988 	                          o,
2989 	                          bl,
2990 	                          o0,
2991 	                          len0,
2992 	                          _mark_unused,
2993 	                          _new_blob);
2994 	    }
2995 	    /// Checks for writes to the same pextent within a blob
2996 	    bool has_conflict(
2997 	      BlobRef b,
2998 	      uint64_t loffs,
2999 	      uint64_t loffs_end,
3000 	      uint64_t min_alloc_size);
3001 	  };
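
  // Illustrative sketch (assumption): the small/big write helpers below
  // record one write_item per blob they touch, roughly as
  //
  //   wctx->write(logical_off, blob, blob_len, b_off, bl, b_off0, len0,
  //               false /* mark_unused */, true /* new_blob */);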
3002 	
3003 	  void _do_write_small(
3004 	    TransContext *txc,
3005 	    CollectionRef &c,
3006 	    OnodeRef o,
3007 	    uint64_t offset, uint64_t length,
3008 	    bufferlist::iterator& blp,
3009 	    WriteContext *wctx);
3010 	  void _do_write_big(
3011 	    TransContext *txc,
3012 	    CollectionRef &c,
3013 	    OnodeRef o,
3014 	    uint64_t offset, uint64_t length,
3015 	    bufferlist::iterator& blp,
3016 	    WriteContext *wctx);
3017 	  int _do_alloc_write(
3018 	    TransContext *txc,
3019 	    CollectionRef c,
3020 	    OnodeRef o,
3021 	    WriteContext *wctx);
3022 	  void _wctx_finish(
3023 	    TransContext *txc,
3024 	    CollectionRef& c,
3025 	    OnodeRef o,
3026 	    WriteContext *wctx,
3027 	    set<SharedBlob*> *maybe_unshared_blobs=0);
3028 	
3029 	  int _write(TransContext *txc,
3030 		     CollectionRef& c,
3031 		     OnodeRef& o,
3032 		     uint64_t offset, size_t len,
3033 		     bufferlist& bl,
3034 		     uint32_t fadvise_flags);
3035 	  void _pad_zeros(bufferlist *bl, uint64_t *offset,
3036 			  uint64_t chunk_size);
3037 	
3038 	  void _choose_write_options(CollectionRef& c,
3039 	                             OnodeRef o,
3040 	                             uint32_t fadvise_flags,
3041 	                             WriteContext *wctx);
3042 	
3043 	  int _do_gc(TransContext *txc,
3044 	             CollectionRef& c,
3045 	             OnodeRef o,
3046 	             const WriteContext& wctx,
3047 	             uint64_t *dirty_start,
3048 	             uint64_t *dirty_end);
3049 	
3050 	  int _do_write(TransContext *txc,
3051 			CollectionRef &c,
3052 			OnodeRef o,
3053 			uint64_t offset, uint64_t length,
3054 			bufferlist& bl,
3055 			uint32_t fadvise_flags);
3056 	  void _do_write_data(TransContext *txc,
3057 	                      CollectionRef& c,
3058 	                      OnodeRef o,
3059 	                      uint64_t offset,
3060 	                      uint64_t length,
3061 	                      bufferlist& bl,
3062 	                      WriteContext *wctx);
3063 	
3064 	  int _touch(TransContext *txc,
3065 		     CollectionRef& c,
3066 		     OnodeRef& o);
3067 	  int _do_zero(TransContext *txc,
3068 		       CollectionRef& c,
3069 		       OnodeRef& o,
3070 		       uint64_t offset, size_t len);
3071 	  int _zero(TransContext *txc,
3072 		    CollectionRef& c,
3073 		    OnodeRef& o,
3074 		    uint64_t offset, size_t len);
3075 	  void _do_truncate(TransContext *txc,
3076 			   CollectionRef& c,
3077 			   OnodeRef o,
3078 			   uint64_t offset,
3079 			   set<SharedBlob*> *maybe_unshared_blobs=0);
3080 	  int _truncate(TransContext *txc,
3081 			CollectionRef& c,
3082 			OnodeRef& o,
3083 			uint64_t offset);
3084 	  int _remove(TransContext *txc,
3085 		      CollectionRef& c,
3086 		      OnodeRef& o);
3087 	  int _do_remove(TransContext *txc,
3088 			 CollectionRef& c,
3089 			 OnodeRef o);
3090 	  int _setattr(TransContext *txc,
3091 		       CollectionRef& c,
3092 		       OnodeRef& o,
3093 		       const string& name,
3094 		       bufferptr& val);
3095 	  int _setattrs(TransContext *txc,
3096 			CollectionRef& c,
3097 			OnodeRef& o,
3098 			const map<string,bufferptr>& aset);
3099 	  int _rmattr(TransContext *txc,
3100 		      CollectionRef& c,
3101 		      OnodeRef& o,
3102 		      const string& name);
3103 	  int _rmattrs(TransContext *txc,
3104 		       CollectionRef& c,
3105 		       OnodeRef& o);
3106 	  void _do_omap_clear(TransContext *txc, OnodeRef &o);
3107 	  int _omap_clear(TransContext *txc,
3108 			  CollectionRef& c,
3109 			  OnodeRef& o);
3110 	  int _omap_setkeys(TransContext *txc,
3111 			    CollectionRef& c,
3112 			    OnodeRef& o,
3113 			    bufferlist& bl);
3114 	  int _omap_setheader(TransContext *txc,
3115 			      CollectionRef& c,
3116 			      OnodeRef& o,
3117 			      bufferlist& header);
3118 	  int _omap_rmkeys(TransContext *txc,
3119 			   CollectionRef& c,
3120 			   OnodeRef& o,
3121 			   bufferlist& bl);
3122 	  int _omap_rmkey_range(TransContext *txc,
3123 				CollectionRef& c,
3124 				OnodeRef& o,
3125 				const string& first, const string& last);
3126 	  int _set_alloc_hint(
3127 	    TransContext *txc,
3128 	    CollectionRef& c,
3129 	    OnodeRef& o,
3130 	    uint64_t expected_object_size,
3131 	    uint64_t expected_write_size,
3132 	    uint32_t flags);
3133 	  int _do_clone_range(TransContext *txc,
3134 			      CollectionRef& c,
3135 			      OnodeRef& oldo,
3136 			      OnodeRef& newo,
3137 			      uint64_t srcoff, uint64_t length, uint64_t dstoff);
3138 	  int _clone(TransContext *txc,
3139 		     CollectionRef& c,
3140 		     OnodeRef& oldo,
3141 		     OnodeRef& newo);
3142 	  int _clone_range(TransContext *txc,
3143 			   CollectionRef& c,
3144 			   OnodeRef& oldo,
3145 			   OnodeRef& newo,
3146 			   uint64_t srcoff, uint64_t length, uint64_t dstoff);
3147 	  int _rename(TransContext *txc,
3148 		      CollectionRef& c,
3149 		      OnodeRef& oldo,
3150 		      OnodeRef& newo,
3151 		      const ghobject_t& new_oid);
3152 	  int _create_collection(TransContext *txc, const coll_t &cid,
3153 				 unsigned bits, CollectionRef *c);
3154 	  int _remove_collection(TransContext *txc, const coll_t &cid,
3155 	                         CollectionRef *c);
3156 	  void _do_remove_collection(TransContext *txc, CollectionRef *c);
3157 	  int _split_collection(TransContext *txc,
3158 				CollectionRef& c,
3159 				CollectionRef& d,
3160 				unsigned bits, int rem);
3161 	  int _merge_collection(TransContext *txc,
3162 				CollectionRef *c,
3163 				CollectionRef& d,
3164 				unsigned bits);
3165 	
3166 	private:
3167 	  std::atomic<uint64_t> out_of_sync_fm = {0};
3168 	  // --------------------------------------------------------
3169 	  // BlueFSDeviceExpander implementation
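     	  // (the interface through which BlueFS requests additional space from
     	  //  the main/slow device managed by BlueStore)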
3170 	  uint64_t get_recommended_expansion_delta(uint64_t bluefs_free,
3171 	    uint64_t bluefs_total) override {
3172 	    auto delta = _get_bluefs_size_delta(bluefs_free, bluefs_total);
3173 	    return delta > 0 ? delta : 0;
3174 	  }
3175 	  int allocate_freespace(
3176 	    uint64_t min_size,
3177 	    uint64_t size,
3178 	    PExtentVector& extents) override {
3179 	    return allocate_bluefs_freespace(min_size, size, &extents);
3180 	  }
3181 	  size_t available_freespace(uint64_t alloc_size) override;
3182 	  inline bool _use_rotational_settings();
3183 	
3184 	public:
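     	  // Per-shared-blob state accumulated while scanning objects during
     	  // fsck; kept in sb_info_map_t below, keyed by shared blob id.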
3185 	  struct sb_info_t {
3186 	    coll_t cid;
3187 	    int64_t pool_id = INT64_MIN;
3188 	    list<ghobject_t> oids;
3189 	    BlueStore::SharedBlobRef sb;
3190 	    bluestore_extent_ref_map_t ref_map;
3191 	    bool compressed = false;
3192 	    bool passed = false;
3193 	    bool updated = false;
3194 	  };
3195 	  typedef btree::btree_set<
3196 	    uint64_t, std::less<uint64_t>,
3197 	    mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
3198 	
3199 	  typedef mempool::bluestore_fsck::map<uint64_t, sb_info_t> sb_info_map_t;
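     	  // Bundles references to the counters and shared state that the
     	  // per-object fsck checks update, so they can be passed around as a
     	  // single argument.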
3200 	  struct FSCK_ObjectCtx {
3201 	    int64_t& errors;
3202 	    int64_t& warnings;
3203 	    uint64_t& num_objects;
3204 	    uint64_t& num_extents;
3205 	    uint64_t& num_blobs;
3206 	    uint64_t& num_sharded_objects;
3207 	    uint64_t& num_spanning_blobs;
3208 	
3209 	    mempool_dynamic_bitset* used_blocks;
3210 	    uint64_t_btree_t* used_omap_head;
3211 	    uint64_t_btree_t* used_per_pool_omap_head;
3212 	    uint64_t_btree_t* used_pgmeta_omap_head;
3213 	
3214 	    ceph::mutex* sb_info_lock;
3215 	    sb_info_map_t& sb_info;
3216 	
3217 	    store_statfs_t& expected_store_statfs;
3218 	    per_pool_statfs& expected_pool_statfs;
3219 	    BlueStoreRepairer* repairer;
3220 	
3221 	    FSCK_ObjectCtx(int64_t& e,
3222 	                   int64_t& w,
3223 	                   uint64_t& _num_objects,
3224 	                   uint64_t& _num_extents,
3225 	                   uint64_t& _num_blobs,
3226 	                   uint64_t& _num_sharded_objects,
3227 	                   uint64_t& _num_spanning_blobs,
3228 	                   mempool_dynamic_bitset* _ub,
3229 	                   uint64_t_btree_t* _used_omap_head,
3230 	                   uint64_t_btree_t* _used_per_pool_omap_head,
3231 	                   uint64_t_btree_t* _used_pgmeta_omap_head,
3232 	                   ceph::mutex* _sb_info_lock,
3233 	                   sb_info_map_t& _sb_info,
3234 	                   store_statfs_t& _store_statfs,
3235 	                   per_pool_statfs& _pool_statfs,
3236 	                   BlueStoreRepairer* _repairer) :
3237 	      errors(e),
3238 	      warnings(w),
3239 	      num_objects(_num_objects),
3240 	      num_extents(_num_extents),
3241 	      num_blobs(_num_blobs),
3242 	      num_sharded_objects(_num_sharded_objects),
3243 	      num_spanning_blobs(_num_spanning_blobs),
3244 	      used_blocks(_ub),
3245 	      used_omap_head(_used_omap_head),
3246 	      used_per_pool_omap_head(_used_per_pool_omap_head),
3247 	      used_pgmeta_omap_head(_used_pgmeta_omap_head),
3248 	      sb_info_lock(_sb_info_lock),
3249 	      sb_info(_sb_info),
3250 	      expected_store_statfs(_store_statfs),
3251 	      expected_pool_statfs(_pool_statfs),
3252 	      repairer(_repairer) {
3253 	    }
3254 	  };
3255 	
3256 	  OnodeRef fsck_check_objects_shallow(
3257 	    FSCKDepth depth,
3258 	    int64_t pool_id,
3259 	    CollectionRef c,
3260 	    const ghobject_t& oid,
3261 	    const string& key,
3262 	    const bufferlist& value,
3263 	    mempool::bluestore_fsck::list<string>& expecting_shards,
3264 	    map<BlobRef, bluestore_blob_t::unused_t>* referenced,
3265 	    const BlueStore::FSCK_ObjectCtx& ctx);
3266 	
3267 	private:
3268 	  void _fsck_check_objects(FSCKDepth depth,
3269 	    FSCK_ObjectCtx& ctx);
3270 	};
3271 	
3272 	inline ostream& operator<<(ostream& out, const BlueStore::volatile_statfs& s) {
3273 	  return out 
3274 	    << " allocated:"
3275 	      << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
3276 	    << " stored:"
3277 	      << s.values[BlueStore::volatile_statfs::STATFS_STORED]
3278 	    << " compressed:"
3279 	      << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
3280 	    << " compressed_orig:"
3281 	      << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
3282 	    << " compressed_alloc:"
3283 	      << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
3284 	}
3285 	
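     	// Free functions required by boost::intrusive_ptr<> so that OnodeRef and
     	// OpSequencerRef can manage Onode/OpSequencer reference counts via
     	// get()/put().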
3286 	static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
3287 	  o->get();
3288 	}
3289 	static inline void intrusive_ptr_release(BlueStore::Onode *o) {
3290 	  o->put();
3291 	}
3292 	
3293 	static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
3294 	  o->get();
3295 	}
3296 	static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
3297 	  o->put();
3298 	}
3299 	
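     	// Accumulates the repairs discovered during fsck: each fix_* call queues
     	// its changes into a dedicated KeyValueDB transaction, and apply()
     	// submits them, returning the number of repairs performed.
     	//
     	// Illustrative repair pass (a sketch; names are hypothetical):
     	//
     	//   BlueStoreRepairer repairer;
     	//   repairer.init(total_device_space, min_alloc_size);
     	//   // ... while scanning, e.g. on a leaked extent:
     	//   repairer.fix_leaked(db, fm, offset, length);
     	//   unsigned repaired = repairer.apply(db);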
3300 	class BlueStoreRepairer
3301 	{
3302 	public:
3303 	  // to simplify future potential migration to mempools
3304 	  using fsck_interval = interval_set<uint64_t>;
3305 	
3306 	  // Structure to track which pextents are used by a specific cid/oid.
3307 	  // As with a Bloom filter, only positive and false-positive matches
3308 	  // are possible.
3309 	  // Maintains two lists of bloom filters, one for cids and one for oids,
3310 	  //   where each list entry is a BF covering a specific disk pextent.
3311 	  //   The extent length covered by each filter is computed at init().
3312 	  // Allows filtering out 'uninteresting' pextents to speed up subsequent
3313 	  //   'is_used' access.
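     	  //
     	  // Illustrative usage (names and values are arbitrary):
     	  //   StoreSpaceTracker bfs;
     	  //   bfs.init(device_size, min_alloc_size);    // size the filter lists
     	  //   bfs.set_used(offset, length, cid, oid);   // record ownership
     	  //   bfs.filter_out(broken_extents);           // keep only filters that
     	  //                                             // overlap damaged extents
     	  //   if (bfs.is_used(cid)) { /* cid may touch a broken extent */ }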
3314 	  struct StoreSpaceTracker {
3315 	    const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
3316 	    const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
3317 	    const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrarily selected
3318 	    static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
3319 	
3320 	    typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
3321 	    bloom_vector collections_bfs;
3322 	    bloom_vector objects_bfs;
3323 	    
3324 	    bool was_filtered_out = false; 
3325 	    uint64_t granularity = 0; // extent length for a single filter
3326 	
3327 	    StoreSpaceTracker() {
3328 	    }
3329 	    StoreSpaceTracker(const StoreSpaceTracker& from) :
3330 	      collections_bfs(from.collections_bfs),
3331 	      objects_bfs(from.objects_bfs),
3332 	      granularity(from.granularity) {
3333 	    }
3334 	
3335 	    void init(uint64_t total,
3336 		      uint64_t min_alloc_size,
3337 		      uint64_t mem_cap = DEF_MEM_CAP) {
3338 	      ceph_assert(!granularity); // not initialized yet
3339 	      ceph_assert(min_alloc_size && isp2(min_alloc_size));
3340 	      ceph_assert(mem_cap);
3341 	      
3342 	      total = round_up_to(total, min_alloc_size);
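     	      // Choose a granularity such that two filter lists (collections
     	      // and objects), each using BLOOM_FILTER_TABLE_SIZE bytes per
     	      // tracked extent, fit within mem_cap:
     	      //   2 * (total / granularity) * BLOOM_FILTER_TABLE_SIZE <= mem_cap
     	      // e.g. total = 1 TiB, mem_cap = 128 MiB  =>  granularity = 512 KiB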
3343 	      granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
3344 	
3345 	      if (!granularity) {
3346 		granularity = min_alloc_size;
3347 	      } else {
3348 		granularity = round_up_to(granularity, min_alloc_size);
3349 	      }
3350 	
3351 	      uint64_t entries = round_up_to(total, granularity) / granularity;
3352 	      collections_bfs.resize(entries,
3353 	        bloom_filter(BLOOM_FILTER_SALT_COUNT,
3354 	                     BLOOM_FILTER_TABLE_SIZE,
3355 	                     0,
3356 	                     BLOOM_FILTER_EXPECTED_COUNT));
3357 	      objects_bfs.resize(entries, 
3358 	        bloom_filter(BLOOM_FILTER_SALT_COUNT,
3359 	                     BLOOM_FILTER_TABLE_SIZE,
3360 	                     0,
3361 	                     BLOOM_FILTER_EXPECTED_COUNT));
3362 	    }
3363 	    inline uint32_t get_hash(const coll_t& cid) const {
3364 	      return cid.hash_to_shard(1);
3365 	    }
3366 	    inline void set_used(uint64_t offset, uint64_t len,
3367 				 const coll_t& cid, const ghobject_t& oid) {
3368 	      ceph_assert(granularity); // initialized
3369 	      
3370 	      // can't call this func after filter_out has been applied
3371 	      ceph_assert(!was_filtered_out);
3372 	      if (!len) {
3373 		return;
3374 	      }
3375 	      auto pos = offset / granularity;
3376 	      auto end_pos = (offset + len - 1) / granularity;
3377 	      while (pos <= end_pos) {
3378 	        collections_bfs[pos].insert(get_hash(cid));
3379 	        objects_bfs[pos].insert(oid.hobj.get_hash());
3380 	        ++pos;
3381 	      }
3382 	    }
3383 	    // Filter out entries unrelated to the specified (broken) extents.
3384 	    // Only 'is_used' calls are permitted after that.
3385 	    size_t filter_out(const fsck_interval& extents);
3386 	
3387 	    // determines whether the collection is present after filtering out
3388 	    inline bool is_used(const coll_t& cid) const {
3389 	      ceph_assert(was_filtered_out);
3390 	      for (auto& bf : collections_bfs) {
3391 	        if (bf.contains(get_hash(cid))) {
3392 	          return true;
3393 	        }
3394 	      }
3395 	      return false;
3396 	    }
3397 	    // determines whether the object is present after filtering out
3398 	    inline bool is_used(const ghobject_t& oid) const {
3399 	      ceph_assert(was_filtered_out);
3400 	      for (auto& bf : objects_bfs) {
3401 	        if (bf.contains(oid.hobj.get_hash())) {
3402 	          return true;
3403 	        }
3404 	      }
3405 	      return false;
3406 	    }
3407 	    // determines whether the collection is present before filtering out
3408 	    inline bool is_used(const coll_t& cid, uint64_t offs) const {
3409 	      ceph_assert(granularity); // initialized
3410 	      ceph_assert(!was_filtered_out);
3411 	      auto &bf = collections_bfs[offs / granularity];
3412 	      if (bf.contains(get_hash(cid))) {
3413 	        return true;
3414 	      }
3415 	      return false;
3416 	    }
3417 	    // determines whether the object is present before filtering out
3418 	    inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
3419 	      ceph_assert(granularity); // initialized
3420 	      ceph_assert(!was_filtered_out);
3421 	      auto &bf = objects_bfs[offs / granularity];
3422 	      if (bf.contains(oid.hobj.get_hash())) {
3423 	        return true;
3424 	      }
3425 	      return false;
3426 	    }
3427 	  };
3428 	public:
3429 	  void fix_per_pool_omap(KeyValueDB *db);
3430 	  bool remove_key(KeyValueDB *db, const string& prefix, const string& key);
3431 	  bool fix_shared_blob(KeyValueDB *db,
3432 			       uint64_t sbid,
3433 			       const bufferlist* bl);
3434 	  bool fix_statfs(KeyValueDB *db, const string& key,
3435 	    const store_statfs_t& new_statfs);
3436 	
3437 	  bool fix_leaked(KeyValueDB *db,
3438 			  FreelistManager* fm,
3439 			  uint64_t offset, uint64_t len);
3440 	  bool fix_false_free(KeyValueDB *db,
3441 			      FreelistManager* fm,
3442 			      uint64_t offset, uint64_t len);
3443 	  bool fix_bluefs_extents(std::atomic<uint64_t>& out_of_sync_flag);
3444 	
3445 	  void init(uint64_t total_space, uint64_t lres_tracking_unit_size);
3446 	
3447 	  bool preprocess_misreference(KeyValueDB *db);
3448 	
3449 	  unsigned apply(KeyValueDB* db);
3450 	
3451 	  void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
3452 	    misreferenced_extents.union_insert(offs, len);
3453 	    if (inc_error) {
3454 	      ++to_repair_cnt;
3455 	    }
3456 	  }
3457 	  void inc_repaired() {
3458 	    ++to_repair_cnt;
3459 	  }
3460 	
3461 	  StoreSpaceTracker& get_space_usage_tracker() {
3462 	    return space_usage_tracker;
3463 	  }
3464 	  const fsck_interval& get_misreferences() const {
3465 	    return misreferenced_extents;
3466 	  }
3467 	  KeyValueDB::Transaction get_fix_misreferences_txn() {
3468 	    return fix_misreferences_txn;
3469 	  }
3470 	
3471 	private:
3472 	  unsigned to_repair_cnt = 0;
3473 	  KeyValueDB::Transaction fix_per_pool_omap_txn;
3474 	  KeyValueDB::Transaction fix_fm_leaked_txn;
3475 	  KeyValueDB::Transaction fix_fm_false_free_txn;
3476 	  KeyValueDB::Transaction remove_key_txn;
3477 	  KeyValueDB::Transaction fix_statfs_txn;
3478 	  KeyValueDB::Transaction fix_shared_blob_txn;
3479 	
3480 	  KeyValueDB::Transaction fix_misreferences_txn;
3481 	
3482 	  StoreSpaceTracker space_usage_tracker;
3483 	
3484 	  // non-shared extents with multiple references
3485 	  fsck_interval misreferenced_extents;
3486 	
3487 	};
3488 	#endif
3489