1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <algorithm>
16 #include <iterator>
17 #include <random>
18 #include <boost/range/adaptor/map.hpp>
19 #include <boost/range/adaptor/filtered.hpp>
20 #include <boost/range/algorithm/copy.hpp>
21 #include <boost/range/algorithm_ext/copy_n.hpp>
22 #include "common/weighted_shuffle.h"
23
24 #include "include/scope_guard.h"
25 #include "include/stringify.h"
26
27 #include "messages/MMonGetMap.h"
28 #include "messages/MMonGetVersion.h"
29 #include "messages/MMonGetVersionReply.h"
30 #include "messages/MMonMap.h"
31 #include "messages/MConfig.h"
32 #include "messages/MGetConfig.h"
33 #include "messages/MAuth.h"
34 #include "messages/MLogAck.h"
35 #include "messages/MAuthReply.h"
36 #include "messages/MMonCommand.h"
37 #include "messages/MMonCommandAck.h"
38 #include "messages/MCommand.h"
39 #include "messages/MCommandReply.h"
40 #include "messages/MPing.h"
41
42 #include "messages/MMonSubscribe.h"
43 #include "messages/MMonSubscribeAck.h"
44 #include "common/errno.h"
45 #include "common/hostname.h"
46 #include "common/LogClient.h"
47
48 #include "MonClient.h"
49 #include "MonMap.h"
50
51 #include "auth/Auth.h"
52 #include "auth/KeyRing.h"
53 #include "auth/AuthClientHandler.h"
54 #include "auth/AuthRegistry.h"
55 #include "auth/RotatingKeyRing.h"
56
57 #define dout_subsys ceph_subsys_monc
58 #undef dout_prefix
59 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
60
61 using std::string;
62
63 MonClient::MonClient(CephContext *cct_) :
64 Dispatcher(cct_),
65 AuthServer(cct_),
66 messenger(NULL),
67 timer(cct_, monc_lock),
68 finisher(cct_),
69 initialized(false),
70 log_client(NULL),
71 more_log_pending(false),
72 want_monmap(true),
73 had_a_connection(false),
74 reopen_interval_multiplier(
75 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
76 last_mon_command_tid(0),
77 version_req_id(0)
78 {}
79
80 MonClient::~MonClient()
81 {
82 }
83
/**
 * Populate the in-memory monmap from local configuration.
 *
 * Delegates to MonMap::build_initial() (diagnostics go to std::cerr) and dumps
 * the resulting map at debug level 10.
 *
 * @return the result of MonMap::build_initial(); callers such as
 *         get_monmap_and_config() treat a negative value as fatal.
 */
int MonClient::build_initial_monmap()
{
  ldout(cct, 10) << __func__ << dendl;
  int r = monmap.build_initial(cct, false, std::cerr);
  // The ldout(...) ... dendl pair brackets the next three statements, so
  // monmap.print() writes into the same log entry via *_dout.
  ldout(cct,10) << "monmap:\n";
  monmap.print(*_dout);
  *_dout << dendl;
  return r;
}
93
94 int MonClient::get_monmap()
95 {
96 ldout(cct, 10) << __func__ << dendl;
97 std::unique_lock l(monc_lock);
98
99 sub.want("monmap", 0, 0);
100 if (!_opened())
101 _reopen_session();
102 map_cond.wait(l, [this] { return !want_monmap; });
103 ldout(cct, 10) << __func__ << " done" << dendl;
104 return 0;
105 }
106
107 int MonClient::get_monmap_and_config()
108 {
109 ldout(cct, 10) << __func__ << dendl;
110 ceph_assert(!messenger);
111
112 int tries = 10;
113
114 cct->init_crypto();
115 auto shutdown_crypto = make_scope_guard([this] {
116 cct->shutdown_crypto();
117 });
118
119 int r = build_initial_monmap();
120 if (r < 0) {
121 lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
122 return r;
123 }
124
125 messenger = Messenger::create_client_messenger(
126 cct, "temp_mon_client");
127 ceph_assert(messenger);
128 messenger->add_dispatcher_head(this);
129 messenger->start();
130 auto shutdown_msgr = make_scope_guard([this] {
131 messenger->shutdown();
132 messenger->wait();
133 delete messenger;
134 messenger = nullptr;
135 if (!monmap.fsid.is_zero()) {
136 cct->_conf.set_val("fsid", stringify(monmap.fsid));
137 }
138 });
139
140 while (tries-- > 0) {
141 r = init();
142 if (r < 0) {
143 return r;
144 }
145 r = authenticate(cct->_conf->client_mount_timeout);
146 if (r == -ETIMEDOUT) {
147 shutdown();
148 continue;
149 }
150 if (r < 0) {
151 break;
152 }
153 {
154 std::unique_lock l(monc_lock);
155 if (monmap.get_epoch() &&
156 !monmap.persistent_features.contains_all(
157 ceph::features::mon::FEATURE_MIMIC)) {
158 ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
159 << dendl;
160 r = 0;
161 break;
162 }
163 while ((!got_config || monmap.get_epoch() == 0) && r == 0) {
164 ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
165 map_cond.wait_for(l, ceph::make_timespan(
166 cct->_conf->mon_client_hunt_interval));
167 }
168 if (got_config) {
169 ldout(cct,10) << __func__ << " success" << dendl;
170 r = 0;
171 break;
172 }
173 }
174 lderr(cct) << __func__ << " failed to get config" << dendl;
175 shutdown();
176 continue;
177 }
178
179 shutdown();
180 return r;
181 }
182
183
/**
 * Ping the monitor with id @p mon_id and set the resulting reply in
 * the provided @p result_reply, if this last parameter is not NULL.
 *
 * So that we don't rely on the MonClient's default messenger, set up
 * during connect(), we create our own messenger to communicate with the
 * specified monitor. This is advantageous in the following ways:
 *
 * - Isolate the ping procedure from the rest of the MonClient's operations,
 *   allowing us to not acquire or manage the big monc_lock, thus not
 *   having to block waiting for some other operation to finish before we
 *   can proceed.
 *   * for instance, we can ping mon.FOO even if we are currently hunting
 *     or blocked waiting for auth to complete with mon.BAR.
 *
 * - Ping a monitor prior to establishing a connection (using connect())
 *   and properly establish the MonClient's messenger. This frees us
 *   from dealing with the complex setup that happens in connect().
 *
 * We also don't rely on MonClient as a dispatcher for this messenger,
 * unlike what happens with the MonClient's default messenger. This allows
 * us to sandbox the whole ping, treating it much like a separate entity in
 * the MonClient class, considerably simplifying the handling and dispatching
 * of messages without needing to consider monc_lock.
 *
 * Current drawback is that we will establish a messenger for each ping
 * we want to issue, instead of keeping a single messenger instance that
 * would be used for all pings.
 *
 * NOTE(review): no connect() method is visible here; the default messenger
 * appears to be wired up in init() -- confirm and update the text above.
 *
 * @param mon_id       monitor id; "noname-<mon_id>" is used instead when the
 *                     monmap contains that name.
 * @param result_reply optional output, handed to MonClientPinger.
 * @return 0 if a reply arrived within mon_client_ping_timeout; -EINVAL if the
 *         resolved id is empty; -ENOENT if it is not in the monmap; otherwise
 *         the negated value returned by MonClientPinger::wait_for_reply().
 */
int MonClient::ping_monitor(const string &mon_id, string *result_reply)
{
  ldout(cct, 10) << __func__ << dendl;

  // Prefer the auto-generated "noname-" entry if the monmap has one.
  string new_mon_id;
  if (monmap.contains("noname-"+mon_id)) {
    new_mon_id = "noname-"+mon_id;
  } else {
    new_mon_id = mon_id;
  }

  if (new_mon_id.empty()) {
    ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
    return -EINVAL;
  } else if (!monmap.contains(new_mon_id)) {
    ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
		   << dendl;
    return -ENOENT;
  }

  // N.B. monc isn't initialized

  auth_registry.refresh_config();

  // Local keyrings: this path does not touch the MonClient's own keyring.
  KeyRing keyring;
  keyring.from_ceph_context(cct);
  RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);

  MonClientPinger *pinger = new MonClientPinger(cct,
						&rkeyring,
						result_reply);

  // Dedicated messenger; the pinger (not this MonClient) is its dispatcher
  // and auth client.
  Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
  smsgr->add_dispatcher_head(pinger);
  smsgr->set_auth_client(pinger);
  smsgr->start();

  ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
  ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
		 << " " << con->get_peer_addr() << dendl;

  pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
  pinger->mc->start(monmap.get_epoch(), entity_name);
  con->send_message(new MPing);

  // NOTE(review): the negation below assumes wait_for_reply() reports
  // failure as a positive errno -- confirm against MonClientPinger.
  int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
  if (ret == 0) {
    ldout(cct,10) << __func__ << " got ping reply" << dendl;
  } else {
    ret = -ret;
  }

  // Teardown order: drop the connection and session, stop the messenger,
  // and only then free the pinger that was registered as its dispatcher.
  con->mark_down();
  pinger->mc.reset();
  smsgr->shutdown();
  smsgr->wait();
  delete smsgr;
  delete pinger;
  return ret;
}
273
/**
 * Messenger dispatch hook.
 *
 * Only the monitor-related message types listed in the first switch are
 * considered; everything else returns false so other dispatchers see it.
 * Messages from a monitor on a non-anonymous connection that does not belong
 * to the current session (or, while hunting, to one of the pending
 * connections) are dropped.
 *
 * @param m incoming message.
 * @return true if MonClient consumed @p m (it put() the message itself or
 *         passed it to a handler, presumably responsible for dropping the
 *         reference); false to leave it for later dispatchers -- e.g. an
 *         MMonMap when passthrough_monmap is set, or an MCommandReply that
 *         is not a reply to a mon 'tell'.
 */
