1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	#ifndef CEPH_OBJECTCACHER_H
4    	#define CEPH_OBJECTCACHER_H
5    	
#include <exception>
#include <functional>

#include "include/types.h"
#include "include/lru.h"
#include "include/Context.h"
#include "include/xlist.h"

#include "common/Cond.h"
#include "common/Finisher.h"
#include "common/Thread.h"
#include "common/zipkin_trace.h"

#include "Objecter.h"
#include "Striper.h"
18   	
19   	class CephContext;
20   	class WritebackHandler;
21   	class PerfCounters;
22   	
// ObjectCacher counter identifiers.  Values are consecutive, beginning
// right after l_objectcacher_first and ending at l_objectcacher_last.
enum {
  l_objectcacher_first = 25000,

  // ops we satisfy completely from cache
  l_objectcacher_cache_ops_hit,
  // ops we don't satisfy completely from cache
  l_objectcacher_cache_ops_miss,

  // bytes read directly from cache
  l_objectcacher_cache_bytes_hit,
  // bytes we couldn't read directly from cache
  l_objectcacher_cache_bytes_miss,

  // total bytes read out
  l_objectcacher_data_read,
  // bytes written to cache
  l_objectcacher_data_written,
  // bytes flushed to WritebackHandler
  l_objectcacher_data_flushed,
  // bytes overwritten while flushing is in progress
  l_objectcacher_overwritten_in_flush,

  // total write ops we delayed due to dirty limits
  l_objectcacher_write_ops_blocked,
  // total number of write bytes we delayed due to dirty limits
  l_objectcacher_write_bytes_blocked,
  // total time in seconds spent blocking a write due to dirty limits
  l_objectcacher_write_time_blocked,

