1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7    	 *
8    	 * Author: Haomai Wang <haomaiwang@gmail.com>
9    	 *
10   	 * This is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License version 2.1, as published by the Free Software
13   	 * Foundation.  See file COPYING.
14   	 *
15   	 */
16   	
17   	#include "acconfig.h"
18   	
19   	#include <iostream>
20   	#include <fstream>
21   	
22   	#include "AsyncMessenger.h"
23   	
24   	#include "common/config.h"
25   	#include "common/Timer.h"
26   	#include "common/errno.h"
27   	
28   	#include "messages/MOSDOp.h"
29   	#include "messages/MOSDOpReply.h"
30   	#include "common/EventTrace.h"
31   	
32   	#define dout_subsys ceph_subsys_ms
33   	#undef dout_prefix
34   	#define dout_prefix _prefix(_dout, this)
// dout prefix helper: tags AsyncMessenger log lines with the messenger's
// currently bound address vector.
static ostream& _prefix(std::ostream *_dout, AsyncMessenger *m) {
  return *_dout << "-- " << m->get_myaddrs() << " ";
}
38   	
39   	static ostream& _prefix(std::ostream *_dout, Processor *p) {
40   	  return *_dout << " Processor -- ";
41   	}
42   	
43   	
44   	/*******************
45   	 * Processor
46   	 */
47   	
// EventCallback fired when one of the Processor's listen sockets becomes
// readable; it simply forwards to Processor::accept() to drain all
// pending incoming connections.
class Processor::C_processor_accept : public EventCallback {
  Processor *pro;  // owning Processor; outlives this callback

 public:
  explicit C_processor_accept(Processor *p): pro(p) {}
  void do_request(uint64_t id) override {
    pro->accept();
  }
};
57   	
// A Processor owns the listen sockets serviced by one Worker; the accept
// handler created here is registered with that worker's event center in
// start() and deleted by the owning messenger's teardown path.
Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
  : msgr(r), net(c), worker(w),
    listen_handler(new C_processor_accept(this)) {}