bool MonClient::ms_dispatch(Message *m)
{
  // we only care about these message types
  switch (m->get_type()) {
  case CEPH_MSG_MON_MAP:
  case CEPH_MSG_AUTH_REPLY:
  case CEPH_MSG_MON_SUBSCRIBE_ACK:
  case CEPH_MSG_MON_GET_VERSION_REPLY:
  case MSG_MON_COMMAND_ACK:
  case MSG_COMMAND_REPLY:
  case MSG_LOGACK:
  case MSG_CONFIG:
    break;
  case CEPH_MSG_PING:
    // pings are simply swallowed
    m->put();
    return true;
  default:
    return false;
  }

  std::lock_guard lock(monc_lock);

  // Filter out monitor traffic on connections we are no longer using.
  // Anonymous connections (mon 'tell' commands) are exempt.
  if (!m->get_connection()->is_anon() &&
      m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
    if (_hunting()) {
      auto p = _find_pending_con(m->get_connection());
      if (p == pending_cons.end()) {
	// ignore any messages outside hunting sessions
	ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
	m->put();
	return true;
      }
    } else if (!active_con || active_con->get_con() != m->get_connection()) {
      // ignore any messages outside our session(s)
      ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
      m->put();
      return true;
    }
  }

  switch (m->get_type()) {
  case CEPH_MSG_MON_MAP:
    // handle_monmap() does not drop the reference, so that the map can be
    // passed through to other dispatchers when requested.
    handle_monmap(static_cast<MMonMap*>(m));
    if (passthrough_monmap) {
      return false;
    } else {
      m->put();
    }
    break;
  case CEPH_MSG_AUTH_REPLY:
    handle_auth(static_cast<MAuthReply*>(m));
    break;
  case CEPH_MSG_MON_SUBSCRIBE_ACK:
    handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
    break;
  case CEPH_MSG_MON_GET_VERSION_REPLY:
    handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
    break;
  case MSG_MON_COMMAND_ACK:
    handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
    break;
  case MSG_COMMAND_REPLY:
    if (m->get_connection()->is_anon() &&
        m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
      // this connection is from 'tell'... ignore everything except our command
      // reply.  (we'll get misc other message because we authenticated, but we
      // don't need them.)
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    }
    // leave the message for another dispatch handler (e.g., Objecter)
    return false;
  case MSG_LOGACK:
    if (log_client) {
      log_client->handle_log_ack(static_cast<MLogAck*>(m));
      m->put();
      // an ack may free room to send the next batch of queued log entries
      if (more_log_pending) {
	send_log();
      }
    } else {
      m->put();
    }
    break;
  case MSG_CONFIG:
    handle_config(static_cast<MConfig*>(m));
    break;
  }
  return true;
}
363
364 void MonClient::send_log(bool flush)
365 {
366 if (log_client) {
367 auto lm = log_client->get_mon_log_message(flush);
368 if (lm)
369 _send_mon_message(std::move(lm));
370 more_log_pending = log_client->are_pending();
371 }
372 }
373
374 void MonClient::flush_log()
375 {
376 std::lock_guard l(monc_lock);
377 send_log();
378 }
379
380 /* Unlike all the other message-handling functions, we don't put away a reference
381 * because we want to support MMonMap passthrough to other Dispatchers. */
382 void MonClient::handle_monmap(MMonMap *m)
383 {
384 ldout(cct, 10) << __func__ << " " << *m << dendl;
385 auto con_addrs = m->get_source_addrs();
386 string old_name = monmap.get_name(con_addrs);
387 const auto old_epoch = monmap.get_epoch();
388
389 auto p = m->monmapbl.cbegin();
390 decode(monmap, p);
391
392 ldout(cct, 10) << " got monmap " << monmap.epoch
393 << " from mon." << old_name
394 << " (according to old e" << monmap.get_epoch() << ")"
395 << dendl;
396 ldout(cct, 10) << "dump:\n";
397 monmap.print(*_dout);
398 *_dout << dendl;
399
400 if (old_epoch != monmap.get_epoch()) {
401 tried.clear();
402 }
403 if (old_name.size() == 0) {
404 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
405 _reopen_session();
406 } else {
407 auto new_name = monmap.get_name(con_addrs);
408 if (new_name.empty()) {
409 ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
410 << " went away" << dendl;
411 // can't find the mon we were talking to (above)
412 _reopen_session();
413 } else if (messenger->should_use_msgr2() &&
414 monmap.get_addrs(new_name).has_msgr2() &&
415 !con_addrs.has_msgr2()) {
416 ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
417 << monmap.get_addrs(new_name) << " but i'm connected to "
418 << con_addrs << ", reconnecting" << dendl;
419 _reopen_session();
420 }
421 }
422
423 sub.got("monmap", monmap.get_epoch());
424 map_cond.notify_all();
425 want_monmap = false;
426
427 if (authenticate_err == 1) {
428 _finish_auth(0);
429 }
430 }
431
/**
 * Handle an MConfig pushed by the monitor.
 *
 * The values are applied asynchronously on the finisher via set_mon_vals().
 * After that, config_notify_cb (if set) is invoked and the message reference
 * is dropped. got_config is set and map_cond is notified immediately, i.e.
 * possibly before the values have actually been applied. shutdown() drains
 * the finisher (wait_for_empty()), so the values are in place once it returns.
 */
void MonClient::handle_config(MConfig *m)
{
  ldout(cct,10) << __func__ << " " << *m << dendl;
  // m is kept alive until the queued lambda runs, which puts it.
  finisher.queue(new LambdaContext([this, m](int r) {
	cct->_conf.set_mon_vals(cct, m->config, config_cb);
	if (config_notify_cb) {
	  config_notify_cb();
	}
	m->put();
      }));
  got_config = true;
  map_cond.notify_all();
}
445
446 // ----------------------
447
/**
 * Prepare the client for use with the already-created messenger.
 *
 * Loads the keyring when cephx is a supported method for our entity type,
 * builds the rotating keyring, registers this object as auth client and
 * dispatcher, and starts the timer, the finisher and the tick loop.
 *
 * @return 0 on success; the keyring loader's error if cephx is supported but
 *         no keyring could be read; -ENOENT if no auth method is supported.
 *
 * NOTE(review): get_monmap_and_config() calls add_dispatcher_head(this) on
 * its temporary messenger and then calls init() (possibly several times), so
 * this object may be registered as a dispatcher more than once -- confirm
 * the messenger tolerates that.
 */
int MonClient::init()
{
  ldout(cct, 10) << __func__ << dendl;

  entity_name = cct->_conf->name;

  auth_registry.refresh_config();

  std::lock_guard l(monc_lock);
  keyring.reset(new KeyRing);
  if (auth_registry.is_supported_method(messenger->get_mytype(),
					CEPH_AUTH_CEPHX)) {
    // this should succeed, because auth_registry just checked!
    int r = keyring->from_ceph_context(cct);
    if (r != 0) {
      // but be somewhat graceful in case there was a race condition
      lderr(cct) << "keyring not found" << dendl;
      return r;
    }
  }
  if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
    return -ENOENT;
  }

  rotating_secrets.reset(
    new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));

  // shutdown() only stops the finisher when this is set
  initialized = true;

  messenger->set_auth_client(this);
  messenger->add_dispatcher_head(this);

  timer.init();
  finisher.start();
  schedule_tick();

  return 0;
}
486
/**
 * Tear down the session state built by init() and authenticate().
 *
 * Under monc_lock: cancels outstanding version requests (-ECANCELED) and mon
 * commands, discards messages queued for a session, and drops the active and
 * pending connections and the auth handler. The finisher is then drained and
 * stopped without holding monc_lock (only if init() completed), and finally
 * the timer is shut down with monc_lock held. `stopping` is true throughout.
 */
