1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include "acconfig.h"
18
19 #include <iostream>
20 #include <fstream>
21
22 #include "AsyncMessenger.h"
23
24 #include "common/config.h"
25 #include "common/Timer.h"
26 #include "common/errno.h"
27
28 #include "messages/MOSDOp.h"
29 #include "messages/MOSDOpReply.h"
30 #include "common/EventTrace.h"
31
32 #define dout_subsys ceph_subsys_ms
33 #undef dout_prefix
34 #define dout_prefix _prefix(_dout, this)
// Log-line prefix for AsyncMessenger-scoped dout output: "-- <my addrs> ".
static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
  return *_dout << "-- " << m->get_myaddrs() << " ";
}
38
39 static ostream& _prefix(std::ostream *_dout, Processor *p) {
40 return *_dout << " Processor -- ";
41 }
42
43
44 /*******************
45 * Processor
46 */
47
// Event callback fired when a listening socket becomes readable; it simply
// forwards to Processor::accept(), which drains all pending connections.
class Processor::C_processor_accept : public EventCallback {
  Processor *pro;  // owning Processor (not owned by this callback)

 public:
  explicit C_processor_accept(Processor *p): pro(p) {}
  void do_request(uint64_t id) override {
    pro->accept();
  }
};
57
// A Processor owns the listening socket(s) serviced by one Worker and
// installs listen_handler so accepts run on that worker's event loop.
Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
  : msgr(r), net(c), worker(w),
    listen_handler(new C_processor_accept(this)) {}
