1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
#include "ProtocolV1.h"

#include <cstring>

#include "common/errno.h"

#include "AsyncConnection.h"
#include "AsyncMessenger.h"
#include "common/EventTrace.h"
#include "include/random.h"
#include "auth/AuthClient.h"
#include "auth/AuthServer.h"
14   	
15   	#define dout_subsys ceph_subsys_ms
16   	#undef dout_prefix
17   	#define dout_prefix _conn_prefix(_dout)
18   	ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
19   	  return *_dout << "--1- " << messenger->get_myaddrs() << " >> "
20   	                << *connection->peer_addrs
21   			<< " conn("
22   	                << connection << " " << this
23   	                << " :" << connection->port << " s=" << get_state_name(state)
24   	                << " pgs=" << peer_global_seq << " cs=" << connect_seq
25   	                << " l=" << connection->policy.lossy << ").";
26   	}
27   	
// Shorthands that start an asynchronous socket operation and name the
// continuation (member handler) to resume when it completes:
//   WRITE(B, C)    - write bufferlist B, then continue in C.
//   READ(L, C)     - read L bytes into temp_buffer (read() falls back to it
//                    when no buffer is given), then continue in C.
//   READB(L, B, C) - read L bytes into caller-supplied buffer B, then
//                    continue in C.
#define WRITE(B, C) write(CONTINUATION(C), B)

#define READ(L, C) read(CONTINUATION(C), L)

#define READB(L, B, C) read(CONTINUATION(C), L, B)

// Constant to limit starting sequence number to 2^31.  Nothing special about
// it, just a big number.  PLR
#define SEQ_MASK 0x7fffffff

// NOTE(review): not referenced in this part of the file; presumably used
// further down when deciding whether to coalesce small writes — confirm.
const int ASYNC_COALESCE_THRESHOLD = 256;
39   	
40   	using namespace std;
41   	
42   	static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
43   	  // create a buffer to read into that matches the data alignment
44   	  unsigned alloc_len = 0;
45   	  unsigned left = len;
46   	  unsigned head = 0;
47   	  if (off & ~CEPH_PAGE_MASK) {
48   	    // head
49   	    alloc_len += CEPH_PAGE_SIZE;
50   	    head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
51   	    left -= head;
52   	  }
53   	  alloc_len += left;
54   	  bufferptr ptr(buffer::create_small_page_aligned(alloc_len));
55   	  if (head) ptr.set_offset(CEPH_PAGE_SIZE - head);
56   	  data.push_back(std::move(ptr));
57   	}
58   	
59   	/**
60   	 * Protocol V1
61   	 **/
62   	
63   	ProtocolV1::ProtocolV1(AsyncConnection *connection)
64   	    : Protocol(1, connection),
65   	      temp_buffer(nullptr),
66   	      can_write(WriteStatus::NOWRITE),
67   	      keepalive(false),
68   	      connect_seq(0),
69   	      peer_global_seq(0),
70   	      msg_left(0),
71   	      cur_msg_size(0),
72   	      replacing(false),
73   	      is_reset_from_peer(false),
74   	      once_ready(false),
75   	      state(NONE),
76   	      global_seq(0),
77   	      wait_for_seq(false) {
78   	  temp_buffer = new char[4096];
79   	}
80   	
/**
 * Destroy the protocol. Nothing may still be waiting to be sent (out_q) or
 * waiting for the peer's ACK (sent) at this point.
 */
ProtocolV1::~ProtocolV1() {
  ceph_assert(out_q.empty());
  ceph_assert(sent.empty());

  // Scratch read buffer allocated in the constructor.
  delete[] temp_buffer;
}
87   	
88   	void ProtocolV1::connect() {
89   	  this->state = START_CONNECT;
90   	
91   	  // reset connect state variables
92   	  authorizer_buf.clear();
93   	  memset(&connect_msg, 0, sizeof(connect_msg));
94   	  memset(&connect_reply, 0, sizeof(connect_reply));
95   	
96   	  global_seq = messenger->get_global_seq();
97   	}
98   	
99   	void ProtocolV1::accept() { this->state = START_ACCEPT; }
100  	
101  	bool ProtocolV1::is_connected() {
102  	  return can_write.load() == WriteStatus::CANWRITE;
103  	}
104  	
/**
 * Permanently close this protocol instance.
 *
 * Flushes the connection's delay state if one exists. Then, while holding
 * write_lock, it:
 *  - resets receive state;
 *  - discards the outgoing queue (discard_out_queue());
 *  - stops the underlying AsyncConnection;
 *  - marks both can_write and state as CLOSED.
 * Calling it again once CLOSED is a no-op.
 */
void ProtocolV1::stop() {
  ldout(cct, 20) << __func__ << dendl;
  if (state == CLOSED) {
    return;
  }

  if (connection->delay_state) connection->delay_state->flush();

  ldout(cct, 2) << __func__ << dendl;
  std::lock_guard<std::mutex> l(connection->write_lock);

  reset_recv_state();
  discard_out_queue();

  connection->_stop();

  can_write = WriteStatus::CLOSED;
  state = CLOSED;
}
124  	
/**
 * React to a failure on this connection.
 *
 * Outcomes, in order of precedence:
 *  - state CLOSED/NONE: nothing to do.
 *  - lossy policy outside START_CONNECT/CONNECTING: stop() and queue a
 *    reset notification for the dispatcher.
 *  - otherwise writes are disabled and requeue_sent() is called, then:
 *    - if the connection never became ready (!once_ready), has nothing
 *      queued, is mid-accept and is not replacing: stop() and queue a reset;
 *    - with a standby policy, nothing to send, no pending keepalive and not
 *      in WAIT: go to STANDBY;
 *    - while connecting, or in WAIT: schedule a reconnect after a backoff
 *      that doubles from ms_initial_backoff up to ms_max_backoff (WAIT goes
 *      straight to the maximum);
 *    - any other state: a server goes to STANDBY, a client bumps
 *      connect_seq and restarts the connect immediately (no backoff).
 */
void ProtocolV1::fault() {
  ldout(cct, 20) << __func__ << dendl;

  if (state == CLOSED || state == NONE) {
    ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
    return;
  }

  // Lossy connections are not repaired; only faults during the connect
  // phase fall through to the retry logic below.
  if (connection->policy.lossy && state != START_CONNECT &&
      state != CONNECTING) {
    ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return;
  }

  connection->write_lock.lock();
  can_write = WriteStatus::NOWRITE;
  is_reset_from_peer = false;

  // requeue sent items
  requeue_sent();

  // NOTE: the range check relies on the declaration order of the state enum
  // (all accept-phase states lie between START_ACCEPT and
  // ACCEPTING_WAIT_CONNECT_MSG_AUTH).
  if (!once_ready && out_q.empty() && state >= START_ACCEPT &&
      state <= ACCEPTING_WAIT_CONNECT_MSG_AUTH && !replacing) {
    ldout(cct, 10) << __func__ << " with nothing to send and in the half "
                   << " accept state just closed" << dendl;
    // stop() takes write_lock itself, so release it first.
    connection->write_lock.unlock();
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return;
  }
  replacing = false;

  connection->fault();

  reset_recv_state();

  if (connection->policy.standby && out_q.empty() && !keepalive &&
      state != WAIT) {
    ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
                   << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return;
  }

  connection->write_lock.unlock();

  if ((state >= START_CONNECT && state <= CONNECTING_SEND_CONNECT_MSG) ||
      state == WAIT) {
    // backoff!
    if (state == WAIT) {
      backoff.set_from_double(cct->_conf->ms_max_backoff);
    } else if (backoff == utime_t()) {
      backoff.set_from_double(cct->_conf->ms_initial_backoff);
    } else {
      // Exponential growth, capped at ms_max_backoff.
      backoff += backoff;
      if (backoff > cct->_conf->ms_max_backoff)
        backoff.set_from_double(cct->_conf->ms_max_backoff);
    }

    global_seq = messenger->get_global_seq();
    state = START_CONNECT;
    connection->state = AsyncConnection::STATE_CONNECTING;
    ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
    // woke up again;
    // The timer delay is passed in microseconds (nanoseconds / 1000).
    connection->register_time_events.insert(
        connection->center->create_time_event(backoff.to_nsec() / 1000,
                                              connection->wakeup_handler));
  } else {
    // policy maybe empty when state is in accept
    if (connection->policy.server) {
      ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
      state = STANDBY;
    } else {
      ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
      connect_seq++;
      global_seq = messenger->get_global_seq();
      state = START_CONNECT;
      connection->state = AsyncConnection::STATE_CONNECTING;
    }
    backoff = utime_t();
    // Runs read_event(): START_CONNECT sends the client banner, while
    // STANDBY is ignored there.
    connection->center->dispatch_event_external(connection->read_handler);
  }
}
211  	
/**
 * Queue `m` for transmission, taking over the caller's reference to it.
 *
 * Encoding:
 *  - Messages the messenger can fast-dispatch are encoded here, outside
 *    write_lock, with the features known at entry.
 *  - Under write_lock that encoding is discarded again if the features
 *    changed meanwhile or the connection is not writable yet.
 *  - write_event() encodes any message that reaches it unencoded.
 *
 * Queueing:
 *  - If the connection is CLOSED, the message is dropped (its reference
 *    is put).
 *  - Otherwise it is appended to out_q at its priority.
 *  - write_handler is then dispatched, unless a replace is in progress or
 *    a write is already scheduled.
 */
void ProtocolV1::send_message(Message *m) {
  bufferlist bl;
  uint64_t f = connection->get_features();

  // TODO: Currently not all messages supports reencode like MOSDMap, so here
  // only let fast dispatch support messages prepare message
  bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
  if (can_fast_prepare) {
    prepare_send_message(f, m, bl);
  }

  std::lock_guard<std::mutex> l(connection->write_lock);
  // "features" changes will change the payload encoding
  if (can_fast_prepare &&
      (can_write == WriteStatus::NOWRITE || connection->get_features() != f)) {
    // ensure the correctness of message encoding
    bl.clear();
    m->clear_payload();
    ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
                  << " != " << connection->get_features() << dendl;
  }
  if (can_write == WriteStatus::CLOSED) {
    ldout(cct, 10) << __func__ << " connection closed."
                   << " Drop message " << m << dendl;
    m->put();
  } else {
    m->queue_start = ceph::mono_clock::now();
    m->trace.event("async enqueueing message");
    out_q[m->get_priority()].emplace_back(std::move(bl), m);
    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                   << dendl;
    // write_in_progress prevents dispatching more than one write event at
    // a time; write_event() clears it when it finishes.
    if (can_write != WriteStatus::REPLACING && !write_in_progress) {
      write_in_progress = true;
      connection->center->dispatch_event_external(connection->write_handler);
    }
  }
}
249  	
250  	void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
251  	                                      bufferlist &bl) {
252  	  ldout(cct, 20) << __func__ << " m " << *m << dendl;
253  	
254  	  // associate message with Connection (for benefit of encode_payload)
255  	  ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
256  			 << features << " " << m  << " " << *m << dendl;
257  	
258  	  // encode and copy out of *m
259  	  // in write_message we update header.seq and need recalc crc
260  	  // so skip calc header in encode function.
261  	  m->encode(features, messenger->crcflags, true);
262  	
263  	  bl.append(m->get_payload());
264  	  bl.append(m->get_middle());
265  	  bl.append(m->get_data());
266  	}
267  	
268  	void ProtocolV1::send_keepalive() {
269  	  ldout(cct, 10) << __func__ << dendl;
270  	  std::lock_guard<std::mutex> l(connection->write_lock);
271  	  if (can_write != WriteStatus::CLOSED) {
272  	    keepalive = true;
273  	    connection->center->dispatch_event_external(connection->write_handler);
274  	  }
275  	}
276  	
/**
 * Entry point for read-side wakeups: resume the protocol state machine at
 * the step matching the current state.
 *
 * Any state not listed here ignores the event. Presumably this is because
 * a continuation for that state is already pending — TODO confirm.
 */
void ProtocolV1::read_event() {
  ldout(cct, 20) << __func__ << dendl;
  switch (state) {
    case START_CONNECT:
      CONTINUATION_RUN(CONTINUATION(send_client_banner));
      break;
    case START_ACCEPT:
      CONTINUATION_RUN(CONTINUATION(send_server_banner));
      break;
    case OPENED:
      CONTINUATION_RUN(CONTINUATION(wait_message));
      break;
    // The throttle states are re-entered after a throttle step yielded
    // and armed a wakeup timer.
    case THROTTLE_MESSAGE:
      CONTINUATION_RUN(CONTINUATION(throttle_message));
      break;
    case THROTTLE_BYTES:
      CONTINUATION_RUN(CONTINUATION(throttle_bytes));
      break;
    case THROTTLE_DISPATCH_QUEUE:
      CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
      break;
    default:
      break;
  }
}
302  	
/**
 * Writer entry point: drain queued messages onto the socket.
 *
 * When the connection can write:
 *  - Emit a pending keepalive first.
 *  - Pop messages with _get_next_outgoing(), encoding any that
 *    send_message() left unencoded, and write them.
 *  - Stop when the queue empties, a write is left in progress (r > 0), a
 *    write fails, or can_write changes.
 *  - Non-lossy connections keep a reference to each written message in
 *    `sent` until handle_tag_ack() trims it.
 *  - After a fully flushed batch (r == 0), an ACK carrying in_seq is sent
 *    if ack_left says messages are unacknowledged.
 *  - Failures end in fault() under connection->lock.
 *
 * When the connection cannot write:
 *  - A client connection in STANDBY with queued data reconnects.
 *  - Otherwise whatever is already buffered on the socket is flushed.
 */