void MonClient::shutdown()
{
  ldout(cct, 10) << __func__ << dendl;
  monc_lock.lock();
  stopping = true;
  while (!version_requests.empty()) {
    version_requests.begin()->second->context->complete(-ECANCELED);
    ldout(cct, 20) << __func__ << " canceling and discarding version request "
		   << version_requests.begin()->second << dendl;
    delete version_requests.begin()->second;
    version_requests.erase(version_requests.begin());
  }
  // _cancel_mon_command() is expected to remove the entry, so always take
  // the current first element.
  while (!mon_commands.empty()) {
    auto tid = mon_commands.begin()->first;
    _cancel_mon_command(tid);
  }
  ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
		 << " pending message(s)" << dendl;
  waiting_for_session.clear();

  active_con.reset();
  pending_cons.clear();
  auth.reset();

  // Drop the lock while draining the finisher; presumably so queued
  // callbacks that need monc_lock can still complete.
  monc_lock.unlock();

  if (initialized) {
    finisher.wait_for_empty();
    finisher.stop();
    initialized = false;
  }
  // The timer was constructed with monc_lock, so shut it down with it held.
  monc_lock.lock();
  timer.shutdown();
  stopping = false;
  monc_lock.unlock();
}
523
/**
 * Open a monitor session (if not already open) and wait for authentication.
 *
 * Subscribes to the next monmap and to config before hunting.
 *
 * @param timeout seconds to wait; a value <= 0 waits indefinitely.
 * @return 0 if already authenticated or on success; -ETIMEDOUT on timeout;
 *         otherwise the negative error recorded by _finish_auth().
 */
int MonClient::authenticate(double timeout)
{
  std::unique_lock lock{monc_lock};

  if (active_con) {
    ldout(cct, 5) << "already authenticated" << dendl;
    return 0;
  }
  sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
  sub.want("config", 0, 0);
  if (!_opened())
    _reopen_session();

  auto until = ceph::real_clock::now();
  until += ceph::make_timespan(timeout);
  if (timeout > 0.0)
    ldout(cct, 10) << "authenticate will time out at " << until << dendl;
  authenticate_err = 1;  // == in progress
  // _finish_auth() sets authenticate_err and notifies auth_cond
  while (!active_con && authenticate_err >= 0) {
    if (timeout > 0.0) {
      auto r = auth_cond.wait_until(lock, until);
      if (r == cv_status::timeout && !active_con) {
	ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
	authenticate_err = -ETIMEDOUT;
      }
    } else {
      auth_cond.wait(lock);
    }
  }

  if (active_con) {
    ldout(cct, 5) << __func__ << " success, global_id "
		  << active_con->get_global_id() << dendl;
    // active_con should not have been set if there was an error
    ceph_assert(authenticate_err >= 0);
    authenticated = true;
  }

  if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
    lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
  }

  return authenticate_err;
}
568
/**
 * Process an MAuthReply. Consumes @p m in every path. Requires monc_lock.
 *
 * - Anonymous connection (mon 'tell'): feed the reply to the matching
 *   command's private session; the result is ignored.
 * - Established session: re-authenticate on the active connection using our
 *   `auth` handler, then finish auth unless more rounds are needed (-EAGAIN).
 * - Hunting: advance the matching pending connection. On error that
 *   candidate is dropped (hunting continues while others remain). On success
 *   it becomes active_con and the other candidates are discarded.
 */
void MonClient::handle_auth(MAuthReply *m)
{
  ceph_assert(ceph_mutex_is_locked(monc_lock));

  if (m->get_connection()->is_anon()) {
    // anon connection, used for mon tell commands
    for (auto& p : mon_commands) {
      if (p.second->target_con == m->get_connection()) {
	auto& mc = p.second->target_session;
	int ret = mc->handle_auth(m, entity_name,
				  CEPH_ENTITY_TYPE_MON,
				  rotating_secrets.get());
	(void)ret; // we don't care
	break;
      }
    }
    m->put();
    return;
  }

  if (!_hunting()) {
    // Temporarily lend our auth handler to the active connection for this
    // round, then take it back.
    std::swap(active_con->get_auth(), auth);
    int ret = active_con->authenticate(m);
    m->put();
    std::swap(auth, active_con->get_auth());
    if (global_id != active_con->get_global_id()) {
      lderr(cct) << __func__ << " peer assigned me a different global_id: "
		 << active_con->get_global_id() << dendl;
    }
    if (ret != -EAGAIN) {
      _finish_auth(ret);
    }
    return;
  }

  // hunting
  auto found = _find_pending_con(m->get_connection());
  ceph_assert(found != pending_cons.end());
  int auth_err = found->second.handle_auth(m, entity_name, want_keys,
					   rotating_secrets.get());
  m->put();
  if (auth_err == -EAGAIN) {
    return;
  }
  if (auth_err) {
    pending_cons.erase(found);
    if (!pending_cons.empty()) {
      // keep trying with pending connections
      return;
    }
    // the last try just failed, give up.
  } else {
    auto& mc = found->second;
    ceph_assert(mc.have_session());
    active_con.reset(new MonConnection(std::move(mc)));
    // clearing pending_cons ends hunting (_hunting() is !pending_cons.empty())
    pending_cons.clear();
  }

  _finish_hunting(auth_err);
  _finish_auth(auth_err);
}
630
/**
 * Record the authentication result and wake waiters in authenticate().
 *
 * On success with an active session, refreshes tickets. Any pending
 * session_established_context is then run once, with monc_lock temporarily
 * released around the callback. Caller must hold monc_lock; it is held
 * again on return.
 *
 * @param auth_err 0 on success, negative error otherwise.
 */
void MonClient::_finish_auth(int auth_err)
{
  ldout(cct,10) << __func__ << " " << auth_err << dendl;
  authenticate_err = auth_err;
  // _resend_mon_commands() could _reopen_session() if the connected mon is not
  // the one the MonCommand is targeting.
  if (!auth_err && active_con) {
    ceph_assert(auth);
    _check_auth_tickets();
  }
  auth_cond.notify_all();

  if (!auth_err) {
    Context *cb = nullptr;
    if (session_established_context) {
      cb = session_established_context.release();
    }
    if (cb) {
      // run the user callback without monc_lock so it may call back into us
      monc_lock.unlock();
      cb->complete(0);
      monc_lock.lock();
    }
  }
}
655
656 // ---------
657
658 void MonClient::send_mon_message(MessageRef m)
659 {
660 std::lock_guard l{monc_lock};
661 _send_mon_message(std::move(m));
662 }
663
664 void MonClient::_send_mon_message(MessageRef m)
665 {
666 ceph_assert(ceph_mutex_is_locked(monc_lock));
667 if (active_con) {
668 auto cur_con = active_con->get_con();
669 ldout(cct, 10) << "_send_mon_message to mon."
670 << monmap.get_name(cur_con->get_peer_addr())
671 << " at " << cur_con->get_peer_addr() << dendl;
672 cur_con->send_message2(std::move(m));
673 } else {
674 waiting_for_session.push_back(std::move(m));
675 }
676 }
677
/**
 * Drop any active or pending session and start hunting again.
 *
 * @param rank if >= 0, hunt only that monitor rank; otherwise let
 *             _add_conns() pick the next batch of candidates.
 *
 * Messages queued for the old session are discarded. Outstanding version
 * requests are failed with -EAGAIN via the finisher. Each new pending
 * connection is started, and subscriptions are renewed if sub.reload() asks
 * for it. Caller must hold monc_lock.
 */
void MonClient::_reopen_session(int rank)
{
  ceph_assert(ceph_mutex_is_locked(monc_lock));
  ldout(cct, 10) << __func__ << " rank " << rank << dendl;

  active_con.reset();
  pending_cons.clear();

  // must run while pending_cons is empty (it asserts !_hunting())
  _start_hunting();

  if (rank >= 0) {
    _add_conn(rank, global_id);
  } else {
    _add_conns(global_id);
  }

  // throw out old queued messages
  waiting_for_session.clear();

  // throw out version check requests
  while (!version_requests.empty()) {
    finisher.queue(version_requests.begin()->second->context, -EAGAIN);
    delete version_requests.begin()->second;
    version_requests.erase(version_requests.begin());
  }

  for (auto& c : pending_cons) {
    c.second.start(monmap.get_epoch(), entity_name);
  }

  if (sub.reload()) {
    _renew_subs();
  }
}
712
713 MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
714 {
715 auto peer = monmap.get_addrs(rank);
716 auto conn = messenger->connect_to_mon(peer);
717 MonConnection mc(cct, conn, global_id, &auth_registry);
718 auto inserted = pending_cons.insert(std::make_pair(peer, std::move(mc)));
719 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
720 << " con " << conn
721 << " addr " << peer
722 << dendl;
723 return inserted.first->second;
724 }
725
/**
 * Add the next batch of hunting candidates to pending_cons.
 *
 * Among monitors not yet in `tried`, only those sharing the smallest
 * `priority` value are considered. If every monitor has been tried, `tried`
 * is reset and selection starts over. Candidates are shuffled (uniformly when
 * all weights are zero, otherwise by weight). Up to mon_client_hunt_parallel
 * of them (all if that is 0) are connected and marked as tried.
 *
 * NOTE(review): std::accumulate is declared in <numeric>, which this file
 * does not include directly; it currently relies on a transitive include.
 */