61   	
// Bind one listening socket per address in bind_addrs.
//
// For each address: if it carries an explicit port, bind to it; otherwise
// probe the [ms_bind_port_min, ms_bind_port_max] range, skipping ports in
// avoid_ports.  Each address gets up to ms_bind_retry_count attempts with
// ms_bind_retry_delay seconds between them.  On success *bound_addrs is
// bind_addrs with any wildcard port replaced by the port actually chosen;
// returns 0.  On failure, sockets bound for earlier addresses are aborted
// and the (negative errno-style) result is returned.
int Processor::bind(const entity_addrvec_t &bind_addrs,
		    const set<int>& avoid_ports,
		    entity_addrvec_t* bound_addrs)
{
  const auto& conf = msgr->cct->_conf;
  // bind to socket(s)
  ldout(msgr->cct, 10) << __func__ << " " << bind_addrs << dendl;

  SocketOptions opts;
  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;

  listen_sockets.resize(bind_addrs.v.size());
  *bound_addrs = bind_addrs;

  for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
    auto& listen_addr = bound_addrs->v[k];

    /* bind to port */
    int r = -1;

    for (int i = 0; i < conf->ms_bind_retry_count; i++) {
      if (i > 0) {
	lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
			 << conf->ms_bind_retry_delay << " seconds " << dendl;
	sleep(conf->ms_bind_retry_delay);
      }

      if (listen_addr.get_port()) {
	// explicit port requested.  listen() has to run on the worker's
	// event-center thread; the lambda captures r by reference and
	// submit_to(..., false) presumably completes before we read r --
	// TODO confirm submit_to's synchronous semantics.
	worker->center.submit_to(
	  worker->center.get_id(),
	  [this, k, &listen_addr, &opts, &r]() {
	    r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
	  }, false);
	if (r < 0) {
	  lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
			   << ": " << cpp_strerror(r) << dendl;
	  continue;  // next retry attempt
	}
      } else {
	// try a range of ports
	for (int port = msgr->cct->_conf->ms_bind_port_min;
	     port <= msgr->cct->_conf->ms_bind_port_max;
	     port++) {
	  if (avoid_ports.count(port))
	    continue;

	  listen_addr.set_port(port);
	  worker->center.submit_to(
	    worker->center.get_id(),
	    [this, k, &listen_addr, &opts, &r]() {
	      r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
	    }, false);
	  if (r == 0)
	    break;
	}
	if (r < 0) {
	  lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
			   << " on any port in range "
			   << msgr->cct->_conf->ms_bind_port_min
			   << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
			   << cpp_strerror(r) << dendl;
	  listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
	  continue;
	}
	ldout(msgr->cct, 10) << __func__ << " bound on random port "
			     << listen_addr << dendl;
      }
      if (r == 0) {
	break;  // this address is bound; move on to the next one
      }
    }

    // It seems that binding completely failed, return with that exit status
    if (r < 0) {
      lderr(msgr->cct) << __func__ << " was unable to bind after "
		       << conf->ms_bind_retry_count
		       << " attempts: " << cpp_strerror(r) << dendl;
      for (unsigned j = 0; j < k; ++j) {
	// clean up previous bind
	listen_sockets[j].abort_accept();
      }
      return r;
    }
  }

  ldout(msgr->cct, 10) << __func__ << " bound to " << *bound_addrs << dendl;
  return 0;
}
151  	
152  	void Processor::start()
153  	{
154  	  ldout(msgr->cct, 1) << __func__ << dendl;
155  	
156  	  // start thread
157  	  worker->center.submit_to(worker->center.get_id(), [this]() {
158  	      for (auto& listen_socket : listen_sockets) {
159  		if (listen_socket) {
160  	          if (listen_socket.fd() == -1) {
161  	            ldout(msgr->cct, 1) << __func__ << " Erro: processor restart after listen_socket.fd closed. " << this << dendl;
162  	            return;
163  	          }
164  		  worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE,
165  						   listen_handler); }
166  	      }
167  	    }, false);
168  	}
169  	
// Drain all pending incoming connections from every listen socket.
// Invoked on the event-center thread via C_processor_accept when a listen
// fd becomes readable.  Transient errors retry; after more than
// ms_max_accept_failures consecutive hard failures the process aborts.
void Processor::accept()
{
  SocketOptions opts;
  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
  opts.priority = msgr->get_socket_priority();

  for (auto& listen_socket : listen_sockets) {
    ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd()
			 << dendl;
    unsigned accept_error_num = 0;  // consecutive failure counter

    while (true) {
      entity_addr_t addr;
      ConnectedSocket cli_socket;
      // Choose the worker that will own the new connection: with a shared
      // (kernel) listen table any worker may take it; with a per-worker
      // local listen table the connection must stay on this worker, so we
      // just bump its reference count.
      Worker *w = worker;
      if (!msgr->get_stack()->support_local_listen_table())
	w = msgr->get_stack()->get_worker();
      else
	++w->references;
      int r = listen_socket.accept(&cli_socket, opts, &addr, w);
      if (r == 0) {
	ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
			     << cli_socket.fd() << dendl;

	msgr->add_accept(
	  w, std::move(cli_socket),
	  msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
	  addr);
	accept_error_num = 0;  // success resets the failure streak
	continue;
      } else {
	if (r == -EINTR) {
	  // interrupted by a signal: retry immediately
	  continue;
	} else if (r == -EAGAIN) {
	  // no more pending connections on this socket
	  break;
	} else if (r == -EMFILE || r == -ENFILE) {
	  // out of file descriptors (process or system limit); retry, but
	  // abort once the failure streak exceeds the configured cap
	  lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd()
			   << " errno " << r << " " << cpp_strerror(r) << dendl;
	  if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
	    lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
	    ceph_abort();
	  }
	  continue;
	} else if (r == -ECONNABORTED) {
	  // peer reset the connection before we accepted it; non-fatal
	  ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
			      << " errno " << r << " " << cpp_strerror(r) << dendl;
	  continue;
	} else {
	  // unexpected error: same streak-based abort policy as EMFILE
	  lderr(msgr->cct) << __func__ << " no incoming connection?"
			   << " errno " << r << " " << cpp_strerror(r) << dendl;
	  if (++accept_error_num > msgr->cct->_conf->ms_max_accept_failures) {
	    lderr(msgr->cct) << "Proccessor accept has encountered enough error numbers, just do ceph_abort()." << dendl;
	    ceph_abort();
	  }
	  continue;
	}
      }
    }
  }
}
231  	
232  	void Processor::stop()
233  	{
234  	  ldout(msgr->cct,10) << __func__ << dendl;
235  	
236  	  worker->center.submit_to(worker->center.get_id(), [this]() {
237  	      for (auto& listen_socket : listen_sockets) {
238  		if (listen_socket) {
239  		  worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
240  		  listen_socket.abort_accept();
241  		}
242  	      }
243  	    }, false);
244  	}
245  	
246  	
247  	struct StackSingleton {
248  	  CephContext *cct;
249  	  std::shared_ptr<NetworkStack> stack;
250  	
251  	  explicit StackSingleton(CephContext *c): cct(c) {}
252  	  void ready(std::string &type) {
253  	    if (!stack)
254  	      stack = NetworkStack::create(cct, type);
255  	  }
256  	  ~StackSingleton() {
257  	    stack->stop();
258  	  }
259  	};
260  	
261  	
// Event callback queued by the messenger to lazily reap connections that
// were moved onto the deleted list; simply forwards to reap_dead().
class C_handle_reap : public EventCallback {
  AsyncMessenger *msgr;  // owning messenger; outlives this callback

