1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software
11   	 * Foundation.  See file COPYING.
12   	 *
13   	 */
14   	
15   	#include <cerrno>
16   	
17   	#include "Objecter.h"
18   	#include "osd/OSDMap.h"
19   	#include "Filer.h"
20   	
21   	#include "mon/MonClient.h"
22   	
23   	#include "msg/Messenger.h"
24   	#include "msg/Message.h"
25   	
26   	#include "messages/MPing.h"
27   	#include "messages/MOSDOp.h"
28   	#include "messages/MOSDOpReply.h"
29   	#include "messages/MOSDBackoff.h"
30   	#include "messages/MOSDMap.h"
31   	
32   	#include "messages/MPoolOp.h"
33   	#include "messages/MPoolOpReply.h"
34   	
35   	#include "messages/MGetPoolStats.h"
36   	#include "messages/MGetPoolStatsReply.h"
37   	#include "messages/MStatfs.h"
38   	#include "messages/MStatfsReply.h"
39   	
40   	#include "messages/MMonCommand.h"
41   	
42   	#include "messages/MCommand.h"
43   	#include "messages/MCommandReply.h"
44   	
45   	#include "messages/MWatchNotify.h"
46   	
47   	
48   	#include "common/config.h"
49   	#include "common/perf_counters.h"
50   	#include "common/scrub_types.h"
51   	#include "include/str_list.h"
52   	#include "common/errno.h"
53   	#include "common/EventTrace.h"
54   	
55   	using std::list;
56   	using std::make_pair;
57   	using std::map;
58   	using std::ostream;
59   	using std::ostringstream;
60   	using std::pair;
61   	using std::set;
62   	using std::string;
63   	using std::stringstream;
64   	using std::vector;
65   	
66   	using ceph::decode;
67   	using ceph::encode;
68   	using ceph::Formatter;
69   	
70   	using std::defer_lock;
71   	
72   	using ceph::real_time;
73   	using ceph::real_clock;
74   	
75   	using ceph::mono_clock;
76   	using ceph::mono_time;
77   	
78   	using ceph::timespan;
79   	
80   	using ceph::shunique_lock;
81   	using ceph::acquire_shared;
82   	using ceph::acquire_unique;
83   	
84   	#define dout_subsys ceph_subsys_objecter
85   	#undef dout_prefix
86   	#define dout_prefix *_dout << messenger->get_myname() << ".objecter "
87   	
88   	
89   	enum {
90   	  l_osdc_first = 123200,
91   	  l_osdc_op_active,
92   	  l_osdc_op_laggy,
93   	  l_osdc_op_send,
94   	  l_osdc_op_send_bytes,
95   	  l_osdc_op_resend,
96   	  l_osdc_op_reply,
97   	
98   	  l_osdc_op,
99   	  l_osdc_op_r,
100  	  l_osdc_op_w,
101  	  l_osdc_op_rmw,
102  	  l_osdc_op_pg,
103  	
104  	  l_osdc_osdop_stat,
105  	  l_osdc_osdop_create,
106  	  l_osdc_osdop_read,
107  	  l_osdc_osdop_write,
108  	  l_osdc_osdop_writefull,
109  	  l_osdc_osdop_writesame,
110  	  l_osdc_osdop_append,
111  	  l_osdc_osdop_zero,
112  	  l_osdc_osdop_truncate,
113  	  l_osdc_osdop_delete,
114  	  l_osdc_osdop_mapext,
115  	  l_osdc_osdop_sparse_read,
116  	  l_osdc_osdop_clonerange,
117  	  l_osdc_osdop_getxattr,
118  	  l_osdc_osdop_setxattr,
119  	  l_osdc_osdop_cmpxattr,
120  	  l_osdc_osdop_rmxattr,
121  	  l_osdc_osdop_resetxattrs,
122  	  l_osdc_osdop_call,
123  	  l_osdc_osdop_watch,
124  	  l_osdc_osdop_notify,
125  	  l_osdc_osdop_src_cmpxattr,
126  	  l_osdc_osdop_pgls,
127  	  l_osdc_osdop_pgls_filter,
128  	  l_osdc_osdop_other,
129  	
130  	  l_osdc_linger_active,
131  	  l_osdc_linger_send,
132  	  l_osdc_linger_resend,
133  	  l_osdc_linger_ping,
134  	
135  	  l_osdc_poolop_active,
136  	  l_osdc_poolop_send,
137  	  l_osdc_poolop_resend,
138  	
139  	  l_osdc_poolstat_active,
140  	  l_osdc_poolstat_send,
141  	  l_osdc_poolstat_resend,
142  	
143  	  l_osdc_statfs_active,
144  	  l_osdc_statfs_send,
145  	  l_osdc_statfs_resend,
146  	
147  	  l_osdc_command_active,
148  	  l_osdc_command_send,
149  	  l_osdc_command_resend,
150  	
151  	  l_osdc_map_epoch,
152  	  l_osdc_map_full,
153  	  l_osdc_map_inc,
154  	
155  	  l_osdc_osd_sessions,
156  	  l_osdc_osd_session_open,
157  	  l_osdc_osd_session_close,
158  	  l_osdc_osd_laggy,
159  	
160  	  l_osdc_osdop_omap_wr,
161  	  l_osdc_osdop_omap_rd,
162  	  l_osdc_osdop_omap_del,
163  	
164  	  l_osdc_last,
165  	};
166  	
167  	
168  	// config obs ----------------------------
169  	
170  	static const char *config_keys[] = {
171  	  "crush_location",
172  	  NULL
173  	};
174  	
175  	class Objecter::RequestStateHook : public AdminSocketHook {
176  	  Objecter *m_objecter;
177  	public:
178  	  explicit RequestStateHook(Objecter *objecter);
179  	  int call(std::string_view command, const cmdmap_t& cmdmap,
180  		   Formatter *f,
181  		   std::ostream& ss,
182  		   ceph::buffer::list& out) override;
183  	};
184  	
185  	/**
186  	 * This is a more limited form of C_Contexts; we use it because
187  	 * C_Contexts requires a CephContext, which we don't have here.
188  	 */
189  	class ObjectOperation::C_TwoContexts : public Context {
190  	  Context *first;
191  	  Context *second;
192  	public:
193  	  C_TwoContexts(Context *first, Context *second) :
194  	    first(first), second(second) {}
195  	  void finish(int r) override {
196  	    first->complete(r);
197  	    second->complete(r);
198  	    first = NULL;
199  	    second = NULL;
200  	  }
201  	
202  	  ~C_TwoContexts() override {
203  	    delete first;
204  	    delete second;
205  	  }
206  	};
207  	
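// Chain an additional completion handler onto the most recently added
// op: if that op already has an output handler, wrap both in a
// C_TwoContexts so each is completed (and freed) exactly once.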
208  	void ObjectOperation::add_handler(Context *extra) {
209  	  size_t last = out_handler.size() - 1;
210  	  Context *orig = out_handler[last];
211  	  if (orig) {
212  	    Context *wrapper = new C_TwoContexts(orig, extra);
213  	    out_handler[last] = wrapper;
214  	  } else {
215  	    out_handler[last] = extra;
216  	  }
217  	}
218  	
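// Pick a per-session completion lock for an object by hashing its name;
// an empty name yields an empty (lock-less) wrapper.  The returned lock
// is constructed with std::defer_lock, i.e. not yet acquired.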
219  	Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
220  	  object_t& oid)
221  	{
222  	  if (oid.name.empty())
223  	    return unique_completion_lock();
224  	
225  	  static constexpr uint32_t HASH_PRIME = 1021;
226  	  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
227  	    % HASH_PRIME;
228  	
229  	  return unique_completion_lock(completion_locks[h % num_locks],
230  					std::defer_lock);
231  	}
232  	
233  	const char** Objecter::get_tracked_conf_keys() const
234  	{
235  	  return config_keys;
236  	}
237  	
238  	
239  	void Objecter::handle_conf_change(const ConfigProxy& conf,
240  					  const std::set <std::string> &changed)
241  	{
242  	  if (changed.count("crush_location")) {
243  	    update_crush_location();
244  	  }
245  	}
246  	
247  	void Objecter::update_crush_location()
248  	{
249  	  unique_lock wl(rwlock);
250  	  crush_location = cct->crush_location.get_location();
251  	}
252  	
253  	// messages ------------------------------
254  	
255  	/*
256  	 * initialize only internal data structures, don't initiate cluster interaction
257  	 */
258  	void Objecter::init()
259  	{
260  	  ceph_assert(!initialized);
261  	
262  	  if (!logger) {
263  	    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
264  	
265  	    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
266  			PerfCountersBuilder::PRIO_CRITICAL);
267  	    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
268  	    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
269  	    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
270  	    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
271  	    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
272  	
273  	    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
274  	    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
275  				PerfCountersBuilder::PRIO_CRITICAL);
276  	    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
277  				PerfCountersBuilder::PRIO_CRITICAL);
278  	    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
279  				"rdwr", PerfCountersBuilder::PRIO_INTERESTING);
280  	    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");
281  	
282  	    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
283  	    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
284  				"Create object operations");
285  	    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
286  	    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
287  	    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
288  				"Write full object operations");
289  	    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
290  	                        "Write same operations");
291  	    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
292  				"Append operation");
293  	    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
294  				"Set object to zero operations");
295  	    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
296  				"Truncate object operations");
297  	    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
298  				"Delete object operations");
299  	    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
300  				"Map extent operations");
301  	    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
302  				"Sparse read operations");
303  	    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
304  				"Clone range operations");
305  	    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
306  				"Get xattr operations");
307  	    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
308  				"Set xattr operations");
309  	    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
310  				"Xattr comparison operations");
311  	    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
312  				"Remove xattr operations");
313  	    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
314  				"Reset xattr operations");
315  	    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
316  				"Call (execute) operations");
317  	    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
318  				"Watch by object operations");
319  	    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
320  				"Notify about object operations");
321  	    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
322  				"Extended attribute comparison in multi operations");
323  	    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
324  	    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
325  	    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");
326  	
327  	    pcb.add_u64(l_osdc_linger_active, "linger_active",
328  			"Active lingering operations");
329  	    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
330  				"Sent lingering operations");
331  	    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
332  				"Resent lingering operations");
333  	    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
334  				"Sent pings to lingering operations");
335  	
336  	    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
337  			"Active pool operations");
338  	    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
339  				"Sent pool operations");
340  	    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
341  				"Resent pool operations");
342  	
343  	    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
344  			"Active get pool stat operations");
345  	    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
346  				"Pool stat operations sent");
347  	    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
348  				"Resent pool stats");
349  	
350  	    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
351  	    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
352  	    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
353  				"Resent FS stats");
354  	
355  	    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
356  	    pcb.add_u64_counter(l_osdc_command_send, "command_send",
357  				"Sent commands");
358  	    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
359  				"Resent commands");
360  	
361  	    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
362  	    pcb.add_u64_counter(l_osdc_map_full, "map_full",
363  				"Full OSD maps received");
364  	    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
365  				"Incremental OSD maps received");
366  	
367  	    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
368  			"Open sessions");  // open sessions
369  	    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
370  				"Sessions opened");
371  	    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
372  				"Sessions closed");
373  	    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");
374  	
375  	    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
376  				"OSD OMAP write operations");
377  	    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
378  				"OSD OMAP read operations");
379  	    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
380  				"OSD OMAP delete operations");
381  	
382  	    logger = pcb.create_perf_counters();
383  	    cct->get_perfcounters_collection()->add(logger);
384  	  }
385  	
386  	  m_request_state_hook = new RequestStateHook(this);
387  	  AdminSocket* admin_socket = cct->get_admin_socket();
388  	  int ret = admin_socket->register_command("objecter_requests",
389  						   m_request_state_hook,
390  						   "show in-progress osd requests");
391  	
392  	  /* Don't warn on EEXIST; this happens when multiple ceph clients
393  	   * are instantiated from one process */
394  	  if (ret < 0 && ret != -EEXIST) {
395  	    lderr(cct) << "error registering admin socket command: "
396  		       << cpp_strerror(ret) << dendl;
397  	  }
398  	
399  	  update_crush_location();
400  	
401  	  cct->_conf.add_observer(this);
402  	
403  	  initialized = true;
404  	}
405  	
406  	/*
407  	 * ok, cluster interaction can happen
408  	 */
409  	void Objecter::start(const OSDMap* o)
410  	{
411  	  shared_lock rl(rwlock);
412  	
413  	  start_tick();
414  	  if (o) {
415  	    osdmap->deepish_copy_from(*o);
416  	    prune_pg_mapping(osdmap->get_pools());
417  	  } else if (osdmap->get_epoch() == 0) {
418  	    _maybe_request_map();
419  	  }
420  	}
421  	
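// Tear the Objecter down: mark it uninitialized, drop the config
// observer, close all OSD sessions, release pending map checks and
// pool/statfs/pool-stat ops, drain the homeless session, cancel the tick
// timer, and finally remove the perf counters and admin socket hook.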
422  	void Objecter::shutdown()
423  	{
424  	  ceph_assert(initialized);
425  	
426  	  unique_lock wl(rwlock);
427  	
428  	  initialized = false;
429  	
430  	  wl.unlock();
431  	  cct->_conf.remove_observer(this);
432  	  wl.lock();
433  	
434  	  map<int,OSDSession*>::iterator p;
435  	  while (!osd_sessions.empty()) {
436  	    p = osd_sessions.begin();
437  	    close_session(p->second);
438  	  }
439  	
440  	  while(!check_latest_map_lingers.empty()) {
441  	    map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
442  	    i->second->put();
443  	    check_latest_map_lingers.erase(i->first);
444  	  }
445  	
446  	  while(!check_latest_map_ops.empty()) {
447  	    map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
448  	    i->second->put();
449  	    check_latest_map_ops.erase(i->first);
450  	  }
451  	
452  	  while(!check_latest_map_commands.empty()) {
453  	    map<ceph_tid_t, CommandOp*>::iterator i
454  	      = check_latest_map_commands.begin();
455  	    i->second->put();
456  	    check_latest_map_commands.erase(i->first);
457  	  }
458  	
459  	  while(!poolstat_ops.empty()) {
460  	    map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
461  	    delete i->second;
462  	    poolstat_ops.erase(i->first);
463  	  }
464  	
465  	  while(!statfs_ops.empty()) {
466  	    map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
467  	    delete i->second;
468  	    statfs_ops.erase(i->first);
469  	  }
470  	
471  	  while(!pool_ops.empty()) {
472  	    map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
473  	    delete i->second;
474  	    pool_ops.erase(i->first);
475  	  }
476  	
477  	  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
478  	  while(!homeless_session->linger_ops.empty()) {
479  	    std::map<uint64_t, LingerOp*>::iterator i
480  	      = homeless_session->linger_ops.begin();
481  	    ldout(cct, 10) << " linger_op " << i->first << dendl;
482  	    LingerOp *lop = i->second;
483  	    {
484  	      OSDSession::unique_lock swl(homeless_session->lock);
485  	      _session_linger_op_remove(homeless_session, lop);
486  	    }
487  	    linger_ops.erase(lop->linger_id);
488  	    linger_ops_set.erase(lop);
489  	    lop->put();
490  	  }
491  	
492  	  while(!homeless_session->ops.empty()) {
493  	    std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
494  	    ldout(cct, 10) << " op " << i->first << dendl;
495  	    Op *op = i->second;
496  	    {
497  	      OSDSession::unique_lock swl(homeless_session->lock);
498  	      _session_op_remove(homeless_session, op);
499  	    }
500  	    op->put();
501  	  }
502  	
503  	  while(!homeless_session->command_ops.empty()) {
504  	    std::map<ceph_tid_t, CommandOp*>::iterator i
505  	      = homeless_session->command_ops.begin();
506  	    ldout(cct, 10) << " command_op " << i->first << dendl;
507  	    CommandOp *cop = i->second;
508  	    {
509  	      OSDSession::unique_lock swl(homeless_session->lock);
510  	      _session_command_op_remove(homeless_session, cop);
511  	    }
512  	    cop->put();
513  	  }
514  	
515  	  if (tick_event) {
516  	    if (timer.cancel_event(tick_event)) {
517  	      ldout(cct, 10) <<  " successfully canceled tick" << dendl;
518  	    }
519  	    tick_event = 0;
520  	  }
521  	
522  	  if (logger) {
523  	    cct->get_perfcounters_collection()->remove(logger);
524  	    delete logger;
525  	    logger = NULL;
526  	  }
527  	
528  	  // Let go of Objecter write lock so timer thread can shutdown
529  	  wl.unlock();
530  	
531  	  // Outside of lock to avoid cycle WRT calls to RequestStateHook
532  	  // This is safe because we guarantee no concurrent calls to
533  	  // shutdown() with the ::initialized check at start.
534  	  if (m_request_state_hook) {
535  	    AdminSocket* admin_socket = cct->get_admin_socket();
536  	    admin_socket->unregister_commands(m_request_state_hook);
537  	    delete m_request_state_hook;
538  	    m_request_state_hook = NULL;
539  	  }
540  	}
541  	
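// (Re)issue the registration op behind a linger (watch/notify).  An
// already-registered watch gets a CEPH_OSD_WATCH_OP_RECONNECT; otherwise
// the original ops are submitted.  The op is marked should_resend=false
// since a map change triggers a fresh registration instead of a resend.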
542  	void Objecter::_send_linger(LingerOp *info,
543  				    shunique_lock& sul)
544  	{
545  	  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
546  	
547  	  vector<OSDOp> opv;
548  	  Context *oncommit = NULL;
549  	  LingerOp::shared_lock watchl(info->watch_lock);
550  	  ceph::buffer::list *poutbl = NULL;
551  	  if (info->registered && info->is_watch) {
552  	    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
553  			   << dendl;
554  	    opv.push_back(OSDOp());
555  	    opv.back().op.op = CEPH_OSD_OP_WATCH;
556  	    opv.back().op.watch.cookie = info->get_cookie();
557  	    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
558  	    opv.back().op.watch.gen = ++info->register_gen;
559  	    oncommit = new C_Linger_Reconnect(this, info);
560  	  } else {
561  	    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
562  			   << dendl;
563  	    opv = info->ops;
564  	    C_Linger_Commit *c = new C_Linger_Commit(this, info);
565  	    if (!info->is_watch) {
566  	      info->notify_id = 0;
567  	      poutbl = &c->outbl;
568  	    }
569  	    oncommit = c;
570  	  }
571  	  watchl.unlock();
572  	  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
573  			 opv, info->target.flags | CEPH_OSD_FLAG_READ,
574  			 oncommit, info->pobjver);
575  	  o->outbl = poutbl;
576  	  o->snapid = info->snap;
577  	  o->snapc = info->snapc;
578  	  o->mtime = info->mtime;
579  	
580  	  o->target = info->target;
581  	  o->tid = ++last_tid;
582  	
583  	  // do not resend this; we will send a new op to reregister
584  	  o->should_resend = false;
585  	  o->ctx_budgeted = true;
586  	
587  	  if (info->register_tid) {
588  	    // repeat send.  cancel old registration op, if any.
589  	    OSDSession::unique_lock sl(info->session->lock);
590  	    if (info->session->ops.count(info->register_tid)) {
591  	      Op *o = info->session->ops[info->register_tid];
592  	      _op_cancel_map_check(o);
593  	      _cancel_linger_op(o);
594  	    }
595  	    sl.unlock();
596  	  }
597  	
598  	  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);
599  	
600  	  logger->inc(l_osdc_linger_send);
601  	}
602  	
603  	void Objecter::_linger_commit(LingerOp *info, int r, ceph::buffer::list& outbl)
604  	{
605  	  LingerOp::unique_lock wl(info->watch_lock);
606  	  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
607  	  if (info->on_reg_commit) {
608  	    info->on_reg_commit->complete(r);
609  	    info->on_reg_commit = NULL;
610  	  }
611  	  if (r < 0 && info->on_notify_finish) {
612  	    info->on_notify_finish->complete(r);
613  	    info->on_notify_finish = nullptr;
614  	  }
615  	
616  	  // only tell the user the first time we do this
617  	  info->registered = true;
618  	  info->pobjver = NULL;
619  	
620  	  if (!info->is_watch) {
621  	    // make note of the notify_id
622  	    auto p = outbl.cbegin();
623  	    try {
624  	      decode(info->notify_id, p);
625  	      ldout(cct, 10) << "_linger_commit  notify_id=" << info->notify_id
626  			     << dendl;
627  	    }
628  	    catch (ceph::buffer::error& e) {
629  	    }
630  	  }
631  	}
632  	
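// Finisher callback that delivers a watch error to the user's
// watch_context, unless the linger op was canceled in the meantime.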
633  	struct C_DoWatchError : public Context {
634  	  Objecter *objecter;
635  	  Objecter::LingerOp *info;
636  	  int err;
637  	  C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
638  	    : objecter(o), info(i), err(r) {
639  	    info->get();
640  	    info->_queued_async();
641  	  }
642  	  void finish(int r) override {
643  	    Objecter::unique_lock wl(objecter->rwlock);
644  	    bool canceled = info->canceled;
645  	    wl.unlock();
646  	
647  	    if (!canceled) {
648  	      info->watch_context->handle_error(info->get_cookie(), err);
649  	    }
650  	
651  	    info->finished_async();
652  	    info->put();
653  	  }
654  	};
655  	
656  	int Objecter::_normalize_watch_error(int r)
657  	{
658  	  // translate ENOENT -> ENOTCONN so that a delete->disconnection
659  	  // notification and a failure to reconnect because we raced with
660  	  // the delete appear the same to the user.
661  	  if (r == -ENOENT)
662  	    r = -ENOTCONN;
663  	  return r;
664  	}
665  	
666  	void Objecter::_linger_reconnect(LingerOp *info, int r)
667  	{
668  	  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
669  			 << " (last_error " << info->last_error << ")" << dendl;
670  	  if (r < 0) {
671  	    LingerOp::unique_lock wl(info->watch_lock);
672  	    if (!info->last_error) {
673  	      r = _normalize_watch_error(r);
674  	      info->last_error = r;
675  	      if (info->watch_context) {
676  		finisher->queue(new C_DoWatchError(this, info, r));
677  	      }
678  	    }
679  	    wl.unlock();
680  	  }
681  	}
682  	
683  	void Objecter::_send_linger_ping(LingerOp *info)
684  	{
685  	  // rwlock is locked unique
686  	  // info->session->lock is locked
687  	
688  	  if (cct->_conf->objecter_inject_no_watch_ping) {
689  	    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
690  			   << dendl;
691  	    return;
692  	  }
693  	  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
694  	    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
695  	    return;
696  	  }
697  	
698  	  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
699  	  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
700  			 << dendl;
701  	
702  	  vector<OSDOp> opv(1);
703  	  opv[0].op.op = CEPH_OSD_OP_WATCH;
704  	  opv[0].op.watch.cookie = info->get_cookie();
705  	  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
706  	  opv[0].op.watch.gen = info->register_gen;
707  	  C_Linger_Ping *onack = new C_Linger_Ping(this, info);
708  	  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
709  			 opv, info->target.flags | CEPH_OSD_FLAG_READ,
710  			 onack, NULL, NULL);
711  	  o->target = info->target;
712  	  o->should_resend = false;
713  	  _send_op_account(o);
714  	  o->tid = ++last_tid;
715  	  _session_op_assign(info->session, o);
716  	  _send_op(o);
717  	  info->ping_tid = o->tid;
718  	
719  	  onack->sent = now;
720  	  logger->inc(l_osdc_linger_ping);
721  	}
722  	
723  	void Objecter::_linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
724  				    uint32_t register_gen)
725  	{
726  	  LingerOp::unique_lock l(info->watch_lock);
727  	  ldout(cct, 10) << __func__ << " " << info->linger_id
728  			 << " sent " << sent << " gen " << register_gen << " = " << r
729  			 << " (last_error " << info->last_error
730  			 << " register_gen " << info->register_gen << ")" << dendl;
731  	  if (info->register_gen == register_gen) {
732  	    if (r == 0) {
733  	      info->watch_valid_thru = sent;
734  	    } else if (r < 0 && !info->last_error) {
735  	      r = _normalize_watch_error(r);
736  	      info->last_error = r;
737  	      if (info->watch_context) {
738  		finisher->queue(new C_DoWatchError(this, info, r));
739  	      }
740  	    }
741  	  } else {
742  	    ldout(cct, 20) << " ignoring old gen" << dendl;
743  	  }
744  	}
745  	
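// Report watch health: return the recorded (negative) last_error if any,
// otherwise an upper bound, in milliseconds, on how long ago the watch
// was last known to be valid.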
746  	int Objecter::linger_check(LingerOp *info)
747  	{
748  	  LingerOp::shared_lock l(info->watch_lock);
749  	
750  	  ceph::coarse_mono_time stamp = info->watch_valid_thru;
751  	  if (!info->watch_pending_async.empty())
752  	    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
753  	  auto age = ceph::coarse_mono_clock::now() - stamp;
754  	
755  	  ldout(cct, 10) << __func__ << " " << info->linger_id
756  			 << " err " << info->last_error
757  			 << " age " << age << dendl;
758  	  if (info->last_error)
759  	    return info->last_error;
760  	  // return a safe upper bound (we are truncating to ms)
761  	  return
762  	    1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
763  	}
764  	
765  	void Objecter::linger_cancel(LingerOp *info)
766  	{
767  	  unique_lock wl(rwlock);
768  	  _linger_cancel(info);
769  	  info->put();
770  	}
771  	
772  	void Objecter::_linger_cancel(LingerOp *info)
773  	{
774  	  // rwlock is locked unique
775  	  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
776  	  if (!info->canceled) {
777  	    OSDSession *s = info->session;
778  	    OSDSession::unique_lock sl(s->lock);
779  	    _session_linger_op_remove(s, info);
780  	    sl.unlock();
781  	
782  	    linger_ops.erase(info->linger_id);
783  	    linger_ops_set.erase(info);
784  	    ceph_assert(linger_ops.size() == linger_ops_set.size());
785  	
786  	    info->canceled = true;
787  	    info->put();
788  	
789  	    logger->dec(l_osdc_linger_active);
790  	  }
791  	}
792  	
793  	
794  	
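// Allocate a LingerOp for a watch/notify on the given object, assign it
// a fresh linger_id and register it in linger_ops/linger_ops_set.  The
// returned pointer holds an extra reference for the caller.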
795  	Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
796  						      const object_locator_t& oloc,
797  						      int flags)
798  	{
799  	  LingerOp *info = new LingerOp(this);
800  	  info->target.base_oid = oid;
801  	  info->target.base_oloc = oloc;
802  	  if (info->target.base_oloc.key == oid)
803  	    info->target.base_oloc.key.clear();
804  	  info->target.flags = flags;
805  	  info->watch_valid_thru = ceph::coarse_mono_clock::now();
806  	
807  	  unique_lock l(rwlock);
808  	
809  	  // Acquire linger ID
810  	  info->linger_id = ++max_linger_id;
811  	  ldout(cct, 10) << __func__ << " info " << info
812  			 << " linger_id " << info->linger_id
813  			 << " cookie " << info->get_cookie()
814  			 << dendl;
815  	  linger_ops[info->linger_id] = info;
816  	  linger_ops_set.insert(info);
817  	  ceph_assert(linger_ops.size() == linger_ops_set.size());
818  	
819  	  info->get(); // for the caller
820  	  return info;
821  	}
822  	
823  	ceph_tid_t Objecter::linger_watch(LingerOp *info,
824  					  ObjectOperation& op,
825  					  const SnapContext& snapc,
826  					  real_time mtime,
827  					  ceph::buffer::list& inbl,
828  					  Context *oncommit,
829  					  version_t *objver)
830  	{
831  	  info->is_watch = true;
832  	  info->snapc = snapc;
833  	  info->mtime = mtime;
834  	  info->target.flags |= CEPH_OSD_FLAG_WRITE;
835  	  info->ops = op.ops;
836  	  info->inbl = inbl;
837  	  info->poutbl = NULL;
838  	  info->pobjver = objver;
839  	  info->on_reg_commit = oncommit;
840  	
841  	  info->ctx_budget = take_linger_budget(info);
842  	
843  	  shunique_lock sul(rwlock, ceph::acquire_unique);
844  	  _linger_submit(info, sul);
845  	  logger->inc(l_osdc_linger_active);
846  	
847  	  return info->linger_id;
848  	}
849  	
850  	ceph_tid_t Objecter::linger_notify(LingerOp *info,
851  					   ObjectOperation& op,
852  					   snapid_t snap, ceph::buffer::list& inbl,
853  					   ceph::buffer::list *poutbl,
854  					   Context *onfinish,
855  					   version_t *objver)
856  	{
857  	  info->snap = snap;
858  	  info->target.flags |= CEPH_OSD_FLAG_READ;
859  	  info->ops = op.ops;
860  	  info->inbl = inbl;
861  	  info->poutbl = poutbl;
862  	  info->pobjver = objver;
863  	  info->on_reg_commit = onfinish;
864  	
865  	  info->ctx_budget = take_linger_budget(info);
866  	  
867  	  shunique_lock sul(rwlock, ceph::acquire_unique);
868  	  _linger_submit(info, sul);
869  	  logger->inc(l_osdc_linger_active);
870  	
871  	  return info->linger_id;
872  	}
873  	
874  	void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
875  	{
876  	  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
877  	  ceph_assert(info->linger_id);
878  	  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!
879  	
880  	  // Populate Op::target
881  	  OSDSession *s = NULL;
882  	  _calc_target(&info->target, nullptr);
883  	
884  	  // Create LingerOp<->OSDSession relation
885  	  int r = _get_session(info->target.osd, &s, sul);
886  	  ceph_assert(r == 0);
887  	  OSDSession::unique_lock sl(s->lock);
888  	  _session_linger_op_assign(s, info);
889  	  sl.unlock();
890  	  put_session(s);
891  	
892  	  _send_linger(info, sul);
893  	}
894  	
895  	struct C_DoWatchNotify : public Context {
896  	  Objecter *objecter;
897  	  Objecter::LingerOp *info;
898  	  MWatchNotify *msg;
899  	  C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
900  	    : objecter(o), info(i), msg(m) {
901  	    info->get();
902  	    info->_queued_async();
903  	    msg->get();
904  	  }
905  	  void finish(int r) override {
906  	    objecter->_do_watch_notify(info, msg);
907  	  }
908  	};
909  	
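// Handle an incoming MWatchNotify.  The message cookie is the LingerOp
// pointer; it is ignored unless still present in linger_ops_set.
// Disconnect events record an error, notify completions for our own
// notify are handled inline, and watch notifies are queued to the
// finisher via C_DoWatchNotify.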
910  	void Objecter::handle_watch_notify(MWatchNotify *m)
911  	{
912  	  shared_lock l(rwlock);
913  	  if (!initialized) {
914  	    return;
915  	  }
916  	
917  	  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
918  	  if (linger_ops_set.count(info) == 0) {
919  	    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
920  	    return;
921  	  }
922  	  LingerOp::unique_lock wl(info->watch_lock);
923  	  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
924  	    if (!info->last_error) {
925  	      info->last_error = -ENOTCONN;
926  	      if (info->watch_context) {
927  		finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
928  	      }
929  	    }
930  	  } else if (!info->is_watch) {
931  	    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
932  	    // since we know the only user (librados) is safe to call in
933  	    // fast-dispatch context
934  	    if (info->notify_id &&
935  		info->notify_id != m->notify_id) {
936  	      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
937  			     << " != " << info->notify_id << ", ignoring" << dendl;
938  	    } else if (info->on_notify_finish) {
939  	      info->notify_result_bl->claim(m->get_data());
940  	      info->on_notify_finish->complete(m->return_code);
941  	
942  	      // if we race with reconnect we might get a second notify; only
943  	      // notify the caller once!
944  	      info->on_notify_finish = NULL;
945  	    }
946  	  } else {
947  	    finisher->queue(new C_DoWatchNotify(this, info, m));
948  	  }
949  	}
950  	
951  	void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
952  	{
953  	  ldout(cct, 10) << __func__ << " " << *m << dendl;
954  	
955  	  shared_lock l(rwlock);
956  	  ceph_assert(initialized);
957  	
958  	  if (info->canceled) {
959  	    l.unlock();
960  	    goto out;
961  	  }
962  	
963  	  // notify completion?
964  	  ceph_assert(info->is_watch);
965  	  ceph_assert(info->watch_context);
966  	  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
967  	
968  	  l.unlock();
969  	
970  	  switch (m->opcode) {
971  	  case CEPH_WATCH_EVENT_NOTIFY:
972  	    info->watch_context->handle_notify(m->notify_id, m->cookie,
973  					       m->notifier_gid, m->bl);
974  	    break;
975  	  }
976  	
977  	 out:
978  	  info->finished_async();
979  	  info->put();
980  	  m->put();
981  	}
982  	
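// Messenger dispatch entry point.  Returns true for message types the
// Objecter consumes exclusively; OSD maps are processed here but false
// is returned so other dispatchers also get a look at them.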
983  	bool Objecter::ms_dispatch(Message *m)
984  	{
985  	  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
986  	  switch (m->get_type()) {
987  	    // these we exclusively handle
988  	  case CEPH_MSG_OSD_OPREPLY:
989  	    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
990  	    return true;
991  	
992  	  case CEPH_MSG_OSD_BACKOFF:
993  	    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
994  	    return true;
995  	
996  	  case CEPH_MSG_WATCH_NOTIFY:
997  	    handle_watch_notify(static_cast<MWatchNotify*>(m));
998  	    m->put();
999  	    return true;
1000 	
1001 	  case MSG_COMMAND_REPLY:
1002 	    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
1003 	      handle_command_reply(static_cast<MCommandReply*>(m));
1004 	      return true;
1005 	    } else {
1006 	      return false;
1007 	    }
1008 	
1009 	  case MSG_GETPOOLSTATSREPLY:
1010 	    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
1011 	    return true;
1012 	
1013 	  case CEPH_MSG_POOLOP_REPLY:
1014 	    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
1015 	    return true;
1016 	
1017 	  case CEPH_MSG_STATFS_REPLY:
1018 	    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
1019 	    return true;
1020 	
1021 	    // these we give others a chance to inspect
1022 	
1023 	    // MDS, OSD
1024 	  case CEPH_MSG_OSD_MAP:
1025 	    handle_osd_map(static_cast<MOSDMap*>(m));
1026 	    return false;
1027 	  }
1028 	  return false;
1029 	}
1030 	
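// After an osdmap change, walk one session's linger ops, regular ops and
// command ops and recompute their targets.  Anything whose mapping
// changed, or that must be resent because a map was skipped or the
// cluster/pool went full, is collected into the need_resend_*
// containers; ops whose pool has disappeared go to the *_dne checks.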
1031 	void Objecter::_scan_requests(
1032 	  OSDSession *s,
1033 	  bool skipped_map,
1034 	  bool cluster_full,
1035 	  map<int64_t, bool> *pool_full_map,
1036 	  map<ceph_tid_t, Op*>& need_resend,
1037 	  list<LingerOp*>& need_resend_linger,
1038 	  map<ceph_tid_t, CommandOp*>& need_resend_command,
1039 	  shunique_lock& sul)
1040 	{
1041 	  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
1042 	
1043 	  list<LingerOp*> unregister_lingers;
1044 	
1045 	  OSDSession::unique_lock sl(s->lock);
1046 	
1047 	  // check for changed linger mappings (_before_ regular ops)
1048 	  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
1049 	  while (lp != s->linger_ops.end()) {
1050 	    LingerOp *op = lp->second;
1051 	    ceph_assert(op->session == s);
1052 	    // check_linger_pool_dne() may touch linger_ops; prevent iterator
1053 	    // invalidation
1054 	    ++lp;
1055 	    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
1056 	    bool unregister, force_resend_writes = cluster_full;
1057 	    int r = _recalc_linger_op_target(op, sul);
1058 	    if (pool_full_map)
1059 	      force_resend_writes = force_resend_writes ||
1060 		(*pool_full_map)[op->target.base_oloc.pool];
1061 	    switch (r) {
1062 	    case RECALC_OP_TARGET_NO_ACTION:
1063 	      if (!skipped_map && !force_resend_writes)
1064 		break;
1065 	      // -- fall-thru --
1066 	    case RECALC_OP_TARGET_NEED_RESEND:
1067 	      need_resend_linger.push_back(op);
1068 	      _linger_cancel_map_check(op);
1069 	      break;
1070 	    case RECALC_OP_TARGET_POOL_DNE:
1071 	      _check_linger_pool_dne(op, &unregister);
1072 	      if (unregister) {
1073 		ldout(cct, 10) << " need to unregister linger op "
1074 			       << op->linger_id << dendl;
1075 		op->get();
1076 		unregister_lingers.push_back(op);
1077 	      }
1078 	      break;
1079 	    }
1080 	  }
1081 	
1082 	  // check for changed request mappings
1083 	  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
1084 	  while (p != s->ops.end()) {
1085 	    Op *op = p->second;
1086 	    ++p;   // check_op_pool_dne() may touch ops; prevent iterator invalidation
1087 	    ldout(cct, 10) << " checking op " << op->tid << dendl;
1088 	    _prune_snapc(osdmap->get_new_removed_snaps(), op);
1089 	    bool force_resend_writes = cluster_full;
1090 	    if (pool_full_map)
1091 	      force_resend_writes = force_resend_writes ||
1092 		(*pool_full_map)[op->target.base_oloc.pool];
1093 	    int r = _calc_target(&op->target,
1094 				 op->session ? op->session->con.get() : nullptr);
1095 	    switch (r) {
1096 	    case RECALC_OP_TARGET_NO_ACTION:
1097 	      if (!skipped_map && !(force_resend_writes && op->respects_full()))
1098 		break;
1099 	      // -- fall-thru --
1100 	    case RECALC_OP_TARGET_NEED_RESEND:
1101 	      _session_op_remove(op->session, op);
1102 	      need_resend[op->tid] = op;
1103 	      _op_cancel_map_check(op);
1104 	      break;
1105 	    case RECALC_OP_TARGET_POOL_DNE:
1106 	      _check_op_pool_dne(op, &sl);
1107 	      break;
1108 	    }
1109 	  }
1110 	
1111 	  // commands
1112 	  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
1113 	  while (cp != s->command_ops.end()) {
1114 	    CommandOp *c = cp->second;
1115 	    ++cp;
1116 	    ldout(cct, 10) << " checking command " << c->tid << dendl;
1117 	    bool force_resend_writes = cluster_full;
1118 	    if (pool_full_map)
1119 	      force_resend_writes = force_resend_writes ||
1120 		(*pool_full_map)[c->target_pg.pool()];
1121 	    int r = _calc_command_target(c, sul);
1122 	    switch (r) {
1123 	    case RECALC_OP_TARGET_NO_ACTION:
1124 	      // resend if skipped map; otherwise do nothing.
1125 	      if (!skipped_map && !force_resend_writes)
1126 		break;
1127 	      // -- fall-thru --
1128 	    case RECALC_OP_TARGET_NEED_RESEND:
1129 	      need_resend_command[c->tid] = c;
1130 	      _session_command_op_remove(c->session, c);
1131 	      _command_cancel_map_check(c);
1132 	      break;
1133 	    case RECALC_OP_TARGET_POOL_DNE:
1134 	    case RECALC_OP_TARGET_OSD_DNE:
1135 	    case RECALC_OP_TARGET_OSD_DOWN:
1136 	      _check_command_map_dne(c);
1137 	      break;
1138 	    }
1139 	  }
1140 	
1141 	  sl.unlock();
1142 	
1143 	  for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
1144 	       iter != unregister_lingers.end();
1145 	       ++iter) {
1146 	    _linger_cancel(*iter);
1147 	    (*iter)->put();
1148 	  }
1149 	}
1150 	
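// Process an MOSDMap: apply incremental maps where possible, fall back
// to full maps or request missing epochs, rescan every session for
// requests that need resending, resend them against the new map, and
// complete any contexts waiting for this epoch.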
1151 	void Objecter::handle_osd_map(MOSDMap *m)
1152 	{
1153 	  shunique_lock sul(rwlock, acquire_unique);
1154 	  if (!initialized)
1155 	    return;
1156 	
1157 	  ceph_assert(osdmap);
1158 	
1159 	  if (m->fsid != monc->get_fsid()) {
1160 	    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
1161 			  << " != " << monc->get_fsid() << dendl;
1162 	    return;
1163 	  }
1164 	
1165 	  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1166 	  bool cluster_full = _osdmap_full_flag();
1167 	  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
1168 	    _osdmap_has_pool_full();
1169 	  map<int64_t, bool> pool_full_map;
1170 	  for (map<int64_t, pg_pool_t>::const_iterator it
1171 		 = osdmap->get_pools().begin();
1172 	       it != osdmap->get_pools().end(); ++it)
1173 	    pool_full_map[it->first] = _osdmap_pool_full(it->second);
1174 	
1175 	
1176 	  list<LingerOp*> need_resend_linger;
1177 	  map<ceph_tid_t, Op*> need_resend;
1178 	  map<ceph_tid_t, CommandOp*> need_resend_command;
1179 	
1180 	  if (m->get_last() <= osdmap->get_epoch()) {
1181 	    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
1182 			  << m->get_first() << "," << m->get_last()
1183 			  << "] <= " << osdmap->get_epoch() << dendl;
1184 	  } else {
1185 	    ldout(cct, 3) << "handle_osd_map got epochs ["
1186 			  << m->get_first() << "," << m->get_last()
1187 			  << "] > " << osdmap->get_epoch() << dendl;
1188 	
1189 	    if (osdmap->get_epoch()) {
1190 	      bool skipped_map = false;
1191 	      // we want incrementals
1192 	      for (epoch_t e = osdmap->get_epoch() + 1;
1193 		   e <= m->get_last();
1194 		   e++) {
1195 	
1196 		if (osdmap->get_epoch() == e-1 &&
1197 		    m->incremental_maps.count(e)) {
1198 		  ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
1199 				<< dendl;
1200 		  OSDMap::Incremental inc(m->incremental_maps[e]);
1201 		  osdmap->apply_incremental(inc);
1202 	
1203 	          emit_blacklist_events(inc);
1204 	
1205 		  logger->inc(l_osdc_map_inc);
1206 		}
1207 		else if (m->maps.count(e)) {
1208 		  ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
1209 	          auto new_osdmap = std::make_unique<OSDMap>();
1210 	          new_osdmap->decode(m->maps[e]);
1211 	
1212 	          emit_blacklist_events(*osdmap, *new_osdmap);
1213 	          osdmap = std::move(new_osdmap);
1214 	
1215 		  logger->inc(l_osdc_map_full);
1216 		}
1217 		else {
1218 		  if (e >= m->get_oldest()) {
1219 		    ldout(cct, 3) << "handle_osd_map requesting missing epoch "
1220 				  << osdmap->get_epoch()+1 << dendl;
1221 		    _maybe_request_map();
1222 		    break;
1223 		  }
1224 		  ldout(cct, 3) << "handle_osd_map missing epoch "
1225 				<< osdmap->get_epoch()+1
1226 				<< ", jumping to " << m->get_oldest() << dendl;
1227 		  e = m->get_oldest() - 1;
1228 		  skipped_map = true;
1229 		  continue;
1230 		}
1231 		logger->set(l_osdc_map_epoch, osdmap->get_epoch());
1232 	
1233 	        prune_pg_mapping(osdmap->get_pools());
1234 		cluster_full = cluster_full || _osdmap_full_flag();
1235 		update_pool_full_map(pool_full_map);
1236 	
1237 		// check all outstanding requests on every epoch
1238 		for (auto& i : need_resend) {
1239 		  _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
1240 		}
1241 		_scan_requests(homeless_session, skipped_map, cluster_full,
1242 			       &pool_full_map, need_resend,
1243 			       need_resend_linger, need_resend_command, sul);
1244 		for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1245 		     p != osd_sessions.end(); ) {
1246 		  OSDSession *s = p->second;
1247 		  _scan_requests(s, skipped_map, cluster_full,
1248 				 &pool_full_map, need_resend,
1249 				 need_resend_linger, need_resend_command, sul);
1250 		  ++p;
1251 		  // osd down or addr change?
1252 		  if (!osdmap->is_up(s->osd) ||
1253 		      (s->con &&
1254 		       s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
1255 		    close_session(s);
1256 		  }
1257 		}
1258 	
1259 		ceph_assert(e == osdmap->get_epoch());
1260 	      }
1261 	
1262 	    } else {
1263 	      // first map.  we want the full thing.
1264 	      if (m->maps.count(m->get_last())) {
1265 		for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1266 		     p != osd_sessions.end(); ++p) {
1267 		  OSDSession *s = p->second;
1268 		  _scan_requests(s, false, false, NULL, need_resend,
1269 				 need_resend_linger, need_resend_command, sul);
1270 		}
1271 		ldout(cct, 3) << "handle_osd_map decoding full epoch "
1272 			      << m->get_last() << dendl;
1273 		osdmap->decode(m->maps[m->get_last()]);
1274 	        prune_pg_mapping(osdmap->get_pools());
1275 	
1276 		_scan_requests(homeless_session, false, false, NULL,
1277 			       need_resend, need_resend_linger,
1278 			       need_resend_command, sul);
1279 	      } else {
1280 		ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
1281 			      << dendl;
1282 		monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
1283 		monc->renew_subs();
1284 	      }
1285 	    }
1286 	  }
1287 	
1288 	  // make sure need_resend targets reflect latest map
1289 	  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
1290 	    Op *op = p->second;
1291 	    if (op->target.epoch < osdmap->get_epoch()) {
1292 	      ldout(cct, 10) << __func__ << "  checking op " << p->first << dendl;
1293 	      int r = _calc_target(&op->target, nullptr);
1294 	      if (r == RECALC_OP_TARGET_POOL_DNE) {
1295 		p = need_resend.erase(p);
1296 		_check_op_pool_dne(op, nullptr);
1297 	      } else {
1298 		++p;
1299 	      }
1300 	    } else {
1301 	      ++p;
1302 	    }
1303 	  }
1304 	
1305 	  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1306 	  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
1307 	    || _osdmap_has_pool_full();
1308 	
1309 	  // was/is paused?
1310 	  if (was_pauserd || was_pausewr || pauserd || pausewr ||
1311 	      osdmap->get_epoch() < epoch_barrier) {
1312 	    _maybe_request_map();
1313 	  }
1314 	
1315 	  // resend requests
1316 	  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
1317 	       p != need_resend.end(); ++p) {
1318 	    Op *op = p->second;
1319 	    OSDSession *s = op->session;
1320 	    bool mapped_session = false;
1321 	    if (!s) {
1322 	      int r = _map_session(&op->target, &s, sul);
1323 	      ceph_assert(r == 0);
1324 	      mapped_session = true;
1325 	    } else {
1326 	      get_session(s);
1327 	    }
1328 	    OSDSession::unique_lock sl(s->lock);
1329 	    if (mapped_session) {
1330 	      _session_op_assign(s, op);
1331 	    }
1332 	    if (op->should_resend) {
1333 	      if (!op->session->is_homeless() && !op->target.paused) {
1334 		logger->inc(l_osdc_op_resend);
1335 		_send_op(op);
1336 	      }
1337 	    } else {
1338 	      _op_cancel_map_check(op);
1339 	      _cancel_linger_op(op);
1340 	    }
1341 	    sl.unlock();
1342 	    put_session(s);
1343 	  }
1344 	  for (list<LingerOp*>::iterator p = need_resend_linger.begin();
1345 	       p != need_resend_linger.end(); ++p) {
1346 	    LingerOp *op = *p;
1347 	    ceph_assert(op->session);
1348 	    if (!op->session->is_homeless()) {
1349 	      logger->inc(l_osdc_linger_resend);
1350 	      _send_linger(op, sul);
1351 	    }
1352 	  }
1353 	  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
1354 	       p != need_resend_command.end(); ++p) {
1355 	    CommandOp *c = p->second;
1356 	    if (c->target.osd >= 0) {
1357 	      _assign_command_session(c, sul);
1358 	      if (c->session && !c->session->is_homeless()) {
1359 		_send_command(c);
1360 	      }
1361 	    }
1362 	  }
1363 	
1364 	  _dump_active();
1365 	
1366 	  // finish any Contexts that were waiting on a map update
1367 	  map<epoch_t,list< pair< Context*, int > > >::iterator p =
1368 	    waiting_for_map.begin();
1369 	  while (p != waiting_for_map.end() &&
1370 		 p->first <= osdmap->get_epoch()) {
1371 	    //go through the list and call the onfinish methods
1372 	    for (list<pair<Context*, int> >::iterator i = p->second.begin();
1373 		 i != p->second.end(); ++i) {
1374 	      i->first->complete(i->second);
1375 	    }
1376 	    waiting_for_map.erase(p++);
1377 	  }
1378 	
1379 	  monc->sub_got("osdmap", osdmap->get_epoch());
1380 	
1381 	  if (!waiting_for_map.empty()) {
1382 	    _maybe_request_map();
1383 	  }
1384 	}
1385 	
1386 	void Objecter::enable_blacklist_events()
1387 	{
1388 	  unique_lock wl(rwlock);
1389 	
1390 	  blacklist_events_enabled = true;
1391 	}
1392 	
1393 	void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
1394 	{
1395 	  unique_lock wl(rwlock);
1396 	
1397 	  if (events->empty()) {
1398 	    events->swap(blacklist_events);
1399 	  } else {
1400 	    for (const auto &i : blacklist_events) {
1401 	      events->insert(i);
1402 	    }
1403 	    blacklist_events.clear();
1404 	  }
1405 	}
1406 	
1407 	void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
1408 	{
1409 	  if (!blacklist_events_enabled) {
1410 	    return;
1411 	  }
1412 	
1413 	  for (const auto &i : inc.new_blacklist) {
1414 	    blacklist_events.insert(i.first);
1415 	  }
1416 	}
1417 	
1418 	void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
1419 	                                     const OSDMap &new_osd_map)
1420 	{
1421 	  if (!blacklist_events_enabled) {
1422 	    return;
1423 	  }
1424 	
1425 	  std::set<entity_addr_t> old_set;
1426 	  std::set<entity_addr_t> new_set;
1427 	
1428 	  old_osd_map.get_blacklist(&old_set);
1429 	  new_osd_map.get_blacklist(&new_set);
1430 	
1431 	  std::set<entity_addr_t> delta_set;
1432 	  std::set_difference(
1433 	      new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
1434 	      std::inserter(delta_set, delta_set.begin()));
1435 	  blacklist_events.insert(delta_set.begin(), delta_set.end());
1436 	}
1437 	
1438 	// op pool check
1439 	
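// Completion for the monitor's get_version("osdmap") request issued by
// _send_op_map_check(): record the newest epoch as the op's
// map_dne_bound and re-run the pool-existence check.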
1440 	void Objecter::C_Op_Map_Latest::finish(int r)
1441 	{
1442 	  if (r == -EAGAIN || r == -ECANCELED)
1443 	    return;
1444 	
1445 	  lgeneric_subdout(objecter->cct, objecter, 10)
1446 	    << "op_map_latest r=" << r << " tid=" << tid
1447 	    << " latest " << latest << dendl;
1448 	
1449 	  Objecter::unique_lock wl(objecter->rwlock);
1450 	
1451 	  map<ceph_tid_t, Op*>::iterator iter =
1452 	    objecter->check_latest_map_ops.find(tid);
1453 	  if (iter == objecter->check_latest_map_ops.end()) {
1454 	    lgeneric_subdout(objecter->cct, objecter, 10)
1455 	      << "op_map_latest op "<< tid << " not found" << dendl;
1456 	    return;
1457 	  }
1458 	
1459 	  Op *op = iter->second;
1460 	  objecter->check_latest_map_ops.erase(iter);
1461 	
1462 	  lgeneric_subdout(objecter->cct, objecter, 20)
1463 	    << "op_map_latest op "<< op << dendl;
1464 	
1465 	  if (op->map_dne_bound == 0)
1466 	    op->map_dne_bound = latest;
1467 	
1468 	  OSDSession::unique_lock sl(op->session->lock, defer_lock);
1469 	  objecter->_check_op_pool_dne(op, &sl);
1470 	
1471 	  op->put();
1472 	}
1473 	
1474 	int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
1475 					snapid_t *snap) const
1476 	{
1477 	  shared_lock rl(rwlock);
1478 	
1479 	  auto& pools = osdmap->get_pools();
1480 	  auto iter = pools.find(poolid);
1481 	  if (iter == pools.end()) {
1482 	    return -ENOENT;
1483 	  }
1484 	  const pg_pool_t& pg_pool = iter->second;
1485 	  for (auto p = pg_pool.snaps.begin();
1486 	       p != pg_pool.snaps.end();
1487 	       ++p) {
1488 	    if (p->second.name == snap_name) {
1489 	      *snap = p->first;
1490 	      return 0;
1491 	    }
1492 	  }
1493 	  return -ENOENT;
1494 	}
1495 	
1496 	int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
1497 					 pool_snap_info_t *info) const
1498 	{
1499 	  shared_lock rl(rwlock);
1500 	
1501 	  auto& pools = osdmap->get_pools();
1502 	  auto iter = pools.find(poolid);
1503 	  if (iter == pools.end()) {
1504 	    return -ENOENT;
1505 	  }
1506 	  const pg_pool_t& pg_pool = iter->second;
1507 	  auto p = pg_pool.snaps.find(snap);
1508 	  if (p == pg_pool.snaps.end())
1509 	    return -ENOENT;
1510 	  *info = p->second;
1511 	
1512 	  return 0;
1513 	}
1514 	
1515 	int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
1516 	{
1517 	  shared_lock rl(rwlock);
1518 	
1519 	  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
1520 	  if (!pi)
1521 	    return -ENOENT;
1522 	  for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
1523 	       p != pi->snaps.end();
1524 	       ++p) {
1525 	    snaps->push_back(p->first);
1526 	  }
1527 	  return 0;
1528 	}
1529 	
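// Decide whether an op's pool is really gone: once the osdmap epoch has
// reached map_dne_bound and the pool still does not exist, fail the op
// with -ENOENT; otherwise ask the monitor for the latest map via
// _send_op_map_check().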
1530 	// sl may be unlocked.
1531 	void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
1532 	{
1533 	  // rwlock is locked unique
1534 	
1535 	  if (op->target.pool_ever_existed) {
1536 	    // the pool previously existed and now it does not, which means it
1537 	    // was deleted.
1538 	    op->map_dne_bound = osdmap->get_epoch();
1539 	    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1540 			   << " pool previously exists but now does not"
1541 			   << dendl;
1542 	  } else {
1543 	    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1544 			   << " current " << osdmap->get_epoch()
1545 			   << " map_dne_bound " << op->map_dne_bound
1546 			   << dendl;
1547 	  }
1548 	  if (op->map_dne_bound > 0) {
1549 	    if (osdmap->get_epoch() >= op->map_dne_bound) {
1550 	      // we had a new enough map
1551 	      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1552 			     << " concluding pool " << op->target.base_pgid.pool()
1553 			     << " dne" << dendl;
1554 	      if (op->onfinish) {
1555 		num_in_flight--;
1556 		op->onfinish->complete(-ENOENT);
1557 	      }
1558 	
1559 	      OSDSession *s = op->session;
1560 	      if (s) {
1561 		ceph_assert(s != NULL);
1562 		ceph_assert(sl->mutex() == &s->lock);
1563 		bool session_locked = sl->owns_lock();
1564 		if (!session_locked) {
1565 		  sl->lock();
1566 		}
1567 		_finish_op(op, 0);
1568 		if (!session_locked) {
1569 		  sl->unlock();
1570 		}
1571 	      } else {
1572 		_finish_op(op, 0);	// no session
1573 	      }
1574 	    }
1575 	  } else {
1576 	    _send_op_map_check(op);
1577 	  }
1578 	}
1579 	
1580 	void Objecter::_send_op_map_check(Op *op)
1581 	{
1582 	  // rwlock is locked unique
1583 	  // ask the monitor
1584 	  if (check_latest_map_ops.count(op->tid) == 0) {
1585 	    op->get();
1586 	    check_latest_map_ops[op->tid] = op;
1587 	    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
1588 	    monc->get_version("osdmap", &c->latest, NULL, c);
1589 	  }
1590 	}
1591 	
1592 	void Objecter::_op_cancel_map_check(Op *op)
1593 	{
1594 	  // rwlock is locked unique
1595 	  map<ceph_tid_t, Op*>::iterator iter =
1596 	    check_latest_map_ops.find(op->tid);
1597 	  if (iter != check_latest_map_ops.end()) {
1598 	    Op *op = iter->second;
1599 	    op->put();
1600 	    check_latest_map_ops.erase(iter);
1601 	  }
1602 	}
1603 	
1604 	// linger pool check
1605 	
1606 	void Objecter::C_Linger_Map_Latest::finish(int r)
1607 	{
1608 	  if (r == -EAGAIN || r == -ECANCELED) {
1609 	    // ignore callback; we will retry in resend_mon_ops()
1610 	    return;
1611 	  }
1612 	
1613 	  unique_lock wl(objecter->rwlock);
1614 	
1615 	  map<uint64_t, LingerOp*>::iterator iter =
1616 	    objecter->check_latest_map_lingers.find(linger_id);
1617 	  if (iter == objecter->check_latest_map_lingers.end()) {
1618 	    return;
1619 	  }
1620 	
1621 	  LingerOp *op = iter->second;
1622 	  objecter->check_latest_map_lingers.erase(iter);
1623 	
1624 	  if (op->map_dne_bound == 0)
1625 	    op->map_dne_bound = latest;
1626 	
1627 	  bool unregister;
1628 	  objecter->_check_linger_pool_dne(op, &unregister);
1629 	
1630 	  if (unregister) {
1631 	    objecter->_linger_cancel(op);
1632 	  }
1633 	
1634 	  op->put();
1635 	}
1636 	
1637 	void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
1638 	{
1639 	  // rwlock is locked unique
1640 	
1641 	  *need_unregister = false;
1642 	
1643 	  if (op->register_gen > 0) {
1644 	    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1645 			   << " pool previously existed but now does not"
1646 			   << dendl;
1647 	    op->map_dne_bound = osdmap->get_epoch();
1648 	  } else {
1649 	    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1650 			   << " current " << osdmap->get_epoch()
1651 			   << " map_dne_bound " << op->map_dne_bound
1652 			   << dendl;
1653 	  }
1654 	  if (op->map_dne_bound > 0) {
1655 	    if (osdmap->get_epoch() >= op->map_dne_bound) {
1656 	      LingerOp::unique_lock wl{op->watch_lock};
1657 	      if (op->on_reg_commit) {
1658 		op->on_reg_commit->complete(-ENOENT);
1659 		op->on_reg_commit = nullptr;
1660 	      }
1661 	      if (op->on_notify_finish) {
1662 	        op->on_notify_finish->complete(-ENOENT);
1663 	        op->on_notify_finish = nullptr;
1664 	      }
1665 	      *need_unregister = true;
1666 	    }
1667 	  } else {
1668 	    _send_linger_map_check(op);
1669 	  }
1670 	}
1671 	
1672 	void Objecter::_send_linger_map_check(LingerOp *op)
1673 	{
1674 	  // ask the monitor
1675 	  if (check_latest_map_lingers.count(op->linger_id) == 0) {
1676 	    op->get();
1677 	    check_latest_map_lingers[op->linger_id] = op;
1678 	    C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
1679 	    monc->get_version("osdmap", &c->latest, NULL, c);
1680 	  }
1681 	}
1682 	
1683 	void Objecter::_linger_cancel_map_check(LingerOp *op)
1684 	{
1685 	  // rwlock is locked unique
1686 	
1687 	  map<uint64_t, LingerOp*>::iterator iter =
1688 	    check_latest_map_lingers.find(op->linger_id);
1689 	  if (iter != check_latest_map_lingers.end()) {
1690 	    LingerOp *found = iter->second;
1691 	    found->put();
1692 	    check_latest_map_lingers.erase(iter);
1693 	  }
1694 	}
1695 	
1696 	// command pool check
1697 	
1698 	void Objecter::C_Command_Map_Latest::finish(int r)
1699 	{
1700 	  if (r == -EAGAIN || r == -ECANCELED) {
1701 	    // ignore callback; we will retry in resend_mon_ops()
1702 	    return;
1703 	  }
1704 	
1705 	  unique_lock wl(objecter->rwlock);
1706 	
1707 	  map<uint64_t, CommandOp*>::iterator iter =
1708 	    objecter->check_latest_map_commands.find(tid);
1709 	  if (iter == objecter->check_latest_map_commands.end()) {
1710 	    return;
1711 	  }
1712 	
1713 	  CommandOp *c = iter->second;
1714 	  objecter->check_latest_map_commands.erase(iter);
1715 	
1716 	  if (c->map_dne_bound == 0)
1717 	    c->map_dne_bound = latest;
1718 	
1719 	  OSDSession::unique_lock sul(c->session->lock);
1720 	  objecter->_check_command_map_dne(c);
1721 	  sul.unlock();
1722 	
1723 	  c->put();
1724 	}
1725 	
1726 	void Objecter::_check_command_map_dne(CommandOp *c)
1727 	{
1728 	  // rwlock is locked unique
1729 	  // session is locked unique
1730 	
1731 	  ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1732 			 << " current " << osdmap->get_epoch()
1733 			 << " map_dne_bound " << c->map_dne_bound
1734 			 << dendl;
1735 	  if (c->map_dne_bound > 0) {
1736 	    if (osdmap->get_epoch() >= c->map_dne_bound) {
1737 	      _finish_command(c, c->map_check_error, c->map_check_error_str);
1738 	    }
1739 	  } else {
1740 	    _send_command_map_check(c);
1741 	  }
1742 	}
1743 	
1744 	void Objecter::_send_command_map_check(CommandOp *c)
1745 	{
1746 	  // rwlock is locked unique
1747 	  // session is locked unique
1748 	
1749 	  // ask the monitor
1750 	  if (check_latest_map_commands.count(c->tid) == 0) {
1751 	    c->get();
1752 	    check_latest_map_commands[c->tid] = c;
1753 	    C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
1754 	    monc->get_version("osdmap", &f->latest, NULL, f);
1755 	  }
1756 	}
1757 	
1758 	void Objecter::_command_cancel_map_check(CommandOp *c)
1759 	{
1760 	  // rwlock is locked unique
1761 	
1762 	  map<uint64_t, CommandOp*>::iterator iter =
1763 	    check_latest_map_commands.find(c->tid);
1764 	  if (iter != check_latest_map_commands.end()) {
1765 	    CommandOp *found = iter->second;
1766 	    found->put();
1767 	    check_latest_map_commands.erase(iter);
1768 	  }
1769 	}
1770 	
1771 	
1772 	/**
1773 	 * Look up OSDSession by OSD id.
1774 	 *
1775 	 * @returns 0 on success, or -EAGAIN if the lock context requires
1776 	 * promotion to write.
1777 	 */
1778 	int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
1779 	{
1780 	  ceph_assert(sul && sul.mutex() == &rwlock);
1781 	
1782 	  if (osd < 0) {
1783 	    *session = homeless_session;
1784 	    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1785 			   << dendl;
1786 	    return 0;
1787 	  }
1788 	
1789 	  map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
1790 	  if (p != osd_sessions.end()) {
1791 	    OSDSession *s = p->second;
1792 	    s->get();
1793 	    *session = s;
1794 	    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1795 			   << s->get_nref() << dendl;
1796 	    return 0;
1797 	  }
1798 	  if (!sul.owns_lock()) {
1799 	    return -EAGAIN;
1800 	  }
1801 	  OSDSession *s = new OSDSession(cct, osd);
1802 	  osd_sessions[osd] = s;
1803 	  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
1804 	  s->con->set_priv(RefCountedPtr{s});
1805 	  logger->inc(l_osdc_osd_session_open);
1806 	  logger->set(l_osdc_osd_sessions, osd_sessions.size());
1807 	  s->get();
1808 	  *session = s;
1809 	  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1810 			 << s->get_nref() << dendl;
1811 	  return 0;
1812 	}
1813 	
1814 	void Objecter::put_session(Objecter::OSDSession *s)
1815 	{
1816 	  if (s && !s->is_homeless()) {
1817 	    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1818 			   << s->get_nref() << dendl;
1819 	    s->put();
1820 	  }
1821 	}
1822 	
1823 	void Objecter::get_session(Objecter::OSDSession *s)
1824 	{
1825 	  ceph_assert(s != NULL);
1826 	
1827 	  if (!s->is_homeless()) {
1828 	    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1829 			   << s->get_nref() << dendl;
1830 	    s->get();
1831 	  }
1832 	}
1833 	
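	// Drop any existing connection to this OSD and reconnect to its
	// current address from the osdmap, bumping the session incarnation.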
1834 	void Objecter::_reopen_session(OSDSession *s)
1835 	{
1836 	  // rwlock is locked unique
1837 	  // s->lock is locked
1838 	
1839 	  auto addrs = osdmap->get_addrs(s->osd);
1840 	  ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
1841 			 << addrs << dendl;
1842 	  if (s->con) {
1843 	    s->con->set_priv(NULL);
1844 	    s->con->mark_down();
1845 	    logger->inc(l_osdc_osd_session_close);
1846 	  }
1847 	  s->con = messenger->connect_to_osd(addrs);
1848 	  s->con->set_priv(RefCountedPtr{s});
1849 	  s->incarnation++;
1850 	  logger->inc(l_osdc_osd_session_open);
1851 	}
1852 	
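	// Tear down an OSD session: mark its connection down, detach all of
	// its ops, linger ops and commands, and park them on the homeless
	// session so they can be retargeted when the map allows it.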
1853 	void Objecter::close_session(OSDSession *s)
1854 	{
1855 	  // rwlock is locked unique
1856 	
1857 	  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
1858 	  if (s->con) {
1859 	    s->con->set_priv(NULL);
1860 	    s->con->mark_down();
1861 	    logger->inc(l_osdc_osd_session_close);
1862 	  }
1863 	  OSDSession::unique_lock sl(s->lock);
1864 	
1865 	  std::list<LingerOp*> homeless_lingers;
1866 	  std::list<CommandOp*> homeless_commands;
1867 	  std::list<Op*> homeless_ops;
1868 	
1869 	  while (!s->linger_ops.empty()) {
1870 	    std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
1871 	    ldout(cct, 10) << " linger_op " << i->first << dendl;
1872 	    homeless_lingers.push_back(i->second);
1873 	    _session_linger_op_remove(s, i->second);
1874 	  }
1875 	
1876 	  while (!s->ops.empty()) {
1877 	    std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
1878 	    ldout(cct, 10) << " op " << i->first << dendl;
1879 	    homeless_ops.push_back(i->second);
1880 	    _session_op_remove(s, i->second);
1881 	  }
1882 	
1883 	  while (!s->command_ops.empty()) {
1884 	    std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
1885 	    ldout(cct, 10) << " command_op " << i->first << dendl;
1886 	    homeless_commands.push_back(i->second);
1887 	    _session_command_op_remove(s, i->second);
1888 	  }
1889 	
1890 	  osd_sessions.erase(s->osd);
1891 	  sl.unlock();
1892 	  put_session(s);
1893 	
1894 	  // Assign any leftover ops to the homeless session
1895 	  {
1896 	    OSDSession::unique_lock hsl(homeless_session->lock);
1897 	    for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
1898 		 i != homeless_lingers.end(); ++i) {
1899 	      _session_linger_op_assign(homeless_session, *i);
1900 	    }
1901 	    for (std::list<Op*>::iterator i = homeless_ops.begin();
1902 		 i != homeless_ops.end(); ++i) {
1903 	      _session_op_assign(homeless_session, *i);
1904 	    }
1905 	    for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
1906 		 i != homeless_commands.end(); ++i) {
1907 	      _session_command_op_assign(homeless_session, *i);
1908 	    }
1909 	  }
1910 	
1911 	  logger->set(l_osdc_osd_sessions, osd_sessions.size());
1912 	}
1913 	
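	// Block the calling thread until we have received at least one osdmap
	// (i.e. the epoch is nonzero).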
1914 	void Objecter::wait_for_osd_map()
1915 	{
1916 	  unique_lock l(rwlock);
1917 	  if (osdmap->get_epoch()) {
1918 	    l.unlock();
1919 	    return;
1920 	  }
1921 	
1922 	  // Leave this since it goes with C_SafeCond
1923 	  ceph::mutex lock = ceph::make_mutex("");
1924 	  ceph::condition_variable cond;
1925 	  bool done = false;
1926 	  std::unique_lock mlock{lock};
1927 	  C_SafeCond *context = new C_SafeCond(lock, cond, &done, NULL);
1928 	  waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
1929 	  l.unlock();
1930 	  cond.wait(mlock, [&done] { return done; });
1931 	}
1932 	
1933 	struct C_Objecter_GetVersion : public Context {
1934 	  Objecter *objecter;
1935 	  uint64_t oldest, newest;
1936 	  Context *fin;
1937 	  C_Objecter_GetVersion(Objecter *o, Context *c)
1938 	    : objecter(o), oldest(0), newest(0), fin(c) {}
1939 	  void finish(int r) override {
1940 	    if (r >= 0) {
1941 	      objecter->get_latest_version(oldest, newest, fin);
1942 	    } else if (r == -EAGAIN) { // try again as instructed
1943 	      objecter->wait_for_latest_osdmap(fin);
1944 	    } else {
1945 	      // it doesn't return any other error codes!
1946 	      ceph_abort();
1947 	    }
1948 	  }
1949 	};
1950 	
1951 	void Objecter::wait_for_latest_osdmap(Context *fin)
1952 	{
1953 	  ldout(cct, 10) << __func__ << dendl;
1954 	  C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1955 	  monc->get_version("osdmap", &c->newest, &c->oldest, c);
1956 	}
1957 	
1958 	void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1959 	{
1960 	  unique_lock wl(rwlock);
1961 	  if (osdmap->get_epoch() >= newest) {
1962 	    ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1963 	    wl.unlock();
1964 	    if (fin)
1965 	      fin->complete(0);
1966 	    return;
1967 	  }
1968 	
1969 	  ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1970 	  _wait_for_new_map(fin, newest, 0);
1971 	}
1972 	
1973 	void Objecter::maybe_request_map()
1974 	{
1975 	  shared_lock rl(rwlock);
1976 	  _maybe_request_map();
1977 	}
1978 	
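	// Subscribe to osdmap updates from the monitor: continuously while the
	// cluster is full or paused (so we see the flag clear), otherwise a
	// one-shot subscription for the next epoch.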
1979 	void Objecter::_maybe_request_map()
1980 	{
1981 	  // rwlock is locked
1982 	  int flag = 0;
1983 	  if (_osdmap_full_flag()
1984 	      || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1985 	      || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1986 	    ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1987 	      "osd map (FULL or pause flag is set)" << dendl;
1988 	  } else {
1989 	    ldout(cct, 10)
1990 	      << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1991 	    flag = CEPH_SUBSCRIBE_ONETIME;
1992 	  }
1993 	  epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
1994 	  if (monc->sub_want("osdmap", epoch, flag)) {
1995 	    monc->renew_subs();
1996 	  }
1997 	}
1998 	
1999 	void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
2000 	{
2001 	  // rwlock is locked unique
2002 	  waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
2003 	  _maybe_request_map();
2004 	}
2005 	
2006 	
2007 	/**
2008 	 * Use this together with wait_for_map: this is a pre-check to avoid
2009 	 * allocating a Context for wait_for_map if we can see that we
2010 	 * definitely already have the epoch.
2011 	 *
2012 	 * This does *not* replace the need to handle the return value of
2013 	 * wait_for_map: just because we don't have it in this pre-check
2014 	 * doesn't mean we won't have it when calling back into wait_for_map,
2015 	 * since the objecter lock is dropped in between.
2016 	 */
2017 	bool Objecter::have_map(const epoch_t epoch)
2018 	{
2019 	  shared_lock rl(rwlock);
2020 	  return osdmap->get_epoch() >= epoch;
2025 	}
2026 	
2027 	bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
2028 	{
2029 	  unique_lock wl(rwlock);
2030 	  if (osdmap->get_epoch() >= epoch) {
2031 	    return true;
2032 	  }
2033 	  _wait_for_new_map(c, epoch, err);
2034 	  return false;
2035 	}
2036 	
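	// Resend everything queued on a session: clear its backoffs, resend
	// regular ops in tid order, collect linger ops into lresend so the
	// caller can resend them after the session lock is dropped, and
	// resend any in-flight commands.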
2037 	void Objecter::_kick_requests(OSDSession *session,
2038 				      map<uint64_t, LingerOp *>& lresend)
2039 	{
2040 	  // rwlock is locked unique
2041 	
2042 	  // clear backoffs
2043 	  session->backoffs.clear();
2044 	  session->backoffs_by_id.clear();
2045 	
2046 	  // resend ops
2047 	  map<ceph_tid_t,Op*> resend;  // resend in tid order
2048 	  for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
2049 	       p != session->ops.end();) {
2050 	    Op *op = p->second;
2051 	    ++p;
2052 	    if (op->should_resend) {
2053 	      if (!op->target.paused)
2054 		resend[op->tid] = op;
2055 	    } else {
2056 	      _op_cancel_map_check(op);
2057 	      _cancel_linger_op(op);
2058 	    }
2059 	  }
2060 	
2061 	  logger->inc(l_osdc_op_resend, resend.size());
2062 	  while (!resend.empty()) {
2063 	    _send_op(resend.begin()->second);
2064 	    resend.erase(resend.begin());
2065 	  }
2066 	
2067 	  // resend lingers
2068 	  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
2069 	  for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
2070 	       j != session->linger_ops.end(); ++j) {
2071 	    LingerOp *op = j->second;
2072 	    op->get();
2073 	    ceph_assert(lresend.count(j->first) == 0);
2074 	    lresend[j->first] = op;
2075 	  }
2076 	
2077 	  // resend commands
2078 	  logger->inc(l_osdc_command_resend, session->command_ops.size());
2079 	  map<uint64_t,CommandOp*> cresend;  // resend in order
2080 	  for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
2081 	       k != session->command_ops.end(); ++k) {
2082 	    cresend[k->first] = k->second;
2083 	  }
2084 	  while (!cresend.empty()) {
2085 	    _send_command(cresend.begin()->second);
2086 	    cresend.erase(cresend.begin());
2087 	  }
2088 	}
2089 	
2090 	void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
2091 					  unique_lock& ul)
2092 	{
2093 	  ceph_assert(ul.owns_lock());
2094 	  shunique_lock sul(std::move(ul));
2095 	  while (!lresend.empty()) {
2096 	    LingerOp *op = lresend.begin()->second;
2097 	    if (!op->canceled) {
2098 	      _send_linger(op, sul);
2099 	    }
2100 	    op->put();
2101 	    lresend.erase(lresend.begin());
2102 	  }
2103 	  ul = sul.release_to_unique();
2104 	}
2105 	
2106 	void Objecter::start_tick()
2107 	{
2108 	  ceph_assert(tick_event == 0);
2109 	  tick_event =
2110 	    timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2111 			    &Objecter::tick, this);
2112 	}
2113 	
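	// Periodic timer callback: note ops that have been outstanding longer
	// than objecter_timeout as laggy, send pings to OSDs holding watches
	// or in-flight commands (the reply policy is lossy, so this also
	// catches session resets), and request a new osdmap if there are
	// homeless ops or laggy sessions.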
2114 	void Objecter::tick()
2115 	{
2116 	  shared_lock rl(rwlock);
2117 	
2118 	  ldout(cct, 10) << "tick" << dendl;
2119 	
2120 	  // we are only called from the tick timer event
2121 	  tick_event = 0;
2122 	
2123 	  if (!initialized) {
2124 	    // we raced with shutdown
2125 	    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
2126 	    return;
2127 	  }
2128 	
2129 	  set<OSDSession*> toping;
2130 	
2131 	
2132 	  // look for laggy requests
2133 	  auto cutoff = ceph::coarse_mono_clock::now();
2134 	  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout
2135 	
2136 	  unsigned laggy_ops = 0;
2137 	
2138 	  for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
2139 	       siter != osd_sessions.end(); ++siter) {
2140 	    OSDSession *s = siter->second;
2141 	    OSDSession::lock_guard l(s->lock);
2142 	    bool found = false;
2143 	    for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
2144 		p != s->ops.end();
2145 		++p) {
2146 	      Op *op = p->second;
2147 	      ceph_assert(op->session);
2148 	      if (op->stamp < cutoff) {
2149 		ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
2150 			      << " is laggy" << dendl;
2151 		found = true;
2152 		++laggy_ops;
2153 	      }
2154 	    }
2155 	    for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
2156 		p != s->linger_ops.end();
2157 		++p) {
2158 	      LingerOp *op = p->second;
2159 	      LingerOp::unique_lock wl(op->watch_lock);
2160 	      ceph_assert(op->session);
2161 	      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
2162 			     << " (osd." << op->session->osd << ")" << dendl;
2163 	      found = true;
2164 	      if (op->is_watch && op->registered && !op->last_error)
2165 		_send_linger_ping(op);
2166 	    }
2167 	    for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
2168 		p != s->command_ops.end();
2169 		++p) {
2170 	      CommandOp *op = p->second;
2171 	      ceph_assert(op->session);
2172 	      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
2173 			     << " (osd." << op->session->osd << ")" << dendl;
2174 	      found = true;
2175 	    }
2176 	    if (found)
2177 	      toping.insert(s);
2178 	  }
2179 	  if (num_homeless_ops || !toping.empty()) {
2180 	    _maybe_request_map();
2181 	  }
2182 	
2183 	  logger->set(l_osdc_op_laggy, laggy_ops);
2184 	  logger->set(l_osdc_osd_laggy, toping.size());
2185 	
2186 	  if (!toping.empty()) {
2187 	    // send a ping to these osds, to ensure we detect any session resets
2188 	    // (osd reply message policy is lossy)
2189 	    for (set<OSDSession*>::const_iterator i = toping.begin();
2190 		 i != toping.end();
2191 		 ++i) {
2192 	      (*i)->con->send_message(new MPing);
2193 	    }
2194 	  }
2195 	
2196 	  // Make sure we don't reschedule if we wake up after shutdown
2197 	  if (initialized) {
2198 	    tick_event = timer.reschedule_me(ceph::make_timespan(
2199 					       cct->_conf->objecter_tick_interval));
2200 	  }
2201 	}
2202 	
2203 	void Objecter::resend_mon_ops()
2204 	{
2205 	  unique_lock wl(rwlock);
2206 	
2207 	  ldout(cct, 10) << "resend_mon_ops" << dendl;
2208 	
2209 	  for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2210 	       p != poolstat_ops.end();
2211 	       ++p) {
2212 	    _poolstat_submit(p->second);
2213 	    logger->inc(l_osdc_poolstat_resend);
2214 	  }
2215 	
2216 	  for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2217 	       p != statfs_ops.end();
2218 	       ++p) {
2219 	    _fs_stats_submit(p->second);
2220 	    logger->inc(l_osdc_statfs_resend);
2221 	  }
2222 	
2223 	  for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2224 	       p != pool_ops.end();
2225 	       ++p) {
2226 	    _pool_op_submit(p->second);
2227 	    logger->inc(l_osdc_poolop_resend);
2228 	  }
2229 	
2230 	  for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2231 	       p != check_latest_map_ops.end();
2232 	       ++p) {
2233 	    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2234 	    monc->get_version("osdmap", &c->latest, NULL, c);
2235 	  }
2236 	
2237 	  for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2238 	       p != check_latest_map_lingers.end();
2239 	       ++p) {
2240 	    C_Linger_Map_Latest *c
2241 	      = new C_Linger_Map_Latest(this, p->second->linger_id);
2242 	    monc->get_version("osdmap", &c->latest, NULL, c);
2243 	  }
2244 	
2245 	  for (map<uint64_t, CommandOp*>::iterator p
2246 		 = check_latest_map_commands.begin();
2247 	       p != check_latest_map_commands.end();
2248 	       ++p) {
2249 	    C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2250 	    monc->get_version("osdmap", &c->latest, NULL, c);
2251 	  }
2252 	}
2253 	
2254 	// read | write ---------------------------
2255 	
2256 	void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2257 	{
2258 	  shunique_lock rl(rwlock, ceph::acquire_shared);
2259 	  ceph_tid_t tid = 0;
2260 	  if (!ptid)
2261 	    ptid = &tid;
2262 	  op->trace.event("op submit");
2263 	  _op_submit_with_budget(op, rl, ptid, ctx_budget);
2264 	}
2265 	
2266 	void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
2267 					      ceph_tid_t *ptid,
2268 					      int *ctx_budget)
2269 	{
2270 	  ceph_assert(initialized);
2271 	
2272 	  ceph_assert(op->ops.size() == op->out_bl.size());
2273 	  ceph_assert(op->ops.size() == op->out_rval.size());
2274 	  ceph_assert(op->ops.size() == op->out_handler.size());
2275 	
2276 	  // throttle.  before we look at any state, because
2277 	  // _take_op_budget() may drop our lock while it blocks.
2278 	  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
2279 	    int op_budget = _take_op_budget(op, sul);
2280 	    // take and pass out the budget for the first OP
2281 	    // in the context session
2282 	    if (ctx_budget && (*ctx_budget == -1)) {
2283 	      *ctx_budget = op_budget;
2284 	    }
2285 	  }
2286 	
2287 	  if (osd_timeout > timespan(0)) {
2288 	    if (op->tid == 0)
2289 	      op->tid = ++last_tid;
2290 	    auto tid = op->tid;
2291 	    op->ontimeout = timer.add_event(osd_timeout,
2292 					    [this, tid]() {
2293 					      op_cancel(tid, -ETIMEDOUT); });
2294 	  }
2295 	
2296 	  _op_submit(op, sul, ptid);
2297 	}
2298 	
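	// Perf-counter accounting for a newly submitted op: bump the in-flight
	// counts, classify it as read/write/rmw/pg, and count each sub-op by
	// type.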
2299 	void Objecter::_send_op_account(Op *op)
2300 	{
2301 	  inflight_ops++;
2302 	
2303 	  // track ops that expect a reply
2304 	  if (op->onfinish) {
2305 	    num_in_flight++;
2306 	  } else {
2307 	    ldout(cct, 20) << " note: not requesting reply" << dendl;
2308 	  }
2309 	
2310 	  logger->inc(l_osdc_op_active);
2311 	  logger->inc(l_osdc_op);
2312 	
2313 	  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
2314 	      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
2315 	    logger->inc(l_osdc_op_rmw);
2316 	  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
2317 	    logger->inc(l_osdc_op_w);
2318 	  else if (op->target.flags & CEPH_OSD_FLAG_READ)
2319 	    logger->inc(l_osdc_op_r);
2320 	
2321 	  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
2322 	    logger->inc(l_osdc_op_pg);
2323 	
2324 	  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
2325 	    int code = l_osdc_osdop_other;
2326 	    switch (p->op.op) {
2327 	    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
2328 	    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
2329 	    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
2330 	    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
2331 	    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
2332 	    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
2333 	    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
2334 	    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
2335 	    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
2336 	    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
2337 	    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
2338 	    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
2339 	    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
2340 	    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
2341 	    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
2342 	    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
2343 	    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
2344 	
2345 	    // OMAP read operations
2346 	    case CEPH_OSD_OP_OMAPGETVALS:
2347 	    case CEPH_OSD_OP_OMAPGETKEYS:
2348 	    case CEPH_OSD_OP_OMAPGETHEADER:
2349 	    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
2350 	    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;
2351 	
2352 	    // OMAP write operations
2353 	    case CEPH_OSD_OP_OMAPSETVALS:
2354 	    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;
2355 	
2356 	    // OMAP del operations
2357 	    case CEPH_OSD_OP_OMAPCLEAR:
2358 	    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;
2359 	
2360 	    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
2361 	    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
2362 	    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
2363 	    }
2364 	    if (code)
2365 	      logger->inc(code);
2366 	  }
2367 	}
2368 	
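	// Core submit path (rwlock held shared or unique): compute the target
	// PG/OSD, fetch or create the matching session (retrying if the write
	// lock had to be taken), account the op, decide whether it must be
	// paused (epoch barrier, pause or full flags) or sent now, assign it
	// to the session, and kick off a map check if the pool looked absent.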
2369 	void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
2370 	{
2371 	  // rwlock is locked
2372 	
2373 	  ldout(cct, 10) << __func__ << " op " << op << dendl;
2374 	
2375 	  // pick target
2376 	  ceph_assert(op->session == NULL);
2377 	  OSDSession *s = NULL;
2378 	
2379 	  bool check_for_latest_map = _calc_target(&op->target, nullptr)
2380 	    == RECALC_OP_TARGET_POOL_DNE;
2381 	
2382 	  // Try to get a session, including a retry if we need to take write lock
2383 	  int r = _get_session(op->target.osd, &s, sul);
2384 	  if (r == -EAGAIN ||
2385 	      (check_for_latest_map && sul.owns_lock_shared()) ||
2386 	      cct->_conf->objecter_debug_inject_relock_delay) {
2387 	    epoch_t orig_epoch = osdmap->get_epoch();
2388 	    sul.unlock();
2389 	    if (cct->_conf->objecter_debug_inject_relock_delay) {
2390 	      sleep(1);
2391 	    }
2392 	    sul.lock();
2393 	    if (orig_epoch != osdmap->get_epoch()) {
2394 	      // map changed; recalculate mapping
2395 	      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
2396 			     << dendl;
2397 	      check_for_latest_map = _calc_target(&op->target, nullptr)
2398 		== RECALC_OP_TARGET_POOL_DNE;
2399 	      if (s) {
2400 		put_session(s);
2401 		s = NULL;
2402 		r = -EAGAIN;
2403 	      }
2404 	    }
2405 	  }
2406 	  if (r == -EAGAIN) {
2407 	    ceph_assert(s == NULL);
2408 	    r = _get_session(op->target.osd, &s, sul);
2409 	  }
2410 	  ceph_assert(r == 0);
2411 	  ceph_assert(s);  // may be homeless
2412 	
2413 	  _send_op_account(op);
2414 	
2415 	  // send?
2416 	
2417 	  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
2418 	
2419 	  if (pool_full_try) {
2420 	    op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
2421 	  }
2422 	
2423 	  bool need_send = false;
2424 	
2425 	  if (osdmap->get_epoch() < epoch_barrier) {
2426 	    ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
2427 			   << dendl;
2428 	    op->target.paused = true;
2429 	    _maybe_request_map();
2430 	  } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
2431 	             osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
2432 	    ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
2433 			   << dendl;
2434 	    op->target.paused = true;
2435 	    _maybe_request_map();
2436 	  } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
2437 		     osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
2438 	    ldout(cct, 10) << " paused read " << op << " tid " << op->tid
2439 			   << dendl;
2440 	    op->target.paused = true;
2441 	    _maybe_request_map();
2442 	  } else if (op->respects_full() &&
2443 		     (_osdmap_full_flag() ||
2444 		      _osdmap_pool_full(op->target.base_oloc.pool))) {
2445 	    ldout(cct, 0) << " FULL, paused modify " << op << " tid "
2446 			  << op->tid << dendl;
2447 	    op->target.paused = true;
2448 	    _maybe_request_map();
2449 	  } else if (!s->is_homeless()) {
2450 	    need_send = true;
2451 	  } else {
2452 	    _maybe_request_map();
2453 	  }
2454 	
2455 	  OSDSession::unique_lock sl(s->lock);
2456 	  if (op->tid == 0)
2457 	    op->tid = ++last_tid;
2458 	
2459 	  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
2460 			 << " '" << op->target.base_oloc << "' '"
2461 			 << op->target.target_oloc << "' " << op->ops << " tid "
2462 			 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
2463 			 << dendl;
2464 	
2465 	  _session_op_assign(s, op);
2466 	
2467 	  if (need_send) {
2468 	    _send_op(op);
2469 	  }
2470 	
2471 	  // Last chance to touch Op here, after giving up session lock it can
2472 	  // be freed at any time by response handler.
2473 	  ceph_tid_t tid = op->tid;
2474 	  if (check_for_latest_map) {
2475 	    _send_op_map_check(op);
2476 	  }
2477 	  if (ptid)
2478 	    *ptid = tid;
2479 	  op = NULL;
2480 	
2481 	  sl.unlock();
2482 	  put_session(s);
2483 	
2484 	  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
2485 	}
2486 	
2487 	int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2488 	{
2489 	  ceph_assert(initialized);
2490 	
2491 	  OSDSession::unique_lock sl(s->lock);
2492 	
2493 	  map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
2494 	  if (p == s->ops.end()) {
2495 	    ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2496 			   << s->osd << dendl;
2497 	    return -ENOENT;
2498 	  }
2499 	
2500 	#if 0
2501 	  if (s->con) {
2502 	    ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
2503 			   << " on " << s->con << dendl;
2504 	    s->con->revoke_rx_buffer(tid);
2505 	  }
2506 	#endif
2507 	
2508 	  ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2509 			 << dendl;
2510 	  Op *op = p->second;
2511 	  if (op->onfinish) {
2512 	    num_in_flight--;
2513 	    op->onfinish->complete(r);
2514 	    op->onfinish = NULL;
2515 	  }
2516 	  _op_cancel_map_check(op);
2517 	  _finish_op(op, r);
2518 	  sl.unlock();
2519 	
2520 	  return 0;
2521 	}
2522 	
2523 	int Objecter::op_cancel(ceph_tid_t tid, int r)
2524 	{
2525 	  int ret = 0;
2526 	
2527 	  unique_lock wl(rwlock);
2528 	  ret = _op_cancel(tid, r);
2529 	
2530 	  return ret;
2531 	}
2532 	
2533 	int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
2534 	{
2535 	  unique_lock wl(rwlock);
2536 	  ldout(cct,10) << __func__ << " " << tids << dendl;
2537 	  for (auto tid : tids) {
2538 	    _op_cancel(tid, r);
2539 	  }
2540 	  return 0;
2541 	}
2542 	
2543 	int Objecter::_op_cancel(ceph_tid_t tid, int r)
2544 	{
2545 	  int ret = 0;
2546 	
2547 	  ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2548 			<< dendl;
2549 	
2550 	start:
2551 	
2552 	  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2553 	       siter != osd_sessions.end(); ++siter) {
2554 	    OSDSession *s = siter->second;
2555 	    OSDSession::shared_lock sl(s->lock);
2556 	    if (s->ops.find(tid) != s->ops.end()) {
2557 	      sl.unlock();
2558 	      ret = op_cancel(s, tid, r);
2559 	      if (ret == -ENOENT) {
2560 		/* oh no! raced, maybe tid moved to another session, restarting */
2561 		goto start;
2562 	      }
2563 	      return ret;
2564 	    }
2565 	  }
2566 	
2567 	  ldout(cct, 5) << __func__ << ": tid " << tid
2568 			<< " not found in live sessions" << dendl;
2569 	
2570 	  // Handle case where the op is in homeless session
2571 	  OSDSession::shared_lock sl(homeless_session->lock);
2572 	  if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2573 	    sl.unlock();
2574 	    ret = op_cancel(homeless_session, tid, r);
2575 	    if (ret == -ENOENT) {
2576 	      /* oh no! raced, maybe tid moved to another session, restarting */
2577 	      goto start;
2578 	    } else {
2579 	      return ret;
2580 	    }
2581 	  } else {
2582 	    sl.unlock();
2583 	  }
2584 	
2585 	  ldout(cct, 5) << __func__ << ": tid " << tid
2586 			<< " not found in homeless session" << dendl;
2587 	
2588 	  return ret;
2589 	}
2590 	
2591 	
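	// Cancel every in-flight write (optionally restricted to one pool)
	// with error code r.  Returns the osdmap epoch under which the
	// cancellations happened, or (epoch_t)-1 if nothing was cancelled.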
2592 	epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2593 	{
2594 	  unique_lock wl(rwlock);
2595 	
2596 	  std::vector<ceph_tid_t> to_cancel;
2597 	  bool found = false;
2598 	
2599 	  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2600 	       siter != osd_sessions.end(); ++siter) {
2601 	    OSDSession *s = siter->second;
2602 	    OSDSession::shared_lock sl(s->lock);
2603 	    for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
2604 		 op_i != s->ops.end(); ++op_i) {
2605 	      if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2606 		  && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2607 		to_cancel.push_back(op_i->first);
2608 	      }
2609 	    }
2610 	    sl.unlock();
2611 	
2612 	    for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
2613 		 titer != to_cancel.end();
2614 		 ++titer) {
2615 	      int cancel_result = op_cancel(s, *titer, r);
2616 	      // We hold rwlock across search and cancellation, so cancels
2617 	      // should always succeed
2618 	      ceph_assert(cancel_result == 0);
2619 	    }
2620 	    if (!found && to_cancel.size())
2621 	      found = true;
2622 	    to_cancel.clear();
2623 	  }
2624 	
2625 	  const epoch_t epoch = osdmap->get_epoch();
2626 	
2627 	  wl.unlock();
2628 	
2629 	  if (found) {
2630 	    return epoch;
2631 	  } else {
2632 	    return -1;
2633 	  }
2634 	}
2635 	
2636 	bool Objecter::is_pg_changed(
2637 	  int oldprimary,
2638 	  const vector<int>& oldacting,
2639 	  int newprimary,
2640 	  const vector<int>& newacting,
2641 	  bool any_change)
2642 	{
2643 	  if (OSDMap::primary_changed(
2644 		oldprimary,
2645 		oldacting,
2646 		newprimary,
2647 		newacting))
2648 	    return true;
2649 	  if (any_change && oldacting != newacting)
2650 	    return true;
2651 	  return false;      // same primary (tho replicas may have changed)
2652 	}
2653 	
2654 	bool Objecter::target_should_be_paused(op_target_t *t)
2655 	{
2656 	  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2657 	  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2658 	  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
2659 	    _osdmap_full_flag() || _osdmap_pool_full(*pi);
2660 	
2661 	  return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2662 	    (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2663 	    (osdmap->get_epoch() < epoch_barrier);
2664 	}
2665 	
2666 	/**
2667 	 * Locking public accessor for _osdmap_full_flag
2668 	 */
2669 	bool Objecter::osdmap_full_flag() const
2670 	{
2671 	  shared_lock rl(rwlock);
2672 	
2673 	  return _osdmap_full_flag();
2674 	}
2675 	
2676 	bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2677 	{
2678 	  shared_lock rl(rwlock);
2679 	
2680 	  if (_osdmap_full_flag()) {
2681 	    return true;
2682 	  }
2683 	
2684 	  return _osdmap_pool_full(pool_id);
2685 	}
2686 	
2687 	bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2688 	{
2689 	  const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2690 	  if (pool == NULL) {
2691 	    ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2692 	    return false;
2693 	  }
2694 	
2695 	  return _osdmap_pool_full(*pool);
2696 	}
2697 	
2698 	bool Objecter::_osdmap_has_pool_full() const
2699 	{
2700 	  for (map<int64_t, pg_pool_t>::const_iterator it
2701 		 = osdmap->get_pools().begin();
2702 	       it != osdmap->get_pools().end(); ++it) {
2703 	    if (_osdmap_pool_full(it->second))
2704 	      return true;
2705 	  }
2706 	  return false;
2707 	}
2708 	
2709 	bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
2710 	{
2711 	  return p.has_flag(pg_pool_t::FLAG_FULL) && honor_pool_full;
2712 	}
2713 	
2714 	/**
2715 	 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2716 	 */
2717 	bool Objecter::_osdmap_full_flag() const
2718 	{
2719 	  // Ignore the FULL flag unless honor_pool_full is set
2720 	  return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
2721 	}
2722 	
2723 	void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2724 	{
2725 	  for (map<int64_t, pg_pool_t>::const_iterator it
2726 		 = osdmap->get_pools().begin();
2727 	       it != osdmap->get_pools().end(); ++it) {
2728 	    if (pool_full_map.find(it->first) == pool_full_map.end()) {
2729 	      pool_full_map[it->first] = _osdmap_pool_full(it->second);
2730 	    } else {
2731 	      pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2732 		pool_full_map[it->first];
2733 	    }
2734 	  }
2735 	}
2736 	
2737 	int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2738 						   const string& ns)
2739 	{
2740 	  shared_lock rl(rwlock);
2741 	  const pg_pool_t *p = osdmap->get_pg_pool(pool);
2742 	  if (!p)
2743 	    return -ENOENT;
2744 	  return p->hash_key(key, ns);
2745 	}
2746 	
2747 	int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2748 						      const string& ns)
2749 	{
2750 	  shared_lock rl(rwlock);
2751 	  const pg_pool_t *p = osdmap->get_pg_pool(pool);
2752 	  if (!p)
2753 	    return -ENOENT;
2754 	  return p->raw_hash_to_pg(p->hash_key(key, ns));
2755 	}
2756 	
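	// If the map update reports newly removed snaps for this op's pool,
	// drop those snapids from the op's snap context before it is (re)sent.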
2757 	void Objecter::_prune_snapc(
2758 	  const mempool::osdmap::map<int64_t,
2759 	  snap_interval_set_t>& new_removed_snaps,
2760 	  Op *op)
2761 	{
2762 	  bool match = false;
2763 	  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
2764 	  if (i != new_removed_snaps.end()) {
2765 	    for (auto s : op->snapc.snaps) {
2766 	      if (i->second.contains(s)) {
2767 		match = true;
2768 		break;
2769 	      }
2770 	    }
2771 	    if (match) {
2772 	      vector<snapid_t> new_snaps;
2773 	      for (auto s : op->snapc.snaps) {
2774 		if (!i->second.contains(s)) {
2775 		  new_snaps.push_back(s);
2776 		}
2777 	      }
2778 	      op->snapc.snaps.swap(new_snaps);
2779 	      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
2780 			    << " (was " << new_snaps << ")" << dendl;
2781 	    }
2782 	  }
2783 	}
2784 	
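	// Map an op target onto a concrete PG and OSD under the current
	// osdmap: apply cache-tier overlays to pick the target pool, compute
	// the pgid and its up/acting sets, detect interval changes,
	// splits/merges and pause transitions that force a resend, and choose
	// which OSD to send to (the primary, or a replica for
	// balanced/localized reads).  Returns one of the RECALC_OP_TARGET_*
	// codes.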
2785 	int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
2786 	{
2787 	  // rwlock is locked
2788 	  bool is_read = t->flags & CEPH_OSD_FLAG_READ;
2789 	  bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
2790 	  t->epoch = osdmap->get_epoch();
2791 	  ldout(cct,20) << __func__ << " epoch " << t->epoch
2792 			<< " base " << t->base_oid << " " << t->base_oloc
2793 			<< " precalc_pgid " << (int)t->precalc_pgid
2794 			<< " pgid " << t->base_pgid
2795 			<< (is_read ? " is_read" : "")
2796 			<< (is_write ? " is_write" : "")
2797 			<< dendl;
2798 	
2799 	  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2800 	  if (!pi) {
2801 	    t->osd = -1;
2802 	    return RECALC_OP_TARGET_POOL_DNE;
2803 	  }
2804 	  ldout(cct,30) << __func__ << "  base pi " << pi
2805 			<< " pg_num " << pi->get_pg_num() << dendl;
2806 	
2807 	  bool force_resend = false;
2808 	  if (osdmap->get_epoch() == pi->last_force_op_resend) {
2809 	    if (t->last_force_resend < pi->last_force_op_resend) {
2810 	      t->last_force_resend = pi->last_force_op_resend;
2811 	      force_resend = true;
2812 	    } else if (t->last_force_resend == 0) {
2813 	      force_resend = true;
2814 	    }
2815 	  }
2816 	
2817 	  // apply tiering
2818 	  t->target_oid = t->base_oid;
2819 	  t->target_oloc = t->base_oloc;
2820 	  if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
2821 	    if (is_read && pi->has_read_tier())
2822 	      t->target_oloc.pool = pi->read_tier;
2823 	    if (is_write && pi->has_write_tier())
2824 	      t->target_oloc.pool = pi->write_tier;
2825 	    pi = osdmap->get_pg_pool(t->target_oloc.pool);
2826 	    if (!pi) {
2827 	      t->osd = -1;
2828 	      return RECALC_OP_TARGET_POOL_DNE;
2829 	    }
2830 	  }
2831 	
2832 	  pg_t pgid;
2833 	  if (t->precalc_pgid) {
2834 	    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
2835 	    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
2836 	    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
2837 	    pgid = t->base_pgid;
2838 	  } else {
2839 	    int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
2840 						   pgid);
2841 	    if (ret == -ENOENT) {
2842 	      t->osd = -1;
2843 	      return RECALC_OP_TARGET_POOL_DNE;
2844 	    }
2845 	  }
2846 	  ldout(cct,20) << __func__ << " target " << t->target_oid << " "
2847 			<< t->target_oloc << " -> pgid " << pgid << dendl;
2848 	  ldout(cct,30) << __func__ << "  target pi " << pi
2849 			<< " pg_num " << pi->get_pg_num() << dendl;
2850 	  t->pool_ever_existed = true;
2851 	
2852 	  int size = pi->size;
2853 	  int min_size = pi->min_size;
2854 	  unsigned pg_num = pi->get_pg_num();
2855 	  unsigned pg_num_mask = pi->get_pg_num_mask();
2856 	  unsigned pg_num_pending = pi->get_pg_num_pending();
2857 	  int up_primary, acting_primary;
2858 	  vector<int> up, acting;
2859 	  ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
2860 	  pg_t actual_pgid(actual_ps, pgid.pool());
2861 	  pg_mapping_t pg_mapping;
2862 	  pg_mapping.epoch = osdmap->get_epoch();
2863 	  if (lookup_pg_mapping(actual_pgid, &pg_mapping)) {
2864 	    up = pg_mapping.up;
2865 	    up_primary = pg_mapping.up_primary;
2866 	    acting = pg_mapping.acting;
2867 	    acting_primary = pg_mapping.acting_primary;
2868 	  } else {
2869 	    osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
2870 	                                 &acting, &acting_primary);
2871 	    pg_mapping_t pg_mapping(osdmap->get_epoch(),
2872 	                            up, up_primary, acting, acting_primary);
2873 	    update_pg_mapping(actual_pgid, std::move(pg_mapping));
2874 	  }
2875 	  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
2876 	  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
2877 	  unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
2878 	  pg_t prev_pgid(prev_seed, pgid.pool());
2879 	  if (any_change && PastIntervals::is_new_interval(
2880 		t->acting_primary,
2881 		acting_primary,
2882 		t->acting,
2883 		acting,
2884 		t->up_primary,
2885 		up_primary,
2886 		t->up,
2887 		up,
2888 		t->size,
2889 		size,
2890 		t->min_size,
2891 		min_size,
2892 		t->pg_num,
2893 		pg_num,
2894 		t->pg_num_pending,
2895 		pg_num_pending,
2896 		t->sort_bitwise,
2897 		sort_bitwise,
2898 		t->recovery_deletes,
2899 		recovery_deletes,
2900 		prev_pgid)) {
2901 	    force_resend = true;
2902 	  }
2903 	
2904 	  bool unpaused = false;
2905 	  bool should_be_paused = target_should_be_paused(t);
2906 	  if (t->paused && !should_be_paused) {
2907 	    unpaused = true;
2908 	  }
2909 	  t->paused = should_be_paused;
2910 	
2911 	  bool legacy_change =
2912 	    t->pgid != pgid ||
2913 	      is_pg_changed(
2914 		t->acting_primary, t->acting, acting_primary, acting,
2915 		t->used_replica || any_change);
2916 	  bool split_or_merge = false;
2917 	  if (t->pg_num) {
2918 	    split_or_merge =
2919 	      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
2920 	      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
2921 	      prev_pgid.is_merge_target(t->pg_num, pg_num);
2922 	  }
2923 	
2924 	  if (legacy_change || split_or_merge || force_resend) {
2925 	    t->pgid = pgid;
2926 	    t->acting = acting;
2927 	    t->acting_primary = acting_primary;
2928 	    t->up_primary = up_primary;
2929 	    t->up = up;
2930 	    t->size = size;
2931 	    t->min_size = min_size;
2932 	    t->pg_num = pg_num;
2933 	    t->pg_num_mask = pg_num_mask;
2934 	    t->pg_num_pending = pg_num_pending;
2935 	    spg_t spgid(actual_pgid);
2936 	    if (pi->is_erasure()) {
2937 	      for (uint8_t i = 0; i < acting.size(); ++i) {
2938 	        if (acting[i] == acting_primary) {
2939 	          spgid.reset_shard(shard_id_t(i));
2940 	          break;
2941 	        }
2942 	      }
2943 	    }
2944 	    t->actual_pgid = spgid;
2945 	    t->sort_bitwise = sort_bitwise;
2946 	    t->recovery_deletes = recovery_deletes;
2947 	    ldout(cct, 10) << __func__ << " "
2948 			   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
2949 			   << " acting " << acting
2950 			   << " primary " << acting_primary << dendl;
2951 	    t->used_replica = false;
2952 	    if (acting_primary == -1) {
2953 	      t->osd = -1;
2954 	    } else {
2955 	      int osd;
2956 	      bool read = is_read && !is_write;
2957 	      if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
2958 		int p = rand() % acting.size();
2959 		if (p)
2960 		  t->used_replica = true;
2961 		osd = acting[p];
2962 		ldout(cct, 10) << " chose random osd." << osd << " of " << acting
2963 			       << dendl;
2964 	      } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
2965 			 acting.size() > 1) {
2966 		// look for a local replica.  prefer the primary if the
2967 		// distance is the same.
2968 		int best = -1;
2969 		int best_locality = 0;
2970 		for (unsigned i = 0; i < acting.size(); ++i) {
2971 		  int locality = osdmap->crush->get_common_ancestor_distance(
2972 			 cct, acting[i], crush_location);
2973 		  ldout(cct, 20) << __func__ << " localize: rank " << i
2974 				 << " osd." << acting[i]
2975 				 << " locality " << locality << dendl;
2976 		  if (i == 0 ||
2977 		      (locality >= 0 && best_locality >= 0 &&
2978 		       locality < best_locality) ||
2979 		      (best_locality < 0 && locality >= 0)) {
2980 		    best = i;
2981 		    best_locality = locality;
2982 		    if (i)
2983 		      t->used_replica = true;
2984 		  }
2985 		}
2986 		ceph_assert(best >= 0);
2987 		osd = acting[best];
2988 	      } else {
2989 		osd = acting_primary;
2990 	      }
2991 	      t->osd = osd;
2992 	    }
2993 	  }
2994 	  if (legacy_change || unpaused || force_resend) {
2995 	    return RECALC_OP_TARGET_NEED_RESEND;
2996 	  }
2997 	  if (split_or_merge &&
2998 	      (osdmap->require_osd_release >= ceph_release_t::luminous ||
2999 	       HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
3000 			    RESEND_ON_SPLIT))) {
3001 	    return RECALC_OP_TARGET_NEED_RESEND;
3002 	  }
3003 	  return RECALC_OP_TARGET_NO_ACTION;
3004 	}
3005 	
3006 	int Objecter::_map_session(op_target_t *target, OSDSession **s,
3007 				   shunique_lock& sul)
3008 	{
3009 	  _calc_target(target, nullptr);
3010 	  return _get_session(target->osd, s, sul);
3011 	}
3012 	
3013 	void Objecter::_session_op_assign(OSDSession *to, Op *op)
3014 	{
3015 	  // to->lock is locked
3016 	  ceph_assert(op->session == NULL);
3017 	  ceph_assert(op->tid);
3018 	
3019 	  get_session(to);
3020 	  op->session = to;
3021 	  to->ops[op->tid] = op;
3022 	
3023 	  if (to->is_homeless()) {
3024 	    num_homeless_ops++;
3025 	  }
3026 	
3027 	  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3028 	}
3029 	
3030 	void Objecter::_session_op_remove(OSDSession *from, Op *op)
3031 	{
3032 	  ceph_assert(op->session == from);
3033 	  // from->lock is locked
3034 	
3035 	  if (from->is_homeless()) {
3036 	    num_homeless_ops--;
3037 	  }
3038 	
3039 	  from->ops.erase(op->tid);
3040 	  put_session(from);
3041 	  op->session = NULL;
3042 	
3043 	  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3044 	}
3045 	
3046 	void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
3047 	{
3048 	  // to->lock is locked unique
3049 	  ceph_assert(op->session == NULL);
3050 	
3051 	  if (to->is_homeless()) {
3052 	    num_homeless_ops++;
3053 	  }
3054 	
3055 	  get_session(to);
3056 	  op->session = to;
3057 	  to->linger_ops[op->linger_id] = op;
3058 	
3059 	  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
3060 			 << dendl;
3061 	}
3062 	
3063 	void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
3064 	{
3065 	  ceph_assert(from == op->session);
3066 	  // from->lock is locked unique
3067 	
3068 	  if (from->is_homeless()) {
3069 	    num_homeless_ops--;
3070 	  }
3071 	
3072 	  from->linger_ops.erase(op->linger_id);
3073 	  put_session(from);
3074 	  op->session = NULL;
3075 	
3076 	  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3077 			 << dendl;
3078 	}
3079 	
3080 	void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3081 	{
3082 	  ceph_assert(from == op->session);
3083 	  // from->lock is locked
3084 	
3085 	  if (from->is_homeless()) {
3086 	    num_homeless_ops--;
3087 	  }
3088 	
3089 	  from->command_ops.erase(op->tid);
3090 	  put_session(from);
3091 	  op->session = NULL;
3092 	
3093 	  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3094 	}
3095 	
3096 	void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3097 	{
3098 	  // to->lock is locked
3099 	  ceph_assert(op->session == NULL);
3100 	  ceph_assert(op->tid);
3101 	
3102 	  if (to->is_homeless()) {
3103 	    num_homeless_ops++;
3104 	  }
3105 	
3106 	  get_session(to);
3107 	  op->session = to;
3108 	  to->command_ops[op->tid] = op;
3109 	
3110 	  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3111 	}
3112 	
3113 	int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
3114 					       shunique_lock& sul)
3115 	{
3116 	  // rwlock is locked unique
3117 	
3118 	  int r = _calc_target(&linger_op->target, nullptr, true);
3119 	  if (r == RECALC_OP_TARGET_NEED_RESEND) {
3120 	    ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
3121 			   << " pgid " << linger_op->target.pgid
3122 			   << " acting " << linger_op->target.acting << dendl;
3123 	
3124 	    OSDSession *s = NULL;
3125 	    r = _get_session(linger_op->target.osd, &s, sul);
3126 	    ceph_assert(r == 0);
3127 	
3128 	    if (linger_op->session != s) {
3129 	      // NB locking two sessions (s and linger_op->session) at the
3130 	      // same time here is only safe because we are the only one that
3131 	      // takes two, and we are holding rwlock for write.  Disable
3132 	      // lockdep because it doesn't know that.
3133 	      OSDSession::unique_lock sl(s->lock);
3134 	      _session_linger_op_remove(linger_op->session, linger_op);
3135 	      _session_linger_op_assign(s, linger_op);
3136 	    }
3137 	
3138 	    put_session(s);
3139 	    return RECALC_OP_TARGET_NEED_RESEND;
3140 	  }
3141 	  return r;
3142 	}
3143 	
3144 	void Objecter::_cancel_linger_op(Op *op)
3145 	{
3146 	  ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3147 	
3148 	  ceph_assert(!op->should_resend);
3149 	  if (op->onfinish) {
3150 	    delete op->onfinish;
3151 	    num_in_flight--;
3152 	  }
3153 	
3154 	  _finish_op(op, 0);
3155 	}
3156 	
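	// Tear down a completed or cancelled op: return its throttle budget,
	// cancel any pending timeout, unlink it from its session, update the
	// perf counters and drop our reference.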
3157 	void Objecter::_finish_op(Op *op, int r)
3158 	{
3159 	  ldout(cct, 15) << __func__ << " " << op->tid << dendl;
3160 	
3161 	  // op->session->lock is locked unique or op->session is null
3162 	
3163 	  if (!op->ctx_budgeted && op->budget >= 0) {
3164 	    put_op_budget_bytes(op->budget);
3165 	    op->budget = -1;
3166 	  }
3167 	
3168 	  if (op->ontimeout && r != -ETIMEDOUT)
3169 	    timer.cancel_event(op->ontimeout);
3170 	
3171 	  if (op->session) {
3172 	    _session_op_remove(op->session, op);
3173 	  }
3174 	
3175 	  logger->dec(l_osdc_op_active);
3176 	
3177 	  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
3178 	
3179 	  inflight_ops--;
3180 	
3181 	  op->put();
3182 	}
3183 	
3184 	MOSDOp *Objecter::_prepare_osd_op(Op *op)
3185 	{
3186 	  // rwlock is locked
3187 	
3188 	  int flags = op->target.flags;
3189 	  flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3190 	
3191 	  // Nothing checks this any longer, but needed for compatibility with
3192 	  // pre-luminous osds
3193 	  flags |= CEPH_OSD_FLAG_ONDISK;
3194 	
3195 	  if (!honor_pool_full)
3196 	    flags |= CEPH_OSD_FLAG_FULL_FORCE;
3197 	
3198 	  op->target.paused = false;
3199 	  op->stamp = ceph::coarse_mono_clock::now();
3200 	
3201 	  hobject_t hobj = op->target.get_hobj();
3202 	  MOSDOp *m = new MOSDOp(client_inc, op->tid,
3203 				 hobj, op->target.actual_pgid,
3204 				 osdmap->get_epoch(),
3205 				 flags, op->features);
3206 	
3207 	  m->set_snapid(op->snapid);
3208 	  m->set_snap_seq(op->snapc.seq);
3209 	  m->set_snaps(op->snapc.snaps);
3210 	
3211 	  m->ops = op->ops;
3212 	  m->set_mtime(op->mtime);
3213 	  m->set_retry_attempt(op->attempts++);
3214 	
3215 	  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3216 	    op->trace.init("op", &trace_endpoint);
3217 	  }
3218 	
3219 	  if (op->priority)
3220 	    m->set_priority(op->priority);
3221 	  else
3222 	    m->set_priority(cct->_conf->osd_client_op_priority);
3223 	
3224 	  if (op->reqid != osd_reqid_t()) {
3225 	    m->set_reqid(op->reqid);
3226 	  }
3227 	
3228 	  logger->inc(l_osdc_op_send);
3229 	  ssize_t sum = 0;
3230 	  for (unsigned i = 0; i < m->ops.size(); i++) {
3231 	    sum += m->ops[i].indata.length();
3232 	  }
3233 	  logger->inc(l_osdc_op_send_bytes, sum);
3234 	
3235 	  return m;
3236 	}
3237 	
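	// Send (or resend) an op over its session's connection.  If the OSD
	// has given us a backoff covering the target object, the op is left
	// queued on the session; otherwise build the MOSDOp and ship it.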
3238 	void Objecter::_send_op(Op *op)
3239 	{
3240 	  // rwlock is locked
3241 	  // op->session->lock is locked
3242 	
3243 	  // backoff?
3244 	  auto p = op->session->backoffs.find(op->target.actual_pgid);
3245 	  if (p != op->session->backoffs.end()) {
3246 	    hobject_t hoid = op->target.get_hobj();
3247 	    auto q = p->second.lower_bound(hoid);
3248 	    if (q != p->second.begin()) {
3249 	      --q;
3250 	      if (hoid >= q->second.end) {
3251 		++q;
3252 	      }
3253 	    }
3254 	    if (q != p->second.end()) {
3255 	      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3256 			     << "," << q->second.end << ")" << dendl;
3257 	      int r = cmp(hoid, q->second.begin);
3258 	      if (r == 0 || (r > 0 && hoid < q->second.end)) {
3259 		ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3260 			       << " id " << q->second.id << " on " << hoid
3261 			       << ", queuing " << op << " tid " << op->tid << dendl;
3262 		return;
3263 	      }
3264 	    }
3265 	  }
3266 	
3267 	  ceph_assert(op->tid > 0);
3268 	  MOSDOp *m = _prepare_osd_op(op);
3269 	
3270 	  if (op->target.actual_pgid != m->get_spg()) {
3271 	    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3272 			   << m->get_spg() << " to " << op->target.actual_pgid
3273 			   << ", updating and reencoding" << dendl;
3274 	    m->set_spg(op->target.actual_pgid);
3275 	    m->clear_payload();  // reencode
3276 	  }
3277 	
3278 	  ldout(cct, 15) << "_send_op " << op->tid << " to "
3279 			 << op->target.actual_pgid << " on osd." << op->session->osd
3280 			 << dendl;
3281 	
3282 	  ConnectionRef con = op->session->con;
3283 	  ceph_assert(con);
3284 	
3285 	#if 0
3286 	  // preallocated rx ceph::buffer?
3287 	  if (op->con) {
3288 	    ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
3289 			   << op->con << dendl;
3290 	    op->con->revoke_rx_buffer(op->tid);
3291 	  }
3292 	  if (op->outbl &&
3293 	      op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
3294 	      op->outbl->length()) {
3295 	    op->outbl->invalidate_crc();  // messenger writes through c_str()
3296 	    ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
3297 			   << dendl;
3298 	    op->con = con;
3299 	    op->con->post_rx_buffer(op->tid, *op->outbl);
3300 	  }
3301 	#endif
3302 	
3303 	  op->incarnation = op->session->incarnation;
3304 	
3305 	  if (op->trace.valid()) {
3306 	    m->trace.init("op msg", nullptr, &op->trace);
3307 	  }
3308 	  op->session->con->send_message(m);
3309 	}
3310 	
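	// Estimate an op's throttle cost: bytes of outgoing data for writes
	// plus the expected amount of data returned by extent and xattr reads.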
3311 	int Objecter::calc_op_budget(const vector<OSDOp>& ops)
3312 	{
3313 	  int op_budget = 0;
3314 	  for (vector<OSDOp>::const_iterator i = ops.begin();
3315 	       i != ops.end();
3316 	       ++i) {
3317 	    if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3318 	      op_budget += i->indata.length();
3319 	    } else if (ceph_osd_op_mode_read(i->op.op)) {
3320 	      if (ceph_osd_op_uses_extent(i->op.op)) {
3321 	        if ((int64_t)i->op.extent.length > 0)
3322 	          op_budget += (int64_t)i->op.extent.length;
3323 	      } else if (ceph_osd_op_type_attr(i->op.op)) {
3324 	        op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
3325 	      }
3326 	    }
3327 	  }
3328 	  return op_budget;
3329 	}
3330 	
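	// Take byte and op budget from the throttles, blocking if necessary.
	// The rwlock is dropped while blocking and re-taken in the same mode
	// (shared or unique) afterwards.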
3331 	void Objecter::_throttle_op(Op *op,
3332 				    shunique_lock& sul,
3333 				    int op_budget)
3334 	{
3335 	  ceph_assert(sul && sul.mutex() == &rwlock);
3336 	  bool locked_for_write = sul.owns_lock();
3337 	
3338 	  if (!op_budget)
3339 	    op_budget = calc_op_budget(op->ops);
3340 	  if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3341 	    sul.unlock();
3342 	    op_throttle_bytes.get(op_budget);
3343 	    if (locked_for_write)
3344 	      sul.lock();
3345 	    else
3346 	      sul.lock_shared();
3347 	  }
3348 	  if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3349 	    sul.unlock();
3350 	    op_throttle_ops.get(1);
3351 	    if (locked_for_write)
3352 	      sul.lock();
3353 	    else
3354 	      sul.lock_shared();
3355 	  }
3356 	}
3357 	
3358 	int Objecter::take_linger_budget(LingerOp *info)
3359 	{
3360 	  return 1;
3361 	}
3362 	
3363 	/* This function DOES put the passed message before returning */
3364 	void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3365 	{
3366 	  ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3367 	
3368 	  // get the tid so we can find the matching op
3369 	  ceph_tid_t tid = m->get_tid();
3370 	
3371 	  shunique_lock sul(rwlock, ceph::acquire_shared);
3372 	  if (!initialized) {
3373 	    m->put();
3374 	    return;
3375 	  }
3376 	
3377 	  ConnectionRef con = m->get_connection();
3378 	  auto priv = con->get_priv();
3379 	  auto s = static_cast<OSDSession*>(priv.get());
3380 	  if (!s || s->con != con) {
3381 	    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3382 	    m->put();
3383 	    return;
3384 	  }
3385 	
3386 	  OSDSession::unique_lock sl(s->lock);
3387 	
3388 	  map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3389 	  if (iter == s->ops.end()) {
3390 	    ldout(cct, 7) << "handle_osd_op_reply " << tid
3391 			  << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3392 							    " onnvram" : " ack"))
3393 			  << " ... stray" << dendl;
3394 	    sl.unlock();
3395 	    m->put();
3396 	    return;
3397 	  }
3398 	
3399 	  ldout(cct, 7) << "handle_osd_op_reply " << tid
3400 			<< (m->is_ondisk() ? " ondisk" :
3401 			    (m->is_onnvram() ? " onnvram" : " ack"))
3402 			<< " uv " << m->get_user_version()
3403 			<< " in " << m->get_pg()
3404 			<< " attempt " << m->get_retry_attempt()
3405 			<< dendl;
3406 	  Op *op = iter->second;
3407 	  op->trace.event("osd op reply");
3408 	
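	  // objecter_retry_writes_after_first_reply (a testing knob) makes us drop
	  // the first reply to a write and resubmit the op, exercising the resend path.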
3409 	  if (retry_writes_after_first_reply && op->attempts == 1 &&
3410 	      (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3411 	    ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
3412 	    if (op->onfinish) {
3413 	      num_in_flight--;
3414 	    }
3415 	    _session_op_remove(s, op);
3416 	    sl.unlock();
3417 	
3418 	    _op_submit(op, sul, NULL);
3419 	    m->put();
3420 	    return;
3421 	  }
3422 	
3423 	  if (m->get_retry_attempt() >= 0) {
3424 	    if (m->get_retry_attempt() != (op->attempts - 1)) {
3425 	      ldout(cct, 7) << " ignoring reply from attempt "
3426 			    << m->get_retry_attempt()
3427 			    << " from " << m->get_source_inst()
3428 			    << "; last attempt " << (op->attempts - 1) << " sent to "
3429 			    << op->session->con->get_peer_addr() << dendl;
3430 	      m->put();
3431 	      sl.unlock();
3432 	      return;
3433 	    }
3434 	  } else {
3435 	    // we don't know the request attempt because the server is old, so
3436 	    // just accept this one.  we may do ACK callbacks we shouldn't
3437 	    // have, but that is better than doing callbacks out of order.
3438 	  }
3439 	
3440 	  Context *onfinish = 0;
3441 	
3442 	  int rc = m->get_result();
3443 	
3444 	  if (m->is_redirect_reply()) {
3445 	    ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
3446 	    if (op->onfinish)
3447 	      num_in_flight--;
3448 	    _session_op_remove(s, op);
3449 	    sl.unlock();
3450 	
3451 	    // FIXME: two redirects could race and reorder
3452 	
3453 	    op->tid = 0;
3454 	    m->get_redirect().combine_with_locator(op->target.target_oloc,
3455 						   op->target.target_oid.name);
3456 	    op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
3457 				 CEPH_OSD_FLAG_IGNORE_CACHE |
3458 				 CEPH_OSD_FLAG_IGNORE_OVERLAY);
3459 	    _op_submit(op, sul, NULL);
3460 	    m->put();
3461 	    return;
3462 	  }
3463 	
3464 	  if (rc == -EAGAIN) {
3465 	    ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
3466 	    if (op->onfinish)
3467 	      num_in_flight--;
3468 	    _session_op_remove(s, op);
3469 	    sl.unlock();
3470 	
3471 	    op->tid = 0;
3472 	    op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3473 				  CEPH_OSD_FLAG_LOCALIZE_READS);
3474 	    op->target.pgid = pg_t();
3475 	    _op_submit(op, sul, NULL);
3476 	    m->put();
3477 	    return;
3478 	  }
3479 	
3480 	  sul.unlock();
3481 	
3482 	  if (op->objver)
3483 	    *op->objver = m->get_user_version();
3484 	  if (op->reply_epoch)
3485 	    *op->reply_epoch = m->get_map_epoch();
3486 	  if (op->data_offset)
3487 	    *op->data_offset = m->get_header().data_off;
3488 	
3489 	  // got data?
3490 	  if (op->outbl) {
3491 	#if 0
3492 	    if (op->con)
3493 	      op->con->revoke_rx_buffer(op->tid);
3494 	#endif
3495 	    auto& bl = m->get_data();
3496 	    if (op->outbl->length() == bl.length() &&
3497 		bl.get_num_buffers() <= 1) {
3498 	      // this is here to keep previous users who *relied* on getting data
3499 	      // read into existing buffers happy.  Notably,
3500 	      // libradosstriper::RadosStriperImpl::aio_read().
3501 	      ldout(cct,10) << __func__ << " copying resulting " << bl.length()
3502 			    << " into existing ceph::buffer of length " << op->outbl->length()
3503 			    << dendl;
3504 	      ceph::buffer::list t;
3505 	      t.claim(*op->outbl);
3506 	      t.invalidate_crc();  // we're overwriting the raw buffers via c_str()
3507 	      bl.copy(0, bl.length(), t.c_str());
3508 	      op->outbl->substr_of(t, 0, bl.length());
3509 	    } else {
3510 	      m->claim_data(*op->outbl);
3511 	    }
3512 	    op->outbl = 0;
3513 	  }
3514 	
3515 	  // per-op result demuxing
3516 	  vector<OSDOp> out_ops;
3517 	  m->claim_ops(out_ops);
3518 	
3519 	  if (out_ops.size() != op->ops.size())
3520 	    ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3521 			  << " != request ops " << op->ops
3522 			  << " from " << m->get_source_inst() << dendl;
3523 	
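	  // out_bl/out_rval/out_handler run parallel to op->ops: each requested
	  // OSDOp may have its own output buffer, per-op return code and completion
	  // handler, which we fill in from the corresponding reply op below.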
3524 	  vector<ceph::buffer::list*>::iterator pb = op->out_bl.begin();
3525 	  vector<int*>::iterator pr = op->out_rval.begin();
3526 	  vector<Context*>::iterator ph = op->out_handler.begin();
3527 	  ceph_assert(op->out_bl.size() == op->out_rval.size());
3528 	  ceph_assert(op->out_bl.size() == op->out_handler.size());
3529 	  vector<OSDOp>::iterator p = out_ops.begin();
3530 	  for (unsigned i = 0;
3531 	       p != out_ops.end() && pb != op->out_bl.end();
3532 	       ++i, ++p, ++pb, ++pr, ++ph) {
3533 	    ldout(cct, 10) << " op " << i << " rval " << p->rval
3534 			   << " len " << p->outdata.length() << dendl;
3535 	    if (*pb)
3536 	      **pb = p->outdata;
3537 	    // set rval before running handlers so that handlers
3538 	    // can change it if e.g. decoding fails
3539 	    if (*pr)
3540 	      **pr = ceph_to_hostos_errno(p->rval);
3541 	    if (*ph) {
3542 	      ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
3543 	      (*ph)->complete(ceph_to_hostos_errno(p->rval));
3544 	      *ph = NULL;
3545 	    }
3546 	  }
3547 	
3548 	  // NOTE: we assume that since we only ever request ONDISK, we will
3549 	  // only ever get back one (type of) ack.
3550 	
3551 	  if (op->onfinish) {
3552 	    num_in_flight--;
3553 	    onfinish = op->onfinish;
3554 	    op->onfinish = NULL;
3555 	  }
3556 	  logger->inc(l_osdc_op_reply);
3557 	
3558 	  /* get it before we call _finish_op() */
3559 	  auto completion_lock = s->get_lock(op->target.base_oid);
3560 	
3561 	  ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3562 	  _finish_op(op, 0);
3563 	
3564 	  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
3565 	
3566 	  // serialize completions
3567 	  if (completion_lock.mutex()) {
3568 	    completion_lock.lock();
3569 	  }
3570 	  sl.unlock();
3571 	
3572 	  // do callbacks
3573 	  if (onfinish) {
3574 	    onfinish->complete(rc);
3575 	  }
3576 	  if (completion_lock.mutex()) {
3577 	    completion_lock.unlock();
3578 	  }
3579 	
3580 	  m->put();
3581 	}
3582 	
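	// Handle an MOSDBackoff from an OSD: a BLOCK registers a backoff for a range
	// of objects within a PG (and is acked with ACK_BLOCK so the OSD can discard
	// it across a split), while an UNBLOCK drops the registration and resends any
	// ops in the session that target the now-unblocked range.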
3583 	void Objecter::handle_osd_backoff(MOSDBackoff *m)
3584 	{
3585 	  ldout(cct, 10) << __func__ << " " << *m << dendl;
3586 	  shunique_lock sul(rwlock, ceph::acquire_shared);
3587 	  if (!initialized) {
3588 	    m->put();
3589 	    return;
3590 	  }
3591 	
3592 	  ConnectionRef con = m->get_connection();
3593 	  auto priv = con->get_priv();
3594 	  auto s = static_cast<OSDSession*>(priv.get());
3595 	  if (!s || s->con != con) {
3596 	    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3597 	    m->put();
3598 	    return;
3599 	  }
3600 	
3601 	  get_session(s);
3602 	
3603 	  OSDSession::unique_lock sl(s->lock);
3604 	
3605 	  switch (m->op) {
3606 	  case CEPH_OSD_BACKOFF_OP_BLOCK:
3607 	    {
3608 	      // register
3609 	      OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3610 	      s->backoffs_by_id.insert(make_pair(m->id, &b));
3611 	      b.pgid = m->pgid;
3612 	      b.id = m->id;
3613 	      b.begin = m->begin;
3614 	      b.end = m->end;
3615 	
3616 	      // ack with original backoff's epoch so that the osd can discard this if
3617 	      // there was a pg split.
3618 	      Message *r = new MOSDBackoff(m->pgid,
3619 					   m->map_epoch,
3620 					   CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3621 					   m->id, m->begin, m->end);
3622 	      // this priority must match the MOSDOps from _prepare_osd_op
3623 	      r->set_priority(cct->_conf->osd_client_op_priority);
3624 	      con->send_message(r);
3625 	    }
3626 	    break;
3627 	
3628 	  case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3629 	    {
3630 	      auto p = s->backoffs_by_id.find(m->id);
3631 	      if (p != s->backoffs_by_id.end()) {
3632 		OSDBackoff *b = p->second;
3633 		if (b->begin != m->begin &&
3634 		    b->end != m->end) {
3635 		  lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3636 			     << " unblock on ["
3637 			     << m->begin << "," << m->end << ") but backoff is ["
3638 			     << b->begin << "," << b->end << ")" << dendl;
3639 		  // hrmpf, unblock it anyway.
3640 		}
3641 		ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3642 			       << " id " << b->id
3643 			       << " [" << b->begin << "," << b->end
3644 			       << ")" << dendl;
3645 		auto spgp = s->backoffs.find(b->pgid);
3646 		ceph_assert(spgp != s->backoffs.end());
3647 		spgp->second.erase(b->begin);
3648 		if (spgp->second.empty()) {
3649 		  s->backoffs.erase(spgp);
3650 		}
3651 		s->backoffs_by_id.erase(p);
3652 	
3653 		// check for any ops to resend
3654 		for (auto& q : s->ops) {
3655 		  if (q.second->target.actual_pgid == m->pgid) {
3656 		    int r = q.second->target.contained_by(m->begin, m->end);
3657 		    ldout(cct, 20) << __func__ <<  " contained_by " << r << " on "
3658 				   << q.second->target.get_hobj() << dendl;
3659 		    if (r) {
3660 		      _send_op(q.second);
3661 		    }
3662 		  }
3663 		}
3664 	      } else {
3665 		lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3666 			   << " unblock on ["
3667 			   << m->begin << "," << m->end << ") but backoff dne" << dendl;
3668 	      }
3669 	    }
3670 	    break;
3671 	
3672 	  default:
3673 	    ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3674 	  }
3675 	
3676 	  sul.unlock();
3677 	  sl.unlock();
3678 	
3679 	  m->put();
3680 	  put_session(s);
3681 	}
3682 	
3683 	uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3684 					      uint32_t pos)
3685 	{
3686 	  shared_lock rl(rwlock);
3687 	  list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3688 					pos, list_context->pool_id, string());
3689 	  ldout(cct, 10) << __func__ << " " << list_context
3690 			 << " pos " << pos << " -> " << list_context->pos << dendl;
3691 	  pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3692 	  list_context->current_pg = actual.ps();
3693 	  list_context->at_end_of_pool = false;
3694 	  return pos;
3695 	}
3696 	
3697 	uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3698 					      const hobject_t& cursor)
3699 	{
3700 	  shared_lock rl(rwlock);
3701 	  ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3702 	  list_context->pos = cursor;
3703 	  list_context->at_end_of_pool = false;
3704 	  pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3705 	  list_context->current_pg = actual.ps();
3706 	  list_context->sort_bitwise = true;
3707 	  return list_context->current_pg;
3708 	}
3709 	
3710 	void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3711 	                                        hobject_t *cursor)
3712 	{
3713 	  shared_lock rl(rwlock);
3714 	  if (list_context->list.empty()) {
3715 	    *cursor = list_context->pos;
3716 	  } else {
3717 	    const librados::ListObjectImpl& entry = list_context->list.front();
3718 	    const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3719 	    uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3720 	    *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3721 	  }
3722 	}
3723 	
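	// Issue the next chunk of a pool listing.  list_context->pos is the cursor;
	// with SORTBITWISE we simply follow the handles the OSDs return, otherwise we
	// walk PGs by number and restart the listing if pg_num changes underneath us.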
3724 	void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3725 	{
3726 	  ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3727 			 << " pool_snap_seq " << list_context->pool_snap_seq
3728 			 << " max_entries " << list_context->max_entries
3729 			 << " list_context " << list_context
3730 			 << " onfinish " << onfinish
3731 			 << " current_pg " << list_context->current_pg
3732 			 << " pos " << list_context->pos << dendl;
3733 	
3734 	  shared_lock rl(rwlock);
3735 	  const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3736 	  if (!pool) { // pool is gone
3737 	    rl.unlock();
3738 	    put_nlist_context_budget(list_context);
3739 	    onfinish->complete(-ENOENT);
3740 	    return;
3741 	  }
3742 	  int pg_num = pool->get_pg_num();
3743 	  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3744 	
3745 	  if (list_context->pos.is_min()) {
3747 	    list_context->sort_bitwise = sort_bitwise;
3748 	    list_context->starting_pg_num = pg_num;
3749 	  }
3750 	  if (list_context->sort_bitwise != sort_bitwise) {
3751 	    list_context->pos = hobject_t(
3752 	      object_t(), string(), CEPH_NOSNAP,
3753 	      list_context->current_pg, list_context->pool_id, string());
3754 	    list_context->sort_bitwise = sort_bitwise;
3755 	    ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3756 			   << list_context->pos << dendl;
3757 	  }
3758 	  if (list_context->starting_pg_num != pg_num) {
3759 	    if (!sort_bitwise) {
3760 	      // start reading from the beginning; the pgs have changed
3761 	      ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3762 	      list_context->pos = collection_list_handle_t();
3763 	    }
3764 	    list_context->starting_pg_num = pg_num;
3765 	  }
3766 	
3767 	  if (list_context->pos.is_max()) {
3768 	    ldout(cct, 20) << __func__ << " end of pool, list "
3769 			   << list_context->list << dendl;
3770 	    if (list_context->list.empty()) {
3771 	      list_context->at_end_of_pool = true;
3772 	    }
3773 	    // release the listing context's budget once all
3774 	    // OPs (in the session) are finished
3775 	    put_nlist_context_budget(list_context);
3776 	    onfinish->complete(0);
3777 	    return;
3778 	  }
3779 	
3780 	  ObjectOperation op;
3781 	  op.pg_nls(list_context->max_entries, list_context->filter,
3782 		    list_context->pos, osdmap->get_epoch());
3783 	  list_context->bl.clear();
3784 	  C_NList *onack = new C_NList(list_context, onfinish, this);
3785 	  object_locator_t oloc(list_context->pool_id, list_context->nspace);
3786 	
3787 	  // note current_pg in case we don't have (or lose) SORTBITWISE
3788 	  list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3789 	  rl.unlock();
3790 	
3791 	  pg_read(list_context->current_pg, oloc, op,
3792 		  &list_context->bl, 0, onack, &onack->epoch,
3793 		  &list_context->ctx_budget);
3794 	}
3795 	
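	// Consume one listing reply: advance the cursor (from the returned handle, or
	// by stepping to the next PG for legacy !sortbitwise listings), splice the
	// returned entries onto the accumulated list, and either complete the
	// caller's context or issue the next chunk.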
3796 	void Objecter::_nlist_reply(NListContext *list_context, int r,
3797 				    Context *final_finish, epoch_t reply_epoch)
3798 	{
3799 	  ldout(cct, 10) << __func__ << " " << list_context << dendl;
3800 	
3801 	  auto iter = list_context->bl.cbegin();
3802 	  pg_nls_response_t response;
3803 	  decode(response, iter);
3804 	  if (!iter.end()) {
3805 	    // decode (and ignore) legacy extra_info, for backward compatibility.
3806 	    ceph::buffer::list legacy_extra_info;
3807 	    decode(legacy_extra_info, iter);
3808 	  }
3809 	
3810 	  // if the osd returns 1 (newer code), or the handle is MAX, it means we
3811 	  // hit the end of the pg.
3812 	  if ((response.handle.is_max() || r == 1) &&
3813 	      !list_context->sort_bitwise) {
3814 	    // legacy OSD and !sortbitwise, figure out the next PG on our own
3815 	    ++list_context->current_pg;
3816 	    if (list_context->current_pg == list_context->starting_pg_num) {
3817 	      // end of pool
3818 	      list_context->pos = hobject_t::get_max();
3819 	    } else {
3820 	      // next pg
3821 	      list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3822 					    list_context->current_pg,
3823 					    list_context->pool_id, string());
3824 	    }
3825 	  } else {
3826 	    list_context->pos = response.handle;
3827 	  }
3828 	
3829 	  int response_size = response.entries.size();
3830 	  ldout(cct, 20) << " response.entries.size " << response_size
3831 			 << ", response.entries " << response.entries
3832 			 << ", handle " << response.handle
3833 			 << ", tentative new pos " << list_context->pos << dendl;
3834 	  if (response_size) {
3835 	    list_context->list.splice(list_context->list.end(), response.entries);
3836 	  }
3837 	
3838 	  if (list_context->list.size() >= list_context->max_entries) {
3839 	    ldout(cct, 20) << " hit max, returning results so far, "
3840 			   << list_context->list << dendl;
3841 	    // release the listing context's budget once all
3842 	    // OPs (in the session) are finished
3843 	    put_nlist_context_budget(list_context);
3844 	    final_finish->complete(0);
3845 	    return;
3846 	  }
3847 	
3848 	  // continue!
3849 	  list_nobjects(list_context, final_finish);
3850 	}
3851 	
3852 	void Objecter::put_nlist_context_budget(NListContext *list_context)
3853 	{
3854 	  if (list_context->ctx_budget >= 0) {
3855 	    ldout(cct, 10) << " release listing context's budget " <<
3856 	      list_context->ctx_budget << dendl;
3857 	    put_op_budget_bytes(list_context->ctx_budget);
3858 	    list_context->ctx_budget = -1;
3859 	  }
3860 	}
3861 	
3862 	// snapshots
3863 	
3864 	int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3865 				       Context *onfinish)
3866 	{
3867 	  unique_lock wl(rwlock);
3868 	  ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3869 			 << snap_name << dendl;
3870 	
3871 	  const pg_pool_t *p = osdmap->get_pg_pool(pool);
3872 	  if (!p)
3873 	    return -EINVAL;
3874 	  if (p->snap_exists(snap_name.c_str()))
3875 	    return -EEXIST;
3876 	
3877 	  PoolOp *op = new PoolOp;
3878 	  if (!op)
3879 	    return -ENOMEM;
3880 	  op->tid = ++last_tid;
3881 	  op->pool = pool;
3882 	  op->name = snap_name;
3883 	  op->onfinish = onfinish;
3884 	  op->pool_op = POOL_OP_CREATE_SNAP;
3885 	  pool_ops[op->tid] = op;
3886 	
3887 	  pool_op_submit(op);
3888 	
3889 	  return 0;
3890 	}
3891 	
3892 	struct C_SelfmanagedSnap : public Context {
3893 	  ceph::buffer::list bl;
3894 	  snapid_t *psnapid;
3895 	  Context *fin;
3896 	  C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
3897 	  void finish(int r) override {
3898 	    if (r == 0) {
3899 	      try {
3900 	        auto p = bl.cbegin();
3901 	        decode(*psnapid, p);
3902 	      } catch (ceph::buffer::error&) {
3903 	        r = -EIO;
3904 	      }
3905 	    }
3906 	    fin->complete(r);
3907 	  }
3908 	};
3909 	
3910 	int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3911 						Context *onfinish)
3912 	{
3913 	  unique_lock wl(rwlock);
3914 	  ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3915 	  PoolOp *op = new PoolOp;
3916 	  if (!op) return -ENOMEM;
3917 	  op->tid = ++last_tid;
3918 	  op->pool = pool;
3919 	  C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3920 	  op->onfinish = fin;
3921 	  op->blp = &fin->bl;
3922 	  op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3923 	  pool_ops[op->tid] = op;
3924 	
3925 	  pool_op_submit(op);
3926 	  return 0;
3927 	}
3928 	
3929 	int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3930 				       Context *onfinish)
3931 	{
3932 	  unique_lock wl(rwlock);
3933 	  ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3934 			 << snap_name << dendl;
3935 	
3936 	  const pg_pool_t *p = osdmap->get_pg_pool(pool);
3937 	  if (!p)
3938 	    return -EINVAL;
3939 	  if (!p->snap_exists(snap_name.c_str()))
3940 	    return -ENOENT;
3941 	
3942 	  PoolOp *op = new PoolOp;
3943 	  if (!op)
3944 	    return -ENOMEM;
3945 	  op->tid = ++last_tid;
3946 	  op->pool = pool;
3947 	  op->name = snap_name;
3948 	  op->onfinish = onfinish;
3949 	  op->pool_op = POOL_OP_DELETE_SNAP;
3950 	  pool_ops[op->tid] = op;
3951 	
3952 	  pool_op_submit(op);
3953 	
3954 	  return 0;
3955 	}
3956 	
3957 	int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3958 					      Context *onfinish)
3959 	{
3960 	  unique_lock wl(rwlock);
3961 	  ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3962 			 << snap << dendl;
3963 	  PoolOp *op = new PoolOp;
3964 	  if (!op) return -ENOMEM;
3965 	  op->tid = ++last_tid;
3966 	  op->pool = pool;
3967 	  op->onfinish = onfinish;
3968 	  op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3969 	  op->snapid = snap;
3970 	  pool_ops[op->tid] = op;
3971 	
3972 	  pool_op_submit(op);
3973 	
3974 	  return 0;
3975 	}
3976 	
3977 	int Objecter::create_pool(string& name, Context *onfinish,
3978 				  int crush_rule)
3979 	{
3980 	  unique_lock wl(rwlock);
3981 	  ldout(cct, 10) << "create_pool name=" << name << dendl;
3982 	
3983 	  if (osdmap->lookup_pg_pool_name(name) >= 0)
3984 	    return -EEXIST;
3985 	
3986 	  PoolOp *op = new PoolOp;
3987 	  if (!op)
3988 	    return -ENOMEM;
3989 	  op->tid = ++last_tid;
3990 	  op->pool = 0;
3991 	  op->name = name;
3992 	  op->onfinish = onfinish;
3993 	  op->pool_op = POOL_OP_CREATE;
3994 	  pool_ops[op->tid] = op;
3995 	  op->crush_rule = crush_rule;
3996 	
3997 	  pool_op_submit(op);
3998 	
3999 	  return 0;
4000 	}
4001 	
4002 	int Objecter::delete_pool(int64_t pool, Context *onfinish)
4003 	{
4004 	  unique_lock wl(rwlock);
4005 	  ldout(cct, 10) << "delete_pool " << pool << dendl;
4006 	
4007 	  if (!osdmap->have_pg_pool(pool))
4008 	    return -ENOENT;
4009 	
4010 	  _do_delete_pool(pool, onfinish);
4011 	  return 0;
4012 	}
4013 	
4014 	int Objecter::delete_pool(const string &pool_name, Context *onfinish)
4015 	{
4016 	  unique_lock wl(rwlock);
4017 	  ldout(cct, 10) << "delete_pool " << pool_name << dendl;
4018 	
4019 	  int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
4020 	  if (pool < 0)
4021 	    return pool;
4022 	
4023 	  _do_delete_pool(pool, onfinish);
4024 	  return 0;
4025 	}
4026 	
4027 	void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
4028 	{
4029 	  PoolOp *op = new PoolOp;
4030 	  op->tid = ++last_tid;
4031 	  op->pool = pool;
4032 	  op->name = "delete";
4033 	  op->onfinish = onfinish;
4034 	  op->pool_op = POOL_OP_DELETE;
4035 	  pool_ops[op->tid] = op;
4036 	  pool_op_submit(op);
4037 	}
4038 	
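	// Pool ops (pool/snapshot create and delete) are sent to the monitor as
	// MPoolOp messages, tracked in pool_ops by tid until handle_pool_op_reply(),
	// a timeout or a cancel finishes them.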
4039 	void Objecter::pool_op_submit(PoolOp *op)
4040 	{
4041 	  // rwlock is locked
4042 	  if (mon_timeout > timespan(0)) {
4043 	    op->ontimeout = timer.add_event(mon_timeout,
4044 					    [this, op]() {
4045 					      pool_op_cancel(op->tid, -ETIMEDOUT); });
4046 	  }
4047 	  _pool_op_submit(op);
4048 	}
4049 	
4050 	void Objecter::_pool_op_submit(PoolOp *op)
4051 	{
4052 	  // rwlock is locked unique
4053 	
4054 	  ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
4055 	  MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4056 				   op->name, op->pool_op,
4057 				   last_seen_osdmap_version);
4058 	  if (op->snapid) m->snapid = op->snapid;
4059 	  if (op->crush_rule) m->crush_rule = op->crush_rule;
4060 	  monc->send_mon_message(m);
4061 	  op->last_submit = ceph::coarse_mono_clock::now();
4062 	
4063 	  logger->inc(l_osdc_poolop_send);
4064 	}
4065 	
4066 	/**
4067 	 * Handle a reply to a PoolOp message. Check that we sent the message
4068 	 * and give the caller responsibility for the returned ceph::buffer::list.
4069 	 * Then either call the finisher or stash the PoolOp, depending on if we
4070 	 * have a new enough map.
4071 	 * Lastly, clean up the message and PoolOp.
4072 	 */
4073 	void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4074 	{
4075 	  FUNCTRACE(cct);
4076 	  shunique_lock sul(rwlock, acquire_shared);
4077 	  if (!initialized) {
4078 	    sul.unlock();
4079 	    m->put();
4080 	    return;
4081 	  }
4082 	
4083 	  ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4084 	  ceph_tid_t tid = m->get_tid();
4085 	  map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
4086 	  if (iter != pool_ops.end()) {
4087 	    PoolOp *op = iter->second;
4088 	    ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4089 			   << ceph_pool_op_name(op->pool_op) << dendl;
4090 	    if (op->blp)
4091 	      op->blp->claim(m->response_data);
4092 	    if (m->version > last_seen_osdmap_version)
4093 	      last_seen_osdmap_version = m->version;
4094 	    if (osdmap->get_epoch() < m->epoch) {
4095 	      sul.unlock();
4096 	      sul.lock();
4097 	      // recheck op existence since we have let go of rwlock
4098 	      // (for promotion) above.
4099 	      iter = pool_ops.find(tid);
4100 	      if (iter == pool_ops.end())
4101 		goto done; // op is gone.
4102 	      if (osdmap->get_epoch() < m->epoch) {
4103 		ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4104 			       << " before calling back" << dendl;
4105 		_wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
4106 	      } else {
4107 		// map epoch changed, probably because a MOSDMap message
4108 		// sneaked in. Do caller-specified callback now or else
4109 		// we lose it forever.
4110 		ceph_assert(op->onfinish);
4111 		op->onfinish->complete(m->replyCode);
4112 	      }
4113 	    } else {
4114 	      ceph_assert(op->onfinish);
4115 	      op->onfinish->complete(m->replyCode);
4116 	    }
4117 	    op->onfinish = NULL;
4118 	    if (!sul.owns_lock()) {
4119 	      sul.unlock();
4120 	      sul.lock();
4121 	    }
4122 	    iter = pool_ops.find(tid);
4123 	    if (iter != pool_ops.end()) {
4124 	      _finish_pool_op(op, 0);
4125 	    }
4126 	  } else {
4127 	    ldout(cct, 10) << "unknown request " << tid << dendl;
4128 	  }
4129 	
4130 	done:
4131 	  // Not strictly necessary, since we'll release it on return.
4132 	  sul.unlock();
4133 	
4134 	  ldout(cct, 10) << "done" << dendl;
4135 	  m->put();
4136 	}
4137 	
4138 	int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4139 	{
4140 	  ceph_assert(initialized);
4141 	
4142 	  unique_lock wl(rwlock);
4143 	
4144 	  map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4145 	  if (it == pool_ops.end()) {
4146 	    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4147 	    return -ENOENT;
4148 	  }
4149 	
4150 	  ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4151 	
4152 	  PoolOp *op = it->second;
4153 	  if (op->onfinish)
4154 	    op->onfinish->complete(r);
4155 	
4156 	  _finish_pool_op(op, r);
4157 	  return 0;
4158 	}
4159 	
4160 	void Objecter::_finish_pool_op(PoolOp *op, int r)
4161 	{
4162 	  // rwlock is locked unique
4163 	  pool_ops.erase(op->tid);
4164 	  logger->set(l_osdc_poolop_active, pool_ops.size());
4165 	
4166 	  if (op->ontimeout && r != -ETIMEDOUT) {
4167 	    timer.cancel_event(op->ontimeout);
4168 	  }
4169 	
4170 	  delete op;
4171 	}
4172 	
4173 	// pool stats
4174 	
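	// Pool-stat and statfs requests follow the same monitor round-trip pattern
	// as pool ops: track by tid, optionally arm a mon_timeout timer that cancels
	// the request with -ETIMEDOUT, and finish on the matching reply.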
4175 	void Objecter::get_pool_stats(list<string>& pools,
4176 				      map<string,pool_stat_t> *result,
4177 				      bool *per_pool,
4178 				      Context *onfinish)
4179 	{
4180 	  ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4181 	
4182 	  PoolStatOp *op = new PoolStatOp;
4183 	  op->tid = ++last_tid;
4184 	  op->pools = pools;
4185 	  op->pool_stats = result;
4186 	  op->per_pool = per_pool;
4187 	  op->onfinish = onfinish;
4188 	  if (mon_timeout > timespan(0)) {
4189 	    op->ontimeout = timer.add_event(mon_timeout,
4190 					    [this, op]() {
4191 					      pool_stat_op_cancel(op->tid,
4192 								  -ETIMEDOUT); });
4193 	  } else {
4194 	    op->ontimeout = 0;
4195 	  }
4196 	
4197 	  unique_lock wl(rwlock);
4198 	
4199 	  poolstat_ops[op->tid] = op;
4200 	
4201 	  logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4202 	
4203 	  _poolstat_submit(op);
4204 	}
4205 	
4206 	void Objecter::_poolstat_submit(PoolStatOp *op)
4207 	{
4208 	  ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4209 	  monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4210 						   op->pools,
4211 						   last_seen_pgmap_version));
4212 	  op->last_submit = ceph::coarse_mono_clock::now();
4213 	
4214 	  logger->inc(l_osdc_poolstat_send);
4215 	}
4216 	
4217 	void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4218 	{
4219 	  ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4220 	  ceph_tid_t tid = m->get_tid();
4221 	
4222 	  unique_lock wl(rwlock);
4223 	  if (!initialized) {
4224 	    m->put();
4225 	    return;
4226 	  }
4227 	
4228 	  map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4229 	  if (iter != poolstat_ops.end()) {
4230 	    PoolStatOp *op = poolstat_ops[tid];
4231 	    ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4232 	    *op->pool_stats = m->pool_stats;
4233 	    *op->per_pool = m->per_pool;
4234 	    if (m->version > last_seen_pgmap_version) {
4235 	      last_seen_pgmap_version = m->version;
4236 	    }
4237 	    op->onfinish->complete(0);
4238 	    _finish_pool_stat_op(op, 0);
4239 	  } else {
4240 	    ldout(cct, 10) << "unknown request " << tid << dendl;
4241 	  }
4242 	  ldout(cct, 10) << "done" << dendl;
4243 	  m->put();
4244 	}
4245 	
4246 	int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4247 	{
4248 	  ceph_assert(initialized);
4249 	
4250 	  unique_lock wl(rwlock);
4251 	
4252 	  map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4253 	  if (it == poolstat_ops.end()) {
4254 	    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4255 	    return -ENOENT;
4256 	  }
4257 	
4258 	  ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4259 	
4260 	  PoolStatOp *op = it->second;
4261 	  if (op->onfinish)
4262 	    op->onfinish->complete(r);
4263 	  _finish_pool_stat_op(op, r);
4264 	  return 0;
4265 	}
4266 	
4267 	void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4268 	{
4269 	  // rwlock is locked unique
4270 	
4271 	  poolstat_ops.erase(op->tid);
4272 	  logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4273 	
4274 	  if (op->ontimeout && r != -ETIMEDOUT)
4275 	    timer.cancel_event(op->ontimeout);
4276 	
4277 	  delete op;
4278 	}
4279 	
4280 	void Objecter::get_fs_stats(ceph_statfs& result,
4281 				    boost::optional<int64_t> data_pool,
4282 				    Context *onfinish)
4283 	{
4284 	  ldout(cct, 10) << "get_fs_stats" << dendl;
4285 	  unique_lock l(rwlock);
4286 	
4287 	  StatfsOp *op = new StatfsOp;
4288 	  op->tid = ++last_tid;
4289 	  op->stats = &result;
4290 	  op->data_pool = data_pool;
4291 	  op->onfinish = onfinish;
4292 	  if (mon_timeout > timespan(0)) {
4293 	    op->ontimeout = timer.add_event(mon_timeout,
4294 					    [this, op]() {
4295 					      statfs_op_cancel(op->tid,
4296 							       -ETIMEDOUT); });
4297 	  } else {
4298 	    op->ontimeout = 0;
4299 	  }
4300 	  statfs_ops[op->tid] = op;
4301 	
4302 	  logger->set(l_osdc_statfs_active, statfs_ops.size());
4303 	
4304 	  _fs_stats_submit(op);
4305 	}
4306 	
4307 	void Objecter::_fs_stats_submit(StatfsOp *op)
4308 	{
4309 	  // rwlock is locked unique
4310 	
4311 	  ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
4312 	  monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
4313 					     op->data_pool,
4314 					     last_seen_pgmap_version));
4315 	  op->last_submit = ceph::coarse_mono_clock::now();
4316 	
4317 	  logger->inc(l_osdc_statfs_send);
4318 	}
4319 	
4320 	void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4321 	{
4322 	  unique_lock wl(rwlock);
4323 	  if (!initialized) {
4324 	    m->put();
4325 	    return;
4326 	  }
4327 	
4328 	  ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4329 	  ceph_tid_t tid = m->get_tid();
4330 	
4331 	  if (statfs_ops.count(tid)) {
4332 	    StatfsOp *op = statfs_ops[tid];
4333 	    ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4334 	    *(op->stats) = m->h.st;
4335 	    if (m->h.version > last_seen_pgmap_version)
4336 	      last_seen_pgmap_version = m->h.version;
4337 	    op->onfinish->complete(0);
4338 	    _finish_statfs_op(op, 0);
4339 	  } else {
4340 	    ldout(cct, 10) << "unknown request " << tid << dendl;
4341 	  }
4342 	  m->put();
4343 	  ldout(cct, 10) << "done" << dendl;
4344 	}
4345 	
4346 	int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4347 	{
4348 	  ceph_assert(initialized);
4349 	
4350 	  unique_lock wl(rwlock);
4351 	
4352 	  map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4353 	  if (it == statfs_ops.end()) {
4354 	    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4355 	    return -ENOENT;
4356 	  }
4357 	
4358 	  ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4359 	
4360 	  StatfsOp *op = it->second;
4361 	  if (op->onfinish)
4362 	    op->onfinish->complete(r);
4363 	  _finish_statfs_op(op, r);
4364 	  return 0;
4365 	}
4366 	
4367 	void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4368 	{
4369 	  // rwlock is locked unique
4370 	
4371 	  statfs_ops.erase(op->tid);
4372 	  logger->set(l_osdc_statfs_active, statfs_ops.size());
4373 	
4374 	  if (op->ontimeout && r != -ETIMEDOUT)
4375 	    timer.cancel_event(op->ontimeout);
4376 	
4377 	  delete op;
4378 	}
4379 	
4380 	// scatter/gather
4381 	
4382 	void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4383 				       vector<ceph::buffer::list>& resultbl,
4384 				       ceph::buffer::list *bl, Context *onfinish)
4385 	{
4386 	  // all done
4387 	  ldout(cct, 15) << "_sg_read_finish" << dendl;
4388 	
4389 	  if (extents.size() > 1) {
4390 	    Striper::StripedReadResult r;
4391 	    vector<ceph::buffer::list>::iterator bit = resultbl.begin();
4392 	    for (vector<ObjectExtent>::iterator eit = extents.begin();
4393 		 eit != extents.end();
4394 		 ++eit, ++bit) {
4395 	      r.add_partial_result(cct, *bit, eit->buffer_extents);
4396 	    }
4397 	    bl->clear();
4398 	    r.assemble_result(cct, *bl, false);
4399 	  } else {
4400 	    ldout(cct, 15) << "  only one frag" << dendl;
4401 	    bl->claim(resultbl[0]);
4402 	  }
4403 	
4404 	  // done
4405 	  uint64_t bytes_read = bl->length();
4406 	  ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4407 	
4408 	  if (onfinish) {
4409 	    onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4410 	  }
4411 	}
4412 	
4413 	
4414 	void Objecter::ms_handle_connect(Connection *con)
4415 	{
4416 	  ldout(cct, 10) << "ms_handle_connect " << con << dendl;
4417 	  if (!initialized)
4418 	    return;
4419 	
4420 	  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4421 	    resend_mon_ops();
4422 	}
4423 	
4424 	bool Objecter::ms_handle_reset(Connection *con)
4425 	{
4426 	  if (!initialized)
4427 	    return false;
4428 	  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
4429 	    unique_lock wl(rwlock);
4430 	
4431 	    auto priv = con->get_priv();
4432 	    auto session = static_cast<OSDSession*>(priv.get());
4433 	    if (session) {
4434 	      ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4435 			    << " osd." << session->osd << dendl;
4436 	      // the session may already have been closed if a just-handled
4437 	      // osdmap marked the osd down
4438 	      if (!(initialized && osdmap->is_up(session->osd))) {
4439 		ldout(cct, 1) << "ms_handle_reset aborted,initialized=" << initialized << dendl;
4440 		wl.unlock();
4441 		return false;
4442 	      }
4443 	      map<uint64_t, LingerOp *> lresend;
4444 	      OSDSession::unique_lock sl(session->lock);
4445 	      _reopen_session(session);
4446 	      _kick_requests(session, lresend);
4447 	      sl.unlock();
4448 	      _linger_ops_resend(lresend, wl);
4449 	      wl.unlock();
4450 	      maybe_request_map();
4451 	    }
4452 	    return true;
4453 	  }
4454 	  return false;
4455 	}
4456 	
4457 	void Objecter::ms_handle_remote_reset(Connection *con)
4458 	{
4459 	  /*
4460 	   * treat these the same.
4461 	   */
4462 	  ms_handle_reset(con);
4463 	}
4464 	
4465 	bool Objecter::ms_handle_refused(Connection *con)
4466 	{
4467 	  // just log for now
4468 	  if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4469 	    int osd = osdmap->identify_osd(con->get_peer_addr());
4470 	    if (osd >= 0) {
4471 	      ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4472 	    }
4473 	  }
4474 	  return false;
4475 	}
4476 	
4477 	void Objecter::op_target_t::dump(Formatter *f) const
4478 	{
4479 	  f->dump_stream("pg") << pgid;
4480 	  f->dump_int("osd", osd);
4481 	  f->dump_stream("object_id") << base_oid;
4482 	  f->dump_stream("object_locator") << base_oloc;
4483 	  f->dump_stream("target_object_id") << target_oid;
4484 	  f->dump_stream("target_object_locator") << target_oloc;
4485 	  f->dump_int("paused", (int)paused);
4486 	  f->dump_int("used_replica", (int)used_replica);
4487 	  f->dump_int("precalc_pgid", (int)precalc_pgid);
4488 	}
4489 	
4490 	void Objecter::_dump_active(OSDSession *s)
4491 	{
4492 	  for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4493 	       p != s->ops.end();
4494 	       ++p) {
4495 	    Op *op = p->second;
4496 	    ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4497 			   << "\tosd." << (op->session ? op->session->osd : -1)
4498 			   << "\t" << op->target.base_oid
4499 			   << "\t" << op->ops << dendl;
4500 	  }
4501 	}
4502 	
4503 	void Objecter::_dump_active()
4504 	{
4505 	  ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
4506 			 << dendl;
4507 	  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4508 	       siter != osd_sessions.end(); ++siter) {
4509 	    OSDSession *s = siter->second;
4510 	    OSDSession::shared_lock sl(s->lock);
4511 	    _dump_active(s);
4512 	    sl.unlock();
4513 	  }
4514 	  _dump_active(homeless_session);
4515 	}
4516 	
4517 	void Objecter::dump_active()
4518 	{
4519 	  shared_lock rl(rwlock);
4520 	  _dump_active();
4521 	  rl.unlock();
4522 	}
4523 	
4524 	void Objecter::dump_requests(Formatter *fmt)
4525 	{
4526 	  // Read-lock on Objecter held here
4527 	  fmt->open_object_section("requests");
4528 	  dump_ops(fmt);
4529 	  dump_linger_ops(fmt);
4530 	  dump_pool_ops(fmt);
4531 	  dump_pool_stat_ops(fmt);
4532 	  dump_statfs_ops(fmt);
4533 	  dump_command_ops(fmt);
4534 	  fmt->close_section(); // requests object
4535 	}
4536 	
4537 	void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4538 	{
4539 	  for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4540 	       p != s->ops.end();
4541 	       ++p) {
4542 	    Op *op = p->second;
4543 	    auto age = std::chrono::duration<double>(coarse_mono_clock::now() - op->stamp);
4544 	    fmt->open_object_section("op");
4545 	    fmt->dump_unsigned("tid", op->tid);
4546 	    op->target.dump(fmt);
4547 	    fmt->dump_stream("last_sent") << op->stamp;
4548 	    fmt->dump_float("age", age.count());
4549 	    fmt->dump_int("attempts", op->attempts);
4550 	    fmt->dump_stream("snapid") << op->snapid;
4551 	    fmt->dump_stream("snap_context") << op->snapc;
4552 	    fmt->dump_stream("mtime") << op->mtime;
4553 	
4554 	    fmt->open_array_section("osd_ops");
4555 	    for (vector<OSDOp>::const_iterator it = op->ops.begin();
4556 		 it != op->ops.end();
4557 		 ++it) {
4558 	      fmt->dump_stream("osd_op") << *it;
4559 	    }
4560 	    fmt->close_section(); // osd_ops array
4561 	
4562 	    fmt->close_section(); // op object
4563 	  }
4564 	}
4565 	
4566 	void Objecter::dump_ops(Formatter *fmt)
4567 	{
4568 	  // Read-lock on Objecter held
4569 	  fmt->open_array_section("ops");
4570 	  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4571 	       siter != osd_sessions.end(); ++siter) {
4572 	    OSDSession *s = siter->second;
4573 	    OSDSession::shared_lock sl(s->lock);
4574 	    _dump_ops(s, fmt);
4575 	    sl.unlock();
4576 	  }
4577 	  _dump_ops(homeless_session, fmt);
4578 	  fmt->close_section(); // ops array
4579 	}
4580 	
4581 	void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4582 	{
4583 	  for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4584 	       p != s->linger_ops.end();
4585 	       ++p) {
4586 	    LingerOp *op = p->second;
4587 	    fmt->open_object_section("linger_op");
4588 	    fmt->dump_unsigned("linger_id", op->linger_id);
4589 	    op->target.dump(fmt);
4590 	    fmt->dump_stream("snapid") << op->snap;
4591 	    fmt->dump_stream("registered") << op->registered;
4592 	    fmt->close_section(); // linger_op object
4593 	  }
4594 	}
4595 	
4596 	void Objecter::dump_linger_ops(Formatter *fmt)
4597 	{
4598 	  // We have a read-lock on the objecter
4599 	  fmt->open_array_section("linger_ops");
4600 	  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4601 	       siter != osd_sessions.end(); ++siter) {
4602 	    OSDSession *s = siter->second;
4603 	    OSDSession::shared_lock sl(s->lock);
4604 	    _dump_linger_ops(s, fmt);
4605 	    sl.unlock();
4606 	  }
4607 	  _dump_linger_ops(homeless_session, fmt);
4608 	  fmt->close_section(); // linger_ops array
4609 	}
4610 	
4611 	void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4612 	{
4613 	  for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4614 	       p != s->command_ops.end();
4615 	       ++p) {
4616 	    CommandOp *op = p->second;
4617 	    fmt->open_object_section("command_op");
4618 	    fmt->dump_unsigned("command_id", op->tid);
4619 	    fmt->dump_int("osd", op->session ? op->session->osd : -1);
4620 	    fmt->open_array_section("command");
4621 	    for (vector<string>::const_iterator q = op->cmd.begin();
4622 		 q != op->cmd.end(); ++q)
4623 	      fmt->dump_string("word", *q);
4624 	    fmt->close_section();
4625 	    if (op->target_osd >= 0)
4626 	      fmt->dump_int("target_osd", op->target_osd);
4627 	    else
4628 	      fmt->dump_stream("target_pg") << op->target_pg;
4629 	    fmt->close_section(); // command_op object
4630 	  }
4631 	}
4632 	
4633 	void Objecter::dump_command_ops(Formatter *fmt)
4634 	{
4635 	  // We have a read-lock on the Objecter here
4636 	  fmt->open_array_section("command_ops");
4637 	  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4638 	       siter != osd_sessions.end(); ++siter) {
4639 	    OSDSession *s = siter->second;
4640 	    OSDSession::shared_lock sl(s->lock);
4641 	    _dump_command_ops(s, fmt);
4642 	    sl.unlock();
4643 	  }
4644 	  _dump_command_ops(homeless_session, fmt);
4645 	  fmt->close_section(); // command_ops array
4646 	}
4647 	
4648 	void Objecter::dump_pool_ops(Formatter *fmt) const
4649 	{
4650 	  fmt->open_array_section("pool_ops");
4651 	  for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4652 	       p != pool_ops.end();
4653 	       ++p) {
4654 	    PoolOp *op = p->second;
4655 	    fmt->open_object_section("pool_op");
4656 	    fmt->dump_unsigned("tid", op->tid);
4657 	    fmt->dump_int("pool", op->pool);
4658 	    fmt->dump_string("name", op->name);
4659 	    fmt->dump_int("operation_type", op->pool_op);
4660 	    fmt->dump_unsigned("crush_rule", op->crush_rule);
4661 	    fmt->dump_stream("snapid") << op->snapid;
4662 	    fmt->dump_stream("last_sent") << op->last_submit;
4663 	    fmt->close_section(); // pool_op object
4664 	  }
4665 	  fmt->close_section(); // pool_ops array
4666 	}
4667 	
4668 	void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4669 	{
4670 	  fmt->open_array_section("pool_stat_ops");
4671 	  for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4672 	       p != poolstat_ops.end();
4673 	       ++p) {
4674 	    PoolStatOp *op = p->second;
4675 	    fmt->open_object_section("pool_stat_op");
4676 	    fmt->dump_unsigned("tid", op->tid);
4677 	    fmt->dump_stream("last_sent") << op->last_submit;
4678 	
4679 	    fmt->open_array_section("pools");
4680 	    for (list<string>::const_iterator it = op->pools.begin();
4681 		 it != op->pools.end();
4682 		 ++it) {
4683 	      fmt->dump_string("pool", *it);
4684 	    }
4685 	    fmt->close_section(); // pools array
4686 	
4687 	    fmt->close_section(); // pool_stat_op object
4688 	  }
4689 	  fmt->close_section(); // pool_stat_ops array
4690 	}
4691 	
4692 	void Objecter::dump_statfs_ops(Formatter *fmt) const
4693 	{
4694 	  fmt->open_array_section("statfs_ops");
4695 	  for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4696 	       p != statfs_ops.end();
4697 	       ++p) {
4698 	    StatfsOp *op = p->second;
4699 	    fmt->open_object_section("statfs_op");
4700 	    fmt->dump_unsigned("tid", op->tid);
4701 	    fmt->dump_stream("last_sent") << op->last_submit;
4702 	    fmt->close_section(); // statfs_op object
4703 	  }
4704 	  fmt->close_section(); // statfs_ops array
4705 	}
4706 	
4707 	Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4708 	  m_objecter(objecter)
4709 	{
4710 	}
4711 	
4712 	int Objecter::RequestStateHook::call(std::string_view command,
4713 					     const cmdmap_t& cmdmap,
4714 					     Formatter *f,
4715 					     std::ostream& ss,
4716 					     ceph::buffer::list& out)
4717 	{
4718 	  shared_lock rl(m_objecter->rwlock);
4719 	  m_objecter->dump_requests(f);
4720 	  return 0;
4721 	}
4722 	
4723 	void Objecter::blacklist_self(bool set)
4724 	{
4725 	  ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4726 	
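	  // assemble a mon command along the lines of (addr value is illustrative):
	  //   {"prefix":"osd blacklist", "blacklistop":"add", "addr":"1.2.3.4:0/1234"}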
4727 	  vector<string> cmd;
4728 	  cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4729 	  if (set)
4730 	    cmd.push_back("\"blacklistop\":\"add\",");
4731 	  else
4732 	    cmd.push_back("\"blacklistop\":\"rm\",");
4733 	  stringstream ss;
4734 	  // this is somewhat imprecise in that we are blacklisting our first addr only
4735 	  ss << messenger->get_myaddrs().front().get_legacy_str();
4736 	  cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4737 	
4738 	  MMonCommand *m = new MMonCommand(monc->get_fsid());
4739 	  m->cmd = cmd;
4740 	
4741 	  monc->send_mon_message(m);
4742 	}
4743 	
4744 	// commands
4745 	
4746 	void Objecter::handle_command_reply(MCommandReply *m)
4747 	{
4748 	  unique_lock wl(rwlock);
4749 	  if (!initialized) {
4750 	    m->put();
4751 	    return;
4752 	  }
4753 	
4754 	  ConnectionRef con = m->get_connection();
4755 	  auto priv = con->get_priv();
4756 	  auto s = static_cast<OSDSession*>(priv.get());
4757 	  if (!s || s->con != con) {
4758 	    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4759 	    m->put();
4760 	    return;
4761 	  }
4762 	
4763 	  OSDSession::shared_lock sl(s->lock);
4764 	  map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
4765 	  if (p == s->command_ops.end()) {
4766 	    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4767 			   << " not found" << dendl;
4768 	    m->put();
4769 	    sl.unlock();
4770 	    return;
4771 	  }
4772 	
4773 	  CommandOp *c = p->second;
4774 	  if (!c->session ||
4775 	      m->get_connection() != c->session->con) {
4776 	    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4777 			   << " got reply from wrong connection "
4778 			   << m->get_connection() << " " << m->get_source_inst()
4779 			   << dendl;
4780 	    m->put();
4781 	    sl.unlock();
4782 	    return;
4783 	  }
4784 	  if (m->r == -EAGAIN) {
4785 	    ldout(cct,10) << __func__ << " tid " << m->get_tid()
4786 			  << " got EAGAIN, requesting map and resending" << dendl;
4787 	    // NOTE: This might resend twice... once now, and once again when
4788 	    // we get an updated osdmap and the PG is found to have moved.
4789 	    _maybe_request_map();
4790 	    _send_command(c);
4791 	    m->put();
4792 	    sl.unlock();
4793 	    return;
4794 	  }
4795 	
4796 	  if (c->poutbl) {
4797 	    c->poutbl->claim(m->get_data());
4798 	  }
4799 	
4800 	  sl.unlock();
4801 	
4802 	  OSDSession::unique_lock sul(s->lock);
4803 	  _finish_command(c, m->r, m->rs);
4804 	  sul.unlock();
4805 	
4806 	  m->put();
4807 	}
4808 	
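	// OSD commands are parked on the homeless session first, then targeted (at an
	// explicit target_osd or via the PG mapping) and assigned to a real session;
	// if no usable target exists yet we just request a newer osdmap and resend
	// once one arrives.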
4809 	void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4810 	{
4811 	  shunique_lock sul(rwlock, ceph::acquire_unique);
4812 	
4813 	  ceph_tid_t tid = ++last_tid;
4814 	  ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
4815 	  c->tid = tid;
4816 	
4817 	  {
4818 	    OSDSession::unique_lock hs_wl(homeless_session->lock);
4819 	    _session_command_op_assign(homeless_session, c);
4820 	  }
4821 	
4822 	  _calc_command_target(c, sul);
4823 	  _assign_command_session(c, sul);
4824 	  if (osd_timeout > timespan(0)) {
4825 	    c->ontimeout = timer.add_event(osd_timeout,
4826 					   [this, c, tid]() {
4827 					     command_op_cancel(c->session, tid,
4828 							       -ETIMEDOUT); });
4829 	  }
4830 	
4831 	  if (!c->session->is_homeless()) {
4832 	    _send_command(c);
4833 	  } else {
4834 	    _maybe_request_map();
4835 	  }
4836 	  if (c->map_check_error)
4837 	    _send_command_map_check(c);
4838 	  *ptid = tid;
4839 	
4840 	  logger->inc(l_osdc_command_active);
4841 	}
4842 	
4843 	int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
4844 	{
4845 	  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
4846 	
4847 	  c->map_check_error = 0;
4848 	
4849 	  // ignore overlays, just like we do with pg ops
4850 	  c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4851 	
4852 	  if (c->target_osd >= 0) {
4853 	    if (!osdmap->exists(c->target_osd)) {
4854 	      c->map_check_error = -ENOENT;
4855 	      c->map_check_error_str = "osd dne";
4856 	      c->target.osd = -1;
4857 	      return RECALC_OP_TARGET_OSD_DNE;
4858 	    }
4859 	    if (osdmap->is_down(c->target_osd)) {
4860 	      c->map_check_error = -ENXIO;
4861 	      c->map_check_error_str = "osd down";
4862 	      c->target.osd = -1;
4863 	      return RECALC_OP_TARGET_OSD_DOWN;
4864 	    }
4865 	    c->target.osd = c->target_osd;
4866 	  } else {
4867 	    int ret = _calc_target(&(c->target), nullptr, true);
4868 	    if (ret == RECALC_OP_TARGET_POOL_DNE) {
4869 	      c->map_check_error = -ENOENT;
4870 	      c->map_check_error_str = "pool dne";
4871 	      c->target.osd = -1;
4872 	      return ret;
4873 	    } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4874 	      c->map_check_error = -ENXIO;
4875 	      c->map_check_error_str = "osd down";
4876 	      c->target.osd = -1;
4877 	      return ret;
4878 	    }
4879 	  }
4880 	
4881 	  OSDSession *s;
4882 	  int r = _get_session(c->target.osd, &s, sul);
4883 	  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4884 	
4885 	  if (c->session != s) {
4886 	    put_session(s);
4887 	    return RECALC_OP_TARGET_NEED_RESEND;
4888 	  }
4889 	
4890 	  put_session(s);
4891 	
4892 	  ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, "
4893 			 << c->session << dendl;
4894 	
4895 	  return RECALC_OP_TARGET_NO_ACTION;
4896 	}
4897 	
4898 	void Objecter::_assign_command_session(CommandOp *c,
4899 					       shunique_lock& sul)
4900 	{
4901 	  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
4902 	
4903 	  OSDSession *s;
4904 	  int r = _get_session(c->target.osd, &s, sul);
4905 	  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4906 	
4907 	  if (c->session != s) {
4908 	    if (c->session) {
4909 	      OSDSession *cs = c->session;
4910 	      OSDSession::unique_lock csl(cs->lock);
4911 	      _session_command_op_remove(c->session, c);
4912 	      csl.unlock();
4913 	    }
4914 	    OSDSession::unique_lock sl(s->lock);
4915 	    _session_command_op_assign(s, c);
4916 	  }
4917 	
4918 	  put_session(s);
4919 	}
4920 	
4921 	void Objecter::_send_command(CommandOp *c)
4922 	{
4923 	  ldout(cct, 10) << "_send_command " << c->tid << dendl;
4924 	  ceph_assert(c->session);
4925 	  ceph_assert(c->session->con);
4926 	  MCommand *m = new MCommand(monc->monmap.fsid);
4927 	  m->cmd = c->cmd;
4928 	  m->set_data(c->inbl);
4929 	  m->set_tid(c->tid);
4930 	  c->session->con->send_message(m);
4931 	  logger->inc(l_osdc_command_send);
4932 	}
4933 	
4934 	int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
4935 	{
4936 	  ceph_assert(initialized);
4937 	
4938 	  unique_lock wl(rwlock);
4939 	
4940 	  map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
4941 	  if (it == s->command_ops.end()) {
4942 	    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4943 	    return -ENOENT;
4944 	  }
4945 	
4946 	  ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4947 	
4948 	  CommandOp *op = it->second;
4949 	  _command_cancel_map_check(op);
4950 	  OSDSession::unique_lock sl(op->session->lock);
4951 	  _finish_command(op, r, "");
4952 	  sl.unlock();
4953 	  return 0;
4954 	}
4955 	
4956 	void Objecter::_finish_command(CommandOp *c, int r, string rs)
4957 	{
4958 	  // rwlock is locked unique
4959 	  // session lock is locked
4960 	
4961 	  ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
4962 			 << rs << dendl;
4963 	  if (c->prs)
4964 	    *c->prs = rs;
4965 	  if (c->onfinish)
4966 	    c->onfinish->complete(r);
4967 	
4968 	  if (c->ontimeout && r != -ETIMEDOUT)
4969 	    timer.cancel_event(c->ontimeout);
4970 	
4971 	  _session_command_op_remove(c->session, c);
4972 	
4973 	  c->put();
4974 	
4975 	  logger->dec(l_osdc_command_active);
4976 	}
4977 	
4978 	Objecter::OSDSession::~OSDSession()
4979 	{
4980 	  // Caller is responsible for re-assigning or
4981 	  // destroying any ops that were assigned to us
4982 	  ceph_assert(ops.empty());
4983 	  ceph_assert(linger_ops.empty());
4984 	  ceph_assert(command_ops.empty());
4985 	}
4986 	
4987 	Objecter::Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
4988 			   Finisher *fin,
4989 			   double mon_timeout,
4990 			   double osd_timeout) :
4991 	  Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
4992 	  trace_endpoint("0.0.0.0", 0, "Objecter"),
4993 	  osdmap{std::make_unique<OSDMap>()},
4994 	  homeless_session(new OSDSession(cct, -1)),
4995 	  mon_timeout(ceph::make_timespan(mon_timeout)),
4996 	  osd_timeout(ceph::make_timespan(osd_timeout)),
4997 	  op_throttle_bytes(cct, "objecter_bytes",
4998 			    cct->_conf->objecter_inflight_op_bytes),
4999 	  op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
5000 	  retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
5001 	{}
5002 	
5003 	Objecter::~Objecter()
5004 	{
5005 	  ceph_assert(homeless_session->get_nref() == 1);
5006 	  ceph_assert(num_homeless_ops == 0);
5007 	  homeless_session->put();
5008 	
5009 	  ceph_assert(osd_sessions.empty());
5010 	  ceph_assert(poolstat_ops.empty());
5011 	  ceph_assert(statfs_ops.empty());
5012 	  ceph_assert(pool_ops.empty());
5013 	  ceph_assert(waiting_for_map.empty());
5014 	  ceph_assert(linger_ops.empty());
5015 	  ceph_assert(check_latest_map_lingers.empty());
5016 	  ceph_assert(check_latest_map_ops.empty());
5017 	  ceph_assert(check_latest_map_commands.empty());
5018 	
5019 	  ceph_assert(!m_request_state_hook);
5020 	  ceph_assert(!logger);
5021 	}
5022 	
5023 	/**
5024 	 * Wait until this OSD map epoch is received before
5025 	 * sending any more operations to OSDs.  Use this
5026 	 * when it is known that the client can't trust
5027 	 * anything from before this epoch (e.g. due to
5028 	 * client blacklist at this epoch).
5029 	 */
5030 	void Objecter::set_epoch_barrier(epoch_t epoch)
5031 	{
5032 	  unique_lock wl(rwlock);
5033 	
5034 	  ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5035 			<< epoch_barrier << ") current epoch " << osdmap->get_epoch()
5036 			<< dendl;
5037 	  if (epoch > epoch_barrier) {
5038 	    epoch_barrier = epoch;
5039 	    _maybe_request_map();
5040 	  }
5041 	}
5042 	
5043 	
5044 	
5045 	hobject_t Objecter::enumerate_objects_begin()
5046 	{
5047 	  return hobject_t();
5048 	}
5049 	
5050 	hobject_t Objecter::enumerate_objects_end()
5051 	{
5052 	  return hobject_t::get_max();
5053 	}
5054 	
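	// Completion glue for enumerate_objects(): carries everything needed to turn
	// the raw pg_nls reply buffer into caller-visible results in
	// _enumerate_reply().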
struct C_EnumerateReply : public Context {
  ceph::buffer::list bl;

  Objecter *objecter;
  hobject_t *next;
  std::list<librados::ListObjectImpl> *result;
  const hobject_t end;
  const int64_t pool_id;
  Context *on_finish;

  epoch_t epoch;
  int budget;

  C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
      std::list<librados::ListObjectImpl> *result_,
      const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
    objecter(objecter_), next(next_), result(result_),
    end(end_), pool_id(pool_id_), on_finish(on_finish_),
    epoch(0), budget(-1)
  {}

  void finish(int r) override {
    objecter->_enumerate_reply(
      bl, r, end, pool_id, budget, epoch, result, next, on_finish);
  }
};

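// Begin or continue an object listing in [start, end) for the given pool and
// namespace.  After validating the request, a single PGNLS operation is
// issued; C_EnumerateReply carries the reply into _enumerate_reply().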
void Objecter::enumerate_objects(
    int64_t pool_id,
    const std::string &ns,
    const hobject_t &start,
    const hobject_t &end,
    const uint32_t max,
    const ceph::buffer::list &filter_bl,
    std::list<librados::ListObjectImpl> *result,
    hobject_t *next,
    Context *on_finish)
{
  ceph_assert(result);

  if (!end.is_max() && start > end) {
    lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  if (max < 1) {
    lderr(cct) << __func__ << ": result size may not be zero" << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  if (start.is_max()) {
    on_finish->complete(0);
    return;
  }

  shared_lock rl(rwlock);
  ceph_assert(osdmap->get_epoch());
  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
    rl.unlock();
    lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
    on_finish->complete(-EOPNOTSUPP);
    return;
  }
  const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
  if (!p) {
    lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
	       << osdmap->get_epoch() << dendl;
    rl.unlock();
    on_finish->complete(-ENOENT);
    return;
  } else {
    rl.unlock();
  }

  ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;

  // Stash completion state
  C_EnumerateReply *on_ack = new C_EnumerateReply(
      this, next, result, end, pool_id, on_finish);

  ObjectOperation op;
  op.pg_nls(max, filter_bl, start, 0);

  // Issue.  See you later in _enumerate_reply
  object_locator_t oloc(pool_id, ns);
  pg_read(start.get_hash(), oloc, op,
	  &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
}

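// Handle a PGNLS reply: return the op budget, decode the entries, clamp the
// continuation cursor to 'end' (dropping any entries at or past it), then
// splice the surviving entries into the caller's result list.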
void Objecter::_enumerate_reply(
    ceph::buffer::list &bl,
    int r,
    const hobject_t &end,
    const int64_t pool_id,
    int budget,
    epoch_t reply_epoch,
    std::list<librados::ListObjectImpl> *result,
    hobject_t *next,
    Context *on_finish)
{
  if (budget >= 0) {
    put_op_budget_bytes(budget);
  }

  if (r < 0) {
    ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
    on_finish->complete(r);
    return;
  }

  ceph_assert(next != NULL);

  // Decode the results
  auto iter = bl.cbegin();
  pg_nls_response_t response;

  decode(response, iter);
  if (!iter.end()) {
    // extra_info isn't used anywhere. We do this solely to preserve
    // backward compatibility
    ceph::buffer::list legacy_extra_info;
    decode(legacy_extra_info, iter);
  }

  ldout(cct, 10) << __func__ << ": got " << response.entries.size()
		 << " handle " << response.handle
		 << " reply_epoch " << reply_epoch << dendl;
  ldout(cct, 20) << __func__ << ": response.entries.size "
		 << response.entries.size() << ", response.entries "
		 << response.entries << dendl;
  if (response.handle <= end) {
    *next = response.handle;
  } else {
    ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
		   << dendl;
    *next = end;

    // drop anything after 'end'
    shared_lock rl(rwlock);
    const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
    if (!pool) {
      // pool is gone, drop any results which are now meaningless.
      rl.unlock();
      on_finish->complete(-ENOENT);
      return;
    }
    while (!response.entries.empty()) {
      uint32_t hash = response.entries.back().locator.empty() ?
	pool->hash_key(response.entries.back().oid,
		       response.entries.back().nspace) :
	pool->hash_key(response.entries.back().locator,
		       response.entries.back().nspace);
      hobject_t last(response.entries.back().oid,
		     response.entries.back().locator,
		     CEPH_NOSNAP,
		     hash,
		     pool_id,
		     response.entries.back().nspace);
      if (last < end)
	break;
      ldout(cct, 20) << __func__ << " dropping item " << last
		     << " >= end " << end << dendl;
      response.entries.pop_back();
    }
    rl.unlock();
  }
  if (!response.entries.empty()) {
    result->merge(response.entries);
  }

  // release the listing context's budget once all
  // OPs (in the session) are finished
#if 0
  put_nlist_context_budget(list_context);
#endif
  on_finish->complete(r);
  return;
}

namespace {
  using namespace librados;

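  // Decode one item of type T out of each buffer in 'bls' and append it to
  // 'items'.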
  template <typename T>
  void do_decode(std::vector<T>& items, std::vector<ceph::buffer::list>& bls)
  {
    for (auto bl : bls) {
      auto p = bl.cbegin();
      T t;
      decode(t, p);
      items.push_back(t);
    }
  }

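  // Completion for CEPH_OSD_OP_SCRUBLS: decodes the scrub_ls_result_t in
  // 'bl' into whichever of the inconsistent-object or inconsistent-snapset
  // vectors was supplied, and reports the outcome through 'rval'.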
  struct C_ObjectOperation_scrub_ls : public Context {
    ceph::buffer::list bl;
    uint32_t *interval;
    std::vector<inconsistent_obj_t> *objects = nullptr;
    std::vector<inconsistent_snapset_t> *snapsets = nullptr;
    int *rval;

    C_ObjectOperation_scrub_ls(uint32_t *interval,
			       std::vector<inconsistent_obj_t> *objects,
			       int *rval)
      : interval(interval), objects(objects), rval(rval) {}
    C_ObjectOperation_scrub_ls(uint32_t *interval,
			       std::vector<inconsistent_snapset_t> *snapsets,
			       int *rval)
      : interval(interval), snapsets(snapsets), rval(rval) {}
    void finish(int r) override {
      if (r < 0 && r != -EAGAIN) {
	if (rval)
	  *rval = r;
	return;
      }

      if (rval)
	*rval = 0;

      try {
	decode();
      } catch (ceph::buffer::error&) {
	if (rval)
	  *rval = -EIO;
      }
    }
  private:
    void decode() {
      scrub_ls_result_t result;
      auto p = bl.cbegin();
      result.decode(p);
      *interval = result.interval;
      if (objects) {
	do_decode(*objects, result.vals);
      } else {
	do_decode(*snapsets, result.vals);
      }
    }
  };

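  // Append a SCRUBLS op to 'op', encoding 'arg' as its input, and register a
  // C_ObjectOperation_scrub_ls handler to decode the reply into 'items'.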
  template <typename T>
  void do_scrub_ls(::ObjectOperation *op,
		   const scrub_ls_arg_t& arg,
		   std::vector<T> *items,
		   uint32_t *interval,
		   int *rval)
  {
    OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
    op->flags |= CEPH_OSD_FLAG_PGOP;
    ceph_assert(interval);
    arg.encode(osd_op.indata);
    unsigned p = op->ops.size() - 1;
    auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
    op->out_handler[p] = h;
    op->out_bl[p] = &h->bl;
    op->out_rval[p] = rval;
  }
}

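// Public scrub_ls entry points: one overload lists inconsistent objects, the
// other inconsistent snapsets.  Both pack a scrub_ls_arg_t (whose second
// field selects objects vs. snapsets) and defer to do_scrub_ls() above.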
void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
				 uint64_t max_to_get,
				 std::vector<librados::inconsistent_obj_t> *objects,
				 uint32_t *interval,
				 int *rval)
{
  scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
  do_scrub_ls(this, arg, objects, interval, rval);
}

void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
				 uint64_t max_to_get,
				 std::vector<librados::inconsistent_snapset_t> *snapsets,
				 uint32_t *interval,
				 int *rval)
{
  scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
  do_scrub_ls(this, arg, snapsets, interval, rval);
}