void MonClient::_add_conns(uint64_t global_id)
{
  // collect the next batch of candidates who are listed right next to the ones
  // already tried
  auto get_next_batch = [this]() -> std::vector<unsigned> {
    std::multimap<uint16_t, unsigned> ranks_by_priority;
    boost::copy(
      monmap.mon_info | boost::adaptors::filtered(
        [this](auto& info) {
          auto rank = monmap.get_rank(info.first);
          return tried.count(rank) == 0;
        }) | boost::adaptors::transformed(
          [this](auto& info) {
            auto rank = monmap.get_rank(info.first);
            return std::make_pair(info.second.priority, rank);
          }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
    if (ranks_by_priority.empty()) {
      return {};
    }
    // only choose the monitors with lowest priority
    // (the multimap is ordered, so begin() holds the smallest priority value)
    auto cands = boost::make_iterator_range(
      ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
    std::vector<unsigned> ranks;
    boost::range::copy(cands | boost::adaptors::map_values,
		       std::back_inserter(ranks));
    return ranks;
  };
  auto ranks = get_next_batch();
  if (ranks.empty()) {
    tried.clear();  // start over
    ranks = get_next_batch();
  }
  ceph_assert(!ranks.empty());
  if (ranks.size() > 1) {
    std::vector<uint16_t> weights;
    for (auto i : ranks) {
      auto rank_name = monmap.get_name(i);
      weights.push_back(monmap.get_weight(rank_name));
    }
    std::random_device rd;
    // all-zero weights: fall back to a uniform shuffle
    if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
      std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
    } else {
      weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
		       std::mt19937{rd()});
    }
  }
  ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
  unsigned n = cct->_conf->mon_client_hunt_parallel;
  // 0 means "no limit"
  if (n == 0 || n > ranks.size()) {
    n = ranks.size();
  }
  for (unsigned i = 0; i < n; i++) {
    _add_conn(ranks[i], global_id);
    tried.insert(ranks[i]);
  }
}
783
/**
 * Messenger callback for a connection reset.
 *
 * - Non-monitor peers: not ours, returns false.
 * - Anonymous (mon 'tell') connections: the matching command is re-sent;
 *   _send_command() may retry or fail it.
 * - While hunting: only logged; returns true.
 * - Active session connection: the session is reopened and false returned.
 * - Any other monitor connection: logged as stray; returns true.
 */
bool MonClient::ms_handle_reset(Connection *con)
{
  std::lock_guard lock(monc_lock);

  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
    return false;

  if (con->is_anon()) {
    auto p = mon_commands.begin();
    while (p != mon_commands.end()) {
      auto cmd = p->second;
      // advance before _send_command(), which may finish (and remove) cmd
      ++p;
      if (cmd->target_con == con) {
	_send_command(cmd); // may retry or fail
	break;
      }
    }
    return true;
  }

  if (_hunting()) {
    if (pending_cons.count(con->get_peer_addrs())) {
      ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
		     << dendl;
    } else {
      ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
		     << dendl;
    }
    return true;
  } else {
    if (active_con && con == active_con->get_con()) {
      ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
		     << dendl;
      _reopen_session();
      return false;
    } else {
      ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
		     << dendl;
      return true;
    }
  }
}
826
827 bool MonClient::_opened() const
828 {
829 ceph_assert(ceph_mutex_is_locked(monc_lock));
830 return active_con || _hunting();
831 }
832
833 bool MonClient::_hunting() const
834 {
835 return !pending_cons.empty();
836 }
837
838 void MonClient::_start_hunting()
839 {
840 ceph_assert(!_hunting());
841 // adjust timeouts if necessary
842 if (!had_a_connection)
843 return;
844 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
845 if (reopen_interval_multiplier >
846 cct->_conf->mon_client_hunt_interval_max_multiple) {
847 reopen_interval_multiplier =
848 cct->_conf->mon_client_hunt_interval_max_multiple;
849 }
850 }
851
/**
 * Wrap up a hunt after pending_cons has been cleared.
 *
 * Always shrinks the back-off multiplier and records that we have had a
 * connection. On success (auth_err == 0): flushes messages queued while no
 * session was open, re-sends mon commands and pending log entries, takes the
 * auth handler back from the new active connection, and records its global_id.
 * Caller must hold monc_lock.
 */
void MonClient::_finish_hunting(int auth_err)
{
  ldout(cct,10) << __func__ << " " << auth_err << dendl;
  ceph_assert(ceph_mutex_is_locked(monc_lock));
  // the pending conns have been cleaned.
  ceph_assert(!_hunting());
  if (active_con) {
    auto con = active_con->get_con();
    ldout(cct, 1) << "found mon."
		  << monmap.get_name(con->get_peer_addr())
		  << dendl;
  } else {
    ldout(cct, 1) << "no mon sessions established" << dendl;
  }

  had_a_connection = true;
  _un_backoff();

  if (!auth_err) {
    // reset the rotating-key rate limiter for the new session
    last_rotating_renew_sent = utime_t();
    while (!waiting_for_session.empty()) {
      _send_mon_message(std::move(waiting_for_session.front()));
      waiting_for_session.pop_front();
    }
    _resend_mon_commands();
    send_log(true);
    if (active_con) {
      std::swap(auth, active_con->get_auth());
      if (global_id && global_id != active_con->get_global_id()) {
	lderr(cct) << __func__ << " global_id changed from " << global_id
		   << " to " << active_con->get_global_id() << dendl;
      }
      global_id = active_con->get_global_id();
    }
  }
}
888
/**
 * Periodic timer callback (scheduled by schedule_tick()).
 *
 * Checks auth tickets and tell commands. While hunting, it restarts the hunt.
 * With an active session, it renews subscriptions when the mon lacks stateful
 * subscriptions, sends a keepalive, and reopens the session if no keepalive
 * ack arrived within mon_client_ping_timeout (KEEPALIVE2 peers only). It also
 * flushes pending log entries and relaxes the hunt back-off. The next tick is
 * always scheduled, on every exit path.
 */
void MonClient::tick()
{
  ldout(cct, 10) << __func__ << dendl;

  // scope guard: reschedules even on the early returns below
  auto reschedule_tick = make_scope_guard([this] {
    schedule_tick();
  });

  _check_auth_tickets();
  _check_tell_commands();

  if (_hunting()) {
    ldout(cct, 1) << "continuing hunt" << dendl;
    return _reopen_session();
  } else if (active_con) {
    // just renew as needed
    utime_t now = ceph_clock_now();
    auto cur_con = active_con->get_con();
    if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
      const bool maybe_renew = sub.need_renew();
      ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
		     << dendl;
      if (maybe_renew) {
	_renew_subs();
      }
    }

    cur_con->send_keepalive();

    if (cct->_conf->mon_client_ping_timeout > 0 &&
	cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
      utime_t lk = cur_con->get_last_keepalive_ack();
      utime_t interval = now - lk;
      if (interval > cct->_conf->mon_client_ping_timeout) {
	ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
		      << " seconds), reconnecting" << dendl;
	return _reopen_session();
      }
      send_log();
    }

    _un_backoff();
  }
}
933
934 void MonClient::_un_backoff()
935 {
936 // un-backoff our reconnect interval
937 reopen_interval_multiplier = std::max(
938 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
939 reopen_interval_multiplier /
940 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
941 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
942 << reopen_interval_multiplier << dendl;
943 }
944
945 void MonClient::schedule_tick()
946 {
947 auto do_tick = make_lambda_context([this](int) { tick(); });
948 if (_hunting()) {
949 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
950 reopen_interval_multiplier);
951 timer.add_event_after(hunt_interval, do_tick);
952 } else {
953 timer.add_event_after(cct->_conf->mon_client_ping_interval, do_tick);
954 }
955 }
956
957 // ---------
958
959 void MonClient::_renew_subs()
960 {
961 ceph_assert(ceph_mutex_is_locked(monc_lock));
962 if (!sub.have_new()) {
963 ldout(cct, 10) << __func__ << " - empty" << dendl;
964 return;
965 }
966
967 ldout(cct, 10) << __func__ << dendl;
968 if (!_opened())
969 _reopen_session();
970 else {
971 auto m = ceph::make_message<MMonSubscribe>();
972 m->what = sub.get_subs();
973 m->hostname = ceph_get_short_hostname();
974 _send_mon_message(std::move(m));
975 sub.renewed();
976 }
977 }
978
979 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
980 {
981 sub.acked(m->interval);
982 m->put();
983 }
984
985 int MonClient::_check_auth_tickets()
986 {
987 ceph_assert(ceph_mutex_is_locked(monc_lock));
988 if (active_con && auth) {
989 if (auth->need_tickets()) {
990 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
991 auto m = ceph::make_message<MAuth>();
992 m->protocol = auth->get_protocol();
993 auth->prepare_build_request();
994 auth->build_request(m->auth_payload);
995 _send_mon_message(m);
996 }
997
998 _check_auth_rotating();
999 }
1000 return 0;
1001 }
1002
/**
 * Ask the monitor for fresh rotating service keys when ours are stale.
 *
 * Skipped when there is no rotating keyring, when entity_name does not need
 * rotating keys, or when there is no authenticated session yet. Freshness is
 * judged against cutoff = now - min(30s, auth_service_ticket_ttl / 4).
 * Requests are rate-limited to roughly one per second via
 * last_rotating_renew_sent. Requires monc_lock.
 *
 * @return always 0.
 */
