1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software
11   	 * Foundation.  See file COPYING.
12   	 *
13   	 */
14   	
15   	#ifndef CEPH_OBJECTER_H
16   	#define CEPH_OBJECTER_H
17   	
18   	#include <condition_variable>
19   	#include <list>
20   	#include <map>
21   	#include <mutex>
22   	#include <memory>
23   	#include <sstream>
24   	#include <type_traits>
25   	
26   	#include <boost/thread/shared_mutex.hpp>
27   	
28   	#include "include/ceph_assert.h"
29   	#include "include/buffer.h"
30   	#include "include/types.h"
31   	#include "include/rados/rados_types.hpp"
32   	
33   	#include "common/admin_socket.h"
34   	#include "common/ceph_time.h"
35   	#include "common/ceph_timer.h"
36   	#include "common/config_obs.h"
37   	#include "common/shunique_lock.h"
38   	#include "common/zipkin_trace.h"
39   	#include "common/Finisher.h"
40   	#include "common/Throttle.h"
41   	
42   	#include "messages/MOSDOp.h"
43   	#include "msg/Dispatcher.h"
44   	
45   	#include "osd/OSDMap.h"
46   	
47   	class Context;
48   	class Messenger;
49   	class MonClient;
50   	class Message;
51   	class Finisher;
52   	
53   	class MPoolOpReply;
54   	
55   	class MGetPoolStatsReply;
56   	class MStatfsReply;
57   	class MCommandReply;
58   	class MWatchNotify;
59   	
60   	class PerfCounters;
61   	
62   	// -----------------------------------------
63   	
64   	struct ObjectOperation {
65   	  std::vector<OSDOp> ops;
66   	  int flags;
67   	  int priority;
68   	
69   	  std::vector<ceph::buffer::list*> out_bl;
70   	  std::vector<Context*> out_handler;
71   	  std::vector<int*> out_rval;
72   	
73   	  ObjectOperation() : flags(0), priority(0) {}
74   	  ~ObjectOperation() {
75   	    while (!out_handler.empty()) {
76   	      delete out_handler.back();
77   	      out_handler.pop_back();
78   	    }
79   	  }
80   	
81   	  size_t size() {
82   	    return ops.size();
83   	  }
84   	
85   	  void set_last_op_flags(int flags) {
86   	    ceph_assert(!ops.empty());
87   	    ops.rbegin()->op.flags = flags;
88   	  }
89   	
90   	  class C_TwoContexts;
91   	  /**
92   	   * Add a callback to run when this operation completes,
93   	   * after any other callbacks for it.
94   	   */
95   	  void add_handler(Context *extra);
96   	
97   	  OSDOp& add_op(int op) {
98   	    int s = ops.size();
99   	    ops.resize(s+1);
100  	    ops[s].op.op = op;
101  	    out_bl.resize(s+1);
102  	    out_bl[s] = NULL;
103  	    out_handler.resize(s+1);
104  	    out_handler[s] = NULL;
105  	    out_rval.resize(s+1);
106  	    out_rval[s] = NULL;
107  	    return ops[s];
108  	  }
109  	  void add_data(int op, uint64_t off, uint64_t len, ceph::buffer::list& bl) {
110  	    OSDOp& osd_op = add_op(op);
111  	    osd_op.op.extent.offset = off;
112  	    osd_op.op.extent.length = len;
113  	    osd_op.indata.claim_append(bl);
114  	  }
115  	  void add_writesame(int op, uint64_t off, uint64_t write_len,
116  			     ceph::buffer::list& bl) {
117  	    OSDOp& osd_op = add_op(op);
118  	    osd_op.op.writesame.offset = off;
119  	    osd_op.op.writesame.length = write_len;
120  	    osd_op.op.writesame.data_length = bl.length();
121  	    osd_op.indata.claim_append(bl);
122  	  }
123  	  void add_xattr(int op, const char *name, const ceph::buffer::list& data) {
124  	    OSDOp& osd_op = add_op(op);
125  	    osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
126  	    osd_op.op.xattr.value_len = data.length();
127  	    if (name)
128  	      osd_op.indata.append(name, osd_op.op.xattr.name_len);
129  	    osd_op.indata.append(data);
130  	  }
131  	  void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
132  			     uint8_t cmp_mode, const ceph::buffer::list& data) {
133  	    OSDOp& osd_op = add_op(op);
134  	    osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
135  	    osd_op.op.xattr.value_len = data.length();
136  	    osd_op.op.xattr.cmp_op = cmp_op;
137  	    osd_op.op.xattr.cmp_mode = cmp_mode;
138  	    if (name)
139  	      osd_op.indata.append(name, osd_op.op.xattr.name_len);
140  	    osd_op.indata.append(data);
141  	  }
142  	  void add_call(int op, const char *cname, const char *method,
143  			ceph::buffer::list &indata,
144  			ceph::buffer::list *outbl, Context *ctx, int *prval) {
145  	    OSDOp& osd_op = add_op(op);
146  	
147  	    unsigned p = ops.size() - 1;
148  	    out_handler[p] = ctx;
149  	    out_bl[p] = outbl;
150  	    out_rval[p] = prval;
151  	
152  	    osd_op.op.cls.class_len = strlen(cname);
153  	    osd_op.op.cls.method_len = strlen(method);
154  	    osd_op.op.cls.indata_len = indata.length();
155  	    osd_op.indata.append(cname, osd_op.op.cls.class_len);
156  	    osd_op.indata.append(method, osd_op.op.cls.method_len);
157  	    osd_op.indata.append(indata);
158  	  }
159  	  void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
160  			epoch_t start_epoch) {
161  	    using ceph::encode;
162  	    OSDOp& osd_op = add_op(op);
163  	    osd_op.op.pgls.count = count;
164  	    osd_op.op.pgls.start_epoch = start_epoch;
165  	    encode(cookie, osd_op.indata);
166  	  }
167  	  void add_pgls_filter(int op, uint64_t count, const ceph::buffer::list& filter,
168  			       collection_list_handle_t cookie, epoch_t start_epoch) {
169  	    using ceph::encode;
170  	    OSDOp& osd_op = add_op(op);
171  	    osd_op.op.pgls.count = count;
172  	    osd_op.op.pgls.start_epoch = start_epoch;
173  	    std::string cname = "pg";
174  	    std::string mname = "filter";
175  	    encode(cname, osd_op.indata);
176  	    encode(mname, osd_op.indata);
177  	    osd_op.indata.append(filter);
178  	    encode(cookie, osd_op.indata);
179  	  }
180  	  void add_alloc_hint(int op, uint64_t expected_object_size,
181  	                      uint64_t expected_write_size,
182  			      uint32_t flags) {
183  	    OSDOp& osd_op = add_op(op);
184  	    osd_op.op.alloc_hint.expected_object_size = expected_object_size;
185  	    osd_op.op.alloc_hint.expected_write_size = expected_write_size;
186  	    osd_op.op.alloc_hint.flags = flags;
187  	  }
188  	
189  	  // ------
190  	
191  	  // pg
192  	  void pg_ls(uint64_t count, ceph::buffer::list& filter,
193  		     collection_list_handle_t cookie, epoch_t start_epoch) {
194  	    if (filter.length() == 0)
195  	      add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
196  	    else
197  	      add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
198  			      start_epoch);
199  	    flags |= CEPH_OSD_FLAG_PGOP;
200  	  }
201  	
202  	  void pg_nls(uint64_t count, const ceph::buffer::list& filter,
203  		      collection_list_handle_t cookie, epoch_t start_epoch) {
204  	    if (filter.length() == 0)
205  	      add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
206  	    else
207  	      add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
208  			      start_epoch);
209  	    flags |= CEPH_OSD_FLAG_PGOP;
210  	  }
211  	
212  	  void scrub_ls(const librados::object_id_t& start_after,
213  			uint64_t max_to_get,
214  			std::vector<librados::inconsistent_obj_t> *objects,
215  			uint32_t *interval,
216  			int *rval);
217  	  void scrub_ls(const librados::object_id_t& start_after,
218  			uint64_t max_to_get,
219  			std::vector<librados::inconsistent_snapset_t> *objects,
220  			uint32_t *interval,
221  			int *rval);
222  	
223  	  void create(bool excl) {
224  	    OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
225  	    o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
226  	  }
227  	
228  	  struct C_ObjectOperation_stat : public Context {
229  	    ceph::buffer::list bl;
230  	    uint64_t *psize;
231  	    ceph::real_time *pmtime;
232  	    time_t *ptime;
233  	    struct timespec *pts;
234  	    int *prval;
235  	    C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
236  				   int *prval)
237  	      : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
238  	    void finish(int r) override {
239  	      using ceph::decode;
240  	      if (r >= 0) {
241  		auto p = bl.cbegin();
242  		try {
243  		  uint64_t size;
244  		  ceph::real_time mtime;
245  		  decode(size, p);
246  		  decode(mtime, p);
247  		  if (psize)
248  		    *psize = size;
249  		  if (pmtime)
250  		    *pmtime = mtime;
251  		  if (ptime)
252  		    *ptime = ceph::real_clock::to_time_t(mtime);
253  		  if (pts)
254  		    *pts = ceph::real_clock::to_timespec(mtime);
255  		} catch (ceph::buffer::error& e) {
256  		  if (prval)
257  		    *prval = -EIO;
258  		}
259  	      }
260  	    }
261  	  };
262  	  void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
263  	    add_op(CEPH_OSD_OP_STAT);
264  	    unsigned p = ops.size() - 1;
265  	    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL, NULL,
266  								   prval);
267  	    out_bl[p] = &h->bl;
268  	    out_handler[p] = h;
269  	    out_rval[p] = prval;
270  	  }
271  	  void stat(uint64_t *psize, time_t *ptime, int *prval) {
272  	    add_op(CEPH_OSD_OP_STAT);
273  	    unsigned p = ops.size() - 1;
274  	    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime, NULL,
275  								   prval);
276  	    out_bl[p] = &h->bl;
277  	    out_handler[p] = h;
278  	    out_rval[p] = prval;
279  	  }
280  	  void stat(uint64_t *psize, struct timespec *pts, int *prval) {
281  	    add_op(CEPH_OSD_OP_STAT);
282  	    unsigned p = ops.size() - 1;
283  	    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL, pts,
284  								   prval);
285  	    out_bl[p] = &h->bl;
286  	    out_handler[p] = h;
287  	    out_rval[p] = prval;
288  	  }
289  	  // object cmpext
290  	  struct C_ObjectOperation_cmpext : public Context {
291  	    int *prval;
292  	    explicit C_ObjectOperation_cmpext(int *prval)
293  	      : prval(prval) {}
294  	
    void finish(int r) override {
296  	      if (prval)
297  	        *prval = r;
298  	    }
299  	  };
300  	
301  	  void cmpext(uint64_t off, ceph::buffer::list& cmp_bl, int *prval) {
302  	    add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
303  	    unsigned p = ops.size() - 1;
304  	    C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
305  	    out_handler[p] = h;
306  	    out_rval[p] = prval;
307  	  }
308  	
309  	  // Used by C API
310  	  void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
311  	    ceph::buffer::list cmp_bl;
312  	    cmp_bl.append(cmp_buf, cmp_len);
313  	    add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
314  	    unsigned p = ops.size() - 1;
315  	    C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
316  	    out_handler[p] = h;
317  	    out_rval[p] = prval;
318  	  }
319  	
320  	  void read(uint64_t off, uint64_t len, ceph::buffer::list *pbl, int *prval,
321  		    Context* ctx) {
322  	    ceph::buffer::list bl;
323  	    add_data(CEPH_OSD_OP_READ, off, len, bl);
324  	    unsigned p = ops.size() - 1;
325  	    out_bl[p] = pbl;
326  	    out_rval[p] = prval;
327  	    out_handler[p] = ctx;
328  	  }
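  // A minimal usage sketch (hypothetical caller, not part of this header):
  // several sub-ops can be composed in one ObjectOperation, each with its
  // own output buffer and return-code slot that are filled in when the
  // compound op completes.
  //
  //   ObjectOperation op;
  //   ceph::buffer::list data;
  //   uint64_t size = 0;
  //   ceph::real_time *no_mtime = nullptr;  // select the real_time overload
  //   int read_r = 0, stat_r = 0;
  //   op.read(0, 4096, &data, &read_r, nullptr);
  //   op.stat(&size, no_mtime, &stat_r);
  //   // submit via the Objecter (not shown) and check read_r/stat_r once
  //   // the operation completes.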
329  	
330  	  struct C_ObjectOperation_sparse_read : public Context {
331  	    ceph::buffer::list bl;
332  	    ceph::buffer::list *data_bl;
333  	    std::map<uint64_t, uint64_t> *extents;
334  	    int *prval;
335  	    C_ObjectOperation_sparse_read(ceph::buffer::list *data_bl,
336  					  std::map<uint64_t, uint64_t> *extents,
337  					  int *prval)
338  	      : data_bl(data_bl), extents(extents), prval(prval) {}
339  	    void finish(int r) override {
340  	      using ceph::decode;
341  	      auto iter = bl.cbegin();
342  	      if (r >= 0) {
343  	        // NOTE: it's possible the sub-op has not been executed but the result
344  	        // code remains zeroed. Avoid the costly exception handling on a
345  	        // potential IO path.
346  	        if (bl.length() > 0) {
347  		  try {
348  		    decode(*extents, iter);
349  		    decode(*data_bl, iter);
350  		  } catch (ceph::buffer::error& e) {
351  		    if (prval)
352  	              *prval = -EIO;
353  		  }
354  	        } else if (prval) {
355  	          *prval = -EIO;
356  	        }
357  	      }
358  	    }
359  	  };
360  	  void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m,
361  			   ceph::buffer::list *data_bl, int *prval) {
362  	    ceph::buffer::list bl;
363  	    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
364  	    unsigned p = ops.size() - 1;
365  	    C_ObjectOperation_sparse_read *h =
366  	      new C_ObjectOperation_sparse_read(data_bl, m, prval);
367  	    out_bl[p] = &h->bl;
368  	    out_handler[p] = h;
369  	    out_rval[p] = prval;
370  	  }
371  	  void write(uint64_t off, ceph::buffer::list& bl,
372  		     uint64_t truncate_size,
373  		     uint32_t truncate_seq) {
374  	    add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
375  	    OSDOp& o = *ops.rbegin();
376  	    o.op.extent.truncate_size = truncate_size;
377  	    o.op.extent.truncate_seq = truncate_seq;
378  	  }
379  	  void write(uint64_t off, ceph::buffer::list& bl) {
380  	    write(off, bl, 0, 0);
381  	  }
382  	  void write_full(ceph::buffer::list& bl) {
383  	    add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
384  	  }
385  	  void writesame(uint64_t off, uint64_t write_len, ceph::buffer::list& bl) {
386  	    add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
387  	  }
388  	  void append(ceph::buffer::list& bl) {
389  	    add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
390  	  }
391  	  void zero(uint64_t off, uint64_t len) {
392  	    ceph::buffer::list bl;
393  	    add_data(CEPH_OSD_OP_ZERO, off, len, bl);
394  	  }
395  	  void truncate(uint64_t off) {
396  	    ceph::buffer::list bl;
397  	    add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
398  	  }
399  	  void remove() {
400  	    ceph::buffer::list bl;
401  	    add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
402  	  }
403  	  void mapext(uint64_t off, uint64_t len) {
404  	    ceph::buffer::list bl;
405  	    add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
406  	  }
407  	  void sparse_read(uint64_t off, uint64_t len) {
408  	    ceph::buffer::list bl;
409  	    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
410  	  }
411  	
412  	  void checksum(uint8_t type, const ceph::buffer::list &init_value_bl,
413  			uint64_t off, uint64_t len, size_t chunk_size,
414  			ceph::buffer::list *pbl, int *prval, Context *ctx) {
415  	    OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
416  	    osd_op.op.checksum.offset = off;
417  	    osd_op.op.checksum.length = len;
418  	    osd_op.op.checksum.type = type;
419  	    osd_op.op.checksum.chunk_size = chunk_size;
420  	    osd_op.indata.append(init_value_bl);
421  	
422  	    unsigned p = ops.size() - 1;
423  	    out_bl[p] = pbl;
424  	    out_rval[p] = prval;
425  	    out_handler[p] = ctx;
426  	  }
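  // Hedged sketch of a hypothetical checksum caller: the initial value is
  // passed pre-encoded in init_value_bl, and its width is assumed to match
  // the checksum type (4 bytes for crc32c); chunk_size 0 is assumed to mean
  // a single checksum over the whole range.
  //
  //   using ceph::encode;
  //   ObjectOperation op;
  //   ceph::buffer::list init_bl, csum_bl;
  //   uint32_t init_value = -1;
  //   encode(init_value, init_bl);
  //   int csum_r = 0;
  //   op.checksum(CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C, init_bl,
  //               0, 4096, 0, &csum_bl, &csum_r, nullptr);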
427  	
428  	  // object attrs
429  	  void getxattr(const char *name, ceph::buffer::list *pbl, int *prval) {
430  	    ceph::buffer::list bl;
431  	    add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
432  	    unsigned p = ops.size() - 1;
433  	    out_bl[p] = pbl;
434  	    out_rval[p] = prval;
435  	  }
436  	  struct C_ObjectOperation_decodevals : public Context {
437  	    uint64_t max_entries;
438  	    ceph::buffer::list bl;
439  	    std::map<std::string,ceph::buffer::list> *pattrs;
440  	    bool *ptruncated;
441  	    int *prval;
442  	    C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,ceph::buffer::list> *pa,
443  					 bool *pt, int *pr)
444  	      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
445  	      if (ptruncated) {
446  		*ptruncated = false;
447  	      }
448  	    }
449  	    void finish(int r) override {
450  	      using ceph::decode;
451  	      if (r >= 0) {
452  		auto p = bl.cbegin();
453  		try {
454  		  if (pattrs)
455  		    decode(*pattrs, p);
456  		  if (ptruncated) {
457  		    std::map<std::string,ceph::buffer::list> ignore;
458  		    if (!pattrs) {
459  		      decode(ignore, p);
460  		      pattrs = &ignore;
461  		    }
462  		    if (!p.end()) {
463  		      decode(*ptruncated, p);
464  		    } else {
465  		      // the OSD did not provide this.  since old OSDs do not
	      // enforce omap result limits either, we can infer it from
467  		      // the size of the result
468  		      *ptruncated = (pattrs->size() == max_entries);
469  		    }
470  		  }
471  		}
472  		catch (ceph::buffer::error& e) {
473  		  if (prval)
474  		    *prval = -EIO;
475  		}
476  	      }
477  	    }
478  	  };
479  	  struct C_ObjectOperation_decodekeys : public Context {
480  	    uint64_t max_entries;
481  	    ceph::buffer::list bl;
482  	    std::set<std::string> *pattrs;
483  	    bool *ptruncated;
484  	    int *prval;
485  	    C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
486  					 int *pr)
487  	      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
488  	      if (ptruncated) {
489  		*ptruncated = false;
490  	      }
491  	    }
492  	    void finish(int r) override {
493  	      if (r >= 0) {
494  		using ceph::decode;
495  		auto p = bl.cbegin();
496  		try {
497  		  if (pattrs)
498  		    decode(*pattrs, p);
499  		  if (ptruncated) {
500  		    std::set<std::string> ignore;
501  		    if (!pattrs) {
502  		      decode(ignore, p);
503  		      pattrs = &ignore;
504  		    }
505  		    if (!p.end()) {
506  		      decode(*ptruncated, p);
507  		    } else {
508  		      // the OSD did not provide this.  since old OSDs do not
509  		      // enforce omap result limits either, we can infer it from
510  		      // the size of the result
511  		      *ptruncated = (pattrs->size() == max_entries);
512  		    }
513  		  }
514  		}
515  		catch (ceph::buffer::error& e) {
516  		  if (prval)
517  		    *prval = -EIO;
518  		}
519  	      }
520  	    }
521  	  };
522  	  struct C_ObjectOperation_decodewatchers : public Context {
523  	    ceph::buffer::list bl;
524  	    std::list<obj_watch_t> *pwatchers;
525  	    int *prval;
526  	    C_ObjectOperation_decodewatchers(std::list<obj_watch_t> *pw, int *pr)
527  	      : pwatchers(pw), prval(pr) {}
528  	    void finish(int r) override {
529  	      using ceph::decode;
530  	      if (r >= 0) {
531  		auto p = bl.cbegin();
532  		try {
533  		  obj_list_watch_response_t resp;
534  		  decode(resp, p);
535  		  if (pwatchers) {
536  		    for (const auto& watch_item : resp.entries) {
537  		      obj_watch_t ow;
538  		      std::string sa = watch_item.addr.get_legacy_str();
539  		      strncpy(ow.addr, sa.c_str(), sizeof(ow.addr) - 1);
540  		      ow.addr[sizeof(ow.addr) - 1] = '\0';
541  		      ow.watcher_id = watch_item.name.num();
542  		      ow.cookie = watch_item.cookie;
543  		      ow.timeout_seconds = watch_item.timeout_seconds;
544  		      pwatchers->push_back(std::move(ow));
545  		    }
546  		  }
547  		}
548  		catch (ceph::buffer::error& e) {
549  		  if (prval)
550  		    *prval = -EIO;
551  		}
552  	      }
553  	    }
554  	  };
555  	  struct C_ObjectOperation_decodesnaps : public Context {
556  	    ceph::buffer::list bl;
557  	    librados::snap_set_t *psnaps;
558  	    int *prval;
559  	    C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr)
560  	      : psnaps(ps), prval(pr) {}
561  	    void finish(int r) override {
562  	      if (r >= 0) {
563  		using ceph::decode;
564  		auto p = bl.cbegin();
565  		try {
566  		  obj_list_snap_response_t resp;
567  		  decode(resp, p);
568  		  if (psnaps) {
569  		    psnaps->clones.clear();
570  		    for (auto ci = resp.clones.begin(); ci != resp.clones.end(); ++ci) {
571  		      librados::clone_info_t clone;
572  	
573  		      clone.cloneid = ci->cloneid;
574  		      clone.snaps.reserve(ci->snaps.size());
575  		      clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
576  					 ci->snaps.end());
577  		      clone.overlap = ci->overlap;
578  		      clone.size = ci->size;
579  	
580  		      psnaps->clones.push_back(clone);
581  		    }
582  		    psnaps->seq = resp.seq;
583  		  }
584  		} catch (ceph::buffer::error& e) {
585  		  if (prval)
586  		    *prval = -EIO;
587  		}
588  	      }
589  	    }
590  	  };
591  	  void getxattrs(std::map<std::string,ceph::buffer::list> *pattrs, int *prval) {
592  	    add_op(CEPH_OSD_OP_GETXATTRS);
593  	    if (pattrs || prval) {
594  	      unsigned p = ops.size() - 1;
595  	      C_ObjectOperation_decodevals *h
596  		= new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval);
597  	      out_handler[p] = h;
598  	      out_bl[p] = &h->bl;
599  	      out_rval[p] = prval;
600  	    }
601  	  }
602  	  void setxattr(const char *name, const ceph::buffer::list& bl) {
603  	    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
604  	  }
605  	  void setxattr(const char *name, const std::string& s) {
606  	    ceph::buffer::list bl;
607  	    bl.append(s);
608  	    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
609  	  }
610  	  void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
611  			const ceph::buffer::list& bl) {
612  	    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
613  	  }
614  	  void rmxattr(const char *name) {
615  	    ceph::buffer::list bl;
616  	    add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
617  	  }
618  	  void setxattrs(std::map<std::string, ceph::buffer::list>& attrs) {
619  	    using ceph::encode;
620  	    ceph::buffer::list bl;
621  	    encode(attrs, bl);
    add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl);
623  	  }
624  	  void resetxattrs(const char *prefix, std::map<std::string, ceph::buffer::list>& attrs) {
625  	    using ceph::encode;
626  	    ceph::buffer::list bl;
627  	    encode(attrs, bl);
628  	    add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
629  	  }
630  	
631  	  // trivialmap
632  	  void tmap_update(ceph::buffer::list& bl) {
633  	    add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
634  	  }
635  	
636  	  // objectmap
637  	  void omap_get_keys(const std::string &start_after,
638  			     uint64_t max_to_get,
639  			     std::set<std::string> *out_set,
640  			     bool *ptruncated,
641  			     int *prval) {
642  	    using ceph::encode;
643  	    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
644  	    ceph::buffer::list bl;
645  	    encode(start_after, bl);
646  	    encode(max_to_get, bl);
647  	    op.op.extent.offset = 0;
648  	    op.op.extent.length = bl.length();
649  	    op.indata.claim_append(bl);
650  	    if (prval || ptruncated || out_set) {
651  	      unsigned p = ops.size() - 1;
652  	      C_ObjectOperation_decodekeys *h =
653  		new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval);
654  	      out_handler[p] = h;
655  	      out_bl[p] = &h->bl;
656  	      out_rval[p] = prval;
657  	    }
658  	  }
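  // Pagination sketch (hypothetical caller): page through omap keys with
  // start_after/max_to_get, using the truncation flag decoded by
  // C_ObjectOperation_decodekeys above.
  //
  //   std::set<std::string> keys;
  //   bool truncated = false;
  //   int r = 0;
  //   std::string cursor;            // empty => start from the beginning
  //   do {
  //     keys.clear();
  //     ObjectOperation op;
  //     op.omap_get_keys(cursor, 1000, &keys, &truncated, &r);
  //     // ... submit op and wait for completion (not shown) ...
  //     if (!keys.empty())
  //       cursor = *keys.rbegin();   // continue after the last key we saw
  //   } while (truncated);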
659  	
660  	  void omap_get_vals(const std::string &start_after,
661  			     const std::string &filter_prefix,
662  			     uint64_t max_to_get,
663  			     std::map<std::string, ceph::buffer::list> *out_set,
664  			     bool *ptruncated,
665  			     int *prval) {
666  	    using ceph::encode;
667  	    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
668  	    ceph::buffer::list bl;
669  	    encode(start_after, bl);
670  	    encode(max_to_get, bl);
671  	    encode(filter_prefix, bl);
672  	    op.op.extent.offset = 0;
673  	    op.op.extent.length = bl.length();
674  	    op.indata.claim_append(bl);
675  	    if (prval || out_set || ptruncated) {
676  	      unsigned p = ops.size() - 1;
677  	      C_ObjectOperation_decodevals *h =
678  		new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval);
679  	      out_handler[p] = h;
680  	      out_bl[p] = &h->bl;
681  	      out_rval[p] = prval;
682  	    }
683  	  }
684  	
685  	  void omap_get_vals_by_keys(const std::set<std::string> &to_get,
686  				    std::map<std::string, ceph::buffer::list> *out_set,
687  				    int *prval) {
688  	    using ceph::encode;
689  	    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
690  	    ceph::buffer::list bl;
691  	    encode(to_get, bl);
692  	    op.op.extent.offset = 0;
693  	    op.op.extent.length = bl.length();
694  	    op.indata.claim_append(bl);
695  	    if (prval || out_set) {
696  	      unsigned p = ops.size() - 1;
697  	      C_ObjectOperation_decodevals *h =
698  		new C_ObjectOperation_decodevals(0, out_set, nullptr, prval);
699  	      out_handler[p] = h;
700  	      out_bl[p] = &h->bl;
701  	      out_rval[p] = prval;
702  	    }
703  	  }
704  	
705  	  void omap_cmp(const std::map<std::string, std::pair<ceph::buffer::list,int> > &assertions,
706  			int *prval) {
707  	    using ceph::encode;
708  	    OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
709  	    ceph::buffer::list bl;
710  	    encode(assertions, bl);
711  	    op.op.extent.offset = 0;
712  	    op.op.extent.length = bl.length();
713  	    op.indata.claim_append(bl);
714  	    if (prval) {
715  	      unsigned p = ops.size() - 1;
716  	      out_rval[p] = prval;
717  	    }
718  	  }
719  	
720  	  struct C_ObjectOperation_copyget : public Context {
721  	    ceph::buffer::list bl;
722  	    object_copy_cursor_t *cursor;
723  	    uint64_t *out_size;
724  	    ceph::real_time *out_mtime;
725  	    std::map<std::string,ceph::buffer::list> *out_attrs;
726  	    ceph::buffer::list *out_data, *out_omap_header, *out_omap_data;
727  	    std::vector<snapid_t> *out_snaps;
728  	    snapid_t *out_snap_seq;
729  	    uint32_t *out_flags;
730  	    uint32_t *out_data_digest;
731  	    uint32_t *out_omap_digest;
732  	    mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *out_reqids;
733  	    mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes;
734  	    uint64_t *out_truncate_seq;
735  	    uint64_t *out_truncate_size;
736  	    int *prval;
737  	    C_ObjectOperation_copyget(object_copy_cursor_t *c,
738  				      uint64_t *s,
739  				      ceph::real_time *m,
740  				      std::map<std::string,ceph::buffer::list> *a,
741  				      ceph::buffer::list *d, ceph::buffer::list *oh,
742  				      ceph::buffer::list *o,
743  				      std::vector<snapid_t> *osnaps,
744  				      snapid_t *osnap_seq,
745  				      uint32_t *flags,
746  				      uint32_t *dd,
747  				      uint32_t *od,
748  				      mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *oreqids,
749  				      mempool::osd_pglog::map<uint32_t, int> *oreqid_return_codes,
750  				      uint64_t *otseq,
751  				      uint64_t *otsize,
752  				      int *r)
753  	      : cursor(c),
754  		out_size(s), out_mtime(m),
755  		out_attrs(a), out_data(d), out_omap_header(oh),
756  		out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
757  		out_flags(flags), out_data_digest(dd), out_omap_digest(od),
758  		out_reqids(oreqids),
759  		out_reqid_return_codes(oreqid_return_codes),
760  		out_truncate_seq(otseq),
761  		out_truncate_size(otsize),
762  		prval(r) {}
763  	    void finish(int r) override {
764  	      using ceph::decode;
765  	      // reqids are copied on ENOENT
766  	      if (r < 0 && r != -ENOENT)
767  		return;
768  	      try {
769  		auto p = bl.cbegin();
770  		object_copy_data_t copy_reply;
771  		decode(copy_reply, p);
772  		if (r == -ENOENT) {
773  		  if (out_reqids)
774  		    *out_reqids = copy_reply.reqids;
775  		  return;
776  		}
777  		if (out_size)
778  		  *out_size = copy_reply.size;
779  		if (out_mtime)
780  		  *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
781  		if (out_attrs)
782  		  *out_attrs = copy_reply.attrs;
783  		if (out_data)
784  		  out_data->claim_append(copy_reply.data);
785  		if (out_omap_header)
786  		  out_omap_header->claim_append(copy_reply.omap_header);
787  		if (out_omap_data)
788  		  *out_omap_data = copy_reply.omap_data;
789  		if (out_snaps)
790  		  *out_snaps = copy_reply.snaps;
791  		if (out_snap_seq)
792  		  *out_snap_seq = copy_reply.snap_seq;
793  		if (out_flags)
794  		  *out_flags = copy_reply.flags;
795  		if (out_data_digest)
796  		  *out_data_digest = copy_reply.data_digest;
797  		if (out_omap_digest)
798  		  *out_omap_digest = copy_reply.omap_digest;
799  		if (out_reqids)
800  		  *out_reqids = copy_reply.reqids;
801  		if (out_reqid_return_codes)
802  		  *out_reqid_return_codes = copy_reply.reqid_return_codes;
803  		if (out_truncate_seq)
804  		  *out_truncate_seq = copy_reply.truncate_seq;
805  		if (out_truncate_size)
806  		  *out_truncate_size = copy_reply.truncate_size;
807  		*cursor = copy_reply.cursor;
808  	      } catch (ceph::buffer::error& e) {
809  		if (prval)
810  		  *prval = -EIO;
811  	      }
812  	    }
813  	  };
814  	
815  	  void copy_get(object_copy_cursor_t *cursor,
816  			uint64_t max,
817  			uint64_t *out_size,
818  			ceph::real_time *out_mtime,
819  			std::map<std::string,ceph::buffer::list> *out_attrs,
820  			ceph::buffer::list *out_data,
821  			ceph::buffer::list *out_omap_header,
822  			ceph::buffer::list *out_omap_data,
823  			std::vector<snapid_t> *out_snaps,
824  			snapid_t *out_snap_seq,
825  			uint32_t *out_flags,
826  			uint32_t *out_data_digest,
827  			uint32_t *out_omap_digest,
828  			mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *out_reqids,
829  			mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes,
830  			uint64_t *truncate_seq,
831  			uint64_t *truncate_size,
832  			int *prval) {
833  	    using ceph::encode;
834  	    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
835  	    osd_op.op.copy_get.max = max;
836  	    encode(*cursor, osd_op.indata);
837  	    encode(max, osd_op.indata);
838  	    unsigned p = ops.size() - 1;
839  	    out_rval[p] = prval;
840  	    C_ObjectOperation_copyget *h =
841  	      new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
842  					    out_attrs, out_data, out_omap_header,
843  					    out_omap_data, out_snaps, out_snap_seq,
844  					    out_flags, out_data_digest,
845  					    out_omap_digest, out_reqids,
846  					    out_reqid_return_codes, truncate_seq,
847  					    truncate_size, prval);
848  	    out_bl[p] = &h->bl;
849  	    out_handler[p] = h;
850  	  }
851  	
852  	  void undirty() {
853  	    add_op(CEPH_OSD_OP_UNDIRTY);
854  	  }
855  	
856  	  struct C_ObjectOperation_isdirty : public Context {
857  	    ceph::buffer::list bl;
858  	    bool *pisdirty;
859  	    int *prval;
860  	    C_ObjectOperation_isdirty(bool *p, int *r)
861  	      : pisdirty(p), prval(r) {}
862  	    void finish(int r) override {
863  	      using ceph::decode;
864  	      if (r < 0)
865  		return;
866  	      try {
867  		auto p = bl.cbegin();
868  		bool isdirty;
869  		decode(isdirty, p);
870  		if (pisdirty)
871  		  *pisdirty = isdirty;
872  	      } catch (ceph::buffer::error& e) {
873  		if (prval)
874  		  *prval = -EIO;
875  	      }
876  	    }
877  	  };
878  	
879  	  void is_dirty(bool *pisdirty, int *prval) {
880  	    add_op(CEPH_OSD_OP_ISDIRTY);
881  	    unsigned p = ops.size() - 1;
882  	    out_rval[p] = prval;
883  	    C_ObjectOperation_isdirty *h =
884  	      new C_ObjectOperation_isdirty(pisdirty, prval);
885  	    out_bl[p] = &h->bl;
886  	    out_handler[p] = h;
887  	  }
888  	
889  	  struct C_ObjectOperation_hit_set_ls : public Context {
890  	    ceph::buffer::list bl;
891  	    std::list< std::pair<time_t, time_t> > *ptls;
892  	    std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;
893  	    int *prval;
894  	    C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
895  					 std::list< std::pair<ceph::real_time,
896  							      ceph::real_time> > *ut,
897  					 int *r)
898  	      : ptls(t), putls(ut), prval(r) {}
899  	    void finish(int r) override {
900  	      using ceph::decode;
901  	      if (r < 0)
902  		return;
903  	      try {
904  		auto p = bl.cbegin();
905  		std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
906  		decode(ls, p);
907  		if (ptls) {
908  		  ptls->clear();
909  		  for (auto p = ls.begin(); p != ls.end(); ++p)
910  		    // round initial timestamp up to the next full second to
911  		    // keep this a valid interval.
912  		    ptls->push_back(
913  		      std::make_pair(ceph::real_clock::to_time_t(
914  				  ceph::ceil(p->first,
915  					     // Sadly, no time literals until C++14.
916  					     std::chrono::seconds(1))),
917  				ceph::real_clock::to_time_t(p->second)));
918  		}
919  		if (putls)
920  		  putls->swap(ls);
921  	      } catch (ceph::buffer::error& e) {
922  		r = -EIO;
923  	      }
924  	      if (prval)
925  		*prval = r;
926  	    }
927  	  };
928  	
  /**
   * List available HitSets.
   *
   * We will get back a list of time intervals.  Note that the most
   * recent range may have an empty end timestamp if it is still
   * accumulating.
   *
   * @param pls [out] list of time intervals
   * @param prval [out] return value
   */
939  	  void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
940  	    add_op(CEPH_OSD_OP_PG_HITSET_LS);
941  	    unsigned p = ops.size() - 1;
942  	    out_rval[p] = prval;
943  	    C_ObjectOperation_hit_set_ls *h =
944  	      new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
945  	    out_bl[p] = &h->bl;
946  	    out_handler[p] = h;
947  	  }
948  	  void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
949  			  int *prval) {
950  	    add_op(CEPH_OSD_OP_PG_HITSET_LS);
951  	    unsigned p = ops.size() - 1;
952  	    out_rval[p] = prval;
953  	    C_ObjectOperation_hit_set_ls *h =
954  	      new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
955  	    out_bl[p] = &h->bl;
956  	    out_handler[p] = h;
957  	  }
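  // Hypothetical usage sketch for the time_t flavour above:
  //
  //   std::list<std::pair<time_t, time_t>> intervals;
  //   int r = 0;
  //   ObjectOperation op;
  //   op.hit_set_ls(&intervals, &r);
  //   // after completion, the most recent interval may still be accumulating
  //   // and therefore have an open end timestamp.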
958  	
959  	  /**
960  	   * get HitSet
961  	   *
962  	   * Return an encoded HitSet that includes the provided time
963  	   * interval.
964  	   *
965  	   * @param stamp [in] timestamp
966  	   * @param pbl [out] target buffer for encoded HitSet
967  	   * @param prval [out] return value
968  	   */
969  	  void hit_set_get(ceph::real_time stamp, ceph::buffer::list *pbl, int *prval) {
970  	    OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
971  	    op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
972  	    unsigned p = ops.size() - 1;
973  	    out_rval[p] = prval;
974  	    out_bl[p] = pbl;
975  	  }
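  // Hypothetical usage sketch: fetch the encoded HitSet covering "now".
  //
  //   ceph::buffer::list hitset_bl;
  //   int r = 0;
  //   ObjectOperation op;
  //   op.hit_set_get(ceph::real_clock::now(), &hitset_bl, &r);
  //   // on success, hitset_bl holds an encoded HitSet including that stamp.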
976  	
977  	  void omap_get_header(ceph::buffer::list *bl, int *prval) {
978  	    add_op(CEPH_OSD_OP_OMAPGETHEADER);
979  	    unsigned p = ops.size() - 1;
980  	    out_bl[p] = bl;
981  	    out_rval[p] = prval;
982  	  }
983  	
984  	  void omap_set(const std::map<std::string, ceph::buffer::list>& map) {
985  	    using ceph::encode;
986  	    ceph::buffer::list bl;
987  	    encode(map, bl);
988  	    add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
989  	  }
990  	
991  	  void omap_set_header(ceph::buffer::list &bl) {
992  	    add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
993  	  }
994  	
995  	  void omap_clear() {
996  	    add_op(CEPH_OSD_OP_OMAPCLEAR);
997  	  }
998  	
999  	  void omap_rm_keys(const std::set<std::string> &to_remove) {
1000 	    using ceph::encode;
1001 	    ceph::buffer::list bl;
1002 	    encode(to_remove, bl);
1003 	    add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
1004 	  }
1005 	
  void omap_rm_range(std::string_view key_begin, std::string_view key_end) {
    using ceph::encode;
    ceph::buffer::list bl;
    encode(key_begin, bl);
    encode(key_end, bl);
    add_data(CEPH_OSD_OP_OMAPRMKEYRANGE, 0, bl.length(), bl);
  }
1012 	
1013 	  // object classes
1014 	  void call(const char *cname, const char *method, ceph::buffer::list &indata) {
1015 	    add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
1016 	  }
1017 	
1018 	  void call(const char *cname, const char *method, ceph::buffer::list &indata,
1019 		    ceph::buffer::list *outdata, Context *ctx, int *prval) {
1020 	    add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
1021 	  }
1022 	
1023 	  // watch/notify
1024 	  void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
1025 	    OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
1026 	    osd_op.op.watch.cookie = cookie;
1027 	    osd_op.op.watch.op = op;
1028 	    osd_op.op.watch.timeout = timeout;
1029 	  }
1030 	
1031 	  void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
1032 	              ceph::buffer::list &bl, ceph::buffer::list *inbl) {
1033 	    using ceph::encode;
1034 	    OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
1035 	    osd_op.op.notify.cookie = cookie;
1036 	    encode(prot_ver, *inbl);
1037 	    encode(timeout, *inbl);
1038 	    encode(bl, *inbl);
1039 	    osd_op.indata.append(*inbl);
1040 	  }
1041 	
1042 	  void notify_ack(uint64_t notify_id, uint64_t cookie,
1043 			  ceph::buffer::list& reply_bl) {
1044 	    using ceph::encode;
1045 	    OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
1046 	    ceph::buffer::list bl;
1047 	    encode(notify_id, bl);
1048 	    encode(cookie, bl);
1049 	    encode(reply_bl, bl);
1050 	    osd_op.indata.append(bl);
1051 	  }
1052 	
1053 	  void list_watchers(std::list<obj_watch_t> *out,
1054 			     int *prval) {
1055 	    (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
1056 	    if (prval || out) {
1057 	      unsigned p = ops.size() - 1;
1058 	      C_ObjectOperation_decodewatchers *h =
1059 		new C_ObjectOperation_decodewatchers(out, prval);
1060 	      out_handler[p] = h;
1061 	      out_bl[p] = &h->bl;
1062 	      out_rval[p] = prval;
1063 	    }
1064 	  }
1065 	
1066 	  void list_snaps(librados::snap_set_t *out, int *prval) {
1067 	    (void)add_op(CEPH_OSD_OP_LIST_SNAPS);
1068 	    if (prval || out) {
1069 	      unsigned p = ops.size() - 1;
1070 	      C_ObjectOperation_decodesnaps *h =
1071 		new C_ObjectOperation_decodesnaps(out, prval);
1072 	      out_handler[p] = h;
1073 	      out_bl[p] = &h->bl;
1074 	      out_rval[p] = prval;
1075 	    }
1076 	  }
1077 	
1078 	  void assert_version(uint64_t ver) {
1079 	    OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
1080 	    osd_op.op.assert_ver.ver = ver;
1081 	  }
1082 	
1083 	  void cmpxattr(const char *name, const ceph::buffer::list& val,
1084 			int op, int mode) {
1085 	    add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
1086 	    OSDOp& o = *ops.rbegin();
1087 	    o.op.xattr.cmp_op = op;
1088 	    o.op.xattr.cmp_mode = mode;
1089 	  }
1090 	
1091 	  void rollback(uint64_t snapid) {
1092 	    OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
1093 	    osd_op.op.snap.snapid = snapid;
1094 	  }
1095 	
1096 	  void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
1097 			 version_t src_version, unsigned flags,
1098 			 unsigned src_fadvise_flags) {
1099 	    using ceph::encode;
1100 	    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
1101 	    osd_op.op.copy_from.snapid = snapid;
1102 	    osd_op.op.copy_from.src_version = src_version;
1103 	    osd_op.op.copy_from.flags = flags;
1104 	    osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
1105 	    encode(src, osd_op.indata);
1106 	    encode(src_oloc, osd_op.indata);
1107 	  }
1108 	
1109 	  /**
1110 	   * writeback content to backing tier
1111 	   *
1112 	   * If object is marked dirty in the cache tier, write back content
1113 	   * to backing tier. If the object is clean this is a no-op.
1114 	   *
1115 	   * If writeback races with an update, the update will block.
1116 	   *
1117 	   * use with IGNORE_CACHE to avoid triggering promote.
1118 	   */
1119 	  void cache_flush() {
1120 	    add_op(CEPH_OSD_OP_CACHE_FLUSH);
1121 	  }
1122 	
1123 	  /**
1124 	   * writeback content to backing tier
1125 	   *
1126 	   * If object is marked dirty in the cache tier, write back content
1127 	   * to backing tier. If the object is clean this is a no-op.
1128 	   *
1129 	   * If writeback races with an update, return EAGAIN.  Requires that
1130 	   * the SKIPRWLOCKS flag be set.
1131 	   *
1132 	   * use with IGNORE_CACHE to avoid triggering promote.
1133 	   */
1134 	  void cache_try_flush() {
1135 	    add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
1136 	  }
1137 	
1138 	  /**
1139 	   * evict object from cache tier
1140 	   *
1141 	   * If object is marked clean, remove the object from the cache tier.
1142 	   * Otherwise, return EBUSY.
1143 	   *
1144 	   * use with IGNORE_CACHE to avoid triggering promote.
1145 	   */
1146 	  void cache_evict() {
1147 	    add_op(CEPH_OSD_OP_CACHE_EVICT);
1148 	  }
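  // Flush-then-evict sketch (hypothetical caller; the IGNORE_CACHE and
  // SKIPRWLOCKS flags are applied when the op is submitted, not here):
  //
  //   ObjectOperation flush_op;
  //   flush_op.cache_try_flush();
  //   // submit with CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_SKIPRWLOCKS;
  //   // expect -EAGAIN if the flush races with an update.
  //
  //   ObjectOperation evict_op;
  //   evict_op.cache_evict();
  //   // submit with CEPH_OSD_FLAG_IGNORE_CACHE; expect -EBUSY if still dirty.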
1149 	
1150 	  /*
1151 	   * Extensible tier
1152 	   */
1153 	  void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc, 
1154 			    version_t tgt_version, int flag) {
1155 	    using ceph::encode;
1156 	    OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
1157 	    osd_op.op.copy_from.snapid = snapid;
1158 	    osd_op.op.copy_from.src_version = tgt_version;
1159 	    encode(tgt, osd_op.indata);
1160 	    encode(tgt_oloc, osd_op.indata);
1161 	    set_last_op_flags(flag);
1162 	  }
1163 	
1164 	  void set_chunk(uint64_t src_offset, uint64_t src_length, object_locator_t tgt_oloc,
1165 			 object_t tgt_oid, uint64_t tgt_offset, int flag) {
1166 	    using ceph::encode;
1167 	    OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_CHUNK);
1168 	    encode(src_offset, osd_op.indata);
1169 	    encode(src_length, osd_op.indata);
1170 	    encode(tgt_oloc, osd_op.indata);
1171 	    encode(tgt_oid, osd_op.indata);
1172 	    encode(tgt_offset, osd_op.indata);
1173 	    set_last_op_flags(flag);
1174 	  }
1175 	
1176 	  void tier_promote() {
1177 	    add_op(CEPH_OSD_OP_TIER_PROMOTE);
1178 	  }
1179 	
1180 	  void unset_manifest() {
1181 	    add_op(CEPH_OSD_OP_UNSET_MANIFEST);
1182 	  }
1183 	
1184 	  void tier_flush() {
1185 	    add_op(CEPH_OSD_OP_TIER_FLUSH);
1186 	  }
1187 	
1188 	  void set_alloc_hint(uint64_t expected_object_size,
1189 	                      uint64_t expected_write_size,
1190 			      uint32_t flags) {
1191 	    add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
1192 			   expected_write_size, flags);
1193 	
1194 	    // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
1195 	    // not worth a feature bit.  Set FAILOK per-op flag to make
1196 	    // sure older osds don't trip over an unsupported opcode.
1197 	    set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
1198 	  }
1199 	
1200 	  void dup(std::vector<OSDOp>& sops) {
1201 	    ops = sops;
1202 	    out_bl.resize(sops.size());
1203 	    out_handler.resize(sops.size());
1204 	    out_rval.resize(sops.size());
1205 	    for (uint32_t i = 0; i < sops.size(); i++) {
1206 	      out_bl[i] = &sops[i].outdata;
1207 	      out_handler[i] = NULL;
1208 	      out_rval[i] = &sops[i].rval;
1209 	    }
1210 	  }
1211 	
1212 	  /**
1213 	   * Pin/unpin an object in cache tier
1214 	   */
1215 	  void cache_pin() {
1216 	    add_op(CEPH_OSD_OP_CACHE_PIN);
1217 	  }
1218 	
1219 	  void cache_unpin() {
1220 	    add_op(CEPH_OSD_OP_CACHE_UNPIN);
1221 	  }
1222 	};
1223 	
1224 	
1225 	// ----------------
1226 	
1227 	
1228 	class Objecter : public md_config_obs_t, public Dispatcher {
1229 	public:
1230 	  // config observer bits
1231 	  const char** get_tracked_conf_keys() const override;
1232 	  void handle_conf_change(const ConfigProxy& conf,
1233 	                          const std::set <std::string> &changed) override;
1234 	
1235 	public:
1236 	  Messenger *messenger;
1237 	  MonClient *monc;
1238 	  Finisher *finisher;
1239 	  ZTracer::Endpoint trace_endpoint;
1240 	private:
1241 	  std::unique_ptr<OSDMap> osdmap;
1242 	public:
1243 	  using Dispatcher::cct;
1244 	  std::multimap<std::string,std::string> crush_location;
1245 	
1246 	  std::atomic<bool> initialized{false};
1247 	
1248 	private:
1249 	  std::atomic<uint64_t> last_tid{0};
1250 	  std::atomic<unsigned> inflight_ops{0};
1251 	  std::atomic<int> client_inc{-1};
1252 	  uint64_t max_linger_id{0};
1253 	  std::atomic<unsigned> num_in_flight{0};
1254 	  std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
1255 	  bool keep_balanced_budget = false;
1256 	  bool honor_pool_full = true;
1257 	  bool pool_full_try = false;
1258 	
1259 	  // If this is true, accumulate a set of blacklisted entities
1260 	  // to be drained by consume_blacklist_events.
1261 	  bool blacklist_events_enabled = false;
1262 	  std::set<entity_addr_t> blacklist_events;
1263 	  struct pg_mapping_t {
1264 	    epoch_t epoch = 0;
1265 	    std::vector<int> up;
1266 	    int up_primary = -1;
1267 	    std::vector<int> acting;
1268 	    int acting_primary = -1;
1269 	
1270 	    pg_mapping_t() {}
1271 	    pg_mapping_t(epoch_t epoch, std::vector<int> up, int up_primary,
1272 	                 std::vector<int> acting, int acting_primary)
1273 	               : epoch(epoch), up(up), up_primary(up_primary),
1274 	                 acting(acting), acting_primary(acting_primary) {}
1275 	  };
1276 	  std::shared_mutex pg_mapping_lock;
1277 	  // pool -> pg mapping
1278 	  std::map<int64_t, std::vector<pg_mapping_t>> pg_mappings;
1279 	
1280 	  // convenient accessors
1281 	  bool lookup_pg_mapping(const pg_t& pg, pg_mapping_t* pg_mapping) {
1282 	    std::shared_lock l{pg_mapping_lock};
1283 	    auto it = pg_mappings.find(pg.pool());
1284 	    if (it == pg_mappings.end())
1285 	      return false;
1286 	    auto& mapping_array = it->second;
1287 	    if (pg.ps() >= mapping_array.size())
1288 	      return false;
1289 	    if (mapping_array[pg.ps()].epoch != pg_mapping->epoch) // stale
1290 	      return false;
1291 	    *pg_mapping = mapping_array[pg.ps()];
1292 	    return true;
1293 	  }
1294 	  void update_pg_mapping(const pg_t& pg, pg_mapping_t&& pg_mapping) {
1295 	    std::lock_guard l{pg_mapping_lock};
1296 	    auto& mapping_array = pg_mappings[pg.pool()];
1297 	    ceph_assert(pg.ps() < mapping_array.size());
1298 	    mapping_array[pg.ps()] = std::move(pg_mapping);
1299 	  }
1300 	  void prune_pg_mapping(const mempool::osdmap::map<int64_t,pg_pool_t>& pools) {
1301 	    std::lock_guard l{pg_mapping_lock};
1302 	    for (auto& pool : pools) {
1303 	      auto& mapping_array = pg_mappings[pool.first];
1304 	      size_t pg_num = pool.second.get_pg_num();
1305 	      if (mapping_array.size() != pg_num) {
1306 	        // catch both pg_num increasing & decreasing
1307 	        mapping_array.resize(pg_num);
1308 	      }
1309 	    }
1310 	    for (auto it = pg_mappings.begin(); it != pg_mappings.end(); ) {
1311 	      if (!pools.count(it->first)) {
1312 	        // pool is gone
1313 	        pg_mappings.erase(it++);
1314 	        continue;
1315 	      }
1316 	      it++;
1317 	    }
1318 	  }
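  // Intended lookup/refresh pattern for this cache (a sketch only; the real
  // callers live in the Objecter implementation, and up/acting/up_primary/
  // acting_primary below stand in for values recomputed from the OSDMap):
  //
  //   pg_mapping_t mapping;
  //   mapping.epoch = osdmap->get_epoch();  // epoch we expect to be cached
  //   if (!lookup_pg_mapping(pg, &mapping)) {
  //     update_pg_mapping(pg, pg_mapping_t(osdmap->get_epoch(), up, up_primary,
  //                                        acting, acting_primary));
  //   }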
1319 	
1320 	public:
1321 	  void maybe_request_map();
1322 	
1323 	  void enable_blacklist_events();
1324 	private:
1325 	
1326 	  void _maybe_request_map();
1327 	
1328 	  version_t last_seen_osdmap_version = 0;
1329 	  version_t last_seen_pgmap_version = 0;
1330 	
1331 	  mutable std::shared_mutex rwlock;
1332 	  using lock_guard = std::lock_guard<decltype(rwlock)>;
1333 	  using unique_lock = std::unique_lock<decltype(rwlock)>;
1334 	  using shared_lock = boost::shared_lock<decltype(rwlock)>;
1335 	  using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
1336 	  ceph::timer<ceph::coarse_mono_clock> timer;
1337 	
1338 	  PerfCounters *logger = nullptr;
1339 	
1340 	  uint64_t tick_event = 0;
1341 	
1342 	  void start_tick();
1343 	  void tick();
1344 	  void update_crush_location();
1345 	
1346 	  class RequestStateHook;
1347 	
1348 	  RequestStateHook *m_request_state_hook = nullptr;
1349 	
1350 	public:
1351 	  /*** track pending operations ***/
1352 	  // read
1353 	
1354 	  struct OSDSession;
1355 	
1356 	  struct op_target_t {
1357 	    int flags = 0;
1358 	
1359 	    epoch_t epoch = 0;  ///< latest epoch we calculated the mapping
1360 	
1361 	    object_t base_oid;
1362 	    object_locator_t base_oloc;
1363 	    object_t target_oid;
1364 	    object_locator_t target_oloc;
1365 	
1366 	    ///< true if we are directed at base_pgid, not base_oid
1367 	    bool precalc_pgid = false;
1368 	
1369 	    ///< true if we have ever mapped to a valid pool
1370 	    bool pool_ever_existed = false;
1371 	
    ///< explicit pg target, if any
1373 	    pg_t base_pgid;
1374 	
1375 	    pg_t pgid; ///< last (raw) pg we mapped to
1376 	    spg_t actual_pgid; ///< last (actual) spg_t we mapped to
1377 	    unsigned pg_num = 0; ///< last pg_num we mapped to
1378 	    unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
    unsigned pg_num_pending = 0; ///< last pg_num_pending we mapped to
1380 	    std::vector<int> up; ///< set of up osds for last pg we mapped to
1381 	    std::vector<int> acting; ///< set of acting osds for last pg we mapped to
1382 	    int up_primary = -1; ///< last up_primary we mapped to
1383 	    int acting_primary = -1;  ///< last acting_primary we mapped to
    int size = -1; ///< the size of the pool when we were last mapped
    int min_size = -1; ///< the min size of the pool when we were last mapped
1386 	    bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
1387 	    bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering
1388 	
1389 	    bool used_replica = false;
1390 	    bool paused = false;
1391 	
1392 	    int osd = -1;      ///< the final target osd, or -1
1393 	
1394 	    epoch_t last_force_resend = 0;
1395 	
1396 	    op_target_t(object_t oid, object_locator_t oloc, int flags)
1397 	      : flags(flags),
1398 		base_oid(oid),
1399 		base_oloc(oloc)
1400 	      {}
1401 	
1402 	    explicit op_target_t(pg_t pgid)
1403 	      : base_oloc(pgid.pool(), pgid.ps()),
1404 		precalc_pgid(true),
1405 		base_pgid(pgid)
1406 	      {}
1407 	
1408 	    op_target_t() = default;
1409 	
1410 	    hobject_t get_hobj() {
1411 	      return hobject_t(target_oid,
1412 			       target_oloc.key,
1413 			       CEPH_NOSNAP,
1414 			       target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
1415 			       target_oloc.pool,
1416 			       target_oloc.nspace);
1417 	    }
1418 	
1419 	    bool contained_by(const hobject_t& begin, const hobject_t& end) {
1420 	      hobject_t h = get_hobj();
1421 	      int r = cmp(h, begin);
1422 	      return r == 0 || (r > 0 && h < end);
1423 	    }
1424 	
1425 	    void dump(ceph::Formatter *f) const;
1426 	  };
1427 	
1428 	  struct Op : public RefCountedObject {
1429 	    OSDSession *session;
1430 	    int incarnation;
1431 	
1432 	    op_target_t target;
1433 	
1434 	    ConnectionRef con;  // for rx buffer only
1435 	    uint64_t features;  // explicitly specified op features
1436 	
1437 	    std::vector<OSDOp> ops;
1438 	
1439 	    snapid_t snapid;
1440 	    SnapContext snapc;
1441 	    ceph::real_time mtime;
1442 	
1443 	    ceph::buffer::list *outbl;
1444 	    std::vector<ceph::buffer::list*> out_bl;
1445 	    std::vector<Context*> out_handler;
1446 	    std::vector<int*> out_rval;
1447 	
1448 	    int priority;
1449 	    Context *onfinish;
1450 	    uint64_t ontimeout;
1451 	
1452 	    ceph_tid_t tid;
1453 	    int attempts;
1454 	
1455 	    version_t *objver;
1456 	    epoch_t *reply_epoch;
1457 	
1458 	    ceph::coarse_mono_time stamp;
1459 	
1460 	    epoch_t map_dne_bound;
1461 	
1462 	    int budget;
1463 	
1464 	    /// true if we should resend this message on failure
1465 	    bool should_resend;
1466 	
    /// true if the throttle budget is acquired/released for a series of
    /// ops rather than per op: when this flag is set, the budget is
    /// acquired before sending the very first op of the series and
    /// released upon receiving the last op's reply.
    bool ctx_budgeted;
1472 	
1473 	    int *data_offset;
1474 	
1475 	    osd_reqid_t reqid; // explicitly setting reqid
1476 	    ZTracer::Trace trace;
1477 	
1478 	    Op(const object_t& o, const object_locator_t& ol, std::vector<OSDOp>& op,
1479 	       int f, Context *fin, version_t *ov, int *offset = NULL,
1480 	       ZTracer::Trace *parent_trace = nullptr) :
1481 	      session(NULL), incarnation(0),
1482 	      target(o, ol, f),
1483 	      con(NULL),
1484 	      features(CEPH_FEATURES_SUPPORTED_DEFAULT),
1485 	      snapid(CEPH_NOSNAP),
1486 	      outbl(NULL),
1487 	      priority(0),
1488 	      onfinish(fin),
1489 	      ontimeout(0),
1490 	      tid(0),
1491 	      attempts(0),
1492 	      objver(ov),
1493 	      reply_epoch(NULL),
1494 	      map_dne_bound(0),
1495 	      budget(-1),
1496 	      should_resend(true),
1497 	      ctx_budgeted(false),
1498 	      data_offset(offset) {
1499 	      ops.swap(op);
1500 	
      /* initialize out_* to match the ops vector */
1502 	      out_bl.resize(ops.size());
1503 	      out_rval.resize(ops.size());
1504 	      out_handler.resize(ops.size());
1505 	      for (unsigned i = 0; i < ops.size(); i++) {
1506 		out_bl[i] = NULL;
1507 		out_handler[i] = NULL;
1508 		out_rval[i] = NULL;
1509 	      }
1510 	
1511 	      if (target.base_oloc.key == o)
1512 		target.base_oloc.key.clear();
1513 	
1514 	      if (parent_trace && parent_trace->valid()) {
1515 	        trace.init("op", nullptr, parent_trace);
1516 	        trace.event("start");
1517 	      }
1518 	    }
1519 	
1520 	    bool operator<(const Op& other) const {
1521 	      return tid < other.tid;
1522 	    }
1523 	
1524 	    bool respects_full() const {
1525 	      return
1526 		(target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
1527 		!(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
1528 	    }
1529 	
1530 	  private:
1531 	    ~Op() override {
1532 	      while (!out_handler.empty()) {
1533 		delete out_handler.back();
1534 		out_handler.pop_back();
1535 	      }
1536 	      trace.event("finish");
1537 	    }
1538 	  };
1539 	
1540 	  struct C_Op_Map_Latest : public Context {
1541 	    Objecter *objecter;
1542 	    ceph_tid_t tid;
1543 	    version_t latest;
1544 	    C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
1545 							 latest(0) {}
1546 	    void finish(int r) override;
1547 	  };
1548 	
1549 	  struct C_Command_Map_Latest : public Context {
1550 	    Objecter *objecter;
1551 	    uint64_t tid;
1552 	    version_t latest;
1553 	    C_Command_Map_Latest(Objecter *o, ceph_tid_t t) :  objecter(o), tid(t),
1554 							       latest(0) {}
1555 	    void finish(int r) override;
1556 	  };
1557 	
1558 	  struct C_Stat : public Context {
1559 	    ceph::buffer::list bl;
1560 	    uint64_t *psize;
1561 	    ceph::real_time *pmtime;
1562 	    Context *fin;
1563 	    C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
1564 	      psize(ps), pmtime(pm), fin(c) {}
1565 	    void finish(int r) override {
1566 	      using ceph::decode;
1567 	      if (r >= 0) {
1568 		auto p = bl.cbegin();
1569 		uint64_t s;
1570 		ceph::real_time m;
1571 		decode(s, p);
1572 		decode(m, p);
1573 		if (psize)
1574 		  *psize = s;
1575 		if (pmtime)
1576 		  *pmtime = m;
1577 	      }
1578 	      fin->complete(r);
1579 	    }
1580 	  };
1581 	
1582 	  struct C_GetAttrs : public Context {
1583 	    ceph::buffer::list bl;
1584 	    std::map<std::string,ceph::buffer::list>& attrset;
1585 	    Context *fin;
1586 	    C_GetAttrs(std::map<std::string, ceph::buffer::list>& set, Context *c) : attrset(set),
1587 								   fin(c) {}
1588 	    void finish(int r) override {
1589 	      using ceph::decode;
1590 	      if (r >= 0) {
1591 		auto p = bl.cbegin();
1592 		decode(attrset, p);
1593 	      }
1594 	      fin->complete(r);
1595 	    }
1596 	  };
1597 	
1598 	
1599 	  // Pools and statistics
1600 	  struct NListContext {
1601 	    collection_list_handle_t pos;
1602 	
1603 	    // these are for !sortbitwise compat only
1604 	    int current_pg = 0;
1605 	    int starting_pg_num = 0;
1606 	    bool sort_bitwise = false;
1607 	
1608 	    bool at_end_of_pool = false; ///< publicly visible end flag
1609 	
1610 	    int64_t pool_id = -1;
1611 	    int pool_snap_seq = 0;
1612 	    uint64_t max_entries = 0;
1613 	    std::string nspace;
1614 	
1615 	    ceph::buffer::list bl;   // raw data read to here
1616 	    std::list<librados::ListObjectImpl> list;
1617 	
1618 	    ceph::buffer::list filter;
1619 	
    // The budget associated with this context.  Once it is set (>= 0),
    // the budget is not acquired/released per op; instead it is acquired
    // before sending the first op and released upon receiving the last
    // op's reply.
1624 	    int ctx_budget = -1;
1625 	
1626 	    bool at_end() const {
1627 	      return at_end_of_pool;
1628 	    }
1629 	
1630 	    uint32_t get_pg_hash_position() const {
1631 	      return pos.get_hash();
1632 	    }
1633 	  };
1634 	
1635 	  struct C_NList : public Context {
1636 	    NListContext *list_context;
1637 	    Context *final_finish;
1638 	    Objecter *objecter;
1639 	    epoch_t epoch;
1640 	    C_NList(NListContext *lc, Context * finish, Objecter *ob) :
1641 	      list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
1642 	    void finish(int r) override {
1643 	      if (r >= 0) {
1644 		objecter->_nlist_reply(list_context, r, final_finish, epoch);
1645 	      } else {
1646 		final_finish->complete(r);
1647 	      }
1648 	    }
1649 	  };
1650 	
1651 	  struct PoolStatOp {
1652 	    ceph_tid_t tid;
1653 	    std::list<std::string> pools;
1654 	
1655 	    std::map<std::string,pool_stat_t> *pool_stats;
1656 	    bool *per_pool;
1657 	    Context *onfinish;
1658 	    uint64_t ontimeout;
1659 	
1660 	    ceph::coarse_mono_time last_submit;
1661 	  };
1662 	
1663 	  struct StatfsOp {
1664 	    ceph_tid_t tid;
1665 	    struct ceph_statfs *stats;
1666 	    boost::optional<int64_t> data_pool;
1667 	    Context *onfinish;
1668 	    uint64_t ontimeout;
1669 	
1670 	    ceph::coarse_mono_time last_submit;
1671 	  };
1672 	
1673 	  struct PoolOp {
1674 	    ceph_tid_t tid;
1675 	    int64_t pool;
1676 	    std::string name;
1677 	    Context *onfinish;
1678 	    uint64_t ontimeout;
1679 	    int pool_op;
1680 	    int16_t crush_rule;
1681 	    snapid_t snapid;
1682 	    ceph::buffer::list *blp;
1683 	
1684 	    ceph::coarse_mono_time last_submit;
1685 	    PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
1686 		       crush_rule(0), snapid(0), blp(NULL) {}
1687 	  };
1688 	
1689 	  // -- osd commands --
1690 	  struct CommandOp : public RefCountedObject {
1691 	    OSDSession *session = nullptr;
1692 	    ceph_tid_t tid = 0;
1693 	    std::vector<std::string> cmd;
1694 	    ceph::buffer::list inbl;
1695 	    ceph::buffer::list *poutbl = nullptr;
1696 	    std::string *prs = nullptr;
1697 	
1698 	    // target_osd == -1 means target_pg is valid
1699 	    const int target_osd = -1;
1700 	    const pg_t target_pg;
1701 	
1702 	    op_target_t target;
1703 	
1704 	    epoch_t map_dne_bound = 0;
1705 	    int map_check_error = 0; // error to return if the map check fails
1706 	    const char *map_check_error_str = nullptr;
1707 	
1708 	    Context *onfinish = nullptr;
1709 	    uint64_t ontimeout = 0;
1710 	    ceph::coarse_mono_time last_submit;
1711 	
1712 	    CommandOp(
1713 	      int target_osd,
1714 	      const std::vector<std::string> &cmd,
1715 	      ceph::buffer::list inbl,
1716 	      ceph::buffer::list *poutbl,
1717 	      std::string *prs,
1718 	      Context *onfinish)
1719 	      : cmd(cmd),
1720 		inbl(inbl),
1721 		poutbl(poutbl),
1722 		prs(prs),
1723 		target_osd(target_osd),
1724 		onfinish(onfinish) {}
1725 	
1726 	    CommandOp(
1727 	      pg_t pgid,
1728 	      const std::vector<std::string> &cmd,
1729 	      ceph::buffer::list inbl,
1730 	      ceph::buffer::list *poutbl,
1731 	      std::string *prs,
1732 	      Context *onfinish)
1733 	      : cmd(cmd),
1734 		inbl(inbl),
1735 		poutbl(poutbl),
1736 		prs(prs),
1737 		target_pg(pgid),
1738 		target(pgid),
1739 		onfinish(onfinish) {}
1740 	
1741 	  };
1742 	
1743 	  void submit_command(CommandOp *c, ceph_tid_t *ptid);
1744 	  int _calc_command_target(CommandOp *c, shunique_lock &sul);
1745 	  void _assign_command_session(CommandOp *c, shunique_lock &sul);
1746 	  void _send_command(CommandOp *c);
1747 	  int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
1748 	  void _finish_command(CommandOp *c, int r, std::string rs);
1749 	  void handle_command_reply(MCommandReply *m);
1750 	
1751 	
1752 	  // -- lingering ops --
1753 	
1754 	  struct WatchContext {
1755 	    // this simply mirrors librados WatchCtx2
1756 	    virtual void handle_notify(uint64_t notify_id,
1757 				       uint64_t cookie,
1758 				       uint64_t notifier_id,
1759 				       ceph::buffer::list& bl) = 0;
1760 	    virtual void handle_error(uint64_t cookie, int err) = 0;
1761 	    virtual ~WatchContext() {}
1762 	  };
1763 	
1764 	  struct LingerOp : public RefCountedObject {
1765 	    uint64_t linger_id;
1766 	
1767 	    op_target_t target;
1768 	
1769 	    snapid_t snap;
1770 	    SnapContext snapc;
1771 	    ceph::real_time mtime;
1772 	
1773 	    std::vector<OSDOp> ops;
1774 	    ceph::buffer::list inbl;
1775 	    ceph::buffer::list *poutbl;
1776 	    version_t *pobjver;
1777 	
1778 	    bool is_watch;
1779 	    ceph::coarse_mono_time watch_valid_thru; ///< send time for last acked ping
1780 	    int last_error;  ///< error from last failed ping|reconnect, if any
1781 	    std::shared_mutex watch_lock;
1782 	    using lock_guard = std::unique_lock<decltype(watch_lock)>;
1783 	    using unique_lock = std::unique_lock<decltype(watch_lock)>;
1784 	    using shared_lock = boost::shared_lock<decltype(watch_lock)>;
1785 	    using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;
1786 	
1787 	    // queue of pending async operations, with the timestamp of
1788 	    // when they were queued.
1789 	    std::list<ceph::coarse_mono_time> watch_pending_async;
1790 	
1791 	    uint32_t register_gen;
1792 	    bool registered;
1793 	    bool canceled;
1794 	    Context *on_reg_commit;
1795 	
1796 	    // we trigger these from an async finisher
1797 	    Context *on_notify_finish;
1798 	    ceph::buffer::list *notify_result_bl;
1799 	    uint64_t notify_id;
1800 	
1801 	    WatchContext *watch_context;
1802 	
1803 	    OSDSession *session;
1804 	
1805 	    Objecter *objecter;
1806 	    int ctx_budget;
1807 	    ceph_tid_t register_tid;
1808 	    ceph_tid_t ping_tid;
1809 	    epoch_t map_dne_bound;
1810 	
1811 	    void _queued_async() {
1812 	      // watch_lock must be locked unique
1813 	      watch_pending_async.push_back(ceph::coarse_mono_clock::now());
1814 	    }
1815 	    void finished_async() {
1816 	      unique_lock l(watch_lock);
1817 	      ceph_assert(!watch_pending_async.empty());
1818 	      watch_pending_async.pop_front();
1819 	    }
1820 	
1821 	    explicit LingerOp(Objecter *o) : linger_id(0),
1822 				    target(object_t(), object_locator_t(), 0),
1823 				    snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
1824 				    is_watch(false), last_error(0),
1825 				    register_gen(0),
1826 				    registered(false),
1827 				    canceled(false),
1828 				    on_reg_commit(NULL),
1829 				    on_notify_finish(NULL),
1830 				    notify_result_bl(NULL),
1831 				    notify_id(0),
1832 				    watch_context(NULL),
1833 				    session(NULL),
1834 				    objecter(o),
1835 				    ctx_budget(-1),
1836 				    register_tid(0),
1837 				    ping_tid(0),
1838 				    map_dne_bound(0) {}
1839 	
1840 	    const LingerOp &operator=(const LingerOp& r) = delete;
1841 	    LingerOp(const LingerOp& o) = delete;
1842 	
1843 	    uint64_t get_cookie() {
1844 	      return reinterpret_cast<uint64_t>(this);
1845 	    }
1846 	
1847 	  private:
1848 	    ~LingerOp() override {
1849 	      delete watch_context;
1850 	    }
1851 	  };
1852 	
1853 	  struct C_Linger_Commit : public Context {
1854 	    Objecter *objecter;
1855 	    LingerOp *info;
1856 	    ceph::buffer::list outbl;  // used for notify only
1857 	    C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
1858 	      info->get();
1859 	    }
1860 	    ~C_Linger_Commit() override {
1861 	      info->put();
1862 	    }
1863 	    void finish(int r) override {
1864 	      objecter->_linger_commit(info, r, outbl);
1865 	    }
1866 	  };
1867 	
1868 	  struct C_Linger_Reconnect : public Context {
1869 	    Objecter *objecter;
1870 	    LingerOp *info;
1871 	    C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
1872 	      info->get();
1873 	    }
1874 	    ~C_Linger_Reconnect() override {
1875 	      info->put();
1876 	    }
1877 	    void finish(int r) override {
1878 	      objecter->_linger_reconnect(info, r);
1879 	    }
1880 	  };
1881 	
1882 	  struct C_Linger_Ping : public Context {
1883 	    Objecter *objecter;
1884 	    LingerOp *info;
1885 	    ceph::coarse_mono_time sent;
1886 	    uint32_t register_gen;
1887 	    C_Linger_Ping(Objecter *o, LingerOp *l)
1888 	      : objecter(o), info(l), register_gen(info->register_gen) {
1889 	      info->get();
1890 	    }
1891 	    ~C_Linger_Ping() override {
1892 	      info->put();
1893 	    }
1894 	    void finish(int r) override {
1895 	      objecter->_linger_ping(info, r, sent, register_gen);
1896 	    }
1897 	  };
1898 	
1899 	  struct C_Linger_Map_Latest : public Context {
1900 	    Objecter *objecter;
1901 	    uint64_t linger_id;
1902 	    version_t latest;
1903 	    C_Linger_Map_Latest(Objecter *o, uint64_t id) :
1904 	      objecter(o), linger_id(id), latest(0) {}
1905 	    void finish(int r) override;
1906 	  };
1907 	
1908 	  // -- osd sessions --
1909 	  struct OSDBackoff {
1910 	    spg_t pgid;
1911 	    uint64_t id;
1912 	    hobject_t begin, end;
1913 	  };
1914 	
1915 	  struct OSDSession : public RefCountedObject {
1916 	    std::shared_mutex lock;
1917 	    using lock_guard = std::lock_guard<decltype(lock)>;
1918 	    using unique_lock = std::unique_lock<decltype(lock)>;
1919 	    using shared_lock = boost::shared_lock<decltype(lock)>;
1920 	    using shunique_lock = ceph::shunique_lock<decltype(lock)>;
1921 	
1922 	    // pending ops
1923 	    std::map<ceph_tid_t,Op*> ops;
1924 	    std::map<uint64_t, LingerOp*> linger_ops;
1925 	    std::map<ceph_tid_t,CommandOp*> command_ops;
1926 	
1927 	    // backoffs
1928 	    std::map<spg_t,std::map<hobject_t,OSDBackoff>> backoffs;
1929 	    std::map<uint64_t,OSDBackoff*> backoffs_by_id;
1930 	
1931 	    int osd;
1932 	    int incarnation;
1933 	    ConnectionRef con;
1934 	    int num_locks;
1935 	    std::unique_ptr<std::mutex[]> completion_locks;
1936 	    using unique_completion_lock = std::unique_lock<
1937 	      decltype(completion_locks)::element_type>;
1938 	
1939 	
1940 	    OSDSession(CephContext *cct, int o) :
1941 	      osd(o), incarnation(0), con(NULL),
1942 	      num_locks(cct->_conf->objecter_completion_locks_per_session),
1943 	      completion_locks(new std::mutex[num_locks]) {}
1944 	
1945 	    ~OSDSession() override;
1946 	
1947 	    bool is_homeless() { return (osd == -1); }
1948 	
1949 	    unique_completion_lock get_lock(object_t& oid);
1950 	  };
1951 	  std::map<int,OSDSession*> osd_sessions;
1952 	
1953 	  bool osdmap_full_flag() const;
1954 	  bool osdmap_pool_full(const int64_t pool_id) const;
1955 	
1956 	 private:
1957 	
1958 	  /**
1959 	   * Test pg_pool_t::FLAG_FULL on a pool
1960 	   *
1961 	   * @return true if the pool exists and has the flag set, or
1962 	   *         the global full flag is set, else false
1963 	   */
1964 	  bool _osdmap_pool_full(const int64_t pool_id) const;
1965 	  bool _osdmap_pool_full(const pg_pool_t &p) const;
1966 	  void update_pool_full_map(std::map<int64_t, bool>& pool_full_map);
1967 	
1968 	  std::map<uint64_t, LingerOp*> linger_ops;
1969 	  // we use this just to confirm a cookie is valid before dereferencing the ptr
1970 	  std::set<LingerOp*> linger_ops_set;
1971 	
1972 	  std::map<ceph_tid_t,PoolStatOp*> poolstat_ops;
1973 	  std::map<ceph_tid_t,StatfsOp*> statfs_ops;
1974 	  std::map<ceph_tid_t,PoolOp*> pool_ops;
1975 	  std::atomic<unsigned> num_homeless_ops{0};
1976 	
1977 	  OSDSession *homeless_session;
1978 	
1979 	  // ops waiting for an osdmap with a new pool or confirmation that
1980 	  // the pool does not exist (may be expanded to other uses later)
1981 	  std::map<uint64_t, LingerOp*> check_latest_map_lingers;
1982 	  std::map<ceph_tid_t, Op*> check_latest_map_ops;
1983 	  std::map<ceph_tid_t, CommandOp*> check_latest_map_commands;
1984 	
1985 	  std::map<epoch_t,std::list< std::pair<Context*, int> > > waiting_for_map;
1986 	
1987 	  ceph::timespan mon_timeout;
1988 	  ceph::timespan osd_timeout;
1989 	
1990 	  MOSDOp *_prepare_osd_op(Op *op);
1991 	  void _send_op(Op *op);
1992 	  void _send_op_account(Op *op);
1993 	  void _cancel_linger_op(Op *op);
1994 	  void _finish_op(Op *op, int r);
1995 	  static bool is_pg_changed(
1996 	    int oldprimary,
1997 	    const std::vector<int>& oldacting,
1998 	    int newprimary,
1999 	    const std::vector<int>& newacting,
2000 	    bool any_change=false);
2001 	  enum recalc_op_target_result {
2002 	    RECALC_OP_TARGET_NO_ACTION = 0,
2003 	    RECALC_OP_TARGET_NEED_RESEND,
2004 	    RECALC_OP_TARGET_POOL_DNE,
2005 	    RECALC_OP_TARGET_OSD_DNE,
2006 	    RECALC_OP_TARGET_OSD_DOWN,
2007 	  };
2008 	  bool _osdmap_full_flag() const;
2009 	  bool _osdmap_has_pool_full() const;
2010 	  void _prune_snapc(
2011 	    const mempool::osdmap::map<int64_t, snap_interval_set_t>& new_removed_snaps,
2012 	    Op *op);
2013 	
2014 	  bool target_should_be_paused(op_target_t *op);
2015 	  int _calc_target(op_target_t *t, Connection *con,
2016 			   bool any_change = false);
2017 	  int _map_session(op_target_t *op, OSDSession **s,
2018 			   shunique_lock& lc);
2019 	
2020 	  void _session_op_assign(OSDSession *s, Op *op);
2021 	  void _session_op_remove(OSDSession *s, Op *op);
2022 	  void _session_linger_op_assign(OSDSession *to, LingerOp *op);
2023 	  void _session_linger_op_remove(OSDSession *from, LingerOp *op);
2024 	  void _session_command_op_assign(OSDSession *to, CommandOp *op);
2025 	  void _session_command_op_remove(OSDSession *from, CommandOp *op);
2026 	
2027 	  int _assign_op_target_session(Op *op, shunique_lock& lc,
2028 					bool src_session_locked,
2029 					bool dst_session_locked);
2030 	  int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc);
2031 	
2032 	  void _linger_submit(LingerOp *info, shunique_lock& sul);
2033 	  void _send_linger(LingerOp *info, shunique_lock& sul);
2034 	  void _linger_commit(LingerOp *info, int r, ceph::buffer::list& outbl);
2035 	  void _linger_reconnect(LingerOp *info, int r);
2036 	  void _send_linger_ping(LingerOp *info);
2037 	  void _linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
2038 			    uint32_t register_gen);
2039 	  int _normalize_watch_error(int r);
2040 	
2041 	  friend class C_DoWatchError;
2042 	public:
2043 	  void linger_callback_flush(Context *ctx) {
2044 	    finisher->queue(ctx);
2045 	  }
2046 	
2047 	private:
2048 	  void _check_op_pool_dne(Op *op, unique_lock *sl);
2049 	  void _send_op_map_check(Op *op);
2050 	  void _op_cancel_map_check(Op *op);
2051 	  void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
2052 	  void _send_linger_map_check(LingerOp *op);
2053 	  void _linger_cancel_map_check(LingerOp *op);
2054 	  void _check_command_map_dne(CommandOp *op);
2055 	  void _send_command_map_check(CommandOp *op);
2056 	  void _command_cancel_map_check(CommandOp *op);
2057 	
2058 	  void _kick_requests(OSDSession *session, std::map<uint64_t, LingerOp *>& lresend);
2059 	  void _linger_ops_resend(std::map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
2060 	
2061 	  int _get_session(int osd, OSDSession **session, shunique_lock& sul);
2062 	  void put_session(OSDSession *s);
2063 	  void get_session(OSDSession *s);
2064 	  void _reopen_session(OSDSession *session);
2065 	  void close_session(OSDSession *session);
2066 	
2067 	  void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
2068 			   epoch_t reply_epoch);
2069 	
2070 	  void resend_mon_ops();
2071 	
2072 	  /**
2073 	   * handle a budget for in-flight ops
2074 	   * budget is taken whenever an op goes into the ops map
2075 	   * and returned whenever an op is removed from the map.
2076 	   * If throttle_op needs to throttle it will unlock client_lock.
2077 	   */
2078 	  int calc_op_budget(const std::vector<OSDOp>& ops);
2079 	  void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
2080 	  int _take_op_budget(Op *op, shunique_lock& sul) {
2081 	    ceph_assert(sul && sul.mutex() == &rwlock);
2082 	    int op_budget = calc_op_budget(op->ops);
2083 	    if (keep_balanced_budget) {
2084 	      _throttle_op(op, sul, op_budget);
2085 	    } else { // update take_linger_budget to match this!
2086 	      op_throttle_bytes.take(op_budget);
2087 	      op_throttle_ops.take(1);
2088 	    }
2089 	    op->budget = op_budget;
2090 	    return op_budget;
2091 	  }
2092 	  int take_linger_budget(LingerOp *info);
2093 	  friend class WatchContext; // to invoke put_op_budget_bytes
2094 	  void put_op_budget_bytes(int op_budget) {
2095 	    ceph_assert(op_budget >= 0);
2096 	    op_throttle_bytes.put(op_budget);
2097 	    op_throttle_ops.put(1);
2098 	  }
2099 	  void put_nlist_context_budget(NListContext *list_context);
2100 	  Throttle op_throttle_bytes, op_throttle_ops;
2101 	
2102 	 public:
2103 	  Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
2104 		   Finisher *fin,
2105 		   double mon_timeout,
2106 		   double osd_timeout);
2107 	  ~Objecter() override;
2108 	
2109 	  void init();
2110 	  void start(const OSDMap *o = nullptr);
2111 	  void shutdown();
2112 	
2113 	  // This template replaces osdmap_(get)|(put)_read. Simply wrap
2114 	  // whatever functionality you want to use the OSDMap in a lambda like:
2115 	  //
2116 	  // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2117 	  //
2118 	  // or
2119 	  //
2120 	  // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2121 	  //
2122 	  // Do not call into something that will try to lock the OSDMap from
2123 	  // here or you will have great woe and misery.
2124 	
2125 	  template<typename Callback, typename...Args>
2126 	  decltype(auto) with_osdmap(Callback&& cb, Args&&... args) {
2127 	    shared_lock l(rwlock);
2128 	    return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
2129 	  }
2130 	
2131 	
2132 	  /**
2133 	   * Tell the objecter to throttle outgoing ops according to its
2134 	   * budget (in _conf). If you do this, ops can block, in
2135 	   * which case it will unlock client_lock and sleep until
2136 	   * incoming messages reduce the used budget low enough for
2137 	   * the ops to continue going; then it will lock client_lock again.
2138 	   */
2139 	  void set_balanced_budget() { keep_balanced_budget = true; }
2140 	  void unset_balanced_budget() { keep_balanced_budget = false; }
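  // A hedged usage sketch: a client that wants op submission to block when
  // the configured budget is exhausted could do something like the following
  // ('o' is an initialized Objecter; 'op' and the tid are placeholders):
  //
  //   o->set_balanced_budget();
  //   ceph_tid_t tid;
  //   o->op_submit(op, &tid);   // may sleep until in-flight budget frees up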
2141 	
2142 	  void set_honor_pool_full() { honor_pool_full = true; }
2143 	  void unset_honor_pool_full() { honor_pool_full = false; }
2144 	
2145 	  void set_pool_full_try() { pool_full_try = true; }
2146 	  void unset_pool_full_try() { pool_full_try = false; }
2147 	
2148 	  void _scan_requests(
2149 	    OSDSession *s,
2150 	    bool skipped_map,
2151 	    bool cluster_full,
2152 	    std::map<int64_t, bool> *pool_full_map,
2153 	    std::map<ceph_tid_t, Op*>& need_resend,
2154 	    std::list<LingerOp*>& need_resend_linger,
2155 	    std::map<ceph_tid_t, CommandOp*>& need_resend_command,
2156 	    shunique_lock& sul);
2157 	
2158 	  int64_t get_object_hash_position(int64_t pool, const std::string& key,
2159 					   const std::string& ns);
2160 	  int64_t get_object_pg_hash_position(int64_t pool, const std::string& key,
2161 					      const std::string& ns);
2162 	
2163 	  // messages
2164 	 public:
2165 	  bool ms_dispatch(Message *m) override;
2166 	  bool ms_can_fast_dispatch_any() const override {
2167 	    return true;
2168 	  }
2169 	  bool ms_can_fast_dispatch(const Message *m) const override {
2170 	    switch (m->get_type()) {
2171 	    case CEPH_MSG_OSD_OPREPLY:
2172 	    case CEPH_MSG_WATCH_NOTIFY:
2173 	      return true;
2174 	    default:
2175 	      return false;
2176 	    }
2177 	  }
2178 	  void ms_fast_dispatch(Message *m) override {
2179 	    if (!ms_dispatch(m)) {
2180 	      m->put();
2181 	    }
2182 	  }
2183 	
2184 	  void handle_osd_op_reply(class MOSDOpReply *m);
2185 	  void handle_osd_backoff(class MOSDBackoff *m);
2186 	  void handle_watch_notify(class MWatchNotify *m);
2187 	  void handle_osd_map(class MOSDMap *m);
2188 	  void wait_for_osd_map();
2189 	
2190 	  /**
2191 	   * Get the list of entities blacklisted since this was last called,
2192 	   * and reset that list.
2193 	   *
2194 	   * Uses a std::set because the typical use case is to compare some
2195 	   * other list of clients to see which overlap with the blacklisted
2196 	   * addrs.
2197 	   *
2198 	   */
2199 	  void consume_blacklist_events(std::set<entity_addr_t> *events);
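  // Illustrative only (the 'watchers' container and 'evict' helper below are
  // hypothetical, not part of this API):
  //
  //   std::set<entity_addr_t> blacklisted;
  //   o->consume_blacklist_events(&blacklisted);
  //   for (const auto& addr : watchers)
  //     if (blacklisted.count(addr))
  //       evict(addr);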
2200 	
2201 	  int pool_snap_by_name(int64_t poolid,
2202 				const char *snap_name,
2203 				snapid_t *snap) const;
2204 	  int pool_snap_get_info(int64_t poolid, snapid_t snap,
2205 				 pool_snap_info_t *info) const;
2206 	  int pool_snap_list(int64_t poolid, std::vector<uint64_t> *snaps);
2207 	private:
2208 	
2209 	  void emit_blacklist_events(const OSDMap::Incremental &inc);
2210 	  void emit_blacklist_events(const OSDMap &old_osd_map,
2211 	                             const OSDMap &new_osd_map);
2212 	
2213 	  // low-level
2214 	  void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid);
2215 	  void _op_submit_with_budget(Op *op, shunique_lock& lc,
2216 				      ceph_tid_t *ptid,
2217 				      int *ctx_budget = NULL);
2218 	  // public interface
2219 	public:
2220 	  void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
2221 	  bool is_active() {
2222 	    shared_lock l(rwlock);
2223 	    return !((!inflight_ops) && linger_ops.empty() &&
2224 		     poolstat_ops.empty() && statfs_ops.empty());
2225 	  }
2226 	
2227 	  /**
2228 	   * Output in-flight requests
2229 	   */
2230 	  void _dump_active(OSDSession *s);
2231 	  void _dump_active();
2232 	  void dump_active();
2233 	  void dump_requests(ceph::Formatter *fmt);
2234 	  void _dump_ops(const OSDSession *s, ceph::Formatter *fmt);
2235 	  void dump_ops(ceph::Formatter *fmt);
2236 	  void _dump_linger_ops(const OSDSession *s, ceph::Formatter *fmt);
2237 	  void dump_linger_ops(ceph::Formatter *fmt);
2238 	  void _dump_command_ops(const OSDSession *s, ceph::Formatter *fmt);
2239 	  void dump_command_ops(ceph::Formatter *fmt);
2240 	  void dump_pool_ops(ceph::Formatter *fmt) const;
2241 	  void dump_pool_stat_ops(ceph::Formatter *fmt) const;
2242 	  void dump_statfs_ops(ceph::Formatter *fmt) const;
2243 	
2244 	  int get_client_incarnation() const { return client_inc; }
2245 	  void set_client_incarnation(int inc) { client_inc = inc; }
2246 	
2247 	  bool have_map(epoch_t epoch);
2248 	  /// wait for epoch; true if we already have it
2249 	  bool wait_for_map(epoch_t epoch, Context *c, int err=0);
2250 	  void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
2251 	  void wait_for_latest_osdmap(Context *fin);
2252 	  void get_latest_version(epoch_t oldest, epoch_t newest, Context *fin);
2253 	
2254 	  /** Get the current set of global op flags */
2255 	  int get_global_op_flags() const { return global_op_flags; }
2256 	  /** Add a flag to the global op flags; not really an atomic operation */
2257 	  void add_global_op_flags(int flag) {
2258 	    global_op_flags.fetch_or(flag);
2259 	  }
2260 	  /** Clear the passed flags from the global op flag set */
2261 	  void clear_global_op_flag(int flags) {
2262 	    global_op_flags.fetch_and(~flags);
2263 	  }
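  // For example, a client that wants every subsequent op to prefer balanced
  // reads might set the corresponding flag once and clear it later
  // (sketch only):
  //
  //   o->add_global_op_flags(CEPH_OSD_FLAG_BALANCE_READS);
  //   ...
  //   o->clear_global_op_flag(CEPH_OSD_FLAG_BALANCE_READS);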
2264 	
2265 	  /// cancel an in-progress request with the given return code
2266 	private:
2267 	  int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
2268 	  int _op_cancel(ceph_tid_t tid, int r);
2269 	public:
2270 	  int op_cancel(ceph_tid_t tid, int r);
2271 	  int op_cancel(const std::vector<ceph_tid_t>& tidls, int r);
2272 	
2273 	  /**
2274 	   * Any write op which is in progress at the start of this call shall no
2275 	   * longer be in progress when this call ends.  Operations started after the
2276 	   * start of this call may still be in progress when this call ends.
2277 	   *
2278 	   * @return the latest possible epoch in which a cancelled op could have
2279 	   *         existed, or -1 if nothing was cancelled.
2280 	   */
2281 	  epoch_t op_cancel_writes(int r, int64_t pool=-1);
2282 	
2283 	  // commands
2284 	  void osd_command(int osd, const std::vector<std::string>& cmd,
2285 			  const ceph::buffer::list& inbl, ceph_tid_t *ptid,
2286 			  ceph::buffer::list *poutbl, std::string *prs, Context *onfinish) {
2287 	    ceph_assert(osd >= 0);
2288 	    CommandOp *c = new CommandOp(
2289 	      osd,
2290 	      cmd,
2291 	      inbl,
2292 	      poutbl,
2293 	      prs,
2294 	      onfinish);
2295 	    submit_command(c, ptid);
2296 	  }
2297 	  void pg_command(pg_t pgid, const std::vector<std::string>& cmd,
2298 			 const ceph::buffer::list& inbl, ceph_tid_t *ptid,
2299 			 ceph::buffer::list *poutbl, std::string *prs, Context *onfinish) {
2300 	    CommandOp *c = new CommandOp(
2301 	      pgid,
2302 	      cmd,
2303 	      inbl,
2304 	      poutbl,
2305 	      prs,
2306 	      onfinish);
2307 	    submit_command(c, ptid);
2308 	  }
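  // Sketch of sending a command to a specific OSD (the JSON command text and
  // the completion callback are placeholders):
  //
  //   std::vector<std::string> cmd = {"{...json command...}"};
  //   ceph::buffer::list inbl, outbl;
  //   std::string outs;
  //   ceph_tid_t tid;
  //   o->osd_command(3, cmd, inbl, &tid, &outbl, &outs, onfinish);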
2309 	
2310 	  // mid-level helpers
2311 	  Op *prepare_mutate_op(
2312 	    const object_t& oid, const object_locator_t& oloc,
2313 	    ObjectOperation& op, const SnapContext& snapc,
2314 	    ceph::real_time mtime, int flags,
2315 	    Context *oncommit, version_t *objver = NULL,
2316 	    osd_reqid_t reqid = osd_reqid_t(),
2317 	    ZTracer::Trace *parent_trace = nullptr) {
2318 	    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
2319 			   CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
2320 	    o->priority = op.priority;
2321 	    o->mtime = mtime;
2322 	    o->snapc = snapc;
2323 	    o->out_rval.swap(op.out_rval);
2324 	    o->out_bl.swap(op.out_bl);
2325 	    o->out_handler.swap(op.out_handler);
2326 	    o->reqid = reqid;
2327 	    return o;
2328 	  }
2329 	  ceph_tid_t mutate(
2330 	    const object_t& oid, const object_locator_t& oloc,
2331 	    ObjectOperation& op, const SnapContext& snapc,
2332 	    ceph::real_time mtime, int flags,
2333 	    Context *oncommit, version_t *objver = NULL,
2334 	    osd_reqid_t reqid = osd_reqid_t()) {
2335 	    Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
2336 				      oncommit, objver, reqid);
2337 	    ceph_tid_t tid;
2338 	    op_submit(o, &tid);
2339 	    return tid;
2340 	  }
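  // Write-path sketch using the mid-level helpers (oid, oloc, snapc and the
  // commit callback are assumed to exist):
  //
  //   ObjectOperation wr;
  //   ceph::buffer::list data;
  //   data.append("payload");
  //   wr.add_data(CEPH_OSD_OP_WRITE, 0, data.length(), data);
  //   ceph_tid_t tid = o->mutate(oid, oloc, wr, snapc,
  //                              ceph::real_clock::now(), 0, oncommit);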
2341 	  Op *prepare_read_op(
2342 	    const object_t& oid, const object_locator_t& oloc,
2343 	    ObjectOperation& op,
2344 	    snapid_t snapid, ceph::buffer::list *pbl, int flags,
2345 	    Context *onack, version_t *objver = NULL,
2346 	    int *data_offset = NULL,
2347 	    uint64_t features = 0,
2348 	    ZTracer::Trace *parent_trace = nullptr) {
2349 	    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
2350 			   CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace);
2351 	    o->priority = op.priority;
2352 	    o->snapid = snapid;
2353 	    o->outbl = pbl;
2354 	    if (!o->outbl && op.size() == 1 && op.out_bl[0] && op.out_bl[0]->length())
2355 		o->outbl = op.out_bl[0];
2356 	    o->out_bl.swap(op.out_bl);
2357 	    o->out_handler.swap(op.out_handler);
2358 	    o->out_rval.swap(op.out_rval);
2359 	    return o;
2360 	  }
2361 	  ceph_tid_t read(
2362 	    const object_t& oid, const object_locator_t& oloc,
2363 	    ObjectOperation& op,
2364 	    snapid_t snapid, ceph::buffer::list *pbl, int flags,
2365 	    Context *onack, version_t *objver = NULL,
2366 	    int *data_offset = NULL,
2367 	    uint64_t features = 0) {
2368 	    Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
2369 				    data_offset);
2370 	    if (features)
2371 	      o->features = features;
2372 	    ceph_tid_t tid;
2373 	    op_submit(o, &tid);
2374 	    return tid;
2375 	  }
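  // Read-path sketch (oid, oloc and the completion callback are assumed; the
  // simpler offset/length read() overload further below covers the common
  // single-extent case):
  //
  //   ObjectOperation rd;
  //   OSDOp& osd_op = rd.add_op(CEPH_OSD_OP_READ);
  //   osd_op.op.extent.offset = 0;
  //   osd_op.op.extent.length = 4096;
  //   ceph::buffer::list outbl;
  //   ceph_tid_t tid = o->read(oid, oloc, rd, CEPH_NOSNAP, &outbl, 0, onack);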
2376 	  Op *prepare_pg_read_op(
2377 	    uint32_t hash, object_locator_t oloc,
2378 	    ObjectOperation& op, ceph::buffer::list *pbl, int flags,
2379 	    Context *onack, epoch_t *reply_epoch,
2380 	    int *ctx_budget) {
2381 	    Op *o = new Op(object_t(), oloc,
2382 			   op.ops,
2383 			   flags | global_op_flags | CEPH_OSD_FLAG_READ |
2384 			   CEPH_OSD_FLAG_IGNORE_OVERLAY,
2385 			   onack, NULL);
2386 	    o->target.precalc_pgid = true;
2387 	    o->target.base_pgid = pg_t(hash, oloc.pool);
2388 	    o->priority = op.priority;
2389 	    o->snapid = CEPH_NOSNAP;
2390 	    o->outbl = pbl;
2391 	    o->out_bl.swap(op.out_bl);
2392 	    o->out_handler.swap(op.out_handler);
2393 	    o->out_rval.swap(op.out_rval);
2394 	    o->reply_epoch = reply_epoch;
2395 	    if (ctx_budget) {
2396 	      // budget is tracked by listing context
2397 	      o->ctx_budgeted = true;
2398 	    }
2399 	    return o;
2400 	  }
2401 	  ceph_tid_t pg_read(
2402 	    uint32_t hash, object_locator_t oloc,
2403 	    ObjectOperation& op, ceph::buffer::list *pbl, int flags,
2404 	    Context *onack, epoch_t *reply_epoch,
2405 	    int *ctx_budget) {
2406 	    Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
2407 				       onack, reply_epoch, ctx_budget);
2408 	    ceph_tid_t tid;
2409 	    op_submit(o, &tid, ctx_budget);
2410 	    return tid;
2411 	  }
2412 	
2413 	  // caller owns a ref
2414 	  LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
2415 				    int flags);
2416 	  ceph_tid_t linger_watch(LingerOp *info,
2417 				  ObjectOperation& op,
2418 				  const SnapContext& snapc, ceph::real_time mtime,
2419 				  ceph::buffer::list& inbl,
2420 				  Context *onfinish,
2421 				  version_t *objver);
2422 	  ceph_tid_t linger_notify(LingerOp *info,
2423 				   ObjectOperation& op,
2424 				   snapid_t snap, ceph::buffer::list& inbl,
2425 				   ceph::buffer::list *poutbl,
2426 				   Context *onack,
2427 				   version_t *objver);
2428 	  int linger_check(LingerOp *info);
2429 	  void linger_cancel(LingerOp *info);  // releases a reference
2430 	  void _linger_cancel(LingerOp *info);
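  // Watch registration sketch (the WatchContext subclass, locators,
  // ObjectOperation contents and callbacks are placeholders):
  //
  //   LingerOp *info = o->linger_register(oid, oloc, 0);
  //   info->watch_context = my_watch_ctx;   // WatchContext subclass
  //   ObjectOperation wr;
  //   ... add a CEPH_OSD_OP_WATCH op ...
  //   o->linger_watch(info, wr, snapc, ceph::real_clock::now(), inbl,
  //                   onfinish, nullptr);
  //   ...
  //   o->linger_cancel(info);   // releases the registration reference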
2431 	
2432 	  void _do_watch_notify(LingerOp *info, MWatchNotify *m);
2433 	
2434 	  /**
2435 	   * set up initial ops in the op vector, and allocate a final op slot.
2436 	   *
2437 	   * The caller is responsible for filling in the final ops_count ops.
2438 	   *
2439 	   * @param ops op vector
2440 	   * @param ops_count number of final ops the caller will fill in
2441 	   * @param extra_ops pointer to [array of] initial op[s]
2442 	   * @return index of final op (for caller to fill in)
2443 	   */
2444 	  int init_ops(std::vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {
2445 	    int i;
2446 	    int extra = 0;
2447 	
2448 	    if (extra_ops)
2449 	      extra = extra_ops->ops.size();
2450 	
2451 	    ops.resize(ops_count + extra);
2452 	
2453 	    for (i=0; i<extra; i++) {
2454 	      ops[i] = extra_ops->ops[i];
2455 	    }
2456 	
2457 	    return i;
2458 	  }
2459 	
2460 	
2461 	  // high-level helpers
2462 	  Op *prepare_stat_op(
2463 	    const object_t& oid, const object_locator_t& oloc,
2464 	    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2465 	    int flags, Context *onfinish, version_t *objver = NULL,
2466 	    ObjectOperation *extra_ops = NULL) {
2467 	    std::vector<OSDOp> ops;
2468 	    int i = init_ops(ops, 1, extra_ops);
2469 	    ops[i].op.op = CEPH_OSD_OP_STAT;
2470 	    C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
2471 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2472 			   CEPH_OSD_FLAG_READ, fin, objver);
2473 	    o->snapid = snap;
2474 	    o->outbl = &fin->bl;
2475 	    return o;
2476 	  }
2477 	  ceph_tid_t stat(
2478 	    const object_t& oid, const object_locator_t& oloc,
2479 	    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2480 	    int flags, Context *onfinish, version_t *objver = NULL,
2481 	    ObjectOperation *extra_ops = NULL) {
2482 	    Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
2483 				    onfinish, objver, extra_ops);
2484 	    ceph_tid_t tid;
2485 	    op_submit(o, &tid);
2486 	    return tid;
2487 	  }
2488 	
2489 	  Op *prepare_read_op(
2490 	    const object_t& oid, const object_locator_t& oloc,
2491 	    uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
2492 	    int flags, Context *onfinish, version_t *objver = NULL,
2493 	    ObjectOperation *extra_ops = NULL, int op_flags = 0,
2494 	    ZTracer::Trace *parent_trace = nullptr) {
2495 	    std::vector<OSDOp> ops;
2496 	    int i = init_ops(ops, 1, extra_ops);
2497 	    ops[i].op.op = CEPH_OSD_OP_READ;
2498 	    ops[i].op.extent.offset = off;
2499 	    ops[i].op.extent.length = len;
2500 	    ops[i].op.extent.truncate_size = 0;
2501 	    ops[i].op.extent.truncate_seq = 0;
2502 	    ops[i].op.flags = op_flags;
2503 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2504 			   CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
2505 	    o->snapid = snap;
2506 	    o->outbl = pbl;
2507 	    return o;
2508 	  }
2509 	  ceph_tid_t read(
2510 	    const object_t& oid, const object_locator_t& oloc,
2511 	    uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
2512 	    int flags, Context *onfinish, version_t *objver = NULL,
2513 	    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2514 	    Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
2515 				    onfinish, objver, extra_ops, op_flags);
2516 	    ceph_tid_t tid;
2517 	    op_submit(o, &tid);
2518 	    return tid;
2519 	  }
2520 	
2521 	  Op *prepare_cmpext_op(
2522 	    const object_t& oid, const object_locator_t& oloc,
2523 	    uint64_t off, ceph::buffer::list &cmp_bl,
2524 	    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2525 	    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2526 	    std::vector<OSDOp> ops;
2527 	    int i = init_ops(ops, 1, extra_ops);
2528 	    ops[i].op.op = CEPH_OSD_OP_CMPEXT;
2529 	    ops[i].op.extent.offset = off;
2530 	    ops[i].op.extent.length = cmp_bl.length();
2531 	    ops[i].op.extent.truncate_size = 0;
2532 	    ops[i].op.extent.truncate_seq = 0;
2533 	    ops[i].indata = cmp_bl;
2534 	    ops[i].op.flags = op_flags;
2535 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2536 			   CEPH_OSD_FLAG_READ, onfinish, objver);
2537 	    o->snapid = snap;
2538 	    return o;
2539 	  }
2540 	
2541 	  ceph_tid_t cmpext(
2542 	    const object_t& oid, const object_locator_t& oloc,
2543 	    uint64_t off, ceph::buffer::list &cmp_bl,
2544 	    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2545 	    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2546 	    Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
2547 				      flags, onfinish, objver, extra_ops, op_flags);
2548 	    ceph_tid_t tid;
2549 	    op_submit(o, &tid);
2550 	    return tid;
2551 	  }
2552 	
2553 	  ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
2554 				uint64_t off, uint64_t len, snapid_t snap,
2555 				ceph::buffer::list *pbl, int flags, uint64_t trunc_size,
2556 				__u32 trunc_seq, Context *onfinish,
2557 				version_t *objver = NULL,
2558 				ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2559 	    std::vector<OSDOp> ops;
2560 	    int i = init_ops(ops, 1, extra_ops);
2561 	    ops[i].op.op = CEPH_OSD_OP_READ;
2562 	    ops[i].op.extent.offset = off;
2563 	    ops[i].op.extent.length = len;
2564 	    ops[i].op.extent.truncate_size = trunc_size;
2565 	    ops[i].op.extent.truncate_seq = trunc_seq;
2566 	    ops[i].op.flags = op_flags;
2567 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2568 			   CEPH_OSD_FLAG_READ, onfinish, objver);
2569 	    o->snapid = snap;
2570 	    o->outbl = pbl;
2571 	    ceph_tid_t tid;
2572 	    op_submit(o, &tid);
2573 	    return tid;
2574 	  }
2575 	  ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
2576 			    uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
2577 			    int flags, Context *onfinish, version_t *objver = NULL,
2578 			    ObjectOperation *extra_ops = NULL) {
2579 	    std::vector<OSDOp> ops;
2580 	    int i = init_ops(ops, 1, extra_ops);
2581 	    ops[i].op.op = CEPH_OSD_OP_MAPEXT;
2582 	    ops[i].op.extent.offset = off;
2583 	    ops[i].op.extent.length = len;
2584 	    ops[i].op.extent.truncate_size = 0;
2585 	    ops[i].op.extent.truncate_seq = 0;
2586 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2587 			   CEPH_OSD_FLAG_READ, onfinish, objver);
2588 	    o->snapid = snap;
2589 	    o->outbl = pbl;
2590 	    ceph_tid_t tid;
2591 	    op_submit(o, &tid);
2592 	    return tid;
2593 	  }
2594 	  ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
2595 		     const char *name, snapid_t snap, ceph::buffer::list *pbl, int flags,
2596 		     Context *onfinish,
2597 		     version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2598 	    std::vector<OSDOp> ops;
2599 	    int i = init_ops(ops, 1, extra_ops);
2600 	    ops[i].op.op = CEPH_OSD_OP_GETXATTR;
2601 	    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2602 	    ops[i].op.xattr.value_len = 0;
2603 	    if (name)
2604 	      ops[i].indata.append(name, ops[i].op.xattr.name_len);
2605 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2606 			   CEPH_OSD_FLAG_READ, onfinish, objver);
2607 	    o->snapid = snap;
2608 	    o->outbl = pbl;
2609 	    ceph_tid_t tid;
2610 	    op_submit(o, &tid);
2611 	    return tid;
2612 	  }
2613 	
2614 	  ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
2615 			       snapid_t snap, std::map<std::string,ceph::buffer::list>& attrset,
2616 			       int flags, Context *onfinish, version_t *objver = NULL,
2617 			       ObjectOperation *extra_ops = NULL) {
2618 	    std::vector<OSDOp> ops;
2619 	    int i = init_ops(ops, 1, extra_ops);
2620 	    ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
2621 	    C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
2622 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2623 			   CEPH_OSD_FLAG_READ, fin, objver);
2624 	    o->snapid = snap;
2625 	    o->outbl = &fin->bl;
2626 	    ceph_tid_t tid;
2627 	    op_submit(o, &tid);
2628 	    return tid;
2629 	  }
2630 	
2631 	  ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
2632 			       snapid_t snap, ceph::buffer::list *pbl, int flags,
2633 			       Context *onfinish, version_t *objver = NULL,
2634 			       ObjectOperation *extra_ops = NULL) {
2635 	    return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
2636 			CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
2637 	  }
2638 	
2639 	
2640 	  // writes
2641 	  ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
2642 			     std::vector<OSDOp>& ops, ceph::real_time mtime,
2643 			     const SnapContext& snapc, int flags,
2644 			     Context *oncommit,
2645 			     version_t *objver = NULL) {
2646 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2647 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2648 	    o->mtime = mtime;
2649 	    o->snapc = snapc;
2650 	    ceph_tid_t tid;
2651 	    op_submit(o, &tid);
2652 	    return tid;
2653 	  }
2654 	  Op *prepare_write_op(
2655 	    const object_t& oid, const object_locator_t& oloc,
2656 	    uint64_t off, uint64_t len, const SnapContext& snapc,
2657 	    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
2658 	    Context *oncommit, version_t *objver = NULL,
2659 	    ObjectOperation *extra_ops = NULL, int op_flags = 0,
2660 	    ZTracer::Trace *parent_trace = nullptr) {
2661 	    std::vector<OSDOp> ops;
2662 	    int i = init_ops(ops, 1, extra_ops);
2663 	    ops[i].op.op = CEPH_OSD_OP_WRITE;
2664 	    ops[i].op.extent.offset = off;
2665 	    ops[i].op.extent.length = len;
2666 	    ops[i].op.extent.truncate_size = 0;
2667 	    ops[i].op.extent.truncate_seq = 0;
2668 	    ops[i].indata = bl;
2669 	    ops[i].op.flags = op_flags;
2670 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2671 			   CEPH_OSD_FLAG_WRITE, oncommit, objver,
2672 	                   nullptr, parent_trace);
2673 	    o->mtime = mtime;
2674 	    o->snapc = snapc;
2675 	    return o;
2676 	  }
2677 	  ceph_tid_t write(
2678 	    const object_t& oid, const object_locator_t& oloc,
2679 	    uint64_t off, uint64_t len, const SnapContext& snapc,
2680 	    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
2681 	    Context *oncommit, version_t *objver = NULL,
2682 	    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2683 	    Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
2684 				     oncommit, objver, extra_ops, op_flags);
2685 	    ceph_tid_t tid;
2686 	    op_submit(o, &tid);
2687 	    return tid;
2688 	  }
2689 	  Op *prepare_append_op(
2690 	    const object_t& oid, const object_locator_t& oloc,
2691 	    uint64_t len, const SnapContext& snapc,
2692 	    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
2693 	    Context *oncommit,
2694 	    version_t *objver = NULL,
2695 	    ObjectOperation *extra_ops = NULL) {
2696 	    std::vector<OSDOp> ops;
2697 	    int i = init_ops(ops, 1, extra_ops);
2698 	    ops[i].op.op = CEPH_OSD_OP_APPEND;
2699 	    ops[i].op.extent.offset = 0;
2700 	    ops[i].op.extent.length = len;
2701 	    ops[i].op.extent.truncate_size = 0;
2702 	    ops[i].op.extent.truncate_seq = 0;
2703 	    ops[i].indata = bl;
2704 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2705 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2706 	    o->mtime = mtime;
2707 	    o->snapc = snapc;
2708 	    return o;
2709 	  }
2710 	  ceph_tid_t append(
2711 	    const object_t& oid, const object_locator_t& oloc,
2712 	    uint64_t len, const SnapContext& snapc,
2713 	    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
2714 	    Context *oncommit,
2715 	    version_t *objver = NULL,
2716 	    ObjectOperation *extra_ops = NULL) {
2717 	    Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
2718 				      oncommit, objver, extra_ops);
2719 	    ceph_tid_t tid;
2720 	    op_submit(o, &tid);
2721 	    return tid;
2722 	  }
2723 	  ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
2724 				 uint64_t off, uint64_t len, const SnapContext& snapc,
2725 				 const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
2726 				 uint64_t trunc_size, __u32 trunc_seq,
2727 				 Context *oncommit,
2728 				 version_t *objver = NULL,
2729 				 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2730 	    std::vector<OSDOp> ops;
2731 	    int i = init_ops(ops, 1, extra_ops);
2732 	    ops[i].op.op = CEPH_OSD_OP_WRITE;
2733 	    ops[i].op.extent.offset = off;
2734 	    ops[i].op.extent.length = len;
2735 	    ops[i].op.extent.truncate_size = trunc_size;
2736 	    ops[i].op.extent.truncate_seq = trunc_seq;
2737 	    ops[i].indata = bl;
2738 	    ops[i].op.flags = op_flags;
2739 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2740 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2741 	    o->mtime = mtime;
2742 	    o->snapc = snapc;
2743 	    ceph_tid_t tid;
2744 	    op_submit(o, &tid);
2745 	    return tid;
2746 	  }
2747 	  Op *prepare_write_full_op(
2748 	    const object_t& oid, const object_locator_t& oloc,
2749 	    const SnapContext& snapc, const ceph::buffer::list &bl,
2750 	    ceph::real_time mtime, int flags,
2751 	    Context *oncommit, version_t *objver = NULL,
2752 	    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2753 	    std::vector<OSDOp> ops;
2754 	    int i = init_ops(ops, 1, extra_ops);
2755 	    ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
2756 	    ops[i].op.extent.offset = 0;
2757 	    ops[i].op.extent.length = bl.length();
2758 	    ops[i].indata = bl;
2759 	    ops[i].op.flags = op_flags;
2760 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2761 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2762 	    o->mtime = mtime;
2763 	    o->snapc = snapc;
2764 	    return o;
2765 	  }
2766 	  ceph_tid_t write_full(
2767 	    const object_t& oid, const object_locator_t& oloc,
2768 	    const SnapContext& snapc, const ceph::buffer::list &bl,
2769 	    ceph::real_time mtime, int flags,
2770 	    Context *oncommit, version_t *objver = NULL,
2771 	    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2772 	    Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
2773 					  oncommit, objver, extra_ops, op_flags);
2774 	    ceph_tid_t tid;
2775 	    op_submit(o, &tid);
2776 	    return tid;
2777 	  }
2778 	  Op *prepare_writesame_op(
2779 	    const object_t& oid, const object_locator_t& oloc,
2780 	    uint64_t write_len, uint64_t off,
2781 	    const SnapContext& snapc, const ceph::buffer::list &bl,
2782 	    ceph::real_time mtime, int flags,
2783 	    Context *oncommit, version_t *objver = NULL,
2784 	    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2785 	
2786 	    std::vector<OSDOp> ops;
2787 	    int i = init_ops(ops, 1, extra_ops);
2788 	    ops[i].op.op = CEPH_OSD_OP_WRITESAME;
2789 	    ops[i].op.writesame.offset = off;
2790 	    ops[i].op.writesame.length = write_len;
2791 	    ops[i].op.writesame.data_length = bl.length();
2792 	    ops[i].indata = bl;
2793 	    ops[i].op.flags = op_flags;
2794 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2795 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2796 	    o->mtime = mtime;
2797 	    o->snapc = snapc;
2798 	    return o;
2799 	  }
2800 	  ceph_tid_t writesame(
2801 	    const object_t& oid, const object_locator_t& oloc,
2802 	    uint64_t write_len, uint64_t off,
2803 	    const SnapContext& snapc, const ceph::buffer::list &bl,
2804 	    ceph::real_time mtime, int flags,
2805 	    Context *oncommit, version_t *objver = NULL,
2806 	    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2807 	
2808 	    Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
2809 					 mtime, flags, oncommit, objver,
2810 					 extra_ops, op_flags);
2811 	
2812 	    ceph_tid_t tid;
2813 	    op_submit(o, &tid);
2814 	    return tid;
2815 	  }
2816 	  ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
2817 			   const SnapContext& snapc, ceph::real_time mtime, int flags,
2818 			   uint64_t trunc_size, __u32 trunc_seq,
2819 			   Context *oncommit, version_t *objver = NULL,
2820 			   ObjectOperation *extra_ops = NULL) {
2821 	    std::vector<OSDOp> ops;
2822 	    int i = init_ops(ops, 1, extra_ops);
2823 	    ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
2824 	    ops[i].op.extent.offset = trunc_size;
2825 	    ops[i].op.extent.truncate_size = trunc_size;
2826 	    ops[i].op.extent.truncate_seq = trunc_seq;
2827 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2828 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2829 	    o->mtime = mtime;
2830 	    o->snapc = snapc;
2831 	    ceph_tid_t tid;
2832 	    op_submit(o, &tid);
2833 	    return tid;
2834 	  }
2835 	  ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
2836 			  uint64_t off, uint64_t len, const SnapContext& snapc,
2837 			  ceph::real_time mtime, int flags, Context *oncommit,
2838 		     version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2839 	    std::vector<OSDOp> ops;
2840 	    int i = init_ops(ops, 1, extra_ops);
2841 	    ops[i].op.op = CEPH_OSD_OP_ZERO;
2842 	    ops[i].op.extent.offset = off;
2843 	    ops[i].op.extent.length = len;
2844 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2845 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2846 	    o->mtime = mtime;
2847 	    o->snapc = snapc;
2848 	    ceph_tid_t tid;
2849 	    op_submit(o, &tid);
2850 	    return tid;
2851 	  }
2852 	  ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
2853 				     const SnapContext& snapc, snapid_t snapid,
2854 				     ceph::real_time mtime, Context *oncommit,
2855 				     version_t *objver = NULL,
2856 				     ObjectOperation *extra_ops = NULL) {
2857 	    std::vector<OSDOp> ops;
2858 	    int i = init_ops(ops, 1, extra_ops);
2859 	    ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
2860 	    ops[i].op.snap.snapid = snapid;
2861 	    Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
2862 	    o->mtime = mtime;
2863 	    o->snapc = snapc;
2864 	    ceph_tid_t tid;
2865 	    op_submit(o, &tid);
2866 	    return tid;
2867 	  }
2868 	  ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
2869 			    const SnapContext& snapc, ceph::real_time mtime, int global_flags,
2870 			    int create_flags, Context *oncommit,
2871 			    version_t *objver = NULL,
2872 			    ObjectOperation *extra_ops = NULL) {
2873 	    std::vector<OSDOp> ops;
2874 	    int i = init_ops(ops, 1, extra_ops);
2875 	    ops[i].op.op = CEPH_OSD_OP_CREATE;
2876 	    ops[i].op.flags = create_flags;
2877 	    Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags |
2878 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2879 	    o->mtime = mtime;
2880 	    o->snapc = snapc;
2881 	    ceph_tid_t tid;
2882 	    op_submit(o, &tid);
2883 	    return tid;
2884 	  }
2885 	  Op *prepare_remove_op(
2886 	    const object_t& oid, const object_locator_t& oloc,
2887 	    const SnapContext& snapc, ceph::real_time mtime, int flags,
2888 	    Context *oncommit,
2889 	    version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2890 	    std::vector<OSDOp> ops;
2891 	    int i = init_ops(ops, 1, extra_ops);
2892 	    ops[i].op.op = CEPH_OSD_OP_DELETE;
2893 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2894 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2895 	    o->mtime = mtime;
2896 	    o->snapc = snapc;
2897 	    return o;
2898 	  }
2899 	  ceph_tid_t remove(
2900 	    const object_t& oid, const object_locator_t& oloc,
2901 	    const SnapContext& snapc, ceph::real_time mtime, int flags,
2902 	    Context *oncommit,
2903 	    version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2904 	    Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
2905 				      oncommit, objver, extra_ops);
2906 	    ceph_tid_t tid;
2907 	    op_submit(o, &tid);
2908 	    return tid;
2909 	  }
2910 	
2911 	  ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
2912 		      const char *name, const SnapContext& snapc, const ceph::buffer::list &bl,
2913 		      ceph::real_time mtime, int flags,
2914 		      Context *oncommit,
2915 		      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2916 	    std::vector<OSDOp> ops;
2917 	    int i = init_ops(ops, 1, extra_ops);
2918 	    ops[i].op.op = CEPH_OSD_OP_SETXATTR;
2919 	    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2920 	    ops[i].op.xattr.value_len = bl.length();
2921 	    if (name)
2922 	      ops[i].indata.append(name, ops[i].op.xattr.name_len);
2923 	    ops[i].indata.append(bl);
2924 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2925 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2926 	    o->mtime = mtime;
2927 	    o->snapc = snapc;
2928 	    ceph_tid_t tid;
2929 	    op_submit(o, &tid);
2930 	    return tid;
2931 	  }
2932 	  ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
2933 		      const char *name, const SnapContext& snapc,
2934 		      ceph::real_time mtime, int flags,
2935 		      Context *oncommit,
2936 		      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2937 	    std::vector<OSDOp> ops;
2938 	    int i = init_ops(ops, 1, extra_ops);
2939 	    ops[i].op.op = CEPH_OSD_OP_RMXATTR;
2940 	    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2941 	    ops[i].op.xattr.value_len = 0;
2942 	    if (name)
2943 	      ops[i].indata.append(name, ops[i].op.xattr.name_len);
2944 	    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2945 			   CEPH_OSD_FLAG_WRITE, oncommit, objver);
2946 	    o->mtime = mtime;
2947 	    o->snapc = snapc;
2948 	    ceph_tid_t tid;
2949 	    op_submit(o, &tid);
2950 	    return tid;
2951 	  }
2952 	
2953 	  void list_nobjects(NListContext *p, Context *onfinish);
2954 	  uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
2955 	  uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c);
2956 	  void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);
2957 	
2958 	  hobject_t enumerate_objects_begin();
2959 	  hobject_t enumerate_objects_end();
2960 	  //hobject_t enumerate_objects_begin(int n, int m);
2961 	  void enumerate_objects(
2962 	    int64_t pool_id,
2963 	    const std::string &ns,
2964 	    const hobject_t &start,
2965 	    const hobject_t &end,
2966 	    const uint32_t max,
2967 	    const ceph::buffer::list &filter_bl,
2968 	    std::list<librados::ListObjectImpl> *result, 
2969 	    hobject_t *next,
2970 	    Context *on_finish);
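  // Enumeration sketch (pool id, namespace, filter and the completion
  // callback are placeholders; 'next' is fed back in as the start cursor for
  // the following call):
  //
  //   std::list<librados::ListObjectImpl> results;
  //   hobject_t next;
  //   o->enumerate_objects(pool_id, nspace,
  //                        o->enumerate_objects_begin(),
  //                        o->enumerate_objects_end(),
  //                        100, filter_bl, &results, &next, on_finish);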
2971 	
2972 	  void _enumerate_reply(
2973 	      ceph::buffer::list &bl,
2974 	      int r,
2975 	      const hobject_t &end,
2976 	      const int64_t pool_id,
2977 	      int budget,
2978 	      epoch_t reply_epoch,
2979 	      std::list<librados::ListObjectImpl> *result, 
2980 	      hobject_t *next,
2981 	      Context *on_finish);
2982 	  friend class C_EnumerateReply;
2983 	
2984 	  // -------------------------
2985 	  // pool ops
2986 	private:
2987 	  void pool_op_submit(PoolOp *op);
2988 	  void _pool_op_submit(PoolOp *op);
2989 	  void _finish_pool_op(PoolOp *op, int r);
2990 	  void _do_delete_pool(int64_t pool, Context *onfinish);
2991 	public:
2992 	  int create_pool_snap(int64_t pool, std::string& snapName, Context *onfinish);
2993 	  int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
2994 					Context *onfinish);
2995 	  int delete_pool_snap(int64_t pool, std::string& snapName, Context *onfinish);
2996 	  int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish);
2997 	
2998 	  int create_pool(std::string& name, Context *onfinish,
2999 			  int crush_rule=-1);
3000 	  int delete_pool(int64_t pool, Context *onfinish);
3001 	  int delete_pool(const std::string& name, Context *onfinish);
3002 	
3003 	  void handle_pool_op_reply(MPoolOpReply *m);
3004 	  int pool_op_cancel(ceph_tid_t tid, int r);
3005 	
3006 	  // --------------------------
3007 	  // pool stats
3008 	private:
3009 	  void _poolstat_submit(PoolStatOp *op);
3010 	public:
3011 	  void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
3012 	  void get_pool_stats(std::list<std::string>& pools,
3013 			      std::map<std::string,pool_stat_t> *result,
3014 			      bool *per_pool,
3015 			      Context *onfinish);
3016 	  int pool_stat_op_cancel(ceph_tid_t tid, int r);
3017 	  void _finish_pool_stat_op(PoolStatOp *op, int r);
3018 	
3019 	  // ---------------------------
3020 	  // df stats
3021 	private:
3022 	  void _fs_stats_submit(StatfsOp *op);
3023 	public:
3024 	  void handle_fs_stats_reply(MStatfsReply *m);
3025 	  void get_fs_stats(struct ceph_statfs& result, boost::optional<int64_t> poolid,
3026 			    Context *onfinish);
3027 	  int statfs_op_cancel(ceph_tid_t tid, int r);
3028 	  void _finish_statfs_op(StatfsOp *op, int r);
3029 	
3030 	  // ---------------------------
3031 	  // some scatter/gather hackery
3032 	
3033 	  void _sg_read_finish(std::vector<ObjectExtent>& extents,
3034 			       std::vector<ceph::buffer::list>& resultbl,
3035 			       ceph::buffer::list *bl, Context *onfinish);
3036 	
3037 	  struct C_SGRead : public Context {
3038 	    Objecter *objecter;
3039 	    std::vector<ObjectExtent> extents;
3040 	    std::vector<ceph::buffer::list> resultbl;
3041 	    ceph::buffer::list *bl;
3042 	    Context *onfinish;
3043 	    C_SGRead(Objecter *ob,
3044 		     std::vector<ObjectExtent>& e, std::vector<ceph::buffer::list>& r, ceph::buffer::list *b,
3045 		     Context *c) :
3046 	      objecter(ob), bl(b), onfinish(c) {
3047 	      extents.swap(e);
3048 	      resultbl.swap(r);
3049 	    }
3050 	    void finish(int r) override {
3051 	      objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
3052 	    }
3053 	  };
3054 	
3055 	  void sg_read_trunc(std::vector<ObjectExtent>& extents, snapid_t snap,
3056 			     ceph::buffer::list *bl, int flags, uint64_t trunc_size,
3057 			     __u32 trunc_seq, Context *onfinish, int op_flags = 0) {
3058 	    if (extents.size() == 1) {
3059 	      read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
3060 			 extents[0].length, snap, bl, flags, extents[0].truncate_size,
3061 			 trunc_seq, onfinish, 0, 0, op_flags);
3062 	    } else {
3063 	      C_GatherBuilder gather(cct);
3064 	      std::vector<ceph::buffer::list> resultbl(extents.size());
3065 	      int i=0;
3066 	      for (auto p = extents.begin(); p != extents.end(); ++p) {
3067 		read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
3068 			   flags, p->truncate_size, trunc_seq, gather.new_sub(),
3069 			   0, 0, op_flags);
3070 	      }
3071 	      gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
3072 	      gather.activate();
3073 	    }
3074 	  }
3075 	
3076 	  void sg_read(std::vector<ObjectExtent>& extents, snapid_t snap, ceph::buffer::list *bl,
3077 		       int flags, Context *onfinish, int op_flags = 0) {
3078 	    sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
3079 	  }
3080 	
3081 	  void sg_write_trunc(std::vector<ObjectExtent>& extents, const SnapContext& snapc,
3082 			      const ceph::buffer::list& bl, ceph::real_time mtime, int flags,
3083 			      uint64_t trunc_size, __u32 trunc_seq,
3084 			      Context *oncommit, int op_flags = 0) {
3085 	    if (extents.size() == 1) {
3086 	      write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
3087 			  extents[0].length, snapc, bl, mtime, flags,
3088 			  extents[0].truncate_size, trunc_seq, oncommit,
3089 			  0, 0, op_flags);
3090 	    } else {
3091 	      C_GatherBuilder gcom(cct, oncommit);
3092 	      for (auto p = extents.begin(); p != extents.end(); ++p) {
3093 		ceph::buffer::list cur;
3094 		for (auto bit = p->buffer_extents.begin();
3095 		     bit != p->buffer_extents.end();
3096 		     ++bit)
3097 		  bl.copy(bit->first, bit->second, cur);
3098 		ceph_assert(cur.length() == p->length);
3099 		write_trunc(p->oid, p->oloc, p->offset, p->length,
3100 		      snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
3101 		      oncommit ? gcom.new_sub():0,
3102 		      0, 0, op_flags);
3103 	      }
3104 	      gcom.activate();
3105 	    }
3106 	  }
3107 	
3108 	  void sg_write(std::vector<ObjectExtent>& extents, const SnapContext& snapc,
3109 			const ceph::buffer::list& bl, ceph::real_time mtime, int flags,
3110 			Context *oncommit, int op_flags = 0) {
3111 	    sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
3112 			   op_flags);
3113 	  }
3114 	
3115 	  void ms_handle_connect(Connection *con) override;
3116 	  bool ms_handle_reset(Connection *con) override;
3117 	  void ms_handle_remote_reset(Connection *con) override;
3118 	  bool ms_handle_refused(Connection *con) override;
3119 	
3120 	  void blacklist_self(bool set);
3121 	
3122 	private:
3123 	  epoch_t epoch_barrier = 0;
3124 	  bool retry_writes_after_first_reply;
3125 	public:
3126 	  void set_epoch_barrier(epoch_t epoch);
3127 	
3128 	  PerfCounters *get_logger() {
3129 	    return logger;
3130 	  }
3131 	};
3132 	
3133 	#endif
3134