void ProtocolV1::write_event() {
  ldout(cct, 10) << __func__ << dendl;
  ssize_t r = 0;

  connection->write_lock.lock();
  if (can_write == WriteStatus::CANWRITE) {
    if (keepalive) {
      append_keepalive_or_ack();
      keepalive = false;
    }

    auto start = ceph::mono_clock::now();
    bool more;
    do {
      bufferlist data;
      Message *m = _get_next_outgoing(&data);
      if (!m) {
        break;
      }

      if (!connection->policy.lossy) {
        // put on sent list
        sent.push_back(m);
        m->get();
      }
      more = !out_q.empty();
      // Encoding and socket I/O below run without write_lock held.
      connection->write_lock.unlock();

      // send_message or requeue messages may not encode message
      if (!data.length()) {
        prepare_send_message(connection->get_features(), m, data);
      }

      if (m->queue_start != ceph::mono_time()) {
        connection->logger->tinc(l_msgr_send_messages_queue_lat,
				 ceph::mono_clock::now() - m->queue_start);
      }

      r = write_message(m, data, more);

      connection->write_lock.lock();
      if (r == 0) {
        ;
      } else if (r < 0) {
        ldout(cct, 1) << __func__ << " send msg failed" << dendl;
        break;
      } else if (r > 0) {
	// Outbound message in-progress, thread will be re-awoken
	// when the outbound socket is writeable again
	break;
      }
    } while (can_write == WriteStatus::CANWRITE);
    write_in_progress = false;
    connection->write_lock.unlock();

    // if r > 0 mean data still lefted, so no need _try_send.
    if (r == 0) {
      uint64_t left = ack_left;
      if (left) {
        // Acknowledge everything received so far with the current in_seq.
        ceph_le64 s;
        s = in_seq;
        connection->outgoing_bl.append(CEPH_MSGR_TAG_ACK);
        connection->outgoing_bl.append((char *)&s, sizeof(s));
        ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                       << " messages" << dendl;
        ack_left -= left;
        left = ack_left;
        r = connection->_try_send(left);
      } else if (is_queued()) {
        r = connection->_try_send();
      }
    }

    connection->logger->tinc(l_msgr_running_send_time,
                             ceph::mono_clock::now() - start);
    if (r < 0) {
      ldout(cct, 1) << __func__ << " send msg failed" << dendl;
      connection->lock.lock();
      fault();
      connection->lock.unlock();
      return;
    }
  } else {
    write_in_progress = false;
    // Release write_lock so that connection->lock can be taken first;
    // this branch always locks in that order.
    connection->write_lock.unlock();
    connection->lock.lock();
    connection->write_lock.lock();
    if (state == STANDBY && !connection->policy.server && is_queued()) {
      ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
      connection->_connect();
    } else if (connection->cs && state != NONE && state != CLOSED &&
               state != START_CONNECT) {
      r = connection->_try_send();
      if (r < 0) {
        ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
        connection->write_lock.unlock();
        fault();
        connection->lock.unlock();
        return;
      }
    }
    connection->write_lock.unlock();
    connection->lock.unlock();
  }
}
408  	
409  	bool ProtocolV1::is_queued() {
410  	  return !out_q.empty() || connection->is_queued();
411  	}
412  	
413  	void ProtocolV1::run_continuation(CtPtr pcontinuation) {
414  	  if (pcontinuation) {
415  	    CONTINUATION_RUN(*pcontinuation);
416  	  }
417  	}
418  	
419  	CtPtr ProtocolV1::read(CONTINUATION_RX_TYPE<ProtocolV1> &next,
420  	                       int len, char *buffer) {
421  	  if (!buffer) {
422  	    buffer = temp_buffer;
423  	  }
424  	  ssize_t r = connection->read(len, buffer,
425  	                               [&next, this](char *buffer, int r) {
426  	                                 next.setParams(buffer, r);
427  	                                 CONTINUATION_RUN(next);
428  	                               });
429  	  if (r <= 0) {
430  	    next.setParams(buffer, r);
431  	    return &next;
432  	  }
433  	
434  	  return nullptr;
435  	}
436  	
437  	CtPtr ProtocolV1::write(CONTINUATION_TX_TYPE<ProtocolV1> &next,
438  	                        bufferlist &buffer) {
439  	  ssize_t r = connection->write(buffer, [&next, this](int r) {
440  	    next.setParams(r);
441  	    CONTINUATION_RUN(next);
442  	  });
443  	  if (r <= 0) {
444  	    next.setParams(r);
445  	    return &next;
446  	  }
447  	
448  	  return nullptr;
449  	}
450  	
/**
 * Enter the OPENED state once the handshake has completed.
 *
 * Steps, in order:
 *  - Re-arm the inactivity tick timer, replacing any pending one.
 *  - Permit writes, and dispatch write_handler if anything is already
 *    queued.
 *  - Call maybe_start_delay_thread() on the connection.
 *  - Start waiting for the next incoming tag.
 */
CtPtr ProtocolV1::ready() {
  ldout(cct, 25) << __func__ << dendl;

  // make sure no pending tick timer
  if (connection->last_tick_id) {
    connection->center->delete_time_event(connection->last_tick_id);
  }
  connection->last_tick_id = connection->center->create_time_event(
      connection->inactive_timeout_us, connection->tick_handler);

  connection->write_lock.lock();
  can_write = WriteStatus::CANWRITE;
  if (is_queued()) {
    connection->center->dispatch_event_external(connection->write_handler);
  }
  connection->write_lock.unlock();
  connection->maybe_start_delay_thread();

  state = OPENED;
  return wait_message();
}
472  	
473  	CtPtr ProtocolV1::wait_message() {
474  	  if (state != OPENED) {  // must have changed due to a replace
475  	    return nullptr;
476  	  }
477  	
478  	  ldout(cct, 20) << __func__ << dendl;
479  	
480  	  return READ(sizeof(char), handle_message);
481  	}
482  	
483  	CtPtr ProtocolV1::handle_message(char *buffer, int r) {
484  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
485  	
486  	  if (r < 0) {
487  	    ldout(cct, 1) << __func__ << " read tag failed" << dendl;
488  	    return _fault();
489  	  }
490  	
491  	  char tag = buffer[0];
492  	  ldout(cct, 20) << __func__ << " process tag " << (int)tag << dendl;
493  	
494  	  if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
495  	    ldout(cct, 20) << __func__ << " got KEEPALIVE" << dendl;
496  	    connection->set_last_keepalive(ceph_clock_now());
497  	  } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
498  	    return READ(sizeof(ceph_timespec), handle_keepalive2);
499  	  } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
500  	    return READ(sizeof(ceph_timespec), handle_keepalive2_ack);
501  	  } else if (tag == CEPH_MSGR_TAG_ACK) {
502  	    return READ(sizeof(ceph_le64), handle_tag_ack);
503  	  } else if (tag == CEPH_MSGR_TAG_MSG) {
504  	    recv_stamp = ceph_clock_now();
505  	    ldout(cct, 20) << __func__ << " begin MSG" << dendl;
506  	    return READ(sizeof(ceph_msg_header), handle_message_header);
507  	  } else if (tag == CEPH_MSGR_TAG_CLOSE) {
508  	    ldout(cct, 20) << __func__ << " got CLOSE" << dendl;
509  	    stop();
510  	  } else {
511  	    ldout(cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
512  	    return _fault();
513  	  }
514  	  return nullptr;
515  	}
516  	
517  	CtPtr ProtocolV1::handle_keepalive2(char *buffer, int r) {
518  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
519  	
520  	  if (r < 0) {
521  	    ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
522  	    return _fault();
523  	  }
524  	
525  	  ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
526  	
527  	  ceph_timespec *t;
528  	  t = (ceph_timespec *)buffer;
529  	  utime_t kp_t = utime_t(*t);
530  	  connection->write_lock.lock();
531  	  append_keepalive_or_ack(true, &kp_t);
532  	  connection->write_lock.unlock();
533  	
534  	  ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
535  	  connection->set_last_keepalive(ceph_clock_now());
536  	
537  	  if (is_connected()) {
538  	    connection->center->dispatch_event_external(connection->write_handler);
539  	  }
540  	
541  	  return CONTINUE(wait_message);
542  	}
543  	
544  	void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
545  	  ldout(cct, 10) << __func__ << dendl;
546  	  if (ack) {
547  	    ceph_assert(tp);
548  	    struct ceph_timespec ts;
549  	    tp->encode_timeval(&ts);
550  	    connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
551  	    connection->outgoing_bl.append((char *)&ts, sizeof(ts));
552  	  } else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
553  	    struct ceph_timespec ts;
554  	    utime_t t = ceph_clock_now();
555  	    t.encode_timeval(&ts);
556  	    connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
557  	    connection->outgoing_bl.append((char *)&ts, sizeof(ts));
558  	  } else {
559  	    connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
560  	  }
561  	}
562  	
563  	CtPtr ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
564  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
565  	
566  	  if (r < 0) {
567  	    ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
568  	    return _fault();
569  	  }
570  	
571  	  ceph_timespec *t;
572  	  t = (ceph_timespec *)buffer;
573  	  connection->set_last_keepalive_ack(utime_t(*t));
574  	  ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
575  	
576  	  return CONTINUE(wait_message);
577  	}
578  	
579  	CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) {
580  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
581  	
582  	  if (r < 0) {
583  	    ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
584  	    return _fault();
585  	  }
586  	
587  	  ceph_le64 seq;
588  	  seq = *(ceph_le64 *)buffer;
589  	  ldout(cct, 20) << __func__ << " got ACK" << dendl;
590  	
591  	  ldout(cct, 15) << __func__ << " got ack seq " << seq << dendl;
592  	  // trim sent list
593  	  static const int max_pending = 128;
594  	  int i = 0;
595  	  auto now = ceph::mono_clock::now();
596  	  Message *pending[max_pending];
597  	  connection->write_lock.lock();
598  	  while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
599  	    Message *m = sent.front();
600  	    sent.pop_front();
601  	    pending[i++] = m;
602  	    ldout(cct, 10) << __func__ << " got ack seq " << seq
603  	                   << " >= " << m->get_seq() << " on " << m << " " << *m
604  	                   << dendl;
605  	  }
606  	  connection->write_lock.unlock();
607  	  connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
608  	  for (int k = 0; k < i; k++) {
609  	    pending[k]->put();
610  	  }
611  	
612  	  return CONTINUE(wait_message);
613  	}
614  	
615  	CtPtr ProtocolV1::handle_message_header(char *buffer, int r) {
616  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
617  	
618  	  if (r < 0) {
619  	    ldout(cct, 1) << __func__ << " read message header failed" << dendl;
620  	    return _fault();
621  	  }
622  	
623  	  ldout(cct, 20) << __func__ << " got MSG header" << dendl;
624  	
625  	  current_header = *((ceph_msg_header *)buffer);
626  	
627  	  ldout(cct, 20) << __func__ << " got envelope type=" << current_header.type << " src "
628  	                 << entity_name_t(current_header.src) << " front=" << current_header.front_len
629  	                 << " data=" << current_header.data_len << " off " << current_header.data_off
630  	                 << dendl;
631  	
632  	  if (messenger->crcflags & MSG_CRC_HEADER) {
633  	    __u32 header_crc = 0;
634  	    header_crc = ceph_crc32c(0, (unsigned char *)&current_header,
635  	                             sizeof(current_header) - sizeof(current_header.crc));
636  	    // verify header crc
637  	    if (header_crc != current_header.crc) {
638  	      ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
639  	                    << " != " << current_header.crc << dendl;
640  	      return _fault();
641  	    }
642  	  }
643  	
644  	  // Reset state
645  	  data_buf.clear();
646  	  front.clear();
647  	  middle.clear();
648  	  data.clear();
649  	
650  	  state = THROTTLE_MESSAGE;
651  	  return CONTINUE(throttle_message);
652  	}
653  	
654  	CtPtr ProtocolV1::throttle_message() {
655  	  ldout(cct, 20) << __func__ << dendl;
656  	
657  	  if (connection->policy.throttler_messages) {
658  	    ldout(cct, 10) << __func__ << " wants " << 1
659  	                   << " message from policy throttler "
660  	                   << connection->policy.throttler_messages->get_current()
661  	                   << "/" << connection->policy.throttler_messages->get_max()
662  	                   << dendl;
663  	    if (!connection->policy.throttler_messages->get_or_fail()) {
664  	      ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
665  	                     << connection->policy.throttler_messages->get_current()
666  	                     << "/" << connection->policy.throttler_messages->get_max()
667  	                     << " failed, just wait." << dendl;
668  	      // following thread pool deal with th full message queue isn't a
669  	      // short time, so we can wait a ms.
670  	      if (connection->register_time_events.empty()) {
671  	        connection->register_time_events.insert(
672  	            connection->center->create_time_event(1000,
673  	                                                  connection->wakeup_handler));
674  	      }
675  	      return nullptr;
676  	    }
677  	  }
678  	
679  	  state = THROTTLE_BYTES;
680  	  return CONTINUE(throttle_bytes);
681  	}
682  	
683  	CtPtr ProtocolV1::throttle_bytes() {
684  	  ldout(cct, 20) << __func__ << dendl;
685  	
686  	  cur_msg_size = current_header.front_len + current_header.middle_len +
687  	                 current_header.data_len;
688  	  if (cur_msg_size) {
689  	    if (connection->policy.throttler_bytes) {
690  	      ldout(cct, 10) << __func__ << " wants " << cur_msg_size
691  	                     << " bytes from policy throttler "
692  	                     << connection->policy.throttler_bytes->get_current() << "/"
693  	                     << connection->policy.throttler_bytes->get_max() << dendl;
694  	      if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
695  	        ldout(cct, 10) << __func__ << " wants " << cur_msg_size
696  	                       << " bytes from policy throttler "
697  	                       << connection->policy.throttler_bytes->get_current()
698  	                       << "/" << connection->policy.throttler_bytes->get_max()
699  	                       << " failed, just wait." << dendl;
700  	        // following thread pool deal with th full message queue isn't a
701  	        // short time, so we can wait a ms.
702  	        if (connection->register_time_events.empty()) {
703  	          connection->register_time_events.insert(
704  	              connection->center->create_time_event(
705  	                  1000, connection->wakeup_handler));
706  	        }
707  	        return nullptr;
708  	      }
709  	    }
710  	  }
711  	
712  	  state = THROTTLE_DISPATCH_QUEUE;
713  	  return CONTINUE(throttle_dispatch_queue);
714  	}
715  	
716  	CtPtr ProtocolV1::throttle_dispatch_queue() {
717  	  ldout(cct, 20) << __func__ << dendl;
718  	
719  	  if (cur_msg_size) {
720  	    if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
721  	            cur_msg_size)) {
722  	      ldout(cct, 10)
723  	          << __func__ << " wants " << cur_msg_size
724  	          << " bytes from dispatch throttle "
725  	          << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
726  	          << connection->dispatch_queue->dispatch_throttler.get_max()
727  	          << " failed, just wait." << dendl;
728  	      // following thread pool deal with th full message queue isn't a
729  	      // short time, so we can wait a ms.
730  	      if (connection->register_time_events.empty()) {
731  	        connection->register_time_events.insert(
732  	            connection->center->create_time_event(1000,
733  	                                                  connection->wakeup_handler));
734  	      }
735  	      return nullptr;
736  	    }
737  	  }
738  	
739  	  throttle_stamp = ceph_clock_now();
740  	
741  	  state = READ_MESSAGE_FRONT;
742  	  return read_message_front();
743  	}
744  	
745  	CtPtr ProtocolV1::read_message_front() {
746  	  ldout(cct, 20) << __func__ << dendl;
747  	
748  	  unsigned front_len = current_header.front_len;
749  	  if (front_len) {
750  	    if (!front.length()) {
751  	      front.push_back(buffer::create(front_len));
752  	    }
753  	    return READB(front_len, front.c_str(), handle_message_front);
754  	  }
755  	  return read_message_middle();
756  	}
757  	
758  	CtPtr ProtocolV1::handle_message_front(char *buffer, int r) {
759  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
760  	
761  	  if (r < 0) {
762  	    ldout(cct, 1) << __func__ << " read message front failed" << dendl;
763  	    return _fault();
764  	  }
765  	
766  	  ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
767  	
768  	  return read_message_middle();
769  	}
770  	
771  	CtPtr ProtocolV1::read_message_middle() {
772  	  ldout(cct, 20) << __func__ << dendl;
773  	
774  	  if (current_header.middle_len) {
775  	    if (!middle.length()) {
776  	      middle.push_back(buffer::create(current_header.middle_len));
777  	    }
778  	    return READB(current_header.middle_len, middle.c_str(),
779  	                 handle_message_middle);
780  	  }
781  	
782  	  return read_message_data_prepare();
783  	}
784  	
785  	CtPtr ProtocolV1::handle_message_middle(char *buffer, int r) {
786  	  ldout(cct, 20) << __func__ << " r" << r << dendl;
787  	
788  	  if (r < 0) {
789  	    ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
790  	    return _fault();
791  	  }
792  	
793  	  ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
794  	
795  	  return read_message_data_prepare();
796  	}
797  	
/**
 * Prepare to receive the data section.
 *
 * When data_len is non-zero:
 *  - Allocate data_buf via alloc_aligned_buffer(), so the bytes keep the
 *    page alignment implied by current_header.data_off.
 *  - Position data_blp at the start of data_buf.
 *
 * In all cases, record in msg_left how many data bytes remain, then
 * continue with read_message_data.
 */