int MonClient::_check_auth_rotating()
{
  ceph_assert(ceph_mutex_is_locked(monc_lock));
  if (!rotating_secrets ||
      !auth_principal_needs_rotating_keys(entity_name)) {
    ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
    return 0;
  }

  if (!active_con || !auth) {
    ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
    return 0;
  }

  utime_t now = ceph_clock_now();
  utime_t cutoff = now;
  cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
  utime_t issued_at_lower_bound = now;
  issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
  if (!rotating_secrets->need_new_secrets(cutoff)) {
    ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
    rotating_secrets->dump_rotating();
    return 0;
  }

  ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
  // Keys still valid "now" but stale relative to a full TTL ago points at
  // clock skew; this is only logged.
  if (!rotating_secrets->need_new_secrets() &&
      rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
    // the key has expired before it has been issued?
    lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
               << " (before " << issued_at_lower_bound << ")" << dendl;
  }
  if ((now > last_rotating_renew_sent) &&
      double(now - last_rotating_renew_sent) < 1) {
    ldout(cct, 10) << __func__ << " called too often (last: "
                   << last_rotating_renew_sent << "), skipping refresh" << dendl;
    return 0;
  }
  auto m = ceph::make_message<MAuth>();
  m->protocol = auth->get_protocol();
  if (auth->build_rotating_request(m->auth_payload)) {
    last_rotating_renew_sent = now;
    _send_mon_message(std::move(m));
  }
  return 0;
}
1049
/**
 * Block until rotating service keys are available, or until @p timeout
 * elapses (converted with ceph::make_timespan, presumably seconds).
 *
 * Asserts that an auth handler exists (i.e. authentication has happened).
 * Returns 0 immediately for CEPH_AUTH_NONE or when there is no rotating
 * keyring.
 *
 * @return 0 once rotating keys are not needed or are fresh relative to the
 *         time this call started; -ETIMEDOUT otherwise.
 */
int MonClient::wait_auth_rotating(double timeout)
{
  std::unique_lock l(monc_lock);

  // Must be initialized
  ceph_assert(auth != nullptr);

  if (auth->get_protocol() == CEPH_AUTH_NONE)
    return 0;
  
  if (!rotating_secrets)
    return 0;

  ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
  // captured once: freshness is judged against the start of the wait
  utime_t now = ceph_clock_now();
  if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [now, this] {
    return (!auth_principal_needs_rotating_keys(entity_name) ||
	    !rotating_secrets->need_new_secrets(now));
  })) {
    ldout(cct, 10) << __func__ << " done" << dendl;
    return 0;
  } else {
    ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
    return -ETIMEDOUT;
  }
}
1076
1077 // ---------
1078
/**
 * Send (or re-send) command @p r to the monitor cluster.
 *
 * Tell-style commands (r->is_tell(): a target mon name or rank was given):
 *  - every call counts one attempt; once send_attempts exceeds
 *    mon_client_directed_command_retry the command is finished with
 *    -ENXIO ("mon unavailable");
 *  - an unknown target rank or name finishes the command with -ENOENT;
 *  - if monmap.min_mon_release is octopus or newer, a dedicated anonymous
 *    connection with its own MonConnection session is opened to the target
 *    and an MCommand is queued on that session;
 *  - otherwise (legacy mons) the main session is reopened towards the
 *    target and nothing is sent now; if the current session already points
 *    at the target, execution falls through and the command is sent like a
 *    normal one.
 *
 * Normal commands are sent as an MMonCommand over the current session.
 *
 * On every error path @p r is finished (and freed) by _finish_command(), so
 * callers must not use it afterwards.
 * NOTE(review): the start_mon_command() callers hold monc_lock; this
 * function does not assert it itself.
 */
