1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <chrono>
24 #include <ratio>
25 #include <mutex>
26 #include <condition_variable>
27
28 #include <boost/intrusive/list.hpp>
29 #include <boost/intrusive/unordered_set.hpp>
30 #include <boost/intrusive/set.hpp>
31 #include <boost/functional/hash.hpp>
32 #include <boost/dynamic_bitset.hpp>
33 #include <boost/circular_buffer.hpp>
34
35 #include "include/cpp-btree/btree_set.h"
36
37 #include "include/ceph_assert.h"
38 #include "include/unordered_map.h"
39 #include "include/mempool.h"
40 #include "include/hash.h"
41 #include "common/bloom_filter.hpp"
42 #include "common/Finisher.h"
43 #include "common/ceph_mutex.h"
44 #include "common/Throttle.h"
45 #include "common/perf_counters.h"
46 #include "common/PriorityCache.h"
47 #include "compressor/Compressor.h"
48 #include "os/ObjectStore.h"
49
50 #include "bluestore_types.h"
51 #include "BlockDevice.h"
52 #include "BlueFS.h"
53 #include "common/EventTrace.h"
54
55 class Allocator;
56 class FreelistManager;
57 class BlueStoreRepairer;
58
59 //#define DEBUG_CACHE
60 //#define DEBUG_DEFERRED
61
62
63
64 // constants for Buffer::optimize()
#define MAX_BUFFER_SLOP_RATIO_DEN 8 // denominator N; the allowed slop is 1/N of the buffer length
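// For example, with DEN == 8, Buffer::maybe_rebuild() below repacks a buffer
// whose front bufferptr wastes more than length/8 bytes (e.g. more than 512
// wasted bytes for a 4096-byte buffer), or that spans multiple bufferptrs.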
66
67
68 enum {
69 l_bluestore_first = 732430,
70 l_bluestore_kv_flush_lat,
71 l_bluestore_kv_commit_lat,
72 l_bluestore_kv_sync_lat,
73 l_bluestore_kv_final_lat,
74 l_bluestore_state_prepare_lat,
75 l_bluestore_state_aio_wait_lat,
76 l_bluestore_state_io_done_lat,
77 l_bluestore_state_kv_queued_lat,
78 l_bluestore_state_kv_committing_lat,
79 l_bluestore_state_kv_done_lat,
80 l_bluestore_state_deferred_queued_lat,
81 l_bluestore_state_deferred_aio_wait_lat,
82 l_bluestore_state_deferred_cleanup_lat,
83 l_bluestore_state_finishing_lat,
84 l_bluestore_state_done_lat,
85 l_bluestore_throttle_lat,
86 l_bluestore_submit_lat,
87 l_bluestore_commit_lat,
88 l_bluestore_read_lat,
89 l_bluestore_read_onode_meta_lat,
90 l_bluestore_read_wait_aio_lat,
91 l_bluestore_compress_lat,
92 l_bluestore_decompress_lat,
93 l_bluestore_csum_lat,
94 l_bluestore_compress_success_count,
95 l_bluestore_compress_rejected_count,
96 l_bluestore_write_pad_bytes,
97 l_bluestore_deferred_write_ops,
98 l_bluestore_deferred_write_bytes,
99 l_bluestore_write_penalty_read_ops,
100 l_bluestore_allocated,
101 l_bluestore_stored,
102 l_bluestore_compressed,
103 l_bluestore_compressed_allocated,
104 l_bluestore_compressed_original,
105 l_bluestore_onodes,
106 l_bluestore_onode_hits,
107 l_bluestore_onode_misses,
108 l_bluestore_onode_shard_hits,
109 l_bluestore_onode_shard_misses,
110 l_bluestore_extents,
111 l_bluestore_blobs,
112 l_bluestore_buffers,
113 l_bluestore_buffer_bytes,
114 l_bluestore_buffer_hit_bytes,
115 l_bluestore_buffer_miss_bytes,
116 l_bluestore_write_big,
117 l_bluestore_write_big_bytes,
118 l_bluestore_write_big_blobs,
119 l_bluestore_write_small,
120 l_bluestore_write_small_bytes,
121 l_bluestore_write_small_unused,
122 l_bluestore_write_small_deferred,
123 l_bluestore_write_small_pre_read,
124 l_bluestore_write_small_new,
125 l_bluestore_txc,
126 l_bluestore_onode_reshard,
127 l_bluestore_blob_split,
128 l_bluestore_extent_compress,
129 l_bluestore_gc_merged,
130 l_bluestore_read_eio,
131 l_bluestore_reads_with_retries,
132 l_bluestore_fragmentation,
133 l_bluestore_omap_seek_to_first_lat,
134 l_bluestore_omap_upper_bound_lat,
135 l_bluestore_omap_lower_bound_lat,
136 l_bluestore_omap_next_lat,
137 l_bluestore_clist_lat,
138 l_bluestore_last
139 };
140
141 #define META_POOL_ID ((uint64_t)-1ull)
142
143 class BlueStore : public ObjectStore,
144 public BlueFSDeviceExpander,
145 public md_config_obs_t {
146 // -----------------------------------------------------
147 // types
148 public:
149 // config observer
150 const char** get_tracked_conf_keys() const override;
151 void handle_conf_change(const ConfigProxy& conf,
152 const std::set<std::string> &changed) override;
153
// handler for discard events
155 void handle_discard(interval_set<uint64_t>& to_release);
156
157 void _set_csum();
158 void _set_compression();
159 void _set_throttle_params();
160 int _set_cache_sizes();
161 void _set_max_defer_interval() {
162 max_defer_interval =
163 cct->_conf.get_val<double>("bluestore_max_defer_interval");
164 }
165
166 class TransContext;
167
168 typedef map<uint64_t, bufferlist> ready_regions_t;
169
170
171 struct BufferSpace;
172 struct Collection;
173 typedef boost::intrusive_ptr<Collection> CollectionRef;
174
175 struct AioContext {
176 virtual void aio_finish(BlueStore *store) = 0;
177 virtual ~AioContext() {}
178 };
179
180 /// cached buffer
181 struct Buffer {
182 MEMPOOL_CLASS_HELPERS();
183
184 enum {
185 STATE_EMPTY, ///< empty buffer -- used for cache history
186 STATE_CLEAN, ///< clean data that is up to date
187 STATE_WRITING, ///< data that is being written (io not yet complete)
188 };
189 static const char *get_state_name(int s) {
190 switch (s) {
191 case STATE_EMPTY: return "empty";
192 case STATE_CLEAN: return "clean";
193 case STATE_WRITING: return "writing";
194 default: return "???";
195 }
196 }
197 enum {
198 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
199 // NOTE: fix operator<< when you define a second flag
200 };
201 static const char *get_flag_name(int s) {
202 switch (s) {
203 case FLAG_NOCACHE: return "nocache";
204 default: return "???";
205 }
206 }
207
208 BufferSpace *space;
209 uint16_t state; ///< STATE_*
210 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
211 uint32_t flags; ///< FLAG_*
212 uint64_t seq;
213 uint32_t offset, length;
214 bufferlist data;
215
216 boost::intrusive::list_member_hook<> lru_item;
217 boost::intrusive::list_member_hook<> state_item;
218
219 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
220 unsigned f = 0)
221 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
222 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
223 unsigned f = 0)
224 : space(space), state(s), flags(f), seq(q), offset(o),
225 length(b.length()), data(b) {}
226
227 bool is_empty() const {
228 return state == STATE_EMPTY;
229 }
230 bool is_clean() const {
231 return state == STATE_CLEAN;
232 }
233 bool is_writing() const {
234 return state == STATE_WRITING;
235 }
236
237 uint32_t end() const {
238 return offset + length;
239 }
240
241 void truncate(uint32_t newlen) {
242 ceph_assert(newlen < length);
243 if (data.length()) {
244 bufferlist t;
245 t.substr_of(data, 0, newlen);
246 data.claim(t);
247 }
248 length = newlen;
249 }
250 void maybe_rebuild() {
251 if (data.length() &&
252 (data.get_num_buffers() > 1 ||
253 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
254 data.rebuild();
255 }
256 }
257
258 void dump(Formatter *f) const {
259 f->dump_string("state", get_state_name(state));
260 f->dump_unsigned("seq", seq);
261 f->dump_unsigned("offset", offset);
262 f->dump_unsigned("length", length);
263 f->dump_unsigned("data_length", data.length());
264 }
265 };
266
267 struct BufferCacheShard;
268
269 /// map logical extent range (object) onto buffers
270 struct BufferSpace {
271 enum {
272 BYPASS_CLEAN_CACHE = 0x1, // bypass clean cache
273 };
274
275 typedef boost::intrusive::list<
276 Buffer,
277 boost::intrusive::member_hook<
278 Buffer,
279 boost::intrusive::list_member_hook<>,
280 &Buffer::state_item> > state_list_t;
281
282 mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
283 buffer_map;
284
285 // we use a bare intrusive list here instead of std::map because
286 // it uses less memory and we expect this to be very small (very
287 // few IOs in flight to the same Blob at the same time).
288 state_list_t writing; ///< writing buffers, sorted by seq, ascending
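// Lifecycle: write() below adds buffers here in STATE_WRITING, tagged with the
// transaction seq; _finish_write(seq) removes them once that IO commits, at
// which point they either join the clean cache or are dropped (FLAG_NOCACHE).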
289
290 ~BufferSpace() {
291 ceph_assert(buffer_map.empty());
292 ceph_assert(writing.empty());
293 }
294
295 void _add_buffer(BufferCacheShard* cache, Buffer *b, int level, Buffer *near) {
296 cache->_audit("_add_buffer start");
297 buffer_map[b->offset].reset(b);
298 if (b->is_writing()) {
299 b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
300 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
301 writing.push_back(*b);
302 } else {
303 auto it = writing.begin();
304 while (it->seq < b->seq) {
305 ++it;
306 }
307
308 ceph_assert(it->seq >= b->seq);
309 // note that this will insert b before it
310 // hence the order is maintained
311 writing.insert(it, *b);
312 }
313 } else {
314 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
315 cache->_add(b, level, near);
316 }
317 cache->_audit("_add_buffer end");
318 }
319 void _rm_buffer(BufferCacheShard* cache, Buffer *b) {
320 _rm_buffer(cache, buffer_map.find(b->offset));
321 }
322 void _rm_buffer(BufferCacheShard* cache,
323 map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
324 ceph_assert(p != buffer_map.end());
325 cache->_audit("_rm_buffer start");
326 if (p->second->is_writing()) {
327 writing.erase(writing.iterator_to(*p->second));
328 } else {
329 cache->_rm(p->second.get());
330 }
331 buffer_map.erase(p);
332 cache->_audit("_rm_buffer end");
333 }
334
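/// return an iterator to the first buffer that may intersect 'offset': the
/// preceding buffer if it extends past 'offset', otherwise the first buffer
/// at or after 'offset' (buffer_map.end() if there is none)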
335 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
336 uint32_t offset) {
337 auto i = buffer_map.lower_bound(offset);
338 if (i != buffer_map.begin()) {
339 --i;
340 if (i->first + i->second->length <= offset)
341 ++i;
342 }
343 return i;
344 }
345
346 // must be called under protection of the Cache lock
347 void _clear(BufferCacheShard* cache);
348
349 // return value is the highest cache_private of a trimmed buffer, or 0.
350 int discard(BufferCacheShard* cache, uint32_t offset, uint32_t length) {
351 std::lock_guard l(cache->lock);
352 int ret = _discard(cache, offset, length);
353 cache->_trim();
354 return ret;
355 }
356 int _discard(BufferCacheShard* cache, uint32_t offset, uint32_t length);
357
358 void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
359 unsigned flags) {
360 std::lock_guard l(cache->lock);
361 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
362 flags);
363 b->cache_private = _discard(cache, offset, bl.length());
364 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
365 cache->_trim();
366 }
367 void _finish_write(BufferCacheShard* cache, uint64_t seq);
368 void did_read(BufferCacheShard* cache, uint32_t offset, bufferlist& bl) {
369 std::lock_guard l(cache->lock);
370 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
371 b->cache_private = _discard(cache, offset, bl.length());
372 _add_buffer(cache, b, 1, nullptr);
373 cache->_trim();
374 }
375
376 void read(BufferCacheShard* cache, uint32_t offset, uint32_t length,
377 BlueStore::ready_regions_t& res,
378 interval_set<uint32_t>& res_intervals,
379 int flags = 0);
380
381 void truncate(BufferCacheShard* cache, uint32_t offset) {
382 discard(cache, offset, (uint32_t)-1 - offset);
383 }
384
385 void split(BufferCacheShard* cache, size_t pos, BufferSpace &r);
386
387 void dump(BufferCacheShard* cache, Formatter *f) const {
388 std::lock_guard l(cache->lock);
389 f->open_array_section("buffers");
390 for (auto& i : buffer_map) {
391 f->open_object_section("buffer");
392 ceph_assert(i.first == i.second->offset);
393 i.second->dump(f);
394 f->close_section();
395 }
396 f->close_section();
397 }
398 };
399
400 struct SharedBlobSet;
401
402 /// in-memory shared blob state (incl cached buffers)
403 struct SharedBlob {
404 MEMPOOL_CLASS_HELPERS();
405
406 std::atomic_int nref = {0}; ///< reference count
407 bool loaded = false;
408
409 CollectionRef coll;
410 union {
411 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
412 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
413 };
414 BufferSpace bc; ///< buffer cache
415
416 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
417 if (get_cache()) {
418 get_cache()->add_blob();
419 }
420 }
421 SharedBlob(uint64_t i, Collection *_coll);
422 ~SharedBlob();
423
424 uint64_t get_sbid() const {
425 return loaded ? persistent->sbid : sbid_unloaded;
426 }
427
428 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
429 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
430
431 void dump(Formatter* f) const;
432 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
433
434 void get() {
435 ++nref;
436 }
437 void put();
438
439 /// get logical references
440 void get_ref(uint64_t offset, uint32_t length);
441
442 /// put logical references, and get back any released extents
443 void put_ref(uint64_t offset, uint32_t length,
444 PExtentVector *r, bool *unshare);
445
446 void finish_write(uint64_t seq);
447
448 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
449 return l.get_sbid() == r.get_sbid();
450 }
451 inline BufferCacheShard* get_cache() {
452 return coll ? coll->cache : nullptr;
453 }
454 inline SharedBlobSet* get_parent() {
455 return coll ? &(coll->shared_blob_set) : nullptr;
456 }
457 inline bool is_loaded() const {
458 return loaded;
459 }
460
461 };
462 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
463
464 /// a lookup table of SharedBlobs
465 struct SharedBlobSet {
466 /// protect lookup, insertion, removal
467 ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
468
469 // we use a bare pointer because we don't want to affect the ref
470 // count
471 mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
472
473 SharedBlobRef lookup(uint64_t sbid) {
474 std::lock_guard l(lock);
475 auto p = sb_map.find(sbid);
476 if (p == sb_map.end() ||
477 p->second->nref == 0) {
478 return nullptr;
479 }
480 return p->second;
481 }
482
483 void add(Collection* coll, SharedBlob *sb) {
484 std::lock_guard l(lock);
485 sb_map[sb->get_sbid()] = sb;
486 sb->coll = coll;
487 }
488
489 bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
490 std::lock_guard l(lock);
491 ceph_assert(sb->get_parent() == this);
492 if (verify_nref_is_zero && sb->nref != 0) {
493 return false;
494 }
495 // only remove if it still points to us
496 auto p = sb_map.find(sb->get_sbid());
497 if (p != sb_map.end() &&
498 p->second == sb) {
499 sb_map.erase(p);
500 }
501 return true;
502 }
503
504 bool empty() {
505 std::lock_guard l(lock);
506 return sb_map.empty();
507 }
508
509 template <int LogLevelV>
510 void dump(CephContext *cct);
511 };
512
513 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
514
515 /// in-memory blob metadata and associated cached buffers (if any)
516 struct Blob {
517 MEMPOOL_CLASS_HELPERS();
518
519 std::atomic_int nref = {0}; ///< reference count
520 int16_t id = -1; ///< id, for spanning blobs only, >= 0
521 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
522 SharedBlobRef shared_blob; ///< shared blob state (if any)
523
524 private:
525 mutable bluestore_blob_t blob; ///< decoded blob metadata
526 #ifdef CACHE_BLOB_BL
527 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
528 #endif
529 /// refs from this shard. ephemeral if id<0, persisted if spanning.
530 bluestore_blob_use_tracker_t used_in_blob;
531
532 public:
533
534 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
535 friend void intrusive_ptr_release(Blob *b) { b->put(); }
536
537 void dump(Formatter* f) const;
538 friend ostream& operator<<(ostream& out, const Blob &b);
539
540 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
541 return used_in_blob;
542 }
543 bool is_referenced() const {
544 return used_in_blob.is_not_empty();
545 }
546 uint32_t get_referenced_bytes() const {
547 return used_in_blob.get_referenced_bytes();
548 }
549
550 bool is_spanning() const {
551 return id >= 0;
552 }
553
554 bool can_split() const {
555 std::lock_guard l(shared_blob->get_cache()->lock);
556 // splitting a BufferSpace writing list is too hard; don't try.
557 return shared_blob->bc.writing.empty() &&
558 used_in_blob.can_split() &&
559 get_blob().can_split();
560 }
561
562 bool can_split_at(uint32_t blob_offset) const {
563 return used_in_blob.can_split_at(blob_offset) &&
564 get_blob().can_split_at(blob_offset);
565 }
566
567 bool can_reuse_blob(uint32_t min_alloc_size,
568 uint32_t target_blob_size,
569 uint32_t b_offset,
570 uint32_t *length0);
571
572 void dup(Blob& o) {
573 o.shared_blob = shared_blob;
574 o.blob = blob;
575 #ifdef CACHE_BLOB_BL
576 o.blob_bl = blob_bl;
577 #endif
578 }
579
580 inline const bluestore_blob_t& get_blob() const {
581 return blob;
582 }
583 inline bluestore_blob_t& dirty_blob() {
584 #ifdef CACHE_BLOB_BL
585 blob_bl.clear();
586 #endif
587 return blob;
588 }
589
590 /// discard buffers for unallocated regions
591 void discard_unallocated(Collection *coll);
592
593 /// get logical references
594 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
595 /// put logical references, and get back any released extents
596 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
597 PExtentVector *r);
598
599 /// split the blob
600 void split(Collection *coll, uint32_t blob_offset, Blob *o);
601
602 void get() {
603 ++nref;
604 }
605 void put() {
606 if (--nref == 0)
607 delete this;
608 }
609
610
611 #ifdef CACHE_BLOB_BL
612 void _encode() const {
if (blob_bl.length() == 0) {
614 encode(blob, blob_bl);
615 } else {
616 ceph_assert(blob_bl.length());
617 }
618 }
619 void bound_encode(
620 size_t& p,
621 bool include_ref_map) const {
622 _encode();
623 p += blob_bl.length();
624 if (include_ref_map) {
625 used_in_blob.bound_encode(p);
626 }
627 }
628 void encode(
629 bufferlist::contiguous_appender& p,
630 bool include_ref_map) const {
631 _encode();
632 p.append(blob_bl);
633 if (include_ref_map) {
634 used_in_blob.encode(p);
635 }
636 }
637 void decode(
638 Collection */*coll*/,
639 bufferptr::const_iterator& p,
640 bool include_ref_map) {
641 const char *start = p.get_pos();
642 denc(blob, p);
643 const char *end = p.get_pos();
644 blob_bl.clear();
645 blob_bl.append(start, end - start);
646 if (include_ref_map) {
647 used_in_blob.decode(p);
648 }
649 }
650 #else
651 void bound_encode(
652 size_t& p,
653 uint64_t struct_v,
654 uint64_t sbid,
655 bool include_ref_map) const {
656 denc(blob, p, struct_v);
657 if (blob.is_shared()) {
658 denc(sbid, p);
659 }
660 if (include_ref_map) {
661 used_in_blob.bound_encode(p);
662 }
663 }
664 void encode(
665 bufferlist::contiguous_appender& p,
666 uint64_t struct_v,
667 uint64_t sbid,
668 bool include_ref_map) const {
669 denc(blob, p, struct_v);
670 if (blob.is_shared()) {
671 denc(sbid, p);
672 }
673 if (include_ref_map) {
674 used_in_blob.encode(p);
675 }
676 }
677 void decode(
678 Collection *coll,
679 bufferptr::const_iterator& p,
680 uint64_t struct_v,
681 uint64_t* sbid,
682 bool include_ref_map);
683 #endif
684 };
685 typedef boost::intrusive_ptr<Blob> BlobRef;
686 typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
687
688 /// a logical extent, pointing to (some portion of) a blob
typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; // making an alias to avoid build warnings
690 struct Extent : public ExtentBase {
691 MEMPOOL_CLASS_HELPERS();
692
693 uint32_t logical_offset = 0; ///< logical offset
694 uint32_t blob_offset = 0; ///< blob offset
695 uint32_t length = 0; ///< length
696 BlobRef blob; ///< the blob with our data
697
698 /// ctor for lookup only
699 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
700 /// ctor for delayed initialization (see decode_some())
701 explicit Extent() : ExtentBase() {
702 }
703 /// ctor for general usage
704 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
705 : ExtentBase(),
706 logical_offset(lo), blob_offset(o), length(l) {
707 assign_blob(b);
708 }
709 ~Extent() {
710 if (blob) {
711 blob->shared_blob->get_cache()->rm_extent();
712 }
713 }
714
715 void dump(Formatter* f) const;
716
717 void assign_blob(const BlobRef& b) {
718 ceph_assert(!blob);
719 blob = b;
720 blob->shared_blob->get_cache()->add_extent();
721 }
722
723 // comparators for intrusive_set
724 friend bool operator<(const Extent &a, const Extent &b) {
725 return a.logical_offset < b.logical_offset;
726 }
727 friend bool operator>(const Extent &a, const Extent &b) {
728 return a.logical_offset > b.logical_offset;
729 }
730 friend bool operator==(const Extent &a, const Extent &b) {
731 return a.logical_offset == b.logical_offset;
732 }
733
734 uint32_t blob_start() const {
735 return logical_offset - blob_offset;
736 }
737
738 uint32_t blob_end() const {
739 return blob_start() + blob->get_blob().get_logical_length();
740 }
741
742 uint32_t logical_end() const {
743 return logical_offset + length;
744 }
745
746 // return true if any piece of the blob is out of
747 // the given range [o, o + l].
748 bool blob_escapes_range(uint32_t o, uint32_t l) const {
749 return blob_start() < o || blob_end() > o + l;
750 }
751 };
752 typedef boost::intrusive::set<Extent> extent_map_t;
753
754
755 friend ostream& operator<<(ostream& out, const Extent& e);
756
757 struct OldExtent {
758 boost::intrusive::list_member_hook<> old_extent_item;
759 Extent e;
760 PExtentVector r;
bool blob_empty; // true if this is the last removed extent, leaving the blob
                 // empty; required to update compression stats properly
763 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
764 : e(lo, o, l, b), blob_empty(false) {
765 }
766 static OldExtent* create(CollectionRef c,
767 uint32_t lo,
768 uint32_t o,
769 uint32_t l,
770 BlobRef& b);
771 };
772 typedef boost::intrusive::list<
773 OldExtent,
774 boost::intrusive::member_hook<
775 OldExtent,
776 boost::intrusive::list_member_hook<>,
777 &OldExtent::old_extent_item> > old_extent_map_t;
778
779 struct Onode;
780
781 /// a sharded extent map, mapping offsets to lextents to blobs
782 struct ExtentMap {
783 Onode *onode;
784 extent_map_t extent_map; ///< map of Extents to Blobs
785 blob_map_t spanning_blob_map; ///< blobs that span shards
786 typedef boost::intrusive_ptr<Onode> OnodeRef;
787
788 struct Shard {
789 bluestore_onode_t::shard_info *shard_info = nullptr;
790 unsigned extents = 0; ///< count extents in this shard
791 bool loaded = false; ///< true if shard is loaded
792 bool dirty = false; ///< true if shard is dirty and needs reencoding
793 };
794 mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
795
796 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
797
798 uint32_t needs_reshard_begin = 0;
799 uint32_t needs_reshard_end = 0;
800
801 void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
802 uint64_t&, uint64_t&, uint64_t&);
803
804 bool needs_reshard() const {
805 return needs_reshard_end > needs_reshard_begin;
806 }
807 void clear_needs_reshard() {
808 needs_reshard_begin = needs_reshard_end = 0;
809 }
810 void request_reshard(uint32_t begin, uint32_t end) {
811 if (begin < needs_reshard_begin) {
812 needs_reshard_begin = begin;
813 }
814 if (end > needs_reshard_end) {
815 needs_reshard_end = end;
816 }
817 }
818
819 struct DeleteDisposer {
820 void operator()(Extent *e) { delete e; }
821 };
822
823 ExtentMap(Onode *o);
824 ~ExtentMap() {
825 extent_map.clear_and_dispose(DeleteDisposer());
826 }
827
828 void clear() {
829 extent_map.clear_and_dispose(DeleteDisposer());
830 shards.clear();
831 inline_bl.clear();
832 clear_needs_reshard();
833 }
834
835 void dump(Formatter* f) const;
836
837 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
838 unsigned *pn);
839 unsigned decode_some(bufferlist& bl);
840
841 void bound_encode_spanning_blobs(size_t& p);
842 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
843 void decode_spanning_blobs(bufferptr::const_iterator& p);
844
845 BlobRef get_spanning_blob(int id) {
846 auto p = spanning_blob_map.find(id);
847 ceph_assert(p != spanning_blob_map.end());
848 return p->second;
849 }
850
851 void update(KeyValueDB::Transaction t, bool force);
852 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
853 void reshard(
854 KeyValueDB *db,
855 KeyValueDB::Transaction t);
856
857 /// initialize Shards from the onode
858 void init_shards(bool loaded, bool dirty);
859
860 /// return index of shard containing offset
861 /// or -1 if not found
int seek_shard(uint32_t offset) {
  size_t end = shards.size();
  size_t mid, left = 0;
  size_t right = end; // one past the right end

  while (left < right) {
    mid = left + (right - left) / 2;
    if (offset >= shards[mid].shard_info->offset) {
      size_t next = mid + 1;
      if (next >= end || offset < shards[next].shard_info->offset)
        return mid;
      // continue to search forwards
      left = next;
    } else {
      // continue to search backwards
      right = mid;
    }
  }

  return -1; // not found
}
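// Illustrative example (hypothetical shard offsets): with shards starting at
// {0x0, 0x10000, 0x20000}, seek_shard(0x12345) returns 1 and seek_shard(0x20000)
// returns 2; an offset smaller than shards[0].shard_info->offset yields -1.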
883
884 /// check if a range spans a shard
885 bool spans_shard(uint32_t offset, uint32_t length) {
886 if (shards.empty()) {
887 return false;
888 }
889 int s = seek_shard(offset);
890 ceph_assert(s >= 0);
891 if (s == (int)shards.size() - 1) {
892 return false; // last shard
893 }
894 if (offset + length <= shards[s+1].shard_info->offset) {
895 return false;
896 }
897 return true;
898 }
899
900 /// ensure that a range of the map is loaded
901 void fault_range(KeyValueDB *db,
902 uint32_t offset, uint32_t length);
903
904 /// ensure a range of the map is marked dirty
905 void dirty_range(uint32_t offset, uint32_t length);
906
907 /// for seek_lextent test
908 extent_map_t::iterator find(uint64_t offset);
909
910 /// seek to the first lextent including or after offset
911 extent_map_t::iterator seek_lextent(uint64_t offset);
912 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
913
914 /// add a new Extent
915 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
916 extent_map.insert(*new Extent(lo, o, l, b));
917 }
918
919 /// remove (and delete) an Extent
920 void rm(extent_map_t::iterator p) {
921 extent_map.erase_and_dispose(p, DeleteDisposer());
922 }
923
924 bool has_any_lextents(uint64_t offset, uint64_t length);
925
926 /// consolidate adjacent lextents in extent_map
927 int compress_extent_map(uint64_t offset, uint64_t length);
928
929 /// punch a logical hole. add lextents to deref to target list.
930 void punch_hole(CollectionRef &c,
931 uint64_t offset, uint64_t length,
932 old_extent_map_t *old_extents);
933
934 /// put new lextent into lextent_map overwriting existing ones if
935 /// any and update references accordingly
936 Extent *set_lextent(CollectionRef &c,
937 uint64_t logical_offset,
938 uint64_t offset, uint64_t length,
939 BlobRef b,
940 old_extent_map_t *old_extents);
941
942 /// split a blob (and referring extents)
943 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
944 };
945
946 /// Compressed Blob Garbage collector
/*
  The primary idea of the collector is to estimate the difference between the
  allocation units (AUs) currently occupied by compressed blobs and the new
  AUs required to store the same data uncompressed.
  The estimation is performed over the protrusive extents within a logical
  range determined by concatenating the old_extents collection with the
  specific (current) write request.
  The reason old_extents are included is the need to handle blob reference
  counts properly: old extents still hold blob refs, so we must traverse the
  collection to determine whether a blob is to be released.
  Protrusive extents are extents that fall into the blob set in action
  (i.e. ones below the logical range described above) but are not removed
  entirely by the current write.
  E.g. for
  extent1 <loffs = 100, boffs = 100, len = 100> ->
    blob1<compressed, len_on_disk=4096, logical_len=8192>
  extent2 <loffs = 200, boffs = 200, len = 100> ->
    blob2<raw, len_on_disk=4096, llen=4096>
  extent3 <loffs = 300, boffs = 300, len = 100> ->
    blob1<compressed, len_on_disk=4096, llen=8192>
  extent4 <loffs = 4096, boffs = 0, len = 100> ->
    blob3<raw, len_on_disk=4096, llen=4096>
  write(300~100)
  protrusive extents are within the following ranges <0~300, 400~8192-400>
  In this case the existing AUs that might be removed due to GC (i.e. blob1's)
  use 2x4K bytes, while no new AUs are expected after GC since extent1 is to
  be merged into blob2. Hence the collection is worthwhile.
  (An illustrative usage sketch follows the class definition below.)
*/
976 class GarbageCollector
977 {
978 public:
979 /// return amount of allocation units that might be saved due to GC
980 int64_t estimate(
981 uint64_t offset,
982 uint64_t length,
983 const ExtentMap& extent_map,
984 const old_extent_map_t& old_extents,
985 uint64_t min_alloc_size);
986
987 /// return a collection of extents to perform GC on
988 const interval_set<uint64_t>& get_extents_to_collect() const {
989 return extents_to_collect;
990 }
991 GarbageCollector(CephContext* _cct) : cct(_cct) {}
992
993 private:
struct BlobInfo {
  uint64_t referenced_bytes = 0;    ///< amount of bytes referenced in blob
  int64_t expected_allocations = 0; ///< new alloc units required if GC
                                    ///< is carried out
  bool collect_candidate = false;   ///< true if the blob has any extents
                                    ///< eligible for GC
  extent_map_t::const_iterator first_lextent; ///< points to the first lextent
                                              ///< referring to the blob, if
                                              ///< any; only valid when
                                              ///< collect_candidate is set
  extent_map_t::const_iterator last_lextent;  ///< points to the last lextent
                                              ///< referring to the blob,
                                              ///< if any

  BlobInfo(uint64_t ref_bytes) :
    referenced_bytes(ref_bytes) {
  }
};
1013 CephContext* cct;
1014 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
1015 ///< copies that are affected by the
1016 ///< specific write
1017
1018 ///< protrusive extents that should be collected if GC takes place
1019 interval_set<uint64_t> extents_to_collect;
1020
boost::optional<uint64_t> used_alloc_unit; ///< last allocation unit processed
                                           ///< while traversing protrusive
                                           ///< extents; other extents mapped
                                           ///< to this AU are ignored (except
                                           ///< when an uncompressed extent
                                           ///< follows a compressed one -
                                           ///< see below)
BlobInfo* blob_info_counted = nullptr; ///< set if the previous allocation
                                       ///< unit caused an
                                       ///< expected_allocations increment
                                       ///< at this blob; if an uncompressed
                                       ///< extent follows, that increment
                                       ///< needs to be undone
int64_t expected_allocations = 0; ///< new alloc units required if GC
                                  ///< is carried out
int64_t expected_for_release = 0; ///< alloc units currently used by
                                  ///< compressed blobs that might be
                                  ///< freed after GC
1041
1042 protected:
1043 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
1044 uint64_t start_offset,
1045 uint64_t end_offset,
1046 uint64_t start_touch_offset,
1047 uint64_t end_touch_offset,
1048 uint64_t min_alloc_size);
1049 };
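// Illustrative usage sketch only (the variable names and the exact call site
// are hypothetical; the real write path wires this up itself):
//
//   GarbageCollector gc(cct);
//   int64_t saved = gc.estimate(offset, length, onode->extent_map,
//                               wctx->old_extents, min_alloc_size);
//   if (saved > 0) {
//     // rewriting these ranges uncompressed lets the old compressed
//     // blobs (and their AUs) be released
//     const interval_set<uint64_t>& ranges = gc.get_extents_to_collect();
//     ...
//   }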
1050
1051 struct OnodeSpace;
1052
1053 /// an in-memory object
1054 struct Onode {
1055 MEMPOOL_CLASS_HELPERS();
1056
1057 std::atomic_int nref; ///< reference count
1058 Collection *c;
1059
1060 ghobject_t oid;
1061
1062 /// key under PREFIX_OBJ where we are stored
1063 mempool::bluestore_cache_other::string key;
1064
1065 boost::intrusive::list_member_hook<> lru_item;
1066
1067 bluestore_onode_t onode; ///< metadata stored as value in kv store
1068 bool exists; ///< true if object logically exists
1069
1070 ExtentMap extent_map;
1071
1072 // track txc's that have not been committed to kv store (and whose
1073 // effects cannot be read via the kvdb read methods)
1074 std::atomic<int> flushing_count = {0};
1075 std::atomic<int> waiting_count = {0};
1076 /// protect flush_txns
1077 ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
1078 ceph::condition_variable flush_cond; ///< wait here for uncommitted txns
1079
1080 Onode(Collection *c, const ghobject_t& o,
1081 const mempool::bluestore_cache_other::string& k)
1082 : nref(0),
1083 c(c),
1084 oid(o),
1085 key(k),
1086 exists(false),
1087 extent_map(this) {
1088 }
1089 Onode(Collection* c, const ghobject_t& o,
1090 const string& k)
1091 : nref(0),
1092 c(c),
1093 oid(o),
1094 key(k),
1095 exists(false),
1096 extent_map(this) {
1097 }
1098 Onode(Collection* c, const ghobject_t& o,
1099 const char* k)
1100 : nref(0),
1101 c(c),
1102 oid(o),
1103 key(k),
1104 exists(false),
1105 extent_map(this) {
1106 }
1107
1108 static Onode* decode(
1109 CollectionRef c,
1110 const ghobject_t& oid,
1111 const string& key,
1112 const bufferlist& v);
1113
1114 void dump(Formatter* f) const;
1115
1116 void flush();
1117 void get() {
1118 ++nref;
1119 }
1120 void put() {
1121 if (--nref == 0)
1122 delete this;
1123 }
1124
1125 const string& get_omap_prefix();
1126 void get_omap_header(string *out);
1127 void get_omap_key(const string& key, string *out);
1128 void rewrite_omap_key(const string& old, string *out);
1129 void get_omap_tail(string *out);
1130 void decode_omap_key(const string& key, string *user_key);
1131 };
1132 typedef boost::intrusive_ptr<Onode> OnodeRef;
1133
1134 /// A generic Cache Shard
1135 struct CacheShard {
1136 CephContext *cct;
1137 PerfCounters *logger;
1138
1139 /// protect lru and other structures
1140 ceph::recursive_mutex lock = {
1141 ceph::make_recursive_mutex("BlueStore::CacheShard::lock") };
1142
1143 std::atomic<uint64_t> max = {0};
1144 std::atomic<uint64_t> num = {0};
1145
1146 CacheShard(CephContext* cct) : cct(cct), logger(nullptr) {}
1147 virtual ~CacheShard() {}
1148
1149 void set_max(uint64_t max_) {
1150 max = max_;
1151 }
1152
1153 uint64_t _get_num() {
1154 return num;
1155 }
1156
1157 virtual void _trim_to(uint64_t max) = 0;
1158 void _trim() {
1159 if (cct->_conf->objectstore_blackhole) {
1160 // do not trim if we are throwing away IOs a layer down
1161 return;
1162 }
1163 _trim_to(max);
1164 }
1165 void trim() {
1166 std::lock_guard l(lock);
1167 _trim();
1168 }
1169 void flush() {
1170 std::lock_guard l(lock);
1171 // we should not be shutting down after the blackhole is enabled
ceph_assert(!cct->_conf->objectstore_blackhole);
1173 _trim_to(0);
1174 }
1175
1176 #ifdef DEBUG_CACHE
1177 virtual void _audit(const char *s) = 0;
1178 #else
1179 void _audit(const char *s) { /* no-op */ }
1180 #endif
1181 };
1182
1183 /// A Generic onode Cache Shard
1184 struct OnodeCacheShard : public CacheShard {
1185 std::array<std::pair<ghobject_t, mono_clock::time_point>, 64> dumped_onodes;
1186 public:
1187 OnodeCacheShard(CephContext* cct) : CacheShard(cct) {}
1188 static OnodeCacheShard *create(CephContext* cct, string type,
1189 PerfCounters *logger);
1190 virtual void _add(OnodeRef& o, int level) = 0;
1191 virtual void _rm(OnodeRef& o) = 0;
1192 virtual void _touch(OnodeRef& o) = 0;
1193 virtual void add_stats(uint64_t *onodes) = 0;
1194
1195 bool empty() {
1196 return _get_num() == 0;
1197 }
1198 };
1199
1200 /// A Generic buffer Cache Shard
1201 struct BufferCacheShard : public CacheShard {
1202 std::atomic<uint64_t> num_extents = {0};
1203 std::atomic<uint64_t> num_blobs = {0};
1204 uint64_t buffer_bytes = 0;
1205
1206 public:
1207 BufferCacheShard(CephContext* cct) : CacheShard(cct) {}
1208 static BufferCacheShard *create(CephContext* cct, string type,
1209 PerfCounters *logger);
1210 virtual void _add(Buffer *b, int level, Buffer *near) = 0;
1211 virtual void _rm(Buffer *b) = 0;
1212 virtual void _move(BufferCacheShard *src, Buffer *b) = 0;
1213 virtual void _touch(Buffer *b) = 0;
1214 virtual void _adjust_size(Buffer *b, int64_t delta) = 0;
1215
1216 uint64_t _get_bytes() {
1217 return buffer_bytes;
1218 }
1219
1220 void add_extent() {
1221 ++num_extents;
1222 }
1223 void rm_extent() {
1224 --num_extents;
1225 }
1226
1227 void add_blob() {
1228 ++num_blobs;
1229 }
1230 void rm_blob() {
1231 --num_blobs;
1232 }
1233
1234 virtual void add_stats(uint64_t *extents,
1235 uint64_t *blobs,
1236 uint64_t *buffers,
1237 uint64_t *bytes) = 0;
1238
1239 bool empty() {
1240 std::lock_guard l(lock);
1241 return _get_bytes() == 0;
1242 }
1243 };
1244
1245 struct OnodeSpace {
1246 OnodeCacheShard *cache;
1247
1248 private:
1249 /// forward lookups
1250 mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1251
1252 friend class Collection; // for split_cache()
1253
1254 public:
1255 OnodeSpace(OnodeCacheShard *c) : cache(c) {}
1256 ~OnodeSpace() {
1257 clear();
1258 }
1259
1260 OnodeRef add(const ghobject_t& oid, OnodeRef o);
1261 OnodeRef lookup(const ghobject_t& o);
1262 void remove(const ghobject_t& oid) {
1263 onode_map.erase(oid);
1264 }
1265 void rename(OnodeRef& o, const ghobject_t& old_oid,
1266 const ghobject_t& new_oid,
1267 const mempool::bluestore_cache_other::string& new_okey);
1268 void clear();
1269 bool empty();
1270
1271 template <int LogLevelV>
1272 void dump(CephContext *cct);
1273
1274 /// return true if f true for any item
1275 bool map_any(std::function<bool(OnodeRef)> f);
1276 };
1277
1278 class OpSequencer;
1279 using OpSequencerRef = ceph::ref_t<OpSequencer>;
1280
1281 struct Collection : public CollectionImpl {
1282 BlueStore *store;
1283 OpSequencerRef osr;
1284 BufferCacheShard *cache; ///< our cache shard
1285 bluestore_cnode_t cnode;
1286 ceph::shared_mutex lock =
1287 ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);
1288
1289 bool exists;
1290
1291 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1292
1293 // cache onodes on a per-collection basis to avoid lock
1294 // contention.
1295 OnodeSpace onode_map;
1296
1297 //pool options
1298 pool_opts_t pool_opts;
1299 ContextQueue *commit_queue;
1300
1301 OnodeRef get_onode(const ghobject_t& oid, bool create, bool is_createop=false);
1302
// the terminology is confusing here, sorry!
//
//  blob_t     shared_blob_t
//  !shared    unused                -> open
//  shared     !loaded               -> open + shared
//  shared     loaded                -> open + shared + loaded
//
// i.e.,
//  open    = SharedBlob is instantiated
//  shared  = blob_t shared flag is set; SharedBlob is hashed.
//  loaded  = SharedBlob::shared_blob_t is loaded from kv store
1314 void open_shared_blob(uint64_t sbid, BlobRef b);
1315 void load_shared_blob(SharedBlobRef sb);
1316 void make_blob_shared(uint64_t sbid, BlobRef b);
1317 uint64_t make_blob_unshared(SharedBlob *sb);
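// In terms of the table above: new_blob() below creates an 'open' (unshared)
// SharedBlob, make_blob_shared() takes it to 'open + shared', and
// load_shared_blob() to 'open + shared + loaded'.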
1318
1319 BlobRef new_blob() {
1320 BlobRef b = new Blob();
1321 b->shared_blob = new SharedBlob(this);
1322 return b;
1323 }
1324
1325 bool contains(const ghobject_t& oid) {
1326 if (cid.is_meta())
1327 return oid.hobj.pool == -1;
1328 spg_t spgid;
1329 if (cid.is_pg(&spgid))
1330 return
1331 spgid.pgid.contains(cnode.bits, oid) &&
1332 oid.shard_id == spgid.shard;
1333 return false;
1334 }
1335
1336 int64_t pool() const {
1337 return cid.pool();
1338 }
1339
1340 void split_cache(Collection *dest);
1341
1342 bool flush_commit(Context *c) override;
1343 void flush() override;
1344 void flush_all_but_last();
1345
1346 Collection(BlueStore *ns, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t c);
1347 };
1348
1349 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1350 CollectionRef c;
1351 OnodeRef o;
1352 KeyValueDB::Iterator it;
1353 string head, tail;
1354
1355 string _stringify() const;
1356
1357 public:
1358 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1359 int seek_to_first() override;
1360 int upper_bound(const string &after) override;
1361 int lower_bound(const string &to) override;
1362 bool valid() override;
1363 int next() override;
1364 string key() override;
1365 bufferlist value() override;
1366 std::string tail_key() {
1367 return tail;
1368 }
1369
1370 int status() override {
1371 return 0;
1372 }
1373 };
1374
1375 struct volatile_statfs{
1376 enum {
1377 STATFS_ALLOCATED = 0,
1378 STATFS_STORED,
1379 STATFS_COMPRESSED_ORIGINAL,
1380 STATFS_COMPRESSED,
1381 STATFS_COMPRESSED_ALLOCATED,
1382 STATFS_LAST
1383 };
1384 int64_t values[STATFS_LAST];
1385 volatile_statfs() {
1386 memset(this, 0, sizeof(volatile_statfs));
1387 }
1388 void reset() {
1389 *this = volatile_statfs();
1390 }
1391 void publish(store_statfs_t* buf) const {
1392 buf->allocated = allocated();
1393 buf->data_stored = stored();
1394 buf->data_compressed = compressed();
1395 buf->data_compressed_original = compressed_original();
1396 buf->data_compressed_allocated = compressed_allocated();
1397 }
1398
1399 volatile_statfs& operator+=(const volatile_statfs& other) {
1400 for (size_t i = 0; i < STATFS_LAST; ++i) {
1401 values[i] += other.values[i];
1402 }
1403 return *this;
1404 }
1405 int64_t& allocated() {
1406 return values[STATFS_ALLOCATED];
1407 }
1408 int64_t& stored() {
1409 return values[STATFS_STORED];
1410 }
1411 int64_t& compressed_original() {
1412 return values[STATFS_COMPRESSED_ORIGINAL];
1413 }
1414 int64_t& compressed() {
1415 return values[STATFS_COMPRESSED];
1416 }
1417 int64_t& compressed_allocated() {
1418 return values[STATFS_COMPRESSED_ALLOCATED];
1419 }
1420 int64_t allocated() const {
1421 return values[STATFS_ALLOCATED];
1422 }
1423 int64_t stored() const {
1424 return values[STATFS_STORED];
1425 }
1426 int64_t compressed_original() const {
1427 return values[STATFS_COMPRESSED_ORIGINAL];
1428 }
1429 int64_t compressed() const {
1430 return values[STATFS_COMPRESSED];
1431 }
1432 int64_t compressed_allocated() const {
1433 return values[STATFS_COMPRESSED_ALLOCATED];
1434 }
1435 volatile_statfs& operator=(const store_statfs_t& st) {
1436 values[STATFS_ALLOCATED] = st.allocated;
1437 values[STATFS_STORED] = st.data_stored;
1438 values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
1439 values[STATFS_COMPRESSED] = st.data_compressed;
1440 values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
1441 return *this;
1442 }
1443 bool is_empty() {
1444 return values[STATFS_ALLOCATED] == 0 &&
1445 values[STATFS_STORED] == 0 &&
1446 values[STATFS_COMPRESSED] == 0 &&
1447 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1448 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1449 }
1450 void decode(bufferlist::const_iterator& it) {
1451 using ceph::decode;
1452 for (size_t i = 0; i < STATFS_LAST; i++) {
1453 decode(values[i], it);
1454 }
1455 }
1456
1457 void encode(bufferlist& bl) {
1458 using ceph::encode;
1459 for (size_t i = 0; i < STATFS_LAST; i++) {
1460 encode(values[i], bl);
1461 }
1462 }
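// Note: encode()/decode() above serialize the STATFS_LAST counters as raw
// int64 values in enum order with no version header, so the enum layout
// must stay consistent between writer and reader.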
1463 };
1464
1465 struct TransContext final : public AioContext {
1466 MEMPOOL_CLASS_HELPERS();
1467
1468 typedef enum {
1469 STATE_PREPARE,
1470 STATE_AIO_WAIT,
1471 STATE_IO_DONE,
1472 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1473 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1474 STATE_KV_DONE,
1475 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1476 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1477 STATE_DEFERRED_DONE,
1478 STATE_FINISHING,
1479 STATE_DONE,
1480 } state_t;
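// Note: these states parallel the l_bluestore_state_*_lat perf counters
// declared near the top of this file (see BlueStoreThrottle::log_state_latency()
// below, which is used to account per-state latency against them).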
1481
1482 state_t state = STATE_PREPARE;
1483
1484 const char *get_state_name() {
1485 switch (state) {
1486 case STATE_PREPARE: return "prepare";
1487 case STATE_AIO_WAIT: return "aio_wait";
1488 case STATE_IO_DONE: return "io_done";
1489 case STATE_KV_QUEUED: return "kv_queued";
1490 case STATE_KV_SUBMITTED: return "kv_submitted";
1491 case STATE_KV_DONE: return "kv_done";
1492 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1493 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1494 case STATE_DEFERRED_DONE: return "deferred_done";
1495 case STATE_FINISHING: return "finishing";
1496 case STATE_DONE: return "done";
1497 }
1498 return "???";
1499 }
1500
1501 #if defined(WITH_LTTNG)
1502 const char *get_state_latency_name(int state) {
1503 switch (state) {
1504 case l_bluestore_state_prepare_lat: return "prepare";
1505 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1506 case l_bluestore_state_io_done_lat: return "io_done";
1507 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1508 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1509 case l_bluestore_state_kv_done_lat: return "kv_done";
1510 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1511 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1512 case l_bluestore_state_finishing_lat: return "finishing";
1513 case l_bluestore_state_done_lat: return "done";
1514 }
1515 return "???";
1516 }
1517 #endif
1518
1519 CollectionRef ch;
1520 OpSequencerRef osr; // this should be ch->osr
1521 boost::intrusive::list_member_hook<> sequencer_item;
1522
1523 uint64_t bytes = 0, ios = 0, cost = 0;
1524
1525 set<OnodeRef> onodes; ///< these need to be updated/written
1526 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1527 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1528 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1529
1530 KeyValueDB::Transaction t; ///< then we will commit this
1531 list<Context*> oncommits; ///< more commit completions
1532 list<CollectionRef> removed_collections; ///< colls we removed
1533
1534 boost::intrusive::list_member_hook<> deferred_queue_item;
1535 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1536
1537 interval_set<uint64_t> allocated, released;
1538 volatile_statfs statfs_delta; ///< overall store statistics delta
1539 uint64_t osd_pool_id = META_POOL_ID; ///< osd pool id we're operating on
1540
1541 IOContext ioc;
1542 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1543
1544 uint64_t seq = 0;
1545 utime_t start;
1546 utime_t last_stamp;
1547
1548 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1549 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1550
1551 #if defined(WITH_LTTNG)
1552 bool tracing = false;
1553 #endif
1554
1555 explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
1556 list<Context*> *on_commits)
1557 : ch(c),
1558 osr(o),
1559 ioc(cct, this),
1560 start(ceph_clock_now()) {
1561 last_stamp = start;
1562 if (on_commits) {
1563 oncommits.swap(*on_commits);
1564 }
1565 }
1566 ~TransContext() {
1567 delete deferred_txn;
1568 }
1569
1570 void write_onode(OnodeRef &o) {
1571 onodes.insert(o);
1572 }
1573 void write_shared_blob(SharedBlobRef &sb) {
1574 shared_blobs.insert(sb);
1575 }
1576 void unshare_blob(SharedBlob *sb) {
1577 shared_blobs.erase(sb);
1578 }
1579
1580 /// note we logically modified object (when onode itself is unmodified)
1581 void note_modified_object(OnodeRef &o) {
1582 // onode itself isn't written, though
1583 modified_objects.insert(o);
1584 }
1585 void note_removed_object(OnodeRef& o) {
1586 onodes.erase(o);
1587 modified_objects.insert(o);
1588 }
1589
1590 void aio_finish(BlueStore *store) override {
1591 store->txc_aio_finish(this);
1592 }
1593 };
1594
1595 class BlueStoreThrottle {
1596 #if defined(WITH_LTTNG)
1597 const std::chrono::time_point<mono_clock> time_base = mono_clock::now();
1598
1599 // Time of last chosen io (microseconds)
1600 std::atomic<uint64_t> previous_emitted_tp_time_mono_mcs = {0};
1601 std::atomic<uint64_t> ios_started_since_last_traced = {0};
1602 std::atomic<uint64_t> ios_completed_since_last_traced = {0};
1603
1604 std::atomic_uint pending_kv_ios = {0};
1605 std::atomic_uint pending_deferred_ios = {0};
1606
1607 // Min period between trace points (microseconds)
1608 std::atomic<uint64_t> trace_period_mcs = {0};
1609
1610 bool should_trace(
1611 uint64_t *started,
1612 uint64_t *completed) {
1613 uint64_t min_period_mcs = trace_period_mcs.load(
1614 std::memory_order_relaxed);
1615
1616 if (min_period_mcs == 0) {
1617 *started = 1;
1618 *completed = ios_completed_since_last_traced.exchange(0);
1619 return true;
1620 } else {
1621 ios_started_since_last_traced++;
1622 auto now_mcs = ceph::to_microseconds<uint64_t>(
1623 mono_clock::now() - time_base);
1624 uint64_t previous_mcs = previous_emitted_tp_time_mono_mcs;
1625 uint64_t period_mcs = now_mcs - previous_mcs;
1626 if (period_mcs > min_period_mcs) {
1627 if (previous_emitted_tp_time_mono_mcs.compare_exchange_strong(
1628 previous_mcs, now_mcs)) {
1629 // This would be racy at a sufficiently extreme trace rate, but isn't
1630 // worth the overhead of doing it more carefully.
1631 *started = ios_started_since_last_traced.exchange(0);
1632 *completed = ios_completed_since_last_traced.exchange(0);
1633 return true;
1634 }
1635 }
1636 return false;
1637 }
1638 }
1639 #endif
1640
1641 #if defined(WITH_LTTNG)
1642 void emit_initial_tracepoint(
1643 KeyValueDB &db,
1644 TransContext &txc,
1645 mono_clock::time_point);
1646 #else
1647 void emit_initial_tracepoint(
1648 KeyValueDB &db,
1649 TransContext &txc,
1650 mono_clock::time_point) {}
1651 #endif
1652
1653 Throttle throttle_bytes; ///< submit to commit
1654 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1655
1656 public:
1657 BlueStoreThrottle(CephContext *cct) :
1658 throttle_bytes(cct, "bluestore_throttle_bytes", 0),
1659 throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", 0)
1660 {
1661 reset_throttle(cct->_conf);
1662 }
1663
1664 #if defined(WITH_LTTNG)
1665 void complete_kv(TransContext &txc);
1666 void complete(TransContext &txc);
1667 #else
1668 void complete_kv(TransContext &txc) {}
1669 void complete(TransContext &txc) {}
1670 #endif
1671
1672 utime_t log_state_latency(
1673 TransContext &txc, PerfCounters *logger, int state);
1674 bool try_start_transaction(
1675 KeyValueDB &db,
1676 TransContext &txc,
1677 mono_clock::time_point);
1678 void finish_start_transaction(
1679 KeyValueDB &db,
1680 TransContext &txc,
1681 mono_clock::time_point);
1682 void release_kv_throttle(uint64_t cost) {
1683 throttle_bytes.put(cost);
1684 }
1685 void release_deferred_throttle(uint64_t cost) {
1686 throttle_deferred_bytes.put(cost);
1687 }
1688 bool should_submit_deferred() {
1689 return throttle_deferred_bytes.past_midpoint();
1690 }
1691 void reset_throttle(const ConfigProxy &conf) {
1692 throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
1693 throttle_deferred_bytes.reset_max(
1694 conf->bluestore_throttle_bytes +
1695 conf->bluestore_throttle_deferred_bytes);
1696 #if defined(WITH_LTTNG)
1697 double rate = conf.get_val<double>("bluestore_throttle_trace_rate");
1698 trace_period_mcs = rate > 0 ? floor((1/rate) * 1000000.0) : 0;
1699 #endif
1700 }
1701 } throttle;
1702
1703 typedef boost::intrusive::list<
1704 TransContext,
1705 boost::intrusive::member_hook<
1706 TransContext,
1707 boost::intrusive::list_member_hook<>,
1708 &TransContext::deferred_queue_item> > deferred_queue_t;
1709
1710 struct DeferredBatch final : public AioContext {
1711 OpSequencer *osr;
1712 struct deferred_io {
1713 bufferlist bl; ///< data
1714 uint64_t seq; ///< deferred transaction seq
1715 };
1716 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1717 deferred_queue_t txcs; ///< txcs in this batch
1718 IOContext ioc; ///< our aios
1719 /// bytes of pending io for each deferred seq (may be 0)
1720 map<uint64_t,int> seq_bytes;
1721
1722 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1723 void _audit(CephContext *cct);
1724
1725 DeferredBatch(CephContext *cct, OpSequencer *osr)
1726 : osr(osr), ioc(cct, this) {}
1727
1728 /// prepare a write
1729 void prepare_write(CephContext *cct,
1730 uint64_t seq, uint64_t offset, uint64_t length,
1731 bufferlist::const_iterator& p);
1732
1733 void aio_finish(BlueStore *store) override {
1734 store->_deferred_aio_finish(osr);
1735 }
1736 };
1737
1738 class OpSequencer : public RefCountedObject {
1739 public:
1740 ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
1741 ceph::condition_variable qcond;
1742 typedef boost::intrusive::list<
1743 TransContext,
1744 boost::intrusive::member_hook<
1745 TransContext,
1746 boost::intrusive::list_member_hook<>,
1747 &TransContext::sequencer_item> > q_list_t;
1748 q_list_t q; ///< transactions
1749
1750 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1751
1752 DeferredBatch *deferred_running = nullptr;
1753 DeferredBatch *deferred_pending = nullptr;
1754
1755 BlueStore *store;
1756 coll_t cid;
1757
1758 uint64_t last_seq = 0;
1759
1760 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1761
1762 std::atomic_int kv_committing_serially = {0};
1763
1764 std::atomic_int kv_submitted_waiters = {0};
1765
1766 std::atomic_int kv_drain_preceding_waiters = {0};
1767
1768 std::atomic_bool zombie = {false}; ///< in zombie_osr set (collection going away)
1769
1770 const uint32_t sequencer_id;
1771
1772 uint32_t get_sequencer_id() const {
1773 return sequencer_id;
1774 }
1775
1776 void queue_new(TransContext *txc) {
1777 std::lock_guard l(qlock);
1778 txc->seq = ++last_seq;
1779 q.push_back(*txc);
1780 }
1781
1782 void drain() {
1783 std::unique_lock l(qlock);
1784 while (!q.empty())
1785 qcond.wait(l);
1786 }
1787
1788 void drain_preceding(TransContext *txc) {
1789 std::unique_lock l(qlock);
1790 while (&q.front() != txc)
1791 qcond.wait(l);
1792 }
1793
1794 bool _is_all_kv_submitted() {
// caller must hold qlock; q must not be empty
1796 ceph_assert(!q.empty());
1797 TransContext *txc = &q.back();
1798 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1799 return true;
1800 }
1801 return false;
1802 }
1803
1804 void flush() {
1805 std::unique_lock l(qlock);
1806 while (true) {
1807 // set flag before the check because the condition
1808 // may become true outside qlock, and we need to make
1809 // sure those threads see waiters and signal qcond.
1810 ++kv_submitted_waiters;
1811 if (q.empty() || _is_all_kv_submitted()) {
1812 --kv_submitted_waiters;
1813 return;
1814 }
1815 qcond.wait(l);
1816 --kv_submitted_waiters;
1817 }
1818 }
1819
1820 void flush_all_but_last() {
1821 std::unique_lock l(qlock);
ceph_assert(q.size() >= 1);
1823 while (true) {
1824 // set flag before the check because the condition
1825 // may become true outside qlock, and we need to make
1826 // sure those threads see waiters and signal qcond.
1827 ++kv_submitted_waiters;
1828 if (q.size() <= 1) {
1829 --kv_submitted_waiters;
1830 return;
1831 } else {
1832 auto it = q.rbegin();
1833 it++;
1834 if (it->state >= TransContext::STATE_KV_SUBMITTED) {
1835 --kv_submitted_waiters;
1836 return;
1837 }
1838 }
1839 qcond.wait(l);
1840 --kv_submitted_waiters;
1841 }
1842 }
1843
1844 bool flush_commit(Context *c) {
1845 std::lock_guard l(qlock);
1846 if (q.empty()) {
1847 return true;
1848 }
1849 TransContext *txc = &q.back();
1850 if (txc->state >= TransContext::STATE_KV_DONE) {
1851 return true;
1852 }
1853 txc->oncommits.push_back(c);
1854 return false;
1855 }
1856 private:
1857 FRIEND_MAKE_REF(OpSequencer);
1858 OpSequencer(BlueStore *store, uint32_t sequencer_id, const coll_t& c)
1859 : RefCountedObject(store->cct),
1860 store(store), cid(c), sequencer_id(sequencer_id) {
1861 }
1862 ~OpSequencer() {
1863 ceph_assert(q.empty());
1864 }
1865 };
1866
1867 typedef boost::intrusive::list<
1868 OpSequencer,
1869 boost::intrusive::member_hook<
1870 OpSequencer,
1871 boost::intrusive::list_member_hook<>,
1872 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1873
1874 struct KVSyncThread : public Thread {
1875 BlueStore *store;
1876 explicit KVSyncThread(BlueStore *s) : store(s) {}
1877 void *entry() override {
1878 store->_kv_sync_thread();
1879 return NULL;
1880 }
1881 };
1882 struct KVFinalizeThread : public Thread {
1883 BlueStore *store;
1884 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1885 void *entry() {
1886 store->_kv_finalize_thread();
1887 return NULL;
1888 }
1889 };
1890
1891 struct DBHistogram {
1892 struct value_dist {
1893 uint64_t count;
1894 uint32_t max_len;
1895 };
1896
1897 struct key_dist {
1898 uint64_t count;
1899 uint32_t max_len;
map<int, struct value_dist> val_map; ///< map from value-size slab id to the count and max length of values in that slab
1901 };
1902
1903 map<string, map<int, struct key_dist> > key_hist;
1904 map<int, uint64_t> value_hist;
1905 int get_key_slab(size_t sz);
1906 string get_key_slab_to_range(int slab);
1907 int get_value_slab(size_t sz);
1908 string get_value_slab_to_range(int slab);
1909 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1910 const string &prefix, size_t key_size, size_t value_size);
1911 void dump(Formatter *f);
1912 };
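/*
 * Illustrative usage (the prefix and sizes are made up): update_hist_entry()
 * buckets each sampled key/value pair by its KV prefix and by size slab,
 * bumping the slab's count and tracking the largest length seen.
 *
 *   DBHistogram hist;
 *   hist.update_hist_entry(hist.key_hist, "O", 48, 512);
 *   hist.update_hist_entry(hist.key_hist, "O", 52, 4096);
 *   hist.dump(f);   // per prefix: key-size slabs, counts, value-size slabs
 */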
1913
1914 // --------------------------------------------------------
1915 // members
1916 private:
1917 BlueFS *bluefs = nullptr;
1918 bluefs_layout_t bluefs_layout;
1919 mono_time bluefs_last_balance;
1920 utime_t next_dump_on_bluefs_alloc_failure;
1921
1922 KeyValueDB *db = nullptr;
1923 BlockDevice *bdev = nullptr;
1924 std::string freelist_type;
1925 FreelistManager *fm = nullptr;
1926 Allocator *alloc = nullptr;
1927 uuid_d fsid;
1928 int path_fd = -1; ///< open handle to $path
1929 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1930 bool mounted = false;
1931
1932 ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock"); ///< rwlock to protect coll_map
1933 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1934 bool collections_had_errors = false;
1935 map<coll_t,CollectionRef> new_coll_map;
1936
1937 vector<OnodeCacheShard*> onode_cache_shards;
1938 vector<BufferCacheShard*> buffer_cache_shards;
1939
1940 /// protect zombie_osr_set
1941 ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
1942 uint32_t next_sequencer_id = 0;
1943 std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
1944
1945 std::atomic<uint64_t> nid_last = {0};
1946 std::atomic<uint64_t> nid_max = {0};
1947 std::atomic<uint64_t> blobid_last = {0};
1948 std::atomic<uint64_t> blobid_max = {0};
1949
1950 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1951 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1952
1953 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
1954 std::atomic<uint64_t> deferred_seq = {0};
1955 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1956 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1957 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1958 Finisher finisher;
1959 utime_t deferred_last_submitted = utime_t();
1960
1961 KVSyncThread kv_sync_thread;
1962 ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
1963 ceph::condition_variable kv_cond;
1964 bool _kv_only = false;
1965 bool kv_sync_started = false;
1966 bool kv_stop = false;
1967 bool kv_finalize_started = false;
1968 bool kv_finalize_stop = false;
1969 deque<TransContext*> kv_queue; ///< ready, already submitted
1970 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1971 deque<TransContext*> kv_committing; ///< currently syncing
1972 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1973 bool kv_sync_in_progress = false;
1974
1975 KVFinalizeThread kv_finalize_thread;
1976 ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
1977 ceph::condition_variable kv_finalize_cond;
1978 deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
1979 deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
1980 bool kv_finalize_in_progress = false;
1981
1982 PerfCounters *logger = nullptr;
1983
1984 list<CollectionRef> removed_collections;
1985
1986 ceph::shared_mutex debug_read_error_lock =
1987 ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
1988 set<ghobject_t> debug_data_error_objects;
1989 set<ghobject_t> debug_mdata_error_objects;
1990
1991 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1992
1993 uint64_t block_size = 0; ///< block size of block device (power of 2)
1994 uint64_t block_mask = 0; ///< mask to get just the block offset
1995 size_t block_size_order = 0; ///< bits to shift to get block size
1996
1997 uint64_t min_alloc_size; ///< minimum allocation unit (power of 2)
1998 ///< bits for min_alloc_size
1999 uint8_t min_alloc_size_order = 0;
2000 static_assert(std::numeric_limits<uint8_t>::max() >
2001 std::numeric_limits<decltype(min_alloc_size)>::digits,
2002 "not enough bits for min_alloc_size");
2003
2004 bool per_pool_omap = false;
2005
2006 ///< maximum allocation unit (power of 2)
2007 std::atomic<uint64_t> max_alloc_size = {0};
2008
2009 ///< number threshold for forced deferred writes
2010 std::atomic<int> deferred_batch_ops = {0};
2011
2012 ///< size threshold for forced deferred writes
2013 std::atomic<uint64_t> prefer_deferred_size = {0};
2014
2015 ///< approx cost per io, in bytes
2016 std::atomic<uint64_t> throttle_cost_per_io = {0};
2017
2018 std::atomic<Compressor::CompressionMode> comp_mode =
2019 {Compressor::COMP_NONE}; ///< compression mode
2020 CompressorRef compressor;
2021 std::atomic<uint64_t> comp_min_blob_size = {0};
2022 std::atomic<uint64_t> comp_max_blob_size = {0};
2023
2024 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
2025
2026 uint64_t kv_ios = 0;
2027 uint64_t kv_throttle_costs = 0;
2028
2029 // cache trim control
2030 uint64_t cache_size = 0; ///< total cache size
2031 double cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
2032 double cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
2033 double cache_data_ratio = 0; ///< cache ratio dedicated to object data
2034 bool cache_autotune = false; ///< cache autotune setting
2035 double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
2036 uint64_t osd_memory_target = 0; ///< OSD memory target when autotuning cache
2037 uint64_t osd_memory_base = 0; ///< OSD base memory when autotuning cache
2038 double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
2039 uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
2040 double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing
2041 double max_defer_interval = 0; ///< Time to wait between last deferred submit
2042 std::atomic<uint32_t> config_changed = {0}; ///< Counter to determine if there is a configuration change.
2043
2044 typedef map<uint64_t, volatile_statfs> osd_pools_map;
2045
2046 ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
2047 volatile_statfs vstatfs;
2048 osd_pools_map osd_pools; // protected by vstatfs_lock as well
2049
2050 bool per_pool_stat_collection = true;
2051
2052 struct MempoolThread : public Thread {
2053 public:
2054 BlueStore *store;
2055
2056 ceph::condition_variable cond;
2057 ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
2058 bool stop = false;
2059 std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
2060 std::shared_ptr<PriorityCache::Manager> pcm = nullptr;
2061
2062 struct MempoolCache : public PriorityCache::PriCache {
2063 BlueStore *store;
2064 int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
2065 int64_t committed_bytes = 0;
2066 double cache_ratio = 0;
2067
2068 MempoolCache(BlueStore *s) : store(s) {};
2069
2070 virtual uint64_t _get_used_bytes() const = 0;
2071
2072 virtual int64_t request_cache_bytes(
2073 PriorityCache::Priority pri, uint64_t total_cache) const {
2074 int64_t assigned = get_cache_bytes(pri);
2075
2076 switch (pri) {
2077 // All cache items are currently shoved into the PRI1 priority
2078 case PriorityCache::Priority::PRI1:
2079 {
2080 int64_t request = _get_used_bytes();
2081 return (request > assigned) ? request - assigned : 0;
2082 }
2083 default:
2084 break;
2085 }
2086 return -EOPNOTSUPP;
2087 }
2088
2089 virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
2090 return cache_bytes[pri];
2091 }
2092 virtual int64_t get_cache_bytes() const {
2093 int64_t total = 0;
2094
2095 for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
2096 PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
2097 total += get_cache_bytes(pri);
2098 }
2099 return total;
2100 }
2101 virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2102 cache_bytes[pri] = bytes;
2103 }
2104 virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2105 cache_bytes[pri] += bytes;
2106 }
2107 virtual int64_t commit_cache_size(uint64_t total_cache) {
2108 committed_bytes = PriorityCache::get_chunk(
2109 get_cache_bytes(), total_cache);
2110 return committed_bytes;
2111 }
2112 virtual int64_t get_committed_size() const {
2113 return committed_bytes;
2114 }
2115 virtual double get_cache_ratio() const {
2116 return cache_ratio;
2117 }
2118 virtual void set_cache_ratio(double ratio) {
2119 cache_ratio = ratio;
2120 }
2121 virtual string get_cache_name() const = 0;
2122 };
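/*
 * Illustrative sketch (not the actual PriorityCache::Manager logic) of the
 * request/assign/commit cycle this interface is written for; 'cache' and
 * 'available' are placeholders.
 *
 *   int64_t want = cache.request_cache_bytes(
 *     PriorityCache::Priority::PRI1, total_cache);  // extra bytes desired
 *   cache.add_cache_bytes(PriorityCache::Priority::PRI1,
 *                         std::min(want, available));
 *   int64_t committed = cache.commit_cache_size(total_cache);
 */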
2123
2124 struct MetaCache : public MempoolCache {
2125 MetaCache(BlueStore *s) : MempoolCache(s) {};
2126
2127 virtual uint64_t _get_used_bytes() const {
2128 return mempool::bluestore_cache_other::allocated_bytes() +
2129 mempool::bluestore_cache_onode::allocated_bytes();
2130 }
2131
2132 virtual string get_cache_name() const {
2133 return "BlueStore Meta Cache";
2134 }
2135
2136 uint64_t _get_num_onodes() const {
2137 uint64_t onode_num =
2138 mempool::bluestore_cache_onode::allocated_items();
2139 return (2 > onode_num) ? 2 : onode_num;
2140 }
2141
2142 double get_bytes_per_onode() const {
2143 return (double)_get_used_bytes() / (double)_get_num_onodes();
2144 }
2145 };
2146 std::shared_ptr<MetaCache> meta_cache;
2147
2148 struct DataCache : public MempoolCache {
2149 DataCache(BlueStore *s) : MempoolCache(s) {};
2150
2151 virtual uint64_t _get_used_bytes() const {
2152 uint64_t bytes = 0;
2153 for (auto i : store->buffer_cache_shards) {
2154 bytes += i->_get_bytes();
2155 }
2156 return bytes;
2157 }
2158 virtual string get_cache_name() const {
2159 return "BlueStore Data Cache";
2160 }
2161 };
2162 std::shared_ptr<DataCache> data_cache;
2163
2164 public:
2165 explicit MempoolThread(BlueStore *s)
2166 : store(s),
2167 meta_cache(new MetaCache(s)),
2168 data_cache(new DataCache(s)) {}
2169
2170 void *entry() override;
2171 void init() {
2172 ceph_assert(stop == false);
2173 create("bstore_mempool");
2174 }
2175 void shutdown() {
2176 lock.lock();
2177 stop = true;
2178 cond.notify_all();
2179 lock.unlock();
2180 join();
2181 }
2182
2183 private:
2184 void _adjust_cache_settings();
2185 void _update_cache_settings();
2186 void _resize_shards(bool interval_stats);
2187 } mempool_thread;
2188
2189 // --------------------------------------------------------
2190 // private methods
2191
2192 void _init_logger();
2193 void _shutdown_logger();
2194 int _reload_logger();
2195
2196 int _open_path();
2197 void _close_path();
2198 int _open_fsid(bool create);
2199 int _lock_fsid();
2200 int _read_fsid(uuid_d *f);
2201 int _write_fsid();
2202 void _close_fsid();
2203 void _set_alloc_sizes();
2204 void _set_blob_size();
2205 void _set_finisher_num();
2206 void _update_osd_memory_options();
2207
2208 int _open_bdev(bool create);
2209 // Verifies that disk space is enough for reserved + min bluefs
2210 // and alters the latter if needed.
2211 // Depends on min_alloc_size, hence should be called after
2212 // its initialization (and outside of _open_bdev)
2213 void _validate_bdev();
2214 void _close_bdev();
2215
2216 int _minimal_open_bluefs(bool create);
2217 void _minimal_close_bluefs();
2218 int _open_bluefs(bool create);
2219 void _close_bluefs();
2220
2221 // Limited (u)mount intended for BlueFS operations only
2222 int _mount_for_bluefs();
2223 void _umount_for_bluefs();
2224
2225
2226 int _is_bluefs(bool create, bool* ret);
2227 /*
2228 * opens the DB and the dependent super_meta, FreelistManager and allocator
2229 * in the proper order
2230 */
2231 int _open_db_and_around(bool read_only);
2232 void _close_db_and_around();
2233
2234 // updates legacy bluefs-related records in the DB to a state valid for
2235 // downgrades from nautilus.
2236 void _sync_bluefs_and_fm();
2237
2238 /*
2239 * @warning to_repair_db means that we open this db to repair it and will
2240 * not hold rocksdb's file lock.
2241 */
2242 int _open_db(bool create,
2243 bool to_repair_db=false,
2244 bool read_only = false);
2245 void _close_db();
2246 int _open_fm(KeyValueDB::Transaction t);
2247 void _close_fm();
2248 int _open_alloc();
2249 void _close_alloc();
2250 int _open_collections();
2251 void _fsck_collections(int64_t* errors);
2252 void _close_collections();
2253
2254 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
2255 bool create);
2256
2257 public:
2258 utime_t get_deferred_last_submitted() {
2259 std::lock_guard l(deferred_lock);
2260 return deferred_last_submitted;
2261 }
2262
2263 static int _write_bdev_label(CephContext* cct,
2264 string path, bluestore_bdev_label_t label);
2265 static int _read_bdev_label(CephContext* cct, string path,
2266 bluestore_bdev_label_t *label);
2267 private:
2268 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
2269 bool create);
2270
2271 int _open_super_meta();
2272
2273 void _open_statfs();
2274 void _get_statfs_overall(struct store_statfs_t *buf);
2275
2276 void _dump_alloc_on_failure();
2277
2278 int64_t _get_bluefs_size_delta(uint64_t bluefs_free, uint64_t bluefs_total);
2279 int _balance_bluefs_freespace();
2280
2281 CollectionRef _get_collection(const coll_t& cid);
2282 void _queue_reap_collection(CollectionRef& c);
2283 void _reap_collections();
2284 void _update_cache_logger();
2285
2286 void _assign_nid(TransContext *txc, OnodeRef o);
2287 uint64_t _assign_blobid(TransContext *txc);
2288
2289 template <int LogLevelV>
2290 friend void _dump_onode(CephContext *cct, const Onode& o);
2291 template <int LogLevelV>
2292 friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
2293 template <int LogLevelV>
2294 friend void _dump_transaction(CephContext *cct, Transaction *t);
2295
2296 TransContext *_txc_create(Collection *c, OpSequencer *osr,
2297 list<Context*> *on_commits);
2298 void _txc_update_store_statfs(TransContext *txc);
2299 void _txc_add_transaction(TransContext *txc, Transaction *t);
2300 void _txc_calc_cost(TransContext *txc);
2301 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2302 void _txc_state_proc(TransContext *txc);
2303 void _txc_aio_submit(TransContext *txc);
2304 public:
2305 void txc_aio_finish(void *p) {
2306 _txc_state_proc(static_cast<TransContext*>(p));
2307 }
2308 private:
2309 void _txc_finish_io(TransContext *txc);
2310 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2311 void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
2312 void _txc_committed_kv(TransContext *txc);
2313 void _txc_finish(TransContext *txc);
2314 void _txc_release_alloc(TransContext *txc);
2315
2316 void _osr_attach(Collection *c);
2317 void _osr_register_zombie(OpSequencer *osr);
2318 void _osr_drain(OpSequencer *osr);
2319 void _osr_drain_preceding(TransContext *txc);
2320 void _osr_drain_all();
2321
2322 void _kv_start();
2323 void _kv_stop();
2324 void _kv_sync_thread();
2325 void _kv_finalize_thread();
2326
2327 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc);
2328 void _deferred_queue(TransContext *txc);
2329 public:
2330 void deferred_try_submit();
2331 private:
2332 void _deferred_submit_unlock(OpSequencer *osr);
2333 void _deferred_aio_finish(OpSequencer *osr);
2334 int _deferred_replay();
2335
2336 public:
2337 using mempool_dynamic_bitset =
2338 boost::dynamic_bitset<uint64_t,
2339 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2340 using per_pool_statfs =
2341 mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
2342
2343 enum FSCKDepth {
2344 FSCK_REGULAR,
2345 FSCK_DEEP,
2346 FSCK_SHALLOW
2347 };
2348
2349 private:
2350 int _fsck_check_extents(
2351 const coll_t& cid,
2352 const ghobject_t& oid,
2353 const PExtentVector& extents,
2354 bool compressed,
2355 mempool_dynamic_bitset &used_blocks,
2356 uint64_t granularity,
2357 BlueStoreRepairer* repairer,
2358 store_statfs_t& expected_statfs,
2359 FSCKDepth depth);
2360
2361 void _fsck_check_pool_statfs(
2362 per_pool_statfs& expected_pool_statfs,
2363 int64_t& errors,
2364 int64_t &warnings,
2365 BlueStoreRepairer* repairer);
2366
2367 int _fsck(FSCKDepth depth, bool repair);
2368 int _fsck_on_open(BlueStore::FSCKDepth depth, bool repair);
2369
2370 void _buffer_cache_write(
2371 TransContext *txc,
2372 BlobRef b,
2373 uint64_t offset,
2374 bufferlist& bl,
2375 unsigned flags) {
2376 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2377 flags);
2378 txc->shared_blobs_written.insert(b->shared_blob);
2379 }
2380
2381 int _collection_list(
2382 Collection *c, const ghobject_t& start, const ghobject_t& end,
2383 int max, vector<ghobject_t> *ls, ghobject_t *next);
2384
2385 template <typename T, typename F>
2386 T select_option(const std::string& opt_name, T val1, F f) {
2387 // NB: opt_name reserved for future use
2388 boost::optional<T> val2 = f();
2389 if (val2) {
2390 return *val2;
2391 }
2392 return val1;
2393 }
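/*
 * Illustrative usage (the per-pool option lookup shown here is an
 * assumption): select_option() returns val1 unless the functor yields an
 * override value.
 *
 *   uint64_t max_bsize = select_option(
 *     "compression_max_blob_size",
 *     comp_max_blob_size.load(),
 *     [&]() {
 *       int64_t val;
 *       if (c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val)) {
 *         return boost::optional<uint64_t>((uint64_t)val);
 *       }
 *       return boost::optional<uint64_t>();
 *     });
 */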
2394
2395 void _apply_padding(uint64_t head_pad,
2396 uint64_t tail_pad,
2397 bufferlist& padded);
2398
2399 void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
2400
2401 // -- ondisk version ---
2402 public:
2403 const int32_t latest_ondisk_format = 3; ///< our version
2404 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2405 const int32_t min_compat_ondisk_format = 3; ///< who can read us
2406
2407 private:
2408 int32_t ondisk_format = 0; ///< value detected on mount
2409
2410 int _upgrade_super(); ///< upgrade (called during open_super)
2411 uint64_t _get_ondisk_reserved() const;
2412 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2413
2414 // --- public interface ---
2415 public:
2416 BlueStore(CephContext *cct, const string& path);
2417 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for UT only
2418 ~BlueStore() override;
2419
2420 string get_type() override {
2421 return "bluestore";
2422 }
2423
2424 bool needs_journal() override { return false; };
2425 bool wants_journal() override { return false; };
2426 bool allows_journal() override { return false; };
2427
2428 uint64_t get_min_alloc_size() const override {
2429 return min_alloc_size;
2430 }
2431
2432 int get_devices(set<string> *ls) override;
2433
2434 bool is_rotational() override;
2435 bool is_journal_rotational() override;
2436
2437 string get_default_device_class() override {
2438 string device_class;
2439 map<string, string> metadata;
2440 collect_metadata(&metadata);
2441 auto it = metadata.find("bluestore_bdev_type");
2442 if (it != metadata.end()) {
2443 device_class = it->second;
2444 }
2445 return device_class;
2446 }
2447
2448 int get_numa_node(
2449 int *numa_node,
2450 set<int> *nodes,
2451 set<string> *failed) override;
2452
2453 static int get_block_device_fsid(CephContext* cct, const string& path,
2454 uuid_d *fsid);
2455
2456 bool test_mount_in_use() override;
2457
2458 private:
2459 int _mount(bool kv_only, bool open_db=true);
2460 public:
2461 int mount() override {
2462 return _mount(false);
2463 }
2464 int umount() override;
2465
2466 int start_kv_only(KeyValueDB **pdb, bool open_db=true) {
2467 int r = _mount(true, open_db);
2468 if (r < 0)
2469 return r;
2470 *pdb = db;
2471 return 0;
2472 }
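/*
 * Illustrative sketch (hypothetical offline tool): start_kv_only() mounts
 * just enough state to hand back the KeyValueDB, which is how external
 * tooling would inspect the KV store without a full mount.
 *
 *   KeyValueDB *kvdb = nullptr;
 *   BlueStore bs(cct, path);
 *   if (bs.start_kv_only(&kvdb) == 0) {
 *     // ... inspect kvdb ...
 *     bs.umount();
 *   }
 */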
2473
2474 int write_meta(const std::string& key, const std::string& value) override;
2475 int read_meta(const std::string& key, std::string *value) override;
2476
2477 int cold_open();
2478 int cold_close();
2479
2480 int fsck(bool deep) override {
2481 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, false);
2482 }
2483 int repair(bool deep) override {
2484 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, true);
2485 }
2486 int quick_fix() override {
2487 return _fsck(FSCK_SHALLOW, true);
2488 }
2489
2490 void set_cache_shards(unsigned num) override;
2491 void dump_cache_stats(Formatter *f) override {
2492 int onode_count = 0, buffers_bytes = 0;
2493 for (auto i: onode_cache_shards) {
2494 onode_count += i->_get_num();
2495 }
2496 for (auto i: buffer_cache_shards) {
2497 buffers_bytes += i->_get_bytes();
2498 }
2499 f->dump_int("bluestore_onode", onode_count);
2500 f->dump_int("bluestore_buffers", buffers_bytes);
2501 }
2502 void dump_cache_stats(ostream& ss) override {
2503 int onode_count = 0, buffers_bytes = 0;
2504 for (auto i: onode_cache_shards) {
2505 onode_count += i->_get_num();
2506 }
2507 for (auto i: buffer_cache_shards) {
2508 buffers_bytes += i->_get_bytes();
2509 }
2510 ss << "bluestore_onode: " << onode_count;
2511 ss << "bluestore_buffers: " << buffers_bytes;
2512 }
2513
2514 int validate_hobject_key(const hobject_t &obj) const override {
2515 return 0;
2516 }
2517 unsigned get_max_attr_name_length() override {
2518 return 256; // arbitrary; there is no real limit internally
2519 }
2520
2521 int mkfs() override;
2522 int mkjournal() override {
2523 return 0;
2524 }
2525
2526 void get_db_statistics(Formatter *f) override;
2527 void generate_db_histogram(Formatter *f) override;
2528 void _flush_cache();
2529 int flush_cache(ostream *os = NULL) override;
2530 void dump_perf_counters(Formatter *f) override {
2531 f->open_object_section("perf_counters");
2532 logger->dump_formatted(f, false);
2533 f->close_section();
2534 }
2535
2536 int add_new_bluefs_device(int id, const string& path);
2537 int migrate_to_existing_bluefs_device(const set<int>& devs_source,
2538 int id);
2539 int migrate_to_new_bluefs_device(const set<int>& devs_source,
2540 int id,
2541 const string& path);
2542 int expand_devices(ostream& out);
2543 string get_device_path(unsigned id);
2544
2545 public:
2546 int statfs(struct store_statfs_t *buf,
2547 osd_alert_list_t* alerts = nullptr) override;
2548 int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
2549 bool *per_pool_omap) override;
2550
2551 void collect_metadata(map<string,string> *pm) override;
2552
2553 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2554 int set_collection_opts(
2555 CollectionHandle& c,
2556 const pool_opts_t& opts) override;
2557 int stat(
2558 CollectionHandle &c,
2559 const ghobject_t& oid,
2560 struct stat *st,
2561 bool allow_eio = false) override;
2562 int read(
2563 CollectionHandle &c,
2564 const ghobject_t& oid,
2565 uint64_t offset,
2566 size_t len,
2567 bufferlist& bl,
2568 uint32_t op_flags = 0) override;
2569
2570 private:
2571
2572 // --------------------------------------------------------
2573 // intermediate data structures used while reading
2574 struct region_t {
2575 uint64_t logical_offset;
2576 uint64_t blob_xoffset; // region offset within the blob
2577 uint64_t length;
2578
2579 // used later in read process
2580 uint64_t front = 0;
2581
2582 region_t(uint64_t offset, uint64_t b_offs, uint64_t len, uint64_t front = 0)
2583 : logical_offset(offset),
2584 blob_xoffset(b_offs),
2585 length(len),
2586 front(front){}
2587 region_t(const region_t& from)
2588 : logical_offset(from.logical_offset),
2589 blob_xoffset(from.blob_xoffset),
2590 length(from.length),
2591 front(from.front){}
2592
2593 friend ostream& operator<<(ostream& out, const region_t& r) {
2594 return out << "0x" << std::hex << r.logical_offset << ":"
2595 << r.blob_xoffset << "~" << r.length << std::dec;
2596 }
2597 };
2598
2599 // merged blob read request
2600 struct read_req_t {
2601 uint64_t r_off = 0;
2602 uint64_t r_len = 0;
2603 bufferlist bl;
2604 std::list<region_t> regs; // original read regions
2605
2606 read_req_t(uint64_t off, uint64_t len) : r_off(off), r_len(len) {}
2607
2608 friend ostream& operator<<(ostream& out, const read_req_t& r) {
2609 out << "{<0x" << std::hex << r.r_off << ", 0x" << r.r_len << "> : [";
2610 for (const auto& reg : r.regs)
2611 out << reg;
2612 return out << "]}" << std::dec;
2613 }
2614 };
2615
2616 typedef list<read_req_t> regions2read_t;
2617 typedef map<BlueStore::BlobRef, regions2read_t> blobs2read_t;
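/*
 * Illustrative sketch (offsets and 'blob' are made up): cache misses are
 * grouped by the blob that backs them, and adjacent ranges of one blob are
 * merged into a single read_req_t so each blob is fetched with few IOs.
 *
 *   blobs2read_t blobs2read;
 *   read_req_t rr(0x0, 0x2000);                    // one merged blob read
 *   rr.regs.emplace_back(0x10000, 0x0, 0x1000);    // logical -> blob offset
 *   rr.regs.emplace_back(0x11000, 0x1000, 0x1000);
 *   blobs2read[blob].push_back(std::move(rr));
 */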
2618
2619 void _read_cache(
2620 OnodeRef o,
2621 uint64_t offset,
2622 size_t length,
2623 int read_cache_policy,
2624 ready_regions_t& ready_regions,
2625 blobs2read_t& blobs2read);
2626
2627
2628 int _prepare_read_ioc(
2629 blobs2read_t& blobs2read,
2630 vector<bufferlist>* compressed_blob_bls,
2631 IOContext* ioc);
2632
2633 int _generate_read_result_bl(
2634 OnodeRef o,
2635 uint64_t offset,
2636 size_t length,
2637 ready_regions_t& ready_regions,
2638 vector<bufferlist>& compressed_blob_bls,
2639 blobs2read_t& blobs2read,
2640 bool buffered,
2641 bool* csum_error,
2642 bufferlist& bl);
2643
2644 int _do_read(
2645 Collection *c,
2646 OnodeRef o,
2647 uint64_t offset,
2648 size_t len,
2649 bufferlist& bl,
2650 uint32_t op_flags = 0,
2651 uint64_t retry_count = 0);
2652
2653 int _do_readv(
2654 Collection *c,
2655 OnodeRef o,
2656 const interval_set<uint64_t>& m,
2657 bufferlist& bl,
2658 uint32_t op_flags = 0,
2659 uint64_t retry_count = 0);
2660
2661 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2662 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2663 public:
2664 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2665 uint64_t offset, size_t len, bufferlist& bl) override;
2666 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2667 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2668
2669 int readv(
2670 CollectionHandle &c_,
2671 const ghobject_t& oid,
2672 interval_set<uint64_t>& m,
2673 bufferlist& bl,
2674 uint32_t op_flags) override;
2675
2676 int dump_onode(CollectionHandle &c, const ghobject_t& oid,
2677 const string& section_name, Formatter *f) override;
2678
2679 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2680 bufferptr& value) override;
2681
2682 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2683 map<string,bufferptr>& aset) override;
2684
2685 int list_collections(vector<coll_t>& ls) override;
2686
2687 CollectionHandle open_collection(const coll_t &c) override;
2688 CollectionHandle create_new_collection(const coll_t& cid) override;
2689 void set_collection_commit_queue(const coll_t& cid,
2690 ContextQueue *commit_queue) override;
2691
2692 bool collection_exists(const coll_t& c) override;
2693 int collection_empty(CollectionHandle& c, bool *empty) override;
2694 int collection_bits(CollectionHandle& c) override;
2695
2696 int collection_list(CollectionHandle &c,
2697 const ghobject_t& start,
2698 const ghobject_t& end,
2699 int max,
2700 vector<ghobject_t> *ls, ghobject_t *next) override;
2701
2702 int omap_get(
2703 CollectionHandle &c, ///< [in] Collection containing oid
2704 const ghobject_t &oid, ///< [in] Object containing omap
2705 bufferlist *header, ///< [out] omap header
2706 map<string, bufferlist> *out ///< [out] Key to value map
2707 ) override;
2708 int _omap_get(
2709 Collection *c, ///< [in] Collection containing oid
2710 const ghobject_t &oid, ///< [in] Object containing omap
2711 bufferlist *header, ///< [out] omap header
2712 map<string, bufferlist> *out ///< [out] Key to value map
2713 );
2714
2715 /// Get omap header
2716 int omap_get_header(
2717 CollectionHandle &c, ///< [in] Collection containing oid
2718 const ghobject_t &oid, ///< [in] Object containing omap
2719 bufferlist *header, ///< [out] omap header
2720 bool allow_eio = false ///< [in] don't assert on eio
2721 ) override;
2722
2723 /// Get keys defined on oid
2724 int omap_get_keys(
2725 CollectionHandle &c, ///< [in] Collection containing oid
2726 const ghobject_t &oid, ///< [in] Object containing omap
2727 set<string> *keys ///< [out] Keys defined on oid
2728 ) override;
2729
2730 /// Get key values
2731 int omap_get_values(
2732 CollectionHandle &c, ///< [in] Collection containing oid
2733 const ghobject_t &oid, ///< [in] Object containing omap
2734 const set<string> &keys, ///< [in] Keys to get
2735 map<string, bufferlist> *out ///< [out] Returned keys and values
2736 ) override;
2737
2738 /// Filters keys into out which are defined on oid
2739 int omap_check_keys(
2740 CollectionHandle &c, ///< [in] Collection containing oid
2741 const ghobject_t &oid, ///< [in] Object containing omap
2742 const set<string> &keys, ///< [in] Keys to check
2743 set<string> *out ///< [out] Subset of keys defined on oid
2744 ) override;
2745
2746 ObjectMap::ObjectMapIterator get_omap_iterator(
2747 CollectionHandle &c, ///< [in] collection
2748 const ghobject_t &oid ///< [in] object
2749 ) override;
2750
2751 void set_fsid(uuid_d u) override {
2752 fsid = u;
2753 }
2754 uuid_d get_fsid() override {
2755 return fsid;
2756 }
2757
2758 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2759 return num_objects * 300; // assuming per-object overhead is 300 bytes
2760 }
2761
2762 struct BSPerfTracker {
2763 PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
2764 PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
2765
2766 objectstore_perf_stat_t get_cur_stats() const {
2767 objectstore_perf_stat_t ret;
2768 ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
2769 ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
2770 return ret;
2771 }
2772
2773 void update_from_perfcounters(PerfCounters &logger);
2774 } perf_tracker;
2775
2776 objectstore_perf_stat_t get_cur_stats() override {
2777 perf_tracker.update_from_perfcounters(*logger);
2778 return perf_tracker.get_cur_stats();
2779 }
2780 const PerfCounters* get_perf_counters() const override {
2781 return logger;
2782 }
2783
2784 int queue_transactions(
2785 CollectionHandle& ch,
2786 vector<Transaction>& tls,
2787 TrackedOpRef op = TrackedOpRef(),
2788 ThreadPool::TPHandle *handle = NULL) override;
2789
2790 // error injection
2791 void inject_data_error(const ghobject_t& o) override {
2792 std::unique_lock l(debug_read_error_lock);
2793 debug_data_error_objects.insert(o);
2794 }
2795 void inject_mdata_error(const ghobject_t& o) override {
2796 std::unique_lock l(debug_read_error_lock);
2797 debug_mdata_error_objects.insert(o);
2798 }
2799
2800 /// methods to inject various errors fsck can repair
2801 void inject_broken_shared_blob_key(const string& key,
2802 const bufferlist& bl);
2803 void inject_leaked(uint64_t len);
2804 void inject_false_free(coll_t cid, ghobject_t oid);
2805 void inject_statfs(const string& key, const store_statfs_t& new_statfs);
2806 void inject_global_statfs(const store_statfs_t& new_statfs);
2807 void inject_misreference(coll_t cid1, ghobject_t oid1,
2808 coll_t cid2, ghobject_t oid2,
2809 uint64_t offset);
2810
2811 void compact() override {
2812 ceph_assert(db);
2813 db->compact();
2814 }
2815 bool has_builtin_csum() const override {
2816 return true;
2817 }
2818
2819 /*
2820 Allocate space for BlueFS from the slow device.
2821 Either automatically applies the allocated extents to the underlying
2822 BlueFS (extents == nullptr) or just returns them (non-null extents).
2823 */
2824 int allocate_bluefs_freespace(
2825 uint64_t min_size,
2826 uint64_t size,
2827 PExtentVector* extents);
2828
2829 inline void log_latency(const char* name,
2830 int idx,
2831 const ceph::timespan& lat,
2832 double lat_threshold,
2833 const char* info = "") const;
2834
2835 inline void log_latency_fn(const char* name,
2836 int idx,
2837 const ceph::timespan& lat,
2838 double lat_threshold,
2839 std::function<string (const ceph::timespan& lat)> fn) const;
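/*
 * Illustrative usage (names in the body are placeholders): the functor
 * supplies extra context for the slow-operation log line, the assumption
 * being that it is only invoked once the latency threshold is crossed, so
 * the string is not built on the fast path.
 *
 *   log_latency_fn(
 *     "_txc_commit",
 *     l_bluestore_commit_lat,
 *     mono_clock::now() - start,
 *     cct->_conf->bluestore_log_op_age,
 *     [&](const ceph::timespan& lat) {
 *       return " txc = " + stringify(txc);
 *     });
 */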
2840
2841 private:
2842 bool _debug_data_eio(const ghobject_t& o) {
2843 if (!cct->_conf->bluestore_debug_inject_read_err) {
2844 return false;
2845 }
2846 std::shared_lock l(debug_read_error_lock);
2847 return debug_data_error_objects.count(o);
2848 }
2849 bool _debug_mdata_eio(const ghobject_t& o) {
2850 if (!cct->_conf->bluestore_debug_inject_read_err) {
2851 return false;
2852 }
2853 std::shared_lock l(debug_read_error_lock);
2854 return debug_mdata_error_objects.count(o);
2855 }
2856 void _debug_obj_on_delete(const ghobject_t& o) {
2857 if (cct->_conf->bluestore_debug_inject_read_err) {
2858 std::unique_lock l(debug_read_error_lock);
2859 debug_data_error_objects.erase(o);
2860 debug_mdata_error_objects.erase(o);
2861 }
2862 }
2863 private:
2864 ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
2865 string failed_cmode;
2866 set<string> failed_compressors;
2867 string spillover_alert;
2868 string legacy_statfs_alert;
2869 string no_per_pool_omap_alert;
2870 string disk_size_mismatch_alert;
2871
2872 void _log_alerts(osd_alert_list_t& alerts);
2873 bool _set_compression_alert(bool cmode, const char* s) {
2874 std::lock_guard l(qlock);
2875 if (cmode) {
2876 bool ret = failed_cmode.empty();
2877 failed_cmode = s;
2878 return ret;
2879 }
2880 return failed_compressors.emplace(s).second;
2881 }
2882 void _clear_compression_alert() {
2883 std::lock_guard l(qlock);
2884 failed_compressors.clear();
2885 failed_cmode.clear();
2886 }
2887
2888 void _set_spillover_alert(const string& s) {
2889 std::lock_guard l(qlock);
2890 spillover_alert = s;
2891 }
2892 void _clear_spillover_alert() {
2893 std::lock_guard l(qlock);
2894 spillover_alert.clear();
2895 }
2896
2897 void _check_legacy_statfs_alert();
2898 void _check_no_per_pool_omap_alert();
2899 void _set_disk_size_mismatch_alert(const string& s) {
2900 std::lock_guard l(qlock);
2901 disk_size_mismatch_alert = s;
2902 }
2903
2904 private:
2905
2906 // --------------------------------------------------------
2907 // read processing internal methods
2908 int _verify_csum(
2909 OnodeRef& o,
2910 const bluestore_blob_t* blob,
2911 uint64_t blob_xoffset,
2912 const bufferlist& bl,
2913 uint64_t logical_offset) const;
2914 int _decompress(bufferlist& source, bufferlist* result);
2915
2916
2917 // --------------------------------------------------------
2918 // write ops
2919
2920 struct WriteContext {
2921 bool buffered = false; ///< buffered write
2922 bool compress = false; ///< compressed write
2923 uint64_t target_blob_size = 0; ///< target (max) blob size
2924 unsigned csum_order = 0; ///< target checksum chunk order
2925
2926 old_extent_map_t old_extents; ///< must deref these blobs
2927 interval_set<uint64_t> extents_to_gc; ///< extents for garbage collection
2928
2929 struct write_item {
2930 uint64_t logical_offset; ///< write logical offset
2931 BlobRef b;
2932 uint64_t blob_length;
2933 uint64_t b_off;
2934 bufferlist bl;
2935 uint64_t b_off0; ///< original offset in a blob prior to padding
2936 uint64_t length0; ///< original data length prior to padding
2937
2938 bool mark_unused;
2939 bool new_blob; ///< whether new blob was created
2940
2941 bool compressed = false;
2942 bufferlist compressed_bl;
2943 size_t compressed_len = 0;
2944
2945 write_item(
2946 uint64_t logical_offs,
2947 BlobRef b,
2948 uint64_t blob_len,
2949 uint64_t o,
2950 bufferlist& bl,
2951 uint64_t o0,
2952 uint64_t l0,
2953 bool _mark_unused,
2954 bool _new_blob)
2955 :
2956 logical_offset(logical_offs),
2957 b(b),
2958 blob_length(blob_len),
2959 b_off(o),
2960 bl(bl),
2961 b_off0(o0),
2962 length0(l0),
2963 mark_unused(_mark_unused),
2964 new_blob(_new_blob) {}
2965 };
2966 vector<write_item> writes; ///< blobs we're writing
2967
2968 /// partial clone of the context
2969 void fork(const WriteContext& other) {
2970 buffered = other.buffered;
2971 compress = other.compress;
2972 target_blob_size = other.target_blob_size;
2973 csum_order = other.csum_order;
2974 }
2975 void write(
2976 uint64_t loffs,
2977 BlobRef b,
2978 uint64_t blob_len,
2979 uint64_t o,
2980 bufferlist& bl,
2981 uint64_t o0,
2982 uint64_t len0,
2983 bool _mark_unused,
2984 bool _new_blob) {
2985 writes.emplace_back(loffs,
2986 b,
2987 blob_len,
2988 o,
2989 bl,
2990 o0,
2991 len0,
2992 _mark_unused,
2993 _new_blob);
2994 }
2995 /// Checks for writes to the same pextent within a blob
2996 bool has_conflict(
2997 BlobRef b,
2998 uint64_t loffs,
2999 uint64_t loffs_end,
3000 uint64_t min_alloc_size);
3001 };
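/*
 * Illustrative sketch (all values are placeholders): a caller forks the
 * option-carrying part of an existing context, records each blob write via
 * write(), and uses has_conflict() to detect a second write landing on the
 * same min_alloc_size-aligned region of a blob.
 *
 *   WriteContext wctx_gc;
 *   wctx_gc.fork(wctx);                 // copy buffered/compress/csum options
 *   wctx.write(offset, blob, blob_len, b_off, bl, b_off0, len0,
 *              false, true);            // mark_unused=false, new_blob=true
 *   bool conflict = wctx.has_conflict(blob, offset, offset + len0,
 *                                      min_alloc_size);
 */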
3002
3003 void _do_write_small(
3004 TransContext *txc,
3005 CollectionRef &c,
3006 OnodeRef o,
3007 uint64_t offset, uint64_t length,
3008 bufferlist::iterator& blp,
3009 WriteContext *wctx);
3010 void _do_write_big(
3011 TransContext *txc,
3012 CollectionRef &c,
3013 OnodeRef o,
3014 uint64_t offset, uint64_t length,
3015 bufferlist::iterator& blp,
3016 WriteContext *wctx);
3017 int _do_alloc_write(
3018 TransContext *txc,
3019 CollectionRef c,
3020 OnodeRef o,
3021 WriteContext *wctx);
3022 void _wctx_finish(
3023 TransContext *txc,
3024 CollectionRef& c,
3025 OnodeRef o,
3026 WriteContext *wctx,
3027 set<SharedBlob*> *maybe_unshared_blobs=0);
3028
3029 int _write(TransContext *txc,
3030 CollectionRef& c,
3031 OnodeRef& o,
3032 uint64_t offset, size_t len,
3033 bufferlist& bl,
3034 uint32_t fadvise_flags);
3035 void _pad_zeros(bufferlist *bl, uint64_t *offset,
3036 uint64_t chunk_size);
3037
3038 void _choose_write_options(CollectionRef& c,
3039 OnodeRef o,
3040 uint32_t fadvise_flags,
3041 WriteContext *wctx);
3042
3043 int _do_gc(TransContext *txc,
3044 CollectionRef& c,
3045 OnodeRef o,
3046 const WriteContext& wctx,
3047 uint64_t *dirty_start,
3048 uint64_t *dirty_end);
3049
3050 int _do_write(TransContext *txc,
3051 CollectionRef &c,
3052 OnodeRef o,
3053 uint64_t offset, uint64_t length,
3054 bufferlist& bl,
3055 uint32_t fadvise_flags);
3056 void _do_write_data(TransContext *txc,
3057 CollectionRef& c,
3058 OnodeRef o,
3059 uint64_t offset,
3060 uint64_t length,
3061 bufferlist& bl,
3062 WriteContext *wctx);
3063
3064 int _touch(TransContext *txc,
3065 CollectionRef& c,
3066 OnodeRef& o);
3067 int _do_zero(TransContext *txc,
3068 CollectionRef& c,
3069 OnodeRef& o,
3070 uint64_t offset, size_t len);
3071 int _zero(TransContext *txc,
3072 CollectionRef& c,
3073 OnodeRef& o,
3074 uint64_t offset, size_t len);
3075 void _do_truncate(TransContext *txc,
3076 CollectionRef& c,
3077 OnodeRef o,
3078 uint64_t offset,
3079 set<SharedBlob*> *maybe_unshared_blobs=0);
3080 int _truncate(TransContext *txc,
3081 CollectionRef& c,
3082 OnodeRef& o,
3083 uint64_t offset);
3084 int _remove(TransContext *txc,
3085 CollectionRef& c,
3086 OnodeRef& o);
3087 int _do_remove(TransContext *txc,
3088 CollectionRef& c,
3089 OnodeRef o);
3090 int _setattr(TransContext *txc,
3091 CollectionRef& c,
3092 OnodeRef& o,
3093 const string& name,
3094 bufferptr& val);
3095 int _setattrs(TransContext *txc,
3096 CollectionRef& c,
3097 OnodeRef& o,
3098 const map<string,bufferptr>& aset);
3099 int _rmattr(TransContext *txc,
3100 CollectionRef& c,
3101 OnodeRef& o,
3102 const string& name);
3103 int _rmattrs(TransContext *txc,
3104 CollectionRef& c,
3105 OnodeRef& o);
3106 void _do_omap_clear(TransContext *txc, OnodeRef &o);
3107 int _omap_clear(TransContext *txc,
3108 CollectionRef& c,
3109 OnodeRef& o);
3110 int _omap_setkeys(TransContext *txc,
3111 CollectionRef& c,
3112 OnodeRef& o,
3113 bufferlist& bl);
3114 int _omap_setheader(TransContext *txc,
3115 CollectionRef& c,
3116 OnodeRef& o,
3117 bufferlist& header);
3118 int _omap_rmkeys(TransContext *txc,
3119 CollectionRef& c,
3120 OnodeRef& o,
3121 bufferlist& bl);
3122 int _omap_rmkey_range(TransContext *txc,
3123 CollectionRef& c,
3124 OnodeRef& o,
3125 const string& first, const string& last);
3126 int _set_alloc_hint(
3127 TransContext *txc,
3128 CollectionRef& c,
3129 OnodeRef& o,
3130 uint64_t expected_object_size,
3131 uint64_t expected_write_size,
3132 uint32_t flags);
3133 int _do_clone_range(TransContext *txc,
3134 CollectionRef& c,
3135 OnodeRef& oldo,
3136 OnodeRef& newo,
3137 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3138 int _clone(TransContext *txc,
3139 CollectionRef& c,
3140 OnodeRef& oldo,
3141 OnodeRef& newo);
3142 int _clone_range(TransContext *txc,
3143 CollectionRef& c,
3144 OnodeRef& oldo,
3145 OnodeRef& newo,
3146 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3147 int _rename(TransContext *txc,
3148 CollectionRef& c,
3149 OnodeRef& oldo,
3150 OnodeRef& newo,
3151 const ghobject_t& new_oid);
3152 int _create_collection(TransContext *txc, const coll_t &cid,
3153 unsigned bits, CollectionRef *c);
3154 int _remove_collection(TransContext *txc, const coll_t &cid,
3155 CollectionRef *c);
3156 void _do_remove_collection(TransContext *txc, CollectionRef *c);
3157 int _split_collection(TransContext *txc,
3158 CollectionRef& c,
3159 CollectionRef& d,
3160 unsigned bits, int rem);
3161 int _merge_collection(TransContext *txc,
3162 CollectionRef *c,
3163 CollectionRef& d,
3164 unsigned bits);
3165
3166 private:
3167 std::atomic<uint64_t> out_of_sync_fm = {0};
3168 // --------------------------------------------------------
3169 // BlueFSDeviceExpander implementation
3170 uint64_t get_recommended_expansion_delta(uint64_t bluefs_free,
3171 uint64_t bluefs_total) override {
3172 auto delta = _get_bluefs_size_delta(bluefs_free, bluefs_total);
3173 return delta > 0 ? delta : 0;
3174 }
3175 int allocate_freespace(
3176 uint64_t min_size,
3177 uint64_t size,
3178 PExtentVector& extents) override {
3179 return allocate_bluefs_freespace(min_size, size, &extents);
3180 };
3181 size_t available_freespace(uint64_t alloc_size) override;
3182 inline bool _use_rotational_settings();
3183
3184 public:
3185 struct sb_info_t {
3186 coll_t cid;
3187 int64_t pool_id = INT64_MIN;
3188 list<ghobject_t> oids;
3189 BlueStore::SharedBlobRef sb;
3190 bluestore_extent_ref_map_t ref_map;
3191 bool compressed = false;
3192 bool passed = false;
3193 bool updated = false;
3194 };
3195 typedef btree::btree_set<
3196 uint64_t, std::less<uint64_t>,
3197 mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
3198
3199 typedef mempool::bluestore_fsck::map<uint64_t, sb_info_t> sb_info_map_t;
3200 struct FSCK_ObjectCtx {
3201 int64_t& errors;
3202 int64_t& warnings;
3203 uint64_t& num_objects;
3204 uint64_t& num_extents;
3205 uint64_t& num_blobs;
3206 uint64_t& num_sharded_objects;
3207 uint64_t& num_spanning_blobs;
3208
3209 mempool_dynamic_bitset* used_blocks;
3210 uint64_t_btree_t* used_omap_head;
3211 uint64_t_btree_t* used_per_pool_omap_head;
3212 uint64_t_btree_t* used_pgmeta_omap_head;
3213
3214 ceph::mutex* sb_info_lock;
3215 sb_info_map_t& sb_info;
3216
3217 store_statfs_t& expected_store_statfs;
3218 per_pool_statfs& expected_pool_statfs;
3219 BlueStoreRepairer* repairer;
3220
3221 FSCK_ObjectCtx(int64_t& e,
3222 int64_t& w,
3223 uint64_t& _num_objects,
3224 uint64_t& _num_extents,
3225 uint64_t& _num_blobs,
3226 uint64_t& _num_sharded_objects,
3227 uint64_t& _num_spanning_blobs,
3228 mempool_dynamic_bitset* _ub,
3229 uint64_t_btree_t* _used_omap_head,
3230 uint64_t_btree_t* _used_per_pool_omap_head,
3231 uint64_t_btree_t* _used_pgmeta_omap_head,
3232 ceph::mutex* _sb_info_lock,
3233 sb_info_map_t& _sb_info,
3234 store_statfs_t& _store_statfs,
3235 per_pool_statfs& _pool_statfs,
3236 BlueStoreRepairer* _repairer) :
3237 errors(e),
3238 warnings(w),
3239 num_objects(_num_objects),
3240 num_extents(_num_extents),
3241 num_blobs(_num_blobs),
3242 num_sharded_objects(_num_sharded_objects),
3243 num_spanning_blobs(_num_spanning_blobs),
3244 used_blocks(_ub),
3245 used_omap_head(_used_omap_head),
3246 used_per_pool_omap_head(_used_per_pool_omap_head),
3247 used_pgmeta_omap_head(_used_pgmeta_omap_head),
3248 sb_info_lock(_sb_info_lock),
3249 sb_info(_sb_info),
3250 expected_store_statfs(_store_statfs),
3251 expected_pool_statfs(_pool_statfs),
3252 repairer(_repairer) {
3253 }
3254 };
3255
3256 OnodeRef fsck_check_objects_shallow(
3257 FSCKDepth depth,
3258 int64_t pool_id,
3259 CollectionRef c,
3260 const ghobject_t& oid,
3261 const string& key,
3262 const bufferlist& value,
3263 mempool::bluestore_fsck::list<string>& expecting_shards,
3264 map<BlobRef, bluestore_blob_t::unused_t>* referenced,
3265 const BlueStore::FSCK_ObjectCtx& ctx);
3266
3267 private:
3268 void _fsck_check_objects(FSCKDepth depth,
3269 FSCK_ObjectCtx& ctx);
3270 };
3271
3272 inline ostream& operator<<(ostream& out, const BlueStore::volatile_statfs& s) {
3273 return out
3274 << " allocated:"
3275 << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
3276 << " stored:"
3277 << s.values[BlueStore::volatile_statfs::STATFS_STORED]
3278 << " compressed:"
3279 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
3280 << " compressed_orig:"
3281 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
3282 << " compressed_alloc:"
3283 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
3284 }
3285
3286 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
3287 o->get();
3288 }
3289 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
3290 o->put();
3291 }
3292
3293 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
3294 o->get();
3295 }
3296 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
3297 o->put();
3298 }
3299
3300 class BlueStoreRepairer
3301 {
3302 public:
3303 // to simplify future potential migration to mempools
3304 using fsck_interval = interval_set<uint64_t>;
3305
3306 // Structure to track which pextents are used by a specific cid/oid.
3307 // As with a Bloom filter, only positive and false-positive matches are
3308 // possible.
3309 // Maintains two lists of bloom filters, one for cids and one for oids,
3310 // where each list entry is a BF covering a specific disk pextent range.
3311 // The extent length per filter is computed on init.
3312 // Allows filtering out 'uninteresting' pextents to speed up subsequent
3313 // 'is_used' access.
3314 struct StoreSpaceTracker {
3315 const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
3316 const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
3317 const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrary selected
3318 static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
3319
3320 typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
3321 bloom_vector collections_bfs;
3322 bloom_vector objects_bfs;
3323
3324 bool was_filtered_out = false;
3325 uint64_t granularity = 0; // extent length for a single filter
3326
3327 StoreSpaceTracker() {
3328 }
3329 StoreSpaceTracker(const StoreSpaceTracker& from) :
3330 collections_bfs(from.collections_bfs),
3331 objects_bfs(from.objects_bfs),
3332 granularity(from.granularity) {
3333 }
3334
3335 void init(uint64_t total,
3336 uint64_t min_alloc_size,
3337 uint64_t mem_cap = DEF_MEM_CAP) {
3338 ceph_assert(!granularity); // not initialized yet
3339 ceph_assert(min_alloc_size && isp2(min_alloc_size));
3340 ceph_assert(mem_cap);
3341
3342 total = round_up_to(total, min_alloc_size);
3343 granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
3344
3345 if (!granularity) {
3346 granularity = min_alloc_size;
3347 } else {
3348 granularity = round_up_to(granularity, min_alloc_size);
3349 }
3350
3351 uint64_t entries = round_up_to(total, granularity) / granularity;
3352 collections_bfs.resize(entries,
3353 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3354 BLOOM_FILTER_TABLE_SIZE,
3355 0,
3356 BLOOM_FILTER_EXPECTED_COUNT));
3357 objects_bfs.resize(entries,
3358 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3359 BLOOM_FILTER_TABLE_SIZE,
3360 0,
3361 BLOOM_FILTER_EXPECTED_COUNT));
3362 }
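/*
 * Worked example (illustrative numbers): with total = 1 TiB,
 * min_alloc_size = 64 KiB and the default mem_cap of 128 MiB:
 *
 *   granularity = 2^40 * 32 * 2 / 2^27 = 512 KiB
 *   entries     = 2^40 / 512 KiB       = 2M filters per vector
 *
 * Each bloom filter then covers one 512 KiB slice of the device, and the
 * two vectors together take roughly entries * 2 * 32 bytes = 128 MiB,
 * i.e. the requested memory cap.
 */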
3363 inline uint32_t get_hash(const coll_t& cid) const {
3364 return cid.hash_to_shard(1);
3365 }
3366 inline void set_used(uint64_t offset, uint64_t len,
3367 const coll_t& cid, const ghobject_t& oid) {
3368 ceph_assert(granularity); // initialized
3369
3370 // can't call this func after filter_out has been applied
3371 ceph_assert(!was_filtered_out);
3372 if (!len) {
3373 return;
3374 }
3375 auto pos = offset / granularity;
3376 auto end_pos = (offset + len - 1) / granularity;
3377 while (pos <= end_pos) {
3378 collections_bfs[pos].insert(get_hash(cid));
3379 objects_bfs[pos].insert(oid.hobj.get_hash());
3380 ++pos;
3381 }
3382 }
3383 // filters out entries unrelated to the specified (broken) extents;
3384 // only 'is_used' calls are permitted after that
3385 size_t filter_out(const fsck_interval& extents);
3386
3387 // determines if collection's present after filtering-out
3388 inline bool is_used(const coll_t& cid) const {
3389 ceph_assert(was_filtered_out);
3390 for(auto& bf : collections_bfs) {
3391 if (bf.contains(get_hash(cid))) {
3392 return true;
3393 }
3394 }
3395 return false;
3396 }
3397 // determines if object's present after filtering-out
3398 inline bool is_used(const ghobject_t& oid) const {
3399 ceph_assert(was_filtered_out);
3400 for(auto& bf : objects_bfs) {
3401 if (bf.contains(oid.hobj.get_hash())) {
3402 return true;
3403 }
3404 }
3405 return false;
3406 }
3407 // determines if collection's present before filtering-out
3408 inline bool is_used(const coll_t& cid, uint64_t offs) const {
3409 ceph_assert(granularity); // initialized
3410 ceph_assert(!was_filtered_out);
3411 auto &bf = collections_bfs[offs / granularity];
3412 if (bf.contains(get_hash(cid))) {
3413 return true;
3414 }
3415 return false;
3416 }
3417 // determines if object's present before filtering-out
3418 inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
3419 ceph_assert(granularity); // initialized
3420 ceph_assert(!was_filtered_out);
3421 auto &bf = objects_bfs[offs / granularity];
3422 if (bf.contains(oid.hobj.get_hash())) {
3423 return true;
3424 }
3425 return false;
3426 }
3427 };
3428 public:
3429 void fix_per_pool_omap(KeyValueDB *db);
3430 bool remove_key(KeyValueDB *db, const string& prefix, const string& key);
3431 bool fix_shared_blob(KeyValueDB *db,
3432 uint64_t sbid,
3433 const bufferlist* bl);
3434 bool fix_statfs(KeyValueDB *db, const string& key,
3435 const store_statfs_t& new_statfs);
3436
3437 bool fix_leaked(KeyValueDB *db,
3438 FreelistManager* fm,
3439 uint64_t offset, uint64_t len);
3440 bool fix_false_free(KeyValueDB *db,
3441 FreelistManager* fm,
3442 uint64_t offset, uint64_t len);
3443 bool fix_bluefs_extents(std::atomic<uint64_t>& out_of_sync_flag);
3444
3445 void init(uint64_t total_space, uint64_t lres_tracking_unit_size);
3446
3447 bool preprocess_misreference(KeyValueDB *db);
3448
3449 unsigned apply(KeyValueDB* db);
3450
3451 void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
3452 misreferenced_extents.union_insert(offs, len);
3453 if (inc_error) {
3454 ++to_repair_cnt;
3455 }
3456 }
3457 void inc_repaired() {
3458 ++to_repair_cnt;
3459 }
3460
3461 StoreSpaceTracker& get_space_usage_tracker() {
3462 return space_usage_tracker;
3463 }
3464 const fsck_interval& get_misreferences() const {
3465 return misreferenced_extents;
3466 }
3467 KeyValueDB::Transaction get_fix_misreferences_txn() {
3468 return fix_misreferences_txn;
3469 }
3470
3471 private:
3472 unsigned to_repair_cnt = 0;
3473 KeyValueDB::Transaction fix_per_pool_omap_txn;
3474 KeyValueDB::Transaction fix_fm_leaked_txn;
3475 KeyValueDB::Transaction fix_fm_false_free_txn;
3476 KeyValueDB::Transaction remove_key_txn;
3477 KeyValueDB::Transaction fix_statfs_txn;
3478 KeyValueDB::Transaction fix_shared_blob_txn;
3479
3480 KeyValueDB::Transaction fix_misreferences_txn;
3481
3482 StoreSpaceTracker space_usage_tracker;
3483
3484 // non-shared extents with multiple references
3485 fsck_interval misreferenced_extents;
3486
3487 };
3488 #endif
3489