  public:
  explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
  void do_request(uint64_t id) override {
    // the event id is unused here; we only need to trigger the reap
    msgr->reap_dead();
  }
};
272  	
273  	/*******************
274  	 * AsyncMessenger
275  	 */
276  	
// Construct the messenger: select a transport backend from the type
// string, attach to (or create) the shared NetworkStack singleton, set up
// the loopback connection, and create the Processor(s) that will own the
// listen sockets.
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, string mname, uint64_t _nonce)
  : SimplePolicyMessenger(cct, name),
    dispatch_queue(cct, this, mname),
    nonce(_nonce)
{
  // anything that doesn't mention rdma/dpdk falls back to the posix stack
  std::string transport_type = "posix";
  if (type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";

  // one NetworkStack is shared per CephContext per transport type
  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
    "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  local_worker = stack->get_worker();
  // loopback connection for messages this messenger sends to itself
  local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
					 local_worker, true, true);
  init_local_connection();
  reap_handler = new C_handle_reap(this);
  // stacks with per-worker (local) listen tables need one Processor per
  // worker; a shared kernel listen table needs only one
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}
305  	
306  	/**
307  	 * Destroy the AsyncMessenger. Pretty simple since all the work is done
308  	 * elsewhere.
309  	 */
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
310  	AsyncMessenger::~AsyncMessenger()
311  	{
312  	  delete reap_handler;
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
313  	  ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
314  	  local_connection->mark_down();
315  	  for (auto &&p : processors)
316  	    delete p;
317  	}
318  	
// Called once the messenger infrastructure is up: perform any bind that
// was postponed because the network stack wasn't ready yet, then start
// every Processor and the dispatch queue.  A failed postponed bind is
// fatal (ceph_abort).
void AsyncMessenger::ready()
{
  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;

  stack->ready();
  if (pending_bind) {
    int err = bindv(pending_bind_addrs);
    if (err) {
      lderr(cct) << __func__ << " postponed bind failed" << dendl;
      ceph_abort();
    }
  }

  std::lock_guard l{lock};
  for (auto &&p : processors)
    p->start();
  dispatch_queue.start();
}
337  	
338  	int AsyncMessenger::shutdown()
339  	{
340  	  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;
341  	
342  	  // done!  clean up.
343  	  for (auto &&p : processors)
344  	    p->stop();
345  	  mark_down_all();
346  	  // break ref cycles on the loopback connection
347  	  local_connection->set_priv(NULL);
348  	  did_bind = false;
349  	  lock.lock();
350  	  stop_cond.notify_all();
351  	  stopped = true;
352  	  lock.unlock();
353  	  stack->drain();
354  	  return 0;
355  	}
356  	
357  	int AsyncMessenger::bind(const entity_addr_t &bind_addr)
358  	{
359  	  ldout(cct,10) << __func__ << " " << bind_addr << dendl;
360  	  // old bind() can take entity_addr_t(). new bindv() can take a
361  	  // 0.0.0.0-like address but needs type and family to be set.
362  	  auto a = bind_addr;
363  	  if (a == entity_addr_t()) {
364  	    a.set_type(entity_addr_t::TYPE_LEGACY);
365  	    if (cct->_conf->ms_bind_ipv6) {
366  	      a.set_family(AF_INET6);
367  	    } else {
368  	      a.set_family(AF_INET);
369  	    }
370  	  }
371  	  return bindv(entity_addrvec_t(a));
372  	}
373  	
374  	int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
375  	{
376  	  lock.lock();
377  	
378  	  if (!pending_bind && started) {
379  	    ldout(cct,10) << __func__ << " already started" << dendl;
380  	    lock.unlock();
381  	    return -1;
382  	  }
383  	
384  	  ldout(cct,10) << __func__ << " " << bind_addrs << dendl;
385  	
386  	  if (!stack->is_ready()) {
387  	    ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
388  	    pending_bind_addrs = bind_addrs;
389  	    pending_bind = true;
390  	    lock.unlock();
391  	    return 0;
392  	  }
393  	
394  	  lock.unlock();
395  	
396  	  // bind to a socket
397  	  set<int> avoid_ports;
398  	  entity_addrvec_t bound_addrs;
399  	  unsigned i = 0;
400  	  for (auto &&p : processors) {
401  	    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
402  	    if (r) {
403  	      // Note: this is related to local tcp listen table problem.
404  	      // Posix(default kernel implementation) backend shares listen table
405  	      // in the kernel, so all threads can use the same listen table naturally
406  	      // and only one thread need to bind. But other backends(like dpdk) uses local
407  	      // listen table, we need to bind/listen tcp port for each worker. So if the
408  	      // first worker failed to bind, it could be think the normal error then handle
409  	      // it, like port is used case. But if the first worker successfully to bind
410  	      // but the second worker failed, it's not expected and we need to assert
411  	      // here
412  	      ceph_assert(i == 0);
413  	      return r;
414  	    }
415  	    ++i;
416  	  }
417  	  _finish_bind(bind_addrs, bound_addrs);
418  	  return 0;
419  	}
420  	
421  	int AsyncMessenger::rebind(const set<int>& avoid_ports)
422  	{
423  	  ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
424  	  ceph_assert(did_bind);
425  	
426  	  for (auto &&p : processors)
427  	    p->stop();
428  	  mark_down_all();
429  	
430  	  // adjust the nonce; we want our entity_addr_t to be truly unique.
431  	  nonce += 1000000;
432  	  ldout(cct, 10) << __func__ << " new nonce " << nonce
433  			 << " and addr " << get_myaddrs() << dendl;
434  	
435  	  entity_addrvec_t bound_addrs;
436  	  entity_addrvec_t bind_addrs = get_myaddrs();
437  	  set<int> new_avoid(avoid_ports);
438  	  for (auto& a : bind_addrs.v) {
439  	    new_avoid.insert(a.get_port());
440  	    a.set_port(0);
441  	  }
442  	  ldout(cct, 10) << __func__ << " will try " << bind_addrs
443  			 << " and avoid ports " << new_avoid << dendl;
444  	  unsigned i = 0;
445  	  for (auto &&p : processors) {
446  	    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
447  	    if (r) {
448  	      ceph_assert(i == 0);
449  	      return r;
450  	    }
451  	    ++i;
452  	  }
453  	  _finish_bind(bind_addrs, bound_addrs);
454  	  for (auto &&p : processors) {
455  	    p->start();
456  	  }
457  	  return 0;
458  	}
459  	
460  	int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
461  	{
462  	  if (!cct->_conf->ms_bind_before_connect)
463  	    return 0;
464  	  std::lock_guard l{lock};
465  	  if (did_bind) {
466  	    return 0;
467  	  }
468  	  if (started) {
469  	    ldout(cct, 10) << __func__ << " already started" << dendl;
470  	    return -1;
471  	  }
472  	  ldout(cct, 10) << __func__ << " " << bind_addr << dendl;
473  	
474  	  set_myaddrs(entity_addrvec_t(bind_addr));
475  	  return 0;
476  	}
477  	
// Record the result of a successful bind: adopt bind_addrs as our addrs,
// learn any concrete (non-blank) addresses, fall back to the actual
// listen_addrs if we had asked for a wildcard port, stamp everything with
// our nonce, and refresh the loopback connection.
void AsyncMessenger::_finish_bind(const entity_addrvec_t& bind_addrs,
				  const entity_addrvec_t& listen_addrs)
{
  set_myaddrs(bind_addrs);
  for (auto& a : bind_addrs.v) {
    if (!a.is_blank_ip()) {
      learned_addr(a);
    }
  }

  // wildcard port requested: use the addrs (with real ports) we actually
  // bound to
  if (get_myaddrs().front().get_port() == 0) {
    set_myaddrs(listen_addrs);
  }
  entity_addrvec_t newaddrs = *my_addrs;
  for (auto& a : newaddrs.v) {
    a.set_nonce(nonce);
  }
  set_myaddrs(newaddrs);

  init_local_connection();

  ldout(cct,1) << __func__ << " bind my_addrs is " << get_myaddrs() << dendl;
  did_bind = true;
}
502  	
503  	int AsyncMessenger::start()
504  	{
505  	  std::scoped_lock l{lock};
506  	  ldout(cct,1) << __func__ << " start" << dendl;
507  	
508  	  // register at least one entity, first!
509  	  ceph_assert(my_name.type() >= 0);
510  	
511  	  ceph_assert(!started);
512  	  started = true;
513  	  stopped = false;
514  	
515  	  if (!did_bind) {
516  	    entity_addrvec_t newaddrs = *my_addrs;
517  	    for (auto& a : newaddrs.v) {
518  	      a.nonce = nonce;
519  	    }
520  	    set_myaddrs(newaddrs);
521  	    _init_local_connection();
522  	  }
523  	
524  	  return 0;
525  	}
526  	
527  	void AsyncMessenger::wait()
528  	{
529  	  {
530  	    std::unique_lock locker{lock};
531  	    if (!started) {
532  	      return;
533  	    }
534  	    if (!stopped)
535  	      stop_cond.wait(locker);
536  	  }
537  	  dispatch_queue.shutdown();
538  	  if (dispatch_queue.is_started()) {
539  	    ldout(cct, 10) << __func__ << ": waiting for dispatch queue" << dendl;
540  	    dispatch_queue.wait();
541  	    dispatch_queue.discard_local();
542  	    ldout(cct, 10) << __func__ << ": dispatch queue is stopped" << dendl;
543  	  }
544  	
545  	  // close all connections
546  	  shutdown_connections(false);
547  	  stack->drain();
548  	
549  	  ldout(cct, 10) << __func__ << ": done." << dendl;
550  	  ldout(cct, 1) << __func__ << " complete." << dendl;
551  	  started = false;
552  	}
553  	
// Wrap a freshly accepted socket in an AsyncConnection owned by worker w
// and track it in accepting_conns until the handshake registers it (see
// accept_conn()).  Called by Processor::accept().
void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
				const entity_addr_t &listen_addr,
				const entity_addr_t &peer_addr)
{
  std::lock_guard l{lock};
  // the listen address decides whether this connection speaks msgr2
  auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
						listen_addr.is_msgr2(), false);
  conn->accept(std::move(cli_socket), listen_addr, peer_addr);
  accepting_conns.insert(conn);
}
564  	
// Create an outgoing connection to addrs (caller must hold `lock`).
// Picks the first msgr2-or-legacy address as the dial target, assigns a
// worker, and registers the connection in either anon_conns or conns.
AsyncConnectionRef AsyncMessenger::create_connect(
  const entity_addrvec_t& addrs, int type, bool anon)
{
  ceph_assert(ceph_mutex_is_locked(lock));

  ldout(cct, 10) << __func__ << " " << addrs
      << ", creating connection and registering" << dendl;

  // here is where we decide which of the addrs to connect to.  always prefer
  // the first one, if we support it.
  entity_addr_t target;
  for (auto& a : addrs.v) {
    if (!a.is_msgr2() && !a.is_legacy()) {
      continue;
    }
    // FIXME: for ipv4 vs ipv6, check whether local host can handle ipv6 before
    // trying it?  for now, just pick whichever is listed first.
    target = a;
    break;
  }

  // create connection
  Worker *w = stack->get_worker();
  auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
						target.is_msgr2(), false);
  conn->anon = anon;
  conn->connect(addrs, type, target);
  if (anon) {
    // anonymous connections are never looked up by address
    anon_conns.insert(conn);
  } else {
    ceph_assert(!conns.count(addrs));
    ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
		   << *conn->peer_addrs << dendl;
    conns[addrs] = conn;
  }
  w->get_perf_counter()->inc(l_msgr_active_connections);

  return conn;
}
604  	
605  	
// Return the always-present loopback connection used for messages this
// messenger sends to itself.
ConnectionRef AsyncMessenger::get_loopback_connection()
{
  return local_connection;
}
610  	
// True when it is safe for us to dial peers over msgr2.
bool AsyncMessenger::should_use_msgr2()
{
  // if we are bound to v1 only, and we are connecting to a v2 peer,
  // we cannot use the peer's v2 address. otherwise the connection
  // is asymmetrical, because they would have to use v1 to connect
  // to us, and we would use v2, and connection race detection etc
  // would totally break down (among other things).  or, the other
  // end will be confused that we advertise ourselves with a v1
  // address only (that we bound to) but connected with protocol v2.
  return !did_bind || get_myaddrs().has_msgr2();
}
622  	
623  	entity_addrvec_t AsyncMessenger::_filter_addrs(const entity_addrvec_t& addrs)
624  	{
625  	  if (!should_use_msgr2()) {
626  	    ldout(cct, 10) << __func__ << " " << addrs << " limiting to v1 ()" << dendl;
627  	    entity_addrvec_t r;
628  	    for (auto& i : addrs.v) {
629  	      if (i.is_msgr2()) {
630  		continue;
631  	      }
632  	      r.v.push_back(i);
633  	    }
634  	    return r;
635  	  } else {
636  	    return addrs;
637  	  }
638  	}
639  	
// Send message m to the given destination addrs, creating a connection if
// needed.  Takes ownership of m (it is put() on error).  Returns 0 or
// -EINVAL for an empty destination.
int AsyncMessenger::send_to(Message *m, int type, const entity_addrvec_t& addrs)
{
  std::lock_guard l{lock};

  FUNCTRACE(cct);
  ceph_assert(m);

#if defined(WITH_EVENTTRACE)
  // event tracing hooks for OSD op / reply round-trip analysis
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");
#endif

  ldout(cct, 1) << __func__ << "--> " << ceph_entity_type_name(type) << " "
      << addrs << " -- " << *m << " -- ?+"
      << m->get_data().length() << " " << m << dendl;

  if (addrs.empty()) {
    ldout(cct,0) << __func__ <<  " message " << *m
        << " with empty dest " << addrs << dendl;
    m->put();  // we own the message; drop it on error
    return -EINVAL;
  }

  // restrict to protocols we can speak, then reuse or create a connection
  auto av = _filter_addrs(addrs);
  const AsyncConnectionRef& conn = _lookup_conn(av);
  submit_message(m, conn, av, type);
  return 0;
}
670  	
// Get a connection to addrs: the loopback connection for ourselves, a
// fresh unregistered connection when anon is set, otherwise an existing
// registered connection or a newly created one.
ConnectionRef AsyncMessenger::connect_to(int type,
					 const entity_addrvec_t& addrs,
					 bool anon)
{
  std::lock_guard l{lock};
  if (*my_addrs == addrs ||
      (addrs.v.size() == 1 &&
       my_addrs->contains(addrs.front()))) {
    // local
    return local_connection;
  }

  auto av = _filter_addrs(addrs);

  if (anon) {
    // anonymous connections are never shared; always create a new one
    return create_connect(av, type, anon);
  }

  AsyncConnectionRef conn = _lookup_conn(av);
  if (conn) {
    ldout(cct, 10) << __func__ << " " << av << " existing " << conn << dendl;
  } else {
    conn = create_connect(av, type, false);
    ldout(cct, 10) << __func__ << " " << av << " new " << conn << dendl;
  }

  return conn;
}
699  	
// Route message m: over the given existing connection if any, over the
// loopback connection if the destination is us, or over a new connection
// unless we're a lossy server for this peer type (then the message is
// dropped).  Takes ownership of m.
void AsyncMessenger::submit_message(Message *m, const AsyncConnectionRef& con,
                                    const entity_addrvec_t& dest_addrs,
				    int dest_type)
{
  if (cct->_conf->ms_dump_on_send) {
    // debugging aid: hexdump the encoded payload/data before sending
    m->encode(-1, MSG_CRC_ALL);
    ldout(cct, 0) << __func__ << " submit_message " << *m << "\n";
    m->get_payload().hexdump(*_dout);
    if (m->get_data().length() > 0) {
      *_dout << " data:\n";
      m->get_data().hexdump(*_dout);
    }
    *_dout << dendl;
    m->clear_payload();
  }

  // existing connection?
  if (con) {
    con->send_message(m);
    return ;
  }

  // local?
  if (*my_addrs == dest_addrs ||
      (dest_addrs.v.size() == 1 &&
       my_addrs->contains(dest_addrs.front()))) {
    // local
    local_connection->send_message(m);
    return ;
  }

  // remote, no existing connection.
  const Policy& policy = get_policy(dest_type);
  if (policy.server) {
    // lossy server: never initiate connections; drop the message
    ldout(cct, 20) << __func__ << " " << *m << " remote, " << dest_addrs
        << ", lossy server for target type "
        << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
    m->put();
  } else {
    ldout(cct,20) << __func__ << " " << *m << " remote, " << dest_addrs
		  << ", new connection." << dendl;
    auto&& new_con = create_connect(dest_addrs, dest_type, false);
    new_con->send_message(m);
  }
}
745  	
/**
 * Fill in any blank-IP entries of my_addrs from the provided addrs: each
 * blank entry adopts the first provided address of the same family while
 * keeping its own type, port and nonce.  Returns true if any address was
 * filled in (the local connection is then re-initialized).
 */