void MonClient::_send_command(MonCommand *r)
{
  if (r->is_tell()) {
    ++r->send_attempts;
    if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
      _finish_command(r, -ENXIO, "mon unavailable");
      return;
    }

    // tell-style command
    if (monmap.min_mon_release >= ceph_release_t::octopus) {
      // drop the connection left over from a previous attempt, if any
      if (r->target_con) {
        r->target_con->mark_down();
      }
      if (r->target_rank >= 0) {
        if (r->target_rank >= (int)monmap.size()) {
          ldout(cct, 10) << " target " << r->target_rank
                         << " >= max mon " << monmap.size() << dendl;
          _finish_command(r, -ENOENT, "mon rank dne");
          return;
        }
        r->target_con = messenger->connect_to_mon(
          monmap.get_addrs(r->target_rank), true /* anon */);
      } else {
        if (!monmap.contains(r->target_name)) {
          ldout(cct, 10) << " target " << r->target_name
                         << " not present in monmap" << dendl;
          _finish_command(r, -ENOENT, "mon dne");
          return;
        }
        r->target_con = messenger->connect_to_mon(
          monmap.get_addrs(r->target_name), true /* anon */);
      }

      // Fresh per-command session on the new connection.  The MCommand
      // queued below is presumably held as the session's pending tell
      // command and sent once auth completes (see
      // MonConnection::handle_auth_done / authenticate) -- confirm.
      r->target_session.reset(new MonConnection(cct, r->target_con, 0,
                                                &auth_registry));
      r->target_session->start(monmap.get_epoch(), entity_name);
      // lets _check_tell_commands() detect and retry a stalled attempt
      r->last_send_attempt = ceph_clock_now();

      MCommand *m = new MCommand(monmap.fsid);
      m->set_tid(r->tid);
      m->cmd = r->cmd;
      m->set_data(r->inbl);
      r->target_session->queue_command(m);
      return;
    }

    // ugly legacy handling of pre-octopus mons
    entity_addr_t peer;
    if (active_con) {
      peer = active_con->get_con()->get_peer_addr();
    }

    if (r->target_rank >= 0 &&
        r->target_rank != monmap.get_rank(peer)) {
      ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
                     << " wants rank " << r->target_rank
                     << ", reopening session"
                     << dendl;
      if (r->target_rank >= (int)monmap.size()) {
        ldout(cct, 10) << " target " << r->target_rank
                       << " >= max mon " << monmap.size() << dendl;
        _finish_command(r, -ENOENT, "mon rank dne");
        return;
      }
      // not sent here: the command must be re-sent once the new session
      // to the target mon is established
      _reopen_session(r->target_rank);
      return;
    }
    if (r->target_name.length() &&
        r->target_name != monmap.get_name(peer)) {
      ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
                     << " wants mon " << r->target_name
                     << ", reopening session"
                     << dendl;
      if (!monmap.contains(r->target_name)) {
        ldout(cct, 10) << " target " << r->target_name
                       << " not present in monmap" << dendl;
        _finish_command(r, -ENOENT, "mon dne");
        return;
      }
      // not sent here: re-sent once the new session is established
      _reopen_session(monmap.get_rank(r->target_name));
      return;
    }
    // fall-thru to send 'normal' CLI command
  }

  // normal CLI command
  ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
  auto m = ceph::make_message<MMonCommand>(monmap.fsid);
  m->set_tid(r->tid);
  m->cmd = r->cmd;
  m->set_data(r->inbl);
  _send_mon_message(std::move(m));
  return;
}
1174
1175 void MonClient::_check_tell_commands()
1176 {
1177 // resend any requests
1178 auto now = ceph_clock_now();
1179 auto p = mon_commands.begin();
1180 while (p != mon_commands.end()) {
1181 auto cmd = p->second;
1182 ++p;
1183 if (cmd->is_tell() &&
1184 cmd->last_send_attempt != utime_t() &&
1185 now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
1186 ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
1187 _send_command(cmd); // might remove cmd from mon_commands
1188 }
1189 }
1190 }
1191
1192 void MonClient::_resend_mon_commands()
1193 {
1194 // resend any requests
1195 auto p = mon_commands.begin();
1196 while (p != mon_commands.end()) {
1197 auto cmd = p->second;
1198 ++p;
1199 if (!cmd->is_tell()) {
1200 _send_command(cmd); // might remove cmd from mon_commands
1201 }
1202 }
1203 }
1204
1205 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1206 {
1207 MonCommand *r = NULL;
1208 uint64_t tid = ack->get_tid();
1209
1210 if (tid == 0 && !mon_commands.empty()) {
1211 r = mon_commands.begin()->second;
1212 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1213 } else {
1214 auto p = mon_commands.find(tid);
1215 if (p == mon_commands.end()) {
1216 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1217 ack->put();
1218 return;
1219 }
1220 r = p->second;
1221 }
1222
1223 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1224 if (r->poutbl)
1225 r->poutbl->claim(ack->get_data());
1226 _finish_command(r, ack->r, ack->rs);
1227 ack->put();
1228 }
1229
1230 void MonClient::handle_command_reply(MCommandReply *reply)
1231 {
1232 MonCommand *r = NULL;
1233 uint64_t tid = reply->get_tid();
1234
1235 if (tid == 0 && !mon_commands.empty()) {
1236 r = mon_commands.begin()->second;
1237 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
1238 << dendl;
1239 } else {
1240 auto p = mon_commands.find(tid);
1241 if (p == mon_commands.end()) {
1242 ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
1243 << dendl;
1244 reply->put();
1245 return;
1246 }
1247 r = p->second;
1248 }
1249
1250 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1251 if (r->poutbl)
1252 r->poutbl->claim(reply->get_data());
1253 _finish_command(r, reply->r, reply->rs);
1254 reply->put();
1255 }
1256
1257 int MonClient::_cancel_mon_command(uint64_t tid)
1258 {
1259 ceph_assert(ceph_mutex_is_locked(monc_lock));
1260
1261 auto it = mon_commands.find(tid);
1262 if (it == mon_commands.end()) {
1263 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1264 return -ENOENT;
1265 }
1266
1267 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1268
1269 MonCommand *cmd = it->second;
1270 _finish_command(cmd, -ETIMEDOUT, "");
1271 return 0;
1272 }
1273
1274 void MonClient::_finish_command(MonCommand *r, int ret, string rs)
1275 {
1276 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
1277 if (r->prval)
1278 *(r->prval) = ret;
1279 if (r->prs)
1280 *(r->prs) = rs;
1281 if (r->onfinish)
1282 finisher.queue(r->onfinish, ret);
1283 if (r->target_con) {
1284 r->target_con->mark_down();
1285 }
1286 mon_commands.erase(r->tid);
1287 delete r;
1288 }
1289
1290 void MonClient::start_mon_command(const std::vector<string>& cmd,
1291 const ceph::buffer::list& inbl,
1292 ceph::buffer::list *outbl, string *outs,
1293 Context *onfinish)
1294 {
1295 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
1296 std::lock_guard l(monc_lock);
1297 if (!initialized || stopping) {
1298 if (onfinish) {
1299 onfinish->complete(-ECANCELED);
1300 }
1301 return;
1302 }
1303 MonCommand *r = new MonCommand(++last_mon_command_tid);
1304 r->cmd = cmd;
1305 r->inbl = inbl;
1306 r->poutbl = outbl;
1307 r->prs = outs;
1308 r->onfinish = onfinish;
1309 if (cct->_conf->rados_mon_op_timeout > 0) {
1310 class C_CancelMonCommand : public Context
1311 {
1312 uint64_t tid;
1313 MonClient *monc;
1314 public:
1315 C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
1316 void finish(int r) override {
1317 monc->_cancel_mon_command(tid);
1318 }
1319 };
1320 r->ontimeout = new C_CancelMonCommand(r->tid, this);
1321 timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
1322 }
1323 mon_commands[r->tid] = r;
1324 _send_command(r);
1325 }
1326
1327 void MonClient::start_mon_command(const string &mon_name,
1328 const std::vector<string>& cmd,
1329 const ceph::buffer::list& inbl,
1330 ceph::buffer::list *outbl, string *outs,
1331 Context *onfinish)
1332 {
1333 ldout(cct,10) << __func__ << " mon." << mon_name << " cmd=" << cmd << dendl;
1334 std::lock_guard l(monc_lock);
1335 if (!initialized || stopping) {
1336 if (onfinish) {
1337 onfinish->complete(-ECANCELED);
1338 }
1339 return;
1340 }
1341 MonCommand *r = new MonCommand(++last_mon_command_tid);
1342
1343 // detect/tolerate mon *rank* passed as a string
1344 string err;
1345 int rank = strict_strtoll(mon_name.c_str(), 10, &err);
1346 if (err.size() == 0 && rank >= 0) {
1347 ldout(cct,10) << __func__ << " interpreting name '" << mon_name
1348 << "' as rank " << rank << dendl;
1349 r->target_rank = rank;
1350 } else {
1351 r->target_name = mon_name;
1352 }
1353 r->cmd = cmd;
1354 r->inbl = inbl;
1355 r->poutbl = outbl;
1356 r->prs = outs;
1357 r->onfinish = onfinish;
1358 mon_commands[r->tid] = r;
1359 _send_command(r);
1360 }
1361
1362 void MonClient::start_mon_command(int rank,
1363 const std::vector<string>& cmd,
1364 const ceph::buffer::list& inbl,
1365 ceph::buffer::list *outbl, string *outs,
1366 Context *onfinish)
1367 {
1368 ldout(cct,10) << __func__ << " rank " << rank << " cmd=" << cmd << dendl;
1369 std::lock_guard l(monc_lock);
1370 if (!initialized || stopping) {
1371 if (onfinish) {
1372 onfinish->complete(-ECANCELED);
1373 }
1374 return;
1375 }
1376 MonCommand *r = new MonCommand(++last_mon_command_tid);
1377 r->target_rank = rank;
1378 r->cmd = cmd;
1379 r->inbl = inbl;
1380 r->poutbl = outbl;
1381 r->prs = outs;
1382 r->onfinish = onfinish;
1383 mon_commands[r->tid] = r;
1384 _send_command(r);
1385 }
1386
1387 // ---------
1388
1389 void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
1390 {
1391 version_req_d *req = new version_req_d(onfinish, newest, oldest);
1392 ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
1393 std::lock_guard l(monc_lock);
1394 auto m = ceph::make_message<MMonGetVersion>();
1395 m->what = map;
1396 m->handle = ++version_req_id;
1397 version_requests[m->handle] = req;
1398 _send_mon_message(std::move(m));
1399 }
1400
1401 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1402 {
1403 ceph_assert(ceph_mutex_is_locked(monc_lock));
1404 auto iter = version_requests.find(m->handle);
1405 if (iter == version_requests.end()) {
1406 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1407 << " not found" << dendl;
1408 } else {
1409 version_req_d *req = iter->second;
1410 ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
1411 version_requests.erase(iter);
1412 if (req->newest)
1413 *req->newest = m->version;
1414 if (req->oldest)
1415 *req->oldest = m->oldest_version;
1416 finisher.queue(req->context, 0);
1417 delete req;
1418 }
1419 m->put();
1420 }
1421
/**
 * AuthClient hook: build the initial auth request for outgoing connection
 * @p con.
 *
 * Monitor peers: the request is delegated to the matching MonConnection.
 * For an anonymous connection this is the session of the pending tell
 * command that owns it; otherwise it is one of the hunting connections in
 * pending_cons.  Returns -ENOENT if no session matches.
 *
 * Other peers: an authorizer is built from our auth handler and stored in
 * @p auth_meta.  Its protocol becomes auth_meta->auth_method,
 * @p preferred_modes is filled from auth_registry, and the authorizer bytes
 * are copied into @p bl.  Returns -EACCES if there is no auth handler or the
 * authorizer cannot be built, 0 otherwise.
 */
int MonClient::get_auth_request(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  uint32_t *auth_method,
  std::vector<uint32_t> *preferred_modes,
  ceph::buffer::list *bl)
{
  std::lock_guard l(monc_lock);
  ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
                << dendl;

  // connection to mon?
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    ceph_assert(!auth_meta->authorizer);
    // anonymous mon connections are the per-command tell connections
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
        if (i.second->target_con == con) {
          return i.second->target_session->get_auth_request(
            auth_method, preferred_modes, bl,
            entity_name, want_keys, rotating_secrets.get());
        }
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
        return i.second.get_auth_request(
          auth_method, preferred_modes, bl,
          entity_name, want_keys, rotating_secrets.get());
      }
    }
    return -ENOENT;
  }

  // generate authorizer
  if (!auth) {
    lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
    return -EACCES;
  }
  auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
  if (!auth_meta->authorizer) {
    lderr(cct) << __func__ << " failed to build_authorizer for type "
               << ceph_entity_type_name(con->get_peer_type()) << dendl;
    return -EACCES;
  }
  auth_meta->auth_method = auth_meta->authorizer->protocol;
  auth_registry.get_supported_modes(con->get_peer_type(),
                                    auth_meta->auth_method,
                                    preferred_modes);
  *bl = auth_meta->authorizer->bl;
  return 0;
}
1473
/**
 * AuthClient hook: the server sent an intermediate auth reply @p bl on
 * @p con and expects another round in @p reply.
 *
 * Monitor peers: delegated to the matching tell-command session (for
 * anonymous connections) or hunting connection.  Returns -ENOENT if none
 * matches.
 *
 * Other peers: @p bl is treated as an authorizer challenge and added to our
 * authorizer, whose updated bytes are returned in @p reply.  Returns -1 if
 * there is no auth handler or no authorizer, 0 otherwise.
 */
int MonClient::handle_auth_reply_more(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  const ceph::buffer::list& bl,
  ceph::buffer::list *reply)
{
  std::lock_guard l(monc_lock);

  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
        if (i.second->target_con == con) {
          return i.second->target_session->handle_auth_reply_more(
            auth_meta, bl, reply);
        }
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
        return i.second.handle_auth_reply_more(auth_meta, bl, reply);
      }
    }
    return -ENOENT;
  }

  // authorizer challenges
  if (!auth || !auth_meta->authorizer) {
    lderr(cct) << __func__ << " no authorizer?" << dendl;
    return -1;
  }
  auth_meta->authorizer->add_challenge(cct, bl);
  *reply = auth_meta->authorizer->bl;
  return 0;
}
1508
/**
 * AuthClient hook: authentication on @p con has finished.
 *
 * Monitor peers (under monc_lock):
 *  - an anonymous connection owned by a tell command is handed to that
 *    command's session, and its result is returned as-is;
 *  - a hunting connection from pending_cons is completed.  On failure that
 *    connection is dropped, and if other candidates remain the error is
 *    returned while hunting continues.  On success the connection becomes
 *    active_con and all other candidates are discarded.  Either way, once
 *    hunting is decided _finish_hunting(r) is called, plus _finish_auth(r)
 *    on failure or when we already have a monmap (epoch > 0).
 *    NOTE(review): with a zero monmap epoch a successful auth does not
 *    call _finish_auth() here -- presumably done later; confirm.
 *  - returns -ENOENT if no session matches @p con.
 *
 * Other peers: verify the server's authorizer reply (-EACCES on failure)
 * and copy the authorizer's session key into @p auth_meta.  The
 * @p session_key / @p connection_secret arguments are not used on this
 * path.
 */
