1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include <type_traits>
5    	
6    	#include "ProtocolV2.h"
7    	#include "AsyncMessenger.h"
8    	
9    	#include "common/EventTrace.h"
10   	#include "common/ceph_crypto.h"
11   	#include "common/errno.h"
12   	#include "include/random.h"
13   	#include "auth/AuthClient.h"
14   	#include "auth/AuthServer.h"
15   	
// Debug-log configuration for this file: log under ceph_subsys_ms and
// prefix every log line with the text produced by _conn_prefix().
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
19   	ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
20   	  return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
21   	                << *connection->peer_addrs << " conn(" << connection << " "
22   	                << this
23   			<< " " << ceph_con_mode_name(auth_meta->con_mode)
24   			<< " :" << connection->port
25   	                << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
26   	                << " cs=" << connect_seq << " l=" << connection->policy.lossy
27   	                << " rx=" << session_stream_handlers.rx.get()
28   	                << " tx=" << session_stream_handlers.tx.get()
29   	                << ").";
30   	}
31   	
// Bring the ceph::msgr::v2 names into scope for the rest of this file.
using namespace ceph::msgr::v2;

// Shorthands for a pointer / reference to a ProtocolV2 continuation.
using CtPtr = Ct<ProtocolV2> *;
using CtRef = Ct<ProtocolV2> &;
36   	
37   	void ProtocolV2::run_continuation(CtPtr pcontinuation) {
38   	  if (pcontinuation) {
39   	    run_continuation(*pcontinuation);
40   	  }
41   	}
42   	
/*
 * Run `continuation` through CONTINUATION_RUN and turn the exceptions that
 * escape from it into log messages:
 *  - buffer::error and MsgAuthError are logged and the connection is
 *    faulted via _fault();
 *  - DecryptionError is only logged.
 *
 * NOTE(review): the DecryptionError handler does not call _fault(), unlike
 * the other two handlers -- confirm that this is intentional.
 */
void ProtocolV2::run_continuation(CtRef continuation) {
  try {
    CONTINUATION_RUN(continuation)
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << " failed decoding of frame header: " << e
               << dendl;
    _fault();
  } catch (const ceph::crypto::onwire::MsgAuthError &e) {
    lderr(cct) << __func__ << " " << e.what() << dendl;
    _fault();
  } catch (const DecryptionError &) {
    lderr(cct) << __func__ << " failed to decrypt frame payload" << dendl;
  }
}
57   	
// WRITE(B, D, C): write B (a bufferlist or a frame object, depending on the
// write() overload selected), using D as the description in failure logs,
// and continue with the continuation named C.
#define WRITE(B, D, C) write(D, CONTINUATION(C), B)

// READ(L, C): read into a freshly created buffer of L bytes, then continue
// with the continuation named C.
#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L)))

// READ_RXBUF(B, C): same as READ, but reads into the caller-provided
// receive buffer B.
#define READ_RXBUF(B, C) read(CONTINUATION(C), B)
63   	
// INTERCEPT(S): test hook. When UNIT_TESTS_BUILT is defined and the
// connection has an interceptor, the interceptor is consulted for step S:
//  - ACTION::FAIL returns the result of _fault() from the enclosing function;
//  - ACTION::STOP stops the protocol, queues a reset on the dispatch queue
//    and returns nullptr from the enclosing function;
//  - any other action falls through.
// Because the expansion contains return statements it can only be used in
// functions returning CtPtr. Without UNIT_TESTS_BUILT it expands to nothing.
#ifdef UNIT_TESTS_BUILT

#define INTERCEPT(S) { \
if(connection->interceptor) { \
  auto a = connection->interceptor->intercept(connection, (S)); \
  if (a == Interceptor::ACTION::FAIL) { \
    return _fault(); \
  } else if (a == Interceptor::ACTION::STOP) { \
    stop(); \
    connection->dispatch_queue->queue_reset(connection); \
    return nullptr; \
  }}}
  
#else
#define INTERCEPT(S)
#endif
80   	
/*
 * Create the protocol handler for `connection`, registering with the base
 * class as protocol version 2. The object starts in state NONE with all
 * cookies and sequence numbers zeroed, every flag cleared, no banner
 * exchange callback and next_tag set to 0.
 */
ProtocolV2::ProtocolV2(AsyncConnection *connection)
    : Protocol(2, connection),
      state(NONE),
      peer_required_features(0),
      client_cookie(0),
      server_cookie(0),
      global_seq(0),
      connect_seq(0),
      peer_global_seq(0),
      message_seq(0),
      reconnecting(false),
      replacing(false),
      can_write(false),
      bannerExchangeCallback(nullptr),
      next_tag(static_cast<Tag>(0)),
      keepalive(false) {
}
98   	
99   	ProtocolV2::~ProtocolV2() {
100  	}
101  	
/*
 * Arm the protocol for an outgoing connection: move to START_CONNECT and
 * enable recording of the pre-auth byte streams (pre_auth.enabled).
 */
void ProtocolV2::connect() {
  ldout(cct, 1) << __func__ << dendl;
  state = START_CONNECT;
  pre_auth.enabled = true;
}
107  	
/*
 * Arm the protocol for an incoming connection by moving to START_ACCEPT.
 */
void ProtocolV2::accept() {
  ldout(cct, 1) << __func__ << dendl;
  state = START_ACCEPT;
}
112  	
// A connection counts as connected exactly while can_write is set.
bool ProtocolV2::is_connected() { return can_write; }
114  	
115  	/*
116  	 * Tears down the message queues, and removes them from the
117  	 * DispatchQueue Must hold write_lock prior to calling.
118  	 */
119  	void ProtocolV2::discard_out_queue() {
120  	  ldout(cct, 10) << __func__ << " started" << dendl;
121  	
122  	  for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
123  	    ldout(cct, 20) << __func__ << " discard " << *p << dendl;
124  	    (*p)->put();
125  	  }
126  	  sent.clear();
127  	  for (auto& [ prio, entries ] : out_queue) {
128  	    static_cast<void>(prio);
129  	    for (auto& entry : entries) {
130  	      ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
131  	      entry.m->put();
132  	    }
133  	  }
134  	  out_queue.clear();
135  	  write_in_progress = false;
136  	}
137  	
138  	void ProtocolV2::reset_session() {
139  	  ldout(cct, 1) << __func__ << dendl;
140  	
141  	  std::lock_guard<std::mutex> l(connection->write_lock);
142  	  if (connection->delay_state) {
143  	    connection->delay_state->discard();
144  	  }
145  	
146  	  connection->dispatch_queue->discard_queue(connection->conn_id);
147  	  discard_out_queue();
148  	  connection->outgoing_bl.clear();
149  	
150  	  connection->dispatch_queue->queue_remote_reset(connection);
151  	
152  	  out_seq = 0;
153  	  in_seq = 0;
154  	  client_cookie = 0;
155  	  server_cookie = 0;
156  	  connect_seq = 0;
157  	  peer_global_seq = 0;
158  	  message_seq = 0;
159  	  ack_left = 0;
160  	  can_write = false;
161  	}
162  	
163  	void ProtocolV2::stop() {
164  	  ldout(cct, 1) << __func__ << dendl;
165  	  if (state == CLOSED) {
166  	    return;
167  	  }
168  	
169  	  if (connection->delay_state) connection->delay_state->flush();
170  	
171  	  std::lock_guard<std::mutex> l(connection->write_lock);
172  	
173  	  reset_recv_state();
174  	  discard_out_queue();
175  	
176  	  connection->_stop();
177  	
178  	  can_write = false;
179  	  state = CLOSED;
180  	}
181  	
// Public entry point for fault handling; forwards to _fault() and drops
// its (always null) continuation result.
void ProtocolV2::fault() { _fault(); }
183  	
184  	void ProtocolV2::requeue_sent() {
185  	  write_in_progress = false;
186  	  if (sent.empty()) {
187  	    return;
188  	  }
189  	
190  	  auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
191  	  out_seq -= sent.size();
192  	  while (!sent.empty()) {
193  	    Message *m = sent.back();
194  	    sent.pop_back();
195  	    ldout(cct, 5) << __func__ << " requeueing message m=" << m
196  	                  << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
197  	                  << *m << dendl;
198  	    m->clear_payload();
199  	    rq.emplace_front(out_queue_entry_t{false, m});
200  	  }
201  	}
202  	
203  	uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
204  	  ldout(cct, 10) << __func__ << " " << seq << dendl;
205  	  std::lock_guard<std::mutex> l(connection->write_lock);
206  	  if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
207  	    return seq;
208  	  }
209  	  auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
210  	  uint64_t count = out_seq;
211  	  while (!rq.empty()) {
212  	    Message* const m = rq.front().m;
213  	    if (m->get_seq() == 0 || m->get_seq() > seq) break;
214  	    ldout(cct, 5) << __func__ << " discarding message m=" << m
215  	                  << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
216  	                  << *m << dendl;
217  	    m->put();
218  	    rq.pop_front();
219  	    count++;
220  	  }
221  	  if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
222  	  return count;
223  	}
224  	
/*
 * Throw away everything tied to the current handshake / receive path:
 * fresh auth metadata, no session stream handlers, empty pre-auth buffers,
 * no pending read or write callback, next_tag back to 0, and any throttle
 * budget held for a partially received message returned.
 *
 * reset_throttle() decides what to release from `state`, so this must run
 * before the caller changes `state`.
 */
void ProtocolV2::reset_recv_state() {
  auth_meta.reset(new AuthConnectionMeta);
  session_stream_handlers.tx.reset(nullptr);
  session_stream_handlers.rx.reset(nullptr);
  pre_auth.txbuf.clear();
  pre_auth.rxbuf.clear();

  // clean read and write callbacks
  connection->pendingReadLen.reset();
  connection->writeCallback.reset();

  next_tag = static_cast<Tag>(0);

  reset_throttle();
}
240  	
241  	size_t ProtocolV2::get_current_msg_size() const {
242  	  ceph_assert(!rx_segments_desc.empty());
243  	  size_t sum = 0;
244  	  // we don't include SegmentIndex::Msg::HEADER.
245  	  for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
246  	    sum += rx_segments_desc[idx].length;
247  	  }
248  	  return sum;
249  	}
250  	
251  	void ProtocolV2::reset_throttle() {
252  	  if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
253  	      connection->policy.throttler_messages) {
254  	    ldout(cct, 10) << __func__ << " releasing " << 1
255  	                   << " message to policy throttler "
256  	                   << connection->policy.throttler_messages->get_current()
257  	                   << "/" << connection->policy.throttler_messages->get_max()
258  	                   << dendl;
259  	    connection->policy.throttler_messages->put();
260  	  }
261  	  if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
262  	    if (connection->policy.throttler_bytes) {
263  	      const size_t cur_msg_size = get_current_msg_size();
264  	      ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
265  	                     << " bytes to policy throttler "
266  	                     << connection->policy.throttler_bytes->get_current() << "/"
267  	                     << connection->policy.throttler_bytes->get_max() << dendl;
268  	      connection->policy.throttler_bytes->put(cur_msg_size);
269  	    }
270  	  }
271  	  if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
272  	    const size_t cur_msg_size = get_current_msg_size();
273  	    ldout(cct, 10)
274  	        << __func__ << " releasing " << cur_msg_size
275  	        << " bytes to dispatch_queue throttler "
276  	        << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
277  	        << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
278  	    connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
279  	  }
280  	}
281  	
/*
 * Central failure handler. Depending on the connection policy and the
 * current state the connection is either closed, parked in STANDBY, or
 * prepared for a reconnect (immediately, or after a backoff delay).
 *
 * write_lock is taken and released manually here; every early return below
 * either happens before the lock is taken or unlocks first.
 *
 * @return always nullptr, so callers can `return _fault();` to end the
 *         current continuation chain
 */
