1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include <limits.h>
5    	
6    	#include "msg/Messenger.h"
7    	#include "ObjectCacher.h"
8    	#include "WritebackHandler.h"
9    	#include "common/errno.h"
10   	#include "common/perf_counters.h"
11   	
12   	#include "include/ceph_assert.h"
13   	
14   	#define MAX_FLUSH_UNDER_LOCK 20  ///< max bh's we start writeback on
				 /// while holding the lock
15   	#define BUFFER_MEMORY_WEIGHT CEPH_PAGE_SHIFT  // memory usage of BufferHead, count in (1<<n)
16   	
17   	using std::chrono::seconds;
19   	
20   	/*** ObjectCacher::BufferHead ***/
21   	
22   	
23   	/*** ObjectCacher::Object ***/
24   	
25   	#define dout_subsys ceph_subsys_objectcacher
26   	#undef dout_prefix
27   	#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
28   	
29   	
30   	
31   	class ObjectCacher::C_ReadFinish : public Context {
32   	  ObjectCacher *oc;
33   	  int64_t poolid;
34   	  sobject_t oid;
35   	  loff_t start;
36   	  uint64_t length;
37   	  xlist<C_ReadFinish*>::item set_item;
38   	  bool trust_enoent;
39   	  ceph_tid_t tid;
40   	  ZTracer::Trace trace;
41   	
42   	public:
43   	  bufferlist bl;
44   	  C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
45   		       uint64_t l, const ZTracer::Trace &trace) :
46   	    oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
47   	    set_item(this), trust_enoent(true),
48   	    tid(t), trace(trace) {
49   	    ob->reads.push_back(&set_item);
50   	  }
51   	
52   	  void finish(int r) override {
53   	    oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
54   	    trace.event("finish");
55   	
56   	    // object destructor clears the list
57   	    if (set_item.is_on_list())
58   	      set_item.remove_myself();
59   	  }
60   	
61   	  void distrust_enoent() {
62   	    trust_enoent = false;
63   	  }
64   	};
65   	
66   	class ObjectCacher::C_RetryRead : public Context {
67   	  ObjectCacher *oc;
68   	  OSDRead *rd;
69   	  ObjectSet *oset;
70   	  Context *onfinish;
71   	  ZTracer::Trace trace;
72   	public:
73   	  C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
74   		      const ZTracer::Trace &trace)
75   	    : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
76   	  }
77   	  void finish(int r) override {
78   	    if (r >= 0) {
79   	      r = oc->_readx(rd, oset, onfinish, false, &trace);
80   	    }
81   	
82   	    if (r == 0) {
83   	      // read is still in-progress
84   	      return;
85   	    }
86   	
87   	    trace.event("finish");
88   	    if (onfinish) {
89   	      onfinish->complete(r);
90   	    }
91   	  }
92   	};
93   	
94   	ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
95   							      loff_t off)
96   	{
97   	  ceph_assert(ceph_mutex_is_locked(oc->lock));
98   	  ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
99   	
100  	  // split off right
101  	  ObjectCacher::BufferHead *right = new BufferHead(this);
102  	
103  	  // inherit the dontneed/nocache hints; if the bh is accessed later they are cleared automatically
104  	  right->set_dontneed(left->get_dontneed());
105  	  right->set_nocache(left->get_nocache());
106  	
107  	  right->last_write_tid = left->last_write_tid;
108  	  right->last_read_tid = left->last_read_tid;
109  	  right->set_state(left->get_state());
110  	  right->snapc = left->snapc;
111  	  right->set_journal_tid(left->journal_tid);
112  	
113  	  loff_t newleftlen = off - left->start();
114  	  right->set_start(off);
115  	  right->set_length(left->length() - newleftlen);
116  	
117  	  // shorten left
118  	  oc->bh_stat_sub(left);
119  	  left->set_length(newleftlen);
120  	  oc->bh_stat_add(left);
121  	
122  	  // add right
123  	  oc->bh_add(this, right);
124  	
125  	  // split buffers too
126  	  bufferlist bl;
127  	  bl.claim(left->bl);
128  	  if (bl.length()) {
129  	    ceph_assert(bl.length() == (left->length() + right->length()));
130  	    right->bl.substr_of(bl, left->length(), right->length());
131  	    left->bl.substr_of(bl, 0, left->length());
132  	  }
133  	
134  	  // move read waiters
135  	  if (!left->waitfor_read.empty()) {
136  	    map<loff_t, list<Context*> >::iterator start_remove
137  	      = left->waitfor_read.begin();
138  	    while (start_remove != left->waitfor_read.end() &&
139  		   start_remove->first < right->start())
140  	      ++start_remove;
141  	    for (map<loff_t, list<Context*> >::iterator p = start_remove;
142  		 p != left->waitfor_read.end(); ++p) {
143  	      ldout(oc->cct, 20) << "split  moving waiters at byte " << p->first
144  				 << " to right bh" << dendl;
145  	      right->waitfor_read[p->first].swap( p->second );
146  	      ceph_assert(p->second.empty());
147  	    }
148  	    left->waitfor_read.erase(start_remove, left->waitfor_read.end());
149  	  }
150  	
151  	  ldout(oc->cct, 20) << "split    left is " << *left << dendl;
152  	  ldout(oc->cct, 20) << "split   right is " << *right << dendl;
153  	  return right;
154  	}
155  	
156  	
157  	void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
158  	{
159  	  ceph_assert(ceph_mutex_is_locked(oc->lock));
160  	
161  	  ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
162  	  if (left->get_journal_tid() == 0) {
163  	    left->set_journal_tid(right->get_journal_tid());
164  	  }
165  	  right->set_journal_tid(0);
166  	
167  	  oc->bh_remove(this, right);
168  	  oc->bh_stat_sub(left);
169  	  left->set_length(left->length() + right->length());
170  	  oc->bh_stat_add(left);
171  	
172  	  // data
173  	  left->bl.claim_append(right->bl);
174  	
175  	  // version
176  	  // note: this is sorta busted, but should only be used for dirty buffers
177  	  left->last_write_tid =  std::max( left->last_write_tid, right->last_write_tid );
178  	  left->last_write = std::max( left->last_write, right->last_write );
179  	
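	  // keep the dontneed/nocache hints only if both the left and right bh had them set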
180  	  left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
181  	  left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
182  	
183  	  // waiters
184  	  for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
185  	       p != right->waitfor_read.end();
186  	       ++p)
187  	    left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
188  						p->second );
189  	
190  	  // hose right
191  	  delete right;
192  	
193  	  ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
194  	}
195  	
196  	bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
197  	{
198  	  if (left->end() != right->start() ||
199  	      left->get_state() != right->get_state() ||
200  	      !left->can_merge_journal(right))
201  	    return false;
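	  // in-flight (tx) buffers may only merge when they belong to the same write,
	  // since bh_write_commit() matches buffers by last_write_tid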
202  	  if (left->is_tx() && left->last_write_tid != right->last_write_tid)
203  	    return false;
204  	  return true;
205  	}
206  	
207  	void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
208  	{
209  	  ceph_assert(ceph_mutex_is_locked(oc->lock));
210  	  ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
211  	
212  	  // do not merge rx buffers; last_read_tid may not match
213  	  if (bh->is_rx())
214  	    return;
215  	
216  	  // to the left?
217  	  map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
218  	  ceph_assert(p->second == bh);
219  	  if (p != data.begin()) {
220  	    --p;
221  	    if (can_merge_bh(p->second, bh)) {
222  	      merge_left(p->second, bh);
223  	      bh = p->second;
224  	    } else {
225  	      ++p;
226  	    }
227  	  }
228  	  // to the right?
229  	  ceph_assert(p->second == bh);
230  	  ++p;
231  	  if (p != data.end() && can_merge_bh(bh, p->second))
232  	    merge_left(bh, p->second);
233  	
234  	  maybe_rebuild_buffer(bh);
235  	}
236  	
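	// Rebuild (re-copy into a single contiguous buffer) a bh's bufferlist when
	// more than half of its memory is wasted and the waste exceeds one
	// (1 << BUFFER_MEMORY_WEIGHT)-byte page.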
237  	void ObjectCacher::Object::maybe_rebuild_buffer(BufferHead *bh)
238  	{
239  	  auto& bl = bh->bl;
240  	  if (bl.get_num_buffers() <= 1)
241  	    return;
242  	
243  	  auto wasted = bl.get_wasted_space();
244  	  if (wasted * 2 > bl.length() &&
245  	      wasted > (1U << BUFFER_MEMORY_WEIGHT))
246  	    bl.rebuild();
247  	}
248  	
249  	/*
250  	 * return true if the entire given range is cached
251  	 */
252  	bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
253  	{
254  	  ceph_assert(ceph_mutex_is_locked(oc->lock));
255  	  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(cur);
256  	  while (left > 0) {
257  	    if (p == data.end())
258  	      return false;
259  	
260  	    if (p->first <= cur) {
261  	      // have part of it
262  	      loff_t lenfromcur = std::min(p->second->end() - cur, left);
263  	      cur += lenfromcur;
264  	      left -= lenfromcur;
265  	      ++p;
266  	      continue;
267  	    } else if (p->first > cur) {
268  	      // gap
269  	      return false;
270  	    } else
271  	      ceph_abort();
272  	  }
273  	
274  	  return true;
275  	}
276  	
277  	/*
278  	 * return true if all of this object's cached data lies within [off, off+len]
279  	 */
280  	bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
281  	{
282  	  ceph_assert(ceph_mutex_is_locked(oc->lock));
283  	  if (data.empty())
284  	    return true;
285  	  map<loff_t, BufferHead*>::iterator first = data.begin();
286  	  map<loff_t, BufferHead*>::reverse_iterator last = data.rbegin();
287  	  if (first->second->start() >= off && last->second->end() <= (off + len))
288  	    return true;
289  	  else
290  	    return false;
291  	}
292  	
293  	/*
294  	 * map a range of bytes into buffer_heads.
295  	 * - create missing buffer_heads as necessary.
296  	 */
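	/*
	 * Illustrative example (hypothetical values): if the object already holds a
	 * clean bh covering 0~4096 and an rx bh covering 8192~4096, a map_read() of
	 * the extent 0~16384 on an incomplete object fills
	 *   hits    = { 0 -> the clean bh }
	 *   rx      = { 8192 -> the in-flight bh }
	 *   missing = { 4096 -> new bh 4096~4096, 12288 -> new bh 12288~4096 }
	 * where the two missing bh's are created here and added to the cache.
	 */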
297  	int ObjectCacher::Object::map_read(ObjectExtent &ex,
298  	                                   map<loff_t, BufferHead*>& hits,
299  	                                   map<loff_t, BufferHead*>& missing,
300  	                                   map<loff_t, BufferHead*>& rx,
301  					   map<loff_t, BufferHead*>& errors)
302  	{
303  	  ceph_assert(ceph_mutex_is_locked(oc->lock));
304  	  ldout(oc->cct, 10) << "map_read " << ex.oid << " "
305  	                     << ex.offset << "~" << ex.length << dendl;
306  	
307  	  loff_t cur = ex.offset;
308  	  loff_t left = ex.length;
309  	
310  	  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
311  	  while (left > 0) {
312  	    // at end?
313  	    if (p == data.end()) {
314  	      // rest is a miss.
315  	      BufferHead *n = new BufferHead(this);
316  	      n->set_start(cur);
317  	      n->set_length(left);
318  	      oc->bh_add(this, n);
319  	      if (complete) {
320  	        oc->mark_zero(n);
321  	        hits[cur] = n;
322  	        ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
323  	      } else {
324  	        missing[cur] = n;
325  	        ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
326  	      }
327  	      cur += left;
328  	      ceph_assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
329  	      break;  // no more.
330  	    }
331  	
332  	    if (p->first <= cur) {
333  	      // have it (or part of it)
334  	      BufferHead *e = p->second;
335  	
336  	      if (e->is_clean() ||
337  	          e->is_dirty() ||
338  	          e->is_tx() ||
339  	          e->is_zero()) {
340  	        hits[cur] = e;     // readable!
341  	        ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
342  	      } else if (e->is_rx()) {
343  	        rx[cur] = e;       // missing, not readable.
344  	        ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
345  	      } else if (e->is_error()) {
346  	        errors[cur] = e;
347  	        ldout(oc->cct, 20) << "map_read error " << *e << dendl;
348  	      } else {
349  	        ceph_abort();
350  	      }
351  	
352  	      loff_t lenfromcur = std::min(e->end() - cur, left);
353  	      cur += lenfromcur;
354  	      left -= lenfromcur;
355  	      ++p;
356  	      continue;  // more?
357  	
358  	    } else if (p->first > cur) {
359  	      // gap.. miss
360  	      loff_t next = p->first;
361  	      BufferHead *n = new BufferHead(this);
362  	      loff_t len = std::min(next - cur, left);
363  	      n->set_start(cur);
364  	      n->set_length(len);
365  	      oc->bh_add(this,n);
366  	      if (complete) {
367  	        oc->mark_zero(n);
368  	        hits[cur] = n;
369  	        ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
370  	      } else {
371  	        missing[cur] = n;
372  	        ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
373  	      }
374  	      cur += std::min(left, n->length());
375  	      left -= std::min(left, n->length());
376  	      continue;    // more?
377  	    } else {
378  	      ceph_abort();
379  	    }
380  	  }
381  	  return 0;
382  	}
383  	
384  	void ObjectCacher::Object::audit_buffers()
385  	{
386  	  loff_t offset = 0;
387  	  for (map<loff_t, BufferHead*>::const_iterator it = data.begin();
388  	       it != data.end(); ++it) {
389  	    if (it->first != it->second->start()) {
390  	      lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
391  			     << " does not match bh start position: "
392  			     << *it->second << dendl;
393  	      ceph_assert(it->first == it->second->start());
394  	    }
395  	    if (it->first < offset) {
396  	      lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
397  			     << " overlaps with previous bh " << *((--it)->second)
398  			     << dendl;
399  	      ceph_assert(it->first >= offset);
400  	    }
401  	    BufferHead *bh = it->second;
402  	    map<loff_t, list<Context*> >::const_iterator w_it;
403  	    for (w_it = bh->waitfor_read.begin();
404  		 w_it != bh->waitfor_read.end(); ++w_it) {
405  	      if (w_it->first < bh->start() ||
406  		    w_it->first >= bh->start() + bh->length()) {
407  		lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
408  			       << " is not within bh " << *bh << dendl;
409  		ceph_assert(w_it->first >= bh->start());
410  		ceph_assert(w_it->first < bh->start() + bh->length());
411  	      }
412  	    }
413  	    offset = it->first + it->second->length();
414  	  }
415  	}
416  	
417  	/*
418  	 * map a range of extents on an object's buffer cache.
419  	 * - combine any bh's we're writing into one
420  	 * - break up bufferheads that don't fall completely within the range
421  	 * - note: we do NOT return a bh that includes the write together with
422  	 *   other dirty data to the left and/or right
423  	 */
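	/*
	 * Illustrative example (hypothetical values): a map_write() of 4096~8192
	 * over a single clean bh covering 0~16384 splits it into 0~4096, 4096~8192
	 * and 12288~4096; the middle piece gets the new journal tid and is returned
	 * as 'final' for the caller to fill in and mark dirty.
	 */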
424  	ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
425  								  ceph_tid_t tid)
426  	{
427  	  ceph_assert(ceph_mutex_is_locked(oc->lock));
428  	  BufferHead *final = 0;
429  	
430  	  ldout(oc->cct, 10) << "map_write oex " << ex.oid
431  	      	       << " " << ex.offset << "~" << ex.length << dendl;
432  	
433  	  loff_t cur = ex.offset;
434  	  loff_t left = ex.length;
435  	
436  	  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(ex.offset);
437  	  while (left > 0) {
438  	    loff_t max = left;
439  	
440  	    // at end ?
441  	    if (p == data.end()) {
442  	      if (final == NULL) {
443  	        final = new BufferHead(this);
444  	        replace_journal_tid(final, tid);
445  	        final->set_start( cur );
446  	        final->set_length( max );
447  	        oc->bh_add(this, final);
448  	        ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
449  	      } else {
450  	        oc->bh_stat_sub(final);
451  	        final->set_length(final->length() + max);
452  	        oc->bh_stat_add(final);
453  	      }
454  	      left -= max;
455  	      cur += max;
456  	      continue;
457  	    }
458  	
459  	    ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
460  	    //oc->verify_stats();
461  	
462  	    if (p->first <= cur) {
463  	      BufferHead *bh = p->second;
464  	      ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
465  	
466  	      if (p->first < cur) {
467  	        ceph_assert(final == 0);
468  	        if (cur + max >= bh->end()) {
469  	          // we want right bit (one splice)
470  	          final = split(bh, cur);   // just split it, take right half.
471  	          maybe_rebuild_buffer(bh);
472  	          replace_journal_tid(final, tid);
473  	          ++p;
474  	          ceph_assert(p->second == final);
475  	        } else {
476  	          // we want middle bit (two splices)
477  	          final = split(bh, cur);
478  	          maybe_rebuild_buffer(bh);
479  	          ++p;
480  	          ceph_assert(p->second == final);
481  	          auto right = split(final, cur+max);
482  	          maybe_rebuild_buffer(right);
483  	          replace_journal_tid(final, tid);
484  	        }
485  	      } else {
486  	        ceph_assert(p->first == cur);
487  	        if (bh->length() <= max) {
488  	          // whole bufferhead, piece of cake.
489  	        } else {
490  	          // we want left bit (one splice)
491  	          auto right = split(bh, cur + max);        // just split
492  	          maybe_rebuild_buffer(right);
493  	        }
494  	        if (final) {
495  	          oc->mark_dirty(bh);
496  	          oc->mark_dirty(final);
497  	          --p;  // move iterator back to final
498  	          ceph_assert(p->second == final);
499  	          replace_journal_tid(bh, tid);
500  	          merge_left(final, bh);
501  	        } else {
502  	          final = bh;
503  	          replace_journal_tid(final, tid);
504  	        }
505  	      }
506  	
507  	      // keep going.
508  	      loff_t lenfromcur = final->end() - cur;
509  	      cur += lenfromcur;
510  	      left -= lenfromcur;
511  	      ++p;
512  	      continue;
513  	    } else {
514  	      // gap!
515  	      loff_t next = p->first;
516  	      loff_t glen = std::min(next - cur, max);
517  	      ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
518  	      if (final) {
519  	        oc->bh_stat_sub(final);
520  	        final->set_length(final->length() + glen);
521  	        oc->bh_stat_add(final);
522  	      } else {
523  	        final = new BufferHead(this);
524  		replace_journal_tid(final, tid);
525  	        final->set_start( cur );
526  	        final->set_length( glen );
527  	        oc->bh_add(this, final);
528  	      }
529  	
530  	      cur += glen;
531  	      left -= glen;
532  	      continue;    // more?
533  	    }
534  	  }
535  	
536  	  // set version
537  	  ceph_assert(final);
538  	  ceph_assert(final->get_journal_tid() == tid);
539  	  ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
540  	
541  	  return final;
542  	}
543  	
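	// Re-tag a bh with the journal tid of the write that now covers it.  If the
	// bh already carried a different, older tid, tell the writeback handler that
	// the old journal entry will never be written back for this extent.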
544  	void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
545  						       ceph_tid_t tid) {
546  	  ceph_tid_t bh_tid = bh->get_journal_tid();
547  	
548  	  ceph_assert(tid == 0 || bh_tid <= tid);
549  	  if (bh_tid != 0 && bh_tid != tid) {
550  	    // inform journal that it should not expect a writeback from this extent
551  	    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
552  						   bh->length(), bh_tid, tid);
553  	  }
554  	  bh->set_journal_tid(tid);
555  	}
556  	
557  	void ObjectCacher::Object::truncate(loff_t s)
558  	{
559  	  ceph_assert(ceph_mutex_is_locked(oc->lock));
560  	  ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
561  	
562  	  while (!data.empty()) {
563  	    BufferHead *bh = data.rbegin()->second;
564  	    if (bh->end() <= s)
565  	      break;
566  	
567  	    // split bh at truncation point?
568  	    if (bh->start() < s) {
569  	      split(bh, s);
570  	      maybe_rebuild_buffer(bh);
571  	      continue;
572  	    }
573  	
574  	    // remove bh entirely
575  	    ceph_assert(bh->start() >= s);
576  	    ceph_assert(bh->waitfor_read.empty());
577  	    replace_journal_tid(bh, 0);
578  	    oc->bh_remove(this, bh);
579  	    delete bh;
580  	  }
581  	}
582  	
583  	void ObjectCacher::Object::discard(loff_t off, loff_t len,
584  	                                   C_GatherBuilder* commit_gather)
585  	{
586  	  ceph_assert(ceph_mutex_is_locked(oc->lock));
587  	  ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
588  			     << dendl;
589  	
590  	  if (!exists) {
591  	    ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
592  	    exists = true;
593  	  }
594  	  if (complete) {
595  	    ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
596  	    complete = false;
597  	  }
598  	
599  	  map<loff_t, BufferHead*>::const_iterator p = data_lower_bound(off);
600  	  while (p != data.end()) {
601  	    BufferHead *bh = p->second;
602  	    if (bh->start() >= off + len)
603  	      break;
604  	
605  	    // split bh at truncation point?
606  	    if (bh->start() < off) {
607  	      split(bh, off);
608  	      maybe_rebuild_buffer(bh);
609  	      ++p;
610  	      continue;
611  	    }
612  	
613  	    ceph_assert(bh->start() >= off);
614  	    if (bh->end() > off + len) {
615  	      auto right = split(bh, off + len);
616  	      maybe_rebuild_buffer(right);
617  	    }
618  	
619  	    ++p;
620  	    ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
621  	    replace_journal_tid(bh, 0);
622  	
623  	    if (bh->is_tx() && commit_gather != nullptr) {
624  	      // wait for the writeback to commit
625  	      waitfor_commit[bh->last_write_tid].emplace_back(commit_gather->new_sub());
626  	    } else if (bh->is_rx()) {
627  	      // cannot remove bh with in-flight read, but we can ensure the
628  	      // read won't overwrite the discard
629  	      bh->last_read_tid = ++oc->last_read_tid;
630  	      bh->bl.clear();
631  	      bh->set_nocache(true);
632  	      oc->mark_zero(bh);
633  	      // we mark all rx bhs in the discarded range as zero
634  	      continue;
635  	    } else {
636  	      ceph_assert(bh->waitfor_read.empty());
637  	    }
638  	
639  	    oc->bh_remove(this, bh);
640  	    delete bh;
641  	  }
642  	}
643  	
644  	
645  	
646  	/*** ObjectCacher ***/
647  	
648  	#undef dout_prefix
649  	#define dout_prefix *_dout << "objectcacher "
650  	
651  	
652  	ObjectCacher::ObjectCacher(CephContext *cct_, string name,
653  				   WritebackHandler& wb, ceph::mutex& l,
654  				   flush_set_callback_t flush_callback,
655  				   void *flush_callback_arg, uint64_t max_bytes,
656  				   uint64_t max_objects, uint64_t max_dirty,
657  				   uint64_t target_dirty, double max_dirty_age,
658  				   bool block_writes_upfront)
659  	  : perfcounter(NULL),
660  	    cct(cct_), writeback_handler(wb), name(name), lock(l),
661  	    max_dirty(max_dirty), target_dirty(target_dirty),
662  	    max_size(max_bytes), max_objects(max_objects),
663  	    max_dirty_age(ceph::make_timespan(max_dirty_age)),
664  	    block_writes_upfront(block_writes_upfront),
665  	    trace_endpoint("ObjectCacher"),
666  	    flush_set_callback(flush_callback),
667  	    flush_set_callback_arg(flush_callback_arg),
668  	    last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
669  	    stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
670  	    stat_missing(0), stat_error(0), stat_dirty_waiting(0),
671  	    stat_nr_dirty_waiters(0), reads_outstanding(0)
672  	{
673  	  perf_start();
674  	  finisher.start();
675  	  scattered_write = writeback_handler.can_scattered_write();
676  	}
677  	
678  	ObjectCacher::~ObjectCacher()
679  	{
680  	  finisher.stop();
681  	  perf_stop();
682  	  // we should be empty.
683  	  for (vector<ceph::unordered_map<sobject_t, Object *> >::iterator i
684  		 = objects.begin();
685  	       i != objects.end();
686  	       ++i)
687  	    ceph_assert(i->empty());
688  	  ceph_assert(bh_lru_rest.lru_get_size() == 0);
689  	  ceph_assert(bh_lru_dirty.lru_get_size() == 0);
690  	  ceph_assert(ob_lru.lru_get_size() == 0);
691  	  ceph_assert(dirty_or_tx_bh.empty());
692  	}
693  	
694  	void ObjectCacher::perf_start()
695  	{
696  	  string n = "objectcacher-" + name;
697  	  PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
698  	
699  	  plb.add_u64_counter(l_objectcacher_cache_ops_hit,
700  			      "cache_ops_hit", "Hit operations");
701  	  plb.add_u64_counter(l_objectcacher_cache_ops_miss,
702  			      "cache_ops_miss", "Miss operations");
703  	  plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
704  			      "cache_bytes_hit", "Hit data", NULL, 0, unit_t(UNIT_BYTES));
705  	  plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
706  			      "cache_bytes_miss", "Miss data", NULL, 0, unit_t(UNIT_BYTES));
707  	  plb.add_u64_counter(l_objectcacher_data_read,
708  			      "data_read", "Read data");
709  	  plb.add_u64_counter(l_objectcacher_data_written,
710  			      "data_written", "Data written to cache");
711  	  plb.add_u64_counter(l_objectcacher_data_flushed,
712  			      "data_flushed", "Data flushed");
713  	  plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
714  			      "data_overwritten_while_flushing",
715  			      "Data overwritten while flushing");
716  	  plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
717  			      "Write operations, delayed due to dirty limits");
718  	  plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
719  			      "write_bytes_blocked",
720  			      "Write data blocked on dirty limit", NULL, 0, unit_t(UNIT_BYTES));
721  	  plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
722  		       "Time spent blocking a write due to dirty limits");
723  	
724  	  perfcounter = plb.create_perf_counters();
725  	  cct->get_perfcounters_collection()->add(perfcounter);
726  	}
727  	
728  	void ObjectCacher::perf_stop()
729  	{
730  	  ceph_assert(perfcounter);
731  	  cct->get_perfcounters_collection()->remove(perfcounter);
732  	  delete perfcounter;
733  	}
734  	
735  	/* private */
736  	ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
737  						       uint64_t object_no,
738  						       ObjectSet *oset,
739  						       object_locator_t &l,
740  						       uint64_t truncate_size,
741  						       uint64_t truncate_seq)
742  	{
743  	  // XXX: Add handling of nspace in object_locator_t in cache
744  	  ceph_assert(ceph_mutex_is_locked(lock));
745  	  // have it?
746  	  if ((uint32_t)l.pool < objects.size()) {
747  	    if (objects[l.pool].count(oid)) {
748  	      Object *o = objects[l.pool][oid];
749  	      o->object_no = object_no;
750  	      o->truncate_size = truncate_size;
751  	      o->truncate_seq = truncate_seq;
752  	      return o;
753  	    }
754  	  } else {
755  	    objects.resize(l.pool+1);
756  	  }
757  	
758  	  // create it.
759  	  Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
760  				 truncate_seq);
761  	  objects[l.pool][oid] = o;
762  	  ob_lru.lru_insert_top(o);
763  	  return o;
764  	}
765  	
766  	void ObjectCacher::close_object(Object *ob)
767  	{
768  	  ceph_assert(ceph_mutex_is_locked(lock));
769  	  ldout(cct, 10) << "close_object " << *ob << dendl;
770  	  ceph_assert(ob->can_close());
771  	
772  	  // ok!
773  	  ob_lru.lru_remove(ob);
774  	  objects[ob->oloc.pool].erase(ob->get_soid());
775  	  ob->set_item.remove_myself();
776  	  delete ob;
777  	}
778  	
779  	void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
780  	                           const ZTracer::Trace &parent_trace)
781  	{
782  	  ceph_assert(ceph_mutex_is_locked(lock));
783  	  ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
784  			<< reads_outstanding << dendl;
785  	
786  	  ZTracer::Trace trace;
787  	  if (parent_trace.valid()) {
788  	    trace.init("", &trace_endpoint, &parent_trace);
789  	    trace.copy_name("bh_read " + bh->ob->get_oid().name);
790  	    trace.event("start");
791  	  }
792  	
793  	  mark_rx(bh);
794  	  bh->last_read_tid = ++last_read_tid;
795  	
796  	  // finisher
797  	  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
798  						    bh->start(), bh->length(), trace);
799  	  // go
800  	  writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
801  				 bh->ob->get_oloc(), bh->start(), bh->length(),
802  				 bh->ob->get_snap(), &onfinish->bl,
803  				 bh->ob->truncate_size, bh->ob->truncate_seq,
804  				 op_flags, trace, onfinish);
805  	
806  	  ++reads_outstanding;
807  	}
808  	
809  	void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
810  					  ceph_tid_t tid, loff_t start,
811  					  uint64_t length, bufferlist &bl, int r,
812  					  bool trust_enoent)
813  	{
814  	  ceph_assert(ceph_mutex_is_locked(lock));
815  	  ldout(cct, 7) << "bh_read_finish "
816  			<< oid
817  			<< " tid " << tid
818  			<< " " << start << "~" << length
819  			<< " (bl is " << bl.length() << ")"
820  			<< " returned " << r
821  			<< " outstanding reads " << reads_outstanding
822  			<< dendl;
823  	
824  	  if (r >= 0 && bl.length() < length) {
825  	    ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
826  			  << length << " with " << length - bl.length() << " bytes of zeroes"
827  			  << dendl;
828  	    bl.append_zero(length - bl.length());
829  	  }
830  	
831  	  list<Context*> ls;
832  	  int err = 0;
833  	
834  	  if (objects[poolid].count(oid) == 0) {
835  	    ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
836  	  } else {
837  	    Object *ob = objects[poolid][oid];
838  	
839  	    if (r == -ENOENT && !ob->complete) {
840  	      // wake up *all* rx waiters, or else we risk reordering
841  	      // identical reads. e.g.
842  	      //   read 1~1
843  	      //   reply to unrelated 3~1 -> !exists
844  	      //   read 1~1 -> immediate ENOENT
845  	      //   reply to first 1~1 -> ooo ENOENT
846  	      bool allzero = true;
847  	      for (map<loff_t, BufferHead*>::iterator p = ob->data.begin();
848  		   p != ob->data.end(); ++p) {
849  		BufferHead *bh = p->second;
850  		for (map<loff_t, list<Context*> >::iterator p
851  		       = bh->waitfor_read.begin();
852  		     p != bh->waitfor_read.end();
853  		     ++p)
854  		  ls.splice(ls.end(), p->second);
855  		bh->waitfor_read.clear();
856  		if (!bh->is_zero() && !bh->is_rx())
857  		  allzero = false;
858  	      }
859  	
860  	      // just pass through and retry all waiters if we don't trust
861  	      // -ENOENT for this read
862  	      if (trust_enoent) {
863  		ldout(cct, 7)
864  		  << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
865  		  << dendl;
866  		ob->complete = true;
867  		ob->exists = false;
868  	
869  		/* If all the bhs are effectively zero, get rid of them.  All
870  		 * the waiters will be retried and get -ENOENT immediately, so
871  		 * it's safe to clean up the unneeded bh's now. Since we know
872  		 * it's safe to remove them now, do so, so they aren't hanging
873  		 * around waiting for more -ENOENTs from rados while the cache
874  		 * is being shut down.
875  		 *
876  		 * Only do this when all the bhs are rx or clean, to match the
877  		 * condition in _readx(). If there are any non-rx or non-clean
878  		 * bhs, _readx() will wait for the final result instead of
879  		 * returning -ENOENT immediately.
880  		 */
881  		if (allzero) {
882  		  ldout(cct, 10)
883  		    << "bh_read_finish ENOENT and allzero, getting rid of "
884  		    << "bhs for " << *ob << dendl;
885  		  map<loff_t, BufferHead*>::iterator p = ob->data.begin();
886  		  while (p != ob->data.end()) {
887  		    BufferHead *bh = p->second;
888  		    // current iterator will be invalidated by bh_remove()
889  		    ++p;
890  		    bh_remove(ob, bh);
891  		    delete bh;
892  		  }
893  		}
894  	      }
895  	    }
896  	
897  	    // apply to bh's!
898  	    loff_t opos = start;
899  	    while (true) {
900  	      map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(opos);
901  	      if (p == ob->data.end())
902  		break;
903  	      if (opos >= start+(loff_t)length) {
904  		ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
905  			       << start << "+" << length << "=" << start+(loff_t)length
906  			       << dendl;
907  		break;
908  	      }
909  	
910  	      BufferHead *bh = p->second;
911  	      ldout(cct, 20) << "checking bh " << *bh << dendl;
912  	
913  	      // finishers?
914  	      for (map<loff_t, list<Context*> >::iterator it
915  		     = bh->waitfor_read.begin();
916  		   it != bh->waitfor_read.end();
917  		   ++it)
918  		ls.splice(ls.end(), it->second);
919  	      bh->waitfor_read.clear();
920  	
921  	      if (bh->start() > opos) {
922  		ldout(cct, 1) << "bh_read_finish skipping gap "
923  			      << opos << "~" << bh->start() - opos
924  			      << dendl;
925  		opos = bh->start();
926  		continue;
927  	      }
928  	
929  	      if (!bh->is_rx()) {
930  		ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
931  		opos = bh->end();
932  		continue;
933  	      }
934  	
935  	      if (bh->last_read_tid != tid) {
936  		ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
937  			       << bh->last_read_tid << " != tid " << tid
938  			       << ", skipping" << dendl;
939  		opos = bh->end();
940  		continue;
941  	      }
942  	
943  	      ceph_assert(opos >= bh->start());
944  	      ceph_assert(bh->start() == opos);   // we don't merge rx bh's... yet!
945  	      ceph_assert(bh->length() <= start+(loff_t)length-opos);
946  	
947  	      if (bh->error < 0)
948  		err = bh->error;
949  	
950  	      opos = bh->end();
951  	
952  	      if (r == -ENOENT) {
953  		if (trust_enoent) {
954  		  ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
955  		  bh_remove(ob, bh);
956  		  delete bh;
957  		} else {
958  		  ldout(cct, 10) << "skipping untrusted -ENOENT and will retry for "
959  				 << *bh << dendl;
960  		}
961  		continue;
962  	      }
963  	
964  	      if (r < 0) {
965  		bh->error = r;
966  		mark_error(bh);
967  	      } else {
968  		bh->bl.substr_of(bl,
969  				 bh->start() - start,
970  				 bh->length());
971  		mark_clean(bh);
972  	      }
973  	
974  	      ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
975  	
976  	      ob->try_merge_bh(bh);
977  	    }
978  	  }
979  	
980  	  // called with lock held.
981  	  ldout(cct, 20) << "finishing waiters " << ls << dendl;
982  	
983  	  finish_contexts(cct, ls, err);
984  	  retry_waiting_reads();
985  	
986  	  --reads_outstanding;
987  	  read_cond.notify_all();
988  	}
989  	
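	// Gather the dirty neighbours of 'bh' (same object, last_write <= cutoff)
	// from dirty_or_tx_bh, bounded by *max_count / *max_amount (which are
	// decremented by what was taken), and flush them as one scattered write.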
990  	void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
991  						int64_t *max_amount, int *max_count)
992  	{
993  	  list<BufferHead*> blist;
994  	
995  	  int count = 0;
996  	  int64_t total_len = 0;
997  	  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
998  	  ceph_assert(it != dirty_or_tx_bh.end());
999  	  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
1000 	       p != dirty_or_tx_bh.end();
1001 	       ++p) {
1002 	    BufferHead *obh = *p;
1003 	    if (obh->ob != bh->ob)
1004 	      break;
1005 	    if (obh->is_dirty() && obh->last_write <= cutoff) {
1006 	      blist.push_back(obh);
1007 	      ++count;
1008 	      total_len += obh->length();
1009 	      if ((max_count && count > *max_count) ||
1010 		  (max_amount && total_len > *max_amount))
1011 		break;
1012 	    }
1013 	  }
1014 	
1015 	  while (it != dirty_or_tx_bh.begin()) {
1016 	    --it;
1017 	    BufferHead *obh = *it;
1018 	    if (obh->ob != bh->ob)
1019 	      break;
1020 	    if (obh->is_dirty() && obh->last_write <= cutoff) {
1021 	      blist.push_front(obh);
1022 	      ++count;
1023 	      total_len += obh->length();
1024 	      if ((max_count && count > *max_count) ||
1025 		  (max_amount && total_len > *max_amount))
1026 		break;
1027 	    }
1028 	  }
1029 	  if (max_count)
1030 	    *max_count -= count;
1031 	  if (max_amount)
1032 	    *max_amount -= total_len;
1033 	
1034 	  bh_write_scattered(blist);
1035 	}
1036 	
1037 	class ObjectCacher::C_WriteCommit : public Context {
1038 	  ObjectCacher *oc;
1039 	  int64_t poolid;
1040 	  sobject_t oid;
1041 	  vector<pair<loff_t, uint64_t> > ranges;
1042 	  ZTracer::Trace trace;
1043 	public:
1044 	  ceph_tid_t tid = 0;
1045 	  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
1046 			uint64_t l, const ZTracer::Trace &trace) :
1047 	    oc(c), poolid(_poolid), oid(o), trace(trace) {
1048 	      ranges.push_back(make_pair(s, l));
1049 	    }
1050 	  C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
1051 			vector<pair<loff_t, uint64_t> >& _ranges) :
1052 	    oc(c), poolid(_poolid), oid(o), tid(0) {
1053 	      ranges.swap(_ranges);
1054 	    }
1055 	  void finish(int r) override {
1056 	    oc->bh_write_commit(poolid, oid, ranges, tid, r);
1057 	    trace.event("finish");
1058 	  }
1059 	};
1060 	void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
1061 	{
1062 	  ceph_assert(ceph_mutex_is_locked(lock));
1063 	
1064 	  Object *ob = blist.front()->ob;
1065 	  ob->get();
1066 	
1067 	  ceph::real_time last_write;
1068 	  SnapContext snapc;
1069 	  vector<pair<loff_t, uint64_t> > ranges;
1070 	  vector<pair<uint64_t, bufferlist> > io_vec;
1071 	
1072 	  ranges.reserve(blist.size());
1073 	  io_vec.reserve(blist.size());
1074 	
1075 	  uint64_t total_len = 0;
1076 	  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1077 	    BufferHead *bh = *p;
1078 	    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
1079 	    ceph_assert(bh->ob == ob);
1080 	    ceph_assert(bh->bl.length() == bh->length());
1081 	    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
1082 	
1083 	    int n = io_vec.size();
1084 	    io_vec.resize(n + 1);
1085 	    io_vec[n].first = bh->start();
1086 	    io_vec[n].second = bh->bl;
1087 	
1088 	    total_len += bh->length();
1089 	    if (bh->snapc.seq > snapc.seq)
1090 	      snapc = bh->snapc;
1091 	    if (bh->last_write > last_write)
1092 	      last_write = bh->last_write;
1093 	  }
1094 	
1095 	  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
1096 	
1097 	  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
1098 						   io_vec, snapc, last_write,
1099 						   ob->truncate_size, ob->truncate_seq,
1100 						   oncommit);
1101 	  oncommit->tid = tid;
1102 	  ob->last_write_tid = tid;
1103 	  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
1104 	    BufferHead *bh = *p;
1105 	    bh->last_write_tid = tid;
1106 	    mark_tx(bh);
1107 	  }
1108 	
1109 	  if (perfcounter)
1110 	    perfcounter->inc(l_objectcacher_data_flushed, total_len);
1111 	}
1112 	
1113 	void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
1114 	{
1115 	  ceph_assert(ceph_mutex_is_locked(lock));
1116 	  ldout(cct, 7) << "bh_write " << *bh << dendl;
1117 	
1118 	  bh->ob->get();
1119 	
1120 	  ZTracer::Trace trace;
1121 	  if (parent_trace.valid()) {
1122 	    trace.init("", &trace_endpoint, &parent_trace);
1123 	    trace.copy_name("bh_write " + bh->ob->get_oid().name);
1124 	    trace.event("start");
1125 	  }
1126 	
1127 	  // finishers
1128 	  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
1129 						      bh->ob->get_soid(), bh->start(),
1130 						      bh->length(), trace);
1131 	  // go
1132 	  ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
1133 						   bh->ob->get_oloc(),
1134 						   bh->start(), bh->length(),
1135 						   bh->snapc, bh->bl, bh->last_write,
1136 						   bh->ob->truncate_size,
1137 						   bh->ob->truncate_seq,
1138 						   bh->journal_tid, trace, oncommit);
1139 	  ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
1140 	
1141 	  // set bh last_write_tid
1142 	  oncommit->tid = tid;
1143 	  bh->ob->last_write_tid = tid;
1144 	  bh->last_write_tid = tid;
1145 	
1146 	  if (perfcounter) {
1147 	    perfcounter->inc(l_objectcacher_data_flushed, bh->length());
1148 	  }
1149 	
1150 	  mark_tx(bh);
1151 	}
1152 	
1153 	void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
1154 					   vector<pair<loff_t, uint64_t> >& ranges,
1155 					   ceph_tid_t tid, int r)
1156 	{
1157 	  ceph_assert(ceph_mutex_is_locked(lock));
1158 	  ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
1159 			<< " ranges " << ranges << " returned " << r << dendl;
1160 	
1161 	  if (objects[poolid].count(oid) == 0) {
1162 	    ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
1163 	    return;
1164 	  }
1165 	
1166 	  Object *ob = objects[poolid][oid];
1167 	  int was_dirty_or_tx = ob->oset->dirty_or_tx;
1168 	
1169 	  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
1170 	       p != ranges.end();
1171 	       ++p) {
1172 	    loff_t start = p->first;
1173 	    uint64_t length = p->second;
1174 	    if (!ob->exists) {
1175 	      ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
1176 	      ob->exists = true;
1177 	
1178 	      if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
1179 						      ob->get_snap())) {
1180 		ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
1181 		  "complete on " << *ob << dendl;
1182 		ob->complete = false;
1183 	      }
1184 	    }
1185 	
1186 	    vector<pair<loff_t, BufferHead*>> hit;
1187 	    // apply to bh's!
1188 	    for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
1189 		 p != ob->data.end();
1190 		 ++p) {
1191 	      BufferHead *bh = p->second;
1192 	
1193 	      if (bh->start() >= start+(loff_t)length)
1194 		break;
1195 	
1196 	      // make sure bh is tx
1197 	      if (!bh->is_tx()) {
1198 		ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
1199 		continue;
1200 	      }
1201 	
1202 	      // make sure bh tid matches
1203 	      if (bh->last_write_tid != tid) {
1204 		ceph_assert(bh->last_write_tid > tid);
1205 		ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
1206 		continue;
1207 	      }
1208 	
1209 	      // we don't merge tx buffers. tx buffer should be within the range
1210 	      ceph_assert(bh->start() >= start);
1211 	      ceph_assert(bh->end() <= start+(loff_t)length);
1212 	
1213 	      if (r >= 0) {
1214 		// ok!  mark bh clean and error-free
1215 		mark_clean(bh);
1216 		bh->set_journal_tid(0);
1217 		if (bh->get_nocache())
1218 		  bh_lru_rest.lru_bottouch(bh);
1219 		hit.push_back(make_pair(bh->start(), bh));
1220 		ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
1221 	      } else {
1222 		mark_dirty(bh);
1223 		ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
1224 			       << *bh << " r = " << r << " " << cpp_strerror(-r)
1225 			       << dendl;
1226 	      }
1227 	    }
1228 	
1229 	    for (auto& p : hit) {
1230 	      // p.second may have been merged and deleted in merge_left
1231 	      if (ob->data.count(p.first))
1232 		ob->try_merge_bh(p.second);
1233 	    }
1234 	  }
1235 	
1236 	  // update last_commit.
1237 	  ceph_assert(ob->last_commit_tid < tid);
1238 	  ob->last_commit_tid = tid;
1239 	
1240 	  // waiters?
1241 	  list<Context*> ls;
1242 	  if (ob->waitfor_commit.count(tid)) {
1243 	    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
1244 	    ob->waitfor_commit.erase(tid);
1245 	  }
1246 	
1247 	  // is the entire object set now clean and fully committed?
1248 	  ObjectSet *oset = ob->oset;
1249 	  ob->put();
1250 	
1251 	  if (flush_set_callback &&
1252 	      was_dirty_or_tx > 0 &&
1253 	      oset->dirty_or_tx == 0) {        // nothing dirty/tx
1254 	    flush_set_callback(flush_set_callback_arg, oset);
1255 	  }
1256 	
1257 	  if (!ls.empty())
1258 	    finish_contexts(cct, ls, r);
1259 	}
1260 	
1261 	void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
1262 	{
1263 	  ceph_assert(trace != nullptr);
1264 	  ceph_assert(ceph_mutex_is_locked(lock));
1265 	  ceph::real_time cutoff = ceph::real_clock::now();
1266 	
1267 	  ldout(cct, 10) << "flush " << amount << dendl;
1268 	
1269 	  /*
1270 	   * NOTE: we aren't actually pulling things off the LRU here, just
1271 	   * looking at the tail item.  Then we call bh_write, which moves it
1272 	   * to the other LRU, so that we can call
1273 	   * lru_dirty.lru_get_next_expire() again.
1274 	   */
1275 	  int64_t left = amount;
1276 	  while (amount == 0 || left > 0) {
1277 	    BufferHead *bh = static_cast<BufferHead*>(
1278 	      bh_lru_dirty.lru_get_next_expire());
1279 	    if (!bh) break;
1280 	    if (bh->last_write > cutoff) break;
1281 	
1282 	    if (scattered_write) {
1283 	      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
1284 	    } else {
1285 	      left -= bh->length();
1286 	      bh_write(bh, *trace);
1287 	    }
1288 	  }
1289 	}
1290 	
1291 	
1292 	void ObjectCacher::trim()
1293 	{
1294 	  ceph_assert(ceph_mutex_is_locked(lock));
1295 	  ldout(cct, 10) << "trim  start: bytes: max " << max_size << "  clean "
1296 			 << get_stat_clean() << ", objects: max " << max_objects
1297 			 << " current " << ob_lru.lru_get_size() << dendl;
1298 	
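	  // bound both the clean byte total (max_size) and the number of clean bh's:
	  // each BufferHead is charged at least (1 << BUFFER_MEMORY_WEIGHT) bytes, so
	  // allow at most max_size >> BUFFER_MEMORY_WEIGHT of them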
1299 	  uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
1300 	  uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
1301 	  while (get_stat_clean() > 0 &&
1302 		 ((uint64_t)get_stat_clean() > max_size ||
1303 		  nr_clean_bh > max_clean_bh)) {
1304 	    BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
1305 	    if (!bh)
1306 	      break;
1307 	
1308 	    ldout(cct, 10) << "trim trimming " << *bh << dendl;
1309 	    ceph_assert(bh->is_clean() || bh->is_zero() || bh->is_error());
1310 	
1311 	    Object *ob = bh->ob;
1312 	    bh_remove(ob, bh);
1313 	    delete bh;
1314 	
1315 	    --nr_clean_bh;
1316 	
1317 	    if (ob->complete) {
1318 	      ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
1319 	      ob->complete = false;
1320 	    }
1321 	  }
1322 	
1323 	  while (ob_lru.lru_get_size() > max_objects) {
1324 	    Object *ob = static_cast<Object*>(ob_lru.lru_expire());
1325 	    if (!ob)
1326 	      break;
1327 	
1328 	    ldout(cct, 10) << "trim trimming " << *ob << dendl;
1329 	    close_object(ob);
1330 	  }
1331 	
1332 	  ldout(cct, 10) << "trim finish:  max " << max_size << "  clean "
1333 			 << get_stat_clean() << ", objects: max " << max_objects
1334 			 << " current " << ob_lru.lru_get_size() << dendl;
1335 	}
1336 	
1337 	
1338 	
1339 	/* public */
1340 	
1341 	bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
1342 				     snapid_t snapid)
1343 	{
1344 	  ceph_assert(ceph_mutex_is_locked(lock));
1345 	  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
1346 	       ex_it != extents.end();
1347 	       ++ex_it) {
1348 	    ldout(cct, 10) << "is_cached " << *ex_it << dendl;
1349 	
1350 	    // get Object cache
1351 	    sobject_t soid(ex_it->oid, snapid);
1352 	    Object *o = get_object_maybe(soid, ex_it->oloc);
1353 	    if (!o)
1354 	      return false;
1355 	    if (!o->is_cached(ex_it->offset, ex_it->length))
1356 	      return false;
1357 	  }
1358 	  return true;
1359 	}
1360 	
1361 	
1362 	/*
1363 	 * returns # bytes read (if in cache).  onfinish is untouched (caller
1364 	 *           must delete it)
1365 	 * returns 0 if doing async read
1366 	 */
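	// Illustrative call pattern (hypothetical caller; lock must already be held):
	//   int r = readx(rd, oset, onfinish, &trace);
	//   r >  0 : all data was cached; rd->bl is filled and onfinish is untouched
	//   r == 0 : the read is in flight; onfinish->complete() fires when it lands
	//   r <  0 : immediate error (e.g. -ENOENT when oset->return_enoent is set)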
1367 	int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1368 				ZTracer::Trace *parent_trace)
1369 	{
1370 	  ZTracer::Trace trace;
1371 	  if (parent_trace != nullptr) {
1372 	    trace.init("read", &trace_endpoint, parent_trace);
1373 	    trace.event("start");
1374 	  }
1375 	
1376 	  int r =_readx(rd, oset, onfinish, true, &trace);
1377 	  if (r < 0) {
1378 	    trace.event("finish");
1379 	  }
1380 	  return r;
1381 	}
1382 	
1383 	int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
1384 				 bool external_call, ZTracer::Trace *trace)
1385 	{
1386 	  ceph_assert(trace != nullptr);
1387 	  ceph_assert(ceph_mutex_is_locked(lock));
1388 	  bool success = true;
1389 	  int error = 0;
1390 	  uint64_t bytes_in_cache = 0;
1391 	  uint64_t bytes_not_in_cache = 0;
1392 	  uint64_t total_bytes_read = 0;
1393 	  map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
1394 	  bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1395 	  bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1396 	
1397 	  /*
1398 	   * WARNING: we can only meaningfully return ENOENT if the read request
1399 	   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
1400 	   * zeroed buffers needs to feed single extents into readx().
1401 	   */
1402 	  ceph_assert(!oset->return_enoent || rd->extents.size() == 1);
1403 	
1404 	  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
1405 	       ex_it != rd->extents.end();
1406 	       ++ex_it) {
1407 	    ldout(cct, 10) << "readx " << *ex_it << dendl;
1408 	
1409 	    total_bytes_read += ex_it->length;
1410 	
1411 	    // get Object cache
1412 	    sobject_t soid(ex_it->oid, rd->snap);
1413 	    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1414 				   ex_it->truncate_size, oset->truncate_seq);
1415 	    if (external_call)
1416 	      touch_ob(o);
1417 	
1418 	    // does not exist and no hits?
1419 	    if (oset->return_enoent && !o->exists) {
1420 	      ldout(cct, 10) << "readx  object !exists, 1 extent..." << dendl;
1421 	
1422 	      // should we worry about COW underneath us?
1423 	      if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
1424 						      ex_it->length, soid.snap)) {
1425 		ldout(cct, 20) << "readx  may copy on write" << dendl;
1426 		bool wait = false;
1427 		list<BufferHead*> blist;
1428 		for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1429 		     bh_it != o->data.end();
1430 		     ++bh_it) {
1431 		  BufferHead *bh = bh_it->second;
1432 		  if (bh->is_dirty() || bh->is_tx()) {
1433 		    ldout(cct, 10) << "readx  flushing " << *bh << dendl;
1434 		    wait = true;
1435 		    if (bh->is_dirty()) {
1436 		      if (scattered_write)
1437 			blist.push_back(bh);
1438 		      else
1439 			bh_write(bh, *trace);
1440 		    }
1441 		  }
1442 		}
1443 		if (scattered_write && !blist.empty())
1444 		  bh_write_scattered(blist);
1445 		if (wait) {
1446 		  ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
1447 				 << " on " << *o << dendl;
1448 		  o->waitfor_commit[o->last_write_tid].push_back(
1449 		    new C_RetryRead(this,rd, oset, onfinish, *trace));
1450 		  // FIXME: perfcounter!
1451 		  return 0;
1452 		}
1453 	      }
1454 	
1455 	      // can we return ENOENT?
1456 	      bool allzero = true;
1457 	      for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
1458 		   bh_it != o->data.end();
1459 		   ++bh_it) {
1460 		ldout(cct, 20) << "readx  ob has bh " << *bh_it->second << dendl;
1461 		if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
1462 		  allzero = false;
1463 		  break;
1464 		}
1465 	      }
1466 	      if (allzero) {
1467 		ldout(cct, 10) << "readx  ob has all zero|rx, returning ENOENT"
1468 			       << dendl;
1469 		delete rd;
1470 		if (dontneed)
1471 		  bottouch_ob(o);
1472 		return -ENOENT;
1473 	      }
1474 	    }
1475 	
1476 	    // map extent into bufferheads
1477 	    map<loff_t, BufferHead*> hits, missing, rx, errors;
1478 	    o->map_read(*ex_it, hits, missing, rx, errors);
1479 	    if (external_call) {
1480 	      // retry reading error buffers
1481 	      missing.insert(errors.begin(), errors.end());
1482 	    } else {
1483 	      // some reads had errors, fail later so completions
1484 	      // are cleaned up properly
1485 	      // TODO: make read path not call _readx for every completion
1486 	      hits.insert(errors.begin(), errors.end());
1487 	    }
1488 	
1489 	    if (!missing.empty() || !rx.empty()) {
1490 	      // read missing
1491 	      map<loff_t, BufferHead*>::iterator last = missing.end();
1492 	      for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
1493 		   bh_it != missing.end();
1494 		   ++bh_it) {
1495 		uint64_t rx_bytes = static_cast<uint64_t>(
1496 		  stat_rx + bh_it->second->length());
1497 		bytes_not_in_cache += bh_it->second->length();
1498 		if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
1499 		  // cache is full with concurrent reads -- wait for rx's to complete
1500 		  // to constrain memory growth (especially during copy-ups)
1501 		  if (success) {
1502 		    ldout(cct, 10) << "readx missed, waiting on cache to complete "
1503 				   << waitfor_read.size() << " blocked reads, "
1504 				   << (std::max(rx_bytes, max_size) - max_size)
1505 				   << " read bytes" << dendl;
1506 		    waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
1507 							   *trace));
1508 		  }
1509 	
1510 		  bh_remove(o, bh_it->second);
1511 		  delete bh_it->second;
1512 		} else {
1513 		  bh_it->second->set_nocache(nocache);
1514 		  bh_read(bh_it->second, rd->fadvise_flags, *trace);
1515 		  if ((success && onfinish) || last != missing.end())
1516 		    last = bh_it;
1517 		}
1518 		success = false;
1519 	      }
1520 	
1521 	      // add the waiter to the last missing bh, to avoid waking up too early; reads complete in order
1522 	      if (last != missing.end()) {
1523 		ldout(cct, 10) << "readx missed, waiting on " << *last->second
1524 		  << " off " << last->first << dendl;
1525 		last->second->waitfor_read[last->first].push_back(
1526 		  new C_RetryRead(this, rd, oset, onfinish, *trace) );
1527 	
1528 	      }
1529 	
1530 	      // bump rx
1531 	      for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
1532 		   bh_it != rx.end();
1533 		   ++bh_it) {
1534 		touch_bh(bh_it->second); // bump in lru, so we don't lose it.
1535 		if (success && onfinish) {
1536 		  ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
1537 				 << " off " << bh_it->first << dendl;
1538 		  bh_it->second->waitfor_read[bh_it->first].push_back(
1539 		    new C_RetryRead(this, rd, oset, onfinish, *trace) );
1540 		}
1541 		bytes_not_in_cache += bh_it->second->length();
1542 		success = false;
1543 	      }
1544 	
1545 	      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1546 		   bh_it != hits.end();  ++bh_it)
1547 		// bump in lru, so we don't lose it on a later read
1548 		touch_bh(bh_it->second);
1549 	
1550 	    } else {
1551 	      ceph_assert(!hits.empty());
1552 	
1553 	      // make a plain list
1554 	      for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1555 		   bh_it != hits.end();
1556 		   ++bh_it) {
1557 		BufferHead *bh = bh_it->second;
1558 		ldout(cct, 10) << "readx hit bh " << *bh << dendl;
1559 		if (bh->is_error() && bh->error)
1560 		  error = bh->error;
1561 		bytes_in_cache += bh->length();
1562 	
1563 		if (bh->get_nocache() && bh->is_clean())
1564 		  bh_lru_rest.lru_bottouch(bh);
1565 		else
1566 		  touch_bh(bh);
1567 		// must come after touch_bh, because touch_bh sets dontneed to false
1568 		if (dontneed &&
1569 		    ((loff_t)ex_it->offset <= bh->start() &&
1570 		     (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
1571 		  bh->set_dontneed(true); //if dirty
1572 		  if (bh->is_clean())
1573 		    bh_lru_rest.lru_bottouch(bh);
1574 		}
1575 	      }
1576 	
1577 	      if (!error) {
1578 		// create reverse map of buffer offset -> object for the
1579 		// eventual result.  this is over a single ObjectExtent, so we
1580 		// know that
1581 		//  - the bh's are contiguous
1582 		//  - the buffer frags need not be (and almost certainly aren't)
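		// e.g. (hypothetical) an extent with offset 4096, length 8192 and
		// buffer_extents {(0,4096),(4096,4096)} copies object bytes 4096~4096
		// into stripe_map[0] and 8192~4096 into stripe_map[4096]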
1583 		loff_t opos = ex_it->offset;
1584 		map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
1585 		ceph_assert(bh_it->second->start() <= opos);
1586 		uint64_t bhoff = opos - bh_it->second->start();
1587 		vector<pair<uint64_t,uint64_t> >::iterator f_it
1588 		  = ex_it->buffer_extents.begin();
1589 		uint64_t foff = 0;
1590 		while (1) {
1591 		  BufferHead *bh = bh_it->second;
1592 		  ceph_assert(opos == (loff_t)(bh->start() + bhoff));
1593 	
1594 		  uint64_t len = std::min(f_it->second - foff, bh->length() - bhoff);
1595 		  ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
1596 				 << bhoff << " frag " << f_it->first << "~"
1597 				 << f_it->second << " +" << foff << "~" << len
1598 				 << dendl;
1599 	
1600 		  bufferlist bit;
1601 		  // put substr here first, since substr_of clobbers, and we
1602 		  // may get multiple bh's at this stripe_map position
1603 		  if (bh->is_zero()) {
1604 		    stripe_map[f_it->first].append_zero(len);
1605 		  } else {
1606 		    bit.substr_of(bh->bl,
1607 			opos - bh->start(),
1608 			len);
1609 		    stripe_map[f_it->first].claim_append(bit);
1610 		  }
1611 	
1612 		  opos += len;
1613 		  bhoff += len;
1614 		  foff += len;
1615 		  if (opos == bh->end()) {
1616 		    ++bh_it;
1617 		    bhoff = 0;
1618 		  }
1619 		  if (foff == f_it->second) {
1620 		    ++f_it;
1621 		    foff = 0;
1622 		  }
1623 		  if (bh_it == hits.end()) break;
1624 		  if (f_it == ex_it->buffer_extents.end())
1625 		    break;
1626 		}
1627 		ceph_assert(f_it == ex_it->buffer_extents.end());
1628 		ceph_assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
1629 	      }
1630 	
1631 	      if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
1632 		  bottouch_ob(o);
1633 	    }
1634 	  }
1635 	
1636 	  if (!success) {
1637 	    if (perfcounter && external_call) {
1638 	      perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1639 	      perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
1640 	      perfcounter->inc(l_objectcacher_cache_ops_miss);
1641 	    }
1642 	    if (onfinish) {
1643 	      ldout(cct, 20) << "readx defer " << rd << dendl;
1644 	    } else {
1645 	      ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
1646 			     << dendl;
1647 	      delete rd;
1648 	    }
1649 	    return 0;  // wait!
1650 	  }
1651 	  if (perfcounter && external_call) {
1652 	    perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
1653 	    perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
1654 	    perfcounter->inc(l_objectcacher_cache_ops_hit);
1655 	  }
1656 	
1657 	  // no misses... success!  do the read.
1658 	  ldout(cct, 10) << "readx has all buffers" << dendl;
1659 	
1660 	  // ok, assemble into result buffer.
1661 	  uint64_t pos = 0;
1662 	  if (rd->bl && !error) {
1663 	    rd->bl->clear();
1664 	    for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
1665 		 i != stripe_map.end();
1666 		 ++i) {
1667 	      ceph_assert(pos == i->first);
1668 	      ldout(cct, 10) << "readx  adding buffer len " << i->second.length()
1669 			     << " at " << pos << dendl;
1670 	      pos += i->second.length();
1671 	      rd->bl->claim_append(i->second);
1672 	      ceph_assert(rd->bl->length() == pos);
1673 	    }
1674 	    ldout(cct, 10) << "readx  result is " << rd->bl->length() << dendl;
1675 	  } else if (!error) {
1676 	    ldout(cct, 10) << "readx  no bufferlist ptr (readahead?), done." << dendl;
1677 	    map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
1678 	    pos = i->first + i->second.length();
1679 	  }
1680 	
1681 	  // done with read.
1682 	  int ret = error ? error : pos;
1683 	  ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
1684 	  ceph_assert(pos <= (uint64_t) INT_MAX);
1685 	
1686 	  delete rd;
1687 	
1688 	  trim();
1689 	
1690 	  return ret;
1691 	}
1692 	
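// Re-drive reads that were parked on waitfor_read because rx bytes exceeded
// max_size.  Completing a context may re-enter _readx and park itself again,
// hence the waitfor_read.empty() guard; anything not yet run is spliced back
// onto the queue.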
1693 	void ObjectCacher::retry_waiting_reads()
1694 	{
1695 	  list<Context *> ls;
1696 	  ls.swap(waitfor_read);
1697 	
1698 	  while (!ls.empty() && waitfor_read.empty()) {
1699 	    Context *ctx = ls.front();
1700 	    ls.pop_front();
1701 	    ctx->complete(0);
1702 	  }
1703 	  waitfor_read.splice(waitfor_read.end(), ls);
1704 	}
1705 	
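// Buffer a write: map each ObjectExtent onto a single BufferHead per object,
// copy the caller's data in, mark the bh dirty, then either throttle against
// max_dirty or (write-thru) flush immediately via _wait_for_write().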
1706 	int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
1707 				 ZTracer::Trace *parent_trace)
1708 	{
1709 	  ceph_assert(ceph_mutex_is_locked(lock));
1710 	  ceph::real_time now = ceph::real_clock::now();
1711 	  uint64_t bytes_written = 0;
1712 	  uint64_t bytes_written_in_flush = 0;
1713 	  bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
1714 	  bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
1715 	
1716 	  ZTracer::Trace trace;
1717 	  if (parent_trace != nullptr) {
1718 	    trace.init("write", &trace_endpoint, parent_trace);
1719 	    trace.event("start");
1720 	  }
1721 	
1722 	  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
1723 	       ex_it != wr->extents.end();
1724 	       ++ex_it) {
1725 	    // get object cache
1726 	    sobject_t soid(ex_it->oid, CEPH_NOSNAP);
1727 	    Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
1728 				   ex_it->truncate_size, oset->truncate_seq);
1729 	
1730 	    // map it all into a single bufferhead.
1731 	    BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
1732 	    bool missing = bh->is_missing();
1733 	    bh->snapc = wr->snapc;
1734 	
1735 	    bytes_written += ex_it->length;
1736 	    if (bh->is_tx()) {
1737 	      bytes_written_in_flush += ex_it->length;
1738 	    }
1739 	
1740 	    // adjust buffer pointers (ie "copy" data into my cache)
1741 	    // this is over a single ObjectExtent, so we know that
1742 	    //  - there is one contiguous bh
1743 	    //  - the buffer frags need not be (and almost certainly aren't)
1744 	    // note: I assume striping is monotonic... no jumps backwards, ever!
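	    // Illustrative sketch (comment only; hypothetical sizes): an extent
	    // whose bh starts at its offset and whose buffer_extents are
	    // {(0,4096),(4096,4096)} copies wr->bl[0,4096) into the bh via swap
	    // (bhoff == 0) and then claim_appends wr->bl[4096,8192) behind it.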
1745 	    loff_t opos = ex_it->offset;
1746 	    for (vector<pair<uint64_t, uint64_t> >::iterator f_it
1747 		   = ex_it->buffer_extents.begin();
1748 		 f_it != ex_it->buffer_extents.end();
1749 		 ++f_it) {
1750 	      ldout(cct, 10) << "writex writing " << f_it->first << "~"
1751 			     << f_it->second << " into " << *bh << " at " << opos
1752 			     << dendl;
1753 	      uint64_t bhoff = opos - bh->start();
1754 	      ceph_assert(f_it->second <= bh->length() - bhoff);
1755 	
1756 	      // get the frag we're mapping in
1757 	      bufferlist frag;
1758 	      frag.substr_of(wr->bl, f_it->first, f_it->second);
1759 	
1760 	      // if bhoff == 0, replace bh->bl outright; else keep the data left of bhoff and append
1761 	      if (!bhoff)
1762 	        bh->bl.swap(frag);
1763 	      else
1764 	        bh->bl.claim_append(frag);
1765 	
1766 	      opos += f_it->second;
1767 	    }
1768 	
1769 	    // ok, now bh is dirty.
1770 	    mark_dirty(bh);
1771 	    if (dontneed)
1772 	      bh->set_dontneed(true);
1773 	    else if (nocache && missing)
1774 	      bh->set_nocache(true);
1775 	    else
1776 	      touch_bh(bh);
1777 	
1778 	    bh->last_write = now;
1779 	
1780 	    o->try_merge_bh(bh);
1781 	  }
1782 	
1783 	  if (perfcounter) {
1784 	    perfcounter->inc(l_objectcacher_data_written, bytes_written);
1785 	    if (bytes_written_in_flush) {
1786 	      perfcounter->inc(l_objectcacher_overwritten_in_flush,
1787 			       bytes_written_in_flush);
1788 	    }
1789 	  }
1790 	
1791 	  int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
1792 	  delete wr;
1793 	
1794 	  //verify_stats();
1795 	  trim();
1796 	  return r;
1797 	}
1798 	
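// Completion used when block_writes_upfront is false: the dirty-throttle is
// applied from the finisher thread instead, by re-taking the cache lock and
// calling _maybe_wait_for_writeback() before completing the caller's context.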
1799 	class ObjectCacher::C_WaitForWrite : public Context {
1800 	public:
1801 	  C_WaitForWrite(ObjectCacher *oc, uint64_t len,
1802 	                 const ZTracer::Trace &trace, Context *onfinish) :
1803 	    m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
1804 	  void finish(int r) override;
1805 	private:
1806 	  ObjectCacher *m_oc;
1807 	  uint64_t m_len;
1808 	  ZTracer::Trace m_trace;
1809 	  Context *m_onfinish;
1810 	};
1811 	
1812 	void ObjectCacher::C_WaitForWrite::finish(int r)
1813 	{
1814 	  std::lock_guard l(m_oc->lock);
1815 	  m_oc->_maybe_wait_for_writeback(m_len, &m_trace);
1816 	  m_onfinish->complete(r);
1817 	}
1818 	
1819 	void ObjectCacher::_maybe_wait_for_writeback(uint64_t len,
1820 						     ZTracer::Trace *trace)
1821 	{
1822 	  ceph_assert(ceph_mutex_is_locked(lock));
1823 	  ceph::mono_time start = ceph::mono_clock::now();
1824 	  int blocked = 0;
1825 	  // wait for writeback?
1826 	  //  - wait for dirty and tx bytes (relative to the max_dirty threshold)
1827 	  //  - do not wait for bytes other waiters are waiting on.  this means that
1828 	  //    threads do not wait for each other.  this effectively allows the cache
1829 	  //    size to balloon proportional to the data that is in flight.
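	  //
	  // Illustrative numbers (comment only; hypothetical config): with
	  // max_dirty = 64 MiB and BUFFER_MEMORY_WEIGHT = 12 (4 KiB pages), a
	  // writer blocks while
	  //   dirty + tx bytes      >= 64 MiB + stat_dirty_waiting, or
	  //   dirty_or_tx_bh.size() >= 16384 + stat_nr_dirty_waiters,
	  // i.e. either raw bytes or sheer buffer-head count can trip the gate.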
1830 	
1831 	  uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
1832 	  while (get_stat_dirty() + get_stat_tx() > 0 &&
1833 		 (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
1834 		  max_dirty + get_stat_dirty_waiting()) ||
1835 		 (dirty_or_tx_bh.size() >=
1836 		  max_dirty_bh + get_stat_nr_dirty_waiters()))) {
1837 	
1838 	    if (blocked == 0) {
1839 	      trace->event("start wait for writeback");
1840 	    }
1841 	    ldout(cct, 10) << __func__ << " waiting for dirty|tx "
1842 			   << (get_stat_dirty() + get_stat_tx()) << " >= max "
1843 			   << max_dirty << " + dirty_waiting "
1844 			   << get_stat_dirty_waiting() << dendl;
1845 	    flusher_cond.notify_all();
1846 	    stat_dirty_waiting += len;
1847 	    ++stat_nr_dirty_waiters;
1848 	    std::unique_lock l{lock, std::adopt_lock};
1849 	    stat_cond.wait(l);
1850 	    l.release();
1851 	    stat_dirty_waiting -= len;
1852 	    --stat_nr_dirty_waiters;
1853 	    ++blocked;
1854 	    ldout(cct, 10) << __func__ << " woke up" << dendl;
1855 	  }
1856 	  if (blocked > 0) {
1857 	    trace->event("finish wait for writeback");
1858 	  }
1859 	  if (blocked && perfcounter) {
1860 	    perfcounter->inc(l_objectcacher_write_ops_blocked);
1861 	    perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
1862 	    ceph::timespan blocked = ceph::mono_clock::now() - start;
1863 	    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
1864 	  }
1865 	}
1866 	
1867 	// blocking wait for write.
1868 	int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
1869 					  ZTracer::Trace *trace, Context *onfreespace)
1870 	{
1871 	  ceph_assert(ceph_mutex_is_locked(lock));
1872 	  ceph_assert(trace != nullptr);
1873 	  int ret = 0;
1874 	
1875 	  if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
1876 	    if (block_writes_upfront) {
1877 	      _maybe_wait_for_writeback(len, trace);
1878 	      if (onfreespace)
1879 		onfreespace->complete(0);
1880 	    } else {
1881 	      ceph_assert(onfreespace);
1882 	      finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
1883 	    }
1884 	  } else {
1885 	    // write-thru!  flush what we just wrote.
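	    // (We get here for max_dirty == 0 or an FUA write: the extents just
	    // dirtied are flushed immediately, and with block_writes_upfront we
	    // sleep on the cond below until flush_set()'s finisher fires.)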
1886 	    ceph::condition_variable cond;
1887 	    bool done = false;
1888 	    Context *fin = block_writes_upfront ?
1889 	      new C_Cond(cond, &done, &ret) : onfreespace;
1890 	    ceph_assert(fin);
1891 	    bool flushed = flush_set(oset, wr->extents, trace, fin);
1892 	    ceph_assert(!flushed);   // we just dirtied it, and didn't drop our lock!
1893 	    ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
1894 			   << " bytes" << dendl;
1895 	    if (block_writes_upfront) {
1896 	      std::unique_lock l{lock, std::adopt_lock};
1897 	      cond.wait(l, [&done] { return done; });
1898 	      l.release();
1899 	      ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
1900 	      if (onfreespace)
1901 		onfreespace->complete(ret);
1902 	    }
1903 	  }
1904 	
1905 	  // start writeback anyway?
1906 	  if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
1907 	    ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
1908 			   << target_dirty << ", nudging flusher" << dendl;
1909 	    flusher_cond.notify_all();
1910 	  }
1911 	  return ret;
1912 	}
1913 	
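// Background flusher thread: wakes at least once a second, writes back dirty
// data beyond target_dirty, ages out bhs whose last_write predates
// max_dirty_age, and on shutdown waits for outstanding rados reads to drain.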
1914 	void ObjectCacher::flusher_entry()
1915 	{
1916 	  ldout(cct, 10) << "flusher start" << dendl;
1917 	  std::unique_lock l{lock};
1918 	  while (!flusher_stop) {
1919 	    loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
1920 	      get_stat_dirty();
1921 	    ldout(cct, 11) << "flusher "
1922 			   << all << " / " << max_size << ":  "
1923 			   << get_stat_tx() << " tx, "
1924 			   << get_stat_rx() << " rx, "
1925 			   << get_stat_clean() << " clean, "
1926 			   << get_stat_dirty() << " dirty ("
1927 			   << target_dirty << " target, "
1928 			   << max_dirty << " max)"
1929 			   << dendl;
1930 	    loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
1931 	
1932 	    ZTracer::Trace trace;
1933 	    if (cct->_conf->osdc_blkin_trace_all) {
1934 	      trace.init("flusher", &trace_endpoint);
1935 	      trace.event("start");
1936 	    }
1937 	
1938 	    if (actual > 0 && (uint64_t) actual > target_dirty) {
1939 	      // flush some dirty pages
1940 	      ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
1941 			     << get_stat_dirty_waiting() << " dirty_waiting > target "
1942 			     << target_dirty << ", flushing some dirty bhs" << dendl;
1943 	      flush(&trace, actual - target_dirty);
1944 	    } else {
1945 	      // check tail of lru for old dirty items
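	      // (the dirty lru is in rough age order, so we can stop at the first
	      // bh newer than the cutoff; MAX_FLUSH_UNDER_LOCK bounds how many
	      // writebacks we start before backing off the lock below)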
1946 	      ceph::real_time cutoff = ceph::real_clock::now();
1947 	      cutoff -= max_dirty_age;
1948 	      BufferHead *bh = 0;
1949 	      int max = MAX_FLUSH_UNDER_LOCK;
1950 	      while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
1951 						    lru_get_next_expire())) != 0 &&
1952 		     bh->last_write <= cutoff &&
1953 		     max > 0) {
1954 		ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
1955 		if (scattered_write) {
1956 		  bh_write_adjacencies(bh, cutoff, NULL, &max);
1957 	        } else {
1958 		  bh_write(bh, trace);
1959 		  --max;
1960 		}
1961 	      }
1962 	      if (!max) {
1963 		// back off the lock to avoid starving other threads
1964 	        trace.event("backoff");
1965 		l.unlock();
1966 		l.lock();
1967 		continue;
1968 	      }
1969 	    }
1970 	
1971 	    trace.event("finish");
1972 	    if (flusher_stop)
1973 	      break;
1974 	
1975 	    flusher_cond.wait_for(l, 1s);
1976 	  }
1977 	
1978 	  /* Wait for reads to finish. This is only possible if handling
1979 	   * -ENOENT made some read completions finish before their rados read
1980 	   * came back. If we don't wait for them, and destroy the cache, when
1981 	   * the rados reads do come back their callback will try to access the
1982 	   * no-longer-valid ObjectCacher.
1983 	   */
1984 	  read_cond.wait(l, [this] {
1985 	    if (reads_outstanding > 0) {
1986 	      ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
1987 			     << reads_outstanding << dendl;
1988 	      return false;
1989 	    } else {
1990 	      return true;
1991 	    }
1992 	  });
1993 	  ldout(cct, 10) << "flusher finish" << dendl;
1994 	}
1995 	
1996 	
1997 	// -------------------------------------------------
1998 	
1999 	bool ObjectCacher::set_is_empty(ObjectSet *oset)
2000 	{
2001 	  ceph_assert(ceph_mutex_is_locked(lock));
2002 	  if (oset->objects.empty())
2003 	    return true;
2004 	
2005 	  for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
2006 	    if (!(*p)->is_empty())
2007 	      return false;
2008 	
2009 	  return true;
2010 	}
2011 	
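// true if any object in the set holds at least one buffer head that is
// neither dirty nor in tx.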
2012 	bool ObjectCacher::set_is_cached(ObjectSet *oset)
2013 	{
2014 	  ceph_assert(ceph_mutex_is_locked(lock));
2015 	  if (oset->objects.empty())
2016 	    return false;
2017 	
2018 	  for (xlist<Object*>::iterator p = oset->objects.begin();
2019 	       !p.end(); ++p) {
2020 	    Object *ob = *p;
2021 	    for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
2022 		 q != ob->data.end();
2023 		 ++q) {
2024 	      BufferHead *bh = q->second;
2025 	      if (!bh->is_dirty() && !bh->is_tx())
2026 		return true;
2027 	    }
2028 	  }
2029 	
2030 	  return false;
2031 	}
2032 	
2033 	bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
2034 	{
2035 	  ceph_assert(ceph_mutex_is_locked(lock));
2036 	  if (oset->objects.empty())
2037 	    return false;
2038 	
2039 	  for (xlist<Object*>::iterator i = oset->objects.begin();
2040 	       !i.end(); ++i) {
2041 	    Object *ob = *i;
2042 	
2043 	    for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2044 		 p != ob->data.end();
2045 		 ++p) {
2046 	      BufferHead *bh = p->second;
2047 	      if (bh->is_dirty() || bh->is_tx())
2048 		return true;
2049 	    }
2050 	  }
2051 	
2052 	  return false;
2053 	}
2054 	
2055 	
2056 	// purge.  non-blocking.  violently removes dirty buffers from cache.
2057 	void ObjectCacher::purge(Object *ob)
2058 	{
2059 	  ceph_assert(ceph_mutex_is_locked(lock));
2060 	  ldout(cct, 10) << "purge " << *ob << dendl;
2061 	
2062 	  ob->truncate(0);
2063 	}
2064 	
2065 	
2066 	// flush.  non-blocking.  no callback.
2067 	// true if clean, already flushed.
2068 	// false if we wrote something.
2069 	// be sloppy about the ranges and flush any buffer it touches
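// Usage note: flush_set() below calls this per ObjectExtent and only registers
// a waiter on the object's last_write_tid when flush() returns false, i.e.
// when a buffer was written back here or is still in tx.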
2070 	bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
2071 	                         ZTracer::Trace *trace)
2072 	{
2073 	  ceph_assert(trace != nullptr);
2074 	  ceph_assert(ceph_mutex_is_locked(lock));
2075 	  list<BufferHead*> blist;
2076 	  bool clean = true;
2077 	  ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
2078 	  for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
2079 	       p != ob->data.end();
2080 	       ++p) {
2081 	    BufferHead *bh = p->second;
2082 	    ldout(cct, 20) << "flush  " << *bh << dendl;
2083 	    if (length && bh->start() > offset+length) {
2084 	      break;
2085 	    }
2086 	    if (bh->is_tx()) {
2087 	      clean = false;
2088 	      continue;
2089 	    }
2090 	    if (!bh->is_dirty()) {
2091 	      continue;
2092 	    }
2093 	
2094 	    if (scattered_write)
2095 	      blist.push_back(bh);
2096 	    else
2097 	      bh_write(bh, *trace);
2098 	    clean = false;
2099 	  }
2100 	  if (scattered_write && !blist.empty())
2101 	    bh_write_scattered(blist);
2102 	
2103 	  return clean;
2104 	}
2105 	
2106 	bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
2107 					     Context *onfinish)
2108 	{
2109 	  ceph_assert(ceph_mutex_is_locked(lock));
2110 	  if (gather->has_subs()) {
2111 	    gather->set_finisher(onfinish);
2112 	    gather->activate();
2113 	    return false;
2114 	  }
2115 	
2116 	  ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
2117 	  onfinish->complete(0);
2118 	  return true;
2119 	}
2120 	
2121 	// flush.  non-blocking, takes callback.
2122 	// returns true if already flushed
2123 	bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
2124 	{
2125 	  ceph_assert(ceph_mutex_is_locked(lock));
2126 	  ceph_assert(onfinish != NULL);
2127 	  if (oset->objects.empty()) {
2128 	    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2129 	    onfinish->complete(0);
2130 	    return true;
2131 	  }
2132 	
2133 	  ldout(cct, 10) << "flush_set " << oset << dendl;
2134 	
2135 	  // we'll need to wait for all objects to flush!
2136 	  C_GatherBuilder gather(cct);
2137 	  set<Object*> waitfor_commit;
2138 	
2139 	  list<BufferHead*> blist;
2140 	  Object *last_ob = NULL;
2141 	  set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
2142 	
2143 	  // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
2144 	  // order, but items in oset->objects are not sorted, so lower_bound() may
2145 	  // land on any of this ObjectSet's buffer heads; scan forward, then back.
2146 	  BufferHead key(*oset->objects.begin());
2147 	  it = dirty_or_tx_bh.lower_bound(&key);
2148 	  p = q = it;
2149 	
2150 	  bool backwards = true;
2151 	  if (it != dirty_or_tx_bh.begin())
2152 	    --it;
2153 	  else
2154 	    backwards = false;
2155 	
2156 	  for (; p != dirty_or_tx_bh.end(); p = q) {
2157 	    ++q;
2158 	    BufferHead *bh = *p;
2159 	    if (bh->ob->oset != oset)
2160 	      break;
2161 	    waitfor_commit.insert(bh->ob);
2162 	    if (bh->is_dirty()) {
2163 	      if (scattered_write) {
2164 		if (last_ob != bh->ob) {
2165 		  if (!blist.empty()) {
2166 		    bh_write_scattered(blist);
2167 		    blist.clear();
2168 		  }
2169 		  last_ob = bh->ob;
2170 		}
2171 		blist.push_back(bh);
2172 	      } else {
2173 		bh_write(bh, {});
2174 	      }
2175 	    }
2176 	  }
2177 	
2178 	  if (backwards) {
2179 	    for(p = q = it; true; p = q) {
2180 	      if (q != dirty_or_tx_bh.begin())
2181 		--q;
2182 	      else
2183 		backwards = false;
2184 	      BufferHead *bh = *p;
2185 	      if (bh->ob->oset != oset)
2186 		break;
2187 	      waitfor_commit.insert(bh->ob);
2188 	      if (bh->is_dirty()) {
2189 		if (scattered_write) {
2190 		  if (last_ob != bh->ob) {
2191 		    if (!blist.empty()) {
2192 		      bh_write_scattered(blist);
2193 		      blist.clear();
2194 		    }
2195 		    last_ob = bh->ob;
2196 		  }
2197 		  blist.push_front(bh);
2198 		} else {
2199 		  bh_write(bh, {});
2200 		}
2201 	      }
2202 	      if (!backwards)
2203 		break;
2204 	    }
2205 	  }
2206 	
2207 	  if (scattered_write && !blist.empty())
2208 	    bh_write_scattered(blist);
2209 	
2210 	  for (set<Object*>::iterator i = waitfor_commit.begin();
2211 	       i != waitfor_commit.end(); ++i) {
2212 	    Object *ob = *i;
2213 	
2214 	    // we'll need to gather...
2215 	    ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2216 			   << ob->last_write_tid << " on " << *ob << dendl;
2217 	    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2218 	  }
2219 	
2220 	  return _flush_set_finish(&gather, onfinish);
2221 	}
2222 	
2223 	// flush.  non-blocking, takes callback.
2224 	// returns true if already flushed
2225 	bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
2226 				     ZTracer::Trace *trace, Context *onfinish)
2227 	{
2228 	  ceph_assert(ceph_mutex_is_locked(lock));
2229 	  ceph_assert(trace != nullptr);
2230 	  ceph_assert(onfinish != NULL);
2231 	  if (oset->objects.empty()) {
2232 	    ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
2233 	    onfinish->complete(0);
2234 	    return true;
2235 	  }
2236 	
2237 	  ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
2238 			 << " ObjectExtents" << dendl;
2239 	
2240 	  // we'll need to wait for all objects to flush!
2241 	  C_GatherBuilder gather(cct);
2242 	
2243 	  for (vector<ObjectExtent>::iterator p = exv.begin();
2244 	       p != exv.end();
2245 	       ++p) {
2246 	    ObjectExtent &ex = *p;
2247 	    sobject_t soid(ex.oid, CEPH_NOSNAP);
2248 	    if (objects[oset->poolid].count(soid) == 0)
2249 	      continue;
2250 	    Object *ob = objects[oset->poolid][soid];
2251 	
2252 	    ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
2253 			   << " " << ob << dendl;
2254 	
2255 	    if (!flush(ob, ex.offset, ex.length, trace)) {
2256 	      // we'll need to gather...
2257 	      ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
2258 			     << ob->last_write_tid << " on " << *ob << dendl;
2259 	      ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2260 	    }
2261 	  }
2262 	
2263 	  return _flush_set_finish(&gather, onfinish);
2264 	}
2265 	
2266 	// flush all dirty data.  non-blocking, takes callback.
2267 	// returns true if already flushed
2268 	bool ObjectCacher::flush_all(Context *onfinish)
2269 	{
2270 	  ceph_assert(ceph_mutex_is_locked(lock));
2271 	  ceph_assert(onfinish != NULL);
2272 	
2273 	  ldout(cct, 10) << "flush_all " << dendl;
2274 	
2275 	  // we'll need to wait for all objects to flush!
2276 	  C_GatherBuilder gather(cct);
2277 	  set<Object*> waitfor_commit;
2278 	
2279 	  list<BufferHead*> blist;
2280 	  Object *last_ob = NULL;
2281 	  set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
2282 	  next = it = dirty_or_tx_bh.begin();
2283 	  while (it != dirty_or_tx_bh.end()) {
2284 	    ++next;
2285 	    BufferHead *bh = *it;
2286 	    waitfor_commit.insert(bh->ob);
2287 	
2288 	    if (bh->is_dirty()) {
2289 	      if (scattered_write) {
2290 		if (last_ob != bh->ob) {
2291 		  if (!blist.empty()) {
2292 		    bh_write_scattered(blist);
2293 		    blist.clear();
2294 		  }
2295 		  last_ob = bh->ob;
2296 		}
2297 		blist.push_back(bh);
2298 	      } else {
2299 		bh_write(bh, {});
2300 	      }
2301 	    }
2302 	
2303 	    it = next;
2304 	  }
2305 	
2306 	  if (scattered_write && !blist.empty())
2307 	    bh_write_scattered(blist);
2308 	
2309 	  for (set<Object*>::iterator i = waitfor_commit.begin();
2310 	       i != waitfor_commit.end();
2311 	       ++i) {
2312 	    Object *ob = *i;
2313 	
2314 	    // we'll need to gather...
2315 	    ldout(cct, 10) << "flush_all will wait for ack tid "
2316 			   << ob->last_write_tid << " on " << *ob << dendl;
2317 	    ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
2318 	  }
2319 	
2320 	  return _flush_set_finish(&gather, onfinish);
2321 	}
2322 	
2323 	void ObjectCacher::purge_set(ObjectSet *oset)
2324 	{
2325 	  ceph_assert(ceph_mutex_is_locked(lock));
2326 	  if (oset->objects.empty()) {
2327 	    ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
2328 	    return;
2329 	  }
2330 	
2331 	  ldout(cct, 10) << "purge_set " << oset << dendl;
2332 	  const bool were_dirty = oset->dirty_or_tx > 0;
2333 	
2334 	  for (xlist<Object*>::iterator i = oset->objects.begin();
2335 	       !i.end(); ++i) {
2336 	    Object *ob = *i;
2337 	    purge(ob);
2338 	  }
2339 	
2340 	  // Although we have purged rather than flushed, caller should still
2341 	  // drop any resources associated with dirty data.
2342 	  ceph_assert(oset->dirty_or_tx == 0);
2343 	  if (flush_set_callback && were_dirty) {
2344 	    flush_set_callback(flush_set_callback_arg, oset);
2345 	  }
2346 	}
2347 	
2348 	
2349 	loff_t ObjectCacher::release(Object *ob)
2350 	{
2351 	  ceph_assert(ceph_mutex_is_locked(lock));
2352 	  list<BufferHead*> clean;
2353 	  loff_t o_unclean = 0;
2354 	
2355 	  for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
2356 	       p != ob->data.end();
2357 	       ++p) {
2358 	    BufferHead *bh = p->second;
2359 	    if (bh->is_clean() || bh->is_zero() || bh->is_error())
2360 	      clean.push_back(bh);
2361 	    else
2362 	      o_unclean += bh->length();
2363 	  }
2364 	
2365 	  for (list<BufferHead*>::iterator p = clean.begin();
2366 	       p != clean.end();
2367 	       ++p) {
2368 	    bh_remove(ob, *p);
2369 	    delete *p;
2370 	  }
2371 	
2372 	  if (ob->can_close()) {
2373 	    ldout(cct, 10) << "release trimming " << *ob << dendl;
2374 	    close_object(ob);
2375 	    ceph_assert(o_unclean == 0);
2376 	    return 0;
2377 	  }
2378 	
2379 	  if (ob->complete) {
2380 	    ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
2381 	    ob->complete = false;
2382 	  }
2383 	  if (!ob->exists) {
2384 	    ldout(cct, 10) << "release setting exists on " << *ob << dendl;
2385 	    ob->exists = true;
2386 	  }
2387 	
2388 	  return o_unclean;
2389 	}
2390 	
2391 	loff_t ObjectCacher::release_set(ObjectSet *oset)
2392 	{
2393 	  ceph_assert(ceph_mutex_is_locked(lock));
2394 	  // return # bytes not clean (and thus not released).
2395 	  loff_t unclean = 0;
2396 	
2397 	  if (oset->objects.empty()) {
2398 	    ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
2399 	    return 0;
2400 	  }
2401 	
2402 	  ldout(cct, 10) << "release_set " << oset << dendl;
2403 	
2404 	  xlist<Object*>::iterator q;
2405 	  for (xlist<Object*>::iterator p = oset->objects.begin();
2406 	       !p.end(); ) {
2407 	    q = p;
2408 	    ++q;
2409 	    Object *ob = *p;
2410 	
2411 	    loff_t o_unclean = release(ob);
2412 	    unclean += o_unclean;
2413 	
2414 	    if (o_unclean)
2415 	      ldout(cct, 10) << "release_set " << oset << " " << *ob
2416 			     << " has " << o_unclean << " bytes left"
2417 			     << dendl;
2418 	    p = q;
2419 	  }
2420 	
2421 	  if (unclean) {
2422 	    ldout(cct, 10) << "release_set " << oset
2423 			   << ", " << unclean << " bytes left" << dendl;
2424 	  }
2425 	
2426 	  return unclean;
2427 	}
2428 	
2429 	
2430 	uint64_t ObjectCacher::release_all()
2431 	{
2432 	  ceph_assert(ceph_mutex_is_locked(lock));
2433 	  ldout(cct, 10) << "release_all" << dendl;
2434 	  uint64_t unclean = 0;
2435 	
2436 	  vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
2437 	    = objects.begin();
2438 	  while (i != objects.end()) {
2439 	    ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
2440 	    while (p != i->end()) {
2441 	      ceph::unordered_map<sobject_t, Object*>::iterator n = p;
2442 	      ++n;
2443 	
2444 	      Object *ob = p->second;
2445 	
2446 	      loff_t o_unclean = release(ob);
2447 	      unclean += o_unclean;
2448 	
2449 	      if (o_unclean)
2450 		ldout(cct, 10) << "release_all " << *ob
2451 			       << " has " << o_unclean << " bytes left"
2452 			       << dendl;
2453 	      p = n;
2454 	    }
2455 	    ++i;
2456 	  }
2457 	
2458 	  if (unclean) {
2459 	    ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
2460 			   << dendl;
2461 	  }
2462 	
2463 	  return unclean;
2464 	}
2465 	
2466 	void ObjectCacher::clear_nonexistence(ObjectSet *oset)
2467 	{
2468 	  ceph_assert(ceph_mutex_is_locked(lock));
2469 	  ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
2470 	
2471 	  for (xlist<Object*>::iterator p = oset->objects.begin();
2472 	       !p.end(); ++p) {
2473 	    Object *ob = *p;
2474 	    if (!ob->exists) {
2475 	      ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
2476 	      ob->exists = true;
2477 	      ob->complete = false;
2478 	    }
2479 	    for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
2480 		 !q.end(); ++q) {
2481 	      C_ReadFinish *comp = *q;
2482 	      comp->distrust_enoent();
2483 	    }
2484 	  }
2485 	}
2486 	
2487 	/**
2488 	 * discard object extents from an ObjectSet by removing the objects in
2489 	 * exls from the in-memory oset.
2490 	 */
2491 	void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
2492 	{
2493 	  ceph_assert(ceph_mutex_is_locked(lock));
2494 	  bool was_dirty = oset->dirty_or_tx > 0;
2495 	
2496 	  _discard(oset, exls, nullptr);
2497 	  _discard_finish(oset, was_dirty, nullptr);
2498 	}
2499 	
2500 	/**
2501 	 * discard object extents from an ObjectSet by removing the objects in
2502 	 * exls from the in-memory oset. If the bh is in TX state, the discard
2503 	 * will wait for the write to commit prior to invoking on_finish.
2504 	 */
2505 	void ObjectCacher::discard_writeback(ObjectSet *oset,
2506 	                                     const vector<ObjectExtent>& exls,
2507 	                                     Context* on_finish)
2508 	{
2509 	  ceph_assert(ceph_mutex_is_locked(lock));
2510 	  bool was_dirty = oset->dirty_or_tx > 0;
2511 	
2512 	  C_GatherBuilder gather(cct);
2513 	  _discard(oset, exls, &gather);
2514 	
2515 	  if (gather.has_subs()) {
2516 	    bool flushed = was_dirty && oset->dirty_or_tx == 0;
2517 	    gather.set_finisher(new LambdaContext(
2518 	      [this, oset, flushed, on_finish](int) {
2519 		ceph_assert(ceph_mutex_is_locked(lock));
2520 		if (flushed && flush_set_callback)
2521 		  flush_set_callback(flush_set_callback_arg, oset);
2522 		if (on_finish)
2523 		  on_finish->complete(0);
2524 	      }));
2525 	    gather.activate();
2526 	    return;
2527 	  }
2528 	
2529 	  _discard_finish(oset, was_dirty, on_finish);
2530 	}
2531 	
2532 	void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
2533 	                            C_GatherBuilder* gather)
2534 	{
2535 	  if (oset->objects.empty()) {
2536 	    ldout(cct, 10) << __func__ << " on " << oset << " dne" << dendl;
2537 	    return;
2538 	  }
2539 	
2540 	  ldout(cct, 10) << __func__ << " " << oset << dendl;
2541 	
2542 	  for (auto& ex : exls) {
2543 	    ldout(cct, 10) << __func__ << " " << oset << " ex " << ex << dendl;
2544 	    sobject_t soid(ex.oid, CEPH_NOSNAP);
2545 	    if (objects[oset->poolid].count(soid) == 0)
2546 	      continue;
2547 	    Object *ob = objects[oset->poolid][soid];
2548 	
2549 	    ob->discard(ex.offset, ex.length, gather);
2550 	  }
2551 	}
2552 	
2553 	void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
2554 	                                   Context* on_finish)
2555 	{
2556 	  ceph_assert(ceph_mutex_is_locked(lock));
2557 	
2558 	  // did we truncate off dirty data?
2559 	  if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
2560 	    flush_set_callback(flush_set_callback_arg, oset);
2561 	  }
2562 	
2563 	  // notify that in-flight writeback has completed
2564 	  if (on_finish != nullptr) {
2565 	    on_finish->complete(0);
2566 	  }
2567 	}
2568 	
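// Debug consistency check: recompute per-state byte totals from every cached
// BufferHead and assert they match the incrementally maintained stat_*
// counters.  O(number of bhs), so the call site in writex() is left disabled.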
2569 	void ObjectCacher::verify_stats() const
2570 	{
2571 	  ceph_assert(ceph_mutex_is_locked(lock));
2572 	  ldout(cct, 10) << "verify_stats" << dendl;
2573 	
2574 	  loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
2575 	    error = 0;
2576 	  for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
2577 		 = objects.begin();
2578 	       i != objects.end();
2579 	       ++i) {
2580 	    for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
2581 		   = i->begin();
2582 		 p != i->end();
2583 		 ++p) {
2584 	      Object *ob = p->second;
2585 	      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
2586 		   q != ob->data.end();
2587 		  ++q) {
2588 		BufferHead *bh = q->second;
2589 		switch (bh->get_state()) {
2590 		case BufferHead::STATE_MISSING:
2591 		  missing += bh->length();
2592 		  break;
2593 		case BufferHead::STATE_CLEAN:
2594 		  clean += bh->length();
2595 		  break;
2596 		case BufferHead::STATE_ZERO:
2597 		  zero += bh->length();
2598 		  break;
2599 		case BufferHead::STATE_DIRTY:
2600 		  dirty += bh->length();
2601 		  break;
2602 		case BufferHead::STATE_TX:
2603 		  tx += bh->length();
2604 		  break;
2605 		case BufferHead::STATE_RX:
2606 		  rx += bh->length();
2607 		  break;
2608 		case BufferHead::STATE_ERROR:
2609 		  error += bh->length();
2610 		  break;
2611 		default:
2612 		  ceph_abort();
2613 		}
2614 	      }
2615 	    }
2616 	  }
2617 	
2618 	  ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
2619 			 << " dirty " << dirty << " missing " << missing
2620 			 << " error " << error << dendl;
2621 	  ceph_assert(clean == stat_clean);
2622 	  ceph_assert(rx == stat_rx);
2623 	  ceph_assert(tx == stat_tx);
2624 	  ceph_assert(dirty == stat_dirty);
2625 	  ceph_assert(missing == stat_missing);
2626 	  ceph_assert(zero == stat_zero);
2627 	  ceph_assert(error == stat_error);
2628 	}
2629 	
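// Per-state byte accounting: bh_stat_add()/bh_stat_sub() keep the stat_*
// counters and the per-Object/per-ObjectSet dirty_or_tx totals in step with a
// BufferHead's state; both expect the cache lock to be held.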
2630 	void ObjectCacher::bh_stat_add(BufferHead *bh)
2631 	{
2632 	  ceph_assert(ceph_mutex_is_locked(lock));
2633 	  switch (bh->get_state()) {
2634 	  case BufferHead::STATE_MISSING:
2635 	    stat_missing += bh->length();
2636 	    break;
2637 	  case BufferHead::STATE_CLEAN:
2638 	    stat_clean += bh->length();
2639 	    break;
2640 	  case BufferHead::STATE_ZERO:
2641 	    stat_zero += bh->length();
2642 	    break;
2643 	  case BufferHead::STATE_DIRTY:
2644 	    stat_dirty += bh->length();
2645 	    bh->ob->dirty_or_tx += bh->length();
2646 	    bh->ob->oset->dirty_or_tx += bh->length();
2647 	    break;
2648 	  case BufferHead::STATE_TX:
2649 	    stat_tx += bh->length();
2650 	    bh->ob->dirty_or_tx += bh->length();
2651 	    bh->ob->oset->dirty_or_tx += bh->length();
2652 	    break;
2653 	  case BufferHead::STATE_RX:
2654 	    stat_rx += bh->length();
2655 	    break;
2656 	  case BufferHead::STATE_ERROR:
2657 	    stat_error += bh->length();
2658 	    break;
2659 	  default:
2660 	    ceph_abort_msg("bh_stat_add: invalid bufferhead state");
2661 	  }
2662 	  if (get_stat_dirty_waiting() > 0)
2663 	    stat_cond.notify_all();
2664 	}
2665 	
2666 	void ObjectCacher::bh_stat_sub(BufferHead *bh)
2667 	{
2668 	  ceph_assert(ceph_mutex_is_locked(lock));
2669 	  switch (bh->get_state()) {
2670 	  case BufferHead::STATE_MISSING:
2671 	    stat_missing -= bh->length();
2672 	    break;
2673 	  case BufferHead::STATE_CLEAN:
2674 	    stat_clean -= bh->length();
2675 	    break;
2676 	  case BufferHead::STATE_ZERO:
2677 	    stat_zero -= bh->length();
2678 	    break;
2679 	  case BufferHead::STATE_DIRTY:
2680 	    stat_dirty -= bh->length();
2681 	    bh->ob->dirty_or_tx -= bh->length();
2682 	    bh->ob->oset->dirty_or_tx -= bh->length();
2683 	    break;
2684 	  case BufferHead::STATE_TX:
2685 	    stat_tx -= bh->length();
2686 	    bh->ob->dirty_or_tx -= bh->length();
2687 	    bh->ob->oset->dirty_or_tx -= bh->length();
2688 	    break;
2689 	  case BufferHead::STATE_RX:
2690 	    stat_rx -= bh->length();
2691 	    break;
2692 	  case BufferHead::STATE_ERROR:
2693 	    stat_error -= bh->length();
2694 	    break;
2695 	  default:
2696 	    ceph_abort_msg("bh_stat_sub: invalid bufferhead state");
2697 	  }
2698 	}
2699 	
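// A state change may also move the bh between the clean and dirty LRUs and
// in or out of the dirty_or_tx_bh set, and clears any recorded error when the
// bh leaves STATE_ERROR; byte counters are adjusted via bh_stat_sub/add.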
2700 	void ObjectCacher::bh_set_state(BufferHead *bh, int s)
2701 	{
2702 	  ceph_assert(ceph_mutex_is_locked(lock));
2703 	  int state = bh->get_state();
2704 	  // move between lru lists?
2705 	  if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
2706 	    bh_lru_rest.lru_remove(bh);
2707 	    bh_lru_dirty.lru_insert_top(bh);
2708 	  } else if (s != BufferHead::STATE_DIRTY && state == BufferHead::STATE_DIRTY) {
2709 	    bh_lru_dirty.lru_remove(bh);
2710 	    if (bh->get_dontneed())
2711 	      bh_lru_rest.lru_insert_bot(bh);
2712 	    else
2713 	      bh_lru_rest.lru_insert_top(bh);
2714 	  }
2715 	
2716 	  if ((s == BufferHead::STATE_TX ||
2717 	       s == BufferHead::STATE_DIRTY) &&
2718 	      state != BufferHead::STATE_TX &&
2719 	      state != BufferHead::STATE_DIRTY) {
2720 	    dirty_or_tx_bh.insert(bh);
2721 	  } else if ((state == BufferHead::STATE_TX ||
2722 		      state == BufferHead::STATE_DIRTY) &&
2723 		     s != BufferHead::STATE_TX &&
2724 		     s != BufferHead::STATE_DIRTY) {
2725 	    dirty_or_tx_bh.erase(bh);
2726 	  }
2727 	
2728 	  if (s != BufferHead::STATE_ERROR &&
2729 	      state == BufferHead::STATE_ERROR) {
2730 	    bh->error = 0;
2731 	  }
2732 	
2733 	  // set state
2734 	  bh_stat_sub(bh);
2735 	  bh->set_state(s);
2736 	  bh_stat_add(bh);
2737 	}
2738 	
2739 	void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
2740 	{
2741 	  ceph_assert(ceph_mutex_is_locked(lock));
2742 	  ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
2743 	  ob->add_bh(bh);
2744 	  if (bh->is_dirty()) {
2745 	    bh_lru_dirty.lru_insert_top(bh);
2746 	    dirty_or_tx_bh.insert(bh);
2747 	  } else {
2748 	    if (bh->get_dontneed())
2749 	      bh_lru_rest.lru_insert_bot(bh);
2750 	    else
2751 	      bh_lru_rest.lru_insert_top(bh);
2752 	  }
2753 	
2754 	  if (bh->is_tx()) {
2755 	    dirty_or_tx_bh.insert(bh);
2756 	  }
2757 	  bh_stat_add(bh);
2758 	}
2759 	
2760 	void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
2761 	{
2762 	  ceph_assert(ceph_mutex_is_locked(lock));
2763 	  ceph_assert(bh->get_journal_tid() == 0);
2764 	  ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
2765 	  ob->remove_bh(bh);
2766 	  if (bh->is_dirty()) {
2767 	    bh_lru_dirty.lru_remove(bh);
2768 	    dirty_or_tx_bh.erase(bh);
2769 	  } else {
2770 	    bh_lru_rest.lru_remove(bh);
2771 	  }
2772 	
2773 	  if (bh->is_tx()) {
2774 	    dirty_or_tx_bh.erase(bh);
2775 	  }
2776 	  bh_stat_sub(bh);
2777 	  if (get_stat_dirty_waiting() > 0)
2778 	    stat_cond.notify_all();
2779 	}
2780 	
2781