int MonClient::handle_auth_done(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  uint64_t global_id,
  uint32_t con_mode,
  const ceph::buffer::list& bl,
  CryptoKey *session_key,
  std::string *connection_secret)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    std::lock_guard l(monc_lock);
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
        if (i.second->target_con == con) {
          return i.second->target_session->handle_auth_done(
            auth_meta, global_id, bl,
            session_key, connection_secret);
        }
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
        int r = i.second.handle_auth_done(
          auth_meta, global_id, bl,
          session_key, connection_secret);
        // note: `i` refers to an erased/moved-from element after the branch
        // below; only `r` is used from here on.
        if (r) {
          pending_cons.erase(i.first);
          if (!pending_cons.empty()) {
            return r;
          }
        } else {
          active_con.reset(new MonConnection(std::move(i.second)));
          pending_cons.clear();
          ceph_assert(active_con->have_session());
        }

        _finish_hunting(r);
        if (r || monmap.get_epoch() > 0) {
          _finish_auth(r);
        }
        return r;
      }
    }
    return -ENOENT;
  } else {
    // verify authorizer reply
    auto p = bl.begin();
    if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
      ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
                    << dendl;
      return -EACCES;
    }
    auth_meta->session_key = auth_meta->authorizer->session_key;
    return 0;
  }
}
1565
/**
 * AuthClient hook: the server rejected @p old_auth_method on @p con.
 *
 * Records @p allowed_methods in @p auth_meta.  Then:
 *  - tell-command connection: the command's session picks another method.
 *    If it cannot, the command is finished with its error ("auth failed").
 *  - hunting connection: if the session picks another method, 0 is
 *    returned so the same connection retries.  Otherwise the connection is
 *    dropped, and the error is returned if other candidates remain.  If it
 *    was the last candidate, hunting and auth are failed with that error.
 *  - no matching session: -ENOENT.
 * Non-monitor peers are only logged, and -EACCES is returned.
 */
int MonClient::handle_auth_bad_method(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  uint32_t old_auth_method,
  int result,
  const std::vector<uint32_t>& allowed_methods,
  const std::vector<uint32_t>& allowed_modes)
{
  auth_meta->allowed_methods = allowed_methods;

  std::lock_guard l(monc_lock);
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
        if (i.second->target_con == con) {
          int r = i.second->target_session->handle_auth_bad_method(
            old_auth_method,
            result,
            allowed_methods,
            allowed_modes);
          if (r < 0) {
            // frees the command (and erases it from mon_commands); we
            // return immediately so the invalidated iterator is not used
            _finish_command(i.second, r, "auth failed");
          }
          return r;
        }
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
        int r = i.second.handle_auth_bad_method(old_auth_method,
                                                result,
                                                allowed_methods,
                                                allowed_modes);
        if (r == 0) {
          return r; // try another method on this con
        }
        pending_cons.erase(i.first);
        if (!pending_cons.empty()) {
          return r; // fail this con, maybe another con will succeed
        }
        // fail hunt
        _finish_hunting(r);
        _finish_auth(r);
        return r;
      }
    }
    return -ENOENT;
  } else {
    // huh...
    ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
                  << " result " << cpp_strerror(result)
                  << " and auth is " << (auth ? auth->get_protocol() : 0)
                  << dendl;
    return -EACCES;
  }
}
1622
1623 int MonClient::handle_auth_request(
1624 Connection *con,
1625 AuthConnectionMeta *auth_meta,
1626 bool more,
1627 uint32_t auth_method,
1628 const ceph::buffer::list& payload,
1629 ceph::buffer::list *reply)
1630 {
1631 if (payload.length() == 0) {
1632 // for some channels prior to nautilus (osd heartbeat), we
1633 // tolerate the lack of an authorizer.
1634 if (!con->get_messenger()->require_authorizer) {
1635 handle_authentication_dispatcher->ms_handle_authentication(con);
1636 return 1;
1637 }
1638 return -EACCES;
1639 }
1640 auth_meta->auth_mode = payload[0];
1641 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1642 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1643 return -EACCES;
1644 }
1645 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1646 auth_method);
1647 if (!ah) {
1648 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1649 << auth_method << dendl;
1650 return -EOPNOTSUPP;
1651 }
1652
1653 auto ac = &auth_meta->authorizer_challenge;
1654 if (!HAVE_FEATURE(con->get_features(), CEPHX_V2)) {
1655 if (cct->_conf->cephx_service_require_version >= 2) {
1656 ldout(cct,10) << __func__ << " client missing CEPHX_V2 ("
1657 << "cephx_service_requre_version = "
1658 << cct->_conf->cephx_service_require_version << ")" << dendl;
1659 return -EACCES;
1660 }
1661 ac = nullptr;
1662 }
1663
1664 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1665 bool isvalid = ah->verify_authorizer(
1666 cct,
1667 *rotating_secrets,
1668 payload,
1669 auth_meta->get_connection_secret_length(),
1670 reply,
1671 &con->peer_name,
1672 &con->peer_global_id,
1673 &con->peer_caps_info,
1674 &auth_meta->session_key,
1675 &auth_meta->connection_secret,
1676 ac);
1677 if (isvalid) {
1678 handle_authentication_dispatcher->ms_handle_authentication(con);
1679 return 1;
1680 }
1681 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1682 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1683 return 0;
1684 }
1685 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
1686 // discard old challenge
1687 auth_meta->authorizer_challenge.reset();
1688 return -EACCES;
1689 }
1690
1691 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1692 std::lock_guard l(monc_lock);
1693 if (auth) {
1694 return auth->build_authorizer(service_id);
1695 } else {
1696 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1697 << ", but no auth is available now" << dendl;
1698 return nullptr;
1699 }
1700 }
1701
// From here on, log lines from MonConnection methods are prefixed according
// to that connection's own state (have_session()) rather than the
// MonClient's hunting state used above.
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1705
1706 MonConnection::MonConnection(
1707 CephContext *cct, ConnectionRef con, uint64_t global_id,
1708 AuthRegistry *ar)
1709 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
1710 {}
1711
1712 MonConnection::~MonConnection()
1713 {
1714 if (con) {
1715 con->mark_down();
1716 con.reset();
1717 }
1718 }
1719
1720 bool MonConnection::have_session() const
1721 {
1722 return state == State::HAVE_SESSION;
1723 }
1724
/**
 * Begin authenticating on this connection; records auth_start.
 *
 * msgr2 peers: only move to AUTHENTICATING and request the monmap.  The
 * auth exchange itself runs through the get_auth_request() /
 * handle_auth_* hooks.
 * msgr1 peers: move to NEGOTIATING, send a keepalive, then send an MAuth
 * (protocol CEPH_AUTH_UNKNOWN, monmap epoch @p epoch) whose payload carries
 * the auth methods we support, @p entity_name and our current global_id.
 */