CtPtr ProtocolV2::_fault() {
  ldout(cct, 10) << __func__ << dendl;

  // nothing to do for a connection that never started or is already closed
  if (state == CLOSED || state == NONE) {
    ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
    return nullptr;
  }

  // lossy connections are not recovered unless they are still in one of the
  // connecting states (START_CONNECT .. SESSION_RECONNECTING)
  if (connection->policy.lossy &&
      !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
    ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  connection->write_lock.lock();

  can_write = false;
  // requeue sent items
  requeue_sent();

  // failed while accepting, with nothing queued and no replacement going on:
  // just close. stop() takes write_lock itself, hence the unlock first.
  if (out_queue.empty() && state >= START_ACCEPT &&
      state <= SESSION_ACCEPTING && !replacing) {
    ldout(cct, 2) << __func__ << " with nothing to send and in the half "
                   << " accept state just closed" << dendl;
    connection->write_lock.unlock();
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  replacing = false;
  connection->fault();
  reset_recv_state();

  reconnecting = false;

  // standby policy and nothing pending (no queued messages, no keepalive):
  // wait in STANDBY
  if (connection->policy.standby && out_queue.empty() && !keepalive &&
      state != WAIT) {
    ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
                  << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }
  // the server side does not reconnect; it goes to STANDBY instead
  if (connection->policy.server) {
    ldout(cct, 1) << __func__ << " server, going to standby, even though i have stuff queued" << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }

  connection->write_lock.unlock();

  if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
      state != WAIT &&
      state != SESSION_ACCEPTING /* due to connection race */) {
    // policy maybe empty when state is in accept
    // NOTE(review): policy.server already caused a return above, so this
    // branch looks unreachable unless the policy changes in between --
    // confirm.
    if (connection->policy.server) {
      ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
      state = STANDBY;
    } else {
      // reconnect right away with a bumped connect_seq and a new global_seq
      ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
      connect_seq++;
      global_seq = messenger->get_global_seq();
      state = START_CONNECT;
      pre_auth.enabled = true;
      connection->state = AsyncConnection::STATE_CONNECTING;
    }
    // no delay: clear the backoff and schedule the read handler now
    backoff = utime_t();
    connection->center->dispatch_event_external(connection->read_handler);
  } else {
    // retry after a delay: the maximum backoff when coming from WAIT, the
    // initial backoff on the first failure, otherwise double the previous
    // backoff, capped at the maximum
    if (state == WAIT) {
      backoff.set_from_double(cct->_conf->ms_max_backoff);
    } else if (backoff == utime_t()) {
      backoff.set_from_double(cct->_conf->ms_initial_backoff);
    } else {
      backoff += backoff;
      if (backoff > cct->_conf->ms_max_backoff)
        backoff.set_from_double(cct->_conf->ms_max_backoff);
    }

    // connect_seq is bumped only when server_cookie is set
    if (server_cookie) {
      connect_seq++;
    }

    global_seq = messenger->get_global_seq();
    state = START_CONNECT;
    pre_auth.enabled = true;
    connection->state = AsyncConnection::STATE_CONNECTING;
    ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
    // woke up again;
    // the backoff in nanoseconds is divided by 1000 before being handed to
    // the event center
    connection->register_time_events.insert(
        connection->center->create_time_event(backoff.to_nsec() / 1000,
                                              connection->wakeup_handler));
  }
  return nullptr;
}
381  	
/*
 * Encode message `m` for the given feature set so that it is ready to be
 * framed and sent.
 *
 * @param features  feature bits to encode the message with
 * @param m         message to encode
 */
void ProtocolV2::prepare_send_message(uint64_t features,
				      Message *m) {
  ldout(cct, 20) << __func__ << " m=" << *m << dendl;

  // log whether this is a first encoding (the payload is still empty) or a
  // re-encoding of a message that already carries a payload
  ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
		 << features << " " << m  << " " << *m << dendl;

  // encode and copy out of *m
  m->encode(features, 0);
}
393  	
394  	void ProtocolV2::send_message(Message *m) {
395  	  uint64_t f = connection->get_features();
396  	
397  	  // TODO: Currently not all messages supports reencode like MOSDMap, so here
398  	  // only let fast dispatch support messages prepare message
399  	  const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
400  	  if (can_fast_prepare) {
401  	    prepare_send_message(f, m);
402  	  }
403  	
404  	  std::lock_guard<std::mutex> l(connection->write_lock);
405  	  bool is_prepared = can_fast_prepare;
406  	  // "features" changes will change the payload encoding
407  	  if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
408  	    // ensure the correctness of message encoding
409  	    m->clear_payload();
410  	    is_prepared = false;
411  	    ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
412  	                   << " != " << connection->get_features() << dendl;
413  	  }
414  	  if (state == CLOSED) {
415  	    ldout(cct, 10) << __func__ << " connection closed."
416  	                   << " Drop message " << m << dendl;
417  	    m->put();
418  	  } else {
419  	    ldout(cct, 5) << __func__ << " enqueueing message m=" << m
420  	                  << " type=" << m->get_type() << " " << *m << dendl;
421  	    m->queue_start = ceph::mono_clock::now();
422  	    m->trace.event("async enqueueing message");
423  	    out_queue[m->get_priority()].emplace_back(
424  	      out_queue_entry_t{is_prepared, m});
425  	    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
426  	                   << dendl;
427  	    if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
428  	      write_in_progress = true;
429  	      connection->center->dispatch_event_external(connection->write_handler);
430  	    }
431  	  }
432  	}
433  	
434  	void ProtocolV2::send_keepalive() {
435  	  ldout(cct, 10) << __func__ << dendl;
436  	  std::lock_guard<std::mutex> l(connection->write_lock);
437  	  if (state != CLOSED) {
438  	    keepalive = true;
439  	    connection->center->dispatch_event_external(connection->write_handler);
440  	  }
441  	}
442  	
443  	void ProtocolV2::read_event() {
444  	  ldout(cct, 20) << __func__ << dendl;
445  	
446  	  switch (state) {
447  	    case START_CONNECT:
448  	      run_continuation(CONTINUATION(start_client_banner_exchange));
449  	      break;
450  	    case START_ACCEPT:
451  	      run_continuation(CONTINUATION(start_server_banner_exchange));
452  	      break;
453  	    case READY:
454  	      run_continuation(CONTINUATION(read_frame));
455  	      break;
456  	    case THROTTLE_MESSAGE:
457  	      run_continuation(CONTINUATION(throttle_message));
458  	      break;
459  	    case THROTTLE_BYTES:
460  	      run_continuation(CONTINUATION(throttle_bytes));
461  	      break;
462  	    case THROTTLE_DISPATCH_QUEUE:
463  	      run_continuation(CONTINUATION(throttle_dispatch_queue));
464  	      break;
465  	    default:
466  	      break;
467  	  }
468  	}
469  	
470  	ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
471  	  out_queue_entry_t out_entry;
472  	
473  	  if (!out_queue.empty()) {
474  	    auto it = out_queue.rbegin();
475  	    auto& entries = it->second;
476  	    ceph_assert(!entries.empty());
477  	    out_entry = entries.front();
478  	    entries.pop_front();
479  	    if (entries.empty()) {
480  	      out_queue.erase(it->first);
481  	    }
482  	  }
483  	  return out_entry;
484  	}
485  	
/*
 * Frame message `m`, append it to the connection's outgoing buffer and try
 * to send it. Must be called from the connection's event-center thread
 * (asserted). The message gets the next out_seq, and one reference to `m`
 * is released before returning.
 *
 * @param m     message to send
 * @param more  passed through to connection->_try_send()
 * @return the result of _try_send(): negative on error; otherwise zero is
 *         logged as "done" and a positive value as still continuing
 */
ssize_t ProtocolV2::write_message(Message *m, bool more) {
  FUNCTRACE(cct);
  ceph_assert(connection->center->in_thread());
  m->set_seq(++out_seq);

  // snapshot in_seq as the ack to send with this message and zero the
  // pending-ack counter, both under connection->lock
  // NOTE(review): presumably the ack travels inside the message header --
  // confirm
  connection->lock.lock();
  uint64_t ack_seq = in_seq;
  ack_left = 0;
  connection->lock.unlock();

  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();

  // build the v2 header from the message header/footer fields plus ack_seq
  ceph_msg_header2 header2{header.seq,        header.tid,
                           header.type,       header.priority,
                           header.version,
                           init_le32(0),      header.data_off,
                           init_le64(ack_seq),
                           footer.flags,      header.compat_version,
                           header.reserved};

  auto message = MessageFrame::Encode(
			     header2,
			     m->get_payload(),
			     m->get_middle(),
			     m->get_data());
  connection->outgoing_bl.append(message.get_buffer(session_stream_handlers));

  ldout(cct, 5) << __func__ << " sending message m=" << m
                << " seq=" << m->get_seq() << " " << *m << dendl;

  m->trace.event("async writing message");
  ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
                 << " src=" << entity_name_t(messenger->get_myname())
                 << " off=" << header2.data_off
                 << dendl;
  // remember the buffer length before sending so that the number of bytes
  // actually written can be accounted below
  ssize_t total_send_size = connection->outgoing_bl.length();
  ssize_t rc = connection->_try_send(more);
  if (rc < 0) {
    ldout(cct, 1) << __func__ << " error sending " << m << ", "
                  << cpp_strerror(rc) << dendl;
  } else {
    connection->logger->inc(
        l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
    ldout(cct, 10) << __func__ << " sending " << m
                   << (rc ? " continuely." : " done.") << dendl;
  }

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
#endif
  // release one reference to the message
  m->put();

  return rc;
}
544  	
545  	void ProtocolV2::append_keepalive() {
546  	  ldout(cct, 10) << __func__ << dendl;
547  	  auto keepalive_frame = KeepAliveFrame::Encode();
548  	  connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
549  	}
550  	
551  	void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
552  	  auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
553  	  connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
554  	}
555  	
556  	void ProtocolV2::handle_message_ack(uint64_t seq) {
557  	  if (connection->policy.lossy) {  // lossy connections don't keep sent messages
558  	    return;
559  	  }
560  	
561  	  ldout(cct, 15) << __func__ << " seq=" << seq << dendl;
562  	
563  	  // trim sent list
564  	  static const int max_pending = 128;
565  	  int i = 0;
566  	  Message *pending[max_pending];
567  	  auto now = ceph::mono_clock::now();
568  	  connection->write_lock.lock();
569  	  while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
570  	    Message *m = sent.front();
571  	    sent.pop_front();
572  	    pending[i++] = m;
573  	    ldout(cct, 10) << __func__ << " got ack seq " << seq
574  	                   << " >= " << m->get_seq() << " on " << m << " " << *m
575  	                   << dendl;
576  	  }
577  	  connection->write_lock.unlock();
578  	  connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
579  	  for (int k = 0; k < i; k++) {
580  	    pending[k]->put();
581  	  }
582  	}
583  	
/*
 * Write-side event handler.
 *
 * When the connection is writable: append a pending keepalive, then send
 * queued messages one by one until the queue is empty, a send fails, a
 * send could not complete, or the connection stops being writable. A
 * pending ack, or whatever is still queued, is flushed afterwards.
 *
 * When it is not writable: either start connecting (client in STANDBY with
 * something queued) or try to flush the outgoing buffer.
 *
 * Send errors end in fault(). write_lock is taken and released manually
 * and is dropped around the actual encoding/sending of each message.
 */
