1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7    	 *
8    	 * Author: Haomai Wang <haomaiwang@gmail.com>
9    	 *
10   	 * This is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License version 2.1, as published by the Free Software
13   	 * Foundation.  See file COPYING.
14   	 *
15   	 */
16   	
17   	#include <unistd.h>
18   	
19   	#include "include/Context.h"
20   	#include "include/random.h"
21   	#include "common/errno.h"
22   	#include "AsyncMessenger.h"
23   	#include "AsyncConnection.h"
24   	
25   	#include "ProtocolV1.h"
26   	#include "ProtocolV2.h"
27   	
28   	#include "messages/MOSDOp.h"
29   	#include "messages/MOSDOpReply.h"
30   	#include "common/EventTrace.h"
31   	
// Mask keeping only the low 31 bits, i.e. used to limit a starting sequence
// number to below 2^31.  Nothing special about the value, it is just a big
// number.  PLR
#define SEQ_MASK  0x7fffffff
34   	
35   	#define dout_subsys ceph_subsys_ms
36   	#undef dout_prefix
37   	#define dout_prefix _conn_prefix(_dout)
38   	ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
39   	  return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
40   			<< *peer_addrs << " conn(" << this
41   			<< (msgr2 ? " msgr2=" : " legacy=")
42   			<< protocol.get()
43   			<< " " << ceph_con_mode_name(protocol->auth_meta->con_mode)
44   	                << " :" << port
45   	                << " s=" << get_state_name(state)
46   	                << " l=" << policy.lossy
47   	                << ").";
48   	}
49   	
50   	// Notes:
// 1. Don't dispatch any event when closed! Doing so may keep the AsyncConnection alive even after the AsyncMessenger is dead.
52   	
53   	const uint32_t AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
54   	
55   	class C_time_wakeup : public EventCallback {
56   	  AsyncConnectionRef conn;
57   	
58   	 public:
59   	  explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
60   	  void do_request(uint64_t fd_or_id) override {
61   	    conn->wakeup_from(fd_or_id);
62   	  }
63   	};
64   	
65   	class C_handle_read : public EventCallback {
66   	  AsyncConnectionRef conn;
67   	
68   	 public:
69   	  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
70   	  void do_request(uint64_t fd_or_id) override {
71   	    conn->process();
72   	  }
73   	};
74   	
75   	class C_handle_write : public EventCallback {
76   	  AsyncConnectionRef conn;
77   	
78   	 public:
79   	  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
80   	  void do_request(uint64_t fd) override {
81   	    conn->handle_write();
82   	  }
83   	};
84   	
85   	class C_handle_write_callback : public EventCallback {
86   	  AsyncConnectionRef conn;
87   	
88   	public:
89   	  explicit C_handle_write_callback(AsyncConnectionRef c) : conn(c) {}
90   	  void do_request(uint64_t fd) override { conn->handle_write_callback(); }
91   	};
92   	
93   	class C_clean_handler : public EventCallback {
94   	  AsyncConnectionRef conn;
95   	 public:
96   	  explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
97   	  void do_request(uint64_t id) override {
98   	    conn->cleanup();
99   	    delete this;
100  	  }
101  	};
102  	
103  	class C_tick_wakeup : public EventCallback {
104  	  AsyncConnectionRef conn;
105  	
106  	 public:
107  	  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
108  	  void do_request(uint64_t fd_or_id) override {
109  	    conn->tick(fd_or_id);
110  	  }
111  	};
112  	
113  	
114  	AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
115  	                                 Worker *w, bool m2, bool local)
116  	  : Connection(cct, m),
117  	    delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
118  	    logger(w->get_perf_counter()),
119  	    state(STATE_NONE), port(-1),
120  	    dispatch_queue(q), recv_buf(NULL),
121  	    recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
122  	    recv_start(0), recv_end(0),
123  	    last_active(ceph::coarse_mono_clock::now()),
124  	    connect_timeout_us(cct->_conf->ms_connection_ready_timeout*1000*1000),
125  	    inactive_timeout_us(cct->_conf->ms_connection_idle_timeout*1000*1000),
126  	    msgr2(m2), state_offset(0),
127  	    worker(w), center(&w->center),read_buffer(nullptr)
128  	{
129  	#ifdef UNIT_TESTS_BUILT
130  	  this->interceptor = m->interceptor;
131  	#endif
132  	  read_handler = new C_handle_read(this);
133  	  write_handler = new C_handle_write(this);
134  	  write_callback_handler = new C_handle_write_callback(this);
135  	  wakeup_handler = new C_time_wakeup(this);
136  	  tick_handler = new C_tick_wakeup(this);
137  	  // double recv_max_prefetch see "read_until"
138  	  recv_buf = new char[2*recv_max_prefetch];
139  	  if (local) {
140  	    protocol = std::unique_ptr<Protocol>(new LoopbackProtocolV1(this));
141  	  } else if (m2) {
142  	    protocol = std::unique_ptr<Protocol>(new ProtocolV2(this));
143  	  } else {
144  	    protocol = std::unique_ptr<Protocol>(new ProtocolV1(this));
145  	  }
146  	  logger->inc(l_msgr_created_connections);
147  	}
148  	
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
149  	AsyncConnection::~AsyncConnection()
150  	{
151  	  if (recv_buf)
152  	    delete[] recv_buf;
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
153  	  ceph_assert(!delay_state);
154  	}
155  	
156  	int AsyncConnection::get_con_mode() const
157  	{
158  	  return protocol->get_con_mode();
159  	}
160  	
161  	bool AsyncConnection::is_msgr2() const
162  	{
163  	  return protocol->proto_type == 2;
164  	}
165  	
166  	void AsyncConnection::maybe_start_delay_thread()
167  	{
168  	  if (!delay_state) {
169  	    async_msgr->cct->_conf.with_val<std::string>(
170  	      "ms_inject_delay_type",
171  	      [this](const string& s) {
172  		if (s.find(ceph_entity_type_name(peer_type)) != string::npos) {
173  		  ldout(msgr->cct, 1) << __func__ << " setting up a delay queue"
174  				      << dendl;
175  		  delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue,
176  						    conn_id);
177  		}
178  	      });
179  	  }
180  	}
181  	
182  	
183  	ssize_t AsyncConnection::read(unsigned len, char *buffer,
184  	                              std::function<void(char *, ssize_t)> callback) {
185  	  ldout(async_msgr->cct, 20) << __func__
186  	                             << (pendingReadLen ? " continue" : " start")
187  	                             << " len=" << len << dendl;
188  	  ssize_t r = read_until(len, buffer);
189  	  if (r > 0) {
190  	    readCallback = callback;
191  	    pendingReadLen = len;
192  	    read_buffer = buffer;
193  	  }
194  	  return r;
195  	}
196  	
197  	// Because this func will be called multi times to populate
198  	// the needed buffer, so the passed in bufferptr must be the same.
199  	// Normally, only "read_message" will pass existing bufferptr in
200  	//
201  	// And it will uses readahead method to reduce small read overhead,
202  	// "recv_buf" is used to store read buffer
203  	//
204  	// return the remaining bytes, 0 means this buffer is finished
205  	// else return < 0 means error
206  	ssize_t AsyncConnection::read_until(unsigned len, char *p)
207  	{
208  	  ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
209  	                             << state_offset << dendl;
210  	
211  	  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
212  	    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
213  	      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
214  	      cs.shutdown();
215  	    }
216  	  }
217  	
218  	  ssize_t r = 0;
219  	  uint64_t left = len - state_offset;
220  	  if (recv_end > recv_start) {
221  	    uint64_t to_read = std::min<uint64_t>(recv_end - recv_start, left);
222  	    memcpy(p, recv_buf+recv_start, to_read);
223  	    recv_start += to_read;
224  	    left -= to_read;
225  	    ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer "
226  	                               << " left is " << left << " buffer still has "
227  	                               << recv_end - recv_start << dendl;
228  	    if (left == 0) {
229  	      return 0;
230  	    }
231  	    state_offset += to_read;
232  	  }
233  	
234  	  recv_end = recv_start = 0;
235  	  /* nothing left in the prefetch buffer */
236  	  if (left > (uint64_t)recv_max_prefetch) {
237  	    /* this was a large read, we don't prefetch for these */
238  	    do {
239  	      r = read_bulk(p+state_offset, left);
240  	      ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
241  	      if (r < 0) {
242  	        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
243  	        return -1;
244  	      } else if (r == static_cast<int>(left)) {
245  	        state_offset = 0;
246  	        return 0;
247  	      }
248  	      state_offset += r;
249  	      left -= r;
250  	    } while (r > 0);
251  	  } else {
252  	    do {
253  	      r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
254  	      ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
255  	                                 << " left is " << left << " got " << r << dendl;
256  	      if (r < 0) {
257  	        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
258  	        return -1;
259  	      }
260  	      recv_end += r;
261  	      if (r >= static_cast<int>(left)) {
262  	        recv_start = len - state_offset;
263  	        memcpy(p+state_offset, recv_buf, recv_start);
264  	        state_offset = 0;
265  	        return 0;
266  	      }
267  	      left -= r;
268  	    } while (r > 0);
269  	    memcpy(p+state_offset, recv_buf, recv_end-recv_start);
270  	    state_offset += (recv_end - recv_start);
271  	    recv_end = recv_start = 0;
272  	  }
273  	  ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
274  	                             << len - state_offset << " bytes" << dendl;
275  	  return len - state_offset;
276  	}
277  	
278  	/* return -1 means `fd` occurs error or closed, it should be closed
279  	 * return 0 means EAGAIN or EINTR */
280  	ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
281  	{
282  	  ssize_t nread;
283  	 again:
284  	  nread = cs.read(buf, len);
285  	  if (nread < 0) {
286  	    if (nread == -EAGAIN) {
287  	      nread = 0;
288  	    } else if (nread == -EINTR) {
289  	      goto again;
290  	    } else {
291  	      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
292  	                          << " : "<< strerror(nread) << dendl;
293  	      return -1;
294  	    }
295  	  } else if (nread == 0) {
296  	    ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
297  	                              << cs.fd() << dendl;
298  	    return -1;
299  	  }
300  	  return nread;
301  	}
302  	
303  	ssize_t AsyncConnection::write(bufferlist &bl,
304  	                               std::function<void(ssize_t)> callback,
305  	                               bool more) {
306  	
307  	    std::unique_lock<std::mutex> l(write_lock);
308  	    outgoing_bl.claim_append(bl);
309  	    ssize_t r = _try_send(more);
310  	    if (r > 0) {
311  	      writeCallback = callback;
312  	    }
313  	    return r;
314  	}
315  	
316  	// return the remaining bytes, it may larger than the length of ptr
317  	// else return < 0 means error
318  	ssize_t AsyncConnection::_try_send(bool more)
319  	{
320  	  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
321  	    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
322  	      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
323  	      cs.shutdown();
324  	    }
325  	  }
326  	
327  	  ceph_assert(center->in_thread());
328  	  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
329  	                             << " bytes" << dendl;
330  	  ssize_t r = cs.send(outgoing_bl, more);
331  	  if (r < 0) {
332  	    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
333  	    return r;
334  	  }
335  	
336  	  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
337  	                             << " remaining bytes " << outgoing_bl.length() << dendl;
338  	
339  	  if (!open_write && is_queued()) {
340  	    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
341  	    open_write = true;
342  	  }
343  	
344  	  if (open_write && !is_queued()) {
345  	    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
346  	    open_write = false;
347  	    if (writeCallback) {
348  	      center->dispatch_event_external(write_callback_handler);
349  	    }
350  	  }
351  	
352  	  return outgoing_bl.length();
353  	}
354  	
355  	void AsyncConnection::inject_delay() {
356  	  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
357  	    ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
358  	      async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
359  	    utime_t t;
360  	    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
361  	    t.sleep();
362  	  }
363  	}
364  	
365  	void AsyncConnection::process() {
366  	  std::lock_guard<std::mutex> l(lock);
367  	  last_active = ceph::coarse_mono_clock::now();
368  	  recv_start_time = ceph::mono_clock::now();
369  	
370  	  ldout(async_msgr->cct, 20) << __func__ << dendl;
371  	
372  	  switch (state) {
373  	    case STATE_NONE: {
374  	      ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
375  	      return;
376  	    }
377  	    case STATE_CLOSED: {
378  	      ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
379  	      return;
380  	    }
381  	    case STATE_CONNECTING: {
382  	      ceph_assert(!policy.server);
383  	
384  	      // clear timer (if any) since we are connecting/re-connecting
385  	      if (last_tick_id) {
386  	        center->delete_time_event(last_tick_id);
387  	        last_tick_id = 0;
388  	      }
389  	
390  	      if (cs) {
391  	        center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
392  	        cs.close();
393  	      }
394  	
395  	      SocketOptions opts;
396  	      opts.priority = async_msgr->get_socket_priority();
397  	      opts.connect_bind_addr = msgr->get_myaddrs().front();
398  	      ssize_t r = worker->connect(target_addr, opts, &cs);
399  	      if (r < 0) {
400  	        protocol->fault();
401  	        return;
402  	      }
403  	
404  	      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
405  	      state = STATE_CONNECTING_RE;
406  	    }
407  	    case STATE_CONNECTING_RE: {
408  	      ssize_t r = cs.is_connected();
409  	      if (r < 0) {
410  	        ldout(async_msgr->cct, 1) << __func__ << " reconnect failed to "
411  	                                  << target_addr << dendl;
412  	        if (r == -ECONNREFUSED) {
413  	          ldout(async_msgr->cct, 2)
414  	              << __func__ << " connection refused!" << dendl;
415  	          dispatch_queue->queue_refused(this);
416  	        }
417  	        protocol->fault();
418  	        return;
419  	      } else if (r == 0) {
420  	        ldout(async_msgr->cct, 10)
421  	            << __func__ << " nonblock connect inprogress" << dendl;
422  	        if (async_msgr->get_stack()->nonblock_connect_need_writable_event()) {
423  	          center->create_file_event(cs.fd(), EVENT_WRITABLE,
424  	                                    read_handler);
425  	        }
426  	        logger->tinc(l_msgr_running_recv_time,
427  	               ceph::mono_clock::now() - recv_start_time);
428  	        return;
429  	      }
430  	
431  	      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
432  	      ldout(async_msgr->cct, 10)
433  	          << __func__ << " connect successfully, ready to send banner" << dendl;
434  	      state = STATE_CONNECTION_ESTABLISHED;
435  	      ceph_assert(last_tick_id == 0);
436  	      // exclude TCP nonblock connect time
437  	      last_connect_started = ceph::coarse_mono_clock::now();
438  	      last_tick_id = center->create_time_event(
439  	        connect_timeout_us, tick_handler);
440  	      break;
441  	    }
442  	
443  	    case STATE_ACCEPTING: {
444  	      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
445  	      state = STATE_CONNECTION_ESTABLISHED;
446  	
447  	      break;
448  	    }
449  	
450  	    case STATE_CONNECTION_ESTABLISHED: {
451  	      if (pendingReadLen) {
452  	        ssize_t r = read(*pendingReadLen, read_buffer, readCallback);
453  	        if (r <= 0) { // read all bytes, or an error occured
454  	          pendingReadLen.reset();
455  	          char *buf_tmp = read_buffer;
456  	          read_buffer = nullptr;
457  	          readCallback(buf_tmp, r);
458  	        }
459  		logger->tinc(l_msgr_running_recv_time,
460  		    ceph::mono_clock::now() - recv_start_time);
461  	        return;
462  	      }
463  	      break;
464  	    }
465  	  }
466  	
467  	  protocol->read_event();
468  	
469  	  logger->tinc(l_msgr_running_recv_time,
470  	               ceph::mono_clock::now() - recv_start_time);
471  	}
472  	
473  	bool AsyncConnection::is_connected() {
474  	  return protocol->is_connected();
475  	}
476  	
477  	void AsyncConnection::connect(const entity_addrvec_t &addrs, int type,
478  	                              entity_addr_t &target) {
479  	
480  	  std::lock_guard<std::mutex> l(lock);
481  	  set_peer_type(type);
482  	  set_peer_addrs(addrs);
483  	  policy = msgr->get_policy(type);
484  	  target_addr = target;
485  	  _connect();
486  	}
487  	
488  	void AsyncConnection::_connect()
489  	{
490  	  ldout(async_msgr->cct, 10) << __func__ << dendl;
491  	
492  	  state = STATE_CONNECTING;
493  	  protocol->connect();
494  	  // rescheduler connection in order to avoid lock dep
495  	  // may called by external thread(send_message)
496  	  center->dispatch_event_external(read_handler);
497  	}
498  	
499  	void AsyncConnection::accept(ConnectedSocket socket,
500  				     const entity_addr_t &listen_addr,
501  				     const entity_addr_t &peer_addr)
502  	{
503  	  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
504  				     << " listen_addr " << listen_addr
505  				     << " peer_addr " << peer_addr << dendl;
506  	  ceph_assert(socket.fd() >= 0);
507  	
508  	  std::lock_guard<std::mutex> l(lock);
509  	  cs = std::move(socket);
510  	  socket_addr = listen_addr;
511  	  target_addr = peer_addr; // until we know better
512  	  state = STATE_ACCEPTING;
513  	  protocol->accept();
514  	  // rescheduler connection in order to avoid lock dep
515  	  center->dispatch_event_external(read_handler);
516  	}
517  	
518  	int AsyncConnection::send_message(Message *m)
519  	{
520  	  FUNCTRACE(async_msgr->cct);
521  	  lgeneric_subdout(async_msgr->cct, ms,
522  			   1) << "-- " << async_msgr->get_myaddrs() << " --> "
523  			      << get_peer_addrs() << " -- "
524  			      << *m << " -- " << m << " con "
525  			      << this
526  			      << dendl;
527  	
528  	  if (is_blackhole()) {
529  	    lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
530  	      << " blackhole " << *m << dendl;
531  	    m->put();
532  	    return 0;
533  	  }
534  	
535  	  // optimistic think it's ok to encode(actually may broken now)
536  	  if (!m->get_priority())
537  	    m->set_priority(async_msgr->get_default_send_priority());
538  	
539  	  m->get_header().src = async_msgr->get_myname();
540  	  m->set_connection(this);
541  	
542  	#if defined(WITH_EVENTTRACE)
543  	  if (m->get_type() == CEPH_MSG_OSD_OP)
544  	    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
545  	  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
546  	    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
547  	#endif
548  	
549  	  if (is_loopback) { //loopback connection
550  	    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
551  	    std::lock_guard<std::mutex> l(write_lock);
552  	    if (protocol->is_connected()) {
553  	      dispatch_queue->local_delivery(m, m->get_priority());
554  	    } else {
555  	      ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
556  	                                 << " Drop message " << m << dendl;
557  	      m->put();
558  	    }
559  	    return 0;
560  	  }
561  	
562  	  // we don't want to consider local message here, it's too lightweight which
563  	  // may disturb users
564  	  logger->inc(l_msgr_send_messages);
565  	
566  	  protocol->send_message(m);
567  	  return 0;
568  	}
569  	
570  	entity_addr_t AsyncConnection::_infer_target_addr(const entity_addrvec_t& av)
571  	{
572  	  // pick the first addr of the same address family as socket_addr.  it could be
573  	  // an any: or v2: addr, we don't care.  it should not be a v1 addr.
574  	  for (auto& i : av.v) {
575  	    if (i.is_legacy()) {
576  	      continue;
577  	    }
578  	    if (i.get_family() == socket_addr.get_family()) {
579  	      ldout(async_msgr->cct,10) << __func__ << " " << av << " -> " << i << dendl;
580  	      return i;
581  	    }
582  	  }
583  	  ldout(async_msgr->cct,10) << __func__ << " " << av << " -> nothing to match "
584  				    << socket_addr << dendl;
585  	  return {};
586  	}
587  	
588  	void AsyncConnection::fault()
589  	{
590  	  shutdown_socket();
591  	  open_write = false;
592  	
593  	  // queue delayed items immediately
594  	  if (delay_state)
595  	    delay_state->flush();
596  	
597  	  recv_start = recv_end = 0;
598  	  state_offset = 0;
599  	  outgoing_bl.clear();
600  	}
601  	
602  	void AsyncConnection::_stop() {
603  	  writeCallback.reset();
604  	  dispatch_queue->discard_queue(conn_id);
605  	  async_msgr->unregister_conn(this);
606  	  worker->release_worker();
607  	
608  	  state = STATE_CLOSED;
609  	  open_write = false;
610  	
611  	  state_offset = 0;
612  	  // Make sure in-queue events will been processed
613  	  center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
614  	}
615  	
616  	bool AsyncConnection::is_queued() const {
617  	  return outgoing_bl.length();
618  	}
619  	
620  	void AsyncConnection::shutdown_socket() {
621  	  for (auto &&t : register_time_events) center->delete_time_event(t);
622  	  register_time_events.clear();
623  	  if (last_tick_id) {
624  	    center->delete_time_event(last_tick_id);
625  	    last_tick_id = 0;
626  	  }
627  	  if (cs) {
628  	    center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
629  	    cs.shutdown();
630  	    cs.close();
631  	  }
632  	}
633  	
634  	void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
635  	{
636  	  Message *m = nullptr;
637  	  {
638  	    std::lock_guard<std::mutex> l(delay_lock);
639  	    register_time_events.erase(id);
640  	    if (stop_dispatch)
641  	      return ;
642  	    if (delay_queue.empty())
643  	      return ;
644  	    m = delay_queue.front();
645  	    delay_queue.pop_front();
646  	  }
647  	  if (msgr->ms_can_fast_dispatch(m)) {
648  	    dispatch_queue->fast_dispatch(m);
649  	  } else {
650  	    dispatch_queue->enqueue(m, m->get_priority(), conn_id);
651  	  }
652  	}
653  	
654  	void AsyncConnection::DelayedDelivery::discard() {
655  	  stop_dispatch = true;
656  	  center->submit_to(center->get_id(),
657  	                    [this]() mutable {
658  	                      std::lock_guard<std::mutex> l(delay_lock);
659  	                      while (!delay_queue.empty()) {
660  	                        Message *m = delay_queue.front();
661  	                        dispatch_queue->dispatch_throttle_release(
662  	                            m->get_dispatch_throttle_size());
663  	                        m->put();
664  	                        delay_queue.pop_front();
665  	                      }
666  	                      for (auto i : register_time_events)
667  	                        center->delete_time_event(i);
668  	                      register_time_events.clear();
669  	                      stop_dispatch = false;
670  	                    },
671  	                    true);
672  	}
673  	
674  	void AsyncConnection::DelayedDelivery::flush() {
675  	  stop_dispatch = true;
676  	  center->submit_to(
677  	      center->get_id(), [this] () mutable {
678  	    std::lock_guard<std::mutex> l(delay_lock);
679  	    while (!delay_queue.empty()) {
680  	      Message *m = delay_queue.front();
681  	      if (msgr->ms_can_fast_dispatch(m)) {
682  	        dispatch_queue->fast_dispatch(m);
683  	      } else {
684  	        dispatch_queue->enqueue(m, m->get_priority(), conn_id);
685  	      }
686  	      delay_queue.pop_front();
687  	    }
688  	    for (auto i : register_time_events)
689  	      center->delete_time_event(i);
690  	    register_time_events.clear();
691  	    stop_dispatch = false;
692  	  }, true);
693  	}
694  	
695  	void AsyncConnection::send_keepalive()
696  	{
697  	  protocol->send_keepalive();
698  	}
699  	
700  	void AsyncConnection::mark_down()
701  	{
702  	  ldout(async_msgr->cct, 1) << __func__ << dendl;
703  	  std::lock_guard<std::mutex> l(lock);
704  	  protocol->stop();
705  	}
706  	
707  	void AsyncConnection::handle_write()
708  	{
709  	  ldout(async_msgr->cct, 10) << __func__ << dendl;
710  	  protocol->write_event();
711  	}
712  	
713  	void AsyncConnection::handle_write_callback() {
714  	  std::lock_guard<std::mutex> l(lock);
715  	  last_active = ceph::coarse_mono_clock::now();
716  	  recv_start_time = ceph::mono_clock::now();
717  	  write_lock.lock();
718  	  if (writeCallback) {
719  	    auto callback = *writeCallback;
720  	    writeCallback.reset();
721  	    write_lock.unlock();
722  	    callback(0);
723  	    return;
724  	  }
725  	  write_lock.unlock();
726  	}
727  	
728  	void AsyncConnection::stop(bool queue_reset) {
729  	  lock.lock();
730  	  bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
731  	  protocol->stop();
732  	  lock.unlock();
733  	  if (need_queue_reset) dispatch_queue->queue_reset(this);
734  	}
735  	
736  	void AsyncConnection::cleanup() {
737  	  shutdown_socket();
738  	  delete read_handler;
739  	  delete write_handler;
740  	  delete write_callback_handler;
741  	  delete wakeup_handler;
742  	  delete tick_handler;
743  	  if (delay_state) {
744  	    delete delay_state;
745  	    delay_state = NULL;
746  	  }
747  	}
748  	
749  	void AsyncConnection::wakeup_from(uint64_t id)
750  	{
751  	  lock.lock();
752  	  register_time_events.erase(id);
753  	  lock.unlock();
754  	  process();
755  	}
756  	
757  	void AsyncConnection::tick(uint64_t id)
758  	{
759  	  auto now = ceph::coarse_mono_clock::now();
760  	  ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
761  	                             << " last_active=" << last_active << dendl;
762  	  std::lock_guard<std::mutex> l(lock);
763  	  last_tick_id = 0;
764  	  if (!is_connected()) {
765  	    if (connect_timeout_us <=
766  	        (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>
767  	          (now - last_connect_started).count()) {
768  	      ldout(async_msgr->cct, 1) << __func__ << " see no progress in more than "
769  	                                << connect_timeout_us
770  	                                << " us during connecting, fault."
771  	                                << dendl;
772  	      protocol->fault();
773  	    } else {
774  	      last_tick_id = center->create_time_event(connect_timeout_us, tick_handler);
775  	    }
776  	  } else {
777  	    auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>
778  	      (now - last_active).count();
779  	    if (inactive_timeout_us < (uint64_t)idle_period) {
780  	      ldout(async_msgr->cct, 1) << __func__ << " idle (" << idle_period
781  	                                << ") for more than " << inactive_timeout_us
782  	                                << " us, fault."
783  	                                << dendl;
784  	      protocol->fault();
785  	    } else {
786  	      last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
787  	    }
788  	  }
789  	}
790