void MonConnection::start(epoch_t epoch,
                          const EntityName& entity_name)
{
  using ceph::encode;
  auth_start = ceph_clock_now();

  if (con->get_peer_addr().is_msgr2()) {
    ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
    state = State::AUTHENTICATING;
    con->send_message(new MMonGetMap());
    return;
  }

  // restart authentication handshake
  state = State::NEGOTIATING;

  // send an initial keepalive to ensure our timestamp is valid by the
  // time we are in an OPENED state (by sequencing this before
  // authentication).
  con->send_keepalive();

  auto m = new MAuth;
  m->protocol = CEPH_AUTH_UNKNOWN;
  m->monmap_epoch = epoch;
  __u8 struct_v = 1;
  encode(struct_v, m->auth_payload);
  std::vector<uint32_t> auth_supported;
  auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
  encode(auth_supported, m->auth_payload);
  encode(entity_name, m->auth_payload);
  encode(global_id, m->auth_payload);
  con->send_message(m);
}
1758
1759 int MonConnection::get_auth_request(
1760 uint32_t *method,
1761 std::vector<uint32_t> *preferred_modes,
1762 ceph::buffer::list *bl,
1763 const EntityName& entity_name,
1764 uint32_t want_keys,
1765 RotatingKeyRing* keyring)
1766 {
1767 using ceph::encode;
1768 // choose method
(1) Event cond_true: |
Condition "this->auth_method < 0", taking true branch. |
1769 if (auth_method < 0) {
1770 std::vector<uint32_t> as;
1771 auth_registry->get_supported_methods(con->get_peer_type(), &as);
(2) Event cond_false: |
Condition "as.empty()", taking false branch. |
1772 if (as.empty()) {
1773 return -EACCES;
(3) Event if_end: |
End of if statement. |
1774 }
1775 auth_method = as.front();
1776 }
1777 *method = auth_method;
1778 auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1779 preferred_modes);
(4) Event cond_true: |
Condition "should_gather", taking true branch. |
(5) Event cond_true: |
Condition "this->have_session()", taking true branch. |
1780 ldout(cct,10) << __func__ << " method " << *method
1781 << " preferred_modes " << *preferred_modes << dendl;
(6) Event cond_false: |
Condition "preferred_modes->empty()", taking false branch. |
1782 if (preferred_modes->empty()) {
1783 return -EACCES;
(7) Event if_end: |
End of if statement. |
1784 }
1785
(8) Event cond_true: |
Condition "this->auth.operator bool()", taking true branch. |
1786 if (auth) {
1787 auth.reset();
1788 }
1789 int r = _init_auth(*method, entity_name, want_keys, keyring, true);
(9) Event cond_true: |
Condition "r == 0", taking true branch. |
1790 ceph_assert(r == 0);
1791
1792 // initial requset includes some boilerplate...
(10) Event overrun-buffer-val: |
Overrunning buffer pointed to by "char const('\n')" of 1 bytes by passing it to a function which accesses it at byte offset 7. [details] |
1793 encode((char)AUTH_MODE_MON, *bl);
1794 encode(entity_name, *bl);
1795 encode(global_id, *bl);
1796
1797 // and (maybe) some method-specific initial payload
1798 auth->build_initial_request(bl);
1799
1800 return 0;
1801 }
1802
/**
 * Process an intermediate auth reply @p bl from the monitor (msgr2).
 *
 * The reply is fed to the auth handler, which fills
 * auth_meta->session_key / connection_secret.  If the handler needs another
 * round (-EAGAIN), the next request is built into @p reply and 0 is
 * returned.  Negative errors are logged and returned.  Completing
 * authentication at this stage is not implemented and aborts.
 */
int MonConnection::handle_auth_reply_more(
  AuthConnectionMeta *auth_meta,
  const ceph::buffer::list& bl,
  ceph::buffer::list *reply)
{
  ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
  ldout(cct, 30) << __func__ << " got\n";
  bl.hexdump(*_dout);
  *_dout << dendl;

  auto p = bl.cbegin();
  ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
  int r = auth->handle_response(0, p, &auth_meta->session_key,
                                &auth_meta->connection_secret);
  if (r == -EAGAIN) {
    auth->prepare_build_request();
    auth->build_request(*reply);
    ldout(cct, 10) << __func__ << " responding with " << reply->length()
                   << " bytes" << dendl;
    r = 0;
  } else if (r < 0) {
    lderr(cct) << __func__ << " handle_response returned " << r << dendl;
  } else {
    ldout(cct, 10) << __func__ << " authenticated!" << dendl;
    // FIXME
    ceph_abort(cct, "write me");
  }
  return r;
}
1832
1833 int MonConnection::handle_auth_done(
1834 AuthConnectionMeta *auth_meta,
1835 uint64_t new_global_id,
1836 const ceph::buffer::list& bl,
1837 CryptoKey *session_key,
1838 std::string *connection_secret)
1839 {
1840 ldout(cct,10) << __func__ << " global_id " << new_global_id
1841 << " payload " << bl.length()
1842 << dendl;
1843 global_id = new_global_id;
1844 auth->set_global_id(global_id);
1845 auto p = bl.begin();
1846 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1847 &auth_meta->connection_secret);
1848 if (auth_err >= 0) {
1849 state = State::HAVE_SESSION;
1850 }
1851 con->set_last_keepalive_ack(auth_start);
1852
1853 if (pending_tell_command) {
1854 con->send_message2(std::move(pending_tell_command));
1855 }
1856 return auth_err;
1857 }
1858
1859 int MonConnection::handle_auth_bad_method(
1860 uint32_t old_auth_method,
1861 int result,
1862 const std::vector<uint32_t>& allowed_methods,
1863 const std::vector<uint32_t>& allowed_modes)
1864 {
1865 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1866 << " result " << cpp_strerror(result)
1867 << " allowed_methods " << allowed_methods << dendl;
1868 std::vector<uint32_t> auth_supported;
1869 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1870 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1871 old_auth_method);
1872 assert(p != auth_supported.end());
1873 p = std::find_first_of(std::next(p), auth_supported.end(),
1874 allowed_methods.begin(), allowed_methods.end());
1875 if (p == auth_supported.end()) {
1876 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1877 << " but i only support " << auth_supported << dendl;
1878 return -EACCES;
1879 }
1880 auth_method = *p;
1881 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1882 return 0;
1883 }
1884
1885 int MonConnection::handle_auth(MAuthReply* m,
1886 const EntityName& entity_name,
1887 uint32_t want_keys,
1888 RotatingKeyRing* keyring)
1889 {
1890 if (state == State::NEGOTIATING) {
1891 int r = _negotiate(m, entity_name, want_keys, keyring);
1892 if (r) {
1893 return r;
1894 }
1895 state = State::AUTHENTICATING;
1896 }
1897 int r = authenticate(m);
1898 if (!r) {
1899 state = State::HAVE_SESSION;
1900 }
1901 return r;
1902 }
1903
1904 int MonConnection::_negotiate(MAuthReply *m,
1905 const EntityName& entity_name,
1906 uint32_t want_keys,
1907 RotatingKeyRing* keyring)
1908 {
1909 if (auth && (int)m->protocol == auth->get_protocol()) {
1910 // good, negotiation completed
1911 auth->reset();
1912 return 0;
1913 }
1914
1915 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1916 if (r == -ENOTSUP) {
1917 if (m->result == -ENOTSUP) {
1918 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1919 << dendl;
1920 }
1921 return m->result;
1922 }
1923 return r;
1924 }
1925
/**
 * (Re)create the client auth handler for @p method, backed by @p keyring.
 *
 * Returns -ENOTSUP if no handler exists for @p method.  Otherwise the
 * handler is configured with @p want_keys, @p entity_name and our
 * global_id, and 0 is returned.  On msgr1 (@p msgr2 == false), the MGR key
 * is removed from @p want_keys when the peer lacks the SERVER_KRAKEN
 * feature.
 */
int MonConnection::_init_auth(
  uint32_t method,
  const EntityName& entity_name,
  uint32_t want_keys,
  RotatingKeyRing* keyring,
  bool msgr2)
{
  ldout(cct,10) << __func__ << " method " << method << dendl;
  auth.reset(
    AuthClientHandler::create(cct, method, keyring));
  if (!auth) {
    ldout(cct, 10) << " no handler for protocol " << method << dendl;
    return -ENOTSUP;
  }

  // do not request MGR key unless the mon has the SERVER_KRAKEN
  // feature.  otherwise it will give us an auth error.  note that
  // we have to use the FEATUREMASK because pre-jewel the kraken
  // feature bit was used for something else.
  if (!msgr2 &&
      (want_keys & CEPH_ENTITY_TYPE_MGR) &&
      !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
    ldout(cct, 1) << __func__
                  << " not requesting MGR keys from pre-kraken monitor"
                  << dendl;
    want_keys &= ~CEPH_ENTITY_TYPE_MGR;
  }
  auth->set_want_keys(want_keys);
  auth->init(entity_name);
  auth->set_global_id(global_id);
  return 0;
}
1958
/**
 * Handle one msgr1 authentication round carried by @p m.
 *
 * Requires an auth handler.  A zero global_id from the peer is only logged.
 * A changed global_id means a new session: the handler is reset and the new
 * id is adopted.  The reply blob is then passed to the handler.  On -EAGAIN
 * the next MAuth request is built and sent.  On 0 any queued tell command
 * is sent.
 *
 * @return the handler's result (0 = authenticated, -EAGAIN = more rounds,
 *         other negative values = failure).
 */
int MonConnection::authenticate(MAuthReply *m)
{
  ceph_assert(auth);
  if (!m->global_id) {
    ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
  }
  if (m->global_id != global_id) {
    // it's a new session
    auth->reset();
    global_id = m->global_id;
    auth->set_global_id(global_id);
    ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
  }
  auto p = m->result_bl.cbegin();
  int ret = auth->handle_response(m->result, p, nullptr, nullptr);
  if (ret == -EAGAIN) {
    auto ma = new MAuth;
    ma->protocol = auth->get_protocol();
    auth->prepare_build_request();
    auth->build_request(ma->auth_payload);
    con->send_message(ma);
  }
  if (ret == 0 && pending_tell_command) {
    con->send_message2(std::move(pending_tell_command));
  }

  return ret;
}
1987
1988 void MonClient::register_config_callback(md_config_t::config_callback fn) {
1989 ceph_assert(!config_cb);
1990 config_cb = fn;
1991 }
1992
1993 md_config_t::config_callback MonClient::get_config_callback() {
1994 return config_cb;
1995 }
1996