void ProtocolV2::write_event() {
  ldout(cct, 10) << __func__ << dendl;
  ssize_t r = 0;

  connection->write_lock.lock();
  if (can_write) {
    if (keepalive) {
      append_keepalive();
      keepalive = false;
    }

    auto start = ceph::mono_clock::now();
    bool more;
    do {
      const auto out_entry = _get_next_outgoing();
      if (!out_entry.m) {
        // queue drained
        break;
      }

      if (!connection->policy.lossy) {
        // put on sent list
        // (with an extra reference)
        sent.push_back(out_entry.m);
        out_entry.m->get();
      }
      more = !out_queue.empty();
      // encode and send without holding write_lock
      connection->write_lock.unlock();

      // send_message or requeue messages may not encode message
      if (!out_entry.is_prepared) {
        prepare_send_message(connection->get_features(), out_entry.m);
      }

      // account the time spent in the queue, if a start time was recorded
      if (out_entry.m->queue_start != ceph::mono_time()) {
        connection->logger->tinc(l_msgr_send_messages_queue_lat,
				 ceph::mono_clock::now() -
				 out_entry.m->queue_start);
      }

      r = write_message(out_entry.m, more);

      connection->write_lock.lock();
      if (r == 0) {
        // fully sent; carry on with the next message
        ;
      } else if (r < 0) {
        ldout(cct, 1) << __func__ << " send msg failed" << dendl;
        break;
      } else if (r > 0) {
	// Outbound message in-progress, thread will be re-awoken
	// when the outbound socket is writeable again
        break;
      }
    } while (can_write);
    write_in_progress = false;

    // if r > 0 mean data still lefted, so no need _try_send.
    if (r == 0) {
      uint64_t left = ack_left;
      if (left) {
        // acks are pending: append an ack frame for in_seq and send it
        auto ack = AckFrame::Encode(in_seq);
        connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers));
        ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                       << " messages" << dendl;
        ack_left -= left;
        left = ack_left;
        // `left` (acks that arrived in the meantime) is passed as the
        // argument of _try_send()
        r = connection->_try_send(left);
      } else if (is_queued()) {
        r = connection->_try_send();
      }
    }
    connection->write_lock.unlock();

    connection->logger->tinc(l_msgr_running_send_time,
                             ceph::mono_clock::now() - start);
    if (r < 0) {
      ldout(cct, 1) << __func__ << " send msg failed" << dendl;
      connection->lock.lock();
      fault();
      connection->lock.unlock();
      return;
    }
  } else {
    write_in_progress = false;
    // release write_lock and take the locks again in the order
    // connection->lock, then write_lock
    connection->write_lock.unlock();
    connection->lock.lock();
    connection->write_lock.lock();
    if (state == STANDBY && !connection->policy.server && is_queued()) {
      // client side with something to send: leave STANDBY and connect
      ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
      if (server_cookie) {  // only increment connect_seq if there is a session
        connect_seq++;
      }
      connection->_connect();
    } else if (connection->cs && state != NONE && state != CLOSED &&
               state != START_CONNECT) {
      // there is a socket and the state allows it: flush the outgoing buffer
      r = connection->_try_send();
      if (r < 0) {
        ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
        // fault() is called with connection->lock held but without write_lock
        connection->write_lock.unlock();
        fault();
        connection->lock.unlock();
        return;
      }
    }
    connection->write_lock.unlock();
    connection->lock.unlock();
  }
}
690  	
// True when there is anything left to send: either messages in out_queue
// or data the connection itself reports as queued.
bool ProtocolV2::is_queued() {
  return !out_queue.empty() || connection->is_queued();
}
694  	
695  	uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
696  	  if (session_stream_handlers.rx) {
697  	    return segment_onwire_size(logical_size);
698  	  } else {
699  	    return logical_size;
700  	  }
701  	}
702  	
703  	uint32_t ProtocolV2::get_epilogue_size() const {
704  	  // In secure mode size of epilogue is flexible and depends on particular
705  	  // cipher implementation. See the comment for epilogue_secure_block_t or
706  	  // epilogue_plain_block_t.
707  	  if (session_stream_handlers.rx) {
708  	    return FRAME_SECURE_EPILOGUE_SIZE + \
709  	        session_stream_handlers.rx->get_extra_size_at_final();
710  	  } else {
711  	    return FRAME_PLAIN_EPILOGUE_SIZE;
712  	  }
713  	}
714  	
715  	CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
716  	                       rx_buffer_t &&buffer) {
717  	  const auto len = buffer->length();
718  	  const auto buf = buffer->c_str();
719  	  next.node = std::move(buffer);
720  	  ssize_t r = connection->read(len, buf,
721  	    [&next, this](char *buffer, int r) {
722  	      if (unlikely(pre_auth.enabled) && r >= 0) {
723  	        pre_auth.rxbuf.append(*next.node);
724  		ceph_assert(!cct->_conf->ms_die_on_bug ||
725  			    pre_auth.rxbuf.length() < 1000000);
726  	      }
727  	      next.r = r;
728  	      run_continuation(next);
729  	    });
730  	  if (r <= 0) {
731  	    // error or done synchronously
732  	    if (unlikely(pre_auth.enabled) && r >= 0) {
733  	      pre_auth.rxbuf.append(*next.node);
734  	      ceph_assert(!cct->_conf->ms_die_on_bug ||
735  			  pre_auth.rxbuf.length() < 1000000);
736  	    }
737  	    next.r = r;
738  	    return &next;
739  	  }
740  	
741  	  return nullptr;
742  	}
743  	
744  	template <class F>
745  	CtPtr ProtocolV2::write(const std::string &desc,
746  	                        CONTINUATION_TYPE<ProtocolV2> &next,
747  	                        F &frame) {
748  	  ceph::bufferlist bl = frame.get_buffer(session_stream_handlers);
749  	  return write(desc, next, bl);
750  	}
751  	
752  	CtPtr ProtocolV2::write(const std::string &desc,
753  	                        CONTINUATION_TYPE<ProtocolV2> &next,
754  	                        bufferlist &buffer) {
755  	  if (unlikely(pre_auth.enabled)) {
756  	    pre_auth.txbuf.append(buffer);
757  	    ceph_assert(!cct->_conf->ms_die_on_bug ||
758  			pre_auth.txbuf.length() < 1000000);
759  	  }
760  	
761  	  ssize_t r =
762  	      connection->write(buffer, [&next, desc, this](int r) {
763  	        if (r < 0) {
764  	          ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
765  	                        << " (" << cpp_strerror(r) << ")" << dendl;
766  	          connection->inject_delay();
767  	          _fault();
768  	        }
769  	        run_continuation(next);
770  	      });
771  	
772  	  if (r < 0) {
773  	    ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
774  	                  << " (" << cpp_strerror(r) << ")" << dendl;
775  	    return _fault();
776  	  } else if (r == 0) {
777  	    next.setParams();
778  	    return &next;
779  	  }
780  	
781  	  return nullptr;
782  	}
783  	
784  	CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
785  	  ldout(cct, 20) << __func__ << dendl;
786  	  bannerExchangeCallback = &callback;
787  	
788  	  bufferlist banner_payload;
789  	  encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
790  	  encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
791  	
792  	  bufferlist bl;
793  	  bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
794  	  encode((uint16_t)banner_payload.length(), bl, 0);
795  	  bl.claim_append(banner_payload);
796  	
797  	  INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);
798  	
799  	  return WRITE(bl, "banner", _wait_for_peer_banner);
800  	}
801  	
802  	CtPtr ProtocolV2::_wait_for_peer_banner() {
803  	  unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
804  	  return READ(banner_len, _handle_peer_banner);
805  	}
806  	
807  	CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
808  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
809  	
810  	  if (r < 0) {
811  	    ldout(cct, 1) << __func__ << " read peer banner failed r=" << r << " ("
812  	                  << cpp_strerror(r) << ")" << dendl;
813  	    return _fault();
814  	  }
815  	
816  	  unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
817  	
818  	  if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
819  	    if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
820  	      lderr(cct) << __func__ << " peer " << *connection->peer_addrs
821  	                 << " is using msgr V1 protocol" << dendl;
822  	      return _fault();
823  	    }
824  	    ldout(cct, 1) << __func__ << " accept peer sent bad banner" << dendl;
825  	    return _fault();
826  	  }
827  	
828  	  uint16_t payload_len;
829  	  bufferlist bl;
830  	  buffer->set_offset(banner_prefix_len);
831  	  buffer->set_length(sizeof(ceph_le16));
832  	  bl.push_back(std::move(buffer));
833  	  auto ti = bl.cbegin();
834  	  try {
835  	    decode(payload_len, ti);
836  	  } catch (const buffer::error &e) {
837  	    lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
838  	    return _fault();
839  	  }
840  	
841  	  INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);
842  	
843  	  return READ(payload_len, _handle_peer_banner_payload);
844  	}
845  	
846  	CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
847  	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
848  	
849  	  if (r < 0) {
850  	    ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
851  	                  << " (" << cpp_strerror(r) << ")" << dendl;
852  	    return _fault();
853  	  }
854  	
855  	  uint64_t peer_supported_features;
856  	  uint64_t peer_required_features;
857  	
858  	  bufferlist bl;
859  	  bl.push_back(std::move(buffer));
860  	  auto ti = bl.cbegin();
861  	  try {
862  	    decode(peer_supported_features, ti);
863  	    decode(peer_required_features, ti);
864  	  } catch (const buffer::error &e) {
865  	    lderr(cct) << __func__ << " decode banner payload failed " << dendl;
866  	    return _fault();
867  	  }
868  	
869  	  ldout(cct, 1) << __func__ << " supported=" << std::hex
870  	                << peer_supported_features << " required=" << std::hex
871  	                << peer_required_features << std::dec << dendl;
872  	
873  	  // Check feature bit compatibility
874  	
875  	  uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
876  	  uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
877  	
878  	  if ((required_features & peer_supported_features) != required_features) {
879  	    ldout(cct, 1) << __func__ << " peer does not support all required features"
880  	                  << " required=" << std::hex << required_features
881  	                  << " supported=" << std::hex << peer_supported_features
882  	                  << std::dec << dendl;
883  	    stop();
884  	    connection->dispatch_queue->queue_reset(connection);
885  	    return nullptr;
886  	  }
887  	  if ((supported_features & peer_required_features) != peer_required_features) {
888  	    ldout(cct, 1) << __func__ << " we do not support all peer required features"
889  	                  << " required=" << std::hex << peer_required_features
890  	                  << " supported=" << supported_features << std::dec << dendl;
891  	    stop();
892  	    connection->dispatch_queue->queue_reset(connection);
893  	    return nullptr;
894  	  }
895  	
896  	  this->peer_required_features = peer_required_features;
897  	  if (this->peer_required_features == 0) {
898  	    this->connection_features = msgr2_required;
899  	  }
900  	
901  	  // at this point we can change how the client protocol behaves based on
902  	  // this->peer_required_features
903  	
904  	  if (state == BANNER_CONNECTING) {
905  	    state = HELLO_CONNECTING;
906  	  }
907  	  else {
908  	    ceph_assert(state == BANNER_ACCEPTING);
909  	    state = HELLO_ACCEPTING;
910  	  }
911  	
912  	  auto hello = HelloFrame::Encode(messenger->get_mytype(),
913  	                                  connection->target_addr);
914  	
915  	  INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);
916  	
917  	  return WRITE(hello, "hello frame", read_frame);
918  	}
919  	
920  	CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
921  	{
922  	  ldout(cct, 20) << __func__
923  			 << " payload.length()=" << payload.length() << dendl;
924  	
925  	  if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
926  	    lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
927  	    return _fault();
928  	  }
929  	
930  	  auto hello = HelloFrame::Decode(payload);
931  	
932  	  ldout(cct, 5) << __func__ << " received hello:"
933  	                << " peer_type=" << (int)hello.entity_type()
934  	                << " peer_addr_for_me=" << hello.peer_addr() << dendl;
935  	
936  	  sockaddr_storage ss;
937  	  socklen_t len = sizeof(ss);
938  	  getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
939  	  ldout(cct, 5) << __func__ << " getsockname says I am " << (sockaddr *)&ss
940  			<< " when talking to " << connection->target_addr << dendl;
941  	
942  	  if (connection->get_peer_type() == -1) {
943  	    connection->set_peer_type(hello.entity_type());
944  	
945  	    ceph_assert(state == HELLO_ACCEPTING);
946  	    connection->policy = messenger->get_policy(hello.entity_type());
947  	    ldout(cct, 10) << __func__ << " accept of host_type "
948  	                   << (int)hello.entity_type()
949  	                   << ", policy.lossy=" << connection->policy.lossy
950  	                   << " policy.server=" << connection->policy.server
951  	                   << " policy.standby=" << connection->policy.standby
952  	                   << " policy.resetcheck=" << connection->policy.resetcheck
953  	                   << dendl;
954  	  } else {
955  	    ceph_assert(state == HELLO_CONNECTING);
956  	    if (connection->get_peer_type() != hello.entity_type()) {
957  	      ldout(cct, 1) << __func__ << " connection peer type does not match what"
958  	                    << " peer advertises " << connection->get_peer_type()
959  	                    << " != " << (int)hello.entity_type() << dendl;
960  	      stop();
961  	      connection->dispatch_queue->queue_reset(connection);
962  	      return nullptr;
963  	    }
964  	  }
965  	
966  	  if (messenger->get_myaddrs().empty() ||
967  	      messenger->get_myaddrs().front().is_blank_ip()) {
968  	    entity_addr_t a;
969  	    if (cct->_conf->ms_learn_addr_from_peer) {
970  	      ldout(cct, 1) << __func__ << " peer " << connection->target_addr
971  			    << " says I am " << hello.peer_addr() << " (socket says "
972  			    << (sockaddr*)&ss << ")" << dendl;
973  	      a = hello.peer_addr();
974  	    } else {
975  	      ldout(cct, 1) << __func__ << " socket to  " << connection->target_addr
976  			    << " says I am " << (sockaddr*)&ss
977  			    << " (peer says " << hello.peer_addr() << ")" << dendl;
978  	      a.set_sockaddr((sockaddr *)&ss);
979  	    }
980  	    a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
981  	    a.set_port(0);
982  	    connection->lock.unlock();
983  	    messenger->learned_addr(a);
984  	    if (cct->_conf->ms_inject_internal_delays &&
985  	        cct->_conf->ms_inject_socket_failures) {
986  	      if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
987  	        ldout(cct, 10) << __func__ << " sleep for "
988  	                       << cct->_conf->ms_inject_internal_delays << dendl;
989  	        utime_t t;
990  	        t.set_from_double(cct->_conf->ms_inject_internal_delays);
991  	        t.sleep();
992  	      }
993  	    }
994  	    connection->lock.lock();
995  	    if (state != HELLO_CONNECTING) {
996  	      ldout(cct, 1) << __func__
997  	                    << " state changed while learned_addr, mark_down or "
998  	                    << " replacing must be happened just now" << dendl;
999  	      return nullptr;
1000 	    }
1001 	  }
1002 	
1003 	
1004 	
1005 	  CtPtr callback;
1006 	  callback = bannerExchangeCallback;
1007 	  bannerExchangeCallback = nullptr;
1008 	  ceph_assert(callback);
1009 	  return callback;
1010 	}
1011 	
1012 	CtPtr ProtocolV2::read_frame() {
1013 	  if (state == CLOSED) {
1014 	    return nullptr;
1015 	  }
1016 	
1017 	  ldout(cct, 20) << __func__ << dendl;
1018 	  return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
1019 	}
1020 	
(1) Event root_function: In function "ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&, int)" an exception of type "std::length_error" is thrown and never caught.
Also see events: [fun_call_w_exception]
1021 	CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
1022 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1023 	
1024 	  if (r < 0) {
1025 	    ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
1026 	                  << " (" << cpp_strerror(r) << ")" << dendl;
1027 	    return _fault();
1028 	  }
1029 	
1030 	  ceph::bufferlist preamble;
1031 	  preamble.push_back(std::move(buffer));
1032 	
1033 	  ldout(cct, 30) << __func__ << " preamble\n";
1034 	  preamble.hexdump(*_dout);
1035 	  *_dout << dendl;
1036 	
1037 	  if (session_stream_handlers.rx) {
1038 	    ceph_assert(session_stream_handlers.rx);
1039 	
1040 	    session_stream_handlers.rx->reset_rx_handler();
1041 	    preamble = session_stream_handlers.rx->authenticated_decrypt_update(
1042 	      std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
1043 	
1044 	    ldout(cct, 10) << __func__ << " got encrypted preamble."
1045 	                   << " after decrypt premable.length()=" << preamble.length()
1046 	                   << dendl;
1047 	
1048 	    ldout(cct, 30) << __func__ << " preamble after decrypt\n";
1049 	    preamble.hexdump(*_dout);
1050 	    *_dout << dendl;
1051 	  }
1052 	
1053 	  {
1054 	    // I expect ceph_le32 will make the endian conversion for me. Passing
1055 	    // everything through ::Decode is unnecessary.
1056 	    const auto& main_preamble = \
1057 	      reinterpret_cast<preamble_block_t&>(*preamble.c_str());
1058 	
1059 	    // verify preamble's CRC before any further processing
1060 	    const auto rx_crc = ceph_crc32c(0,
1061 	      reinterpret_cast<const unsigned char*>(&main_preamble),
1062 	      sizeof(main_preamble) - sizeof(main_preamble.crc));
1063 	    if (rx_crc != main_preamble.crc) {
1064 	      ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
1065 			     << " rx_crc=" << rx_crc
1066 			     << " tx_crc=" << main_preamble.crc << dendl;
1067 	      return _fault();
1068 	    }
1069 	
1070 	    // currently we do support between 1 and MAX_NUM_SEGMENTS segments
1071 	    if (main_preamble.num_segments < 1 ||
1072 	        main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1073 	      ldout(cct, 10) << __func__ << " unsupported num_segments="
1074 			     << " tx_crc=" << main_preamble.num_segments << dendl;
1075 	      return _fault();
1076 	    }
1077 	
1078 	    next_tag = static_cast<Tag>(main_preamble.tag);
1079 	
1080 	    rx_segments_desc.clear();
1081 	    rx_segments_data.clear();
1082 	
1083 	    if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
(2) Event fun_call_w_exception: Called function throws an exception of type "std::length_error". [details]
Also see events: [root_function]
1084 	      ldout(cct, 30) << __func__
1085 			     << " num_segments=" << main_preamble.num_segments
1086 			     << " is too much" << dendl;
1087 	      return _fault();
1088 	    }
1089 	    for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
1090 	      ldout(cct, 10) << __func__ << " got new segment:"
1091 			     << " len=" << main_preamble.segments[idx].length
1092 			     << " align=" << main_preamble.segments[idx].alignment
1093 			     << dendl;
1094 	      rx_segments_desc.emplace_back(main_preamble.segments[idx]);
1095 	    }
1096 	  }
1097 	
1098 	  // does it need throttle?
1099 	  if (next_tag == Tag::MESSAGE) {
1100 	    if (state != READY) {
1101 	      lderr(cct) << __func__ << " not in ready state!" << dendl;
1102 	      return _fault();
1103 	    }
1104 	    state = THROTTLE_MESSAGE;
1105 	    return CONTINUE(throttle_message);
1106 	  } else {
1107 	    return read_frame_segment();
1108 	  }
1109 	}
1110 	
1111 	CtPtr ProtocolV2::handle_read_frame_dispatch() {
1112 	  ldout(cct, 10) << __func__
1113 	                 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
1114 	
1115 	  switch (next_tag) {
1116 	    case Tag::HELLO:
1117 	    case Tag::AUTH_REQUEST:
1118 	    case Tag::AUTH_BAD_METHOD:
1119 	    case Tag::AUTH_REPLY_MORE:
1120 	    case Tag::AUTH_REQUEST_MORE:
1121 	    case Tag::AUTH_DONE:
1122 	    case Tag::AUTH_SIGNATURE:
1123 	    case Tag::CLIENT_IDENT:
1124 	    case Tag::SERVER_IDENT:
1125 	    case Tag::IDENT_MISSING_FEATURES:
1126 	    case Tag::SESSION_RECONNECT:
1127 	    case Tag::SESSION_RESET:
1128 	    case Tag::SESSION_RETRY:
1129 	    case Tag::SESSION_RETRY_GLOBAL:
1130 	    case Tag::SESSION_RECONNECT_OK:
1131 	    case Tag::KEEPALIVE2:
1132 	    case Tag::KEEPALIVE2_ACK:
1133 	    case Tag::ACK:
1134 	    case Tag::WAIT:
1135 	      return handle_frame_payload();
1136 	    case Tag::MESSAGE:
1137 	      return handle_message();
1138 	    default: {
1139 	      lderr(cct) << __func__
1140 	                 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
1141 	                 << dendl;
1142 	      return _fault();
1143 	    }
1144 	  }
1145 	
1146 	  return nullptr;
1147 	}
1148 	
1149 	CtPtr ProtocolV2::read_frame_segment() {
1150 	  ldout(cct, 20) << __func__ << dendl;
1151 	  ceph_assert(!rx_segments_desc.empty());
1152 	
1153 	  // description of current segment to read
1154 	  const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
1155 	  rx_buffer_t rx_buffer;
1156 	  try {
1157 	    rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
1158 	      get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
1159 	  } catch (std::bad_alloc&) {
1160 	    // Catching because of potential issues with satisfying alignment.
1161 	    ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
1162 			   << " len=" << get_onwire_size(cur_rx_desc.length)
1163 			   << " align=" << cur_rx_desc.alignment
1164 			   << dendl;
1165 	    return _fault();
1166 	  }
1167 	
1168 	  return READ_RXBUF(std::move(rx_buffer), handle_read_frame_segment);
1169 	}
1170 	
1171 	CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
1172 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1173 	
1174 	  if (r < 0) {
1175 	    ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
1176 	                  << cpp_strerror(r) << ")" << dendl;
1177 	    return _fault();
1178 	  }
1179 	
1180 	  rx_segments_data.emplace_back();
1181 	  rx_segments_data.back().push_back(std::move(rx_buffer));
1182 	
1183 	  // decrypt incoming data
1184 	  // FIXME: if (auth_meta->is_mode_secure()) {
1185 	  if (session_stream_handlers.rx) {
1186 	    ceph_assert(session_stream_handlers.rx);
1187 	
1188 	    auto& new_seg = rx_segments_data.back();
1189 	    if (new_seg.length()) {
1190 	      auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
1191 	          std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
1192 	      const auto idx = rx_segments_data.size() - 1;
1193 	      new_seg.clear();
1194 	      padded.splice(0, rx_segments_desc[idx].length, &new_seg);
1195 	
1196 	      ldout(cct, 20) << __func__
1197 	                     << " unpadded new_seg.length()=" << new_seg.length()
1198 	                     << dendl;
1199 	    }
1200 	  }
1201 	
1202 	  if (rx_segments_desc.size() == rx_segments_data.size()) {
1203 	    // OK, all segments planned to read are read. Can go with epilogue.
1204 	    return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
1205 	  } else {
1206 	    // TODO: for makeshift only. This will be more generic and throttled
1207 	    return read_frame_segment();
1208 	  }
1209 	}
1210 	
1211 	CtPtr ProtocolV2::handle_frame_payload() {
1212 	  ceph_assert(!rx_segments_data.empty());
1213 	  auto& payload = rx_segments_data.back();
1214 	
1215 	  ldout(cct, 30) << __func__ << "\n";
1216 	  payload.hexdump(*_dout);
1217 	  *_dout << dendl;
1218 	
1219 	  switch (next_tag) {
1220 	    case Tag::HELLO:
1221 	      return handle_hello(payload);
1222 	    case Tag::AUTH_REQUEST:
1223 	      return handle_auth_request(payload);
1224 	    case Tag::AUTH_BAD_METHOD:
1225 	      return handle_auth_bad_method(payload);
1226 	    case Tag::AUTH_REPLY_MORE:
1227 	      return handle_auth_reply_more(payload);
1228 	    case Tag::AUTH_REQUEST_MORE:
1229 	      return handle_auth_request_more(payload);
1230 	    case Tag::AUTH_DONE:
1231 	      return handle_auth_done(payload);
1232 	    case Tag::AUTH_SIGNATURE:
1233 	      return handle_auth_signature(payload);
1234 	    case Tag::CLIENT_IDENT:
1235 	      return handle_client_ident(payload);
1236 	    case Tag::SERVER_IDENT:
1237 	      return handle_server_ident(payload);
1238 	    case Tag::IDENT_MISSING_FEATURES:
1239 	      return handle_ident_missing_features(payload);
1240 	    case Tag::SESSION_RECONNECT:
1241 	      return handle_reconnect(payload);
1242 	    case Tag::SESSION_RESET:
1243 	      return handle_session_reset(payload);
1244 	    case Tag::SESSION_RETRY:
1245 	      return handle_session_retry(payload);
1246 	    case Tag::SESSION_RETRY_GLOBAL:
1247 	      return handle_session_retry_global(payload);
1248 	    case Tag::SESSION_RECONNECT_OK:
1249 	      return handle_reconnect_ok(payload);
1250 	    case Tag::KEEPALIVE2:
1251 	      return handle_keepalive2(payload);
1252 	    case Tag::KEEPALIVE2_ACK:
1253 	      return handle_keepalive2_ack(payload);
1254 	    case Tag::ACK:
1255 	      return handle_message_ack(payload);
1256 	    case Tag::WAIT:
1257 	      return handle_wait(payload);
1258 	    default:
1259 	      ceph_abort();
1260 	  }
1261 	  return nullptr;
1262 	}
1263 	
1264 	CtPtr ProtocolV2::ready() {
1265 	  ldout(cct, 25) << __func__ << dendl;
1266 	
1267 	  reconnecting = false;
1268 	  replacing = false;
1269 	
1270 	  // make sure no pending tick timer
1271 	  if (connection->last_tick_id) {
1272 	    connection->center->delete_time_event(connection->last_tick_id);
1273 	  }
1274 	  connection->last_tick_id = connection->center->create_time_event(
1275 	      connection->inactive_timeout_us, connection->tick_handler);
1276 	
1277 	  {
1278 	    std::lock_guard<std::mutex> l(connection->write_lock);
1279 	    can_write = true;
1280 	    if (!out_queue.empty()) {
1281 	      connection->center->dispatch_event_external(connection->write_handler);
1282 	    }
1283 	  }
1284 	
1285 	  connection->maybe_start_delay_thread();
1286 	
1287 	  state = READY;
1288 	  ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
1289 	                << std::hex << client_cookie << " server_cookie="
1290 	                << server_cookie << std::dec << " in_seq=" << in_seq
1291 	                << " out_seq=" << out_seq << dendl;
1292 	
1293 	  INTERCEPT(15);
1294 	
1295 	  return CONTINUE(read_frame);
1296 	}
1297 	
1298 	CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
1299 	{
1300 	  ldout(cct, 20) << __func__ << " r=" << r << dendl;
1301 	
1302 	  if (r < 0) {
1303 	    ldout(cct, 1) << __func__ << " read data error " << dendl;
1304 	    return _fault();
1305 	  }
1306 	
1307 	  __u8 late_flags;
1308 	
1309 	  // FIXME: if (auth_meta->is_mode_secure()) {
1310 	  if (session_stream_handlers.rx) {
1311 	    ldout(cct, 1) << __func__ << " read frame epilogue bytes="
1312 	                  << get_epilogue_size() << dendl;
1313 	
1314 	    // decrypt epilogue and authenticate entire frame.
1315 	    ceph::bufferlist epilogue_bl;
1316 	    {
1317 	      epilogue_bl.push_back(std::move(buffer));
1318 	      try {
1319 	        epilogue_bl =
1320 	            session_stream_handlers.rx->authenticated_decrypt_update_final(
1321 		        std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
1322 	      } catch (ceph::crypto::onwire::MsgAuthError &e) {
1323 	        ldout(cct, 5) << __func__ << " message authentication failed: "
1324 	                      << e.what() << dendl;
1325 	        return _fault();
1326 	      }
1327 	    }
1328 	    auto& epilogue =
1329 	        reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
1330 	    late_flags = epilogue.late_flags;
1331 	  } else {
1332 	    auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
1333 	
1334 	    for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
1335 	      const __u32 expected_crc = epilogue.crc_values[idx];
1336 	      const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
1337 	      if (expected_crc != calculated_crc) {
1338 		ldout(cct, 5) << __func__ << " message integrity check failed: "
1339 			      << " expected_crc=" << expected_crc
1340 			      << " calculated_crc=" << calculated_crc
1341 			      << dendl;
1342 		return _fault();
1343 	      } else {
1344 		ldout(cct, 20) << __func__ << " message integrity check success: "
1345 			       << " expected_crc=" << expected_crc
1346 			       << " calculated_crc=" << calculated_crc
1347 			       << dendl;
1348 	      }
1349 	    }
1350 	    late_flags = epilogue.late_flags;
1351 	  }
1352 	
1353 	  // we do have a mechanism that allows transmitter to start sending message
1354 	  // and abort after putting entire data field on wire. This will be used by
1355 	  // the kernel client to avoid unnecessary buffering.
1356 	  if (late_flags & FRAME_FLAGS_LATEABRT) {
1357 	    reset_throttle();
1358 	    state = READY;
1359 	    return CONTINUE(read_frame);
1360 	  } else {
1361 	    return handle_read_frame_dispatch();
1362 	  }
1363 	}
1364 	
1365 	CtPtr ProtocolV2::handle_message() {
1366 	  ldout(cct, 20) << __func__ << dendl;
1367 	  ceph_assert(state == THROTTLE_DONE);
1368 	
1369 	#if defined(WITH_EVENTTRACE)
1370 	  utime_t ltt_recv_stamp = ceph_clock_now();
1371 	#endif
1372 	  recv_stamp = ceph_clock_now();
1373 	
1374 	  // we need to get the size before std::moving segments data
1375 	  const size_t cur_msg_size = get_current_msg_size();
1376 	  auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
1377 	
1378 	  // XXX: paranoid copy just to avoid oops
1379 	  ceph_msg_header2 current_header = msg_frame.header();
1380 	
1381 	  ldout(cct, 5) << __func__
1382 			<< " got " << msg_frame.front_len()
1383 			<< " + " << msg_frame.middle_len()
1384 			<< " + " << msg_frame.data_len()
1385 			<< " byte message."
1386 			<< " envelope type=" << current_header.type
1387 			<< " src " << peer_name
1388 			<< " off " << current_header.data_off
1389 	                << dendl;
1390 	
1391 	  INTERCEPT(16);
1392 	  ceph_msg_header header{current_header.seq,
1393 	                         current_header.tid,
1394 	                         current_header.type,
1395 	                         current_header.priority,
1396 	                         current_header.version,
1397 	                         init_le32(msg_frame.front_len()),
1398 	                         init_le32(msg_frame.middle_len()),
1399 	                         init_le32(msg_frame.data_len()),
1400 	                         current_header.data_off,
1401 	                         peer_name,
1402 	                         current_header.compat_version,
1403 	                         current_header.reserved,
1404 	                         init_le32(0)};
1405 	  ceph_msg_footer footer{init_le32(0), init_le32(0),
1406 		                 init_le32(0), init_le64(0), current_header.flags};
1407 	
1408 	  Message *message = decode_message(cct, 0, header, footer,
1409 	      msg_frame.front(),
1410 	      msg_frame.middle(),
1411 	      msg_frame.data(),
1412 	      connection);
1413 	  if (!message) {
1414 	    ldout(cct, 1) << __func__ << " decode message failed " << dendl;
1415 	    return _fault();
1416 	  } else {
1417 	    state = READ_MESSAGE_COMPLETE;
1418 	  }
1419 	
1420 	  INTERCEPT(17);
1421 	
1422 	  message->set_byte_throttler(connection->policy.throttler_bytes);
1423 	  message->set_message_throttler(connection->policy.throttler_messages);
1424 	
1425 	  // store reservation size in message, so we don't get confused
1426 	  // by messages entering the dispatch queue through other paths.
1427 	  message->set_dispatch_throttle_size(cur_msg_size);
1428 	
1429 	  message->set_recv_stamp(recv_stamp);
1430 	  message->set_throttle_stamp(throttle_stamp);
1431 	  message->set_recv_complete_stamp(ceph_clock_now());
1432 	
1433 	  // check received seq#.  if it is old, drop the message.
1434 	  // note that incoming messages may skip ahead.  this is convenient for the
1435 	  // client side queueing because messages can't be renumbered, but the (kernel)
1436 	  // client will occasionally pull a message out of the sent queue to send
1437 	  // elsewhere.  in that case it doesn't matter if we "got" it or not.
1438 	  uint64_t cur_seq = in_seq;
1439 	  if (message->get_seq() <= cur_seq) {
1440 	    ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
1441 	                  << " <= " << cur_seq << " " << message << " " << *message
1442 	                  << ", discarding" << dendl;
1443 	    message->put();
1444 	    if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
1445 	        cct->_conf->ms_die_on_old_message) {
1446 	      ceph_assert(0 == "old msgs despite reconnect_seq feature");
1447 	    }
1448 	    return nullptr;
1449 	  }
1450 	  if (message->get_seq() > cur_seq + 1) {
1451 	    ldout(cct, 0) << __func__ << " missed message?  skipped from seq "
1452 	                  << cur_seq << " to " << message->get_seq() << dendl;
1453 	    if (cct->_conf->ms_die_on_skipped_message) {
1454 	      ceph_assert(0 == "skipped incoming seq");
1455 	    }
1456 	  }
1457 	
1458 	#if defined(WITH_EVENTTRACE)
1459 	  if (message->get_type() == CEPH_MSG_OSD_OP ||
1460 	      message->get_type() == CEPH_MSG_OSD_OPREPLY) {
1461 	    utime_t ltt_processed_stamp = ceph_clock_now();
1462 	    double usecs_elapsed =
1463 	        (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
1464 	    ostringstream buf;
1465 	    if (message->get_type() == CEPH_MSG_OSD_OP)
1466 	      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
1467 	                           false);
1468 	    else
1469 	      OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
1470 	                           false);
1471 	  }
1472 	#endif
1473 	
1474 	  // note last received message.
1475 	  in_seq = message->get_seq();
1476 	  ldout(cct, 5) << __func__ << " received message m=" << message
1477 	                << " seq=" << message->get_seq()
1478 	                << " from=" << message->get_source() << " type=" << header.type
1479 	                << " " << *message << dendl;
1480 	
1481 	  bool need_dispatch_writer = false;
1482 	  if (!connection->policy.lossy) {
1483 	    ack_left++;
1484 	    need_dispatch_writer = true;
1485 	  }
1486 	
1487 	  state = READY;
1488 	
1489 	  ceph::mono_time fast_dispatch_time;
1490 	
1491 	  if (connection->is_blackhole()) {
1492 	    ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
1493 	    message->put();
1494 	    goto out;
1495 	  }
1496 	
1497 	  connection->logger->inc(l_msgr_recv_messages);
1498 	  connection->logger->inc(
1499 	      l_msgr_recv_bytes,
1500 	      cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
1501 	
1502 	  messenger->ms_fast_preprocess(message);
1503 	  fast_dispatch_time = ceph::mono_clock::now();
1504 	  connection->logger->tinc(l_msgr_running_recv_time,
1505 				   fast_dispatch_time - connection->recv_start_time);
1506 	  if (connection->delay_state) {
1507 	    double delay_period = 0;
1508 	    if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1509 	      delay_period =
1510 	          cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1511 	      ldout(cct, 1) << "queue_received will delay after "
1512 	                    << (ceph_clock_now() + delay_period) << " on " << message
1513 	                    << " " << *message << dendl;
1514 	    }
1515 	    connection->delay_state->queue(delay_period, message);
1516 	  } else if (messenger->ms_can_fast_dispatch(message)) {
1517 	    connection->lock.unlock();
1518 	    connection->dispatch_queue->fast_dispatch(message);
1519 	    connection->recv_start_time = ceph::mono_clock::now();
1520 	    connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1521 	                             connection->recv_start_time - fast_dispatch_time);
1522 	    connection->lock.lock();
1523 	    // we might have been reused by another connection
1524 	    // let's check if that is the case
1525 	    if (state != READY) {
1526 	      // yes, that was the case, let's do nothing
1527 	      return nullptr;
1528 	    }
1529 	  } else {
1530 	    connection->dispatch_queue->enqueue(message, message->get_priority(),
1531 	                                        connection->conn_id);
1532 	  }
1533 	
1534 	  handle_message_ack(current_header.ack_seq);
1535 	
1536 	 out:
1537 	  if (need_dispatch_writer && connection->is_connected()) {
1538 	    connection->center->dispatch_event_external(connection->write_handler);
1539 	  }
1540 	
1541 	  return CONTINUE(read_frame);
1542 	}
1543 	
1544 	
1545 	CtPtr ProtocolV2::throttle_message() {
1546 	  ldout(cct, 20) << __func__ << dendl;
1547 	
1548 	  if (connection->policy.throttler_messages) {
1549 	    ldout(cct, 10) << __func__ << " wants " << 1
1550 	                   << " message from policy throttler "
1551 	                   << connection->policy.throttler_messages->get_current()
1552 	                   << "/" << connection->policy.throttler_messages->get_max()
1553 	                   << dendl;
1554 	    if (!connection->policy.throttler_messages->get_or_fail()) {
1555 	      ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
1556 	                     << connection->policy.throttler_messages->get_current()
1557 	                     << "/" << connection->policy.throttler_messages->get_max()
1558 	                     << " failed, just wait." << dendl;
1559 	      // following thread pool deal with th full message queue isn't a
1560 	      // short time, so we can wait a ms.
1561 	      if (connection->register_time_events.empty()) {
1562 	        connection->register_time_events.insert(
1563 	            connection->center->create_time_event(1000,
1564 	                                                  connection->wakeup_handler));
1565 	      }
1566 	      return nullptr;
1567 	    }
1568 	  }
1569 	
1570 	  state = THROTTLE_BYTES;
1571 	  return CONTINUE(throttle_bytes);
1572 	}
1573 	
1574 	CtPtr ProtocolV2::throttle_bytes() {
1575 	  ldout(cct, 20) << __func__ << dendl;
1576 	
1577 	  const size_t cur_msg_size = get_current_msg_size();
1578 	  if (cur_msg_size) {
1579 	    if (connection->policy.throttler_bytes) {
1580 	      ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1581 	                     << " bytes from policy throttler "
1582 	                     << connection->policy.throttler_bytes->get_current() << "/"
1583 	                     << connection->policy.throttler_bytes->get_max() << dendl;
1584 	      if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
1585 	        ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1586 	                       << " bytes from policy throttler "
1587 	                       << connection->policy.throttler_bytes->get_current()
1588 	                       << "/" << connection->policy.throttler_bytes->get_max()
1589 	                       << " failed, just wait." << dendl;
1590 	        // following thread pool deal with th full message queue isn't a
1591 	        // short time, so we can wait a ms.
1592 	        if (connection->register_time_events.empty()) {
1593 	          connection->register_time_events.insert(
1594 	              connection->center->create_time_event(
1595 	                  1000, connection->wakeup_handler));
1596 	        }
1597 	        return nullptr;
1598 	      }
1599 	    }
1600 	  }
1601 	
1602 	  state = THROTTLE_DISPATCH_QUEUE;
1603 	  return CONTINUE(throttle_dispatch_queue);
1604 	}
1605 	
1606 	CtPtr ProtocolV2::throttle_dispatch_queue() {
1607 	  ldout(cct, 20) << __func__ << dendl;
1608 	
1609 	  const size_t cur_msg_size = get_current_msg_size();
1610 	  if (cur_msg_size) {
1611 	    if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
1612 	            cur_msg_size)) {
1613 	      ldout(cct, 10)
1614 	          << __func__ << " wants " << cur_msg_size
1615 	          << " bytes from dispatch throttle "
1616 	          << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1617 	          << connection->dispatch_queue->dispatch_throttler.get_max()
1618 	          << " failed, just wait." << dendl;
1619 	      // following thread pool deal with th full message queue isn't a
1620 	      // short time, so we can wait a ms.
1621 	      if (connection->register_time_events.empty()) {
1622 	        connection->register_time_events.insert(
1623 	            connection->center->create_time_event(1000,
1624 	                                                  connection->wakeup_handler));
1625 	      }
1626 	      return nullptr;
1627 	    }
1628 	  }
1629 	
1630 	  throttle_stamp = ceph_clock_now();
1631 	  state = THROTTLE_DONE;
1632 	
1633 	  return read_frame_segment();
1634 	}
1635 	
1636 	CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
1637 	{
1638 	  ldout(cct, 20) << __func__
1639 			 << " payload.length()=" << payload.length() << dendl;
1640 	
1641 	  if (state != READY) {
1642 	    lderr(cct) << __func__ << " not in ready state!" << dendl;
1643 	    return _fault();
1644 	  }
1645 	
1646 	  auto keepalive_frame = KeepAliveFrame::Decode(payload);
1647 	
1648 	  ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
1649 	
1650 	  connection->write_lock.lock();
1651 	  append_keepalive_ack(keepalive_frame.timestamp());
1652 	  connection->write_lock.unlock();
1653 	
1654 	  ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
1655 	                 << keepalive_frame.timestamp() << dendl;
1656 	  connection->set_last_keepalive(ceph_clock_now());
1657 	
1658 	  if (is_connected()) {
1659 	    connection->center->dispatch_event_external(connection->write_handler);
1660 	  }
1661 	
1662 	  return CONTINUE(read_frame);
1663 	}
1664 	
1665 	CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
1666 	{
1667 	  ldout(cct, 20) << __func__
1668 			 << " payload.length()=" << payload.length() << dendl;
1669 	
1670 	  if (state != READY) {
1671 	    lderr(cct) << __func__ << " not in ready state!" << dendl;
1672 	    return _fault();
1673 	  }
1674 	
1675 	  auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload);
1676 	  connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
1677 	  ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
1678 	
1679 	  return CONTINUE(read_frame);
1680 	}
1681 	
1682 	CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
1683 	{
1684 	  ldout(cct, 20) << __func__
1685 			 << " payload.length()=" << payload.length() << dendl;
1686 	
1687 	  if (state != READY) {
1688 	    lderr(cct) << __func__ << " not in ready state!" << dendl;
1689 	    return _fault();
1690 	  }
1691 	
1692 	  auto ack = AckFrame::Decode(payload);
1693 	  handle_message_ack(ack.seq());
1694 	  return CONTINUE(read_frame);
1695 	}
1696 	
1697 	/* Client Protocol Methods */
1698 	
1699 	CtPtr ProtocolV2::start_client_banner_exchange() {
1700 	  ldout(cct, 20) << __func__ << dendl;
1701 	
1702 	  INTERCEPT(1);
1703 	
1704 	  state = BANNER_CONNECTING;
1705 	
1706 	  global_seq = messenger->get_global_seq();
1707 	
1708 	  return _banner_exchange(CONTINUATION(post_client_banner_exchange));
1709 	}
1710 	
1711 	CtPtr ProtocolV2::post_client_banner_exchange() {
1712 	  ldout(cct, 20) << __func__ << dendl;
1713 	
1714 	  state = AUTH_CONNECTING;
1715 	
1716 	  return send_auth_request();
1717 	}
1718 	
1719 	CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
1720 	  ceph_assert(messenger->auth_client);
1721 	  ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
1722 			 << " auth_client " << messenger->auth_client << dendl;
1723 	
1724 	  bufferlist bl;
1725 	  vector<uint32_t> preferred_modes;
1726 	  auto am = auth_meta;
1727 	  connection->lock.unlock();
1728 	  int r = messenger->auth_client->get_auth_request(
1729 	    connection, am.get(),
1730 	    &am->auth_method, &preferred_modes, &bl);
1731 	  connection->lock.lock();
1732 	  if (state != AUTH_CONNECTING) {
1733 	    ldout(cct, 1) << __func__ << " state changed!" << dendl;
1734 	    return _fault();
1735 	  }
1736 	  if (r < 0) {
1737 	    ldout(cct, 0) << __func__ << " get_initial_auth_request returned " << r
1738 			  << dendl;
1739 	    stop();
1740 	    connection->dispatch_queue->queue_reset(connection);
1741 	    return nullptr;
1742 	  }
1743 	
1744 	  INTERCEPT(9);
1745 	
1746 	  auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes,
1747 	                                        bl);
1748 	  return WRITE(frame, "auth request", read_frame);
1749 	}
1750 	
1751 	CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
1752 	  ldout(cct, 20) << __func__
1753 			 << " payload.length()=" << payload.length() << dendl;
1754 	
1755 	  if (state != AUTH_CONNECTING) {
1756 	    lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1757 	    return _fault();
1758 	  }
1759 	
1760 	  auto bad_method = AuthBadMethodFrame::Decode(payload);
1761 	  ldout(cct, 1) << __func__ << " method=" << bad_method.method()
1762 			<< " result " << cpp_strerror(bad_method.result())
1763 	                << ", allowed methods=" << bad_method.allowed_methods()
1764 			<< ", allowed modes=" << bad_method.allowed_modes()
1765 	                << dendl;
1766 	  ceph_assert(messenger->auth_client);
1767 	  auto am = auth_meta;
1768 	  connection->lock.unlock();
1769 	  int r = messenger->auth_client->handle_auth_bad_method(
1770 	    connection,
1771 	    am.get(),
1772 	    bad_method.method(), bad_method.result(),
1773 	    bad_method.allowed_methods(),
1774 	    bad_method.allowed_modes());
1775 	  connection->lock.lock();
1776 	  if (state != AUTH_CONNECTING || r < 0) {
1777 	    return _fault();
1778 	  }
1779 	  return send_auth_request(bad_method.allowed_methods());
1780 	}
1781 	
1782 	CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
1783 	{
1784 	  ldout(cct, 20) << __func__
1785 			 << " payload.length()=" << payload.length() << dendl;
1786 	
1787 	  if (state != AUTH_CONNECTING) {
1788 	    lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1789 	    return _fault();
1790 	  }
1791 	
1792 	  auto auth_more = AuthReplyMoreFrame::Decode(payload);
1793 	  ldout(cct, 5) << __func__
1794 	                << " auth reply more len=" << auth_more.auth_payload().length()
1795 	                << dendl;
1796 	  ceph_assert(messenger->auth_client);
1797 	  ceph::bufferlist reply;
1798 	  auto am = auth_meta;
1799 	  connection->lock.unlock();
1800 	  int r = messenger->auth_client->handle_auth_reply_more(
1801 	    connection, am.get(), auth_more.auth_payload(), &reply);
1802 	  connection->lock.lock();
1803 	  if (state != AUTH_CONNECTING) {
1804 	    ldout(cct, 1) << __func__ << " state changed!" << dendl;
1805 	    return _fault();
1806 	  }
1807 	  if (r < 0) {
1808 	    lderr(cct) << __func__ << " auth_client handle_auth_reply_more returned "
1809 		       << r << dendl;
1810 	    return _fault();
1811 	  }
1812 	  auto more_reply = AuthRequestMoreFrame::Encode(reply);
1813 	  return WRITE(more_reply, "auth request more", read_frame);
1814 	}
1815 	
1816 	CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
1817 	{
1818 	  ldout(cct, 20) << __func__
1819 			 << " payload.length()=" << payload.length() << dendl;
1820 	
1821 	  if (state != AUTH_CONNECTING) {
1822 	    lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1823 	    return _fault();
1824 	  }
1825 	
1826 	  auto auth_done = AuthDoneFrame::Decode(payload);
1827 	
1828 	  ceph_assert(messenger->auth_client);
1829 	  auto am = auth_meta;
1830 	  connection->lock.unlock();
1831 	  int r = messenger->auth_client->handle_auth_done(
1832 	    connection,
1833 	    am.get(),
1834 	    auth_done.global_id(),
1835 	    auth_done.con_mode(),
1836 	    auth_done.auth_payload(),
1837 	    &am->session_key,
1838 	    &am->connection_secret);
1839 	  connection->lock.lock();
1840 	  if (state != AUTH_CONNECTING) {
1841 	    ldout(cct, 1) << __func__ << " state changed!" << dendl;
1842 	    return _fault();
1843 	  }
1844 	  if (r < 0) {
1845 	    return _fault();
1846 	  }
1847 	  auth_meta->con_mode = auth_done.con_mode();
1848 	  session_stream_handlers = \
1849 	    ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
1850 	
1851 	  state = AUTH_CONNECTING_SIGN;
1852 	
1853 	  const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1854 	    auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
1855 	  auto sig_frame = AuthSignatureFrame::Encode(sig);
1856 	  pre_auth.enabled = false;
1857 	  pre_auth.rxbuf.clear();
1858 	  return WRITE(sig_frame, "auth signature", read_frame);
1859 	}
1860 	
1861 	CtPtr ProtocolV2::finish_client_auth() {
1862 	  if (!server_cookie) {
1863 	    ceph_assert(connect_seq == 0);
1864 	    state = SESSION_CONNECTING;
1865 	    return send_client_ident();
1866 	  } else {  // reconnecting to previous session
1867 	    state = SESSION_RECONNECTING;
1868 	    ceph_assert(connect_seq > 0);
1869 	    return send_reconnect();
1870 	  }
1871 	}
1872 	
1873 	CtPtr ProtocolV2::send_client_ident() {
1874 	  ldout(cct, 20) << __func__ << dendl;
1875 	
1876 	  if (!connection->policy.lossy && !client_cookie) {
1877 	    client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1878 	  }
1879 	
1880 	  uint64_t flags = 0;
1881 	  if (connection->policy.lossy) {
1882 	    flags |= CEPH_MSG_CONNECT_LOSSY;
1883 	  }
1884 	
1885 	  auto client_ident = ClientIdentFrame::Encode(
1886 	      messenger->get_myaddrs(),
1887 	      connection->target_addr,
1888 	      messenger->get_myname().num(),
1889 	      global_seq,
1890 	      connection->policy.features_supported,
1891 	      connection->policy.features_required | msgr2_required,
1892 	      flags,
1893 	      client_cookie);
1894 	
1895 	  ldout(cct, 5) << __func__ << " sending identification: "
1896 	                << "addrs=" << messenger->get_myaddrs()
1897 	                << " target=" << connection->target_addr
1898 	                << " gid=" << messenger->get_myname().num()
1899 	                << " global_seq=" << global_seq
1900 	                << " features_supported=" << std::hex
1901 	                << connection->policy.features_supported
1902 	                << " features_required="
1903 			            << (connection->policy.features_required | msgr2_required)
1904 	                << " flags=" << flags
1905 	                << " cookie=" << client_cookie << std::dec << dendl;
1906 	
1907 	  INTERCEPT(11);
1908 	
1909 	  return WRITE(client_ident, "client ident", read_frame);
1910 	}
1911 	
1912 	CtPtr ProtocolV2::send_reconnect() {
1913 	  ldout(cct, 20) << __func__ << dendl;
1914 	
1915 	  auto reconnect = ReconnectFrame::Encode(messenger->get_myaddrs(),
1916 	                                          client_cookie,
1917 	                                          server_cookie,
1918 	                                          global_seq,
1919 	                                          connect_seq,
1920 	                                          in_seq);
1921 	
1922 	  ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
1923 	                << std::hex << client_cookie << " server_cookie="
1924 	                << server_cookie << std::dec
1925 	                << " gs=" << global_seq << " cs=" << connect_seq
1926 	                << " ms=" << in_seq << dendl;
1927 	
1928 	  INTERCEPT(13);
1929 	
1930 	  return WRITE(reconnect, "reconnect", read_frame);
1931 	}
1932 	
1933 	CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
1934 	{
1935 	  ldout(cct, 20) << __func__
1936 			 << " payload.length()=" << payload.length() << dendl;
1937 	
1938 	  if (state != SESSION_CONNECTING) {
1939 	    lderr(cct) << __func__ << " not in session connect state!" << dendl;
1940 	    return _fault();
1941 	  }
1942 	
1943 	  auto ident_missing =
1944 	      IdentMissingFeaturesFrame::Decode(payload);
1945 	  lderr(cct) << __func__
1946 	             << " client does not support all server features: " << std::hex
1947 	             << ident_missing.features() << std::dec << dendl;
1948 	
1949 	  return _fault();
1950 	}
1951 	
1952 	CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
1953 	{
1954 	  ldout(cct, 20) << __func__
1955 			 << " payload.length()=" << payload.length() << dendl;
1956 	
1957 	  if (state != SESSION_RECONNECTING) {
1958 	    lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1959 	    return _fault();
1960 	  }
1961 	
1962 	  auto reset = ResetFrame::Decode(payload);
1963 	
1964 	  ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
1965 	                << dendl;
1966 	  if (reset.full()) {
1967 	    reset_session();
1968 	  } else {
1969 	    server_cookie = 0;
1970 	    connect_seq = 0;
1971 	    in_seq = 0;
1972 	  }
1973 	
1974 	  state = SESSION_CONNECTING;
1975 	  return send_client_ident();
1976 	}
1977 	
1978 	CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
1979 	{
1980 	  ldout(cct, 20) << __func__
1981 			 << " payload.length()=" << payload.length() << dendl;
1982 	
1983 	  if (state != SESSION_RECONNECTING) {
1984 	    lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1985 	    return _fault();
1986 	  }
1987 	
1988 	  auto retry = RetryFrame::Decode(payload);
1989 	  connect_seq = retry.connect_seq() + 1;
1990 	
1991 	  ldout(cct, 1) << __func__
1992 	                << " received session retry connect_seq=" << retry.connect_seq()
1993 	                << ", inc to cs=" << connect_seq << dendl;
1994 	
1995 	  return send_reconnect();
1996 	}
1997 	
1998 	CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
1999 	{
2000 	  ldout(cct, 20) << __func__
2001 			 << " payload.length()=" << payload.length() << dendl;
2002 	
2003 	  if (state != SESSION_RECONNECTING) {
2004 	    lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2005 	    return _fault();
2006 	  }
2007 	
2008 	  auto retry = RetryGlobalFrame::Decode(payload);
2009 	  global_seq = messenger->get_global_seq(retry.global_seq());
2010 	
2011 	  ldout(cct, 1) << __func__ << " received session retry global global_seq="
2012 	                << retry.global_seq() << ", choose new gs=" << global_seq
2013 	                << dendl;
2014 	
2015 	  return send_reconnect();
2016 	}
2017 	
2018 	CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
2019 	  ldout(cct, 20) << __func__
2020 			 << " received WAIT (connection race)"
2021 			 << " payload.length()=" << payload.length()
2022 			 << dendl;
2023 	
2024 	  if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
2025 	    lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
2026 	    return _fault();
2027 	  }
2028 	
2029 	  state = WAIT;
2030 	  WaitFrame::Decode(payload);
2031 	  return _fault();
2032 	}
2033 	
2034 	CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
2035 	{
2036 	  ldout(cct, 20) << __func__
2037 			 << " payload.length()=" << payload.length() << dendl;
2038 	
2039 	  if (state != SESSION_RECONNECTING) {
2040 	    lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2041 	    return _fault();
2042 	  }
2043 	
2044 	  auto reconnect_ok = ReconnectOkFrame::Decode(payload);
2045 	  ldout(cct, 5) << __func__
2046 	                << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
2047 	                << dendl;
2048 	
2049 	  out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
2050 	
2051 	  backoff = utime_t();
2052 	  ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
2053 	                 << ", lossy = " << connection->policy.lossy << ", features "
2054 	                 << connection->get_features() << dendl;
2055 	
2056 	  if (connection->delay_state) {
2057 	    ceph_assert(connection->delay_state->ready());
2058 	  }
2059 	
2060 	  connection->dispatch_queue->queue_connect(connection);
2061 	  messenger->ms_deliver_handle_fast_connect(connection);
2062 	
2063 	  return ready();
2064 	}
2065 	
2066 	CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
2067 	{
2068 	  ldout(cct, 20) << __func__
2069 			 << " payload.length()=" << payload.length() << dendl;
2070 	
2071 	  if (state != SESSION_CONNECTING) {
2072 	    lderr(cct) << __func__ << " not in session connect state!" << dendl;
2073 	    return _fault();
2074 	  }
2075 	
2076 	  auto server_ident = ServerIdentFrame::Decode(payload);
2077 	  ldout(cct, 5) << __func__ << " received server identification:"
2078 	                << " addrs=" << server_ident.addrs()
2079 	                << " gid=" << server_ident.gid()
2080 	                << " global_seq=" << server_ident.global_seq()
2081 	                << " features_supported=" << std::hex
2082 	                << server_ident.supported_features()
2083 	                << " features_required=" << server_ident.required_features()
2084 	                << " flags=" << server_ident.flags() << " cookie=" << std::dec
2085 	                << server_ident.cookie() << dendl;
2086 	
2087 	  // is this who we intended to talk to?
2088 	  // be a bit forgiving here, since we may be connecting based on addresses parsed out
2089 	  // of mon_host or something.
2090 	  if (!server_ident.addrs().contains(connection->target_addr)) {
2091 	    ldout(cct,1) << __func__ << " peer identifies as " << server_ident.addrs()
2092 			 << ", does not include " << connection->target_addr << dendl;
2093 	    return _fault();
2094 	  }
2095 	
2096 	  server_cookie = server_ident.cookie();
2097 	
2098 	  connection->set_peer_addrs(server_ident.addrs());
2099 	  peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
2100 	  connection->set_features(server_ident.supported_features() &
2101 	                           connection->policy.features_supported);
2102 	  peer_global_seq = server_ident.global_seq();
2103 	
2104 	  connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
2105 	
2106 	  backoff = utime_t();
2107 	  ldout(cct, 10) << __func__ << " connect success " << connect_seq
2108 	                 << ", lossy = " << connection->policy.lossy << ", features "
2109 	                 << connection->get_features() << dendl;
2110 	
2111 	  if (connection->delay_state) {
2112 	    ceph_assert(connection->delay_state->ready());
2113 	  }
2114 	
2115 	  connection->dispatch_queue->queue_connect(connection);
2116 	  messenger->ms_deliver_handle_fast_connect(connection);
2117 	
2118 	  return ready();
2119 	}
2120 	
2121 	/* Server Protocol Methods */
2122 	
2123 	CtPtr ProtocolV2::start_server_banner_exchange() {
2124 	  ldout(cct, 20) << __func__ << dendl;
2125 	
2126 	  INTERCEPT(2);
2127 	
2128 	  state = BANNER_ACCEPTING;
2129 	
2130 	  return _banner_exchange(CONTINUATION(post_server_banner_exchange));
2131 	}
2132 	
2133 	CtPtr ProtocolV2::post_server_banner_exchange() {
2134 	  ldout(cct, 20) << __func__ << dendl;
2135 	
2136 	  state = AUTH_ACCEPTING;
2137 	
2138 	  return CONTINUE(read_frame);
2139 	}
2140 	
2141 	CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
2142 	  ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
2143 	                 << dendl;
2144 	
2145 	  if (state != AUTH_ACCEPTING) {
2146 	    lderr(cct) << __func__ << " not in auth accept state!" << dendl;
2147 	    return _fault();
2148 	  }
2149 	
2150 	  auto request = AuthRequestFrame::Decode(payload);
2151 	  ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
2152 			 << ", preferred_modes=" << request.preferred_modes()
2153 	                 << ", payload_len=" << request.auth_payload().length() << ")"
2154 	                 << dendl;
2155 	  auth_meta->auth_method = request.method();
2156 	  auth_meta->con_mode = messenger->auth_server->pick_con_mode(
2157 	    connection->get_peer_type(), auth_meta->auth_method,
2158 	    request.preferred_modes());
2159 	  if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
2160 	    return _auth_bad_method(-EOPNOTSUPP);
2161 	  }
2162 	  return _handle_auth_request(request.auth_payload(), false);
2163 	}
2164 	
2165 	CtPtr ProtocolV2::_auth_bad_method(int r)
2166 	{
2167 	  ceph_assert(r < 0);
2168 	  std::vector<uint32_t> allowed_methods;
2169 	  std::vector<uint32_t> allowed_modes;
2170 	  messenger->auth_server->get_supported_auth_methods(
2171 	    connection->get_peer_type(), &allowed_methods, &allowed_modes);
2172 	  ldout(cct, 1) << __func__ << " auth_method " << auth_meta->auth_method
2173 			<< " r " << cpp_strerror(r)
2174 			<< ", allowed_methods " << allowed_methods
2175 			<< ", allowed_modes " << allowed_modes
2176 			<< dendl;
2177 	  auto bad_method = AuthBadMethodFrame::Encode(auth_meta->auth_method, r,
2178 	                                               allowed_methods, allowed_modes);
2179 	  return WRITE(bad_method, "bad auth method", read_frame);
2180 	}
2181 	
2182 	CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
2183 	{
2184 	  if (!messenger->auth_server) {
2185 	    return _fault();
2186 	  }
2187 	  bufferlist reply;
2188 	  auto am = auth_meta;
2189 	  connection->lock.unlock();
2190 	  int r = messenger->auth_server->handle_auth_request(
2191 	    connection, am.get(),
2192 	    more, am->auth_method, auth_payload,
2193 	    &reply);
2194 	  connection->lock.lock();
2195 	  if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
2196 	    ldout(cct, 1) << __func__
2197 	                  << " state changed while accept, it must be mark_down"
2198 	                  << dendl;
2199 	    ceph_assert(state == CLOSED);
2200 	    return _fault();
2201 	  }
2202 	  if (r == 1) {
2203 	    INTERCEPT(10);
2204 	    state = AUTH_ACCEPTING_SIGN;
2205 	
2206 	    auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
2207 	                                           auth_meta->con_mode,
2208 	                                           reply);
2209 	    return WRITE(auth_done, "auth done", finish_auth);
2210 	  } else if (r == 0) {
2211 	    state = AUTH_ACCEPTING_MORE;
2212 	
2213 	    auto more = AuthReplyMoreFrame::Encode(reply);
2214 	    return WRITE(more, "auth reply more", read_frame);
2215 	  } else if (r == -EBUSY) {
2216 	    // kick the client and maybe they'll come back later
2217 	    return _fault();
2218 	  } else {
2219 	    return _auth_bad_method(r);
2220 	  }
2221 	}
2222 	
2223 	CtPtr ProtocolV2::finish_auth()
2224 	{
2225 	  ceph_assert(auth_meta);
2226 	  // TODO: having a possibility to check whether we're server or client could
2227 	  // allow reusing finish_auth().
2228 	  session_stream_handlers = \
2229 	    ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
2230 	
2231 	  const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
2232 	    auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
2233 	  auto sig_frame = AuthSignatureFrame::Encode(sig);
2234 	  pre_auth.enabled = false;
2235 	  pre_auth.rxbuf.clear();
2236 	  return WRITE(sig_frame, "auth signature", read_frame);
2237 	}
2238 	
2239 	CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
2240 	{
2241 	  ldout(cct, 20) << __func__
2242 			 << " payload.length()=" << payload.length() << dendl;
2243 	
2244 	  if (state != AUTH_ACCEPTING_MORE) {
2245 	    lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
2246 	    return _fault();
2247 	  }
2248 	
2249 	  auto auth_more = AuthRequestMoreFrame::Decode(payload);
2250 	  return _handle_auth_request(auth_more.auth_payload(), true);
2251 	}
2252 	
2253 	CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
2254 	{
2255 	  ldout(cct, 20) << __func__
2256 			 << " payload.length()=" << payload.length() << dendl;
2257 	
2258 	  if (state != AUTH_ACCEPTING_SIGN && state != AUTH_CONNECTING_SIGN) {
2259 	    lderr(cct) << __func__
2260 	               << " pre-auth verification signature seen in wrong state!"
2261 	               << dendl;
2262 	    return _fault();
2263 	  }
2264 	
2265 	  auto sig_frame = AuthSignatureFrame::Decode(payload);
2266 	
2267 	  const auto actual_tx_sig = auth_meta->session_key.empty() ?
2268 	    sha256_digest_t() : auth_meta->session_key.hmac_sha256(cct, pre_auth.txbuf);
2269 	  if (sig_frame.signature() != actual_tx_sig) {
2270 	    ldout(cct, 2) << __func__ << " pre-auth signature mismatch"
2271 	                  << " actual_tx_sig=" << actual_tx_sig
2272 	                  << " sig_frame.signature()=" << sig_frame.signature()
2273 	                  << dendl;
2274 	    return _fault();
2275 	  } else {
2276 	    ldout(cct, 20) << __func__ << " pre-auth signature success"
2277 	                   << " sig_frame.signature()=" << sig_frame.signature()
2278 	                   << dendl;
2279 	    pre_auth.txbuf.clear();
2280 	  }
2281 	
2282 	  if (state == AUTH_ACCEPTING_SIGN) {
2283 	    // server had sent AuthDone and client responded with correct pre-auth
2284 	    // signature. we can start accepting new sessions/reconnects.
2285 	    state = SESSION_ACCEPTING;
2286 	    return CONTINUE(read_frame);
2287 	  } else if (state == AUTH_CONNECTING_SIGN) {
2288 	    // this happened at client side
2289 	    return finish_client_auth();
2290 	  } else {
2291 	    ceph_abort("state corruption");
2292 	  }
2293 	}
2294 	
2295 	CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
2296 	{
2297 	  ldout(cct, 20) << __func__
2298 			 << " payload.length()=" << payload.length() << dendl;
2299 	
2300 	  if (state != SESSION_ACCEPTING) {
2301 	    lderr(cct) << __func__ << " not in session accept state!" << dendl;
2302 	    return _fault();
2303 	  }
2304 	
2305 	  auto client_ident = ClientIdentFrame::Decode(payload);
2306 	
2307 	  ldout(cct, 5) << __func__ << " received client identification:"
2308 	                << " addrs=" << client_ident.addrs()
2309 			            << " target=" << client_ident.target_addr()
2310 	                << " gid=" << client_ident.gid()
2311 	                << " global_seq=" << client_ident.global_seq()
2312 	                << " features_supported=" << std::hex
2313 	                << client_ident.supported_features()
2314 	                << " features_required=" << client_ident.required_features()
2315 	                << " flags=" << client_ident.flags()
2316 	                << " cookie=" << client_ident.cookie() << std::dec << dendl;
2317 	
2318 	  if (client_ident.addrs().empty() ||
2319 	      client_ident.addrs().front() == entity_addr_t()) {
2320 	    ldout(cct,5) << __func__ << " oops, client_ident.addrs() is empty" << dendl;
2321 	    return _fault();  // a v2 peer should never do this
2322 	  }
2323 	  if (!messenger->get_myaddrs().contains(client_ident.target_addr())) {
2324 	    ldout(cct,5) << __func__ << " peer is trying to reach "
2325 			 << client_ident.target_addr()
2326 			 << " which is not us (" << messenger->get_myaddrs() << ")"
2327 			 << dendl;
2328 	    return _fault();
2329 	  }
2330 	
2331 	  connection->set_peer_addrs(client_ident.addrs());
2332 	  connection->target_addr = connection->_infer_target_addr(client_ident.addrs());
2333 	
2334 	  peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
2335 	  connection->set_peer_id(client_ident.gid());
2336 	
2337 	  client_cookie = client_ident.cookie();
2338 	
2339 	  uint64_t feat_missing =
2340 	    (connection->policy.features_required | msgr2_required) &
2341 	    ~(uint64_t)client_ident.supported_features();
2342 	  if (feat_missing) {
2343 	    ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
2344 	                  << feat_missing << std::dec << dendl;
2345 	    auto ident_missing_features =
2346 	        IdentMissingFeaturesFrame::Encode(feat_missing);
2347 	
2348 	    return WRITE(ident_missing_features, "ident missing features", read_frame);
2349 	  }
2350 	
2351 	  connection_features =
2352 	      client_ident.supported_features() & connection->policy.features_supported;
2353 	
2354 	  peer_global_seq = client_ident.global_seq();
2355 	
2356 	  if (connection->policy.server &&
2357 	      connection->policy.lossy) {
2358 	    // incoming lossy client, no need to register this connection
2359 	  } else {
2360 	    // Looks good so far, let's check if there is already an existing connection
2361 	    // to this peer.
2362 	    connection->lock.unlock();
2363 	    AsyncConnectionRef existing = messenger->lookup_conn(
2364 	      *connection->peer_addrs);
2365 	
2366 	    if (existing &&
2367 		existing->protocol->proto_type != 2) {
2368 	      ldout(cct,1) << __func__ << " existing " << existing << " proto "
2369 			   << existing->protocol.get() << " version is "
2370 			   << existing->protocol->proto_type << ", marking down"
2371 			   << dendl;
2372 	      existing->mark_down();
2373 	      existing = nullptr;
2374 	    }
2375 	
2376 	    connection->inject_delay();
2377 	
2378 	    connection->lock.lock();
2379 	    if (state != SESSION_ACCEPTING) {
2380 	      ldout(cct, 1) << __func__
2381 			    << " state changed while accept, it must be mark_down"
2382 			    << dendl;
2383 	      ceph_assert(state == CLOSED);
2384 	      return _fault();
2385 	    }
2386 	
2387 	    if (existing) {
2388 	      return handle_existing_connection(existing);
2389 	    }
2390 	  }
2391 	
2392 	  // if everything is OK reply with server identification
2393 	  return send_server_ident();
2394 	}
2395 	
2396 	CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
2397 	{
2398 	  ldout(cct, 20) << __func__
2399 			 << " payload.length()=" << payload.length() << dendl;
2400 	
2401 	  if (state != SESSION_ACCEPTING) {
2402 	    lderr(cct) << __func__ << " not in session accept state!" << dendl;
2403 	    return _fault();
2404 	  }
2405 	
2406 	  auto reconnect = ReconnectFrame::Decode(payload);
2407 	
2408 	  ldout(cct, 5) << __func__
2409 	                << " received reconnect:" 
2410 	                << " client_cookie=" << std::hex << reconnect.client_cookie()
2411 	                << " server_cookie=" << reconnect.server_cookie() << std::dec
2412 	                << " gs=" << reconnect.global_seq()
2413 	                << " cs=" << reconnect.connect_seq()
2414 	                << " ms=" << reconnect.msg_seq()
2415 			            << dendl;
2416 	
2417 	  // Should we check if one of the ident.addrs match connection->target_addr
2418 	  // as we do in ProtocolV1?
2419 	  connection->set_peer_addrs(reconnect.addrs());
2420 	  connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
2421 	  peer_global_seq = reconnect.global_seq();
2422 	
2423 	  connection->lock.unlock();
2424 	  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2425 	
2426 	  if (existing &&
2427 	      existing->protocol->proto_type != 2) {
2428 	    ldout(cct,1) << __func__ << " existing " << existing << " proto "
2429 			 << existing->protocol.get() << " version is "
2430 			 << existing->protocol->proto_type << ", marking down" << dendl;
2431 	    existing->mark_down();
2432 	    existing = nullptr;
2433 	  }
2434 	
2435 	  connection->inject_delay();
2436 	
2437 	  connection->lock.lock();
2438 	  if (state != SESSION_ACCEPTING) {
2439 	    ldout(cct, 1) << __func__
2440 	                  << " state changed while accept, it must be mark_down"
2441 	                  << dendl;
2442 	    ceph_assert(state == CLOSED);
2443 	    return _fault();
2444 	  }
2445 	
2446 	  if (!existing) {
2447 	    // there is no existing connection therefore cannot reconnect to previous
2448 	    // session
2449 	    ldout(cct, 0) << __func__
2450 	                  << " no existing connection exists, reseting client" << dendl;
2451 	    auto reset = ResetFrame::Encode(true);
2452 	    return WRITE(reset, "session reset", read_frame);
2453 	  }
2454 	
2455 	  std::lock_guard<std::mutex> l(existing->lock);
2456 	
2457 	  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2458 	  if (!exproto) {
2459 	    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2460 	    ceph_assert(false);
2461 	  }
2462 	
2463 	  if (exproto->state == CLOSED) {
2464 	    ldout(cct, 5) << __func__ << " existing " << existing
2465 	                  << " already closed. Reseting client" << dendl;
2466 	    auto reset = ResetFrame::Encode(true);
2467 	    return WRITE(reset, "session reset", read_frame);
2468 	  }
2469 	
2470 	  if (exproto->replacing) {
2471 	    ldout(cct, 1) << __func__
2472 	                  << " existing racing replace happened while replacing."
2473 	                  << " existing=" << existing << dendl;
2474 	    auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2475 	    return WRITE(retry, "session retry", read_frame);
2476 	  }
2477 	
2478 	  if (exproto->client_cookie != reconnect.client_cookie()) {
2479 	    ldout(cct, 1) << __func__ << " existing=" << existing
2480 	                  << " client cookie mismatch, I must have reseted:"
2481 	                  << " cc=" << std::hex << exproto->client_cookie
2482 	                  << " rcc=" << reconnect.client_cookie()
2483 	                  << ", reseting client." << std::dec
2484 	                  << dendl;
2485 	    auto reset = ResetFrame::Encode(connection->policy.resetcheck);
2486 	    return WRITE(reset, "session reset", read_frame);
2487 	  } else if (exproto->server_cookie == 0) {
2488 	    // this happens when:
2489 	    //   - a connects to b
2490 	    //   - a sends client_ident
2491 	    //   - b gets client_ident, sends server_ident and sets cookie X
2492 	    //   - connection fault
2493 	    //   - b reconnects to a with cookie X, connect_seq=1
2494 	    //   - a has cookie==0
2495 	    ldout(cct, 1) << __func__ << " I was a client and didn't received the"
2496 	                  << " server_ident. Asking peer to resume session"
2497 	                  << " establishment" << dendl;
2498 	    auto reset = ResetFrame::Encode(false);
2499 	    return WRITE(reset, "session reset", read_frame);
2500 	  }
2501 	
2502 	  if (exproto->peer_global_seq > reconnect.global_seq()) {
2503 	    ldout(cct, 5) << __func__
2504 	                  << " stale global_seq: sgs=" << exproto->peer_global_seq
2505 	                  << " cgs=" << reconnect.global_seq()
2506 	                  << ", ask client to retry global" << dendl;
2507 	    auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
2508 	
2509 	    INTERCEPT(18);
2510 	
2511 	    return WRITE(retry, "session retry", read_frame);
2512 	  }
2513 	
2514 	  if (exproto->connect_seq > reconnect.connect_seq()) {
2515 	    ldout(cct, 5) << __func__
2516 	                  << " stale connect_seq scs=" << exproto->connect_seq
2517 	                  << " ccs=" << reconnect.connect_seq()
2518 	                  << " , ask client to retry" << dendl;
2519 	    auto retry = RetryFrame::Encode(exproto->connect_seq);
2520 	    return WRITE(retry, "session retry", read_frame);
2521 	  }
2522 	
2523 	  if (exproto->connect_seq == reconnect.connect_seq()) {
2524 	    // reconnect race: both peers are sending reconnect messages
2525 	    if (existing->peer_addrs->msgr2_addr() >
2526 	            messenger->get_myaddrs().msgr2_addr() &&
2527 	        !existing->policy.server) {
2528 	      // the existing connection wins
2529 	      ldout(cct, 1)
2530 	          << __func__
2531 	          << " reconnect race detected, this connection loses to existing="
2532 	          << existing << dendl;
2533 	
2534 	      auto wait = WaitFrame::Encode();
2535 	      return WRITE(wait, "wait", read_frame);
2536 	    } else {
2537 	      // this connection wins
2538 	      ldout(cct, 1) << __func__
2539 	                    << " reconnect race detected, replacing existing="
2540 	                    << existing << " socket by this connection's socket"
2541 	                    << dendl;
2542 	    }
2543 	  }
2544 	
2545 	  ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
2546 	
2547 	  reconnecting = true;
2548 	
2549 	  // everything looks good
2550 	  exproto->connect_seq = reconnect.connect_seq();
2551 	  exproto->message_seq = reconnect.msg_seq();
2552 	
2553 	  return reuse_connection(existing, exproto);
2554 	}
2555 	
2556 	CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
2557 	  ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
2558 	
2559 	  std::lock_guard<std::mutex> l(existing->lock);
2560 	
2561 	  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
2562 	  if (!exproto) {
2563 	    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
2564 	    ceph_assert(false);
2565 	  }
2566 	
2567 	  if (exproto->state == CLOSED) {
2568 	    ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
2569 	                  << dendl;
2570 	    return send_server_ident();
2571 	  }
2572 	
2573 	  if (exproto->replacing) {
2574 	    ldout(cct, 1) << __func__
2575 	                  << " existing racing replace happened while replacing."
2576 	                  << " existing=" << existing << dendl;
2577 	    auto wait = WaitFrame::Encode();
2578 	    return WRITE(wait, "wait", read_frame);
2579 	  }
2580 	
2581 	  if (exproto->peer_global_seq > peer_global_seq) {
2582 	    ldout(cct, 1) << __func__ << " this is a stale connection, peer_global_seq="
2583 	                  << peer_global_seq
2584 	                  << " existing->peer_global_seq=" << exproto->peer_global_seq
2585 	                  << ", stopping this connection." << dendl;
2586 	    stop();
2587 	    connection->dispatch_queue->queue_reset(connection);
2588 	    return nullptr;
2589 	  }
2590 	
2591 	  if (existing->policy.lossy) {
2592 	    // existing connection can be thrown out in favor of this one
2593 	    ldout(cct, 1)
2594 	        << __func__ << " existing=" << existing
2595 	        << " is a lossy channel. Stopping existing in favor of this connection"
2596 	        << dendl;
2597 	    existing->protocol->stop();
2598 	    existing->dispatch_queue->queue_reset(existing.get());
2599 	    return send_server_ident();
2600 	  }
2601 	
2602 	  if (exproto->server_cookie && exproto->client_cookie &&
2603 	      exproto->client_cookie != client_cookie) {
2604 	    // Found previous session
2605 	    // peer has reseted and we're going to reuse the existing connection
2606 	    // by replacing the communication socket
2607 	    ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2608 	                  << ", peer must have reseted." << dendl;
2609 	    if (connection->policy.resetcheck) {
2610 	      exproto->reset_session();
2611 	    }
2612 	    return reuse_connection(existing, exproto);
2613 	  }
2614 	
2615 	  if (exproto->client_cookie == client_cookie) {
2616 	    // session establishment interrupted between client_ident and server_ident,
2617 	    // continuing...
2618 	    ldout(cct, 1) << __func__ << " found previous session existing=" << existing
2619 	                  << ", continuing session establishment." << dendl;
2620 	    return reuse_connection(existing, exproto);
2621 	  }
2622 	
2623 	  if (exproto->state == READY || exproto->state == STANDBY) {
2624 	    ldout(cct, 1) << __func__ << " existing=" << existing
2625 	                  << " is READY/STANDBY, lets reuse it" << dendl;
2626 	    return reuse_connection(existing, exproto);
2627 	  }
2628 	
2629 	  // Looks like a connection race: server and client are both connecting to
2630 	  // each other at the same time.
2631 	  if (connection->peer_addrs->msgr2_addr() <
2632 	          messenger->get_myaddrs().msgr2_addr() ||
2633 	      existing->policy.server) {
2634 	    // this connection wins
2635 	    ldout(cct, 1) << __func__
2636 	                  << " connection race detected, replacing existing="
2637 	                  << existing << " socket by this connection's socket" << dendl;
2638 	    return reuse_connection(existing, exproto);
2639 	  } else {
2640 	    // the existing connection wins
2641 	    ldout(cct, 1)
2642 	        << __func__
2643 	        << " connection race detected, this connection loses to existing="
2644 	        << existing << dendl;
2645 	    ceph_assert(connection->peer_addrs->msgr2_addr() >
2646 	                messenger->get_myaddrs().msgr2_addr());
2647 	
2648 	    // make sure we follow through with opening the existing
2649 	    // connection (if it isn't yet open) since we know the peer
2650 	    // has something to send to us.
2651 	    existing->send_keepalive();
2652 	    auto wait = WaitFrame::Encode();
2653 	    return WRITE(wait, "wait", read_frame);
2654 	  }
2655 	}
2656 	
// Hand this connection's socket and session-level state over to `existing`,
// then stop this connection.
//
// Synchronously (with existing->write_lock held for the whole function):
//   - this connection's file events are removed from its event center,
//   - existing's receive state is reset and, unless we are reconnecting, the
//     peer identity (client cookie, peer name, features) is copied over,
//   - stream handlers are swapped and auth_meta is shared with existing,
//   - this connection is stopped and a reset is queued for it,
//   - existing is parked in state NONE with `replacing` set.
// Asynchronously, `deactivate_existing` is submitted to existing's current
// event center; it installs the moved socket and the new worker/center on
// existing and then runs `transfer_existing`, which resumes the handshake
// with send_server_ident() or send_reconnect_ok().
//
// Both callers visible in this file invoke this function while holding
// existing->lock.
//
// @param existing  connection object that survives and receives the socket
// @param exproto   existing's protocol instance, already downcast by the caller
// @return always nullptr: this connection has no further continuation
CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
                                   ProtocolV2 *exproto) {
  ldout(cct, 20) << __func__ << " existing=" << existing
                 << " reconnect=" << reconnecting << dendl;

  connection->inject_delay();

  // held until this function returns
  std::lock_guard<std::mutex> l(existing->write_lock);

  // stop receiving events for our socket on our own center before the socket
  // is moved away below
  connection->center->delete_file_event(connection->cs.fd(),
                                        EVENT_READABLE | EVENT_WRITABLE);

  if (existing->delay_state) {
    existing->delay_state->flush();
    ceph_assert(!connection->delay_state);
  }
  exproto->reset_recv_state();
  exproto->pre_auth.enabled = false;

  // On a fresh session (not a reconnect) existing takes over the identity
  // that was negotiated on this connection.
  if (!reconnecting) {
    exproto->client_cookie = client_cookie;
    exproto->peer_name = peer_name;
    exproto->connection_features = connection_features;
    existing->set_features(connection_features);
  }
  exproto->peer_global_seq = peer_global_seq;

  // Take the socket out of this connection and remember which center/worker
  // it has been served by; existing is switched to them in
  // deactivate_existing.
  auto temp_cs = std::move(connection->cs);
  EventCenter *new_center = connection->center;
  Worker *new_worker = connection->worker;

  ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                           << dendl;

  // existing gets the stream handlers and auth metadata established on this
  // connection
  std::swap(exproto->session_stream_handlers, session_stream_handlers);
  exproto->auth_meta = auth_meta;

  // avoid _stop shutdown replacing socket
  // queue a reset on the new connection, which we're dumping for the old
  stop();

  connection->dispatch_queue->queue_reset(connection);

  // Park existing: no writes, marked as being replaced, state NONE. The
  // lambdas below check that the state is still NONE before touching it.
  exproto->can_write = false;
  exproto->write_in_progress = false;
  exproto->reconnecting = reconnecting;
  exproto->replacing = true;
  existing->state_offset = 0;
  // avoid previous thread modify event
  exproto->state = NONE;
  existing->state = AsyncConnection::STATE_NONE;
  // Discard existing prefetch buffer in `recv_buf`
  existing->recv_start = existing->recv_end = 0;
  // there shouldn't exist any buffer
  ceph_assert(connection->recv_start == connection->recv_end);

  // Step 1, submitted to existing's current (old) center at the end of this
  // function. The moved-out socket is bound as the `cs` argument.
  auto deactivate_existing = std::bind(
      [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
        // we need to delete time event in original thread
        {
          std::lock_guard<std::mutex> l(existing->lock);
          // requeue what was sent and drop pending output under write_lock
          existing->write_lock.lock();
          exproto->requeue_sent();
          existing->outgoing_bl.clear();
          existing->open_write = false;
          existing->write_lock.unlock();
          if (exproto->state == NONE) {
            // still parked: drop the old socket, adopt the new one and move
            // existing over to the new worker/center
            existing->shutdown_socket();
            existing->cs = std::move(cs);
            existing->worker->references--;
            new_worker->references++;
            existing->logger = new_worker->get_perf_counter();
            existing->worker = new_worker;
            existing->center = new_center;
            if (existing->delay_state)
              existing->delay_state->set_center(new_center);
          } else if (exproto->state == CLOSED) {
            // existing was closed in the meantime: the new socket is not
            // installed; it is closed via a task submitted to the center it
            // came from.
            // NOTE(review): the trailing `true` passed to submit_to is
            // presumably a "don't wait" flag -- confirm against EventCenter.
            auto back_to_close = std::bind(
                [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
            new_center->submit_to(new_center->get_id(),
                                  std::move(back_to_close), true);
            return;
          } else {
            // any other state is unexpected here
            ceph_abort();
          }
        }

        // Before changing existing->center, it may already exists some
        // events in existing->center's queue. Then if we mark down
        // `existing`, it will execute in another thread and clean up
        // connection. Previous event will result in segment fault
        //
        // Step 2: runs on existing's new center (existing->center was
        // reassigned above) and resumes the handshake on existing.
        auto transfer_existing = [existing, exproto]() mutable {
          std::lock_guard<std::mutex> l(existing->lock);
          // closed while the hand-off was in flight: nothing left to do
          if (exproto->state == CLOSED) return;
          ceph_assert(exproto->state == NONE);

          exproto->state = SESSION_ACCEPTING;
          // we have called shutdown_socket above
          ceph_assert(existing->last_tick_id == 0);
          // restart timer since we are going to re-build connection
          existing->last_connect_started = ceph::coarse_mono_clock::now();
          existing->last_tick_id = existing->center->create_time_event(
            existing->connect_timeout_us, existing->tick_handler);
          existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
          // start reading from the adopted socket
          existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
                                              existing->read_handler);
          // fresh session -> server_ident; reconnect -> reconnect_ok
          if (!exproto->reconnecting) {
            exproto->run_continuation(exproto->send_server_ident());
          } else {
            exproto->run_continuation(exproto->send_reconnect_ok());
          }
        };
        // run inline when already on the new center's thread, otherwise
        // submit it to that center
        if (existing->center->in_thread())
          transfer_existing();
        else
          existing->center->submit_to(existing->center->get_id(),
                                      std::move(transfer_existing), true);
      },
      std::move(temp_cs));

  // existing->center is still the old center at this point
  existing->center->submit_to(existing->center->get_id(),
                              std::move(deactivate_existing), true);
  return nullptr;
}
2781 	
2782 	CtPtr ProtocolV2::send_server_ident() {
2783 	  ldout(cct, 20) << __func__ << dendl;
2784 	
2785 	  // this is required for the case when this connection is being replaced
2786 	  out_seq = discard_requeued_up_to(out_seq, 0);
2787 	  in_seq = 0;
2788 	
2789 	  if (!connection->policy.lossy) {
2790 	    server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
2791 	  }
2792 	
2793 	  uint64_t flags = 0;
2794 	  if (connection->policy.lossy) {
2795 	    flags = flags | CEPH_MSG_CONNECT_LOSSY;
2796 	  }
2797 	
2798 	  uint64_t gs = messenger->get_global_seq();
2799 	  auto server_ident = ServerIdentFrame::Encode(
2800 	          messenger->get_myaddrs(),
2801 	          messenger->get_myname().num(),
2802 	          gs,
2803 	          connection->policy.features_supported,
2804 	          connection->policy.features_required | msgr2_required,
2805 	          flags,
2806 	          server_cookie);
2807 	
2808 	  ldout(cct, 5) << __func__ << " sending identification:"
2809 	                << " addrs=" << messenger->get_myaddrs()
2810 	                << " gid=" << messenger->get_myname().num()
2811 	                << " global_seq=" << gs << " features_supported=" << std::hex
2812 	                << connection->policy.features_supported
2813 	                << " features_required="
2814 			            << (connection->policy.features_required | msgr2_required)
2815 	                << " flags=" << flags << " cookie=" << std::dec << server_cookie
2816 	                << dendl;
2817 	
2818 	  connection->lock.unlock();
2819 	  // Because "replacing" will prevent other connections preempt this addr,
2820 	  // it's safe that here we don't acquire Connection's lock
2821 	  ssize_t r = messenger->accept_conn(connection);
2822 	
2823 	  connection->inject_delay();
2824 	
2825 	  connection->lock.lock();
2826 	
2827 	  if (r < 0) {
2828 	    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2829 	                  << connection->peer_addrs->msgr2_addr()
2830 	                  << " just fail later one(this)" << dendl;
2831 	    connection->inject_delay();
2832 	    return _fault();
2833 	  }
2834 	  if (state != SESSION_ACCEPTING) {
2835 	    ldout(cct, 1) << __func__
2836 	                  << " state changed while accept_conn, it must be mark_down"
2837 	                  << dendl;
2838 	    ceph_assert(state == CLOSED || state == NONE);
2839 	    messenger->unregister_conn(connection);
2840 	    connection->inject_delay();
2841 	    return _fault();
2842 	  }
2843 	
2844 	  connection->set_features(connection_features);
2845 	
2846 	  // notify
2847 	  connection->dispatch_queue->queue_accept(connection);
2848 	  messenger->ms_deliver_handle_fast_accept(connection);
2849 	
2850 	  INTERCEPT(12);
2851 	
2852 	  return WRITE(server_ident, "server ident", server_ready);
2853 	}
2854 	
2855 	CtPtr ProtocolV2::server_ready() {
2856 	  ldout(cct, 20) << __func__ << dendl;
2857 	
2858 	  if (connection->delay_state) {
2859 	    ceph_assert(connection->delay_state->ready());
2860 	  }
2861 	
2862 	  return ready();
2863 	}
2864 	
2865 	CtPtr ProtocolV2::send_reconnect_ok() {
2866 	  ldout(cct, 20) << __func__ << dendl;
2867 	
2868 	  out_seq = discard_requeued_up_to(out_seq, message_seq);
2869 	
2870 	  uint64_t ms = in_seq;
2871 	  auto reconnect_ok = ReconnectOkFrame::Encode(ms);
2872 	
2873 	  ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;
2874 	
2875 	  connection->lock.unlock();
2876 	  // Because "replacing" will prevent other connections preempt this addr,
2877 	  // it's safe that here we don't acquire Connection's lock
2878 	  ssize_t r = messenger->accept_conn(connection);
2879 	
2880 	  connection->inject_delay();
2881 	
2882 	  connection->lock.lock();
2883 	
2884 	  if (r < 0) {
2885 	    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2886 	                  << connection->peer_addrs->msgr2_addr()
2887 	                  << " just fail later one(this)" << dendl;
2888 	    connection->inject_delay();
2889 	    return _fault();
2890 	  }
2891 	  if (state != SESSION_ACCEPTING) {
2892 	    ldout(cct, 1) << __func__
2893 	                  << " state changed while accept_conn, it must be mark_down"
2894 	                  << dendl;
2895 	    ceph_assert(state == CLOSED || state == NONE);
2896 	    messenger->unregister_conn(connection);
2897 	    connection->inject_delay();
2898 	    return _fault();
2899 	  }
2900 	
2901 	  // notify
2902 	  connection->dispatch_queue->queue_accept(connection);
2903 	  messenger->ms_deliver_handle_fast_accept(connection);
2904 	
2905 	  INTERCEPT(14);
2906 	
2907 	  return WRITE(reconnect_ok, "reconnect ok", server_ready);
2908 	}
2909