1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software 
11   	 * Foundation.  See file COPYING.
12   	 * 
13   	 */
14   	
#include <algorithm>
#include <condition_variable>
#include <iterator>
#include <memory>
#include <random>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/algorithm_ext/copy_n.hpp>
#include "common/weighted_shuffle.h"

#include "include/scope_guard.h"
#include "include/stringify.h"

#include "messages/MMonGetMap.h"
#include "messages/MMonGetVersion.h"
#include "messages/MMonGetVersionReply.h"
#include "messages/MMonMap.h"
#include "messages/MConfig.h"
#include "messages/MGetConfig.h"
#include "messages/MAuth.h"
#include "messages/MLogAck.h"
#include "messages/MAuthReply.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
#include "messages/MPing.h"

#include "messages/MMonSubscribe.h"
#include "messages/MMonSubscribeAck.h"
#include "common/errno.h"
#include "common/hostname.h"
#include "common/LogClient.h"

#include "MonClient.h"
#include "MonMap.h"

#include "auth/Auth.h"
#include "auth/KeyRing.h"
#include "auth/AuthClientHandler.h"
#include "auth/AuthRegistry.h"
#include "auth/RotatingKeyRing.h"
56   	
57   	#define dout_subsys ceph_subsys_monc
58   	#undef dout_prefix
59   	#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
60   	
61   	using std::string;
62   	
63   	MonClient::MonClient(CephContext *cct_) :
64   	  Dispatcher(cct_),
65   	  AuthServer(cct_),
66   	  messenger(NULL),
67   	  timer(cct_, monc_lock),
68   	  finisher(cct_),
69   	  initialized(false),
70   	  log_client(NULL),
71   	  more_log_pending(false),
72   	  want_monmap(true),
73   	  had_a_connection(false),
(1) Event fun_call_w_exception: Called function throws an exception of type "boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::bad_get> >". [details]
74   	  reopen_interval_multiplier(
75   	    cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
76   	  last_mon_command_tid(0),
77   	  version_req_id(0)
78   	{}
79   	
80   	MonClient::~MonClient()
81   	{
82   	}
83   	
84   	int MonClient::build_initial_monmap()
85   	{
86   	  ldout(cct, 10) << __func__ << dendl;
87   	  int r = monmap.build_initial(cct, false, std::cerr);
88   	  ldout(cct,10) << "monmap:\n";
89   	  monmap.print(*_dout);
90   	  *_dout << dendl;
91   	  return r;
92   	}
93   	
94   	int MonClient::get_monmap()
95   	{
96   	  ldout(cct, 10) << __func__ << dendl;
97   	  std::unique_lock l(monc_lock);
98   	  
99   	  sub.want("monmap", 0, 0);
100  	  if (!_opened())
101  	    _reopen_session();
102  	  map_cond.wait(l, [this] { return !want_monmap; });
103  	  ldout(cct, 10) << __func__ << " done" << dendl;
104  	  return 0;
105  	}
106  	
107  	int MonClient::get_monmap_and_config()
108  	{
109  	  ldout(cct, 10) << __func__ << dendl;
110  	  ceph_assert(!messenger);
111  	
112  	  int tries = 10;
113  	
114  	  cct->init_crypto();
115  	  auto shutdown_crypto = make_scope_guard([this] {
116  	    cct->shutdown_crypto();
117  	  });
118  	
119  	  int r = build_initial_monmap();
120  	  if (r < 0) {
121  	    lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
122  	    return r;
123  	  }
124  	
125  	  messenger = Messenger::create_client_messenger(
126  	    cct, "temp_mon_client");
127  	  ceph_assert(messenger);
128  	  messenger->add_dispatcher_head(this);
129  	  messenger->start();
130  	  auto shutdown_msgr = make_scope_guard([this] {
131  	    messenger->shutdown();
132  	    messenger->wait();
133  	    delete messenger;
134  	    messenger = nullptr;
135  	    if (!monmap.fsid.is_zero()) {
136  	      cct->_conf.set_val("fsid", stringify(monmap.fsid));
137  	    }
138  	  });
139  	
140  	  while (tries-- > 0) {
141  	    r = init();
142  	    if (r < 0) {
143  	      return r;
144  	    }
145  	    r = authenticate(cct->_conf->client_mount_timeout);
146  	    if (r == -ETIMEDOUT) {
147  	      shutdown();
148  	      continue;
149  	    }
150  	    if (r < 0) {
151  	      break;
152  	    }
153  	    {
154  	      std::unique_lock l(monc_lock);
155  	      if (monmap.get_epoch() &&
156  		  !monmap.persistent_features.contains_all(
157  		    ceph::features::mon::FEATURE_MIMIC)) {
158  		ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
159  			      << dendl;
160  		r = 0;
161  		break;
162  	      }
163  	      while ((!got_config || monmap.get_epoch() == 0) && r == 0) {
164  		ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
165  		map_cond.wait_for(l, ceph::make_timespan(
166  	          cct->_conf->mon_client_hunt_interval));
167  	      }
168  	      if (got_config) {
169  		ldout(cct,10) << __func__ << " success" << dendl;
170  		r = 0;
171  		break;
172  	      }
173  	    }
174  	    lderr(cct) << __func__ << " failed to get config" << dendl;
175  	    shutdown();
176  	    continue;
177  	  }
178  	
179  	  shutdown();
180  	  return r;
181  	}
182  	
183  	
184  	/**
185  	 * Ping the monitor with id @p mon_id and set the resulting reply in
186  	 * the provided @p result_reply, if this last parameter is not NULL.
187  	 *
188  	 * So that we don't rely on the MonClient's default messenger, set up
189  	 * during connect(), we create our own messenger to comunicate with the
190  	 * specified monitor.  This is advantageous in the following ways:
191  	 *
192  	 * - Isolate the ping procedure from the rest of the MonClient's operations,
193  	 *   allowing us to not acquire or manage the big monc_lock, thus not
194  	 *   having to block waiting for some other operation to finish before we
195  	 *   can proceed.
196  	 *   * for instance, we can ping mon.FOO even if we are currently hunting
197  	 *     or blocked waiting for auth to complete with mon.BAR.
198  	 *
199  	 * - Ping a monitor prior to establishing a connection (using connect())
200  	 *   and properly establish the MonClient's messenger.  This frees us
201  	 *   from dealing with the complex foo that happens in connect().
202  	 *
203  	 * We also don't rely on MonClient as a dispatcher for this messenger,
204  	 * unlike what happens with the MonClient's default messenger.  This allows
205  	 * us to sandbox the whole ping, having it much as a separate entity in
206  	 * the MonClient class, considerably simplifying the handling and dispatching
207  	 * of messages without needing to consider monc_lock.
208  	 *
209  	 * Current drawback is that we will establish a messenger for each ping
210  	 * we want to issue, instead of keeping a single messenger instance that
211  	 * would be used for all pings.
212  	 */
213  	int MonClient::ping_monitor(const string &mon_id, string *result_reply)
214  	{
215  	  ldout(cct, 10) << __func__ << dendl;
216  	
217  	  string new_mon_id;
218  	  if (monmap.contains("noname-"+mon_id)) {
219  	    new_mon_id = "noname-"+mon_id;
220  	  } else {
221  	    new_mon_id = mon_id;
222  	  }
223  	
224  	  if (new_mon_id.empty()) {
225  	    ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
226  	    return -EINVAL;
227  	  } else if (!monmap.contains(new_mon_id)) {
228  	    ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
229  	                   << dendl;
230  	    return -ENOENT;
231  	  }
232  	
233  	  // N.B. monc isn't initialized
234  	
235  	  auth_registry.refresh_config();
236  	
237  	  KeyRing keyring;
238  	  keyring.from_ceph_context(cct);
239  	  RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
240  	
241  	  MonClientPinger *pinger = new MonClientPinger(cct,
242  							&rkeyring,
243  							result_reply);
244  	
245  	  Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
246  	  smsgr->add_dispatcher_head(pinger);
247  	  smsgr->set_auth_client(pinger);
248  	  smsgr->start();
249  	
250  	  ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
251  	  ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
252  	                 << " " << con->get_peer_addr() << dendl;
253  	
254  	  pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
255  	  pinger->mc->start(monmap.get_epoch(), entity_name);
256  	  con->send_message(new MPing);
257  	
258  	  int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
259  	  if (ret == 0) {
260  	    ldout(cct,10) << __func__ << " got ping reply" << dendl;
261  	  } else {
262  	    ret = -ret;
263  	  }
264  	
265  	  con->mark_down();
266  	  pinger->mc.reset();
267  	  smsgr->shutdown();
268  	  smsgr->wait();
269  	  delete smsgr;
270  	  delete pinger;
271  	  return ret;
272  	}
273  	
274  	bool MonClient::ms_dispatch(Message *m)
275  	{
276  	  // we only care about these message types
277  	  switch (m->get_type()) {
278  	  case CEPH_MSG_MON_MAP:
279  	  case CEPH_MSG_AUTH_REPLY:
280  	  case CEPH_MSG_MON_SUBSCRIBE_ACK:
281  	  case CEPH_MSG_MON_GET_VERSION_REPLY:
282  	  case MSG_MON_COMMAND_ACK:
283  	  case MSG_COMMAND_REPLY:
284  	  case MSG_LOGACK:
285  	  case MSG_CONFIG:
286  	    break;
287  	  case CEPH_MSG_PING:
288  	    m->put();
289  	    return true;
290  	  default:
291  	    return false;
292  	  }
293  	
294  	  std::lock_guard lock(monc_lock);
295  	
296  	  if (!m->get_connection()->is_anon() &&
297  	      m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
298  	    if (_hunting()) {
299  	      auto p = _find_pending_con(m->get_connection());
300  	      if (p == pending_cons.end()) {
301  		// ignore any messages outside hunting sessions
302  		ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
303  		m->put();
304  		return true;
305  	      }
306  	    } else if (!active_con || active_con->get_con() != m->get_connection()) {
307  	      // ignore any messages outside our session(s)
308  	      ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
309  	      m->put();
310  	      return true;
311  	    }
312  	  }
313  	
314  	  switch (m->get_type()) {
315  	  case CEPH_MSG_MON_MAP:
316  	    handle_monmap(static_cast<MMonMap*>(m));
317  	    if (passthrough_monmap) {
318  	      return false;
319  	    } else {
320  	      m->put();
321  	    }
322  	    break;
323  	  case CEPH_MSG_AUTH_REPLY:
324  	    handle_auth(static_cast<MAuthReply*>(m));
325  	    break;
326  	  case CEPH_MSG_MON_SUBSCRIBE_ACK:
327  	    handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
328  	    break;
329  	  case CEPH_MSG_MON_GET_VERSION_REPLY:
330  	    handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
331  	    break;
332  	  case MSG_MON_COMMAND_ACK:
333  	    handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
334  	    break;
335  	  case MSG_COMMAND_REPLY:
336  	    if (m->get_connection()->is_anon() &&
337  	        m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
338  	      // this connection is from 'tell'... ignore everything except our command
339  	      // reply.  (we'll get misc other message because we authenticated, but we
340  	      // don't need them.)
341  	      handle_command_reply(static_cast<MCommandReply*>(m));
342  	      return true;
343  	    }
344  	    // leave the message for another dispatch handler (e.g., Objecter)
345  	    return false;
346  	  case MSG_LOGACK:
347  	    if (log_client) {
348  	      log_client->handle_log_ack(static_cast<MLogAck*>(m));
349  	      m->put();
350  	      if (more_log_pending) {
351  		send_log();
352  	      }
353  	    } else {
354  	      m->put();
355  	    }
356  	    break;
357  	  case MSG_CONFIG:
358  	    handle_config(static_cast<MConfig*>(m));
359  	    break;
360  	  }
361  	  return true;
362  	}
363  	
364  	void MonClient::send_log(bool flush)
365  	{
366  	  if (log_client) {
367  	    auto lm = log_client->get_mon_log_message(flush);
368  	    if (lm)
369  	      _send_mon_message(std::move(lm));
370  	    more_log_pending = log_client->are_pending();
371  	  }
372  	}
373  	
374  	void MonClient::flush_log()
375  	{
376  	  std::lock_guard l(monc_lock);
377  	  send_log();
378  	}
379  	
380  	/* Unlike all the other message-handling functions, we don't put away a reference
381  	* because we want to support MMonMap passthrough to other Dispatchers. */
382  	void MonClient::handle_monmap(MMonMap *m)
383  	{
384  	  ldout(cct, 10) << __func__ << " " << *m << dendl;
385  	  auto con_addrs = m->get_source_addrs();
386  	  string old_name = monmap.get_name(con_addrs);
387  	  const auto old_epoch = monmap.get_epoch();
388  	
389  	  auto p = m->monmapbl.cbegin();
390  	  decode(monmap, p);
391  	
392  	  ldout(cct, 10) << " got monmap " << monmap.epoch
393  			 << " from mon." << old_name
394  			 << " (according to old e" << monmap.get_epoch() << ")"
395  	 		 << dendl;
396  	  ldout(cct, 10) << "dump:\n";
397  	  monmap.print(*_dout);
398  	  *_dout << dendl;
399  	
400  	  if (old_epoch != monmap.get_epoch()) {
401  	    tried.clear();
402  	  }
403  	  if (old_name.size() == 0) {
404  	    ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
405  	    _reopen_session();
406  	  } else {
407  	    auto new_name = monmap.get_name(con_addrs);
408  	    if (new_name.empty()) {
409  	      ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
410  			     << " went away" << dendl;
411  	      // can't find the mon we were talking to (above)
412  	      _reopen_session();
413  	    } else if (messenger->should_use_msgr2() &&
414  		       monmap.get_addrs(new_name).has_msgr2() &&
415  		       !con_addrs.has_msgr2()) {
416  	      ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
417  			   << monmap.get_addrs(new_name) << " but i'm connected to "
418  			   << con_addrs << ", reconnecting" << dendl;
419  	      _reopen_session();
420  	    }
421  	  }
422  	
423  	  sub.got("monmap", monmap.get_epoch());
424  	  map_cond.notify_all();
425  	  want_monmap = false;
426  	
427  	  if (authenticate_err == 1) {
428  	    _finish_auth(0);
429  	  }
430  	}
431  	
432  	void MonClient::handle_config(MConfig *m)
433  	{
434  	  ldout(cct,10) << __func__ << " " << *m << dendl;
435  	  finisher.queue(new LambdaContext([this, m](int r) {
436  		cct->_conf.set_mon_vals(cct, m->config, config_cb);
437  		if (config_notify_cb) {
438  		  config_notify_cb();
439  		}
440  		m->put();
441  	      }));
442  	  got_config = true;
443  	  map_cond.notify_all();
444  	}
445  	
446  	// ----------------------
447  	
448  	int MonClient::init()
449  	{
450  	  ldout(cct, 10) << __func__ << dendl;
451  	
452  	  entity_name = cct->_conf->name;
453  	
454  	  auth_registry.refresh_config();
455  	
456  	  std::lock_guard l(monc_lock);
457  	  keyring.reset(new KeyRing);
458  	  if (auth_registry.is_supported_method(messenger->get_mytype(),
459  						CEPH_AUTH_CEPHX)) {
460  	    // this should succeed, because auth_registry just checked!
461  	    int r = keyring->from_ceph_context(cct);
462  	    if (r != 0) {
463  	      // but be somewhat graceful in case there was a race condition
464  	      lderr(cct) << "keyring not found" << dendl;
465  	      return r;
466  	    }
467  	  }
468  	  if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
469  	    return -ENOENT;
470  	  }
471  	
472  	  rotating_secrets.reset(
473  	    new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
474  	
475  	  initialized = true;
476  	
477  	  messenger->set_auth_client(this);
478  	  messenger->add_dispatcher_head(this);
479  	
480  	  timer.init();
481  	  finisher.start();
482  	  schedule_tick();
483  	
484  	  return 0;
485  	}
486  	
487  	void MonClient::shutdown()
488  	{
489  	  ldout(cct, 10) << __func__ << dendl;
490  	  monc_lock.lock();
491  	  stopping = true;
492  	  while (!version_requests.empty()) {
493  	    version_requests.begin()->second->context->complete(-ECANCELED);
494  	    ldout(cct, 20) << __func__ << " canceling and discarding version request "
495  			   << version_requests.begin()->second << dendl;
496  	    delete version_requests.begin()->second;
497  	    version_requests.erase(version_requests.begin());
498  	  }
499  	  while (!mon_commands.empty()) {
500  	    auto tid = mon_commands.begin()->first;
501  	    _cancel_mon_command(tid);
502  	  }
503  	  ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
504  			 << " pending message(s)" << dendl;
505  	  waiting_for_session.clear();
506  	
507  	  active_con.reset();
508  	  pending_cons.clear();
509  	  auth.reset();
510  	
511  	  monc_lock.unlock();
512  	
513  	  if (initialized) {
514  	    finisher.wait_for_empty();
515  	    finisher.stop();
516  	    initialized = false;
517  	  }
518  	  monc_lock.lock();
519  	  timer.shutdown();
520  	  stopping = false;
521  	  monc_lock.unlock();
522  	}
523  	
524  	int MonClient::authenticate(double timeout)
525  	{
526  	  std::unique_lock lock{monc_lock};
527  	
528  	  if (active_con) {
529  	    ldout(cct, 5) << "already authenticated" << dendl;
530  	    return 0;
531  	  }
532  	  sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
533  	  sub.want("config", 0, 0);
534  	  if (!_opened())
535  	    _reopen_session();
536  	
537  	  auto until = ceph::real_clock::now();
538  	  until += ceph::make_timespan(timeout);
539  	  if (timeout > 0.0)
540  	    ldout(cct, 10) << "authenticate will time out at " << until << dendl;
541  	  authenticate_err = 1;  // == in progress
542  	  while (!active_con && authenticate_err >= 0) {
543  	    if (timeout > 0.0) {
544  	      auto r = auth_cond.wait_until(lock, until);
545  	      if (r == cv_status::timeout && !active_con) {
546  		ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
547  		authenticate_err = -ETIMEDOUT;
548  	      }
549  	    } else {
550  	      auth_cond.wait(lock);
551  	    }
552  	  }
553  	
554  	  if (active_con) {
555  	    ldout(cct, 5) << __func__ << " success, global_id "
556  			  << active_con->get_global_id() << dendl;
557  	    // active_con should not have been set if there was an error
558  	    ceph_assert(authenticate_err >= 0);
559  	    authenticated = true;
560  	  }
561  	
562  	  if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
563  	    lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
564  	  }
565  	
566  	  return authenticate_err;
567  	}
568  	
569  	void MonClient::handle_auth(MAuthReply *m)
570  	{
571  	  ceph_assert(ceph_mutex_is_locked(monc_lock));
572  	
573  	  if (m->get_connection()->is_anon()) {
574  	    // anon connection, used for mon tell commands
575  	    for (auto& p : mon_commands) {
576  	      if (p.second->target_con == m->get_connection()) {
577  		auto& mc = p.second->target_session;
578  		int ret = mc->handle_auth(m, entity_name,
579  					  CEPH_ENTITY_TYPE_MON,
580  					  rotating_secrets.get());
581  		(void)ret; // we don't care
582  		break;
583  	      }
584  	    }
585  	    m->put();
586  	    return;
587  	  }
588  	
589  	  if (!_hunting()) {
590  	    std::swap(active_con->get_auth(), auth);
591  	    int ret = active_con->authenticate(m);
592  	    m->put();
593  	    std::swap(auth, active_con->get_auth());
594  	    if (global_id != active_con->get_global_id()) {
595  	      lderr(cct) << __func__ << " peer assigned me a different global_id: "
596  			 << active_con->get_global_id() << dendl;
597  	    }
598  	    if (ret != -EAGAIN) {
599  	      _finish_auth(ret);
600  	    }
601  	    return;
602  	  }
603  	
604  	  // hunting
605  	  auto found = _find_pending_con(m->get_connection());
606  	  ceph_assert(found != pending_cons.end());
607  	  int auth_err = found->second.handle_auth(m, entity_name, want_keys,
608  						   rotating_secrets.get());
609  	  m->put();
610  	  if (auth_err == -EAGAIN) {
611  	    return;
612  	  }
613  	  if (auth_err) {
614  	    pending_cons.erase(found);
615  	    if (!pending_cons.empty()) {
616  	      // keep trying with pending connections
617  	      return;
618  	    }
619  	    // the last try just failed, give up.
620  	  } else {
621  	    auto& mc = found->second;
622  	    ceph_assert(mc.have_session());
623  	    active_con.reset(new MonConnection(std::move(mc)));
624  	    pending_cons.clear();
625  	  }
626  	
627  	  _finish_hunting(auth_err);
628  	  _finish_auth(auth_err);
629  	}
630  	
631  	void MonClient::_finish_auth(int auth_err)
632  	{
633  	  ldout(cct,10) << __func__ << " " << auth_err << dendl;
634  	  authenticate_err = auth_err;
635  	  // _resend_mon_commands() could _reopen_session() if the connected mon is not
636  	  // the one the MonCommand is targeting.
637  	  if (!auth_err && active_con) {
638  	    ceph_assert(auth);
639  	    _check_auth_tickets();
640  	  }
641  	  auth_cond.notify_all();
642  	
643  	  if (!auth_err) {
644  	    Context *cb = nullptr;
645  	    if (session_established_context) {
646  	      cb = session_established_context.release();
647  	    }
648  	    if (cb) {
649  	      monc_lock.unlock();
650  	      cb->complete(0);
651  	      monc_lock.lock();
652  	    }
653  	  }
654  	}
655  	
656  	// ---------
657  	
658  	void MonClient::send_mon_message(MessageRef m)
659  	{
660  	  std::lock_guard l{monc_lock};
661  	  _send_mon_message(std::move(m));
662  	}
663  	
664  	void MonClient::_send_mon_message(MessageRef m)
665  	{
666  	  ceph_assert(ceph_mutex_is_locked(monc_lock));
667  	  if (active_con) {
668  	    auto cur_con = active_con->get_con();
669  	    ldout(cct, 10) << "_send_mon_message to mon."
670  			   << monmap.get_name(cur_con->get_peer_addr())
671  			   << " at " << cur_con->get_peer_addr() << dendl;
672  	    cur_con->send_message2(std::move(m));
673  	  } else {
674  	    waiting_for_session.push_back(std::move(m));
675  	  }
676  	}
677  	
678  	void MonClient::_reopen_session(int rank)
679  	{
680  	  ceph_assert(ceph_mutex_is_locked(monc_lock));
681  	  ldout(cct, 10) << __func__ << " rank " << rank << dendl;
682  	
683  	  active_con.reset();
684  	  pending_cons.clear();
685  	
686  	  _start_hunting();
687  	
688  	  if (rank >= 0) {
689  	    _add_conn(rank, global_id);
690  	  } else {
691  	    _add_conns(global_id);
692  	  }
693  	
694  	  // throw out old queued messages
695  	  waiting_for_session.clear();
696  	
697  	  // throw out version check requests
698  	  while (!version_requests.empty()) {
699  	    finisher.queue(version_requests.begin()->second->context, -EAGAIN);
700  	    delete version_requests.begin()->second;
701  	    version_requests.erase(version_requests.begin());
702  	  }
703  	
704  	  for (auto& c : pending_cons) {
705  	    c.second.start(monmap.get_epoch(), entity_name);
706  	  }
707  	
708  	  if (sub.reload()) {
709  	    _renew_subs();
710  	  }
711  	}
712  	
713  	MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
714  	{
715  	  auto peer = monmap.get_addrs(rank);
716  	  auto conn = messenger->connect_to_mon(peer);
717  	  MonConnection mc(cct, conn, global_id, &auth_registry);
718  	  auto inserted = pending_cons.insert(std::make_pair(peer, std::move(mc)));
719  	  ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
720  	                 << " con " << conn
721  	                 << " addr " << peer
722  	                 << dendl;
723  	  return inserted.first->second;
724  	}
725  	
726  	void MonClient::_add_conns(uint64_t global_id)
727  	{
728  	  // collect the next batch of candidates who are listed right next to the ones
729  	  // already tried
730  	  auto get_next_batch = [this]() -> std::vector<unsigned> {
731  	    std::multimap<uint16_t, unsigned> ranks_by_priority;
732  	    boost::copy(
733  	      monmap.mon_info | boost::adaptors::filtered(
734  	        [this](auto& info) {
735  	          auto rank = monmap.get_rank(info.first);
736  	          return tried.count(rank) == 0;
737  	        }) | boost::adaptors::transformed(
738  	          [this](auto& info) {
739  	            auto rank = monmap.get_rank(info.first);
740  	            return std::make_pair(info.second.priority, rank);
741  	          }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
742  	    if (ranks_by_priority.empty()) {
743  	      return {};
744  	    }
745  	    // only choose the monitors with lowest priority
746  	    auto cands = boost::make_iterator_range(
747  	      ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
748  	    std::vector<unsigned> ranks;
749  	    boost::range::copy(cands | boost::adaptors::map_values,
750  			       std::back_inserter(ranks));
751  	    return ranks;
752  	  };
753  	  auto ranks = get_next_batch();
754  	  if (ranks.empty()) {
755  	    tried.clear();  // start over
756  	    ranks = get_next_batch();
757  	  }
758  	  ceph_assert(!ranks.empty());
759  	  if (ranks.size() > 1) {
760  	    std::vector<uint16_t> weights;
761  	    for (auto i : ranks) {
762  	      auto rank_name = monmap.get_name(i);
763  	      weights.push_back(monmap.get_weight(rank_name));
764  	    }
765  	    std::random_device rd;
766  	    if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
767  	      std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
768  	    } else {
769  	      weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
770  			       std::mt19937{rd()});
771  	    }
772  	  }
773  	  ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
774  	  unsigned n = cct->_conf->mon_client_hunt_parallel;
775  	  if (n == 0 || n > ranks.size()) {
776  	    n = ranks.size();
777  	  }
778  	  for (unsigned i = 0; i < n; i++) {
779  	    _add_conn(ranks[i], global_id);
780  	    tried.insert(ranks[i]);
781  	  }
782  	}
783  	
784  	bool MonClient::ms_handle_reset(Connection *con)
785  	{
786  	  std::lock_guard lock(monc_lock);
787  	
788  	  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
789  	    return false;
790  	
791  	  if (con->is_anon()) {
792  	    auto p = mon_commands.begin();
793  	    while (p != mon_commands.end()) {
794  	      auto cmd = p->second;
795  	      ++p;
796  	      if (cmd->target_con == con) {
797  		_send_command(cmd); // may retry or fail
798  		break;
799  	      }
800  	    }
801  	    return true;
802  	  }
803  	
804  	  if (_hunting()) {
805  	    if (pending_cons.count(con->get_peer_addrs())) {
806  	      ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
807  			     << dendl;
808  	    } else {
809  	      ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
810  			     << dendl;
811  	    }
812  	    return true;
813  	  } else {
814  	    if (active_con && con == active_con->get_con()) {
815  	      ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
816  			     << dendl;
817  	      _reopen_session();
818  	      return false;
819  	    } else {
820  	      ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
821  			     << dendl;
822  	      return true;
823  	    }
824  	  }
825  	}
826  	
827  	bool MonClient::_opened() const
828  	{
829  	  ceph_assert(ceph_mutex_is_locked(monc_lock));
830  	  return active_con || _hunting();
831  	}
832  	
833  	bool MonClient::_hunting() const
834  	{
835  	  return !pending_cons.empty();
836  	}
837  	
838  	void MonClient::_start_hunting()
839  	{
840  	  ceph_assert(!_hunting());
841  	  // adjust timeouts if necessary
842  	  if (!had_a_connection)
843  	    return;
844  	  reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
845  	  if (reopen_interval_multiplier >
846  	      cct->_conf->mon_client_hunt_interval_max_multiple) {
847  	    reopen_interval_multiplier =
848  	      cct->_conf->mon_client_hunt_interval_max_multiple;
849  	  }
850  	}
851  	
852  	void MonClient::_finish_hunting(int auth_err)
853  	{
854  	  ldout(cct,10) << __func__ << " " << auth_err << dendl;
855  	  ceph_assert(ceph_mutex_is_locked(monc_lock));
856  	  // the pending conns have been cleaned.
857  	  ceph_assert(!_hunting());
858  	  if (active_con) {
859  	    auto con = active_con->get_con();
860  	    ldout(cct, 1) << "found mon."
861  			  << monmap.get_name(con->get_peer_addr())
862  			  << dendl;
863  	  } else {
864  	    ldout(cct, 1) << "no mon sessions established" << dendl;
865  	  }
866  	
867  	  had_a_connection = true;
868  	  _un_backoff();
869  	
870  	  if (!auth_err) {
871  	    last_rotating_renew_sent = utime_t();
872  	    while (!waiting_for_session.empty()) {
873  	      _send_mon_message(std::move(waiting_for_session.front()));
874  	      waiting_for_session.pop_front();
875  	    }
876  	    _resend_mon_commands();
877  	    send_log(true);
878  	    if (active_con) {
879  	      std::swap(auth, active_con->get_auth());
880  	      if (global_id && global_id != active_con->get_global_id()) {
881  		lderr(cct) << __func__ << " global_id changed from " << global_id
882  			   << " to " << active_con->get_global_id() << dendl;
883  	      }
884  	      global_id = active_con->get_global_id();
885  	    }
886  	  }
887  	}
888  	
889  	void MonClient::tick()
890  	{
891  	  ldout(cct, 10) << __func__ << dendl;
892  	
893  	  auto reschedule_tick = make_scope_guard([this] {
894  	      schedule_tick();
895  	    });
896  	
897  	  _check_auth_tickets();
898  	  _check_tell_commands();
899  	  
900  	  if (_hunting()) {
901  	    ldout(cct, 1) << "continuing hunt" << dendl;
902  	    return _reopen_session();
903  	  } else if (active_con) {
904  	    // just renew as needed
905  	    utime_t now = ceph_clock_now();
906  	    auto cur_con = active_con->get_con();
907  	    if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
908  	      const bool maybe_renew = sub.need_renew();
909  	      ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
910  			     << dendl;
911  	      if (maybe_renew) {
912  		_renew_subs();
913  	      }
914  	    }
915  	
916  	    cur_con->send_keepalive();
917  	
918  	    if (cct->_conf->mon_client_ping_timeout > 0 &&
919  		cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
920  	      utime_t lk = cur_con->get_last_keepalive_ack();
921  	      utime_t interval = now - lk;
922  	      if (interval > cct->_conf->mon_client_ping_timeout) {
923  		ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
924  			      << " seconds), reconnecting" << dendl;
925  		return _reopen_session();
926  	      }
927  	      send_log();
928  	    }
929  	
930  	    _un_backoff();
931  	  }
932  	}
933  	
934  	void MonClient::_un_backoff()
935  	{
936  	  // un-backoff our reconnect interval
937  	  reopen_interval_multiplier = std::max(
938  	    cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
939  	    reopen_interval_multiplier /
940  	    cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
941  	  ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
942  			 << reopen_interval_multiplier << dendl;
943  	}
944  	
945  	void MonClient::schedule_tick()
946  	{
947  	  auto do_tick = make_lambda_context([this](int) { tick(); });
948  	  if (_hunting()) {
949  	    const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
950  					reopen_interval_multiplier);
951  	    timer.add_event_after(hunt_interval, do_tick);
952  	  } else {
953  	    timer.add_event_after(cct->_conf->mon_client_ping_interval, do_tick);
954  	  }
955  	}
956  	
957  	// ---------
958  	
959  	void MonClient::_renew_subs()
960  	{
961  	  ceph_assert(ceph_mutex_is_locked(monc_lock));
962  	  if (!sub.have_new()) {
963  	    ldout(cct, 10) << __func__ << " - empty" << dendl;
964  	    return;
965  	  }
966  	
967  	  ldout(cct, 10) << __func__ << dendl;
968  	  if (!_opened())
969  	    _reopen_session();
970  	  else {
971  	    auto m = ceph::make_message<MMonSubscribe>();
972  	    m->what = sub.get_subs();
973  	    m->hostname = ceph_get_short_hostname();
974  	    _send_mon_message(std::move(m));
975  	    sub.renewed();
976  	  }
977  	}
978  	
979  	void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
980  	{
981  	  sub.acked(m->interval);
982  	  m->put();
983  	}
984  	
985  	int MonClient::_check_auth_tickets()
986  	{
987  	  ceph_assert(ceph_mutex_is_locked(monc_lock));
988  	  if (active_con && auth) {
989  	    if (auth->need_tickets()) {
990  	      ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
991  	      auto m = ceph::make_message<MAuth>();
992  	      m->protocol = auth->get_protocol();
993  	      auth->prepare_build_request();
994  	      auth->build_request(m->auth_payload);
995  	      _send_mon_message(m);
996  	    }
997  	
998  	    _check_auth_rotating();
999  	  }
1000 	  return 0;
1001 	}
1002 	
// Check whether the rotating service keys need to be refreshed and, if so,
// send a rotating-key request to the monitor.
//
// Nothing is sent when rotating keys are not in use for this entity, when
// there is no active connection / auth handler yet, when the current keys
// are still good past the cutoff, or when a renewal was already sent less
// than a second ago.
//
// Caller must hold monc_lock.  Always returns 0.
int MonClient::_check_auth_rotating()
{
  ceph_assert(ceph_mutex_is_locked(monc_lock));
  if (!rotating_secrets ||
      !auth_principal_needs_rotating_keys(entity_name)) {
    ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
    return 0;
  }

  if (!active_con || !auth) {
    ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
    return 0;
  }

  utime_t now = ceph_clock_now();
  // cutoff = now - min(30s, ttl/4)
  utime_t cutoff = now;
  cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
  // issued_at_lower_bound = now - ttl; only used for the clock skew
  // diagnostic below
  utime_t issued_at_lower_bound = now;
  issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
  if (!rotating_secrets->need_new_secrets(cutoff)) {
    ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
    rotating_secrets->dump_rotating();
    return 0;
  }

  ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
  if (!rotating_secrets->need_new_secrets() &&
      rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
    // the key has expired before it has been issued?
    lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
               << " (before " << issued_at_lower_bound << ")" << dendl;
  }
  // rate limit: skip if the previous renewal went out less than 1s ago
  if ((now > last_rotating_renew_sent) &&
      double(now - last_rotating_renew_sent) < 1) {
    ldout(cct, 10) << __func__ << " called too often (last: "
                   << last_rotating_renew_sent << "), skipping refresh" << dendl;
    return 0;
  }
  auto m = ceph::make_message<MAuth>();
  m->protocol = auth->get_protocol();
  // only send (and remember the send time) if a request was actually built
  if (auth->build_rotating_request(m->auth_payload)) {
    last_rotating_renew_sent = now;
    _send_mon_message(std::move(m));
  }
  return 0;
}
1049 	
1050 	int MonClient::wait_auth_rotating(double timeout)
1051 	{
1052 	  std::unique_lock l(monc_lock);
1053 	
1054 	  // Must be initialized
1055 	  ceph_assert(auth != nullptr);
1056 	
1057 	  if (auth->get_protocol() == CEPH_AUTH_NONE)
1058 	    return 0;
1059 	  
1060 	  if (!rotating_secrets)
1061 	    return 0;
1062 	
1063 	  ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
1064 	  utime_t now = ceph_clock_now();
1065 	  if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [now, this] {
1066 	    return (!auth_principal_needs_rotating_keys(entity_name) ||
1067 		    !rotating_secrets->need_new_secrets(now));
1068 	  })) {
1069 	    ldout(cct, 10) << __func__ << " done" << dendl;
1070 	    return 0;
1071 	  } else {
1072 	    ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
1073 	    return -ETIMEDOUT;
1074 	  }
1075 	}
1076 	
1077 	// ---------
1078 	
// Send (or resend) a queued mon command.
//
// Tell-style commands (r->is_tell()) are directed at one specific monitor,
// identified by rank or by name:
//  - each call counts as a send attempt; once the attempts exceed
//    mon_client_directed_command_retry the command is finished with -ENXIO.
//  - if the monmap's min_mon_release is octopus or later, a fresh anonymous
//    connection to the target mon is opened, a MonConnection session is
//    started on it, and the command is queued on that session as an
//    MCommand.
//  - otherwise (legacy path), if the current session's peer is not the
//    requested mon, the session is reopened towards the target and nothing
//    is sent from here; if the peer already is the target, we fall through
//    and send an ordinary MMonCommand.
// Non-tell commands are sent as MMonCommand over the current session.
//
// NOTE: several paths call _finish_command(), which erases r from
// mon_commands and deletes it; r must not be touched after those calls.
void MonClient::_send_command(MonCommand *r)
{
  if (r->is_tell()) {
    ++r->send_attempts;
    if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
      _finish_command(r, -ENXIO, "mon unavailable");
      return;
    }

    // tell-style command
    if (monmap.min_mon_release >= ceph_release_t::octopus) {
      // drop any connection left over from a previous attempt
      if (r->target_con) {
	r->target_con->mark_down();
      }
      if (r->target_rank >= 0) {
	// targeted by rank
	if (r->target_rank >= (int)monmap.size()) {
	  ldout(cct, 10) << " target " << r->target_rank
			 << " >= max mon " << monmap.size() << dendl;
	  _finish_command(r, -ENOENT, "mon rank dne");
	  return;
	}
	r->target_con = messenger->connect_to_mon(
	  monmap.get_addrs(r->target_rank), true /* anon */);
      } else {
	// targeted by name
	if (!monmap.contains(r->target_name)) {
	  ldout(cct, 10) << " target " << r->target_name
			 << " not present in monmap" << dendl;
	  _finish_command(r, -ENOENT, "mon dne");
	  return;
	}
	r->target_con = messenger->connect_to_mon(
	  monmap.get_addrs(r->target_name), true /* anon */);
      }

      r->target_session.reset(new MonConnection(cct, r->target_con, 0,
						&auth_registry));
      r->target_session->start(monmap.get_epoch(), entity_name);
      // remembered so _check_tell_commands() can time this attempt out
      r->last_send_attempt = ceph_clock_now();

      MCommand *m = new MCommand(monmap.fsid);
      m->set_tid(r->tid);
      m->cmd = r->cmd;
      m->set_data(r->inbl);
      r->target_session->queue_command(m);
      return;
    }

    // ugly legacy handling of pre-octopus mons
    // peer stays default-constructed when there is no active connection
    entity_addr_t peer;
    if (active_con) {
      peer = active_con->get_con()->get_peer_addr();
    }

    if (r->target_rank >= 0 &&
	r->target_rank != monmap.get_rank(peer)) {
      ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
		     << " wants rank " << r->target_rank
		     << ", reopening session"
		     << dendl;
      if (r->target_rank >= (int)monmap.size()) {
	ldout(cct, 10) << " target " << r->target_rank
		       << " >= max mon " << monmap.size() << dendl;
	_finish_command(r, -ENOENT, "mon rank dne");
	return;
      }
      _reopen_session(r->target_rank);
      return;
    }
    if (r->target_name.length() &&
	r->target_name != monmap.get_name(peer)) {
      ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
		     << " wants mon " << r->target_name
		     << ", reopening session"
		     << dendl;
      if (!monmap.contains(r->target_name)) {
	ldout(cct, 10) << " target " << r->target_name
		       << " not present in monmap" << dendl;
	_finish_command(r, -ENOENT, "mon dne");
	return;
      }
      _reopen_session(monmap.get_rank(r->target_name));
      return;
    }
    // fall-thru to send 'normal' CLI command
  }

  // normal CLI command
  ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
  auto m = ceph::make_message<MMonCommand>(monmap.fsid);
  m->set_tid(r->tid);
  m->cmd = r->cmd;
  m->set_data(r->inbl);
  _send_mon_message(std::move(m));
  return;
}
1174 	
1175 	void MonClient::_check_tell_commands()
1176 	{
1177 	  // resend any requests
1178 	  auto now = ceph_clock_now();
1179 	  auto p = mon_commands.begin();
1180 	  while (p != mon_commands.end()) {
1181 	    auto cmd = p->second;
1182 	    ++p;
1183 	    if (cmd->is_tell() &&
1184 		cmd->last_send_attempt != utime_t() &&
1185 		now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
1186 	      ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
1187 	      _send_command(cmd); // might remove cmd from mon_commands
1188 	    }
1189 	  }
1190 	}
1191 	
1192 	void MonClient::_resend_mon_commands()
1193 	{
1194 	  // resend any requests
1195 	  auto p = mon_commands.begin();
1196 	  while (p != mon_commands.end()) {
1197 	    auto cmd = p->second;
1198 	    ++p;
1199 	    if (!cmd->is_tell()) {
1200 	      _send_command(cmd); // might remove cmd from mon_commands
1201 	    }
1202 	  }
1203 	}
1204 	
1205 	void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1206 	{
1207 	  MonCommand *r = NULL;
1208 	  uint64_t tid = ack->get_tid();
1209 	
1210 	  if (tid == 0 && !mon_commands.empty()) {
1211 	    r = mon_commands.begin()->second;
1212 	    ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1213 	  } else {
1214 	    auto p = mon_commands.find(tid);
1215 	    if (p == mon_commands.end()) {
1216 	      ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1217 	      ack->put();
1218 	      return;
1219 	    }
1220 	    r = p->second;
1221 	  }
1222 	
1223 	  ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1224 	  if (r->poutbl)
1225 	    r->poutbl->claim(ack->get_data());
1226 	  _finish_command(r, ack->r, ack->rs);
1227 	  ack->put();
1228 	}
1229 	
1230 	void MonClient::handle_command_reply(MCommandReply *reply)
1231 	{
1232 	  MonCommand *r = NULL;
1233 	  uint64_t tid = reply->get_tid();
1234 	
1235 	  if (tid == 0 && !mon_commands.empty()) {
1236 	    r = mon_commands.begin()->second;
1237 	    ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
1238 			   << dendl;
1239 	  } else {
1240 	    auto p = mon_commands.find(tid);
1241 	    if (p == mon_commands.end()) {
1242 	      ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
1243 			     << dendl;
1244 	      reply->put();
1245 	      return;
1246 	    }
1247 	    r = p->second;
1248 	  }
1249 	
1250 	  ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1251 	  if (r->poutbl)
1252 	    r->poutbl->claim(reply->get_data());
1253 	  _finish_command(r, reply->r, reply->rs);
1254 	  reply->put();
1255 	}
1256 	
1257 	int MonClient::_cancel_mon_command(uint64_t tid)
1258 	{
1259 	  ceph_assert(ceph_mutex_is_locked(monc_lock));
1260 	
1261 	  auto it = mon_commands.find(tid);
1262 	  if (it == mon_commands.end()) {
1263 	    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1264 	    return -ENOENT;
1265 	  }
1266 	
1267 	  ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1268 	
1269 	  MonCommand *cmd = it->second;
1270 	  _finish_command(cmd, -ETIMEDOUT, "");
1271 	  return 0;
1272 	}
1273 	
1274 	void MonClient::_finish_command(MonCommand *r, int ret, string rs)
1275 	{
1276 	  ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
1277 	  if (r->prval)
1278 	    *(r->prval) = ret;
1279 	  if (r->prs)
1280 	    *(r->prs) = rs;
1281 	  if (r->onfinish)
1282 	    finisher.queue(r->onfinish, ret);
1283 	  if (r->target_con) {
1284 	    r->target_con->mark_down();
1285 	  }
1286 	  mon_commands.erase(r->tid);
1287 	  delete r;
1288 	}
1289 	
1290 	void MonClient::start_mon_command(const std::vector<string>& cmd,
1291 	                                  const ceph::buffer::list& inbl,
1292 	                                  ceph::buffer::list *outbl, string *outs,
1293 	                                  Context *onfinish)
1294 	{
1295 	  ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
1296 	  std::lock_guard l(monc_lock);
1297 	  if (!initialized || stopping) {
1298 	    if (onfinish) {
1299 	      onfinish->complete(-ECANCELED);
1300 	    }
1301 	    return;
1302 	  }
1303 	  MonCommand *r = new MonCommand(++last_mon_command_tid);
1304 	  r->cmd = cmd;
1305 	  r->inbl = inbl;
1306 	  r->poutbl = outbl;
1307 	  r->prs = outs;
1308 	  r->onfinish = onfinish;
1309 	  if (cct->_conf->rados_mon_op_timeout > 0) {
1310 	    class C_CancelMonCommand : public Context
1311 	    {
1312 	      uint64_t tid;
1313 	      MonClient *monc;
1314 	      public:
1315 	      C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
1316 	      void finish(int r) override {
1317 		monc->_cancel_mon_command(tid);
1318 	      }
1319 	    };
1320 	    r->ontimeout = new C_CancelMonCommand(r->tid, this);
1321 	    timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
1322 	  }
1323 	  mon_commands[r->tid] = r;
1324 	  _send_command(r);
1325 	}
1326 	
1327 	void MonClient::start_mon_command(const string &mon_name,
1328 	                                  const std::vector<string>& cmd,
1329 	                                  const ceph::buffer::list& inbl,
1330 	                                  ceph::buffer::list *outbl, string *outs,
1331 	                                  Context *onfinish)
1332 	{
1333 	  ldout(cct,10) << __func__ << " mon." << mon_name << " cmd=" << cmd << dendl;
1334 	  std::lock_guard l(monc_lock);
1335 	  if (!initialized || stopping) {
1336 	    if (onfinish) {
1337 	      onfinish->complete(-ECANCELED);
1338 	    }
1339 	    return;
1340 	  }
1341 	  MonCommand *r = new MonCommand(++last_mon_command_tid);
1342 	
1343 	  // detect/tolerate mon *rank* passed as a string
1344 	  string err;
1345 	  int rank = strict_strtoll(mon_name.c_str(), 10, &err);
1346 	  if (err.size() == 0 && rank >= 0) {
1347 	    ldout(cct,10) << __func__ << " interpreting name '" << mon_name
1348 			  << "' as rank " << rank << dendl;
1349 	    r->target_rank = rank;
1350 	  } else {
1351 	    r->target_name = mon_name;
1352 	  }
1353 	  r->cmd = cmd;
1354 	  r->inbl = inbl;
1355 	  r->poutbl = outbl;
1356 	  r->prs = outs;
1357 	  r->onfinish = onfinish;
1358 	  mon_commands[r->tid] = r;
1359 	  _send_command(r);
1360 	}
1361 	
1362 	void MonClient::start_mon_command(int rank,
1363 	                                  const std::vector<string>& cmd,
1364 	                                  const ceph::buffer::list& inbl,
1365 	                                  ceph::buffer::list *outbl, string *outs,
1366 	                                  Context *onfinish)
1367 	{
1368 	  ldout(cct,10) << __func__ << " rank " << rank << " cmd=" << cmd << dendl;
1369 	  std::lock_guard l(monc_lock);
1370 	  if (!initialized || stopping) {
1371 	    if (onfinish) {
1372 	      onfinish->complete(-ECANCELED);
1373 	    }
1374 	    return;
1375 	  }
1376 	  MonCommand *r = new MonCommand(++last_mon_command_tid);
1377 	  r->target_rank = rank;
1378 	  r->cmd = cmd;
1379 	  r->inbl = inbl;
1380 	  r->poutbl = outbl;
1381 	  r->prs = outs;
1382 	  r->onfinish = onfinish;
1383 	  mon_commands[r->tid] = r;
1384 	  _send_command(r);
1385 	}
1386 	
1387 	// ---------
1388 	
1389 	void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
1390 	{
1391 	  version_req_d *req = new version_req_d(onfinish, newest, oldest);
1392 	  ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
1393 	  std::lock_guard l(monc_lock);
1394 	  auto m = ceph::make_message<MMonGetVersion>();
1395 	  m->what = map;
1396 	  m->handle = ++version_req_id;
1397 	  version_requests[m->handle] = req;
1398 	  _send_mon_message(std::move(m));
1399 	}
1400 	
1401 	void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1402 	{
1403 	  ceph_assert(ceph_mutex_is_locked(monc_lock));
1404 	  auto iter = version_requests.find(m->handle);
1405 	  if (iter == version_requests.end()) {
1406 	    ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1407 			  << " not found" << dendl;
1408 	  } else {
1409 	    version_req_d *req = iter->second;
1410 	    ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
1411 	    version_requests.erase(iter);
1412 	    if (req->newest)
1413 	      *req->newest = m->version;
1414 	    if (req->oldest)
1415 	      *req->oldest = m->oldest_version;
1416 	    finisher.queue(req->context, 0);
1417 	    delete req;
1418 	  }
1419 	  m->put();
1420 	}
1421 	
1422 	int MonClient::get_auth_request(
1423 	  Connection *con,
1424 	  AuthConnectionMeta *auth_meta,
1425 	  uint32_t *auth_method,
1426 	  std::vector<uint32_t> *preferred_modes,
1427 	  ceph::buffer::list *bl)
1428 	{
1429 	  std::lock_guard l(monc_lock);
1430 	  ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
1431 			<< dendl;
1432 	
1433 	  // connection to mon?
1434 	  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1435 	    ceph_assert(!auth_meta->authorizer);
1436 	    if (con->is_anon()) {
1437 	      for (auto& i : mon_commands) {
1438 		if (i.second->target_con == con) {
1439 		  return i.second->target_session->get_auth_request(
1440 		    auth_method, preferred_modes, bl,
1441 		    entity_name, want_keys, rotating_secrets.get());
1442 		}
1443 	      }
1444 	    }
1445 	    for (auto& i : pending_cons) {
1446 	      if (i.second.is_con(con)) {
1447 		return i.second.get_auth_request(
1448 		  auth_method, preferred_modes, bl,
1449 		  entity_name, want_keys, rotating_secrets.get());
1450 	      }
1451 	    }
1452 	    return -ENOENT;
1453 	  }
1454 	
1455 	  // generate authorizer
1456 	  if (!auth) {
1457 	    lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
1458 	    return -EACCES;
1459 	  }
1460 	  auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
1461 	  if (!auth_meta->authorizer) {
1462 	    lderr(cct) << __func__ << " failed to build_authorizer for type "
1463 		       << ceph_entity_type_name(con->get_peer_type()) << dendl;
1464 	    return -EACCES;
1465 	  }
1466 	  auth_meta->auth_method = auth_meta->authorizer->protocol;
1467 	  auth_registry.get_supported_modes(con->get_peer_type(),
1468 					    auth_meta->auth_method,
1469 					    preferred_modes);
1470 	  *bl = auth_meta->authorizer->bl;
1471 	  return 0;
1472 	}
1473 	
1474 	int MonClient::handle_auth_reply_more(
1475 	  Connection *con,
1476 	  AuthConnectionMeta *auth_meta,
1477 	  const ceph::buffer::list& bl,
1478 	  ceph::buffer::list *reply)
1479 	{
1480 	  std::lock_guard l(monc_lock);
1481 	
1482 	  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1483 	    if (con->is_anon()) {
1484 	      for (auto& i : mon_commands) {
1485 		if (i.second->target_con == con) {
1486 		  return i.second->target_session->handle_auth_reply_more(
1487 		    auth_meta, bl, reply);
1488 		}
1489 	      }
1490 	    }
1491 	    for (auto& i : pending_cons) {
1492 	      if (i.second.is_con(con)) {
1493 		return i.second.handle_auth_reply_more(auth_meta, bl, reply);
1494 	      }
1495 	    }
1496 	    return -ENOENT;
1497 	  }
1498 	
1499 	  // authorizer challenges
1500 	  if (!auth || !auth_meta->authorizer) {
1501 	    lderr(cct) << __func__ << " no authorizer?" << dendl;
1502 	    return -1;
1503 	  }
1504 	  auth_meta->authorizer->add_challenge(cct, bl);
1505 	  *reply = auth_meta->authorizer->bl;
1506 	  return 0;
1507 	}
1508 	
// Handle the final auth reply for an outgoing connection.
//
// Connections to a monitor (handled under monc_lock):
//  - anonymous connections are delegated to the session of the tell
//    command that owns `con`.
//  - otherwise the owning entry in pending_cons processes the reply.  On
//    failure that entry is dropped, and the error is returned right away if
//    other candidates are still pending.  On success the entry becomes
//    active_con and all other pending connections are discarded.  In the
//    remaining cases (success, or failure of the last candidate) hunting is
//    finished, and auth is finished too when the reply failed or we already
//    have a monmap (epoch > 0).
//  - -ENOENT when nobody owns the connection.
//
// Any other peer type: the payload is verified as an authorizer reply;
// returns -EACCES on verification failure, otherwise copies the
// authorizer's session key into auth_meta and returns 0.
//
// NOTE(review): the non-mon branch dereferences auth_meta->authorizer
// without a null check -- presumably it is always set by get_auth_request()
// for such connections; confirm.
int MonClient::handle_auth_done(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  uint64_t global_id,
  uint32_t con_mode,
  const ceph::buffer::list& bl,
  CryptoKey *session_key,
  std::string *connection_secret)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    std::lock_guard l(monc_lock);
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
	if (i.second->target_con == con) {
	  return i.second->target_session->handle_auth_done(
	    auth_meta, global_id, bl,
	    session_key, connection_secret);
	}
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
	int r = i.second.handle_auth_done(
	  auth_meta, global_id, bl,
	  session_key, connection_secret);
	// pending_cons is modified below while we are iterating over it;
	// this is only safe because `i` is not used again and every path
	// returns before the loop advances.
	if (r) {
	  pending_cons.erase(i.first);
	  if (!pending_cons.empty()) {
	    // other candidates are still in flight; let them finish the hunt
	    return r;
	  }
	} else {
	  active_con.reset(new MonConnection(std::move(i.second)));
	  pending_cons.clear();
	  ceph_assert(active_con->have_session());
	}

	_finish_hunting(r);
	if (r || monmap.get_epoch() > 0) {
	  _finish_auth(r);
	}
	return r;
      }
    }
    return -ENOENT;
  } else {
    // verify authorizer reply
    auto p = bl.begin();
    if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
      ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
		    << dendl;
      return -EACCES;
    }
    auth_meta->session_key = auth_meta->authorizer->session_key;
    return 0;
  }
}
1565 	
// Handle the peer rejecting the auth method we proposed.
//
// The peer's allowed methods are recorded in auth_meta first.
//
// Connections to a monitor:
//  - anonymous connections are delegated to the session of the tell
//    command that owns `con`; if that session cannot pick another method
//    (r < 0) the command is finished with that error.
//  - otherwise the owning entry in pending_cons is asked to pick another
//    method.  If it can (r == 0) we retry on the same connection.  If not,
//    the entry is dropped; when it was the last candidate, hunting and auth
//    are finished with the error.
//  - -ENOENT when nobody owns the connection.
//
// Any other peer type: the rejection is logged and -EACCES is returned.
int MonClient::handle_auth_bad_method(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  uint32_t old_auth_method,
  int result,
  const std::vector<uint32_t>& allowed_methods,
  const std::vector<uint32_t>& allowed_modes)
{
  auth_meta->allowed_methods = allowed_methods;

  std::lock_guard l(monc_lock);
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
	if (i.second->target_con == con) {
	  int r = i.second->target_session->handle_auth_bad_method(
	    old_auth_method,
	    result,
	    allowed_methods,
	    allowed_modes);
	  if (r < 0) {
	    // _finish_command() erases the entry from mon_commands; we
	    // return immediately, so the loop never advances past it
	    _finish_command(i.second, r, "auth failed");
	  }
	  return r;
	}
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
	int r = i.second.handle_auth_bad_method(old_auth_method,
						result,
						allowed_methods,
						allowed_modes);
	if (r == 0) {
	  return r; // try another method on this con
	}
	// `i` is not used after the erase and every path below returns
	// before the loop advances
	pending_cons.erase(i.first);
	if (!pending_cons.empty()) {
	  return r;  // fail this con, maybe another con will succeed
	}
	// fail hunt
	_finish_hunting(r);
	_finish_auth(r);
	return r;
      }
    }
    return -ENOENT;
  } else {
    // huh...
    ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
		  << " result " << cpp_strerror(result)
		  << " and auth is " << (auth ? auth->get_protocol() : 0)
		  << dendl;
    return -EACCES;
  }
}
1622 	
// Verify the authorizer presented by an incoming connection.
//
// Returns:
//   1        the peer is authenticated (or no authorizer is required for
//            this messenger); ms_handle_authentication() has been called
//   0        a challenge was added and placed in *reply; the peer is
//            expected to answer it
//   -EACCES  missing/invalid authorizer, unsupported auth mode, or the
//            peer lacks CEPHX_V2 while cephx_service_require_version >= 2
//   -EOPNOTSUPP  no authorize handler exists for auth_method
//
// NOTE(review): *rotating_secrets is dereferenced without a null check --
// presumably it is always set up before incoming connections are accepted;
// confirm.
int MonClient::handle_auth_request(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  bool more,
  uint32_t auth_method,
  const ceph::buffer::list& payload,
  ceph::buffer::list *reply)
{
  if (payload.length() == 0) {
    // for some channels prior to nautilus (osd heartbeat), we
    // tolerate the lack of an authorizer.
    if (!con->get_messenger()->require_authorizer) {
      handle_authentication_dispatcher->ms_handle_authentication(con);
      return 1;
    }
    return -EACCES;
  }
  // the first payload byte carries the auth mode; only authorizer modes
  // are accepted here
  auth_meta->auth_mode = payload[0];
  if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
      auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
    return -EACCES;
  }
  AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
							auth_method);
  if (!ah) {
    lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
	       << auth_method << dendl;
    return -EOPNOTSUPP;
  }

  // peers without CEPHX_V2 are verified without a challenge slot (ac is
  // null), unless the configuration requires v2, in which case they are
  // rejected
  auto ac = &auth_meta->authorizer_challenge;
  if (!HAVE_FEATURE(con->get_features(), CEPHX_V2)) {
    if (cct->_conf->cephx_service_require_version >= 2) {
      ldout(cct,10) << __func__ << " client missing CEPHX_V2 ("
		    << "cephx_service_requre_version = "
		    << cct->_conf->cephx_service_require_version << ")" << dendl;
      return -EACCES;
    }
    ac = nullptr;
  }

  // remember whether a challenge already existed before verification so we
  // can tell below whether this call created one
  bool was_challenge = (bool)auth_meta->authorizer_challenge;
  bool isvalid = ah->verify_authorizer(
    cct,
    *rotating_secrets,
    payload,
    auth_meta->get_connection_secret_length(),
    reply,
    &con->peer_name,
    &con->peer_global_id,
    &con->peer_caps_info,
    &auth_meta->session_key,
    &auth_meta->connection_secret,
    ac);
  if (isvalid) {
    handle_authentication_dispatcher->ms_handle_authentication(con);
    return 1;
  }
  // first round, no prior challenge, and verification just created one
  if (!more && !was_challenge && auth_meta->authorizer_challenge) {
    ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
    return 0;
  }
  ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
  // discard old challenge
  auth_meta->authorizer_challenge.reset();
  return -EACCES;
}
1690 	
1691 	AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1692 	  std::lock_guard l(monc_lock);
1693 	  if (auth) {
1694 	    return auth->build_authorizer(service_id);
1695 	  } else {
1696 	    ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1697 			  << ", but no auth is available now" << dendl;
1698 	    return nullptr;
1699 	  }
1700 	}
1701 	
1702 	#define dout_subsys ceph_subsys_monc
1703 	#undef dout_prefix
1704 	#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1705 	
1706 	MonConnection::MonConnection(
1707 	  CephContext *cct, ConnectionRef con, uint64_t global_id,
1708 	  AuthRegistry *ar)
1709 	  : cct(cct), con(con), global_id(global_id), auth_registry(ar)
1710 	{}
1711 	
1712 	MonConnection::~MonConnection()
1713 	{
1714 	  if (con) {
1715 	    con->mark_down();
1716 	    con.reset();
1717 	  }
1718 	}
1719 	
1720 	bool MonConnection::have_session() const
1721 	{
1722 	  return state == State::HAVE_SESSION;
1723 	}
1724 	
// Begin establishing a session on this connection.
//
// epoch        monmap epoch to report in the legacy MAuth request
// entity_name  our entity name, encoded into the legacy MAuth request
//
// For msgr2 peers the state goes straight to AUTHENTICATING and only an
// MMonGetMap is sent here; no MAuth is built on that path.  For other
// peers the legacy handshake is (re)started: state becomes NEGOTIATING, a
// keepalive is sent, and an MAuth with protocol CEPH_AUTH_UNKNOWN is sent
// whose payload is, in order: struct_v (1), the list of supported auth
// methods, entity_name and global_id.
void MonConnection::start(epoch_t epoch,
			  const EntityName& entity_name)
{
  using ceph::encode;
  // recorded so handle_auth_done() can seed the keepalive ack timestamp
  auth_start = ceph_clock_now();

  if (con->get_peer_addr().is_msgr2()) {
    ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
    state = State::AUTHENTICATING;
    con->send_message(new MMonGetMap());
    return;
  }

  // restart authentication handshake
  state = State::NEGOTIATING;

  // send an initial keepalive to ensure our timestamp is valid by the
  // time we are in an OPENED state (by sequencing this before
  // authentication).
  con->send_keepalive();

  auto m = new MAuth;
  m->protocol = CEPH_AUTH_UNKNOWN;
  m->monmap_epoch = epoch;
  // the encode order below defines the payload layout; do not reorder
  __u8 struct_v = 1;
  encode(struct_v, m->auth_payload);
  std::vector<uint32_t> auth_supported;
  auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
  encode(auth_supported, m->auth_payload);
  encode(entity_name, m->auth_payload);
  encode(global_id, m->auth_payload);
  con->send_message(m);
}
1758 	
1759 	int MonConnection::get_auth_request(
1760 	  uint32_t *method,
1761 	  std::vector<uint32_t> *preferred_modes,
1762 	  ceph::buffer::list *bl,
1763 	  const EntityName& entity_name,
1764 	  uint32_t want_keys,
1765 	  RotatingKeyRing* keyring)
1766 	{
1767 	  using ceph::encode;
1768 	  // choose method
1769 	  if (auth_method < 0) {
1770 	    std::vector<uint32_t> as;
1771 	    auth_registry->get_supported_methods(con->get_peer_type(), &as);
1772 	    if (as.empty()) {
1773 	      return -EACCES;
1774 	    }
1775 	    auth_method = as.front();
1776 	  }
1777 	  *method = auth_method;
1778 	  auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1779 					     preferred_modes);
1780 	  ldout(cct,10) << __func__ << " method " << *method
1781 			<< " preferred_modes " << *preferred_modes << dendl;
1782 	  if (preferred_modes->empty()) {
1783 	    return -EACCES;
1784 	  }
1785 	
1786 	  if (auth) {
1787 	    auth.reset();
1788 	  }
1789 	  int r = _init_auth(*method, entity_name, want_keys, keyring, true);
1790 	  ceph_assert(r == 0);
1791 	
1792 	  // initial requset includes some boilerplate...
1793 	  encode((char)AUTH_MODE_MON, *bl);
1794 	  encode(entity_name, *bl);
1795 	  encode(global_id, *bl);
1796 	
1797 	  // and (maybe) some method-specific initial payload
1798 	  auth->build_initial_request(bl);
1799 	
1800 	  return 0;
1801 	}
1802 	
// Feed an intermediate auth reply to the auth handler.
//
// If the handler asks for another round (-EAGAIN), the next request is
// built into *reply and 0 is returned.  A negative result from the handler
// is logged and returned as is.  A non-negative result is not expected on
// this path and aborts (see FIXME below).
int MonConnection::handle_auth_reply_more(
  AuthConnectionMeta *auth_meta,
  const ceph::buffer::list& bl,
  ceph::buffer::list *reply)
{
  ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
  // the next three lines form ONE log entry: the ldout statement is left
  // open, the hexdump is written into the same stream (*_dout), and dendl
  // closes it.  Keep them together and in this order.
  ldout(cct, 30) << __func__ << " got\n";
  bl.hexdump(*_dout);
  *_dout << dendl;

  auto p = bl.cbegin();
  ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
  int r = auth->handle_response(0, p, &auth_meta->session_key,
				&auth_meta->connection_secret);
  if (r == -EAGAIN) {
    // handler wants another round trip: build the follow-up request
    auth->prepare_build_request();
    auth->build_request(*reply);
    ldout(cct, 10) << __func__ << " responding with " << reply->length()
		   << " bytes" << dendl;
    r = 0;
  } else if (r < 0) {
    lderr(cct) << __func__ << " handle_response returned " << r << dendl;
  } else {
    ldout(cct, 10) << __func__ << " authenticated!" << dendl;
    // FIXME
    ceph_abort(cct, "write me");
  }
  return r;
}
1832 	
1833 	int MonConnection::handle_auth_done(
1834 	  AuthConnectionMeta *auth_meta,
1835 	  uint64_t new_global_id,
1836 	  const ceph::buffer::list& bl,
1837 	  CryptoKey *session_key,
1838 	  std::string *connection_secret)
1839 	{
1840 	  ldout(cct,10) << __func__ << " global_id " << new_global_id
1841 			<< " payload " << bl.length()
1842 			<< dendl;
1843 	  global_id = new_global_id;
1844 	  auth->set_global_id(global_id);
1845 	  auto p = bl.begin();
1846 	  int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1847 					       &auth_meta->connection_secret);
1848 	  if (auth_err >= 0) {
1849 	    state = State::HAVE_SESSION;
1850 	  }
1851 	  con->set_last_keepalive_ack(auth_start);
1852 	
1853 	  if (pending_tell_command) {
1854 	    con->send_message2(std::move(pending_tell_command));
1855 	  }
1856 	  return auth_err;
1857 	}
1858 	
1859 	int MonConnection::handle_auth_bad_method(
1860 	  uint32_t old_auth_method,
1861 	  int result,
1862 	  const std::vector<uint32_t>& allowed_methods,
1863 	  const std::vector<uint32_t>& allowed_modes)
1864 	{
1865 	  ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1866 			<< " result " << cpp_strerror(result)
1867 			<< " allowed_methods " << allowed_methods << dendl;
1868 	  std::vector<uint32_t> auth_supported;
1869 	  auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1870 	  auto p = std::find(auth_supported.begin(), auth_supported.end(),
1871 			     old_auth_method);
1872 	  assert(p != auth_supported.end());
1873 	  p = std::find_first_of(std::next(p), auth_supported.end(),
1874 				 allowed_methods.begin(), allowed_methods.end());
1875 	  if (p == auth_supported.end()) {
1876 	    lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1877 		       << " but i only support " << auth_supported << dendl;
1878 	    return -EACCES;
1879 	  }
1880 	  auth_method = *p;
1881 	  ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1882 	  return 0;
1883 	}
1884 	
1885 	int MonConnection::handle_auth(MAuthReply* m,
1886 				       const EntityName& entity_name,
1887 				       uint32_t want_keys,
1888 				       RotatingKeyRing* keyring)
1889 	{
1890 	  if (state == State::NEGOTIATING) {
1891 	    int r = _negotiate(m, entity_name, want_keys, keyring);
1892 	    if (r) {
1893 	      return r;
1894 	    }
1895 	    state = State::AUTHENTICATING;
1896 	  }
1897 	  int r = authenticate(m);
1898 	  if (!r) {
1899 	    state = State::HAVE_SESSION;
1900 	  }
1901 	  return r;
1902 	}
1903 	
1904 	int MonConnection::_negotiate(MAuthReply *m,
1905 				      const EntityName& entity_name,
1906 				      uint32_t want_keys,
1907 				      RotatingKeyRing* keyring)
1908 	{
1909 	  if (auth && (int)m->protocol == auth->get_protocol()) {
1910 	    // good, negotiation completed
1911 	    auth->reset();
1912 	    return 0;
1913 	  }
1914 	
1915 	  int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1916 	  if (r == -ENOTSUP) {
1917 	    if (m->result == -ENOTSUP) {
1918 	      ldout(cct, 10) << "none of our auth protocols are supported by the server"
1919 			     << dendl;
1920 	    }
1921 	    return m->result;
1922 	  }
1923 	  return r;
1924 	}
1925 	
1926 	int MonConnection::_init_auth(
1927 	  uint32_t method,
1928 	  const EntityName& entity_name,
1929 	  uint32_t want_keys,
1930 	  RotatingKeyRing* keyring,
1931 	  bool msgr2)
1932 	{
1933 	  ldout(cct,10) << __func__ << " method " << method << dendl;
1934 	  auth.reset(
1935 	    AuthClientHandler::create(cct, method, keyring));
1936 	  if (!auth) {
1937 	    ldout(cct, 10) << " no handler for protocol " << method << dendl;
1938 	    return -ENOTSUP;
1939 	  }
1940 	
1941 	  // do not request MGR key unless the mon has the SERVER_KRAKEN
1942 	  // feature.  otherwise it will give us an auth error.  note that
1943 	  // we have to use the FEATUREMASK because pre-jewel the kraken
1944 	  // feature bit was used for something else.
1945 	  if (!msgr2 &&
1946 	      (want_keys & CEPH_ENTITY_TYPE_MGR) &&
1947 	      !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
1948 	    ldout(cct, 1) << __func__
1949 			  << " not requesting MGR keys from pre-kraken monitor"
1950 			  << dendl;
1951 	    want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1952 	  }
1953 	  auth->set_want_keys(want_keys);
1954 	  auth->init(entity_name);
1955 	  auth->set_global_id(global_id);
1956 	  return 0;
1957 	}
1958 	
1959 	int MonConnection::authenticate(MAuthReply *m)
1960 	{
1961 	  ceph_assert(auth);
1962 	  if (!m->global_id) {
1963 	    ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1964 	  }
1965 	  if (m->global_id != global_id) {
1966 	    // it's a new session
1967 	    auth->reset();
1968 	    global_id = m->global_id;
1969 	    auth->set_global_id(global_id);
1970 	    ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1971 	  }
1972 	  auto p = m->result_bl.cbegin();
1973 	  int ret = auth->handle_response(m->result, p, nullptr, nullptr);
1974 	  if (ret == -EAGAIN) {
1975 	    auto ma = new MAuth;
1976 	    ma->protocol = auth->get_protocol();
1977 	    auth->prepare_build_request();
1978 	    auth->build_request(ma->auth_payload);
1979 	    con->send_message(ma);
1980 	  }
1981 	  if (ret == 0 && pending_tell_command) {
1982 	    con->send_message2(std::move(pending_tell_command));
1983 	  }
1984 	
1985 	  return ret;
1986 	}
1987 	
1988 	void MonClient::register_config_callback(md_config_t::config_callback fn) {
1989 	  ceph_assert(!config_cb);
1990 	  config_cb = fn;
1991 	}
1992 	
1993 	md_config_t::config_callback MonClient::get_config_callback() {
1994 	  return config_cb;
1995 	}
1996