CtPtr ProtocolV1::read_message_data_prepare() {
  ldout(cct, 20) << __func__ << dendl;

  unsigned data_len = current_header.data_len;
  unsigned data_off = current_header.data_off;

  if (data_len) {
    // get a buffer
    // The rx_buffers path below is compiled out on purpose; it is kept only
    // for reference (see the tracker link).
#if 0
    // rx_buffers is broken by design... see
    //  http://tracker.ceph.com/issues/22480
    map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
        connection->rx_buffers.find(current_header.tid);
    if (p != connection->rx_buffers.end()) {
      ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
                     << " at offset " << data_off << " len "
                     << p->second.first.length() << dendl;
      data_buf = p->second.first;
      // make sure it's big enough
      if (data_buf.length() < data_len)
        data_buf.push_back(buffer::create(data_len - data_buf.length()));
      data_blp = data_buf.begin();
    } else {
      ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
                     << data_off << dendl;
      alloc_aligned_buffer(data_buf, data_len, data_off);
      data_blp = data_buf.begin();
    }
#else
    ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
		   << data_off << dendl;
    alloc_aligned_buffer(data_buf, data_len, data_off);
    data_blp = data_buf.begin();
#endif
  }

  msg_left = data_len;

  return CONTINUE(read_message_data);
}
838  	
839  	CtPtr ProtocolV1::read_message_data() {
840  	  ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
841  	
842  	  if (msg_left > 0) {
843  	    bufferptr bp = data_blp.get_current_ptr();
844  	    unsigned read_len = std::min(bp.length(), msg_left);
845  	
846  	    return READB(read_len, bp.c_str(), handle_message_data);
847  	  }
848  	
849  	  return read_message_footer();
850  	}
851  	
852  	CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
853  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
854  	
855  	  if (r < 0) {
856  	    ldout(cct, 1) << __func__ << " read data error " << dendl;
857  	    return _fault();
858  	  }
859  	
860  	  bufferptr bp = data_blp.get_current_ptr();
861  	  unsigned read_len = std::min(bp.length(), msg_left);
862  	  ceph_assert(read_len <
863  		      static_cast<unsigned>(std::numeric_limits<int>::max()));
864  	  data_blp.advance(read_len);
865  	  data.append(bp, 0, read_len);
866  	  msg_left -= read_len;
867  	
868  	  return CONTINUE(read_message_data);
869  	}
870  	
871  	CtPtr ProtocolV1::read_message_footer() {
872  	  ldout(cct, 20) << __func__ << dendl;
873  	
874  	  state = READ_FOOTER_AND_DISPATCH;
875  	
876  	  unsigned len;
877  	  if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
878  	    len = sizeof(ceph_msg_footer);
879  	  } else {
880  	    len = sizeof(ceph_msg_footer_old);
881  	  }
882  	
883  	  return READ(len, handle_message_footer);
884  	}
885  	
886  	CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
887  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
888  	
889  	  if (r < 0) {
890  	    ldout(cct, 1) << __func__ << " read footer data error " << dendl;
891  	    return _fault();
892  	  }
893  	
894  	  ceph_msg_footer footer;
895  	  ceph_msg_footer_old old_footer;
896  	
897  	  if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
898  	    footer = *((ceph_msg_footer *)buffer);
899  	  } else {
900  	    old_footer = *((ceph_msg_footer_old *)buffer);
901  	    footer.front_crc = old_footer.front_crc;
902  	    footer.middle_crc = old_footer.middle_crc;
903  	    footer.data_crc = old_footer.data_crc;
904  	    footer.sig = 0;
905  	    footer.flags = old_footer.flags;
906  	  }
907  	
908  	  int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
909  	  ldout(cct, 10) << __func__ << " aborted = " << aborted << dendl;
910  	  if (aborted) {
911  	    ldout(cct, 0) << __func__ << " got " << front.length() << " + "
912  	                  << middle.length() << " + " << data.length()
913  	                  << " byte message.. ABORTED" << dendl;
914  	    return _fault();
915  	  }
916  	
917  	  ldout(cct, 20) << __func__ << " got " << front.length() << " + "
918  	                 << middle.length() << " + " << data.length() << " byte message"
919  	                 << dendl;
920  	  Message *message = decode_message(cct, messenger->crcflags, current_header,
921  	                                    footer, front, middle, data, connection);
922  	  if (!message) {
923  	    ldout(cct, 1) << __func__ << " decode message failed " << dendl;
924  	    return _fault();
925  	  }
926  	
927  	  //
928  	  //  Check the signature if one should be present.  A zero return indicates
929  	  //  success. PLR
930  	  //
931  	
932  	  if (session_security.get() == NULL) {
933  	    ldout(cct, 10) << __func__ << " no session security set" << dendl;
934  	  } else {
935  	    if (session_security->check_message_signature(message)) {
936  	      ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
937  	      message->put();
938  	      return _fault();
939  	    }
940  	  }
941  	  message->set_byte_throttler(connection->policy.throttler_bytes);
942  	  message->set_message_throttler(connection->policy.throttler_messages);
943  	
944  	  // store reservation size in message, so we don't get confused
945  	  // by messages entering the dispatch queue through other paths.
946  	  message->set_dispatch_throttle_size(cur_msg_size);
947  	
948  	  message->set_recv_stamp(recv_stamp);
949  	  message->set_throttle_stamp(throttle_stamp);
950  	  message->set_recv_complete_stamp(ceph_clock_now());
951  	
952  	  // check received seq#.  if it is old, drop the message.
953  	  // note that incoming messages may skip ahead.  this is convenient for the
954  	  // client side queueing because messages can't be renumbered, but the (kernel)
955  	  // client will occasionally pull a message out of the sent queue to send
956  	  // elsewhere.  in that case it doesn't matter if we "got" it or not.
957  	  uint64_t cur_seq = in_seq;
958  	  if (message->get_seq() <= cur_seq) {
959  	    ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
960  	                  << " <= " << cur_seq << " " << message << " " << *message
961  	                  << ", discarding" << dendl;
962  	    message->put();
963  	    if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
964  	        cct->_conf->ms_die_on_old_message) {
965  	      ceph_assert(0 == "old msgs despite reconnect_seq feature");
966  	    }
967  	    return nullptr;
968  	  }
969  	  if (message->get_seq() > cur_seq + 1) {
970  	    ldout(cct, 0) << __func__ << " missed message?  skipped from seq "
971  	                  << cur_seq << " to " << message->get_seq() << dendl;
972  	    if (cct->_conf->ms_die_on_skipped_message) {
973  	      ceph_assert(0 == "skipped incoming seq");
974  	    }
975  	  }
976  	
977  	#if defined(WITH_EVENTTRACE)
978  	  if (message->get_type() == CEPH_MSG_OSD_OP ||
979  	      message->get_type() == CEPH_MSG_OSD_OPREPLY) {
980  	    utime_t ltt_processed_stamp = ceph_clock_now();
981  	    double usecs_elapsed =
982  	      ((double)(ltt_processed_stamp.to_nsec() - recv_stamp.to_nsec())) / 1000;
983  	    ostringstream buf;
984  	    if (message->get_type() == CEPH_MSG_OSD_OP)
985  	      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
986  	                           false);
987  	    else
988  	      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
989  	                           false);
990  	  }
991  	#endif
992  	
993  	  // note last received message.
994  	  in_seq = message->get_seq();
995  	  ldout(cct, 5) << " rx " << message->get_source() << " seq "
996  	                << message->get_seq() << " " << message << " " << *message
997  	                << dendl;
998  	
999  	  bool need_dispatch_writer = false;
1000 	  if (!connection->policy.lossy) {
1001 	    ack_left++;
1002 	    need_dispatch_writer = true;
1003 	  }
1004 	
1005 	  state = OPENED;
1006 	
1007 	  ceph::mono_time fast_dispatch_time;
1008 	
1009 	  if (connection->is_blackhole()) {
1010 	    ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
1011 	    message->put();
1012 	    goto out;
1013 	  }
1014 	
1015 	  connection->logger->inc(l_msgr_recv_messages);
1016 	  connection->logger->inc(
1017 	      l_msgr_recv_bytes,
1018 	      cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
1019 	
1020 	  messenger->ms_fast_preprocess(message);
1021 	  fast_dispatch_time = ceph::mono_clock::now();
1022 	  connection->logger->tinc(l_msgr_running_recv_time,
1023 				   fast_dispatch_time - connection->recv_start_time);
1024 	  if (connection->delay_state) {
1025 	    double delay_period = 0;
1026 	    if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1027 	      delay_period =
1028 	          cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1029 	      ldout(cct, 1) << "queue_received will delay after "
1030 	                    << (ceph_clock_now() + delay_period) << " on " << message
1031 	                    << " " << *message << dendl;
1032 	    }
1033 	    connection->delay_state->queue(delay_period, message);
1034 	  } else if (messenger->ms_can_fast_dispatch(message)) {
1035 	    connection->lock.unlock();
1036 	    connection->dispatch_queue->fast_dispatch(message);
1037 	    connection->recv_start_time = ceph::mono_clock::now();
1038 	    connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1039 	                             connection->recv_start_time - fast_dispatch_time);
1040 	    connection->lock.lock();
1041 	  } else {
1042 	    connection->dispatch_queue->enqueue(message, message->get_priority(),
1043 	                                        connection->conn_id);
1044 	  }
1045 	
1046 	 out:
1047 	  // clean up local buffer references
1048 	  data_buf.clear();
1049 	  front.clear();
1050 	  middle.clear();
1051 	  data.clear();
1052 	
1053 	  if (need_dispatch_writer && connection->is_connected()) {
1054 	    connection->center->dispatch_event_external(connection->write_handler);
1055 	  }
1056 	
1057 	  return CONTINUE(wait_message);
1058 	}
1059 	
1060 	void ProtocolV1::session_reset() {
1061 	  ldout(cct, 10) << __func__ << " started" << dendl;
1062 	
1063 	  std::lock_guard<std::mutex> l(connection->write_lock);
1064 	  if (connection->delay_state) {
1065 	    connection->delay_state->discard();
1066 	  }
1067 	
1068 	  connection->dispatch_queue->discard_queue(connection->conn_id);
1069 	  discard_out_queue();
1070 	  // note: we need to clear outgoing_bl here, but session_reset may be
1071 	  // called by other thread, so let caller clear this itself!
1072 	  // outgoing_bl.clear();
1073 	
1074 	  connection->dispatch_queue->queue_remote_reset(connection);
1075 	
1076 	  randomize_out_seq();
1077 	
1078 	  in_seq = 0;
1079 	  connect_seq = 0;
1080 	  // it's safe to directly set 0, double locked
1081 	  ack_left = 0;
1082 	  once_ready = false;
1083 	  can_write = WriteStatus::NOWRITE;
1084 	}
1085 	
1086 	void ProtocolV1::randomize_out_seq() {
1087 	  if (connection->get_features() & CEPH_FEATURE_MSG_AUTH) {
1088 	    // Set out_seq to a random value, so CRC won't be predictable.
1089 	    auto rand_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
1090 	    ldout(cct, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
1091 	    out_seq = rand_seq;
1092 	  } else {
1093 	    // previously, seq #'s always started at 0.
1094 	    out_seq = 0;
1095 	  }
1096 	}
1097 	
/*
 * Serializes message `m` into connection->outgoing_bl and attempts to send.
 * The serialized form is the MSG tag, the header, the payload from `bl`, and
 * a footer in the format the peer supports.
 *
 * @param m    message to send; assigned the next out_seq. A reference is
 *             released via m->put() before returning.
 * @param bl   encoded payload; small multi-buffer payloads are copied,
 *             otherwise its buffers are taken over with claim_append().
 * @param more passed through to connection->_try_send().
 * @return the result of _try_send(): negative on error (logged); otherwise
 *         nonzero is logged as "continuely" and zero as "done".
 * Must run on the connection's event-center thread (asserted).
 */
ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
  FUNCTRACE(cct);
  ceph_assert(connection->center->in_thread());
  // Assign the sequence number before the header CRC is computed over it.
  m->set_seq(++out_seq);

  if (messenger->crcflags & MSG_CRC_HEADER) {
    m->calc_header_crc();
  }

  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();

  // TODO: let sign_message could be reentry?
  // Now that we have all the crcs calculated, handle the
  // digital signature for the message, if the AsyncConnection has session
  // security set up.  Some session security options do not
  // actually calculate and check the signature, but they should
  // handle the calls to sign_message and check_signature.  PLR
  if (session_security.get() == NULL) {
    ldout(cct, 20) << __func__ << " no session security" << dendl;
  } else {
    if (session_security->sign_message(m)) {
      ldout(cct, 20) << __func__ << " failed to sign m=" << m
                     << "): sig = " << footer.sig << dendl;
    } else {
      ldout(cct, 20) << __func__ << " signed m=" << m
                     << "): sig = " << footer.sig << dendl;
    }
  }

  // Wire layout: tag byte, then the raw header.
  connection->outgoing_bl.append(CEPH_MSGR_TAG_MSG);
  connection->outgoing_bl.append((char *)&header, sizeof(header));

  ldout(cct, 20) << __func__ << " sending message type=" << header.type
                 << " src " << entity_name_t(header.src)
                 << " front=" << header.front_len << " data=" << header.data_len
                 << " off " << header.data_off << dendl;

  // Payloads of at most ASYNC_COALESCE_THRESHOLD bytes spread over several
  // buffers are copied into outgoing_bl (coalesced); otherwise bl's buffers
  // are appended via claim_append().
  if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
    for (const auto &pb : bl.buffers()) {
      connection->outgoing_bl.append((char *)pb.c_str(), pb.length());
    }
  } else {
    connection->outgoing_bl.claim_append(bl);
  }

  // send footer; if receiver doesn't support signatures, use the old footer
  // format
  ceph_msg_footer_old old_footer;
  if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    connection->outgoing_bl.append((char *)&footer, sizeof(footer));
  } else {
    // Old footer: CRCs are zeroed when the corresponding crcflags bit is off.
    if (messenger->crcflags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
    } else {
      old_footer.front_crc = old_footer.middle_crc = 0;
    }
    old_footer.data_crc =
        messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
    old_footer.flags = footer.flags;
    connection->outgoing_bl.append((char *)&old_footer, sizeof(old_footer));
  }

  m->trace.event("async writing message");
  ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
                 << dendl;
  // Bytes actually sent = queued length before minus what is left after.
  ssize_t total_send_size = connection->outgoing_bl.length();
  ssize_t rc = connection->_try_send(more);
  if (rc < 0) {
    ldout(cct, 1) << __func__ << " error sending " << m << ", "
                  << cpp_strerror(rc) << dendl;
  } else {
    connection->logger->inc(
        l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
    ldout(cct, 10) << __func__ << " sending " << m
                   << (rc ? " continuely." : " done.") << dendl;
  }

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
#endif
  m->put();

  return rc;
}
1187 	
1188 	void ProtocolV1::requeue_sent() {
1189 	  write_in_progress = false;
1190 	  if (sent.empty()) {
1191 	    return;
1192 	  }
1193 	
1194 	  list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1195 	  out_seq -= sent.size();
1196 	  while (!sent.empty()) {
1197 	    Message *m = sent.back();
1198 	    sent.pop_back();
1199 	    ldout(cct, 10) << __func__ << " " << *m << " for resend "
1200 	                   << " (" << m->get_seq() << ")" << dendl;
1201 	    m->clear_payload();
1202 	    rq.push_front(make_pair(bufferlist(), m));
1203 	  }
1204 	}
1205 	
1206 	uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
1207 	  ldout(cct, 10) << __func__ << " " << seq << dendl;
1208 	  std::lock_guard<std::mutex> l(connection->write_lock);
1209 	  if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
1210 	    return seq;
1211 	  }
1212 	  list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1213 	  uint64_t count = out_seq;
1214 	  while (!rq.empty()) {
1215 	    pair<bufferlist, Message *> p = rq.front();
1216 	    if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
1217 	    ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
1218 	                   << p.second->get_seq() << " <= " << seq << ", discarding"
1219 	                   << dendl;
1220 	    p.second->put();
1221 	    rq.pop_front();
1222 	    count++;
1223 	  }
1224 	  if (rq.empty()) out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1225 	  return count;
1226 	}
1227 	
1228 	/*
1229 	 * Tears down the message queues, and removes them from the
1230 	 * DispatchQueue Must hold write_lock prior to calling.
1231 	 */
1232 	void ProtocolV1::discard_out_queue() {
1233 	  ldout(cct, 10) << __func__ << " started" << dendl;
1234 	
1235 	  for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
1236 	    ldout(cct, 20) << __func__ << " discard " << *p << dendl;
1237 	    (*p)->put();
1238 	  }
1239 	  sent.clear();
1240 	  for (map<int, list<pair<bufferlist, Message *> > >::iterator p =
1241 	           out_q.begin();
1242 	       p != out_q.end(); ++p) {
1243 	    for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin();
1244 	         r != p->second.end(); ++r) {
1245 	      ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
1246 	      r->second->put();
1247 	    }
1248 	  }
1249 	  out_q.clear();
1250 	  write_in_progress = false;
1251 	}
1252 	
1253 	void ProtocolV1::reset_recv_state()
1254 	{
1255 	  // clean up state internal variables and states
1256 	  auth_meta.reset(new AuthConnectionMeta);
1257 	  authorizer_more.clear();
1258 	  session_security.reset();
1259 	
1260 	  // clean read and write callbacks
1261 	  connection->pendingReadLen.reset();
1262 	  connection->writeCallback.reset();
1263 	
1264 	  if (state > THROTTLE_MESSAGE && state <= READ_FOOTER_AND_DISPATCH &&
1265 	      connection->policy.throttler_messages) {
1266 	    ldout(cct, 10) << __func__ << " releasing " << 1
1267 	                   << " message to policy throttler "
1268 	                   << connection->policy.throttler_messages->get_current()
1269 	                   << "/" << connection->policy.throttler_messages->get_max()
1270 	                   << dendl;
1271 	    connection->policy.throttler_messages->put();
1272 	  }
1273 	  if (state > THROTTLE_BYTES && state <= READ_FOOTER_AND_DISPATCH) {
1274 	    if (connection->policy.throttler_bytes) {
1275 	      ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
1276 	                     << " bytes to policy throttler "
1277 	                     << connection->policy.throttler_bytes->get_current() << "/"
1278 	                     << connection->policy.throttler_bytes->get_max() << dendl;
1279 	      connection->policy.throttler_bytes->put(cur_msg_size);
1280 	    }
1281 	  }
1282 	  if (state > THROTTLE_DISPATCH_QUEUE && state <= READ_FOOTER_AND_DISPATCH) {
1283 	    ldout(cct, 10)
1284 	        << __func__ << " releasing " << cur_msg_size
1285 	        << " bytes to dispatch_queue throttler "
1286 	        << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1287 	        << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
1288 	    connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
1289 	  }
1290 	}
1291 	
1292 	Message *ProtocolV1::_get_next_outgoing(bufferlist *bl) {
1293 	  Message *m = 0;
1294 	  if (!out_q.empty()) {
1295 	    map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it =
1296 	        out_q.rbegin();
1297 	    ceph_assert(!it->second.empty());
1298 	    list<pair<bufferlist, Message *> >::iterator p = it->second.begin();
1299 	    m = p->second;
1300 	    if (p->first.length() && bl) {
1301 	      assert(bl->length() == 0);
1302 	      bl->swap(p->first);
1303 	    }
1304 	    it->second.erase(p);
1305 	    if (it->second.empty()) out_q.erase(it->first);
1306 	  }
1307 	  return m;
1308 	}
1309 	
1310 	/**
1311 	 * Client Protocol V1
1312 	 **/
1313 	
1314 	CtPtr ProtocolV1::send_client_banner() {
1315 	  ldout(cct, 20) << __func__ << dendl;
1316 	  state = CONNECTING;
1317 	
1318 	  bufferlist bl;
1319 	  bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1320 	  return WRITE(bl, handle_client_banner_write);
1321 	}
1322 	
1323 	CtPtr ProtocolV1::handle_client_banner_write(int r) {
1324 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1325 	
1326 	  if (r < 0) {
1327 	    ldout(cct, 1) << __func__ << " write client banner failed" << dendl;
1328 	    return _fault();
1329 	  }
1330 	  ldout(cct, 10) << __func__ << " connect write banner done: "
1331 	                 << connection->get_peer_addr() << dendl;
1332 	
1333 	  return wait_server_banner();
1334 	}
1335 	
1336 	CtPtr ProtocolV1::wait_server_banner() {
1337 	  state = CONNECTING_WAIT_BANNER_AND_IDENTIFY;
1338 	
1339 	  ldout(cct, 20) << __func__ << dendl;
1340 	
1341 	  bufferlist myaddrbl;
1342 	  unsigned banner_len = strlen(CEPH_BANNER);
1343 	  unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2;
1344 	  return READ(need_len, handle_server_banner_and_identify);
1345 	}
1346 	
1347 	CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
1348 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1349 	
1350 	  if (r < 0) {
1351 	    ldout(cct, 1) << __func__ << " read banner and identify addresses failed"
1352 	                  << dendl;
1353 	    return _fault();
1354 	  }
1355 	
1356 	  unsigned banner_len = strlen(CEPH_BANNER);
1357 	  if (memcmp(buffer, CEPH_BANNER, banner_len)) {
1358 	    ldout(cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
1359 	                  << connection->get_peer_addr() << dendl;
1360 	    return _fault();
1361 	  }
1362 	
1363 	  bufferlist bl;
1364 	  entity_addr_t paddr, peer_addr_for_me;
1365 	
1366 	  bl.append(buffer + banner_len, sizeof(ceph_entity_addr) * 2);
1367 	  auto p = bl.cbegin();
1368 	  try {
1369 	    decode(paddr, p);
1370 	    decode(peer_addr_for_me, p);
1371 	  } catch (const buffer::error &e) {
1372 	    lderr(cct) << __func__ << " decode peer addr failed " << dendl;
1373 	    return _fault();
1374 	  }
1375 	  ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
1376 	                 << " on socket " << connection->cs.fd() << dendl;
1377 	
1378 	  entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
1379 	  if (peer_addr != paddr) {
1380 	    if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
1381 	        peer_addr.get_nonce() == paddr.get_nonce()) {
1382 	      ldout(cct, 0) << __func__ << " connect claims to be " << paddr << " not "
1383 	                    << peer_addr << " - presumably this is the same node!"
1384 	                    << dendl;
1385 	    } else {
1386 	      ldout(cct, 10) << __func__ << " connect claims to be " << paddr << " not "
1387 	                     << peer_addr << dendl;
1388 	      return _fault();
1389 	    }
1390 	  }
1391 	
1392 	  ldout(cct, 20) << __func__ << " connect peer addr for me is "
1393 	                 << peer_addr_for_me << dendl;
1394 	  if (messenger->get_myaddrs().empty() ||
1395 	      messenger->get_myaddrs().front().is_blank_ip()) {
1396 	    sockaddr_storage ss;
1397 	    socklen_t len = sizeof(ss);
1398 	    getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
1399 	    entity_addr_t a;
1400 	    if (cct->_conf->ms_learn_addr_from_peer) {
1401 	      ldout(cct, 1) << __func__ << " peer " << connection->target_addr
1402 			    << " says I am " << peer_addr_for_me << " (socket says "
1403 			    << (sockaddr*)&ss << ")" << dendl;
1404 	      a = peer_addr_for_me;
1405 	    } else {
1406 	      ldout(cct, 1) << __func__ << " socket to  " << connection->target_addr
1407 			    << " says I am " << (sockaddr*)&ss
1408 			    << " (peer says " << peer_addr_for_me << ")" << dendl;
1409 	      a.set_sockaddr((sockaddr *)&ss);
1410 	    }
1411 	    a.set_type(entity_addr_t::TYPE_LEGACY); // anything but NONE; learned_addr ignores this
1412 	    a.set_port(0);
1413 	    connection->lock.unlock();
1414 	    messenger->learned_addr(a);
1415 	    if (cct->_conf->ms_inject_internal_delays &&
1416 		cct->_conf->ms_inject_socket_failures) {
1417 	      if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
1418 		ldout(cct, 10) << __func__ << " sleep for "
1419 			       << cct->_conf->ms_inject_internal_delays << dendl;
1420 		utime_t t;
1421 		t.set_from_double(cct->_conf->ms_inject_internal_delays);
1422 		t.sleep();
1423 	      }
1424 	    }
1425 	    connection->lock.lock();
1426 	    if (state != CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
1427 	      ldout(cct, 1) << __func__
1428 	                  << " state changed while learned_addr, mark_down or "
1429 			    << " replacing must be happened just now" << dendl;
1430 	      return nullptr;
1431 	    }
1432 	  }
1433 	
1434 	  bufferlist myaddrbl;
1435 	  encode(messenger->get_myaddr_legacy(), myaddrbl, 0);  // legacy
1436 	  return WRITE(myaddrbl, handle_my_addr_write);
1437 	}
1438 	
1439 	CtPtr ProtocolV1::handle_my_addr_write(int r) {
1440 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1441 	
1442 	  if (r < 0) {
1443 	    ldout(cct, 2) << __func__ << " connect couldn't write my addr, "
1444 	                  << cpp_strerror(r) << dendl;
1445 	    return _fault();
1446 	  }
1447 	  ldout(cct, 10) << __func__ << " connect sent my addr "
1448 	                 << messenger->get_myaddr_legacy() << dendl;
1449 	
1450 	  return CONTINUE(send_connect_message);
1451 	}
1452 	
/*
 * Builds and writes a ceph_msg_connect, followed by an authorizer payload
 * when one is available.
 *
 * When the peer is not a monitor (or we are a monitor ourselves), the payload
 * is one of two things. It is the pending challenge response in
 * authorizer_more if there is one. Otherwise a new request is obtained from
 * the auth client, with the connection lock released for that call.
 *
 * @return _fault() if the auth request fails or the state changed while the
 *         lock was dropped; otherwise the write of the connect message.
 */