61
// Bind and listen on every address in bind_addrs.
// @param bind_addrs  addresses to bind; a zero port means "pick one from
//                    the configured [ms_bind_port_min, ms_bind_port_max]
//                    range".
// @param avoid_ports ports that must not be chosen when probing the range.
// @param bound_addrs out: bind_addrs with any kernel/range-chosen ports
//                    filled in.
// @return 0 on success, negative errno on failure (after cleaning up any
//         sockets bound for earlier addresses).
int Processor::bind(const entity_addrvec_t &bind_addrs,
		    const set<int>& avoid_ports,
		    entity_addrvec_t* bound_addrs)
{
  const auto& conf = msgr->cct->_conf;
  // bind to socket(s)
  ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl;

  SocketOptions opts;
  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;

  listen_sockets.resize(bind_addrs.v.size());
  *bound_addrs = bind_addrs;

  for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
    auto& listen_addr = bound_addrs->v[k];

    /* bind to port */
    int r = -1;

    // Retry the whole bind up to ms_bind_retry_count times, sleeping
    // ms_bind_retry_delay seconds between attempts.
    for (int i = 0; i < conf->ms_bind_retry_count; i++) {
      if (i > 0) {
	lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
			 << conf->ms_bind_retry_delay << " seconds " << dendl;
	sleep(conf->ms_bind_retry_delay);
      }

      if (listen_addr.get_port()) {
	// Explicit port requested.  listen() has to execute on the owning
	// worker's event center, so submit it there and collect r by ref.
	worker->center.submit_to(
	  worker->center.get_id(),
	  [this, k, &listen_addr, &opts, &r]() {
	    r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
	  }, false);
	if (r < 0) {
	  lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
			   << ": " << cpp_strerror(r) << dendl;
	  continue;
	}
      } else {
	// try a range of ports
	for (int port = msgr->cct->_conf->ms_bind_port_min;
	     port <= msgr->cct->_conf->ms_bind_port_max;
	     port++) {
	  if (avoid_ports.count(port))
	    continue;

	  listen_addr.set_port(port);
	  worker->center.submit_to(
	    worker->center.get_id(),
	    [this, k, &listen_addr, &opts, &r]() {
	      r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
	    }, false);
	  if (r == 0)
	    break;
	}
	if (r < 0) {
	  lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
			   << " on any port in range "
			   << msgr->cct->_conf->ms_bind_port_min
			   << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
			   << cpp_strerror(r) << dendl;
	  listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
	  continue;
	}
	ldout(msgr->cct, 10) << __func__ << " bound on random port "
			     << listen_addr << dendl;
      }
      if (r == 0) {
	break;
      }
    }

    // It seems that binding completely failed, return with that exit status
    if (r < 0) {
      lderr(msgr->cct) << __func__ << " was unable to bind after "
		       << conf->ms_bind_retry_count
		       << " attempts: " << cpp_strerror(r) << dendl;
      // Roll back sockets bound for the earlier addresses before bailing.
      for (unsigned j = 0; j < k; ++j) {
	// clean up previous bind
	listen_sockets[j].abort_accept();
      }
      return r;
    }
  }

  ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl;
  return 0;
}
151
152 void Processor::start()
153 {
154 ldout(msgr->cct, 1) << __func__ << dendl;
155
156 // start thread
157 worker->center.submit_to(worker->center.get_id(), [this]() {
158 for (auto& listen_socket : listen_sockets) {
159 if (listen_socket) {
160 if (listen_socket.fd() == -1) {
161 ldout(msgr->cct, 1) << __func__ << " Erro: processor restart after listen_socket.fd closed. " << this << dendl;
162 return;
163 }
164 worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE,
165 listen_handler); }
166 }
167 }, false);
168 }
169
170 void Processor::accept()
171 {
172 SocketOptions opts;
173 opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
174 opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
175 opts.priority = msgr->get_socket_priority();
176
177 for (auto& listen_socket : listen_sockets) {
178 ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
179 << dendl;
180 unsigned accept_error_num = 0;
181
182 while (true) {
183 entity_addr_t addr;
184 ConnectedSocket cli_socket;
185 Worker *w = worker;
186 if (!msgr->get_stack()->support_local_listen_table())
187 w = msgr->get_stack()->get_worker();
188 else
189 ++w->references;
190 int r = listen_socket.accept(&cli_socket, opts, &addr, w);
191 if (r == 0) {
192 ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
193 << cli_socket.fd() << dendl;
194
195 msgr->add_accept(
196 w, std::move(cli_socket),
197 msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
198 addr);
199 accept_error_num = 0;
200 continue;
201 } else {
202 if (r == -EINTR) {
203 continue;
204 } else if (r == -EAGAIN) {
205 break;
206 } else if (r == -EMFILE || r == -ENFILE) {
207 lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
208 << " errno " << r << " " << cpp_strerror(r) << dendl;
209 if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
210 lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
211 ceph_abort();
212 }
213 continue;
214 } else if (r == -ECONNABORTED) {
215 ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
216 << " errno " << r << " " << cpp_strerror(r) << dendl;
217 continue;
218 } else {
219 lderr(msgr->cct) << __func__ << " no incoming connection?"
220 << " errno " << r << " " << cpp_strerror(r) << dendl;
221 if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
222 lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
223 ceph_abort();
224 }
225 continue;
226 }
227 }
228 }
229 }
230 }
231
232 void Processor::stop()
233 {
234 ldout(msgr->cct,10) << __func__ << dendl;
235
236 worker->center.submit_to(worker->center.get_id(), [this]() {
237 for (auto& listen_socket : listen_sockets) {
238 if (listen_socket) {
239 worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
240 listen_socket.abort_accept();
241 }
242 }
243 }, false);
244 }
245
246
247 struct StackSingleton {
248 CephContext *cct;
249 std::shared_ptr<NetworkStack> stack;
250
251 explicit StackSingleton(CephContext *c): cct(c) {}
252 void ready(std::string &type) {
253 if (!stack)
254 stack = NetworkStack::create(cct, type);
255 }
256 ~StackSingleton() {
257 stack->stop();
258 }
259 };
260
261
// Event callback that triggers reaping of lazily-deleted connections
// (see AsyncMessenger::reap_dead and the deleted_conns set).
class C_handle_reap : public EventCallback {
  AsyncMessenger *msgr;  // back-pointer; the messenger outlives this handler

 public:
  explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
  void do_request(uint64_t id) override {
    // judge whether is a time event
    msgr->reap_dead();
  }
};
272
273 /*******************
274 * AsyncMessenger
275 */
276
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
			       const std::string &type, string mname, uint64_t _nonce)
  : SimplePolicyMessenger(cct, name),
    dispatch_queue(cct, this, mname),
    nonce(_nonce)
{
  // Derive the transport backend from the messenger type string; anything
  // that is not rdma/dpdk falls back to the posix stack.
  std::string transport_type = "posix";
  if (type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";

  // One NetworkStack per (context, transport) pair, shared between all
  // messengers via the CephContext singleton registry.
  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
    "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  local_worker = stack->get_worker();
  // Loopback connection for messages addressed to ourselves.
  local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
						     local_worker, true, true);
  init_local_connection();
  reap_handler = new C_handle_reap(this);
  // Stacks with per-worker (local) listen tables need one Processor per
  // worker; a shared kernel listen table needs only one.
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}
305
/**
 * Destroy the AsyncMessenger. Pretty simple since all the work is done
 * elsewhere.
 */