bool AsyncMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
{
  ldout(cct,1) << __func__ << " " << addrs << dendl;
  bool ret = false;
  std::lock_guard l{lock};

  entity_addrvec_t newaddrs = *my_addrs;
  for (auto& a : newaddrs.v) {
    if (a.is_blank_ip()) {
      // preserve this entry's own identity fields across the copy
      int type = a.get_type();
      int port = a.get_port();
      uint32_t nonce = a.get_nonce();
      for (auto& b : addrs.v) {
	if (a.get_family() == b.get_family()) {
	  ldout(cct,1) << __func__ << " assuming my addr " << a
		       << " matches provided addr " << b << dendl;
	  a = b;
	  a.set_nonce(nonce);
	  a.set_type(type);
	  a.set_port(port);
	  ret = true;
	  break;
	}
      }
    }
  }
  set_myaddrs(newaddrs);
  if (ret) {
    _init_local_connection();
  }
  ldout(cct,1) << __func__ << " now " << *my_addrs << dendl;
  return ret;
}
783  	
784  	void AsyncMessenger::set_addrs(const entity_addrvec_t &addrs)
785  	{
786  	  std::lock_guard l{lock};
787  	  auto t = addrs;
788  	  for (auto& a : t.v) {
789  	    a.set_nonce(nonce);
790  	  }
791  	  set_myaddrs(t);
792  	  _init_local_connection();
793  	}
794  	
// Stop and forget every tracked connection (accepting, registered and
// anonymous) and flush the deferred-delete list.  queue_reset controls
// whether dispatchers get a RESET notification for each connection.
void AsyncMessenger::shutdown_connections(bool queue_reset)
{
  ldout(cct,1) << __func__ << " " << dendl;
  std::lock_guard l{lock};
  for (const auto& c : accepting_conns) {
    ldout(cct, 5) << __func__ << " accepting_conn " << c << dendl;
    c->stop(queue_reset);
  }
  accepting_conns.clear();

  for (const auto& [e, c] : conns) {
    ldout(cct, 5) << __func__ << " mark down " << e << " " << c << dendl;
    c->get_perf_counter()->dec(l_msgr_active_connections);
    c->stop(queue_reset);
  }
  conns.clear();

  for (const auto& c : anon_conns) {
    ldout(cct, 5) << __func__ << " mark down " << c << dendl;
    c->get_perf_counter()->dec(l_msgr_active_connections);
    c->stop(queue_reset);
  }
  anon_conns.clear();

  {
    std::lock_guard l{deleted_lock};
    // only iterate for logging when the subsystem would actually log it
    if (cct->_conf->subsys.should_gather<ceph_subsys_ms, 5>()) {
      for (const auto& c : deleted_conns) {
        ldout(cct, 5) << __func__ << " delete " << c << dendl;
      }
    }
    deleted_conns.clear();
  }
}
829  	
830  	void AsyncMessenger::mark_down_addrs(const entity_addrvec_t& addrs)
831  	{
832  	  std::lock_guard l{lock};
833  	  const AsyncConnectionRef& conn = _lookup_conn(addrs);
834  	  if (conn) {
835  	    ldout(cct, 1) << __func__ << " " << addrs << " -- " << conn << dendl;
836  	    conn->stop(true);
837  	  } else {
838  	    ldout(cct, 1) << __func__ << " " << addrs << " -- connection dne" << dendl;
839  	  }
840  	}
841  	
842  	int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
843  	{
844  	  int my_type = my_name.type();
845  	
846  	  // set reply protocol version
847  	  if (peer_type == my_type) {
848  	    // internal
849  	    return cluster_protocol;
850  	  } else {
851  	    // public
852  	    switch (connect ? peer_type : my_type) {
853  	      case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
854  	      case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
855  	      case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
856  	    }
857  	  }
858  	  return 0;
859  	}
860  	
// Promote an accepted connection into the registered tables once its
// handshake completes.  Lossy server connections go to anon_conns; others
// are registered by peer addrs, honoring the lazy-delete scheme: a stale
// entry sitting on deleted_conns is purged first, while a live different
// connection for the same peer wins the race (-1).
int AsyncMessenger::accept_conn(const AsyncConnectionRef& conn)
{
  std::lock_guard l{lock};
  if (conn->policy.server &&
      conn->policy.lossy) {
    // lossy server connections are never looked up by address
    anon_conns.insert(conn);
    conn->get_perf_counter()->inc(l_msgr_active_connections);
    return 0;
  }
  auto it = conns.find(*conn->peer_addrs);
  if (it != conns.end()) {
    auto& existing = it->second;

    // lazy delete, see "deleted_conns"
    // If conn already in, we will return 0
    std::lock_guard l{deleted_lock};
    if (deleted_conns.erase(existing)) {
      // the registered entry was already dead; drop it and take its slot
      conns.erase(it);
    } else if (conn != existing) {
      // a different live connection already owns this peer; reject
      return -1;
    }
  }
  ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
  conns[*conn->peer_addrs] = conn;
  conn->get_perf_counter()->inc(l_msgr_active_connections);
  accepting_conns.erase(conn);
  return 0;
}
889  	
890  	
// Learn our externally visible address from what a peer saw us as.
// Only acts once (need_addr goes true -> false under the lock); fills any
// blank-IP entries of my_addrs of the matching family.  Returns true if
// the addrs were updated.
bool AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
  // be careful here: multiple threads may block here, and readers of
  // my_addr do NOT hold any lock.

  // this always goes from true -> false under the protection of the
  // mutex.  if it is already false, we need not retake the mutex at
  // all.
  if (!need_addr)
    return false;
  std::lock_guard l(lock);
  if (need_addr) {
    if (my_addrs->empty()) {
      // no addrs at all yet: adopt the peer-observed address wholesale
      auto a = peer_addr_for_me;
      a.set_type(entity_addr_t::TYPE_ANY);
      a.set_nonce(nonce);
      if (!did_bind) {
	a.set_port(0);
      }
      set_myaddrs(entity_addrvec_t(a));
      ldout(cct,10) << __func__ << " had no addrs" << dendl;
    } else {
      // fix all addrs of the same family, regardless of type (msgr2 vs legacy)
      entity_addrvec_t newaddrs = *my_addrs;
      for (auto& a : newaddrs.v) {
	if (a.is_blank_ip() &&
	    a.get_family() == peer_addr_for_me.get_family()) {
	  entity_addr_t t = peer_addr_for_me;
	  if (!did_bind) {
	    // unbound client: no meaningful type/port of our own
	    t.set_type(entity_addr_t::TYPE_ANY);
	    t.set_port(0);
	  } else {
	    // keep the entry's own type and bound port
	    t.set_type(a.get_type());
	    t.set_port(a.get_port());
	  }
	  t.set_nonce(a.get_nonce());
	  ldout(cct,10) << __func__ << " " << a << " -> " << t << dendl;
	  a = t;
	}
      }
      set_myaddrs(newaddrs);
    }
    ldout(cct, 1) << __func__ << " learned my addr " << *my_addrs
		  << " (peer_addr_for_me " << peer_addr_for_me << ")" << dendl;
    _init_local_connection();
    need_addr = false;
    return true;
  }
  return false;
}
941  	
942  	int AsyncMessenger::reap_dead()
943  	{
944  	  ldout(cct, 1) << __func__ << " start" << dendl;
945  	  int num = 0;
946  	
947  	  std::lock_guard l1{lock};
948  	
949  	  {
950  	    std::lock_guard l2{deleted_lock};
951  	    for (auto& c : deleted_conns) {
952  	      ldout(cct, 5) << __func__ << " delete " << c << dendl;
953  	      auto conns_it = conns.find(*c->peer_addrs);
954  	      if (conns_it != conns.end() && conns_it->second == c)
955  	        conns.erase(conns_it);
956  	      accepting_conns.erase(c);
957  	      anon_conns.erase(c);
958  	      ++num;
959  	    }
960  	    deleted_conns.clear();
961  	  }
962  	
963  	  return num;
964  	}
965