  l_objectcacher_last,
};
52   	
53   	class ObjectCacher {
54   	  PerfCounters *perfcounter;
55   	 public:
56   	  CephContext *cct;
57   	  class Object;
58   	  struct ObjectSet;
59   	  class C_ReadFinish;
60   	
61   	  typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset);
62   	
63   	  // read scatter/gather
64   	  struct OSDRead {
65   	    vector<ObjectExtent> extents;
66   	    snapid_t snap;
67   	    bufferlist *bl;
68   	    int fadvise_flags;
69   	    OSDRead(snapid_t s, bufferlist *b, int f)
70   	      : snap(s), bl(b), fadvise_flags(f) {}
71   	  };
72   	
73   	  OSDRead *prepare_read(snapid_t snap, bufferlist *b, int f) const {
74   	    return new OSDRead(snap, b, f);
75   	  }
76   	
77   	  // write scatter/gather
78   	  struct OSDWrite {
79   	    vector<ObjectExtent> extents;
80   	    SnapContext snapc;
81   	    bufferlist bl;
82   	    ceph::real_time mtime;
83   	    int fadvise_flags;
84   	    ceph_tid_t journal_tid;
85   	    OSDWrite(const SnapContext& sc, const bufferlist& b, ceph::real_time mt,
86   		     int f, ceph_tid_t _journal_tid)
87   	      : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
88   		journal_tid(_journal_tid) {}
89   	  };
90   	
91   	  OSDWrite *prepare_write(const SnapContext& sc,
92   				  const bufferlist &b,
93   				  ceph::real_time mt,
94   				  int f,
95   				  ceph_tid_t journal_tid) const {
96   	    return new OSDWrite(sc, b, mt, f, journal_tid);
97   	  }
98   	
99   	
100  	
101  	  // ******* BufferHead *********
102  	  class BufferHead : public LRUObject {
103  	  public:
104  	    // states
105  	    static const int STATE_MISSING = 0;
106  	    static const int STATE_CLEAN = 1;
107  	    static const int STATE_ZERO = 2;   // NOTE: these are *clean* zeros
108  	    static const int STATE_DIRTY = 3;
109  	    static const int STATE_RX = 4;
110  	    static const int STATE_TX = 5;
111  	    static const int STATE_ERROR = 6; // a read error occurred
112  	
113  	  private:
114  	    // my fields
115  	    int state;
116  	    int ref;
117  	    struct {
118  	      loff_t start, length;   // bh extent in object
119  	    } ex;
120  	    bool dontneed; //indicate bh don't need by anyone
121  	    bool nocache; //indicate bh don't need by this caller
122  	
123  	  public:
124  	    Object *ob;
125  	    bufferlist  bl;
126  	    ceph_tid_t last_write_tid;  // version of bh (if non-zero)
127  	    ceph_tid_t last_read_tid;   // tid of last read op (if any)
128  	    ceph::real_time last_write;
129  	    SnapContext snapc;
130  	    ceph_tid_t journal_tid;
131  	    int error; // holds return value for failed reads
132  	
133  	    map<loff_t, list<Context*> > waitfor_read;
134  	
135  	    // cons
136  	    explicit BufferHead(Object *o) :
137  	      state(STATE_MISSING),
138  	      ref(0),
139  	      dontneed(false),
140  	      nocache(false),
141  	      ob(o),
142  	      last_write_tid(0),
143  	      last_read_tid(0),
144  	      journal_tid(0),
145  	      error(0) {
146  	      ex.start = ex.length = 0;
147  	    }
148  	
149  	    // extent
150  	    loff_t start() const { return ex.start; }
151  	    void set_start(loff_t s) { ex.start = s; }
152  	    loff_t length() const { return ex.length; }
153  	    void set_length(loff_t l) { ex.length = l; }
154  	    loff_t end() const { return ex.start + ex.length; }
155  	    loff_t last() const { return end() - 1; }
156  	
157  	    // states
158  	    void set_state(int s) {
159  	      if (s == STATE_RX || s == STATE_TX) get();
160  	      if (state == STATE_RX || state == STATE_TX) put();
161  	      state = s;
162  	    }
163  	    int get_state() const { return state; }
164  	
165  	    inline ceph_tid_t get_journal_tid() const {
166  	      return journal_tid;
167  	    }
168  	    inline void set_journal_tid(ceph_tid_t _journal_tid) {
169  	      journal_tid = _journal_tid;
170  	    }
171  	
172  	    bool is_missing() const { return state == STATE_MISSING; }
173  	    bool is_dirty() const { return state == STATE_DIRTY; }
174  	    bool is_clean() const { return state == STATE_CLEAN; }
175  	    bool is_zero() const { return state == STATE_ZERO; }
176  	    bool is_tx() const { return state == STATE_TX; }
177  	    bool is_rx() const { return state == STATE_RX; }
178  	    bool is_error() const { return state == STATE_ERROR; }
179  	
180  	    // reference counting
181  	    int get() {
182  	      ceph_assert(ref >= 0);
183  	      if (ref == 0) lru_pin();
184  	      return ++ref;
185  	    }
186  	    int put() {
187  	      ceph_assert(ref > 0);
188  	      if (ref == 1) lru_unpin();
189  	      --ref;
190  	      return ref;
191  	    }
192  	
193  	    void set_dontneed(bool v) {
194  	      dontneed = v;
195  	    }
196  	    bool get_dontneed() const {
197  	      return dontneed;
198  	    }
199  	
200  	    void set_nocache(bool v) {
201  	      nocache = v;
202  	    }
203  	    bool get_nocache() const {
204  	      return nocache;
205  	    }
206  	
207  	    inline bool can_merge_journal(BufferHead *bh) const {
208  	      return (get_journal_tid() == bh->get_journal_tid());
209  	    }
210  	
211  	    struct ptr_lt {
212  	      bool operator()(const BufferHead* l, const BufferHead* r) const {
213  		const Object *lob = l->ob;
214  		const Object *rob = r->ob;
215  		const ObjectSet *loset = lob->oset;
216  		const ObjectSet *roset = rob->oset;
217  		if (loset != roset)
218  		  return loset < roset;
219  		if (lob != rob)
220  		  return lob < rob;
221  		if (l->start() != r->start())
222  		  return l->start() < r->start();
223  		return l < r;
224  	      }
225  	    };
226  	  };
227  	
228  	  // ******* Object *********
229  	  class Object : public LRUObject {
230  	  private:
231  	    // ObjectCacher::Object fields
232  	    int ref;
233  	    ObjectCacher *oc;
234  	    sobject_t oid;
235  	    friend struct ObjectSet;
236  	
237  	  public:
238  	    uint64_t object_no;
239  	    ObjectSet *oset;
240  	    xlist<Object*>::item set_item;
241  	    object_locator_t oloc;
242  	    uint64_t truncate_size, truncate_seq;
243  	
244  	    bool complete;
245  	    bool exists;
246  	
247  	    map<loff_t, BufferHead*>     data;
248  	
249  	    ceph_tid_t last_write_tid;  // version of bh (if non-zero)
250  	    ceph_tid_t last_commit_tid; // last update committed.
251  	
252  	    int dirty_or_tx;
253  	
254  	    map< ceph_tid_t, list<Context*> > waitfor_commit;
255  	    xlist<C_ReadFinish*> reads;
256  	
257  	    Object(const Object&) = delete;
258  	    Object& operator=(const Object&) = delete;
259  	
260  	    Object(ObjectCacher *_oc, sobject_t o, uint64_t ono, ObjectSet *os,
261  		   object_locator_t& l, uint64_t ts, uint64_t tq) :
262  	      ref(0),
263  	      oc(_oc),
264  	      oid(o), object_no(ono), oset(os), set_item(this), oloc(l),
265  	      truncate_size(ts), truncate_seq(tq),
266  	      complete(false), exists(true),
267  	      last_write_tid(0), last_commit_tid(0),
268  	      dirty_or_tx(0) {
269  	      // add to set
270  	      os->objects.push_back(&set_item);
271  	    }
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
272  	    ~Object() {
273  	      reads.clear();
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
274  	      ceph_assert(ref == 0);
275  	      ceph_assert(data.empty());
276  	      ceph_assert(dirty_or_tx == 0);
277  	      set_item.remove_myself();
278  	    }
279  	
280  	    sobject_t get_soid() const { return oid; }
281  	    object_t get_oid() { return oid.oid; }
282  	    snapid_t get_snap() { return oid.snap; }
283  	    ObjectSet *get_object_set() const { return oset; }
284  	    string get_namespace() { return oloc.nspace; }
285  	    uint64_t get_object_number() const { return object_no; }
286  	
287  	    const object_locator_t& get_oloc() const { return oloc; }
288  	    void set_object_locator(object_locator_t& l) { oloc = l; }
289  	
290  	    bool can_close() const {
291  	      if (lru_is_expireable()) {
292  		ceph_assert(data.empty());
293  		ceph_assert(waitfor_commit.empty());
294  		return true;
295  	      }
296  	      return false;
297  	    }
298  	
299  	    /**
300  	     * Check buffers and waiters for consistency
301  	     * - no overlapping buffers
302  	     * - index in map matches BH
303  	     * - waiters fall within BH
304  	     */
305  	    void audit_buffers();
306  	
307  	    /**
308  	     * find first buffer that includes or follows an offset
309  	     *
310  	     * @param offset object byte offset
311  	     * @return iterator pointing to buffer, or data.end()
312  	     */
313  	    map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const {
314  	      map<loff_t,BufferHead*>::const_iterator p = data.lower_bound(offset);
315  	      if (p != data.begin() &&
316  		  (p == data.end() || p->first > offset)) {
317  		--p;     // might overlap!
318  		if (p->first + p->second->length() <= offset)
319  		  ++p;   // doesn't overlap.
320  	      }
321  	      return p;
322  	    }
323  	
324  	    // bh
325  	    // add to my map
326  	    void add_bh(BufferHead *bh) {
327  	      if (data.empty())
328  		get();
329  	      ceph_assert(data.count(bh->start()) == 0);
330  	      data[bh->start()] = bh;
331  	    }
332  	    void remove_bh(BufferHead *bh) {
333  	      ceph_assert(data.count(bh->start()));
334  	      data.erase(bh->start());
335  	      if (data.empty())
336  		put();
337  	    }
338  	
339  	    bool is_empty() const { return data.empty(); }
340  	
341  	    // mid-level
342  	    BufferHead *split(BufferHead *bh, loff_t off);
343  	    void merge_left(BufferHead *left, BufferHead *right);
344  	    bool can_merge_bh(BufferHead *left, BufferHead *right);
345  	    void try_merge_bh(BufferHead *bh);
346  	    void maybe_rebuild_buffer(BufferHead *bh);
347  	
348  	    bool is_cached(loff_t off, loff_t len) const;
349  	    bool include_all_cached_data(loff_t off, loff_t len);
350  	    int map_read(ObjectExtent &ex,
351  	                 map<loff_t, BufferHead*>& hits,
352  	                 map<loff_t, BufferHead*>& missing,
353  	                 map<loff_t, BufferHead*>& rx,
354  			 map<loff_t, BufferHead*>& errors);
355  	    BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid);
356  	
357  	    void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
358  	    void truncate(loff_t s);
359  	    void discard(loff_t off, loff_t len, C_GatherBuilder* commit_gather);
360  	
361  	    // reference counting
362  	    int get() {
363  	      ceph_assert(ref >= 0);
364  	      if (ref == 0) lru_pin();
365  	      return ++ref;
366  	    }
367  	    int put() {
368  	      ceph_assert(ref > 0);
369  	      if (ref == 1) lru_unpin();
370  	      --ref;
371  	      return ref;
372  	    }
373  	  };
374  	
375  	
376  	  struct ObjectSet {
377  	    void *parent;
378  	
379  	    inodeno_t ino;
380  	    uint64_t truncate_seq, truncate_size;
381  	
382  	    int64_t poolid;
383  	    xlist<Object*> objects;
384  	
385  	    int dirty_or_tx;
386  	    bool return_enoent;
387  	
388  	    ObjectSet(void *p, int64_t _poolid, inodeno_t i)
389  	      : parent(p), ino(i), truncate_seq(0),
390  		truncate_size(0), poolid(_poolid), dirty_or_tx(0),
391  		return_enoent(false) {}
392  	
393  	  };
394  	
395  	
396  	  // ******* ObjectCacher *********
397  	  // ObjectCacher fields
398  	 private:
399  	  WritebackHandler& writeback_handler;
400  	  bool scattered_write;
401  	
402  	  string name;
403  	  ceph::mutex& lock;
404  	
405  	  uint64_t max_dirty, target_dirty, max_size, max_objects;
406  	  ceph::timespan max_dirty_age;
407  	  bool block_writes_upfront;
408  	
409  	  ZTracer::Endpoint trace_endpoint;
410  	
411  	  flush_set_callback_t flush_set_callback;
412  	  void *flush_set_callback_arg;
413  	
414  	  // indexed by pool_id
415  	  vector<ceph::unordered_map<sobject_t, Object*> > objects;
416  	
417  	  list<Context*> waitfor_read;
418  	
419  	  ceph_tid_t last_read_tid;
420  	
421  	  set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh;
422  	  LRU   bh_lru_dirty, bh_lru_rest;
423  	  LRU   ob_lru;
424  	
425  	  ceph::condition_variable flusher_cond;
426  	  bool flusher_stop;
427  	  void flusher_entry();
428  	  class FlusherThread : public Thread {
429  	    ObjectCacher *oc;
430  	  public:
431  	    explicit FlusherThread(ObjectCacher *o) : oc(o) {}
432  	    void *entry() override {
433  	      oc->flusher_entry();
434  	      return 0;
435  	    }
436  	  } flusher_thread;
437  	
438  	  Finisher finisher;
439  	
440  	  // objects
441  	  Object *get_object_maybe(sobject_t oid, object_locator_t &l) {
442  	    // have it?
443  	    if (((uint32_t)l.pool < objects.size()) &&
444  		(objects[l.pool].count(oid)))
445  	      return objects[l.pool][oid];
446  	    return NULL;
447  	  }
448  	
449  	  Object *get_object(sobject_t oid, uint64_t object_no, ObjectSet *oset,
450  			     object_locator_t &l, uint64_t truncate_size,
451  			     uint64_t truncate_seq);
452  	  void close_object(Object *ob);
453  	
454  	  // bh stats
455  	  ceph::condition_variable  stat_cond;
456  	
457  	  loff_t stat_clean;
458  	  loff_t stat_zero;
459  	  loff_t stat_dirty;
460  	  loff_t stat_rx;
461  	  loff_t stat_tx;
462  	  loff_t stat_missing;
463  	  loff_t stat_error;
464  	  loff_t stat_dirty_waiting;   // bytes that writers are waiting on to write
465  	
466  	  size_t stat_nr_dirty_waiters;
467  	
468  	  void verify_stats() const;
469  	
470  	  void bh_stat_add(BufferHead *bh);
471  	  void bh_stat_sub(BufferHead *bh);
472  	  loff_t get_stat_tx() const { return stat_tx; }
473  	  loff_t get_stat_rx() const { return stat_rx; }
474  	  loff_t get_stat_dirty() const { return stat_dirty; }
475  	  loff_t get_stat_clean() const { return stat_clean; }
476  	  loff_t get_stat_zero() const { return stat_zero; }
477  	  loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
478  	  size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; }
479  	
480  	  void touch_bh(BufferHead *bh) {
481  	    if (bh->is_dirty())
482  	      bh_lru_dirty.lru_touch(bh);
483  	    else
484  	      bh_lru_rest.lru_touch(bh);
485  	
486  	    bh->set_dontneed(false);
487  	    bh->set_nocache(false);
488  	    touch_ob(bh->ob);
489  	  }
490  	  void touch_ob(Object *ob) {
491  	    ob_lru.lru_touch(ob);
492  	  }
493  	  void bottouch_ob(Object *ob) {
494  	    ob_lru.lru_bottouch(ob);
495  	  }
496  	
497  	  // bh states
498  	  void bh_set_state(BufferHead *bh, int s);
499  	  void copy_bh_state(BufferHead *bh1, BufferHead *bh2) {
500  	    bh_set_state(bh2, bh1->get_state());
501  	  }
502  	
503  	  void mark_missing(BufferHead *bh) {
504  	    bh_set_state(bh,BufferHead::STATE_MISSING);
505  	  }
506  	  void mark_clean(BufferHead *bh) {
507  	    bh_set_state(bh, BufferHead::STATE_CLEAN);
508  	  }
509  	  void mark_zero(BufferHead *bh) {
510  	    bh_set_state(bh, BufferHead::STATE_ZERO);
511  	  }
512  	  void mark_rx(BufferHead *bh) {
513  	    bh_set_state(bh, BufferHead::STATE_RX);
514  	  }
515  	  void mark_tx(BufferHead *bh) {
516  	    bh_set_state(bh, BufferHead::STATE_TX); }
517  	  void mark_error(BufferHead *bh) {
518  	    bh_set_state(bh, BufferHead::STATE_ERROR);
519  	  }
520  	  void mark_dirty(BufferHead *bh) {
521  	    bh_set_state(bh, BufferHead::STATE_DIRTY);
522  	    bh_lru_dirty.lru_touch(bh);
523  	    //bh->set_dirty_stamp(ceph_clock_now());
524  	  }
525  	
526  	  void bh_add(Object *ob, BufferHead *bh);
527  	  void bh_remove(Object *ob, BufferHead *bh);
528  	
529  	  // io
530  	  void bh_read(BufferHead *bh, int op_flags,
531  	               const ZTracer::Trace &parent_trace);
532  	  void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace);
533  	  void bh_write_scattered(list<BufferHead*>& blist);
534  	  void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
535  				    int64_t *amount, int *max_count);
536  	
537  	  void trim();
538  	  void flush(ZTracer::Trace *trace, loff_t amount=0);
539  	
540  	  /**
541  	   * flush a range of buffers
542  	   *
543  	   * Flush any buffers that intersect the specified extent.  If len==0,
544  	   * flush *all* buffers for the object.
545  	   *
546  	   * @param o object
547  	   * @param off start offset
548  	   * @param len extent length, or 0 for entire object
549  	   * @return true if object was already clean/flushed.
550  	   */
551  	  bool flush(Object *o, loff_t off, loff_t len,
552  	             ZTracer::Trace *trace);
553  	  loff_t release(Object *o);
554  	  void purge(Object *o);
555  	
556  	  int64_t reads_outstanding;
557  	  ceph::condition_variable read_cond;
558  	
559  	  int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
560  		     bool external_call, ZTracer::Trace *trace);
561  	  void retry_waiting_reads();
562  	
563  	 public:
564  	  void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
565  			      loff_t offset, uint64_t length,
566  			      bufferlist &bl, int r,
567  			      bool trust_enoent);
568  	  void bh_write_commit(int64_t poolid, sobject_t oid,
569  			       vector<pair<loff_t, uint64_t> >& ranges,
570  			       ceph_tid_t t, int r);
571  	
572  	  class C_WriteCommit;
573  	  class C_WaitForWrite;
574  	
575  	  void perf_start();
576  	  void perf_stop();
577  	
578  	
579  	
580  	  ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, ceph::mutex& l,
581  		       flush_set_callback_t flush_callback,
582  		       void *flush_callback_arg,
583  		       uint64_t max_bytes, uint64_t max_objects,
584  		       uint64_t max_dirty, uint64_t target_dirty, double max_age,
585  		       bool block_writes_upfront);
586  	  ~ObjectCacher();
587  	
588  	  void start() {
589  	    flusher_thread.create("flusher");
590  	  }
591  	  void stop() {
592  	    ceph_assert(flusher_thread.is_started());
593  	    lock.lock();  // hmm.. watch out for deadlock!
594  	    flusher_stop = true;
595  	    flusher_cond.notify_all();
596  	    lock.unlock();
597  	    flusher_thread.join();
598  	  }
599  	
600  	
601  	  class C_RetryRead;
602  	
603  	
604  	  // non-blocking.  async.
605  	
606  	  /**
607  	   * @note total read size must be <= INT_MAX, since
608  	   * the return value is total bytes read
609  	   */
610  	  int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
611  		    ZTracer::Trace *parent_trace = nullptr);
612  	  int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
613  		     ZTracer::Trace *parent_trace = nullptr);
614  	  bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
615  			 snapid_t snapid);
616  	
617  	private:
618  	  // write blocking
619  	  int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
620  	                      ZTracer::Trace *trace, Context *onfreespace);
621  	  void _maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
622  	  bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);
623  	
624  	  void _discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
625  	                C_GatherBuilder* gather);
626  	  void _discard_finish(ObjectSet *oset, bool was_dirty, Context* on_finish);
627  	
628  	public:
629  	  bool set_is_empty(ObjectSet *oset);
630  	  bool set_is_cached(ObjectSet *oset);
631  	  bool set_is_dirty_or_committing(ObjectSet *oset);
632  	
633  	  bool flush_set(ObjectSet *oset, Context *onfinish=0);
634  	  bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex,
635  	                 ZTracer::Trace *trace, Context *onfinish = 0);
636  	  bool flush_all(Context *onfinish = 0);
637  	
638  	  void purge_set(ObjectSet *oset);
639  	
640  	  // returns # of bytes not released (ie non-clean)
641  	  loff_t release_set(ObjectSet *oset);
642  	  uint64_t release_all();
643  	
644  	  void discard_set(ObjectSet *oset, const vector<ObjectExtent>& ex);
645  	  void discard_writeback(ObjectSet *oset, const vector<ObjectExtent>& ex,
646  	                         Context* on_finish);
647  	
648  	  /**
649  	   * Retry any in-flight reads that get -ENOENT instead of marking
650  	   * them zero, and get rid of any cached -ENOENTs.
651  	   * After this is called and the cache's lock is unlocked,
652  	   * any new requests will treat -ENOENT normally.
653  	   */
654  	  void clear_nonexistence(ObjectSet *oset);
655  	
656  	
657  	  // cache sizes
658  	  void set_max_dirty(uint64_t v) {
659  	    max_dirty = v;
660  	  }
661  	  void set_target_dirty(int64_t v) {
662  	    target_dirty = v;
663  	  }
664  	  void set_max_size(int64_t v) {
665  	    max_size = v;
666  	  }
667  	  void set_max_dirty_age(double a) {
668  	    max_dirty_age = make_timespan(a);
669  	  }
670  	  void set_max_objects(int64_t v) {
671  	    max_objects = v;
672  	  }
673  	
674  	
675  	  // file functions
676  	
677  	  /*** async+caching (non-blocking) file interface ***/
678  	  int file_is_cached(ObjectSet *oset, file_layout_t *layout,
679  			     snapid_t snapid, loff_t offset, uint64_t len) {
680  	    vector<ObjectExtent> extents;
681  	    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
682  				     oset->truncate_size, extents);
683  	    return is_cached(oset, extents, snapid);
684  	  }
685  	
686  	  int file_read(ObjectSet *oset, file_layout_t *layout, snapid_t snapid,
687  			loff_t offset, uint64_t len, bufferlist *bl, int flags,
688  			Context *onfinish) {
689  	    OSDRead *rd = prepare_read(snapid, bl, flags);
690  	    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
691  				     oset->truncate_size, rd->extents);
692  	    return readx(rd, oset, onfinish);
693  	  }
694  	
695  	  int file_write(ObjectSet *oset, file_layout_t *layout,
696  			 const SnapContext& snapc, loff_t offset, uint64_t len,
697  			 bufferlist& bl, ceph::real_time mtime, int flags) {
698  	    OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
699  	    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
700  				     oset->truncate_size, wr->extents);
701  	    return writex(wr, oset, nullptr);
702  	  }
703  	
704  	  bool file_flush(ObjectSet *oset, file_layout_t *layout,
705  			  const SnapContext& snapc, loff_t offset, uint64_t len,
706  			  Context *onfinish) {
707  	    vector<ObjectExtent> extents;
708  	    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
709  				     oset->truncate_size, extents);
710  	    ZTracer::Trace trace;
711  	    return flush_set(oset, extents, &trace, onfinish);
712  	  }
713  	};
714  	
715  	
716  	inline ostream& operator<<(ostream &out, const ObjectCacher::BufferHead &bh)
717  	{
718  	  out << "bh[ " << &bh << " "
719  	      << bh.start() << "~" << bh.length()
720  	      << " " << bh.ob
721  	      << " (" << bh.bl.length() << ")"
722  	      << " v " << bh.last_write_tid;
723  	  if (bh.get_journal_tid() != 0) {
724  	    out << " j " << bh.get_journal_tid();
725  	  }
726  	  if (bh.is_tx()) out << " tx";
727  	  if (bh.is_rx()) out << " rx";
728  	  if (bh.is_dirty()) out << " dirty";
729  	  if (bh.is_clean()) out << " clean";
730  	  if (bh.is_zero()) out << " zero";
731  	  if (bh.is_missing()) out << " missing";
732  	  if (bh.bl.length() > 0) out << " firstbyte=" << (int)bh.bl[0];
733  	  if (bh.error) out << " error=" << bh.error;
734  	  out << "]";
735  	  out << " waiters = {";
736  	  for (map<loff_t, list<Context*> >::const_iterator it
737  		 = bh.waitfor_read.begin();
738  	       it != bh.waitfor_read.end(); ++it) {
739  	    out << " " << it->first << "->[";
740  	    for (list<Context*>::const_iterator lit = it->second.begin();
741  		 lit != it->second.end(); ++lit) {
742  		 out << *lit << ", ";
743  	    }
744  	    out << "]";
745  	  }
746  	  out << "}";
747  	  return out;
748  	}
749  	
750  	inline ostream& operator<<(ostream &out, const ObjectCacher::ObjectSet &os)
751  	{
752  	  return out << "objectset[" << os.ino
753  		     << " ts " << os.truncate_seq << "/" << os.truncate_size
754  		     << " objects " << os.objects.size()
755  		     << " dirty_or_tx " << os.dirty_or_tx
756  		     << "]";
757  	}
758  	
759  	inline ostream& operator<<(ostream &out, const ObjectCacher::Object &ob)
760  	{
761  	  out << "object["
762  	      << ob.get_soid() << " oset " << ob.oset << dec
763  	      << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
764  	
765  	  if (ob.complete)
766  	    out << " COMPLETE";
767  	  if (!ob.exists)
768  	    out << " !EXISTS";
769  	
770  	  out << "]";
771  	  return out;
772  	}
773  	
774  	#endif
775