CtPtr ProtocolV1::send_connect_message()
{
  state = CONNECTING_SEND_CONNECT_MSG;

  ldout(cct, 20) << __func__ << dendl;
  ceph_assert(messenger->auth_client);

  bufferlist auth_bl;
  vector<uint32_t> preferred_modes;

  if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
      messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) {
    if (authorizer_more.length()) {
      ldout(cct,10) << __func__ << " using augmented (challenge) auth payload"
		    << dendl;
      auth_bl = authorizer_more;
    } else {
      // Hold a local copy of auth_meta across the unlocked call below.
      auto am = auth_meta;
      authorizer_more.clear();
      connection->lock.unlock();
      int r = messenger->auth_client->get_auth_request(
	connection, am.get(),
	&am->auth_method, &preferred_modes, &auth_bl);
      connection->lock.lock();
      if (r < 0) {
	return _fault();
      }
      // The state may have changed while the lock was dropped.
      if (state != CONNECTING_SEND_CONNECT_MSG) {
	ldout(cct, 1) << __func__ << " state changed!" << dendl;
	return _fault();
      }
    }
  }

  ceph_msg_connect connect;
  connect.features = connection->policy.features_supported;
  connect.host_type = messenger->get_myname().type();
  connect.global_seq = global_seq;
  connect.connect_seq = connect_seq;
  connect.protocol_version =
      messenger->get_proto_version(connection->peer_type, true);
  if (auth_bl.length()) {
    ldout(cct, 10) << __func__
                   << " connect_msg.authorizer_len=" << auth_bl.length()
                   << " protocol=" << auth_meta->auth_method << dendl;
    connect.authorizer_protocol = auth_meta->auth_method;
    connect.authorizer_len = auth_bl.length();
  } else {
    connect.authorizer_protocol = 0;
    connect.authorizer_len = 0;
  }

  connect.flags = 0;
  if (connection->policy.lossy) {
    connect.flags |=
        CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
  }

  // Wire layout: raw ceph_msg_connect, then authorizer_len payload bytes.
  bufferlist bl;
  bl.append((char *)&connect, sizeof(connect));
  if (auth_bl.length()) {
    bl.append(auth_bl.c_str(), auth_bl.length());
  }

  ldout(cct, 10) << __func__ << " connect sending gseq=" << global_seq
                 << " cseq=" << connect_seq
                 << " proto=" << connect.protocol_version << dendl;

  return WRITE(bl, handle_connect_message_write);
}
1523 	
1524 	CtPtr ProtocolV1::handle_connect_message_write(int r) {
1525 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1526 	
1527 	  if (r < 0) {
1528 	    ldout(cct, 2) << __func__ << " connect couldn't send reply "
1529 	                  << cpp_strerror(r) << dendl;
1530 	    return _fault();
1531 	  }
1532 	
1533 	  ldout(cct, 20) << __func__
1534 	                 << " connect wrote (self +) cseq, waiting for reply" << dendl;
1535 	
1536 	  return wait_connect_reply();
1537 	}
1538 	
1539 	CtPtr ProtocolV1::wait_connect_reply() {
1540 	  ldout(cct, 20) << __func__ << dendl;
1541 	
1542 	  memset(&connect_reply, 0, sizeof(connect_reply));
1543 	  return READ(sizeof(connect_reply), handle_connect_reply_1);
1544 	}
1545 	
1546 	CtPtr ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
1547 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1548 	
1549 	  if (r < 0) {
1550 	    ldout(cct, 1) << __func__ << " read connect reply failed" << dendl;
1551 	    return _fault();
1552 	  }
1553 	
1554 	  connect_reply = *((ceph_msg_connect_reply *)buffer);
1555 	
1556 	  ldout(cct, 20) << __func__ << " connect got reply tag "
1557 	                 << (int)connect_reply.tag << " connect_seq "
1558 	                 << connect_reply.connect_seq << " global_seq "
1559 	                 << connect_reply.global_seq << " proto "
1560 	                 << connect_reply.protocol_version << " flags "
1561 	                 << (int)connect_reply.flags << " features "
1562 	                 << connect_reply.features << dendl;
1563 	
1564 	  if (connect_reply.authorizer_len) {
1565 	    return wait_connect_reply_auth();
1566 	  }
1567 	
1568 	  return handle_connect_reply_2();
1569 	}
1570 	
1571 	CtPtr ProtocolV1::wait_connect_reply_auth() {
1572 	  ldout(cct, 20) << __func__ << dendl;
1573 	
1574 	  ldout(cct, 10) << __func__
1575 	                 << " reply.authorizer_len=" << connect_reply.authorizer_len
1576 	                 << dendl;
1577 	
1578 	  ceph_assert(connect_reply.authorizer_len < 4096);
1579 	
1580 	  return READ(connect_reply.authorizer_len, handle_connect_reply_auth);
1581 	}
1582 	
1583 	CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
1584 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1585 	
1586 	  if (r < 0) {
1587 	    ldout(cct, 1) << __func__ << " read connect reply authorizer failed"
1588 	                  << dendl;
1589 	    return _fault();
1590 	  }
1591 	
1592 	  bufferlist authorizer_reply;
1593 	  authorizer_reply.append(buffer, connect_reply.authorizer_len);
1594 	
1595 	  if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
1596 	      messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) {
1597 	    auto am = auth_meta;
1598 	    bool more = (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER);
1599 	    bufferlist auth_retry_bl;
1600 	    int r;
1601 	    connection->lock.unlock();
1602 	    if (more) {
1603 	      r = messenger->auth_client->handle_auth_reply_more(
1604 		connection, am.get(), authorizer_reply, &auth_retry_bl);
1605 	    } else {
1606 	      // these aren't used for v1
1607 	      CryptoKey skey;
1608 	      string con_secret;
1609 	      r = messenger->auth_client->handle_auth_done(
1610 		connection, am.get(),
1611 		0 /* global id */, 0 /* con mode */,
1612 		authorizer_reply,
1613 		&skey, &con_secret);
1614 	    }
1615 	    connection->lock.lock();
1616 	    if (state != CONNECTING_SEND_CONNECT_MSG) {
1617 	      ldout(cct, 1) << __func__ << " state changed" << dendl;
1618 	      return _fault();
1619 	    }
1620 	    if (r < 0) {
1621 	      return _fault();
1622 	    }
1623 	    if (more && r == 0) {
1624 	      authorizer_more = auth_retry_bl;
1625 	      return CONTINUE(send_connect_message);
1626 	    }
1627 	  }
1628 	
1629 	  return handle_connect_reply_2();
1630 	}
1631 	
1632 	CtPtr ProtocolV1::handle_connect_reply_2() {
1633 	  ldout(cct, 20) << __func__ << dendl;
1634 	
1635 	  if (connect_reply.tag == CEPH_MSGR_TAG_FEATURES) {
1636 	    ldout(cct, 0) << __func__ << " connect protocol feature mismatch, my "
1637 	                  << std::hex << connection->policy.features_supported
1638 	                  << " < peer " << connect_reply.features << " missing "
1639 	                  << (connect_reply.features &
1640 	                      ~connection->policy.features_supported)
1641 	                  << std::dec << dendl;
1642 	    return _fault();
1643 	  }
1644 	
1645 	  if (connect_reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1646 	    ldout(cct, 0) << __func__ << " connect protocol version mismatch, my "
1647 	                  << messenger->get_proto_version(connection->peer_type, true)
1648 	                  << " != " << connect_reply.protocol_version << dendl;
1649 	    return _fault();
1650 	  }
1651 	
1652 	  if (connect_reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1653 	    ldout(cct, 0) << __func__ << " connect got BADAUTHORIZER" << dendl;
1654 	    authorizer_more.clear();
1655 	    return _fault();
1656 	  }
1657 	
1658 	  if (connect_reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1659 	    ldout(cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
1660 	    session_reset();
1661 	    connect_seq = 0;
1662 	
1663 	    // see session_reset
1664 	    connection->outgoing_bl.clear();
1665 	
1666 	    return CONTINUE(send_connect_message);
1667 	  }
1668 	
1669 	  if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1670 	    global_seq = messenger->get_global_seq(connect_reply.global_seq);
1671 	    ldout(cct, 5) << __func__ << " connect got RETRY_GLOBAL "
1672 	                  << connect_reply.global_seq << " chose new " << global_seq
1673 	                  << dendl;
1674 	    return CONTINUE(send_connect_message);
1675 	  }
1676 	
1677 	  if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1678 	    ceph_assert(connect_reply.connect_seq > connect_seq);
1679 	    ldout(cct, 5) << __func__ << " connect got RETRY_SESSION " << connect_seq
1680 	                  << " -> " << connect_reply.connect_seq << dendl;
1681 	    connect_seq = connect_reply.connect_seq;
1682 	    return CONTINUE(send_connect_message);
1683 	  }
1684 	
1685 	  if (connect_reply.tag == CEPH_MSGR_TAG_WAIT) {
1686 	    ldout(cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
1687 	    state = WAIT;
1688 	    return _fault();
1689 	  }
1690 	
1691 	  uint64_t feat_missing;
1692 	  feat_missing =
1693 	      connection->policy.features_required & ~(uint64_t)connect_reply.features;
1694 	  if (feat_missing) {
1695 	    ldout(cct, 1) << __func__ << " missing required features " << std::hex
1696 	                  << feat_missing << std::dec << dendl;
1697 	    return _fault();
1698 	  }
1699 	
1700 	  if (connect_reply.tag == CEPH_MSGR_TAG_SEQ) {
1701 	    ldout(cct, 10)
1702 	        << __func__
1703 	        << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
1704 	        << dendl;
1705 	
1706 	    return wait_ack_seq();
1707 	  }
1708 	
1709 	  if (connect_reply.tag == CEPH_MSGR_TAG_READY) {
1710 	    ldout(cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
1711 	  }
1712 	
1713 	  return client_ready();
1714 	}
1715 	
1716 	CtPtr ProtocolV1::wait_ack_seq() {
1717 	  ldout(cct, 20) << __func__ << dendl;
1718 	
1719 	  return READ(sizeof(uint64_t), handle_ack_seq);
1720 	}
1721 	
1722 	CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) {
1723 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1724 	
1725 	  if (r < 0) {
1726 	    ldout(cct, 1) << __func__ << " read connect ack seq failed" << dendl;
1727 	    return _fault();
1728 	  }
1729 	
1730 	  uint64_t newly_acked_seq = 0;
1731 	
1732 	  newly_acked_seq = *((uint64_t *)buffer);
1733 	  ldout(cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
1734 	                << " vs out_seq " << out_seq << dendl;
1735 	  out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
1736 	
1737 	  bufferlist bl;
1738 	  uint64_t s = in_seq;
1739 	  bl.append((char *)&s, sizeof(s));
1740 	
1741 	  return WRITE(bl, handle_in_seq_write);
1742 	}
1743 	
1744 	CtPtr ProtocolV1::handle_in_seq_write(int r) {
1745 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1746 	
1747 	  if (r < 0) {
1748 	    ldout(cct, 10) << __func__ << " failed to send in_seq " << dendl;
1749 	    return _fault();
1750 	  }
1751 	
1752 	  ldout(cct, 10) << __func__ << " send in_seq done " << dendl;
1753 	
1754 	  return client_ready();
1755 	}
1756 	
1757 	CtPtr ProtocolV1::client_ready() {
1758 	  ldout(cct, 20) << __func__ << dendl;
1759 	
1760 	  // hooray!
1761 	  peer_global_seq = connect_reply.global_seq;
1762 	  connection->policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY;
1763 	
1764 	  once_ready = true;
1765 	  connect_seq += 1;
1766 	  ceph_assert(connect_seq == connect_reply.connect_seq);
1767 	  backoff = utime_t();
1768 	  connection->set_features((uint64_t)connect_reply.features &
1769 	                           (uint64_t)connection->policy.features_supported);
1770 	  ldout(cct, 10) << __func__ << " connect success " << connect_seq
1771 	                 << ", lossy = " << connection->policy.lossy << ", features "
1772 	                 << connection->get_features() << dendl;
1773 	
1774 	  // If we have an authorizer, get a new AuthSessionHandler to deal with
1775 	  // ongoing security of the connection.  PLR
1776 	  if (auth_meta->authorizer) {
1777 	    ldout(cct, 10) << __func__ << " setting up session_security with auth "
1778 			   << auth_meta->authorizer.get() << dendl;
1779 	    session_security.reset(get_auth_session_handler(
1780 	        cct, auth_meta->authorizer->protocol,
1781 		auth_meta->session_key,
1782 	        connection->get_features()));
1783 	  } else {
1784 	    // We have no authorizer, so we shouldn't be applying security to messages
1785 	    // in this AsyncConnection.  PLR
1786 	    ldout(cct, 10) << __func__ << " no authorizer, clearing session_security"
1787 			   << dendl;
1788 	    session_security.reset();
1789 	  }
1790 	
1791 	  if (connection->delay_state) {
1792 	    ceph_assert(connection->delay_state->ready());
1793 	  }
1794 	  connection->dispatch_queue->queue_connect(connection);
1795 	  messenger->ms_deliver_handle_fast_connect(connection);
1796 	
1797 	  return ready();
1798 	}
1799 	
1800 	/**
1801 	 * Server Protocol V1
1802 	 **/
1803 	
1804 	CtPtr ProtocolV1::send_server_banner() {
1805 	  ldout(cct, 20) << __func__ << dendl;
1806 	  state = ACCEPTING;
1807 	
1808 	  bufferlist bl;
1809 	
1810 	  bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1811 	
1812 	  // as a server, we should have a legacy addr if we accepted this connection.
1813 	  auto legacy = messenger->get_myaddrs().legacy_addr();
1814 	  encode(legacy, bl, 0);  // legacy
1815 	  connection->port = legacy.get_port();
1816 	  encode(connection->target_addr, bl, 0);  // legacy
1817 	
1818 	  ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd()
1819 			<< " legacy " << legacy
1820 			<< " socket_addr " << connection->socket_addr
1821 			<< " target_addr " << connection->target_addr
1822 			<< dendl;
1823 	
1824 	  return WRITE(bl, handle_server_banner_write);
1825 	}
1826 	
1827 	CtPtr ProtocolV1::handle_server_banner_write(int r) {
1828 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1829 	
1830 	  if (r < 0) {
1831 	    ldout(cct, 1) << " write server banner failed" << dendl;
1832 	    return _fault();
1833 	  }
1834 	  ldout(cct, 10) << __func__ << " write banner and addr done: "
1835 	                 << connection->get_peer_addr() << dendl;
1836 	
1837 	  return wait_client_banner();
1838 	}
1839 	
1840 	CtPtr ProtocolV1::wait_client_banner() {
1841 	  ldout(cct, 20) << __func__ << dendl;
1842 	
1843 	  return READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
1844 	              handle_client_banner);
1845 	}
1846 	
1847 	CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
1848 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1849 	
1850 	  if (r < 0) {
1851 	    ldout(cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
1852 	    return _fault();
1853 	  }
1854 	
1855 	  if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
1856 	    ldout(cct, 1) << __func__ << " accept peer sent bad banner '" << buffer
1857 	                  << "' (should be '" << CEPH_BANNER << "')" << dendl;
1858 	    return _fault();
1859 	  }
1860 	
1861 	  bufferlist addr_bl;
1862 	  entity_addr_t peer_addr;
1863 	
1864 	  addr_bl.append(buffer + strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
1865 	  try {
1866 	    auto ti = addr_bl.cbegin();
1867 	    decode(peer_addr, ti);
1868 	  } catch (const buffer::error &e) {
1869 	    lderr(cct) << __func__ << " decode peer_addr failed " << dendl;
1870 	    return _fault();
1871 	  }
1872 	
1873 	  ldout(cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
1874 	  if (peer_addr.is_blank_ip()) {
1875 	    // peer apparently doesn't know what ip they have; figure it out for them.
1876 	    int port = peer_addr.get_port();
1877 	    peer_addr.set_sockaddr(connection->target_addr.get_sockaddr());
1878 	    peer_addr.set_port(port);
1879 	
1880 	    ldout(cct, 0) << __func__ << " accept peer addr is really " << peer_addr
1881 	                  << " (socket is " << connection->target_addr << ")" << dendl;
1882 	  }
1883 	  connection->set_peer_addr(peer_addr);  // so that connection_state gets set up
1884 	  connection->target_addr = peer_addr;
1885 	
1886 	  return CONTINUE(wait_connect_message);
1887 	}
1888 	
1889 	CtPtr ProtocolV1::wait_connect_message() {
1890 	  ldout(cct, 20) << __func__ << dendl;
1891 	
1892 	  memset(&connect_msg, 0, sizeof(connect_msg));
1893 	  return READ(sizeof(connect_msg), handle_connect_message_1);
1894 	}
1895 	
1896 	CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) {
1897 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1898 	
1899 	  if (r < 0) {
1900 	    ldout(cct, 1) << __func__ << " read connect msg failed" << dendl;
1901 	    return _fault();
1902 	  }
1903 	
1904 	  connect_msg = *((ceph_msg_connect *)buffer);
1905 	
1906 	  state = ACCEPTING_WAIT_CONNECT_MSG_AUTH;
1907 	
1908 	  if (connect_msg.authorizer_len) {
1909 	    return wait_connect_message_auth();
1910 	  }
1911 	
1912 	  return handle_connect_message_2();
1913 	}
1914 	
1915 	CtPtr ProtocolV1::wait_connect_message_auth() {
1916 	  ldout(cct, 20) << __func__ << dendl;
1917 	  authorizer_buf.clear();
1918 	  authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
1919 	  return READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
1920 	               handle_connect_message_auth);
1921 	}
1922 	
1923 	CtPtr ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
1924 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1925 	
1926 	  if (r < 0) {
1927 	    ldout(cct, 1) << __func__ << " read connect authorizer failed" << dendl;
1928 	    return _fault();
1929 	  }
1930 	
1931 	  return handle_connect_message_2();
1932 	}
1933 	
1934 	CtPtr ProtocolV1::handle_connect_message_2() {
(1) Event cond_true: Condition "should_gather", taking true branch.
1935 	  ldout(cct, 20) << __func__ << dendl;
1936 	
(2) Event cond_true: Condition "should_gather", taking true branch.
1937 	  ldout(cct, 20) << __func__ << " accept got peer connect_seq "
1938 	                 << connect_msg.connect_seq << " global_seq "
1939 	                 << connect_msg.global_seq << dendl;
1940 	
1941 	  connection->set_peer_type(connect_msg.host_type);
1942 	  connection->policy = messenger->get_policy(connect_msg.host_type);
1943 	
(3) Event cond_true: Condition "should_gather", taking true branch.
1944 	  ldout(cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
1945 	                 << ", policy.lossy=" << connection->policy.lossy
1946 	                 << " policy.server=" << connection->policy.server
1947 	                 << " policy.standby=" << connection->policy.standby
1948 	                 << " policy.resetcheck=" << connection->policy.resetcheck
1949 			 << " features 0x" << std::hex << (uint64_t)connect_msg.features
1950 			 << std::dec
1951 	                 << dendl;
1952 	
1953 	  ceph_msg_connect_reply reply;
1954 	  bufferlist authorizer_reply;
1955 	
1956 	  memset(&reply, 0, sizeof(reply));
1957 	  reply.protocol_version =
1958 	      messenger->get_proto_version(connection->peer_type, false);
1959 	
1960 	  // mismatch?
(4) Event cond_true: Condition "should_gather", taking true branch.
1961 	  ldout(cct, 10) << __func__ << " accept my proto " << reply.protocol_version
1962 	                 << ", their proto " << connect_msg.protocol_version << dendl;
1963 	
(5) Event cond_false: Condition "this->connect_msg.protocol_version.operator __u32() != reply.protocol_version.operator __u32()", taking false branch.
1964 	  if (connect_msg.protocol_version != reply.protocol_version) {
1965 	    return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
1966 	                                      authorizer_reply);
(6) Event if_end: End of if statement.
1967 	  }
1968 	
1969 	  // require signatures for cephx?
(7) Event cond_true: Condition "this->connect_msg.authorizer_protocol.operator __u32() == 2", taking true branch.
1970 	  if (connect_msg.authorizer_protocol == CEPH_AUTH_CEPHX) {
(8) Event cond_true: Condition "this->connection->peer_type == 4", taking true branch.
1971 	    if (connection->peer_type == CEPH_ENTITY_TYPE_OSD ||
1972 	        connection->peer_type == CEPH_ENTITY_TYPE_MDS) {
(9) Event cond_true: Condition "this->cct->_conf->cephx_require_signatures", taking true branch.
1973 	      if (cct->_conf->cephx_require_signatures ||
1974 	          cct->_conf->cephx_cluster_require_signatures) {
(10) Event cond_true: Condition "should_gather", taking true branch.
1975 	        ldout(cct, 10)
1976 	            << __func__
1977 	            << " using cephx, requiring MSG_AUTH feature bit for cluster"
1978 	            << dendl;
1979 	        connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1980 	      }
(11) Event if_fallthrough: Falling through to end of if statement.
1981 	    } else {
1982 	      if (cct->_conf->cephx_require_signatures ||
1983 	          cct->_conf->cephx_service_require_signatures) {
1984 	        ldout(cct, 10)
1985 	            << __func__
1986 	            << " using cephx, requiring MSG_AUTH feature bit for service"
1987 	            << dendl;
1988 	        connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1989 	      }
(12) Event if_end: End of if statement.
1990 	    }
1991 	  }
1992 	
1993 	  uint64_t feat_missing =
1994 	      connection->policy.features_required & ~(uint64_t)connect_msg.features;
(13) Event cond_false: Condition "feat_missing", taking false branch.
1995 	  if (feat_missing) {
1996 	    ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
1997 	                  << feat_missing << std::dec << dendl;
1998 	    return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply,
1999 	                                      authorizer_reply);
(14) Event if_end: End of if statement.
2000 	  }
2001 	
2002 	  bufferlist auth_bl_copy = authorizer_buf;
2003 	  auto am = auth_meta;
2004 	  am->auth_method = connect_msg.authorizer_protocol;
2005 	  connection->lock.unlock();
(15) Event cond_true: Condition "should_gather", taking true branch.
2006 	  ldout(cct,10) << __func__ << " authorizor_protocol "
2007 			<< connect_msg.authorizer_protocol
2008 			<< " len " << auth_bl_copy.length()
2009 			<< dendl;
2010 	  bool more = (bool)auth_meta->authorizer_challenge;
2011 	  int r = messenger->auth_server->handle_auth_request(
2012 	    connection,
2013 	    am.get(),
2014 	    more,
2015 	    am->auth_method,
2016 	    auth_bl_copy,
2017 	    &authorizer_reply);
(16) Event cond_false: Condition "r < 0", taking false branch.
2018 	  if (r < 0) {
2019 	    connection->lock.lock();
2020 	    if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2021 	      ldout(cct, 1) << __func__ << " state changed" << dendl;
2022 	      return _fault();
2023 	    }
2024 	    ldout(cct, 0) << __func__ << ": got bad authorizer, auth_reply_len="
2025 			  << authorizer_reply.length() << dendl;
2026 	    session_security.reset();
2027 	    return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
2028 					      authorizer_reply);
(17) Event if_end: End of if statement.
2029 	  }
(18) Event cond_false: Condition "r == 0", taking false branch.
2030 	  if (r == 0) {
2031 	    connection->lock.lock();
2032 	    if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2033 	      ldout(cct, 1) << __func__ << " state changed" << dendl;
2034 	      return _fault();
2035 	    }
2036 	    ldout(cct, 10) << __func__ << ": challenging authorizer" << dendl;
2037 	    ceph_assert(authorizer_reply.length());
2038 	    return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER,
2039 					      reply, authorizer_reply);
(19) Event if_end: End of if statement.
2040 	  }
2041 	
2042 	  // We've verified the authorizer for this AsyncConnection, so set up the
2043 	  // session security structure.  PLR
(20) Event cond_true: Condition "should_gather", taking true branch.
2044 	  ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
2045 	
(21) Event cond_true: Condition "this->connection->policy.server", taking true branch.
(22) Event cond_false: Condition "this->connection->policy.lossy", taking false branch.
2046 	  if (connection->policy.server &&
2047 	      connection->policy.lossy) {
2048 	    // incoming lossy client, no need to register this connection
2049 	    // new session
2050 	    ldout(cct, 10) << __func__ << " accept new session" << dendl;
2051 	    return open(reply, authorizer_reply);
(23) Event if_end: End of if statement.
2052 	  }
2053 	
2054 	  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2055 	
2056 	  connection->inject_delay();
2057 	
(24) Event lock: "lock" locks "this->connection->lock". [Note: The source code implementation of the function has been overridden by a builtin model.]
Also see events: [missing_unlock]
2058 	  connection->lock.lock();
(25) Event cond_false: Condition "this->state != ProtocolV1::ACCEPTING_WAIT_CONNECT_MSG_AUTH", taking false branch.
2059 	  if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2060 	    ldout(cct, 1) << __func__ << " state changed" << dendl;
2061 	    return _fault();
(26) Event if_end: End of if statement.
2062 	  }
2063 	
(27) Event cond_true: Condition "existing == this->connection", taking true branch.
2064 	  if (existing == connection) {
2065 	    existing = nullptr;
2066 	  }
(28) Event cond_true: Condition "existing.operator bool()", taking true branch.
(29) Event cond_true: Condition "existing->protocol->proto_type != 1", taking true branch.
2067 	  if (existing && existing->protocol->proto_type != 1) {
(30) Event cond_true: Condition "should_gather", taking true branch.
2068 	    ldout(cct,1) << __func__ << " existing " << existing << " proto "
2069 			 << existing->protocol.get() << " version is "
2070 			 << existing->protocol->proto_type << ", marking down" << dendl;
2071 	    existing->mark_down();
2072 	    existing = nullptr;
2073 	  }
2074 	
(31) Event cond_true: Condition "existing.operator bool()", taking true branch.
2075 	  if (existing) {
2076 	    // There is no possible that existing connection will acquire this
2077 	    // connection's lock
2078 	    existing->lock.lock();  // skip lockdep check (we are locking a second
2079 	                            // AsyncConnection here)
2080 	
(32) Event cond_true: Condition "should_gather", taking true branch.
2081 	    ldout(cct,10) << __func__ << " existing=" << existing << " exproto="
2082 			  << existing->protocol.get() << dendl;
2083 	    ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
(33) Event cond_true: Condition "exproto", taking true branch.
2084 	    ceph_assert(exproto);
(34) Event cond_true: Condition "exproto->proto_type == 1", taking true branch.
2085 	    ceph_assert(exproto->proto_type == 1);
2086 	
(35) Event cond_false: Condition "exproto->state == ProtocolV1::CLOSED", taking false branch.
2087 	    if (exproto->state == CLOSED) {
2088 	      ldout(cct, 1) << __func__ << " existing " << existing
2089 			    << " already closed." << dendl;
2090 	      existing->lock.unlock();
2091 	      existing = nullptr;
2092 	
2093 	      return open(reply, authorizer_reply);
(36) Event if_end: End of if statement.
2094 	    }
2095 	
(37) Event cond_false: Condition "exproto->replacing", taking false branch.
2096 	    if (exproto->replacing) {
2097 	      ldout(cct, 1) << __func__
2098 	                    << " existing racing replace happened while replacing."
2099 	                    << " existing_state="
2100 	                    << connection->get_state_name(existing->state) << dendl;
2101 	      reply.global_seq = exproto->peer_global_seq;
2102 	      existing->lock.unlock();
2103 	      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
2104 	                                        authorizer_reply);
(38) Event if_end: End of if statement.
2105 	    }
2106 	
(39) Event cond_false: Condition "this->connect_msg.global_seq.operator __u32() < exproto->peer_global_seq", taking false branch.
2107 	    if (connect_msg.global_seq < exproto->peer_global_seq) {
2108 	      ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
2109 	                     << exproto->peer_global_seq << " > "
2110 	                     << connect_msg.global_seq << ", RETRY_GLOBAL" << dendl;
2111 	      reply.global_seq = exproto->peer_global_seq;  // so we can send it below..
2112 	      existing->lock.unlock();
2113 	      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
2114 	                                        authorizer_reply);
(40) Event else_branch: Reached else branch.
2115 	    } else {
(41) Event cond_true: Condition "should_gather", taking true branch.
2116 	      ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
2117 	                     << exproto->peer_global_seq
2118 	                     << " <= " << connect_msg.global_seq << ", looks ok"
2119 	                     << dendl;
2120 	    }
2121 	
(42) Event cond_false: Condition "existing->policy.lossy", taking false branch.
2122 	    if (existing->policy.lossy) {
2123 	      ldout(cct, 0)
2124 	          << __func__
2125 	          << " accept replacing existing (lossy) channel (new one lossy="
2126 	          << connection->policy.lossy << ")" << dendl;
2127 	      exproto->session_reset();
2128 	      return replace(existing, reply, authorizer_reply);
(43) Event if_end: End of if statement.
2129 	    }
2130 	
(44) Event cond_true: Condition "should_gather", taking true branch.
2131 	    ldout(cct, 1) << __func__ << " accept connect_seq "
2132 	                  << connect_msg.connect_seq
2133 	                  << " vs existing csq=" << exproto->connect_seq
2134 	                  << " existing_state="
2135 	                  << connection->get_state_name(existing->state) << dendl;
2136 	
(45) Event cond_false: Condition "this->connect_msg.connect_seq.operator __u32() == 0", taking false branch.
2137 	    if (connect_msg.connect_seq == 0 && exproto->connect_seq > 0) {
2138 	      ldout(cct, 0)
2139 	          << __func__
2140 	          << " accept peer reset, then tried to connect to us, replacing"
2141 	          << dendl;
2142 	      // this is a hard reset from peer
2143 	      is_reset_from_peer = true;
2144 	      if (connection->policy.resetcheck) {
2145 	        exproto->session_reset();  // this resets out_queue, msg_ and
2146 	                                   // connect_seq #'s
2147 	      }
2148 	      return replace(existing, reply, authorizer_reply);
(46) Event if_end: End of if statement.
2149 	    }
2150 	
(47) Event cond_true: Condition "this->connect_msg.connect_seq.operator __u32() < exproto->connect_seq", taking true branch.
2151 	    if (connect_msg.connect_seq < exproto->connect_seq) {
2152 	      // old attempt, or we sent READY but they didn't get it.
(48) Event cond_true: Condition "should_gather", taking true branch.
2153 	      ldout(cct, 10) << __func__ << " accept existing " << existing << ".cseq "
2154 	                     << exproto->connect_seq << " > " << connect_msg.connect_seq
2155 	                     << ", RETRY_SESSION" << dendl;
2156 	      reply.connect_seq = exproto->connect_seq + 1;
2157 	      existing->lock.unlock();
(49) Event missing_unlock: Returning without unlocking "this->connection->lock".
Also see events: [lock]
2158 	      return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
2159 	                                        authorizer_reply);
2160 	    }
2161 	
2162 	    if (connect_msg.connect_seq == exproto->connect_seq) {
2163 	      // if the existing connection successfully opened, and/or
2164 	      // subsequently went to standby, then the peer should bump
2165 	      // their connect_seq and retry: this is not a connection race
2166 	      // we need to resolve here.
2167 	      if (exproto->state == OPENED || exproto->state == STANDBY) {
2168 	        ldout(cct, 10) << __func__ << " accept connection race, existing "
2169 	                       << existing << ".cseq " << exproto->connect_seq
2170 	                       << " == " << connect_msg.connect_seq
2171 	                       << ", OPEN|STANDBY, RETRY_SESSION " << dendl;
2172 	        // if connect_seq both zero, dont stuck into dead lock. it's ok to
2173 	        // replace
2174 	        if (connection->policy.resetcheck && exproto->connect_seq == 0) {
2175 	          return replace(existing, reply, authorizer_reply);
2176 	        }
2177 	
2178 	        reply.connect_seq = exproto->connect_seq + 1;
2179 	        existing->lock.unlock();
2180 	        return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
2181 	                                          authorizer_reply);
2182 	      }
2183 	
2184 	      // connection race?
2185 	      if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr_legacy() ||
2186 	          existing->policy.server) {
2187 	        // incoming wins
2188 	        ldout(cct, 10) << __func__ << " accept connection race, existing "
2189 	                       << existing << ".cseq " << exproto->connect_seq
2190 	                       << " == " << connect_msg.connect_seq
2191 	                       << ", or we are server, replacing my attempt" << dendl;
2192 	        return replace(existing, reply, authorizer_reply);
2193 	      } else {
2194 	        // our existing outgoing wins
2195 	        ldout(messenger->cct, 10)
2196 	            << __func__ << " accept connection race, existing " << existing
2197 	            << ".cseq " << exproto->connect_seq
2198 	            << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
2199 	        ceph_assert(connection->peer_addrs->legacy_addr() >
2200 	                    messenger->get_myaddr_legacy());
2201 	        existing->lock.unlock();
2202 		// make sure we follow through with opening the existing
2203 		// connection (if it isn't yet open) since we know the peer
2204 		// has something to send to us.
2205 		existing->send_keepalive();
2206 	        return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
2207 	                                          authorizer_reply);
2208 	      }
2209 	    }
2210 	
2211 	    ceph_assert(connect_msg.connect_seq > exproto->connect_seq);
2212 	    ceph_assert(connect_msg.global_seq >= exproto->peer_global_seq);
2213 	    if (connection->policy.resetcheck &&  // RESETSESSION only used by servers;
2214 	                                          // peers do not reset each other
2215 	        exproto->connect_seq == 0) {
2216 	      ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
2217 	                    << connect_msg.connect_seq << ", " << existing
2218 	                    << ".cseq = " << exproto->connect_seq
2219 	                    << "), sending RESETSESSION " << dendl;
2220 	      existing->lock.unlock();
2221 	      return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
2222 	                                        authorizer_reply);
2223 	    }
2224 	
2225 	    // reconnect
2226 	    ldout(cct, 10) << __func__ << " accept peer sent cseq "
2227 	                   << connect_msg.connect_seq << " > " << exproto->connect_seq
2228 	                   << dendl;
2229 	    return replace(existing, reply, authorizer_reply);
2230 	  }  // existing
2231 	  else if (!replacing && connect_msg.connect_seq > 0) {
2232 	    // we reset, and they are opening a new session
2233 	    ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
2234 	                  << connect_msg.connect_seq << "), sending RESETSESSION"
2235 	                  << dendl;
2236 	    return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
2237 	                                      authorizer_reply);
2238 	  } else {
2239 	    // new session
2240 	    ldout(cct, 10) << __func__ << " accept new session" << dendl;
2241 	    existing = nullptr;
2242 	    return open(reply, authorizer_reply);
2243 	  }
2244 	}
2245 	
2246 	CtPtr ProtocolV1::send_connect_message_reply(char tag,
2247 	                                             ceph_msg_connect_reply &reply,
2248 	                                             bufferlist &authorizer_reply) {
2249 	  ldout(cct, 20) << __func__ << dendl;
2250 	  bufferlist reply_bl;
2251 	  reply.tag = tag;
2252 	  reply.features =
2253 	      ((uint64_t)connect_msg.features & connection->policy.features_supported) |
2254 	      connection->policy.features_required;
2255 	  reply.authorizer_len = authorizer_reply.length();
2256 	  reply_bl.append((char *)&reply, sizeof(reply));
2257 	
2258 	  ldout(cct, 10) << __func__ << " reply features 0x" << std::hex
2259 			 << reply.features << " = (policy sup 0x"
2260 			 << connection->policy.features_supported
2261 			 << " & connect 0x" << (uint64_t)connect_msg.features
2262 			 << ") | policy req 0x"
2263 			 << connection->policy.features_required
2264 			 << dendl;
2265 	
2266 	  if (reply.authorizer_len) {
2267 	    reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
2268 	    authorizer_reply.clear();
2269 	  }
2270 	
2271 	  return WRITE(reply_bl, handle_connect_message_reply_write);
2272 	}
2273 	
2274 	CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
2275 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
2276 	
2277 	  if (r < 0) {
2278 	    ldout(cct, 1) << " write connect message reply failed" << dendl;
2279 	    connection->inject_delay();
2280 	    return _fault();
2281 	  }
2282 	
2283 	  return CONTINUE(wait_connect_message);
2284 	}
2285 	
2286 	CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
2287 	                          ceph_msg_connect_reply &reply,
2288 	                          bufferlist &authorizer_reply) {
2289 	  ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;
2290 	
2291 	  connection->inject_delay();
2292 	  if (existing->policy.lossy) {
2293 	    // disconnect from the Connection
2294 	    ldout(cct, 1) << __func__ << " replacing on lossy channel, failing existing"
2295 	                  << dendl;
2296 	    existing->protocol->stop();
2297 	    existing->dispatch_queue->queue_reset(existing.get());
2298 	  } else {
2299 	    ceph_assert(can_write == WriteStatus::NOWRITE);
2300 	    existing->write_lock.lock();
2301 	
2302 	    ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
2303 	
2304 	    // reset the in_seq if this is a hard reset from peer,
2305 	    // otherwise we respect our original connection's value
2306 	    if (is_reset_from_peer) {
2307 	      exproto->is_reset_from_peer = true;
2308 	    }
2309 	
2310 	    connection->center->delete_file_event(connection->cs.fd(),
2311 	                                          EVENT_READABLE | EVENT_WRITABLE);
2312 	
2313 	    if (existing->delay_state) {
2314 	      existing->delay_state->flush();
2315 	      ceph_assert(!connection->delay_state);
2316 	    }
2317 	    exproto->reset_recv_state();
2318 	
2319 	    exproto->connect_msg.features = connect_msg.features;
2320 	
2321 	    auto temp_cs = std::move(connection->cs);
2322 	    EventCenter *new_center = connection->center;
2323 	    Worker *new_worker = connection->worker;
2324 	    // avoid _stop shutdown replacing socket
2325 	    // queue a reset on the new connection, which we're dumping for the old
2326 	    stop();
2327 	
2328 	    connection->dispatch_queue->queue_reset(connection);
2329 	    ldout(messenger->cct, 1)
2330 	        << __func__ << " stop myself to swap existing" << dendl;
2331 	    exproto->can_write = WriteStatus::REPLACING;
2332 	    exproto->replacing = true;
2333 	    exproto->write_in_progress = false;
2334 	    existing->state_offset = 0;
2335 	    // avoid previous thread modify event
2336 	    exproto->state = NONE;
2337 	    existing->state = AsyncConnection::STATE_NONE;
2338 	    // Discard existing prefetch buffer in `recv_buf`
2339 	    existing->recv_start = existing->recv_end = 0;
2340 	    // there shouldn't exist any buffer
2341 	    ceph_assert(connection->recv_start == connection->recv_end);
2342 	
2343 	    auto deactivate_existing = std::bind(
2344 	        [existing, new_worker, new_center, exproto, reply,
2345 	         authorizer_reply](ConnectedSocket &cs) mutable {
2346 	          // we need to delete time event in original thread
2347 	          {
2348 	            std::lock_guard<std::mutex> l(existing->lock);
2349 	            existing->write_lock.lock();
2350 	            exproto->requeue_sent();
2351 	            existing->outgoing_bl.clear();
2352 	            existing->open_write = false;
2353 	            existing->write_lock.unlock();
2354 	            if (exproto->state == NONE) {
2355 	              existing->shutdown_socket();
2356 	              existing->cs = std::move(cs);
2357 	              existing->worker->references--;
2358 	              new_worker->references++;
2359 	              existing->logger = new_worker->get_perf_counter();
2360 	              existing->worker = new_worker;
2361 	              existing->center = new_center;
2362 	              if (existing->delay_state)
2363 	                existing->delay_state->set_center(new_center);
2364 	            } else if (exproto->state == CLOSED) {
2365 	              auto back_to_close =
2366 	                  std::bind([](ConnectedSocket &cs) mutable { cs.close(); },
2367 	                            std::move(cs));
2368 	              new_center->submit_to(new_center->get_id(),
2369 	                                    std::move(back_to_close), true);
2370 	              return;
2371 	            } else {
2372 	              ceph_abort();
2373 	            }
2374 	          }
2375 	
2376 	          // Before changing existing->center, it may already exists some
2377 	          // events in existing->center's queue. Then if we mark down
2378 	          // `existing`, it will execute in another thread and clean up
2379 	          // connection. Previous event will result in segment fault
2380 	          auto transfer_existing = [existing, exproto, reply,
2381 	                                    authorizer_reply]() mutable {
2382 	            std::lock_guard<std::mutex> l(existing->lock);
2383 	            if (exproto->state == CLOSED) return;
2384 	            ceph_assert(exproto->state == NONE);
2385 	
2386 	            // we have called shutdown_socket above
2387 	            ceph_assert(existing->last_tick_id == 0);
2388 	            // restart timer since we are going to re-build connection
2389 	            existing->last_connect_started = ceph::coarse_mono_clock::now();
2390 	            existing->last_tick_id = existing->center->create_time_event(
2391 	              existing->connect_timeout_us, existing->tick_handler);
2392 	            existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
2393 	            exproto->state = ACCEPTING;
2394 	
2395 	            existing->center->create_file_event(
2396 	                existing->cs.fd(), EVENT_READABLE, existing->read_handler);
2397 	            reply.global_seq = exproto->peer_global_seq;
2398 	            exproto->run_continuation(exproto->send_connect_message_reply(
2399 	                CEPH_MSGR_TAG_RETRY_GLOBAL, reply, authorizer_reply));
2400 	          };
2401 	          if (existing->center->in_thread())
2402 	            transfer_existing();
2403 	          else
2404 	            existing->center->submit_to(existing->center->get_id(),
2405 	                                        std::move(transfer_existing), true);
2406 	        },
2407 	        std::move(temp_cs));
2408 	
2409 	    existing->center->submit_to(existing->center->get_id(),
2410 	                                std::move(deactivate_existing), true);
2411 	    existing->write_lock.unlock();
2412 	    existing->lock.unlock();
2413 	    return nullptr;
2414 	  }
2415 	  existing->lock.unlock();
2416 	
2417 	  return open(reply, authorizer_reply);
2418 	}
2419 	
/*
 * Accept the incoming connection and send the reply to the peer's connect
 * message.
 *
 * Takes connect_seq / global_seq / features from connect_msg, fills in
 * `reply`, registers the connection via messenger->accept_conn(), and queues
 * the serialized reply for writing. The reply tag is:
 *   - CEPH_MSGR_TAG_SEQ when the peer advertises CEPH_FEATURE_RECONNECT_SEQ
 *     and this is not a hard reset from the peer. An extra read of the
 *     peer's acked seq follows (wait_for_seq = true).
 *   - CEPH_MSGR_TAG_READY otherwise. Local sequence state is reset.
 *
 * Expects connection->lock to be held on entry. The lock is released around
 * messenger->accept_conn() and re-acquired before this function returns.
 *
 * @param reply            reply header; filled in here and sent to the peer
 * @param authorizer_reply authorizer payload appended after the header
 * @return the write continuation (-> handle_ready_connect_message_reply_write)
 *         on success, or _fault() if accept_conn() fails or the protocol
 *         state changed while the lock was dropped
 */
CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
                       bufferlist &authorizer_reply) {
  ldout(cct, 20) << __func__ << dendl;

  // Adopt the sequencing carried in the peer's connect message.
  connect_seq = connect_msg.connect_seq + 1;
  peer_global_seq = connect_msg.global_seq;
  // NOTE(review): this logs "sending READY" even when the SEQ tag is chosen
  // below.
  ldout(cct, 10) << __func__ << " accept success, connect_seq = " << connect_seq
                 << " in_seq=" << in_seq << ", sending READY" << dendl;

  // if it is a hard reset from peer, we don't need a round-trip to negotiate
  // in/out sequence
  if ((connect_msg.features & CEPH_FEATURE_RECONNECT_SEQ) &&
      !is_reset_from_peer) {
    reply.tag = CEPH_MSGR_TAG_SEQ;
    wait_for_seq = true;
  } else {
    reply.tag = CEPH_MSGR_TAG_READY;
    wait_for_seq = false;
    // Reset sequence state: requeued messages go through
    // discard_requeued_up_to() with seq 0, and in_seq restarts at 0.
    out_seq = discard_requeued_up_to(out_seq, 0);
    is_reset_from_peer = false;
    in_seq = 0;
  }

  // send READY reply
  reply.features = connection->policy.features_supported;
  reply.global_seq = messenger->get_global_seq();
  reply.connect_seq = connect_seq;
  reply.flags = 0;
  reply.authorizer_len = authorizer_reply.length();
  if (connection->policy.lossy) {
    reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
  }

  // The connection uses only the features both sides advertised.
  connection->set_features((uint64_t)reply.features &
                           (uint64_t)connect_msg.features);
  ldout(cct, 10) << __func__ << " accept features "
                 << connection->get_features()
		 << " authorizer_protocol "
		 << connect_msg.authorizer_protocol << dendl;

  // (Re)create the session security handler for the negotiated auth method,
  // session key and feature set.
  session_security.reset(
    get_auth_session_handler(cct, auth_meta->auth_method,
			     auth_meta->session_key,
			     connection->get_features()));

  // Wire layout: reply header, then the optional authorizer payload, then
  // (SEQ tag only) our 8-byte in_seq.
  bufferlist reply_bl;
  reply_bl.append((char *)&reply, sizeof(reply));

  if (reply.authorizer_len) {
    reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
  }

  if (reply.tag == CEPH_MSGR_TAG_SEQ) {
    // NOTE(review): appended in host byte order; presumably relies on
    // little-endian hosts on both ends. Confirm.
    uint64_t s = in_seq;
    reply_bl.append((char *)&s, sizeof(s));
  }

  connection->lock.unlock();
  // Because "replacing" will prevent other connections preempt this addr,
  // it's safe that here we don't acquire Connection's lock
  ssize_t r = messenger->accept_conn(connection);

  connection->inject_delay();

  connection->lock.lock();
  // accept_conn() has finished, so the "replacing" guard is no longer needed.
  replacing = false;
  if (r < 0) {
    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
                  << connection->peer_addrs->legacy_addr()
                  << " just fail later one(this)" << dendl;
    ldout(cct, 10) << "accept fault after register" << dendl;
    connection->inject_delay();
    return _fault();
  }
  // While the lock was dropped, another thread may have moved the state on.
  // Only CLOSED/NONE are tolerated. Undo the registration and fault.
  if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
    ldout(cct, 1) << __func__
                  << " state changed while accept_conn, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED || state == NONE);
    ldout(cct, 10) << "accept fault after register" << dendl;
    messenger->unregister_conn(connection);
    connection->inject_delay();
    return _fault();
  }

  return WRITE(reply_bl, handle_ready_connect_message_reply_write);
}
2507 	
2508 	CtPtr ProtocolV1::handle_ready_connect_message_reply_write(int r) {
2509 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
2510 	
2511 	  if (r < 0) {
2512 	    ldout(cct, 1) << __func__ << " write ready connect message reply failed"
2513 	                  << dendl;
2514 	    return _fault();
2515 	  }
2516 	
2517 	  // notify
2518 	  connection->dispatch_queue->queue_accept(connection);
2519 	  messenger->ms_deliver_handle_fast_accept(connection);
2520 	  once_ready = true;
2521 	
2522 	  state = ACCEPTING_HANDLED_CONNECT_MSG;
2523 	
2524 	  if (wait_for_seq) {
2525 	    return wait_seq();
2526 	  }
2527 	
2528 	  return server_ready();
2529 	}
2530 	
2531 	CtPtr ProtocolV1::wait_seq() {
2532 	  ldout(cct, 20) << __func__ << dendl;
2533 	
2534 	  return READ(sizeof(uint64_t), handle_seq);
2535 	}
2536 	
2537 	CtPtr ProtocolV1::handle_seq(char *buffer, int r) {
2538 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
2539 	
2540 	  if (r < 0) {
2541 	    ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
2542 	    return _fault();
2543 	  }
2544 	
2545 	  uint64_t newly_acked_seq = *(uint64_t *)buffer;
2546 	  ldout(cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq
2547 	                << dendl;
2548 	  out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
2549 	
2550 	  return server_ready();
2551 	}
2552 	
2553 	CtPtr ProtocolV1::server_ready() {
2554 	  ldout(cct, 20) << __func__ << " session_security is "
2555 			 << session_security
2556 			 << dendl;
2557 	
2558 	  ldout(cct, 20) << __func__ << " accept done" << dendl;
2559 	  memset(&connect_msg, 0, sizeof(connect_msg));
2560 	
2561 	  if (connection->delay_state) {
2562 	    ceph_assert(connection->delay_state->ready());
2563 	  }
2564 	
2565 	  return ready();
2566 	}
2567