AsyncMessenger::~AsyncMessenger()
{
  delete reap_handler;
  ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
  // Drop the loopback connection and the per-worker Processors; the
  // NetworkStack itself is owned by the StackSingleton, not by us.
  local_connection->mark_down();
  for (auto &&p : processors)
    delete p;
}
318
// Called once the dispatchers are registered: finish any bind that was
// deferred in bindv() (stack not ready then), start accepting on every
// Processor, and spin up the dispatch queue.
void AsyncMessenger::ready()
{
  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;

  stack->ready();
  if (pending_bind) {
    // The postponed bind must succeed now; there is no later retry point.
    int err = bindv(pending_bind_addrs);
    if (err) {
      lderr(cct) << __func__ << " postponed bind failed" << dendl;
      ceph_abort();
    }
  }

  std::lock_guard l{lock};
  for (auto &&p : processors)
    p->start();
  dispatch_queue.start();
}
337
// Stop accepting, tear down all connections, and wake anyone blocked in
// wait().  Always returns 0.
int AsyncMessenger::shutdown()
{
  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;

  // done! clean up.
  for (auto &&p : processors)
    p->stop();
  mark_down_all();
  // break ref cycles on the loopback connection
  local_connection->set_priv(NULL);
  did_bind = false;
  // Flag ourselves stopped under the lock so wait() observes it.
  lock.lock();
  stop_cond.notify_all();
  stopped = true;
  lock.unlock();
  // Flush events still queued on the stack's workers.
  stack->drain();
  return 0;
}
356
357 int AsyncMessenger::bind(const entity_addr_t &bind_addr)
358 {
359 ldout(cct,10) << __func__ << " " << bind_addr << dendl;
360 // old bind() can take entity_addr_t(). new bindv() can take a
361 // 0.0.0.0-like address but needs type and family to be set.
362 auto a = bind_addr;
363 if (a == entity_addr_t()) {
364 a.set_type(entity_addr_t::TYPE_LEGACY);
365 if (cct->_conf->ms_bind_ipv6) {
366 a.set_family(AF_INET6);
367 } else {
368 a.set_family(AF_INET);
369 }
370 }
371 return bindv(entity_addrvec_t(a));
372 }
373
// Bind every Processor to bind_addrs.  If the network stack is not ready
// yet (e.g. dpdk still initializing) the bind is postponed and completed
// by ready().  Returns 0 on success/postpone, -1 if already started,
// or a negative errno from Processor::bind.
int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
{
  lock.lock();

  // Binding after start() is only allowed when completing a pending bind.
  if (!pending_bind && started) {
    ldout(cct,10) << __func__ << " already started" << dendl;
    lock.unlock();
    return -1;
  }

  ldout(cct,10) << __func__ << " " << bind_addrs << dendl;

  if (!stack->is_ready()) {
    // Remember the addresses; ready() performs the real bind later.
    ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
    pending_bind_addrs = bind_addrs;
    pending_bind = true;
    lock.unlock();
    return 0;
  }

  lock.unlock();

  // bind to a socket
  set<int> avoid_ports;
  entity_addrvec_t bound_addrs;
  unsigned i = 0;
  for (auto &&p : processors) {
    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
    if (r) {
      // Note: this is related to local tcp listen table problem.
      // Posix(default kernel implementation) backend shares listen table
      // in the kernel, so all threads can use the same listen table naturally
      // and only one thread need to bind. But other backends(like dpdk) uses local
      // listen table, we need to bind/listen tcp port for each worker. So if the
      // first worker failed to bind, it could be think the normal error then handle
      // it, like port is used case. But if the first worker successfully to bind
      // but the second worker failed, it's not expected and we need to assert
      // here
      ceph_assert(i == 0);
      return r;
    }
    ++i;
  }
  _finish_bind(bind_addrs, bound_addrs);
  return 0;
}
420
421 int AsyncMessenger::rebind(const set<int>& avoid_ports)
422 {
423 ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
424 ceph_assert(did_bind);
425
426 for (auto &&p : processors)
427 p->stop();
428 mark_down_all();
429
430 // adjust the nonce; we want our entity_addr_t to be truly unique.
431 nonce += 1000000;
432 ldout(cct, 10) << __func__ << " new nonce " << nonce
433 << " and addr " << get_myaddrs() << dendl;
434
435 entity_addrvec_t bound_addrs;
436 entity_addrvec_t bind_addrs = get_myaddrs();
437 set<int> new_avoid(avoid_ports);
438 for (auto& a : bind_addrs.v) {
439 new_avoid.insert(a.get_port());
440 a.set_port(0);
441 }
442 ldout(cct, 10) << __func__ << " will try " << bind_addrs
443 << " and avoid ports " << new_avoid << dendl;
444 unsigned i = 0;
445 for (auto &&p : processors) {
446 int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
447 if (r) {
448 ceph_assert(i == 0);
449 return r;
450 }
451 ++i;
452 }
453 _finish_bind(bind_addrs, bound_addrs);
454 for (auto &&p : processors) {
455 p->start();
456 }
457 return 0;
458 }
459
460 int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
461 {
462 if (!cct->_conf->ms_bind_before_connect)
463 return 0;
464 std::lock_guard l{lock};
465 if (did_bind) {
466 return 0;
467 }
468 if (started) {
469 ldout(cct, 10) << __func__ << " already started" << dendl;
470 return -1;
471 }
472 ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
473
474 set_myaddrs(entity_addrvec_t(bind_addr));
475 return 0;
476 }
477
// Publish the addresses we ended up bound to: prefer the explicit bind
// addresses, fall back to the actual listen addresses when the port was
// auto-selected, stamp everything with our nonce, and refresh the
// loopback connection.
void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs,
				  const entity_addrvec_t& listen_addrs)
{
  set_myaddrs(bind_addrs);
  // Non-wildcard addresses are "learned" right away; blank ones will be
  // learned from peers later (see learned_addr()).
  for (auto& a : bind_addrs.v) {
    if (!a.is_blank_ip()) {
      learned_addr(a);
    }
  }

  // Port 0 means the stack chose one for us; publish the real ports.
  if (get_myaddrs().front().get_port() == 0) {
    set_myaddrs(listen_addrs);
  }
  entity_addrvec_t newaddrs = *my_addrs;
  for (auto& a : newaddrs.v) {
    a.set_nonce(nonce);
  }
  set_myaddrs(newaddrs);

  init_local_connection();

  ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl;
  did_bind = true;
}
502
503 int AsyncMessenger::start()
504 {
505 std::scoped_lock l{lock};
506 ldout(cct,1) << __func__ << " start" << dendl;
507
508 // register at least one entity, first!
509 ceph_assert(my_name.type() >= 0);
510
511 ceph_assert(!started);
512 started = true;
513 stopped = false;
514
515 if (!did_bind) {
516 entity_addrvec_t newaddrs = *my_addrs;
517 for (auto& a : newaddrs.v) {
518 a.nonce = nonce;
519 }
520 set_myaddrs(newaddrs);
521 _init_local_connection();
522 }
523
524 return 0;
525 }
526
// Block until shutdown() signals us, then drain the dispatch queue, close
// every connection, and flush the stack.  Returns immediately if the
// messenger was never started.
void AsyncMessenger::wait()
{
  {
    std::unique_lock locker{lock};
    if (!started) {
      return;
    }
    // shutdown() sets stopped and notifies stop_cond under this lock.
    if (!stopped)
      stop_cond.wait(locker);
  }
  dispatch_queue.shutdown();
  if (dispatch_queue.is_started()) {
    ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
    dispatch_queue.wait();
    dispatch_queue.discard_local();
    ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
  }

  // close all connections
  shutdown_connections(false);
  stack->drain();

  ldout(cct, 10) << __func__ << ": done." << dendl;
  ldout(cct, 1) << __func__ << " complete." << dendl;
  started = false;
}
553
554 void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
555 const entity_addr_t &listen_addr,
556 const entity_addr_t &peer_addr)
557 {
558 std::lock_guard l{lock};
559 auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
560 listen_addr.is_msgr2(), false);
561 conn->accept(std::move(cli_socket), listen_addr, peer_addr);
562 accepting_conns.insert(conn);
563 }
564
// Create and register an outgoing connection to addrs (caller must hold
// `lock`).  Anonymous connections are tracked in anon_conns; others are
// registered in the conns map keyed by the full address vector.
AsyncConnectionRef AsyncMessenger::create_connect(
  const entity_addrvec_t& addrs, int type, bool anon)
{
  ceph_assert(ceph_mutex_is_locked(lock));

  ldout(cct, 10) << __func__ << " " << addrs
		 << ", creating connection and registering" << dendl;

  // here is where we decide which of the addrs to connect to. always prefer
  // the first one, if we support it.
  entity_addr_t target;
  for (auto& a : addrs.v) {
    if (!a.is_msgr2() && !a.is_legacy()) {
      continue;
    }
    // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
    // trying it? for now, just pick whichever is listed first.
    target = a;
    break;
  }

  // create connection
  Worker *w = stack->get_worker();
  auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
					      target.is_msgr2(), false);
  conn->anon = anon;
  conn->connect(addrs, type, target);
  if (anon) {
    anon_conns.insert(conn);
  } else {
    ceph_assert(!conns.count(addrs));
    ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
		   << *conn->peer_addrs << dendl;
    conns[addrs] = conn;
  }
  w->get_perf_counter()->inc(l_msgr_active_connections);

  return conn;
}
604
605
// Return the always-available loopback connection used for messages this
// messenger sends to itself.
ConnectionRef AsyncMessenger::get_loopback_connection()
{
  return local_connection;
}
610
611 bool AsyncMessenger::should_use_msgr2()
612 {
613 // if we are bound to v1 only, and we are connecting to a v2 peer,
614 // we cannot use the peer's v2 address. otherwise the connection
615 // is assymetrical, because they would have to use v1 to connect
616 // to us, and we would use v2, and connection race detection etc
617 // would totally break down (among other things). or, the other
618 // end will be confused that we advertise ourselve with a v1
619 // address only (that we bound to) but connected with protocol v2.
620 return !did_bind || get_myaddrs().has_msgr2();
621 }
622
623 entity_addrvec_t AsyncMessenger::_filter_addrs(const entity_addrvec_t& addrs)
624 {
625 if (!should_use_msgr2()) {
626 ldout(cct, 10) << __func__ << " " << addrs << " limiting to v1 ()" << dendl;
627 entity_addrvec_t r;
628 for (auto& i : addrs.v) {
629 if (i.is_msgr2()) {
630 continue;
631 }
632 r.v.push_back(i);
633 }
634 return r;
635 } else {
636 return addrs;
637 }
638 }
639
// Queue message m for delivery to a peer of the given type at addrs.
// Consumes the caller's reference to m in every path.  Returns 0, or
// -EINVAL when addrs is empty.
int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
{
  std::lock_guard l{lock};

  FUNCTRACE(cct);
  ceph_assert(m);

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");
#endif

  ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " "
      << addrs << " -- " << *m << " -- ?+"
      << m->get_data().length() << " " << m << dendl;

  // Sending to nobody is an error; drop our ref on the message.
  if (addrs.empty()) {
    ldout(cct,0) << __func__ << " message " << *m
        << " with empty dest " << addrs << dendl;
    m->put();
    return -EINVAL;
  }

  // Restrict to v1 addresses when we cannot speak msgr2, then hand off to
  // an existing connection (or let submit_message create one).
  auto av = _filter_addrs(addrs);
  const AsyncConnectionRef& conn = _lookup_conn(av);
  submit_message(m, conn, av, type);
  return 0;
}
670
671 ConnectionRef AsyncMessenger::connect_to(int type,
672 const entity_addrvec_t& addrs,
673 bool anon)
674 {
675 std::lock_guard l{lock};
676 if (*my_addrs == addrs ||
677 (addrs.v.size() == 1 &&
678 my_addrs->contains(addrs.front()))) {
679 // local
680 return local_connection;
681 }
682
683 auto av = _filter_addrs(addrs);
684
685 if (anon) {
686 return create_connect(av, type, anon);
687 }
688
689 AsyncConnectionRef conn = _lookup_conn(av);
690 if (conn) {
691 ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl;
692 } else {
693 conn = create_connect(av, type, false);
694 ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl;
695 }
696
697 return conn;
698 }
699
// Deliver m over con when given; otherwise route it: loopback for our own
// address, a fresh connection for non-server peers, or drop it (lossy
// server policy).  Consumes the caller's reference to m in every path.
void AsyncMessenger::submit_message(Message *m, const AsyncConnectionRef& con,
				    const entity_addrvec_t& dest_addrs,
				    int dest_type)
{
  if (cct->_conf->ms_dump_on_send) {
    // Debug aid: force-encode and hexdump the payload before sending.
    m->encode(-1, MSG_CRC_ALL);
    ldout(cct, 0) << __func__ << " submit_message " << *m << "\n";
    m->get_payload().hexdump(*_dout);
    if (m->get_data().length() > 0) {
      *_dout << " data:\n";
      m->get_data().hexdump(*_dout);
    }
    *_dout << dendl;
    m->clear_payload();
  }

  // existing connection?
  if (con) {
    con->send_message(m);
    return ;
  }

  // local?
  if (*my_addrs == dest_addrs ||
      (dest_addrs.v.size() == 1 &&
       my_addrs->contains(dest_addrs.front()))) {
    // local
    local_connection->send_message(m);
    return ;
  }

  // remote, no existing connection.
  const Policy& policy = get_policy(dest_type);
  if (policy.server) {
    // Lossy server policy: we never initiate; without a session, drop.
    ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addrs
	<< ", lossy server for target type "
	<< ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
    m->put();
  } else {
    ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
	<< ", new connection." << dendl;
    auto&& new_con = create_connect(dest_addrs, dest_type, false);
    new_con->send_message(m);
  }
}
745
/**
 * If my_addr doesn't have an IP set, this function
 * will fill it in from the passed addr. Otherwise it does nothing and returns.
 */
bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
{
  ldout(cct,1) << __func__ << " " << addrs << dendl;
  bool ret = false;
  std::lock_guard l{lock};

  entity_addrvec_t newaddrs = *my_addrs;
  for (auto& a : newaddrs.v) {
    if (a.is_blank_ip()) {
      // Preserve our own type/port/nonce; only the IP is taken from the
      // first provided address of the matching family.
      int type = a.get_type();
      int port = a.get_port();
      uint32_t nonce = a.get_nonce();
      for (auto& b : addrs.v) {
	if (a.get_family() == b.get_family()) {
	  ldout(cct,1) << __func__ << " assuming my addr " << a
		       << " matches provided addr " << b << dendl;
	  a = b;
	  a.set_nonce(nonce);
	  a.set_type(type);
	  a.set_port(port);
	  ret = true;
	  break;
	}
      }
    }
  }
  set_myaddrs(newaddrs);
  if (ret) {
    // Our address changed; refresh the loopback connection's identity.
    _init_local_connection();
  }
  ldout(cct,1) << __func__ << " now " << *my_addrs << dendl;
  return ret;
}
783
784 void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
785 {
786 std::lock_guard l{lock};
787 auto t = addrs;
788 for (auto& a : t.v) {
789 a.set_nonce(nonce);
790 }
791 set_myaddrs(t);
792 _init_local_connection();
793 }
794
// Stop every tracked connection (accepting, registered, anonymous) and
// flush the lazily-deleted set.  queue_reset controls whether peers get a
// reset event queued.
void AsyncMessenger::shutdown_connections(bool queue_reset)
{
  ldout(cct,1) << __func__ << " " << dendl;
  std::lock_guard l{lock};
  for (const auto& c : accepting_conns) {
    ldout(cct, 5) << __func__ << " accepting_conn " << c << dendl;
    c->stop(queue_reset);
  }
  accepting_conns.clear();

  for (const auto& [e, c] : conns) {
    ldout(cct, 5) << __func__ << " mark down " << e << " " << c << dendl;
    c->get_perf_counter()->dec(l_msgr_active_connections);
    c->stop(queue_reset);
  }
  conns.clear();

  for (const auto& c : anon_conns) {
    ldout(cct, 5) << __func__ << " mark down " << c << dendl;
    c->get_perf_counter()->dec(l_msgr_active_connections);
    c->stop(queue_reset);
  }
  anon_conns.clear();

  {
    // Connections queued for lazy deletion just need their refs dropped;
    // only log them when the subsystem gathers at level 5.
    std::lock_guard l{deleted_lock};
    if (cct->_conf->subsys.should_gather<ceph_subsys_ms, 5>()) {
      for (const auto& c : deleted_conns) {
	ldout(cct, 5) << __func__ << " delete " << c << dendl;
      }
    }
    deleted_conns.clear();
  }
}
829
830 void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
831 {
832 std::lock_guard l{lock};
833 const AsyncConnectionRef& conn = _lookup_conn(addrs);
834 if (conn) {
835 ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl;
836 conn->stop(true);
837 } else {
838 ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
839 }
840 }
841
842 int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
843 {
844 int my_type = my_name.type();
845
846 // set reply protocol version
847 if (peer_type == my_type) {
848 // internal
849 return cluster_protocol;
850 } else {
851 // public
852 switch (connect ? peer_type : my_type) {
853 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
854 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
855 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
856 }
857 }
858 return 0;
859 }
860
// Promote a connection from "accepting" to fully registered once its
// handshake completes.  Returns 0 on success, -1 when a different live
// connection already owns these peer addrs.
int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn)
{
  std::lock_guard l{lock};
  // Lossy server-side sessions are anonymous: never looked up by address,
  // so they go straight into anon_conns.
  if (conn->policy.server &&
      conn->policy.lossy) {
    anon_conns.insert(conn);
    conn->get_perf_counter()->inc(l_msgr_active_connections);
    return 0;
  }
  auto it = conns.find(*conn->peer_addrs);
  if (it != conns.end()) {
    auto& existing = it->second;

    // lazy delete, see "deleted_conns"
    // If conn already in, we will return 0
    std::lock_guard l{deleted_lock};
    if (deleted_conns.erase(existing)) {
      // The registered connection was already queued for deletion; drop
      // the stale mapping and take its place.
      conns.erase(it);
    } else if (conn != existing) {
      return -1;
    }
  }
  ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
  conns[*conn->peer_addrs] = conn;
  conn->get_perf_counter()->inc(l_msgr_active_connections);
  // Handshake done: no longer merely "accepting".
  accepting_conns.erase(conn);
  return 0;
}
889
890
// Learn our externally visible address from how a peer sees us
// (peer_addr_for_me).  Fills in blank IPs of the matching family, keeping
// our own type/port/nonce when bound.  Returns true the first time an
// address is learned, false thereafter.
bool AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
  // be careful here: multiple threads may block here, and readers of
  // my_addr do NOT hold any lock.

  // this always goes from true -> false under the protection of the
  // mutex. if it is already false, we need not retake the mutex at
  // all.
  if (!need_addr)
    return false;
  std::lock_guard l(lock);
  // Re-check under the lock: another thread may have learned it first.
  if (need_addr) {
    if (my_addrs->empty()) {
      // No addresses at all yet: adopt the peer's view wholesale.
      auto a = peer_addr_for_me;
      a.set_type(entity_addr_t::TYPE_ANY);
      a.set_nonce(nonce);
      if (!did_bind) {
	a.set_port(0);
      }
      set_myaddrs(entity_addrvec_t(a));
      ldout(cct,10) << __func__ << " had no addrs" << dendl;
    } else {
      // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
      entity_addrvec_t newaddrs = *my_addrs;
      for (auto& a : newaddrs.v) {
	if (a.is_blank_ip() &&
	    a.get_family() == peer_addr_for_me.get_family()) {
	  entity_addr_t t = peer_addr_for_me;
	  if (!did_bind) {
	    t.set_type(entity_addr_t::TYPE_ANY);
	    t.set_port(0);
	  } else {
	    t.set_type(a.get_type());
	    t.set_port(a.get_port());
	  }
	  t.set_nonce(a.get_nonce());
	  ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl;
	  a = t;
	}
      }
      set_myaddrs(newaddrs);
    }
    ldout(cct, 1) << __func__ << " learned my addr " << *my_addrs
		  << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl;
    _init_local_connection();
    need_addr = false;
    return true;
  }
  return false;
}
941
942 int AsyncMessenger::reap_dead()
943 {
944 ldout(cct, 1) << __func__ << " start" << dendl;
945 int num = 0;
946
947 std::lock_guard l1{lock};
948
949 {
950 std::lock_guard l2{deleted_lock};
951 for (auto& c : deleted_conns) {
952 ldout(cct, 5) << __func__ << " delete " << c << dendl;
953 auto conns_it = conns.find(*c->peer_addrs);
954 if (conns_it != conns.end() && conns_it->second == c)
955 conns.erase(conns_it);
956 accepting_conns.erase(c);
957 anon_conns.erase(c);
958 ++num;
959 }
960 deleted_conns.clear();
961 }
962
963 return num;
964 }
965