1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <type_traits>
5
6 #include "ProtocolV2.h"
7 #include "AsyncMessenger.h"
8
9 #include "common/EventTrace.h"
10 #include "common/ceph_crypto.h"
11 #include "common/errno.h"
12 #include "include/random.h"
13 #include "auth/AuthClient.h"
14 #include "auth/AuthServer.h"
15
// Route this file's debug output to the messenger ("ms") subsystem and
// prefix every line with the connection description built by _conn_prefix().
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
19 ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
20 return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
21 << *connection->peer_addrs << " conn(" << connection << " "
22 << this
23 << " " << ceph_con_mode_name(auth_meta->con_mode)
24 << " :" << connection->port
25 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
26 << " cs=" << connect_seq << " l=" << connection->policy.lossy
27 << " rx=" << session_stream_handlers.rx.get()
28 << " tx=" << session_stream_handlers.tx.get()
29 << ").";
30 }
31
using namespace ceph::msgr::v2;

// Shorthands for continuation pointers/references bound to this protocol
// class; the continuation-based methods below use them as return and
// parameter types.
using CtPtr = Ct<ProtocolV2> *;
using CtRef = Ct<ProtocolV2> &;
36
37 void ProtocolV2::run_continuation(CtPtr pcontinuation) {
38 if (pcontinuation) {
39 run_continuation(*pcontinuation);
40 }
41 }
42
43 void ProtocolV2::run_continuation(CtRef continuation) {
44 try {
45 CONTINUATION_RUN(continuation)
46 } catch (const buffer::error &e) {
47 lderr(cct) << __func__ << " failed decoding of frame header: " << e
48 << dendl;
49 _fault();
50 } catch (const ceph::crypto::onwire::MsgAuthError &e) {
51 lderr(cct) << __func__ << " " << e.what() << dendl;
52 _fault();
53 } catch (const DecryptionError &) {
54 lderr(cct) << __func__ << " failed to decrypt frame payload" << dendl;
55 }
56 }
57
// WRITE(B, D, C): write buffer/frame B (described as D in error logs), then
// continue with continuation C.
#define WRITE(B, D, C) write(D, CONTINUATION(C), B)

// READ(L, C): read L bytes into a freshly allocated buffer, then continue
// with continuation C.
#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L)))

// READ_RXBUF(B, C): read into the caller-provided buffer B, then continue
// with continuation C.
#define READ_RXBUF(B, C) read(CONTINUATION(C), B)
63
#ifdef UNIT_TESTS_BUILT

// INTERCEPT(S): unit-test hook at interception point S. If the connection
// has an interceptor installed, ACTION::FAIL returns _fault() from the
// enclosing function. ACTION::STOP stops the protocol, queues a reset and
// returns nullptr.
#define INTERCEPT(S) { \
  if(connection->interceptor) { \
    auto a = connection->interceptor->intercept(connection, (S)); \
    if (a == Interceptor::ACTION::FAIL) { \
      return _fault(); \
    } else if (a == Interceptor::ACTION::STOP) { \
      stop(); \
      connection->dispatch_queue->queue_reset(connection); \
      return nullptr; \
    }}}

#else
// Interception points compile to nothing outside unit-test builds.
#define INTERCEPT(S)
#endif
80
81 ProtocolV2::ProtocolV2(AsyncConnection *connection)
82 : Protocol(2, connection),
83 state(NONE),
84 peer_required_features(0),
85 client_cookie(0),
86 server_cookie(0),
87 global_seq(0),
88 connect_seq(0),
89 peer_global_seq(0),
90 message_seq(0),
91 reconnecting(false),
92 replacing(false),
93 can_write(false),
94 bannerExchangeCallback(nullptr),
95 next_tag(static_cast<Tag>(0)),
96 keepalive(false) {
97 }
98
99 ProtocolV2::~ProtocolV2() {
100 }
101
102 void ProtocolV2::connect() {
103 ldout(cct, 1) << __func__ << dendl;
104 state = START_CONNECT;
105 pre_auth.enabled = true;
106 }
107
108 void ProtocolV2::accept() {
109 ldout(cct, 1) << __func__ << dendl;
110 state = START_ACCEPT;
111 }
112
113 bool ProtocolV2::is_connected() { return can_write; }
114
115 /*
116 * Tears down the message queues, and removes them from the
117 * DispatchQueue Must hold write_lock prior to calling.
118 */
119 void ProtocolV2::discard_out_queue() {
120 ldout(cct, 10) << __func__ << " started" << dendl;
121
122 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
123 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
124 (*p)->put();
125 }
126 sent.clear();
127 for (auto& [ prio, entries ] : out_queue) {
128 static_cast<void>(prio);
129 for (auto& entry : entries) {
130 ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
131 entry.m->put();
132 }
133 }
134 out_queue.clear();
135 write_in_progress = false;
136 }
137
138 void ProtocolV2::reset_session() {
139 ldout(cct, 1) << __func__ << dendl;
140
141 std::lock_guard<std::mutex> l(connection->write_lock);
142 if (connection->delay_state) {
143 connection->delay_state->discard();
144 }
145
146 connection->dispatch_queue->discard_queue(connection->conn_id);
147 discard_out_queue();
148 connection->outgoing_bl.clear();
149
150 connection->dispatch_queue->queue_remote_reset(connection);
151
152 out_seq = 0;
153 in_seq = 0;
154 client_cookie = 0;
155 server_cookie = 0;
156 connect_seq = 0;
157 peer_global_seq = 0;
158 message_seq = 0;
159 ack_left = 0;
160 can_write = false;
161 }
162
163 void ProtocolV2::stop() {
164 ldout(cct, 1) << __func__ << dendl;
165 if (state == CLOSED) {
166 return;
167 }
168
169 if (connection->delay_state) connection->delay_state->flush();
170
171 std::lock_guard<std::mutex> l(connection->write_lock);
172
173 reset_recv_state();
174 discard_out_queue();
175
176 connection->_stop();
177
178 can_write = false;
179 state = CLOSED;
180 }
181
182 void ProtocolV2::fault() { _fault(); }
183
184 void ProtocolV2::requeue_sent() {
185 write_in_progress = false;
186 if (sent.empty()) {
187 return;
188 }
189
190 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
191 out_seq -= sent.size();
192 while (!sent.empty()) {
193 Message *m = sent.back();
194 sent.pop_back();
195 ldout(cct, 5) << __func__ << " requeueing message m=" << m
196 << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
197 << *m << dendl;
198 m->clear_payload();
199 rq.emplace_front(out_queue_entry_t{false, m});
200 }
201 }
202
203 uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
204 ldout(cct, 10) << __func__ << " " << seq << dendl;
205 std::lock_guard<std::mutex> l(connection->write_lock);
206 if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
207 return seq;
208 }
209 auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
210 uint64_t count = out_seq;
211 while (!rq.empty()) {
212 Message* const m = rq.front().m;
213 if (m->get_seq() == 0 || m->get_seq() > seq) break;
214 ldout(cct, 5) << __func__ << " discarding message m=" << m
215 << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
216 << *m << dendl;
217 m->put();
218 rq.pop_front();
219 count++;
220 }
221 if (rq.empty()) out_queue.erase(CEPH_MSG_PRIO_HIGHEST);
222 return count;
223 }
224
/*
 * Returns the protocol to a pre-session receive state:
 * - fresh AuthConnectionMeta;
 * - no session tx/rx stream handlers;
 * - empty pre-auth capture buffers;
 * - no pending connection read/write callbacks and no pending frame tag.
 * Finally releases any throttle budget held for a partially received
 * message (reset_throttle() decides from the current state).
 */
void ProtocolV2::reset_recv_state() {
  auth_meta.reset(new AuthConnectionMeta);
  session_stream_handlers.tx.reset(nullptr);
  session_stream_handlers.rx.reset(nullptr);
  pre_auth.txbuf.clear();
  pre_auth.rxbuf.clear();

  // clean read and write callbacks
  connection->pendingReadLen.reset();
  connection->writeCallback.reset();

  next_tag = static_cast<Tag>(0);

  reset_throttle();
}
240
241 size_t ProtocolV2::get_current_msg_size() const {
242 ceph_assert(!rx_segments_desc.empty());
243 size_t sum = 0;
244 // we don't include SegmentIndex::Msg::HEADER.
245 for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
246 sum += rx_segments_desc[idx].length;
247 }
248 return sum;
249 }
250
251 void ProtocolV2::reset_throttle() {
252 if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
253 connection->policy.throttler_messages) {
254 ldout(cct, 10) << __func__ << " releasing " << 1
255 << " message to policy throttler "
256 << connection->policy.throttler_messages->get_current()
257 << "/" << connection->policy.throttler_messages->get_max()
258 << dendl;
259 connection->policy.throttler_messages->put();
260 }
261 if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
262 if (connection->policy.throttler_bytes) {
263 const size_t cur_msg_size = get_current_msg_size();
264 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
265 << " bytes to policy throttler "
266 << connection->policy.throttler_bytes->get_current() << "/"
267 << connection->policy.throttler_bytes->get_max() << dendl;
268 connection->policy.throttler_bytes->put(cur_msg_size);
269 }
270 }
271 if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
272 const size_t cur_msg_size = get_current_msg_size();
273 ldout(cct, 10)
274 << __func__ << " releasing " << cur_msg_size
275 << " bytes to dispatch_queue throttler "
276 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
277 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
278 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
279 }
280 }
281
/*
 * Handles a connection failure and decides what happens next:
 * - CLOSED/NONE: nothing to do.
 * - Lossy policy outside the START_CONNECT..SESSION_RECONNECTING range:
 *   stop and queue a reset.
 * - Otherwise, under write_lock, sent-but-unacked messages are requeued.
 *   A half-accepted connection (START_ACCEPT..SESSION_ACCEPTING) with
 *   nothing queued that is not replacing is stopped and reset.
 * - A standby policy with nothing queued, no pending keepalive and a state
 *   other than WAIT goes to STANDBY. A server policy always goes to STANDBY.
 * - Otherwise the client reconnects. From outside the connect/reconnect
 *   range (and not WAIT/SESSION_ACCEPTING) it reconnects right away via the
 *   read handler. From those states it waits for a backoff: ms_max_backoff
 *   in WAIT, otherwise ms_initial_backoff doubling on each fault up to
 *   ms_max_backoff.
 *
 * Always returns nullptr (no continuation to run).
 */
CtPtr ProtocolV2::_fault() {
  ldout(cct, 10) << __func__ << dendl;

  if (state == CLOSED || state == NONE) {
    ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
    return nullptr;
  }

  // On a lossy channel a fault outside the connect/reconnect phase ends the
  // connection instead of retrying.
  if (connection->policy.lossy &&
      !(state >= START_CONNECT && state <= SESSION_RECONNECTING)) {
    ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  connection->write_lock.lock();

  can_write = false;
  // requeue sent items
  requeue_sent();

  if (out_queue.empty() && state >= START_ACCEPT &&
      state <= SESSION_ACCEPTING && !replacing) {
    ldout(cct, 2) << __func__ << " with nothing to send and in the half "
                  << " accept state just closed" << dendl;
    connection->write_lock.unlock();
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  replacing = false;
  connection->fault();
  reset_recv_state();

  reconnecting = false;

  if (connection->policy.standby && out_queue.empty() && !keepalive &&
      state != WAIT) {
    ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
                  << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }
  if (connection->policy.server) {
    ldout(cct, 1) << __func__ << " server, going to standby, even though i have stuff queued" << dendl;
    state = STANDBY;
    connection->write_lock.unlock();
    return nullptr;
  }

  connection->write_lock.unlock();

  if (!(state >= START_CONNECT && state <= SESSION_RECONNECTING) &&
      state != WAIT &&
      state != SESSION_ACCEPTING /* due to connection race */) {
    // policy maybe empty when state is in accept
    // NOTE(review): server policies already returned above, so the
    // policy.server branch below looks unreachable; confirm before
    // relying on it.
    if (connection->policy.server) {
      ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
      state = STANDBY;
    } else {
      ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
      connect_seq++;
      global_seq = messenger->get_global_seq();
      state = START_CONNECT;
      pre_auth.enabled = true;
      connection->state = AsyncConnection::STATE_CONNECTING;
    }
    backoff = utime_t();
    connection->center->dispatch_event_external(connection->read_handler);
  } else {
    // Retrying a connect attempt: WAIT waits the maximum, otherwise the
    // backoff starts at ms_initial_backoff and doubles up to ms_max_backoff.
    if (state == WAIT) {
      backoff.set_from_double(cct->_conf->ms_max_backoff);
    } else if (backoff == utime_t()) {
      backoff.set_from_double(cct->_conf->ms_initial_backoff);
    } else {
      backoff += backoff;
      if (backoff > cct->_conf->ms_max_backoff)
        backoff.set_from_double(cct->_conf->ms_max_backoff);
    }

    // only bump connect_seq when there is a session to reconnect to
    if (server_cookie) {
      connect_seq++;
    }

    global_seq = messenger->get_global_seq();
    state = START_CONNECT;
    pre_auth.enabled = true;
    connection->state = AsyncConnection::STATE_CONNECTING;
    ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
    // woke up again;
    connection->register_time_events.insert(
        connection->center->create_time_event(backoff.to_nsec() / 1000,
                                              connection->wakeup_handler));
  }
  return nullptr;
}
381
382 void ProtocolV2::prepare_send_message(uint64_t features,
383 Message *m) {
384 ldout(cct, 20) << __func__ << " m=" << *m << dendl;
385
386 // associate message with Connection (for benefit of encode_payload)
387 ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
388 << features << " " << m << " " << *m << dendl;
389
390 // encode and copy out of *m
391 m->encode(features, 0);
392 }
393
/*
 * Queues message m for sending.
 *
 * Messages that support fast dispatch are encoded up front, outside the
 * write lock. That encoding is cleared again if the session cannot write
 * yet or the connection features changed in the meantime; write_event()
 * then encodes the message when it is dequeued. If the protocol is CLOSED,
 * m is put() and dropped. Otherwise it is appended to the out queue for its
 * priority. A write event is dispatched when the session is writable and
 * not replacing, or is in STANDBY, and no write is already in progress.
 */
void ProtocolV2::send_message(Message *m) {
  uint64_t f = connection->get_features();

  // TODO: Currently not all messages supports reencode like MOSDMap, so here
  // only let fast dispatch support messages prepare message
  const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
  if (can_fast_prepare) {
    prepare_send_message(f, m);
  }

  std::lock_guard<std::mutex> l(connection->write_lock);
  bool is_prepared = can_fast_prepare;
  // "features" changes will change the payload encoding
  if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
    // ensure the correctness of message encoding
    m->clear_payload();
    is_prepared = false;
    ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
                   << " != " << connection->get_features() << dendl;
  }
  if (state == CLOSED) {
    ldout(cct, 10) << __func__ << " connection closed."
                   << " Drop message " << m << dendl;
    m->put();
  } else {
    ldout(cct, 5) << __func__ << " enqueueing message m=" << m
                  << " type=" << m->get_type() << " " << *m << dendl;
    m->queue_start = ceph::mono_clock::now();
    m->trace.event("async enqueueing message");
    out_queue[m->get_priority()].emplace_back(
      out_queue_entry_t{is_prepared, m});
    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                   << dendl;
    if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
      write_in_progress = true;
      connection->center->dispatch_event_external(connection->write_handler);
    }
  }
}
433
434 void ProtocolV2::send_keepalive() {
435 ldout(cct, 10) << __func__ << dendl;
436 std::lock_guard<std::mutex> l(connection->write_lock);
437 if (state != CLOSED) {
438 keepalive = true;
439 connection->center->dispatch_event_external(connection->write_handler);
440 }
441 }
442
/*
 * Read-handler callback. It resumes the protocol state machine at the
 * continuation matching the current state: banner exchange on either side,
 * frame reading in READY, or the throttle stage being waited on. Other
 * states are ignored here.
 */
void ProtocolV2::read_event() {
  ldout(cct, 20) << __func__ << dendl;

  switch (state) {
    case START_CONNECT:
      run_continuation(CONTINUATION(start_client_banner_exchange));
      break;
    case START_ACCEPT:
      run_continuation(CONTINUATION(start_server_banner_exchange));
      break;
    case READY:
      run_continuation(CONTINUATION(read_frame));
      break;
    case THROTTLE_MESSAGE:
      run_continuation(CONTINUATION(throttle_message));
      break;
    case THROTTLE_BYTES:
      run_continuation(CONTINUATION(throttle_bytes));
      break;
    case THROTTLE_DISPATCH_QUEUE:
      run_continuation(CONTINUATION(throttle_dispatch_queue));
      break;
    default:
      break;
  }
}
469
470 ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
471 out_queue_entry_t out_entry;
472
473 if (!out_queue.empty()) {
474 auto it = out_queue.rbegin();
475 auto& entries = it->second;
476 ceph_assert(!entries.empty());
477 out_entry = entries.front();
478 entries.pop_front();
479 if (entries.empty()) {
480 out_queue.erase(it->first);
481 }
482 }
483 return out_entry;
484 }
485
/*
 * Encodes m as a message frame and tries to send it. Must run on the
 * connection's event-center thread.
 *
 * Assigns m the next outgoing sequence number and stamps the frame with
 * in_seq as ack_seq, clearing ack_left. The frame is appended to
 * outgoing_bl and _try_send() is called. The caller's reference on m is
 * dropped at the end.
 *
 * @param more  forwarded to _try_send(); presumably a hint that more data
 *              follows (TODO confirm)
 * @return _try_send()'s result: <0 on error, 0 when everything was sent,
 *         >0 when data is still pending
 */
ssize_t ProtocolV2::write_message(Message *m, bool more) {
  FUNCTRACE(cct);
  ceph_assert(connection->center->in_thread());
  m->set_seq(++out_seq);

  // in_seq/ack_left are read and reset under connection->lock
  connection->lock.lock();
  uint64_t ack_seq = in_seq;
  ack_left = 0;
  connection->lock.unlock();

  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();

  ceph_msg_header2 header2{header.seq, header.tid,
                           header.type, header.priority,
                           header.version,
                           init_le32(0), header.data_off,
                           init_le64(ack_seq),
                           footer.flags, header.compat_version,
                           header.reserved};

  auto message = MessageFrame::Encode(
      header2,
      m->get_payload(),
      m->get_middle(),
      m->get_data());
  connection->outgoing_bl.append(message.get_buffer(session_stream_handlers));

  ldout(cct, 5) << __func__ << " sending message m=" << m
                << " seq=" << m->get_seq() << " " << *m << dendl;

  m->trace.event("async writing message");
  ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
                 << " src=" << entity_name_t(messenger->get_myname())
                 << " off=" << header2.data_off
                 << dendl;
  ssize_t total_send_size = connection->outgoing_bl.length();
  ssize_t rc = connection->_try_send(more);
  if (rc < 0) {
    ldout(cct, 1) << __func__ << " error sending " << m << ", "
                  << cpp_strerror(rc) << dendl;
  } else {
    // account only the bytes that actually left outgoing_bl
    connection->logger->inc(
        l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
    ldout(cct, 10) << __func__ << " sending " << m
                   << (rc ? " continuely." : " done.") << dendl;
  }

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
#endif
  m->put();

  return rc;
}
544
545 void ProtocolV2::append_keepalive() {
546 ldout(cct, 10) << __func__ << dendl;
547 auto keepalive_frame = KeepAliveFrame::Encode();
548 connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
549 }
550
551 void ProtocolV2::append_keepalive_ack(utime_t ×tamp) {
552 auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
553 connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
554 }
555
/*
 * Trims the sent list after the peer acknowledged everything up to `seq`.
 * Lossy policies keep no sent list, so the call is a no-op for them.
 * At most max_pending messages are removed per call, under write_lock.
 * Their references are dropped only after the lock is released.
 */
void ProtocolV2::handle_message_ack(uint64_t seq) {
  if (connection->policy.lossy) { // lossy connections don't keep sent messages
    return;
  }

  ldout(cct, 15) << __func__ << " seq=" << seq << dendl;

  // trim sent list
  static const int max_pending = 128;
  int i = 0;
  Message *pending[max_pending];
  auto now = ceph::mono_clock::now();
  connection->write_lock.lock();
  while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
    Message *m = sent.front();
    sent.pop_front();
    pending[i++] = m;
    ldout(cct, 10) << __func__ << " got ack seq " << seq
                   << " >= " << m->get_seq() << " on " << m << " " << *m
                   << dendl;
  }
  connection->write_lock.unlock();
  connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
  // release references outside the write lock
  for (int k = 0; k < i; k++) {
    pending[k]->put();
  }
}
583
/*
 * Write-handler callback.
 *
 * When the session is writable:
 * - append any pending keepalive;
 * - drain out_queue one message at a time. For non-lossy policies a
 *   reference is kept on the sent list. The write lock is released while
 *   encoding and sending;
 * - stop on a send error, or when the socket cannot take more (r > 0);
 * - if everything went out, append an ack frame when incoming messages are
 *   waiting to be acknowledged, or flush still-queued data.
 *
 * When the session is not writable: a client in STANDBY with queued data
 * reconnects, and otherwise already-buffered bytes are flushed if the
 * socket is usable. Any send error faults the connection under
 * connection->lock.
 */
void ProtocolV2::write_event() {
  ldout(cct, 10) << __func__ << dendl;
  ssize_t r = 0;

  connection->write_lock.lock();
  if (can_write) {
    if (keepalive) {
      append_keepalive();
      keepalive = false;
    }

    auto start = ceph::mono_clock::now();
    bool more;
    do {
      const auto out_entry = _get_next_outgoing();
      if (!out_entry.m) {
        break;
      }

      if (!connection->policy.lossy) {
        // put on sent list
        sent.push_back(out_entry.m);
        out_entry.m->get();
      }
      more = !out_queue.empty();
      connection->write_lock.unlock();

      // send_message or requeue messages may not encode message
      if (!out_entry.is_prepared) {
        prepare_send_message(connection->get_features(), out_entry.m);
      }

      if (out_entry.m->queue_start != ceph::mono_time()) {
        connection->logger->tinc(l_msgr_send_messages_queue_lat,
                                 ceph::mono_clock::now() -
                                 out_entry.m->queue_start);
      }

      r = write_message(out_entry.m, more);

      connection->write_lock.lock();
      if (r == 0) {
        ;
      } else if (r < 0) {
        ldout(cct, 1) << __func__ << " send msg failed" << dendl;
        break;
      } else if (r > 0) {
        // Outbound message in-progress, thread will be re-awoken
        // when the outbound socket is writeable again
        break;
      }
    } while (can_write);
    write_in_progress = false;

    // if r > 0 mean data still lefted, so no need _try_send.
    if (r == 0) {
      uint64_t left = ack_left;
      if (left) {
        auto ack = AckFrame::Encode(in_seq);
        connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers));
        ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                       << " messages" << dendl;
        ack_left -= left;
        left = ack_left;
        r = connection->_try_send(left);
      } else if (is_queued()) {
        r = connection->_try_send();
      }
    }
    connection->write_lock.unlock();

    connection->logger->tinc(l_msgr_running_send_time,
                             ceph::mono_clock::now() - start);
    if (r < 0) {
      ldout(cct, 1) << __func__ << " send msg failed" << dendl;
      connection->lock.lock();
      fault();
      connection->lock.unlock();
      return;
    }
  } else {
    write_in_progress = false;
    // re-acquire in lock order: connection->lock before write_lock
    connection->write_lock.unlock();
    connection->lock.lock();
    connection->write_lock.lock();
    if (state == STANDBY && !connection->policy.server && is_queued()) {
      ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
      if (server_cookie) { // only increment connect_seq if there is a session
        connect_seq++;
      }
      connection->_connect();
    } else if (connection->cs && state != NONE && state != CLOSED &&
               state != START_CONNECT) {
      r = connection->_try_send();
      if (r < 0) {
        ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
        connection->write_lock.unlock();
        fault();
        connection->lock.unlock();
        return;
      }
    }
    connection->write_lock.unlock();
    connection->lock.unlock();
  }
}
690
691 bool ProtocolV2::is_queued() {
692 return !out_queue.empty() || connection->is_queued();
693 }
694
695 uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
696 if (session_stream_handlers.rx) {
697 return segment_onwire_size(logical_size);
698 } else {
699 return logical_size;
700 }
701 }
702
703 uint32_t ProtocolV2::get_epilogue_size() const {
704 // In secure mode size of epilogue is flexible and depends on particular
705 // cipher implementation. See the comment for epilogue_secure_block_t or
706 // epilogue_plain_block_t.
707 if (session_stream_handlers.rx) {
708 return FRAME_SECURE_EPILOGUE_SIZE + \
709 session_stream_handlers.rx->get_extra_size_at_final();
710 } else {
711 return FRAME_PLAIN_EPILOGUE_SIZE;
712 }
713 }
714
/*
 * Reads buffer->length() bytes from the connection into `buffer`, then
 * continues with `next`. `next` takes ownership of the buffer through
 * next.node and receives the result in next.r.
 *
 * If the read finishes or fails synchronously (r <= 0), &next is returned
 * for the caller to run. Otherwise nullptr is returned, and the callback
 * runs `next` once the data arrives. While pre_auth is enabled, every
 * successful read is also appended to pre_auth.rxbuf. That buffer is bounded
 * by an assertion when ms_die_on_bug is set.
 */
CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
                       rx_buffer_t &&buffer) {
  const auto len = buffer->length();
  const auto buf = buffer->c_str();
  next.node = std::move(buffer);
  ssize_t r = connection->read(len, buf,
    [&next, this](char *buffer, int r) {
      // asynchronous completion path
      if (unlikely(pre_auth.enabled) && r >= 0) {
        pre_auth.rxbuf.append(*next.node);
        ceph_assert(!cct->_conf->ms_die_on_bug ||
                    pre_auth.rxbuf.length() < 1000000);
      }
      next.r = r;
      run_continuation(next);
    });
  if (r <= 0) {
    // error or done synchronously
    if (unlikely(pre_auth.enabled) && r >= 0) {
      pre_auth.rxbuf.append(*next.node);
      ceph_assert(!cct->_conf->ms_die_on_bug ||
                  pre_auth.rxbuf.length() < 1000000);
    }
    next.r = r;
    return &next;
  }

  return nullptr;
}
743
744 template <class F>
745 CtPtr ProtocolV2::write(const std::string &desc,
746 CONTINUATION_TYPE<ProtocolV2> &next,
747 F &frame) {
748 ceph::bufferlist bl = frame.get_buffer(session_stream_handlers);
749 return write(desc, next, bl);
750 }
751
/*
 * Sends `buffer` over the connection, then continues with `next`. `desc`
 * names the payload in error logs. While pre_auth is enabled the bytes are
 * also recorded in pre_auth.txbuf, which is bounded by an assertion when
 * ms_die_on_bug is set.
 *
 * Returns:
 * - _fault()'s result on an immediate error;
 * - &next if the write completed synchronously;
 * - nullptr while the write is pending. The callback then faults on error
 *   and runs `next` in either case.
 */
CtPtr ProtocolV2::write(const std::string &desc,
                        CONTINUATION_TYPE<ProtocolV2> &next,
                        bufferlist &buffer) {
  if (unlikely(pre_auth.enabled)) {
    pre_auth.txbuf.append(buffer);
    ceph_assert(!cct->_conf->ms_die_on_bug ||
                pre_auth.txbuf.length() < 1000000);
  }

  ssize_t r =
      connection->write(buffer, [&next, desc, this](int r) {
        if (r < 0) {
          ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
                        << " (" << cpp_strerror(r) << ")" << dendl;
          connection->inject_delay();
          _fault();
        }
        run_continuation(next);
      });

  if (r < 0) {
    ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r
                  << " (" << cpp_strerror(r) << ")" << dendl;
    return _fault();
  } else if (r == 0) {
    next.setParams();
    return &next;
  }

  return nullptr;
}
783
/*
 * Starts the banner exchange. `callback` is saved and resumed by
 * handle_hello() once the hello exchange finishes. Our banner is then sent:
 * the v2 prefix, a 16-bit payload length, and a payload holding our
 * supported and required msgr2 feature bits. After that the peer's banner
 * is awaited.
 */
CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
  ldout(cct, 20) << __func__ << dendl;
  bannerExchangeCallback = &callback;

  bufferlist banner_payload;
  encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
  encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);

  bufferlist bl;
  bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
  encode((uint16_t)banner_payload.length(), bl, 0);
  bl.claim_append(banner_payload);

  INTERCEPT(state == BANNER_CONNECTING ? 3 : 4);

  return WRITE(bl, "banner", _wait_for_peer_banner);
}
801
802 CtPtr ProtocolV2::_wait_for_peer_banner() {
803 unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
804 return READ(banner_len, _handle_peer_banner);
805 }
806
/*
 * Validates the fixed-size part of the peer banner (prefix + payload
 * length). A msgr v1 banner, or any other unexpected prefix, faults the
 * connection. Otherwise the announced payload length is decoded and that
 * many payload bytes are read next.
 */
CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read peer banner failed r=" << r << " ("
                  << cpp_strerror(r) << ")" << dendl;
    return _fault();
  }

  unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);

  if (memcmp(buffer->c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
    if (memcmp(buffer->c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
      lderr(cct) << __func__ << " peer " << *connection->peer_addrs
                 << " is using msgr V1 protocol" << dendl;
      return _fault();
    }
    ldout(cct, 1) << __func__ << " accept peer sent bad banner" << dendl;
    return _fault();
  }

  // narrow the buffer to the 16-bit length field following the prefix
  uint16_t payload_len;
  bufferlist bl;
  buffer->set_offset(banner_prefix_len);
  buffer->set_length(sizeof(ceph_le16));
  bl.push_back(std::move(buffer));
  auto ti = bl.cbegin();
  try {
    decode(payload_len, ti);
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
    return _fault();
  }

  INTERCEPT(state == BANNER_CONNECTING ? 5 : 6);

  return READ(payload_len, _handle_peer_banner_payload);
}
845
/*
 * Decodes the peer's supported and required msgr2 feature bits and checks
 * them against ours in both directions. Any incompatibility stops the
 * connection and queues a reset. On success it moves from BANNER_* to
 * HELLO_* and sends our HELLO frame (our entity type and
 * connection->target_addr). It then continues with read_frame.
 */
CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
  ldout(cct, 20) << __func__ << " r=" << r << dendl;

  if (r < 0) {
    ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
                  << " (" << cpp_strerror(r) << ")" << dendl;
    return _fault();
  }

  // NOTE: this local shadows the member of the same name; the member is
  // assigned explicitly via this-> below.
  uint64_t peer_supported_features;
  uint64_t peer_required_features;

  bufferlist bl;
  bl.push_back(std::move(buffer));
  auto ti = bl.cbegin();
  try {
    decode(peer_supported_features, ti);
    decode(peer_required_features, ti);
  } catch (const buffer::error &e) {
    lderr(cct) << __func__ << " decode banner payload failed " << dendl;
    return _fault();
  }

  ldout(cct, 1) << __func__ << " supported=" << std::hex
                << peer_supported_features << " required=" << std::hex
                << peer_required_features << std::dec << dendl;

  // Check feature bit compatibility

  uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
  uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;

  // the peer must support everything we require ...
  if ((required_features & peer_supported_features) != required_features) {
    ldout(cct, 1) << __func__ << " peer does not support all required features"
                  << " required=" << std::hex << required_features
                  << " supported=" << std::hex << peer_supported_features
                  << std::dec << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }
  // ... and we must support everything the peer requires
  if ((supported_features & peer_required_features) != peer_required_features) {
    ldout(cct, 1) << __func__ << " we do not support all peer required features"
                  << " required=" << std::hex << peer_required_features
                  << " supported=" << supported_features << std::dec << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  this->peer_required_features = peer_required_features;
  if (this->peer_required_features == 0) {
    this->connection_features = msgr2_required;
  }

  // at this point we can change how the client protocol behaves based on
  // this->peer_required_features

  if (state == BANNER_CONNECTING) {
    state = HELLO_CONNECTING;
  }
  else {
    ceph_assert(state == BANNER_ACCEPTING);
    state = HELLO_ACCEPTING;
  }

  auto hello = HelloFrame::Encode(messenger->get_mytype(),
                                  connection->target_addr);

  INTERCEPT(state == HELLO_CONNECTING ? 7 : 8);

  return WRITE(hello, "hello frame", read_frame);
}
919
/*
 * Processes the peer's HELLO frame. Only valid in HELLO_CONNECTING or
 * HELLO_ACCEPTING.
 *
 * Acceptor side (peer type still unknown): records the peer type and loads
 * the matching policy. Connector side: the advertised type must match the
 * expected one, otherwise the connection is stopped and a reset queued.
 *
 * If we have no usable address yet, one is learned: from the peer's view
 * when ms_learn_addr_from_peer is set, otherwise from getsockname().
 * connection->lock is dropped around messenger->learned_addr(), so the state
 * is re-checked afterwards.
 *
 * Finally returns the continuation saved by _banner_exchange().
 */
CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
{
  ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;

  if (state != HELLO_CONNECTING && state != HELLO_ACCEPTING) {
    lderr(cct) << __func__ << " not in hello exchange state!" << dendl;
    return _fault();
  }

  auto hello = HelloFrame::Decode(payload);

  ldout(cct, 5) << __func__ << " received hello:"
                << " peer_type=" << (int)hello.entity_type()
                << " peer_addr_for_me=" << hello.peer_addr() << dendl;

  // NOTE(review): getsockname()'s return value is not checked; on failure
  // `ss` is used uninitialized below.
  sockaddr_storage ss;
  socklen_t len = sizeof(ss);
  getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
  ldout(cct, 5) << __func__ << " getsockname says I am " << (sockaddr *)&ss
                << " when talking to " << connection->target_addr << dendl;

  if (connection->get_peer_type() == -1) {
    // accepting side: learn who the peer is and apply its policy
    connection->set_peer_type(hello.entity_type());

    ceph_assert(state == HELLO_ACCEPTING);
    connection->policy = messenger->get_policy(hello.entity_type());
    ldout(cct, 10) << __func__ << " accept of host_type "
                   << (int)hello.entity_type()
                   << ", policy.lossy=" << connection->policy.lossy
                   << " policy.server=" << connection->policy.server
                   << " policy.standby=" << connection->policy.standby
                   << " policy.resetcheck=" << connection->policy.resetcheck
                   << dendl;
  } else {
    // connecting side: the peer must be the type we dialed
    ceph_assert(state == HELLO_CONNECTING);
    if (connection->get_peer_type() != hello.entity_type()) {
      ldout(cct, 1) << __func__ << " connection peer type does not match what"
                    << " peer advertises " << connection->get_peer_type()
                    << " != " << (int)hello.entity_type() << dendl;
      stop();
      connection->dispatch_queue->queue_reset(connection);
      return nullptr;
    }
  }

  if (messenger->get_myaddrs().empty() ||
      messenger->get_myaddrs().front().is_blank_ip()) {
    entity_addr_t a;
    if (cct->_conf->ms_learn_addr_from_peer) {
      ldout(cct, 1) << __func__ << " peer " << connection->target_addr
                    << " says I am " << hello.peer_addr() << " (socket says "
                    << (sockaddr*)&ss << ")" << dendl;
      a = hello.peer_addr();
    } else {
      ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
                    << " says I am " << (sockaddr*)&ss
                    << " (peer says " << hello.peer_addr() << ")" << dendl;
      a.set_sockaddr((sockaddr *)&ss);
    }
    a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
    a.set_port(0);
    // learned_addr() is called without holding connection->lock
    connection->lock.unlock();
    messenger->learned_addr(a);
    if (cct->_conf->ms_inject_internal_delays &&
        cct->_conf->ms_inject_socket_failures) {
      if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
        ldout(cct, 10) << __func__ << " sleep for "
                       << cct->_conf->ms_inject_internal_delays << dendl;
        utime_t t;
        t.set_from_double(cct->_conf->ms_inject_internal_delays);
        t.sleep();
      }
    }
    connection->lock.lock();
    // the state may have changed while the lock was dropped
    if (state != HELLO_CONNECTING) {
      ldout(cct, 1) << __func__
                    << " state changed while learned_addr, mark_down or "
                    << " replacing must be happened just now" << dendl;
      return nullptr;
    }
  }

  // resume the flow that started the banner exchange
  CtPtr callback;
  callback = bannerExchangeCallback;
  bannerExchangeCallback = nullptr;
  ceph_assert(callback);
  return callback;
}
1011
1012 CtPtr ProtocolV2::read_frame() {
1013 if (state == CLOSED) {
1014 return nullptr;
1015 }
1016
1017 ldout(cct, 20) << __func__ << dendl;
1018 return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
1019 }
1020
1021 CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
1022 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1023
1024 if (r < 0) {
1025 ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
1026 << " (" << cpp_strerror(r) << ")" << dendl;
1027 return _fault();
1028 }
1029
1030 ceph::bufferlist preamble;
1031 preamble.push_back(std::move(buffer));
1032
1033 ldout(cct, 30) << __func__ << " preamble\n";
1034 preamble.hexdump(*_dout);
1035 *_dout << dendl;
1036
1037 if (session_stream_handlers.rx) {
1038 ceph_assert(session_stream_handlers.rx);
1039
1040 session_stream_handlers.rx->reset_rx_handler();
1041 preamble = session_stream_handlers.rx->authenticated_decrypt_update(
1042 std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
1043
1044 ldout(cct, 10) << __func__ << " got encrypted preamble."
1045 << " after decrypt premable.length()=" << preamble.length()
1046 << dendl;
1047
1048 ldout(cct, 30) << __func__ << " preamble after decrypt\n";
1049 preamble.hexdump(*_dout);
1050 *_dout << dendl;
1051 }
1052
1053 {
1054 // I expect ceph_le32 will make the endian conversion for me. Passing
1055 // everything through ::Decode is unnecessary.
1056 const auto& main_preamble = \
1057 reinterpret_cast<preamble_block_t&>(*preamble.c_str());
1058
1059 // verify preamble's CRC before any further processing
1060 const auto rx_crc = ceph_crc32c(0,
1061 reinterpret_cast<const unsigned char*>(&main_preamble),
1062 sizeof(main_preamble) - sizeof(main_preamble.crc));
1063 if (rx_crc != main_preamble.crc) {
1064 ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
1065 << " rx_crc=" << rx_crc
1066 << " tx_crc=" << main_preamble.crc << dendl;
1067 return _fault();
1068 }
1069
1070 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
1071 if (main_preamble.num_segments < 1 ||
1072 main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1073 ldout(cct, 10) << __func__ << " unsupported num_segments="
1074 << " tx_crc=" << main_preamble.num_segments << dendl;
1075 return _fault();
1076 }
1077
1078 next_tag = static_cast<Tag>(main_preamble.tag);
1079
1080 rx_segments_desc.clear();
1081 rx_segments_data.clear();
1082
1083 if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
1084 ldout(cct, 30) << __func__
1085 << " num_segments=" << main_preamble.num_segments
1086 << " is too much" << dendl;
1087 return _fault();
1088 }
1089 for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
1090 ldout(cct, 10) << __func__ << " got new segment:"
1091 << " len=" << main_preamble.segments[idx].length
1092 << " align=" << main_preamble.segments[idx].alignment
1093 << dendl;
1094 rx_segments_desc.emplace_back(main_preamble.segments[idx]);
1095 }
1096 }
1097
1098 // does it need throttle?
1099 if (next_tag == Tag::MESSAGE) {
1100 if (state != READY) {
1101 lderr(cct) << __func__ << " not in ready state!" << dendl;
1102 return _fault();
1103 }
1104 state = THROTTLE_MESSAGE;
1105 return CONTINUE(throttle_message);
1106 } else {
1107 return read_frame_segment();
1108 }
1109 }
1110
1111 CtPtr ProtocolV2::handle_read_frame_dispatch() {
1112 ldout(cct, 10) << __func__
1113 << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
1114
1115 switch (next_tag) {
1116 case Tag::HELLO:
1117 case Tag::AUTH_REQUEST:
1118 case Tag::AUTH_BAD_METHOD:
1119 case Tag::AUTH_REPLY_MORE:
1120 case Tag::AUTH_REQUEST_MORE:
1121 case Tag::AUTH_DONE:
1122 case Tag::AUTH_SIGNATURE:
1123 case Tag::CLIENT_IDENT:
1124 case Tag::SERVER_IDENT:
1125 case Tag::IDENT_MISSING_FEATURES:
1126 case Tag::SESSION_RECONNECT:
1127 case Tag::SESSION_RESET:
1128 case Tag::SESSION_RETRY:
1129 case Tag::SESSION_RETRY_GLOBAL:
1130 case Tag::SESSION_RECONNECT_OK:
1131 case Tag::KEEPALIVE2:
1132 case Tag::KEEPALIVE2_ACK:
1133 case Tag::ACK:
1134 case Tag::WAIT:
1135 return handle_frame_payload();
1136 case Tag::MESSAGE:
1137 return handle_message();
1138 default: {
1139 lderr(cct) << __func__
1140 << " received unknown tag=" << static_cast<uint32_t>(next_tag)
1141 << dendl;
1142 return _fault();
1143 }
1144 }
1145
1146 return nullptr;
1147 }
1148
1149 CtPtr ProtocolV2::read_frame_segment() {
1150 ldout(cct, 20) << __func__ << dendl;
1151 ceph_assert(!rx_segments_desc.empty());
1152
1153 // description of current segment to read
1154 const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
1155 rx_buffer_t rx_buffer;
1156 try {
1157 rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
1158 get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
1159 } catch (std::bad_alloc&) {
1160 // Catching because of potential issues with satisfying alignment.
1161 ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
1162 << " len=" << get_onwire_size(cur_rx_desc.length)
1163 << " align=" << cur_rx_desc.alignment
1164 << dendl;
1165 return _fault();
1166 }
1167
1168 return READ_RXBUF(std::move(rx_buffer), handle_read_frame_segment);
1169 }
1170
1171 CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
1172 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1173
1174 if (r < 0) {
1175 ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
1176 << cpp_strerror(r) << ")" << dendl;
1177 return _fault();
1178 }
1179
1180 rx_segments_data.emplace_back();
1181 rx_segments_data.back().push_back(std::move(rx_buffer));
1182
1183 // decrypt incoming data
1184 // FIXME: if (auth_meta->is_mode_secure()) {
1185 if (session_stream_handlers.rx) {
1186 ceph_assert(session_stream_handlers.rx);
1187
1188 auto& new_seg = rx_segments_data.back();
1189 if (new_seg.length()) {
1190 auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
1191 std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
1192 const auto idx = rx_segments_data.size() - 1;
1193 new_seg.clear();
1194 padded.splice(0, rx_segments_desc[idx].length, &new_seg);
1195
1196 ldout(cct, 20) << __func__
1197 << " unpadded new_seg.length()=" << new_seg.length()
1198 << dendl;
1199 }
1200 }
1201
1202 if (rx_segments_desc.size() == rx_segments_data.size()) {
1203 // OK, all segments planned to read are read. Can go with epilogue.
1204 return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
1205 } else {
1206 // TODO: for makeshift only. This will be more generic and throttled
1207 return read_frame_segment();
1208 }
1209 }
1210
1211 CtPtr ProtocolV2::handle_frame_payload() {
1212 ceph_assert(!rx_segments_data.empty());
1213 auto& payload = rx_segments_data.back();
1214
1215 ldout(cct, 30) << __func__ << "\n";
1216 payload.hexdump(*_dout);
1217 *_dout << dendl;
1218
1219 switch (next_tag) {
1220 case Tag::HELLO:
1221 return handle_hello(payload);
1222 case Tag::AUTH_REQUEST:
1223 return handle_auth_request(payload);
1224 case Tag::AUTH_BAD_METHOD:
1225 return handle_auth_bad_method(payload);
1226 case Tag::AUTH_REPLY_MORE:
1227 return handle_auth_reply_more(payload);
1228 case Tag::AUTH_REQUEST_MORE:
1229 return handle_auth_request_more(payload);
1230 case Tag::AUTH_DONE:
1231 return handle_auth_done(payload);
1232 case Tag::AUTH_SIGNATURE:
1233 return handle_auth_signature(payload);
1234 case Tag::CLIENT_IDENT:
1235 return handle_client_ident(payload);
1236 case Tag::SERVER_IDENT:
1237 return handle_server_ident(payload);
1238 case Tag::IDENT_MISSING_FEATURES:
1239 return handle_ident_missing_features(payload);
1240 case Tag::SESSION_RECONNECT:
1241 return handle_reconnect(payload);
1242 case Tag::SESSION_RESET:
1243 return handle_session_reset(payload);
1244 case Tag::SESSION_RETRY:
1245 return handle_session_retry(payload);
1246 case Tag::SESSION_RETRY_GLOBAL:
1247 return handle_session_retry_global(payload);
1248 case Tag::SESSION_RECONNECT_OK:
1249 return handle_reconnect_ok(payload);
1250 case Tag::KEEPALIVE2:
1251 return handle_keepalive2(payload);
1252 case Tag::KEEPALIVE2_ACK:
1253 return handle_keepalive2_ack(payload);
1254 case Tag::ACK:
1255 return handle_message_ack(payload);
1256 case Tag::WAIT:
1257 return handle_wait(payload);
1258 default:
1259 ceph_abort();
1260 }
1261 return nullptr;
1262 }
1263
1264 CtPtr ProtocolV2::ready() {
1265 ldout(cct, 25) << __func__ << dendl;
1266
1267 reconnecting = false;
1268 replacing = false;
1269
1270 // make sure no pending tick timer
1271 if (connection->last_tick_id) {
1272 connection->center->delete_time_event(connection->last_tick_id);
1273 }
1274 connection->last_tick_id = connection->center->create_time_event(
1275 connection->inactive_timeout_us, connection->tick_handler);
1276
1277 {
1278 std::lock_guard<std::mutex> l(connection->write_lock);
1279 can_write = true;
1280 if (!out_queue.empty()) {
1281 connection->center->dispatch_event_external(connection->write_handler);
1282 }
1283 }
1284
1285 connection->maybe_start_delay_thread();
1286
1287 state = READY;
1288 ldout(cct, 1) << __func__ << " entity=" << peer_name << " client_cookie="
1289 << std::hex << client_cookie << " server_cookie="
1290 << server_cookie << std::dec << " in_seq=" << in_seq
1291 << " out_seq=" << out_seq << dendl;
1292
1293 INTERCEPT(15);
1294
1295 return CONTINUE(read_frame);
1296 }
1297
1298 CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
1299 {
1300 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1301
1302 if (r < 0) {
1303 ldout(cct, 1) << __func__ << " read data error " << dendl;
1304 return _fault();
1305 }
1306
1307 __u8 late_flags;
1308
1309 // FIXME: if (auth_meta->is_mode_secure()) {
1310 if (session_stream_handlers.rx) {
1311 ldout(cct, 1) << __func__ << " read frame epilogue bytes="
1312 << get_epilogue_size() << dendl;
1313
1314 // decrypt epilogue and authenticate entire frame.
1315 ceph::bufferlist epilogue_bl;
1316 {
1317 epilogue_bl.push_back(std::move(buffer));
1318 try {
1319 epilogue_bl =
1320 session_stream_handlers.rx->authenticated_decrypt_update_final(
1321 std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
1322 } catch (ceph::crypto::onwire::MsgAuthError &e) {
1323 ldout(cct, 5) << __func__ << " message authentication failed: "
1324 << e.what() << dendl;
1325 return _fault();
1326 }
1327 }
1328 auto& epilogue =
1329 reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
1330 late_flags = epilogue.late_flags;
1331 } else {
1332 auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
1333
1334 for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
1335 const __u32 expected_crc = epilogue.crc_values[idx];
1336 const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
1337 if (expected_crc != calculated_crc) {
1338 ldout(cct, 5) << __func__ << " message integrity check failed: "
1339 << " expected_crc=" << expected_crc
1340 << " calculated_crc=" << calculated_crc
1341 << dendl;
1342 return _fault();
1343 } else {
1344 ldout(cct, 20) << __func__ << " message integrity check success: "
1345 << " expected_crc=" << expected_crc
1346 << " calculated_crc=" << calculated_crc
1347 << dendl;
1348 }
1349 }
1350 late_flags = epilogue.late_flags;
1351 }
1352
1353 // we do have a mechanism that allows transmitter to start sending message
1354 // and abort after putting entire data field on wire. This will be used by
1355 // the kernel client to avoid unnecessary buffering.
1356 if (late_flags & FRAME_FLAGS_LATEABRT) {
1357 reset_throttle();
1358 state = READY;
1359 return CONTINUE(read_frame);
1360 } else {
1361 return handle_read_frame_dispatch();
1362 }
1363 }
1364
1365 CtPtr ProtocolV2::handle_message() {
1366 ldout(cct, 20) << __func__ << dendl;
1367 ceph_assert(state == THROTTLE_DONE);
1368
1369 #if defined(WITH_EVENTTRACE)
1370 utime_t ltt_recv_stamp = ceph_clock_now();
1371 #endif
1372 recv_stamp = ceph_clock_now();
1373
1374 // we need to get the size before std::moving segments data
1375 const size_t cur_msg_size = get_current_msg_size();
1376 auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
1377
1378 // XXX: paranoid copy just to avoid oops
1379 ceph_msg_header2 current_header = msg_frame.header();
1380
1381 ldout(cct, 5) << __func__
1382 << " got " << msg_frame.front_len()
1383 << " + " << msg_frame.middle_len()
1384 << " + " << msg_frame.data_len()
1385 << " byte message."
1386 << " envelope type=" << current_header.type
1387 << " src " << peer_name
1388 << " off " << current_header.data_off
1389 << dendl;
1390
1391 INTERCEPT(16);
1392 ceph_msg_header header{current_header.seq,
1393 current_header.tid,
1394 current_header.type,
1395 current_header.priority,
1396 current_header.version,
1397 init_le32(msg_frame.front_len()),
1398 init_le32(msg_frame.middle_len()),
1399 init_le32(msg_frame.data_len()),
1400 current_header.data_off,
1401 peer_name,
1402 current_header.compat_version,
1403 current_header.reserved,
1404 init_le32(0)};
1405 ceph_msg_footer footer{init_le32(0), init_le32(0),
1406 init_le32(0), init_le64(0), current_header.flags};
1407
1408 Message *message = decode_message(cct, 0, header, footer,
1409 msg_frame.front(),
1410 msg_frame.middle(),
1411 msg_frame.data(),
1412 connection);
1413 if (!message) {
1414 ldout(cct, 1) << __func__ << " decode message failed " << dendl;
1415 return _fault();
1416 } else {
1417 state = READ_MESSAGE_COMPLETE;
1418 }
1419
1420 INTERCEPT(17);
1421
1422 message->set_byte_throttler(connection->policy.throttler_bytes);
1423 message->set_message_throttler(connection->policy.throttler_messages);
1424
1425 // store reservation size in message, so we don't get confused
1426 // by messages entering the dispatch queue through other paths.
1427 message->set_dispatch_throttle_size(cur_msg_size);
1428
1429 message->set_recv_stamp(recv_stamp);
1430 message->set_throttle_stamp(throttle_stamp);
1431 message->set_recv_complete_stamp(ceph_clock_now());
1432
1433 // check received seq#. if it is old, drop the message.
1434 // note that incoming messages may skip ahead. this is convenient for the
1435 // client side queueing because messages can't be renumbered, but the (kernel)
1436 // client will occasionally pull a message out of the sent queue to send
1437 // elsewhere. in that case it doesn't matter if we "got" it or not.
1438 uint64_t cur_seq = in_seq;
1439 if (message->get_seq() <= cur_seq) {
1440 ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
1441 << " <= " << cur_seq << " " << message << " " << *message
1442 << ", discarding" << dendl;
1443 message->put();
1444 if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
1445 cct->_conf->ms_die_on_old_message) {
1446 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1447 }
1448 return nullptr;
1449 }
1450 if (message->get_seq() > cur_seq + 1) {
1451 ldout(cct, 0) << __func__ << " missed message? skipped from seq "
1452 << cur_seq << " to " << message->get_seq() << dendl;
1453 if (cct->_conf->ms_die_on_skipped_message) {
1454 ceph_assert(0 == "skipped incoming seq");
1455 }
1456 }
1457
1458 #if defined(WITH_EVENTTRACE)
1459 if (message->get_type() == CEPH_MSG_OSD_OP ||
1460 message->get_type() == CEPH_MSG_OSD_OPREPLY) {
1461 utime_t ltt_processed_stamp = ceph_clock_now();
1462 double usecs_elapsed =
1463 (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
1464 ostringstream buf;
1465 if (message->get_type() == CEPH_MSG_OSD_OP)
1466 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
1467 false);
1468 else
1469 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
1470 false);
1471 }
1472 #endif
1473
1474 // note last received message.
1475 in_seq = message->get_seq();
1476 ldout(cct, 5) << __func__ << " received message m=" << message
1477 << " seq=" << message->get_seq()
1478 << " from=" << message->get_source() << " type=" << header.type
1479 << " " << *message << dendl;
1480
1481 bool need_dispatch_writer = false;
1482 if (!connection->policy.lossy) {
1483 ack_left++;
1484 need_dispatch_writer = true;
1485 }
1486
1487 state = READY;
1488
1489 ceph::mono_time fast_dispatch_time;
1490
1491 if (connection->is_blackhole()) {
1492 ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
1493 message->put();
1494 goto out;
1495 }
1496
1497 connection->logger->inc(l_msgr_recv_messages);
1498 connection->logger->inc(
1499 l_msgr_recv_bytes,
1500 cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
1501
1502 messenger->ms_fast_preprocess(message);
1503 fast_dispatch_time = ceph::mono_clock::now();
1504 connection->logger->tinc(l_msgr_running_recv_time,
1505 fast_dispatch_time - connection->recv_start_time);
1506 if (connection->delay_state) {
1507 double delay_period = 0;
1508 if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1509 delay_period =
1510 cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1511 ldout(cct, 1) << "queue_received will delay after "
1512 << (ceph_clock_now() + delay_period) << " on " << message
1513 << " " << *message << dendl;
1514 }
1515 connection->delay_state->queue(delay_period, message);
1516 } else if (messenger->ms_can_fast_dispatch(message)) {
1517 connection->lock.unlock();
1518 connection->dispatch_queue->fast_dispatch(message);
1519 connection->recv_start_time = ceph::mono_clock::now();
1520 connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1521 connection->recv_start_time - fast_dispatch_time);
1522 connection->lock.lock();
1523 // we might have been reused by another connection
1524 // let's check if that is the case
1525 if (state != READY) {
1526 // yes, that was the case, let's do nothing
1527 return nullptr;
1528 }
1529 } else {
1530 connection->dispatch_queue->enqueue(message, message->get_priority(),
1531 connection->conn_id);
1532 }
1533
1534 handle_message_ack(current_header.ack_seq);
1535
1536 out:
1537 if (need_dispatch_writer && connection->is_connected()) {
1538 connection->center->dispatch_event_external(connection->write_handler);
1539 }
1540
1541 return CONTINUE(read_frame);
1542 }
1543
1544
1545 CtPtr ProtocolV2::throttle_message() {
1546 ldout(cct, 20) << __func__ << dendl;
1547
1548 if (connection->policy.throttler_messages) {
1549 ldout(cct, 10) << __func__ << " wants " << 1
1550 << " message from policy throttler "
1551 << connection->policy.throttler_messages->get_current()
1552 << "/" << connection->policy.throttler_messages->get_max()
1553 << dendl;
1554 if (!connection->policy.throttler_messages->get_or_fail()) {
1555 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
1556 << connection->policy.throttler_messages->get_current()
1557 << "/" << connection->policy.throttler_messages->get_max()
1558 << " failed, just wait." << dendl;
1559 // following thread pool deal with th full message queue isn't a
1560 // short time, so we can wait a ms.
1561 if (connection->register_time_events.empty()) {
1562 connection->register_time_events.insert(
1563 connection->center->create_time_event(1000,
1564 connection->wakeup_handler));
1565 }
1566 return nullptr;
1567 }
1568 }
1569
1570 state = THROTTLE_BYTES;
1571 return CONTINUE(throttle_bytes);
1572 }
1573
1574 CtPtr ProtocolV2::throttle_bytes() {
1575 ldout(cct, 20) << __func__ << dendl;
1576
1577 const size_t cur_msg_size = get_current_msg_size();
1578 if (cur_msg_size) {
1579 if (connection->policy.throttler_bytes) {
1580 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1581 << " bytes from policy throttler "
1582 << connection->policy.throttler_bytes->get_current() << "/"
1583 << connection->policy.throttler_bytes->get_max() << dendl;
1584 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
1585 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
1586 << " bytes from policy throttler "
1587 << connection->policy.throttler_bytes->get_current()
1588 << "/" << connection->policy.throttler_bytes->get_max()
1589 << " failed, just wait." << dendl;
1590 // following thread pool deal with th full message queue isn't a
1591 // short time, so we can wait a ms.
1592 if (connection->register_time_events.empty()) {
1593 connection->register_time_events.insert(
1594 connection->center->create_time_event(
1595 1000, connection->wakeup_handler));
1596 }
1597 return nullptr;
1598 }
1599 }
1600 }
1601
1602 state = THROTTLE_DISPATCH_QUEUE;
1603 return CONTINUE(throttle_dispatch_queue);
1604 }
1605
1606 CtPtr ProtocolV2::throttle_dispatch_queue() {
1607 ldout(cct, 20) << __func__ << dendl;
1608
1609 const size_t cur_msg_size = get_current_msg_size();
1610 if (cur_msg_size) {
1611 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
1612 cur_msg_size)) {
1613 ldout(cct, 10)
1614 << __func__ << " wants " << cur_msg_size
1615 << " bytes from dispatch throttle "
1616 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1617 << connection->dispatch_queue->dispatch_throttler.get_max()
1618 << " failed, just wait." << dendl;
1619 // following thread pool deal with th full message queue isn't a
1620 // short time, so we can wait a ms.
1621 if (connection->register_time_events.empty()) {
1622 connection->register_time_events.insert(
1623 connection->center->create_time_event(1000,
1624 connection->wakeup_handler));
1625 }
1626 return nullptr;
1627 }
1628 }
1629
1630 throttle_stamp = ceph_clock_now();
1631 state = THROTTLE_DONE;
1632
1633 return read_frame_segment();
1634 }
1635
1636 CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
1637 {
1638 ldout(cct, 20) << __func__
1639 << " payload.length()=" << payload.length() << dendl;
1640
1641 if (state != READY) {
1642 lderr(cct) << __func__ << " not in ready state!" << dendl;
1643 return _fault();
1644 }
1645
1646 auto keepalive_frame = KeepAliveFrame::Decode(payload);
1647
1648 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
1649
1650 connection->write_lock.lock();
1651 append_keepalive_ack(keepalive_frame.timestamp());
1652 connection->write_lock.unlock();
1653
1654 ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
1655 << keepalive_frame.timestamp() << dendl;
1656 connection->set_last_keepalive(ceph_clock_now());
1657
1658 if (is_connected()) {
1659 connection->center->dispatch_event_external(connection->write_handler);
1660 }
1661
1662 return CONTINUE(read_frame);
1663 }
1664
1665 CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload)
1666 {
1667 ldout(cct, 20) << __func__
1668 << " payload.length()=" << payload.length() << dendl;
1669
1670 if (state != READY) {
1671 lderr(cct) << __func__ << " not in ready state!" << dendl;
1672 return _fault();
1673 }
1674
1675 auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload);
1676 connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp());
1677 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
1678
1679 return CONTINUE(read_frame);
1680 }
1681
1682 CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload)
1683 {
1684 ldout(cct, 20) << __func__
1685 << " payload.length()=" << payload.length() << dendl;
1686
1687 if (state != READY) {
1688 lderr(cct) << __func__ << " not in ready state!" << dendl;
1689 return _fault();
1690 }
1691
1692 auto ack = AckFrame::Decode(payload);
1693 handle_message_ack(ack.seq());
1694 return CONTINUE(read_frame);
1695 }
1696
1697 /* Client Protocol Methods */
1698
1699 CtPtr ProtocolV2::start_client_banner_exchange() {
1700 ldout(cct, 20) << __func__ << dendl;
1701
1702 INTERCEPT(1);
1703
1704 state = BANNER_CONNECTING;
1705
1706 global_seq = messenger->get_global_seq();
1707
1708 return _banner_exchange(CONTINUATION(post_client_banner_exchange));
1709 }
1710
1711 CtPtr ProtocolV2::post_client_banner_exchange() {
1712 ldout(cct, 20) << __func__ << dendl;
1713
1714 state = AUTH_CONNECTING;
1715
1716 return send_auth_request();
1717 }
1718
1719 CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
1720 ceph_assert(messenger->auth_client);
1721 ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
1722 << " auth_client " << messenger->auth_client << dendl;
1723
1724 bufferlist bl;
1725 vector<uint32_t> preferred_modes;
1726 auto am = auth_meta;
1727 connection->lock.unlock();
1728 int r = messenger->auth_client->get_auth_request(
1729 connection, am.get(),
1730 &am->auth_method, &preferred_modes, &bl);
1731 connection->lock.lock();
1732 if (state != AUTH_CONNECTING) {
1733 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1734 return _fault();
1735 }
1736 if (r < 0) {
1737 ldout(cct, 0) << __func__ << " get_initial_auth_request returned " << r
1738 << dendl;
1739 stop();
1740 connection->dispatch_queue->queue_reset(connection);
1741 return nullptr;
1742 }
1743
1744 INTERCEPT(9);
1745
1746 auto frame = AuthRequestFrame::Encode(auth_meta->auth_method, preferred_modes,
1747 bl);
1748 return WRITE(frame, "auth request", read_frame);
1749 }
1750
1751 CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
1752 ldout(cct, 20) << __func__
1753 << " payload.length()=" << payload.length() << dendl;
1754
1755 if (state != AUTH_CONNECTING) {
1756 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1757 return _fault();
1758 }
1759
1760 auto bad_method = AuthBadMethodFrame::Decode(payload);
1761 ldout(cct, 1) << __func__ << " method=" << bad_method.method()
1762 << " result " << cpp_strerror(bad_method.result())
1763 << ", allowed methods=" << bad_method.allowed_methods()
1764 << ", allowed modes=" << bad_method.allowed_modes()
1765 << dendl;
1766 ceph_assert(messenger->auth_client);
1767 auto am = auth_meta;
1768 connection->lock.unlock();
1769 int r = messenger->auth_client->handle_auth_bad_method(
1770 connection,
1771 am.get(),
1772 bad_method.method(), bad_method.result(),
1773 bad_method.allowed_methods(),
1774 bad_method.allowed_modes());
1775 connection->lock.lock();
1776 if (state != AUTH_CONNECTING || r < 0) {
1777 return _fault();
1778 }
1779 return send_auth_request(bad_method.allowed_methods());
1780 }
1781
1782 CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
1783 {
1784 ldout(cct, 20) << __func__
1785 << " payload.length()=" << payload.length() << dendl;
1786
1787 if (state != AUTH_CONNECTING) {
1788 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1789 return _fault();
1790 }
1791
1792 auto auth_more = AuthReplyMoreFrame::Decode(payload);
1793 ldout(cct, 5) << __func__
1794 << " auth reply more len=" << auth_more.auth_payload().length()
1795 << dendl;
1796 ceph_assert(messenger->auth_client);
1797 ceph::bufferlist reply;
1798 auto am = auth_meta;
1799 connection->lock.unlock();
1800 int r = messenger->auth_client->handle_auth_reply_more(
1801 connection, am.get(), auth_more.auth_payload(), &reply);
1802 connection->lock.lock();
1803 if (state != AUTH_CONNECTING) {
1804 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1805 return _fault();
1806 }
1807 if (r < 0) {
1808 lderr(cct) << __func__ << " auth_client handle_auth_reply_more returned "
1809 << r << dendl;
1810 return _fault();
1811 }
1812 auto more_reply = AuthRequestMoreFrame::Encode(reply);
1813 return WRITE(more_reply, "auth request more", read_frame);
1814 }
1815
1816 CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
1817 {
1818 ldout(cct, 20) << __func__
1819 << " payload.length()=" << payload.length() << dendl;
1820
1821 if (state != AUTH_CONNECTING) {
1822 lderr(cct) << __func__ << " not in auth connect state!" << dendl;
1823 return _fault();
1824 }
1825
1826 auto auth_done = AuthDoneFrame::Decode(payload);
1827
1828 ceph_assert(messenger->auth_client);
1829 auto am = auth_meta;
1830 connection->lock.unlock();
1831 int r = messenger->auth_client->handle_auth_done(
1832 connection,
1833 am.get(),
1834 auth_done.global_id(),
1835 auth_done.con_mode(),
1836 auth_done.auth_payload(),
1837 &am->session_key,
1838 &am->connection_secret);
1839 connection->lock.lock();
1840 if (state != AUTH_CONNECTING) {
1841 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1842 return _fault();
1843 }
1844 if (r < 0) {
1845 return _fault();
1846 }
1847 auth_meta->con_mode = auth_done.con_mode();
1848 session_stream_handlers = \
1849 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
1850
1851 state = AUTH_CONNECTING_SIGN;
1852
1853 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
1854 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
1855 auto sig_frame = AuthSignatureFrame::Encode(sig);
1856 pre_auth.enabled = false;
1857 pre_auth.rxbuf.clear();
1858 return WRITE(sig_frame, "auth signature", read_frame);
1859 }
1860
1861 CtPtr ProtocolV2::finish_client_auth() {
1862 if (!server_cookie) {
1863 ceph_assert(connect_seq == 0);
1864 state = SESSION_CONNECTING;
1865 return send_client_ident();
1866 } else { // reconnecting to previous session
1867 state = SESSION_RECONNECTING;
1868 ceph_assert(connect_seq > 0);
1869 return send_reconnect();
1870 }
1871 }
1872
1873 CtPtr ProtocolV2::send_client_ident() {
1874 ldout(cct, 20) << __func__ << dendl;
1875
1876 if (!connection->policy.lossy && !client_cookie) {
1877 client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
1878 }
1879
1880 uint64_t flags = 0;
1881 if (connection->policy.lossy) {
1882 flags |= CEPH_MSG_CONNECT_LOSSY;
1883 }
1884
1885 auto client_ident = ClientIdentFrame::Encode(
1886 messenger->get_myaddrs(),
1887 connection->target_addr,
1888 messenger->get_myname().num(),
1889 global_seq,
1890 connection->policy.features_supported,
1891 connection->policy.features_required | msgr2_required,
1892 flags,
1893 client_cookie);
1894
1895 ldout(cct, 5) << __func__ << " sending identification: "
1896 << "addrs=" << messenger->get_myaddrs()
1897 << " target=" << connection->target_addr
1898 << " gid=" << messenger->get_myname().num()
1899 << " global_seq=" << global_seq
1900 << " features_supported=" << std::hex
1901 << connection->policy.features_supported
1902 << " features_required="
1903 << (connection->policy.features_required | msgr2_required)
1904 << " flags=" << flags
1905 << " cookie=" << client_cookie << std::dec << dendl;
1906
1907 INTERCEPT(11);
1908
1909 return WRITE(client_ident, "client ident", read_frame);
1910 }
1911
1912 CtPtr ProtocolV2::send_reconnect() {
1913 ldout(cct, 20) << __func__ << dendl;
1914
1915 auto reconnect = ReconnectFrame::Encode(messenger->get_myaddrs(),
1916 client_cookie,
1917 server_cookie,
1918 global_seq,
1919 connect_seq,
1920 in_seq);
1921
1922 ldout(cct, 5) << __func__ << " reconnect to session: client_cookie="
1923 << std::hex << client_cookie << " server_cookie="
1924 << server_cookie << std::dec
1925 << " gs=" << global_seq << " cs=" << connect_seq
1926 << " ms=" << in_seq << dendl;
1927
1928 INTERCEPT(13);
1929
1930 return WRITE(reconnect, "reconnect", read_frame);
1931 }
1932
1933 CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
1934 {
1935 ldout(cct, 20) << __func__
1936 << " payload.length()=" << payload.length() << dendl;
1937
1938 if (state != SESSION_CONNECTING) {
1939 lderr(cct) << __func__ << " not in session connect state!" << dendl;
1940 return _fault();
1941 }
1942
1943 auto ident_missing =
1944 IdentMissingFeaturesFrame::Decode(payload);
1945 lderr(cct) << __func__
1946 << " client does not support all server features: " << std::hex
1947 << ident_missing.features() << std::dec << dendl;
1948
1949 return _fault();
1950 }
1951
1952 CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload)
1953 {
1954 ldout(cct, 20) << __func__
1955 << " payload.length()=" << payload.length() << dendl;
1956
1957 if (state != SESSION_RECONNECTING) {
1958 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1959 return _fault();
1960 }
1961
1962 auto reset = ResetFrame::Decode(payload);
1963
1964 ldout(cct, 1) << __func__ << " received session reset full=" << reset.full()
1965 << dendl;
1966 if (reset.full()) {
1967 reset_session();
1968 } else {
1969 server_cookie = 0;
1970 connect_seq = 0;
1971 in_seq = 0;
1972 }
1973
1974 state = SESSION_CONNECTING;
1975 return send_client_ident();
1976 }
1977
1978 CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload)
1979 {
1980 ldout(cct, 20) << __func__
1981 << " payload.length()=" << payload.length() << dendl;
1982
1983 if (state != SESSION_RECONNECTING) {
1984 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
1985 return _fault();
1986 }
1987
1988 auto retry = RetryFrame::Decode(payload);
1989 connect_seq = retry.connect_seq() + 1;
1990
1991 ldout(cct, 1) << __func__
1992 << " received session retry connect_seq=" << retry.connect_seq()
1993 << ", inc to cs=" << connect_seq << dendl;
1994
1995 return send_reconnect();
1996 }
1997
1998 CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload)
1999 {
2000 ldout(cct, 20) << __func__
2001 << " payload.length()=" << payload.length() << dendl;
2002
2003 if (state != SESSION_RECONNECTING) {
2004 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2005 return _fault();
2006 }
2007
2008 auto retry = RetryGlobalFrame::Decode(payload);
2009 global_seq = messenger->get_global_seq(retry.global_seq());
2010
2011 ldout(cct, 1) << __func__ << " received session retry global global_seq="
2012 << retry.global_seq() << ", choose new gs=" << global_seq
2013 << dendl;
2014
2015 return send_reconnect();
2016 }
2017
2018 CtPtr ProtocolV2::handle_wait(ceph::bufferlist &payload) {
2019 ldout(cct, 20) << __func__
2020 << " received WAIT (connection race)"
2021 << " payload.length()=" << payload.length()
2022 << dendl;
2023
2024 if (state != SESSION_CONNECTING && state != SESSION_RECONNECTING) {
2025 lderr(cct) << __func__ << " not in session (re)connect state!" << dendl;
2026 return _fault();
2027 }
2028
2029 state = WAIT;
2030 WaitFrame::Decode(payload);
2031 return _fault();
2032 }
2033
2034 CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload)
2035 {
2036 ldout(cct, 20) << __func__
2037 << " payload.length()=" << payload.length() << dendl;
2038
2039 if (state != SESSION_RECONNECTING) {
2040 lderr(cct) << __func__ << " not in session reconnect state!" << dendl;
2041 return _fault();
2042 }
2043
2044 auto reconnect_ok = ReconnectOkFrame::Decode(payload);
2045 ldout(cct, 5) << __func__
2046 << " reconnect accepted: sms=" << reconnect_ok.msg_seq()
2047 << dendl;
2048
2049 out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq());
2050
2051 backoff = utime_t();
2052 ldout(cct, 10) << __func__ << " reconnect success " << connect_seq
2053 << ", lossy = " << connection->policy.lossy << ", features "
2054 << connection->get_features() << dendl;
2055
2056 if (connection->delay_state) {
2057 ceph_assert(connection->delay_state->ready());
2058 }
2059
2060 connection->dispatch_queue->queue_connect(connection);
2061 messenger->ms_deliver_handle_fast_connect(connection);
2062
2063 return ready();
2064 }
2065
2066 CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
2067 {
2068 ldout(cct, 20) << __func__
2069 << " payload.length()=" << payload.length() << dendl;
2070
2071 if (state != SESSION_CONNECTING) {
2072 lderr(cct) << __func__ << " not in session connect state!" << dendl;
2073 return _fault();
2074 }
2075
2076 auto server_ident = ServerIdentFrame::Decode(payload);
2077 ldout(cct, 5) << __func__ << " received server identification:"
2078 << " addrs=" << server_ident.addrs()
2079 << " gid=" << server_ident.gid()
2080 << " global_seq=" << server_ident.global_seq()
2081 << " features_supported=" << std::hex
2082 << server_ident.supported_features()
2083 << " features_required=" << server_ident.required_features()
2084 << " flags=" << server_ident.flags() << " cookie=" << std::dec
2085 << server_ident.cookie() << dendl;
2086
2087 // is this who we intended to talk to?
2088 // be a bit forgiving here, since we may be connecting based on addresses parsed out
2089 // of mon_host or something.
2090 if (!server_ident.addrs().contains(connection->target_addr)) {
2091 ldout(cct,1) << __func__ << " peer identifies as " << server_ident.addrs()
2092 << ", does not include " << connection->target_addr << dendl;
2093 return _fault();
2094 }
2095
2096 server_cookie = server_ident.cookie();
2097
2098 connection->set_peer_addrs(server_ident.addrs());
2099 peer_name = entity_name_t(connection->get_peer_type(), server_ident.gid());
2100 connection->set_features(server_ident.supported_features() &
2101 connection->policy.features_supported);
2102 peer_global_seq = server_ident.global_seq();
2103
2104 connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
2105
2106 backoff = utime_t();
2107 ldout(cct, 10) << __func__ << " connect success " << connect_seq
2108 << ", lossy = " << connection->policy.lossy << ", features "
2109 << connection->get_features() << dendl;
2110
2111 if (connection->delay_state) {
2112 ceph_assert(connection->delay_state->ready());
2113 }
2114
2115 connection->dispatch_queue->queue_connect(connection);
2116 messenger->ms_deliver_handle_fast_connect(connection);
2117
2118 return ready();
2119 }
2120
2121 /* Server Protocol Methods */
2122
2123 CtPtr ProtocolV2::start_server_banner_exchange() {
2124 ldout(cct, 20) << __func__ << dendl;
2125
2126 INTERCEPT(2);
2127
2128 state = BANNER_ACCEPTING;
2129
2130 return _banner_exchange(CONTINUATION(post_server_banner_exchange));
2131 }
2132
2133 CtPtr ProtocolV2::post_server_banner_exchange() {
2134 ldout(cct, 20) << __func__ << dendl;
2135
2136 state = AUTH_ACCEPTING;
2137
2138 return CONTINUE(read_frame);
2139 }
2140
2141 CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) {
2142 ldout(cct, 20) << __func__ << " payload.length()=" << payload.length()
2143 << dendl;
2144
2145 if (state != AUTH_ACCEPTING) {
2146 lderr(cct) << __func__ << " not in auth accept state!" << dendl;
2147 return _fault();
2148 }
2149
2150 auto request = AuthRequestFrame::Decode(payload);
2151 ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method()
2152 << ", preferred_modes=" << request.preferred_modes()
2153 << ", payload_len=" << request.auth_payload().length() << ")"
2154 << dendl;
2155 auth_meta->auth_method = request.method();
2156 auth_meta->con_mode = messenger->auth_server->pick_con_mode(
2157 connection->get_peer_type(), auth_meta->auth_method,
2158 request.preferred_modes());
2159 if (auth_meta->con_mode == CEPH_CON_MODE_UNKNOWN) {
2160 return _auth_bad_method(-EOPNOTSUPP);
2161 }
2162 return _handle_auth_request(request.auth_payload(), false);
2163 }
2164
2165 CtPtr ProtocolV2::_auth_bad_method(int r)
2166 {
2167 ceph_assert(r < 0);
2168 std::vector<uint32_t> allowed_methods;
2169 std::vector<uint32_t> allowed_modes;
2170 messenger->auth_server->get_supported_auth_methods(
2171 connection->get_peer_type(), &allowed_methods, &allowed_modes);
2172 ldout(cct, 1) << __func__ << " auth_method " << auth_meta->auth_method
2173 << " r " << cpp_strerror(r)
2174 << ", allowed_methods " << allowed_methods
2175 << ", allowed_modes " << allowed_modes
2176 << dendl;
2177 auto bad_method = AuthBadMethodFrame::Encode(auth_meta->auth_method, r,
2178 allowed_methods, allowed_modes);
2179 return WRITE(bad_method, "bad auth method", read_frame);
2180 }
2181
2182 CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
2183 {
2184 if (!messenger->auth_server) {
2185 return _fault();
2186 }
2187 bufferlist reply;
2188 auto am = auth_meta;
2189 connection->lock.unlock();
2190 int r = messenger->auth_server->handle_auth_request(
2191 connection, am.get(),
2192 more, am->auth_method, auth_payload,
2193 &reply);
2194 connection->lock.lock();
2195 if (state != AUTH_ACCEPTING && state != AUTH_ACCEPTING_MORE) {
2196 ldout(cct, 1) << __func__
2197 << " state changed while accept, it must be mark_down"
2198 << dendl;
2199 ceph_assert(state == CLOSED);
2200 return _fault();
2201 }
2202 if (r == 1) {
2203 INTERCEPT(10);
2204 state = AUTH_ACCEPTING_SIGN;
2205
2206 auto auth_done = AuthDoneFrame::Encode(connection->peer_global_id,
2207 auth_meta->con_mode,
2208 reply);
2209 return WRITE(auth_done, "auth done", finish_auth);
2210 } else if (r == 0) {
2211 state = AUTH_ACCEPTING_MORE;
2212
2213 auto more = AuthReplyMoreFrame::Encode(reply);
2214 return WRITE(more, "auth reply more", read_frame);
2215 } else if (r == -EBUSY) {
2216 // kick the client and maybe they'll come back later
2217 return _fault();
2218 } else {
2219 return _auth_bad_method(r);
2220 }
2221 }
2222
2223 CtPtr ProtocolV2::finish_auth()
2224 {
2225 ceph_assert(auth_meta);
2226 // TODO: having a possibility to check whether we're server or client could
2227 // allow reusing finish_auth().
2228 session_stream_handlers = \
2229 ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
2230
2231 const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
2232 auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
2233 auto sig_frame = AuthSignatureFrame::Encode(sig);
2234 pre_auth.enabled = false;
2235 pre_auth.rxbuf.clear();
2236 return WRITE(sig_frame, "auth signature", read_frame);
2237 }
2238
2239 CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload)
2240 {
2241 ldout(cct, 20) << __func__
2242 << " payload.length()=" << payload.length() << dendl;
2243
2244 if (state != AUTH_ACCEPTING_MORE) {
2245 lderr(cct) << __func__ << " not in auth accept more state!" << dendl;
2246 return _fault();
2247 }
2248
2249 auto auth_more = AuthRequestMoreFrame::Decode(payload);
2250 return _handle_auth_request(auth_more.auth_payload(), true);
2251 }
2252
2253 CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
2254 {
2255 ldout(cct, 20) << __func__
2256 << " payload.length()=" << payload.length() << dendl;
2257
2258 if (state != AUTH_ACCEPTING_SIGN && state != AUTH_CONNECTING_SIGN) {
2259 lderr(cct) << __func__
2260 << " pre-auth verification signature seen in wrong state!"
2261 << dendl;
2262 return _fault();
2263 }
2264
2265 auto sig_frame = AuthSignatureFrame::Decode(payload);
2266
2267 const auto actual_tx_sig = auth_meta->session_key.empty() ?
2268 sha256_digest_t() : auth_meta->session_key.hmac_sha256(cct, pre_auth.txbuf);
2269 if (sig_frame.signature() != actual_tx_sig) {
2270 ldout(cct, 2) << __func__ << " pre-auth signature mismatch"
2271 << " actual_tx_sig=" << actual_tx_sig
2272 << " sig_frame.signature()=" << sig_frame.signature()
2273 << dendl;
2274 return _fault();
2275 } else {
2276 ldout(cct, 20) << __func__ << " pre-auth signature success"
2277 << " sig_frame.signature()=" << sig_frame.signature()
2278 << dendl;
2279 pre_auth.txbuf.clear();
2280 }
2281
2282 if (state == AUTH_ACCEPTING_SIGN) {
2283 // server had sent AuthDone and client responded with correct pre-auth
2284 // signature. we can start accepting new sessions/reconnects.
2285 state = SESSION_ACCEPTING;
2286 return CONTINUE(read_frame);
2287 } else if (state == AUTH_CONNECTING_SIGN) {
2288 // this happened at client side
2289 return finish_client_auth();
2290 } else {
2291 ceph_abort("state corruption");
2292 }
2293 }
2294
// Accepting side: handle the peer's ClientIdent frame (valid only in
// SESSION_ACCEPTING).
// 1. Validates that the peer advertises a non-blank address and was actually
//    trying to reach one of our addresses.
// 2. Records the peer's addresses, name/id and client cookie.
// 3. If the peer lacks features required by our policy (or msgr2_required),
//    replies with IdentMissingFeatures.
// 4. Otherwise stores the negotiated feature set and peer_global_seq.
// 5. Unless the policy is a lossy server, looks up an already registered
//    connection to the same peer addresses. Any match is resolved by
//    handle_existing_connection().
// 6. If nothing else applies, replies with ServerIdent.
CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
{
  ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;

  if (state != SESSION_ACCEPTING) {
    lderr(cct) << __func__ << " not in session accept state!" << dendl;
    return _fault();
  }

  auto client_ident = ClientIdentFrame::Decode(payload);

  ldout(cct, 5) << __func__ << " received client identification:"
                << " addrs=" << client_ident.addrs()
                << " target=" << client_ident.target_addr()
                << " gid=" << client_ident.gid()
                << " global_seq=" << client_ident.global_seq()
                << " features_supported=" << std::hex
                << client_ident.supported_features()
                << " features_required=" << client_ident.required_features()
                << " flags=" << client_ident.flags()
                << " cookie=" << client_ident.cookie() << std::dec << dendl;

  // The client must advertise at least one non-default address.
  if (client_ident.addrs().empty() ||
      client_ident.addrs().front() == entity_addr_t()) {
    ldout(cct,5) << __func__ << " oops, client_ident.addrs() is empty" << dendl;
    return _fault();  // a v2 peer should never do this
  }
  // Refuse peers that were dialling an address that is not ours.
  if (!messenger->get_myaddrs().contains(client_ident.target_addr())) {
    ldout(cct,5) << __func__ << " peer is trying to reach "
                 << client_ident.target_addr()
                 << " which is not us (" << messenger->get_myaddrs() << ")"
                 << dendl;
    return _fault();
  }

  connection->set_peer_addrs(client_ident.addrs());
  connection->target_addr = connection->_infer_target_addr(client_ident.addrs());

  peer_name = entity_name_t(connection->get_peer_type(), client_ident.gid());
  connection->set_peer_id(client_ident.gid());

  client_cookie = client_ident.cookie();

  // Bits we require (policy + msgr2_required) that the client does not
  // advertise as supported.
  uint64_t feat_missing =
    (connection->policy.features_required | msgr2_required) &
    ~(uint64_t)client_ident.supported_features();
  if (feat_missing) {
    ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
                  << feat_missing << std::dec << dendl;
    auto ident_missing_features =
        IdentMissingFeaturesFrame::Encode(feat_missing);

    return WRITE(ident_missing_features, "ident missing features", read_frame);
  }

  // Negotiated features: supported by both the client and our policy.
  connection_features =
      client_ident.supported_features() & connection->policy.features_supported;

  peer_global_seq = client_ident.global_seq();

  if (connection->policy.server &&
      connection->policy.lossy) {
    // incoming lossy client, no need to register this connection
  } else {
    // Looks good so far, let's check if there is already an existing connection
    // to this peer.
    // connection->lock is released across the lookup (and any mark_down /
    // inject_delay below); the state is re-checked after re-locking.
    connection->lock.unlock();
    AsyncConnectionRef existing = messenger->lookup_conn(
        *connection->peer_addrs);

    // An existing connection running a different protocol version cannot be
    // reused: mark it down and continue as if none existed.
    if (existing &&
        existing->protocol->proto_type != 2) {
      ldout(cct,1) << __func__ << " existing " << existing << " proto "
                   << existing->protocol.get() << " version is "
                   << existing->protocol->proto_type << ", marking down"
                   << dendl;
      existing->mark_down();
      existing = nullptr;
    }

    connection->inject_delay();

    connection->lock.lock();
    if (state != SESSION_ACCEPTING) {
      ldout(cct, 1) << __func__
                    << " state changed while accept, it must be mark_down"
                    << dendl;
      ceph_assert(state == CLOSED);
      return _fault();
    }

    if (existing) {
      return handle_existing_connection(existing);
    }
  }

  // if everything is OK reply with server identification
  return send_server_ident();
}
2395
// Accepting side: handle a Reconnect frame (valid only in SESSION_ACCEPTING).
// The peer wants to resume an earlier session. We look up the existing
// registered connection to its addresses and answer with one of:
//   - ResetFrame(true)   no usable existing connection, or it is CLOSED
//   - RetryGlobalFrame   existing is being replaced, or the peer's
//                        global_seq is stale
//   - ResetFrame(resetcheck)  client cookie mismatch
//   - ResetFrame(false)  existing never got a server cookie
//                        (server_cookie == 0)
//   - RetryFrame         the peer's connect_seq is stale
//   - WaitFrame          reconnect race that the existing connection wins
// In every other case the existing session is taken over by this socket via
// reuse_connection(). That call happens while existing->lock is held by the
// lock_guard below.
CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
{
  ldout(cct, 20) << __func__
                 << " payload.length()=" << payload.length() << dendl;

  if (state != SESSION_ACCEPTING) {
    lderr(cct) << __func__ << " not in session accept state!" << dendl;
    return _fault();
  }

  auto reconnect = ReconnectFrame::Decode(payload);

  ldout(cct, 5) << __func__
                << " received reconnect:"
                << " client_cookie=" << std::hex << reconnect.client_cookie()
                << " server_cookie=" << reconnect.server_cookie() << std::dec
                << " gs=" << reconnect.global_seq()
                << " cs=" << reconnect.connect_seq()
                << " ms=" << reconnect.msg_seq()
                << dendl;

  // Should we check if one of the ident.addrs match connection->target_addr
  // as we do in ProtocolV1?
  connection->set_peer_addrs(reconnect.addrs());
  connection->target_addr = connection->_infer_target_addr(reconnect.addrs());
  peer_global_seq = reconnect.global_seq();

  // connection->lock is released across the lookup; the state is re-checked
  // after re-locking.
  connection->lock.unlock();
  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);

  // A connection running another protocol version cannot be resumed.
  if (existing &&
      existing->protocol->proto_type != 2) {
    ldout(cct,1) << __func__ << " existing " << existing << " proto "
                 << existing->protocol.get() << " version is "
                 << existing->protocol->proto_type << ", marking down" << dendl;
    existing->mark_down();
    existing = nullptr;
  }

  connection->inject_delay();

  connection->lock.lock();
  if (state != SESSION_ACCEPTING) {
    ldout(cct, 1) << __func__
                  << " state changed while accept, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED);
    return _fault();
  }

  if (!existing) {
    // there is no existing connection therefore cannot reconnect to previous
    // session
    ldout(cct, 0) << __func__
                  << " no existing connection exists, reseting client" << dendl;
    auto reset = ResetFrame::Encode(true);
    return WRITE(reset, "session reset", read_frame);
  }

  // Held until return, including across reuse_connection().
  std::lock_guard<std::mutex> l(existing->lock);

  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
  if (!exproto) {
    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
    ceph_assert(false);
  }

  if (exproto->state == CLOSED) {
    ldout(cct, 5) << __func__ << " existing " << existing
                  << " already closed. Reseting client" << dendl;
    auto reset = ResetFrame::Encode(true);
    return WRITE(reset, "session reset", read_frame);
  }

  if (exproto->replacing) {
    ldout(cct, 1) << __func__
                  << " existing racing replace happened while replacing."
                  << " existing=" << existing << dendl;
    auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);
    return WRITE(retry, "session retry", read_frame);
  }

  if (exproto->client_cookie != reconnect.client_cookie()) {
    // The session the peer wants to resume is not the one we hold; a full
    // reset is requested only if our policy has resetcheck set.
    ldout(cct, 1) << __func__ << " existing=" << existing
                  << " client cookie mismatch, I must have reseted:"
                  << " cc=" << std::hex << exproto->client_cookie
                  << " rcc=" << reconnect.client_cookie()
                  << ", reseting client." << std::dec
                  << dendl;
    auto reset = ResetFrame::Encode(connection->policy.resetcheck);
    return WRITE(reset, "session reset", read_frame);
  } else if (exproto->server_cookie == 0) {
    // this happens when:
    // - a connects to b
    // - a sends client_ident
    // - b gets client_ident, sends server_ident and sets cookie X
    // - connection fault
    // - b reconnects to a with cookie X, connect_seq=1
    // - a has cookie==0
    ldout(cct, 1) << __func__ << " I was a client and didn't received the"
                  << " server_ident. Asking peer to resume session"
                  << " establishment" << dendl;
    auto reset = ResetFrame::Encode(false);
    return WRITE(reset, "session reset", read_frame);
  }

  if (exproto->peer_global_seq > reconnect.global_seq()) {
    ldout(cct, 5) << __func__
                  << " stale global_seq: sgs=" << exproto->peer_global_seq
                  << " cgs=" << reconnect.global_seq()
                  << ", ask client to retry global" << dendl;
    auto retry = RetryGlobalFrame::Encode(exproto->peer_global_seq);

    INTERCEPT(18);

    return WRITE(retry, "session retry", read_frame);
  }

  if (exproto->connect_seq > reconnect.connect_seq()) {
    ldout(cct, 5) << __func__
                  << " stale connect_seq scs=" << exproto->connect_seq
                  << " ccs=" << reconnect.connect_seq()
                  << " , ask client to retry" << dendl;
    auto retry = RetryFrame::Encode(exproto->connect_seq);
    return WRITE(retry, "session retry", read_frame);
  }

  if (exproto->connect_seq == reconnect.connect_seq()) {
    // reconnect race: both peers are sending reconnect messages
    // Tie-break on msgr2 addresses: the existing connection wins when the
    // peer's address is greater than ours and existing is not a server
    // policy (mirrors the race rule in handle_existing_connection()).
    if (existing->peer_addrs->msgr2_addr() >
            messenger->get_myaddrs().msgr2_addr() &&
        !existing->policy.server) {
      // the existing connection wins
      ldout(cct, 1)
          << __func__
          << " reconnect race detected, this connection loses to existing="
          << existing << dendl;

      auto wait = WaitFrame::Encode();
      return WRITE(wait, "wait", read_frame);
    } else {
      // this connection wins
      ldout(cct, 1) << __func__
                    << " reconnect race detected, replacing existing="
                    << existing << " socket by this connection's socket"
                    << dendl;
    }
  }

  ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;

  // Makes reuse_connection() keep the existing session state and answer with
  // ReconnectOk rather than ServerIdent.
  reconnecting = true;

  // everything looks good
  exproto->connect_seq = reconnect.connect_seq();
  exproto->message_seq = reconnect.msg_seq();

  return reuse_connection(existing, exproto);
}
2555
// Accepting side, called from handle_client_ident() when a v2 connection to
// the same peer is already registered. Takes existing->lock for the whole
// decision (and across reuse_connection()). Checks run in this order:
//   1. existing CLOSED              -> proceed with this one (ServerIdent)
//   2. existing being replaced      -> WAIT
//   3. existing has newer peer_global_seq -> this one is stale; stop it
//   4. existing lossy               -> stop existing, use this one
//   5. existing has both cookies but a different client cookie -> the peer
//      reset; optionally reset_session() (resetcheck), then take over existing
//   6. same client cookie           -> resume interrupted establishment
//   7. existing READY/STANDBY       -> take over existing
//   8. otherwise a connect race, decided by comparing msgr2 addresses (or
//      existing being a server policy)
CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
  ldout(cct, 20) << __func__ << " existing=" << existing << dendl;

  std::lock_guard<std::mutex> l(existing->lock);

  ProtocolV2 *exproto = dynamic_cast<ProtocolV2 *>(existing->protocol.get());
  if (!exproto) {
    ldout(cct, 1) << __func__ << " existing=" << existing << dendl;
    ceph_assert(false);
  }

  if (exproto->state == CLOSED) {
    ldout(cct, 1) << __func__ << " existing " << existing << " already closed."
                  << dendl;
    return send_server_ident();
  }

  if (exproto->replacing) {
    ldout(cct, 1) << __func__
                  << " existing racing replace happened while replacing."
                  << " existing=" << existing << dendl;
    auto wait = WaitFrame::Encode();
    return WRITE(wait, "wait", read_frame);
  }

  if (exproto->peer_global_seq > peer_global_seq) {
    // Returning nullptr ends the continuation chain for this (stopped)
    // connection.
    ldout(cct, 1) << __func__ << " this is a stale connection, peer_global_seq="
                  << peer_global_seq
                  << " existing->peer_global_seq=" << exproto->peer_global_seq
                  << ", stopping this connection." << dendl;
    stop();
    connection->dispatch_queue->queue_reset(connection);
    return nullptr;
  }

  if (existing->policy.lossy) {
    // existing connection can be thrown out in favor of this one
    ldout(cct, 1)
        << __func__ << " existing=" << existing
        << " is a lossy channel. Stopping existing in favor of this connection"
        << dendl;
    existing->protocol->stop();
    existing->dispatch_queue->queue_reset(existing.get());
    return send_server_ident();
  }

  if (exproto->server_cookie && exproto->client_cookie &&
      exproto->client_cookie != client_cookie) {
    // Found previous session
    // peer has reseted and we're going to reuse the existing connection
    // by replacing the communication socket
    ldout(cct, 1) << __func__ << " found previous session existing=" << existing
                  << ", peer must have reseted." << dendl;
    if (connection->policy.resetcheck) {
      exproto->reset_session();
    }
    return reuse_connection(existing, exproto);
  }

  if (exproto->client_cookie == client_cookie) {
    // session establishment interrupted between client_ident and server_ident,
    // continuing...
    ldout(cct, 1) << __func__ << " found previous session existing=" << existing
                  << ", continuing session establishment." << dendl;
    return reuse_connection(existing, exproto);
  }

  if (exproto->state == READY || exproto->state == STANDBY) {
    ldout(cct, 1) << __func__ << " existing=" << existing
                  << " is READY/STANDBY, lets reuse it" << dendl;
    return reuse_connection(existing, exproto);
  }

  // Looks like a connection race: server and client are both connecting to
  // each other at the same time.
  if (connection->peer_addrs->msgr2_addr() <
          messenger->get_myaddrs().msgr2_addr() ||
      existing->policy.server) {
    // this connection wins
    ldout(cct, 1) << __func__
                  << " connection race detected, replacing existing="
                  << existing << " socket by this connection's socket" << dendl;
    return reuse_connection(existing, exproto);
  } else {
    // the existing connection wins
    ldout(cct, 1)
        << __func__
        << " connection race detected, this connection loses to existing="
        << existing << dendl;
    // Reaching here means peer >= us and existing is not a server policy;
    // equal addresses are not expected.
    ceph_assert(connection->peer_addrs->msgr2_addr() >
                messenger->get_myaddrs().msgr2_addr());

    // make sure we follow through with opening the existing
    // connection (if it isn't yet open) since we know the peer
    // has something to send to us.
    existing->send_keepalive();
    auto wait = WaitFrame::Encode();
    return WRITE(wait, "wait", read_frame);
  }
}
2656
// Accepting side: hand this connection's freshly accepted socket over to the
// already registered `existing` connection, then stop this one.
// Both visible callers (handle_reconnect(), handle_existing_connection())
// hold existing->lock; this function additionally holds existing->write_lock
// for its synchronous part.
//
// Synchronous part (this thread):
//   - reset existing's receive state
//   - copy session identity to existing (unless reconnecting)
//   - swap the stream handlers and share auth_meta
//   - stop this connection
//   - mark existing as `replacing` in state NONE
//
// Asynchronous part: deactivate_existing is submitted to existing's current
// event center. It requeues sent messages and drops outgoing data. If existing
// is still NONE, it installs the new socket/worker/center. Then
// transfer_existing runs on the new center: it restarts the connect timer,
// registers the read event and sends ServerIdent or ReconnectOk.
// Returns nullptr: this connection's continuation chain ends here.
CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
                                   ProtocolV2 *exproto) {
  ldout(cct, 20) << __func__ << " existing=" << existing
                 << " reconnect=" << reconnecting << dendl;

  connection->inject_delay();

  std::lock_guard<std::mutex> l(existing->write_lock);

  // Stop watching the socket from this connection; it is about to move.
  connection->center->delete_file_event(connection->cs.fd(),
                                        EVENT_READABLE | EVENT_WRITABLE);

  if (existing->delay_state) {
    existing->delay_state->flush();
    ceph_assert(!connection->delay_state);
  }
  exproto->reset_recv_state();
  exproto->pre_auth.enabled = false;

  // A reconnect resumes existing's session as-is; otherwise existing adopts
  // the identity negotiated on this connection.
  if (!reconnecting) {
    exproto->client_cookie = client_cookie;
    exproto->peer_name = peer_name;
    exproto->connection_features = connection_features;
    existing->set_features(connection_features);
  }
  exproto->peer_global_seq = peer_global_seq;

  auto temp_cs = std::move(connection->cs);
  EventCenter *new_center = connection->center;
  Worker *new_worker = connection->worker;

  ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                           << dendl;

  std::swap(exproto->session_stream_handlers, session_stream_handlers);
  exproto->auth_meta = auth_meta;

  // avoid _stop shutdown replacing socket
  // queue a reset on the new connection, which we're dumping for the old
  stop();

  connection->dispatch_queue->queue_reset(connection);

  exproto->can_write = false;
  exproto->write_in_progress = false;
  exproto->reconnecting = reconnecting;
  exproto->replacing = true;
  existing->state_offset = 0;
  // avoid previous thread modify event
  exproto->state = NONE;
  existing->state = AsyncConnection::STATE_NONE;
  // Discard existing prefetch buffer in `recv_buf`
  existing->recv_start = existing->recv_end = 0;
  // there shouldn't exist any buffer
  ceph_assert(connection->recv_start == connection->recv_end);

  // Runs on existing's (old) event center; the accepted socket is bound in
  // as `cs`.
  auto deactivate_existing = std::bind(
      [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
        // we need to delete time event in original thread
        {
          std::lock_guard<std::mutex> l(existing->lock);
          existing->write_lock.lock();
          exproto->requeue_sent();
          existing->outgoing_bl.clear();
          existing->open_write = false;
          existing->write_lock.unlock();
          if (exproto->state == NONE) {
            // Still ours to replace: swap in the new socket and move
            // existing onto the new worker/event center.
            existing->shutdown_socket();
            existing->cs = std::move(cs);
            existing->worker->references--;
            new_worker->references++;
            existing->logger = new_worker->get_perf_counter();
            existing->worker = new_worker;
            existing->center = new_center;
            if (existing->delay_state)
              existing->delay_state->set_center(new_center);
          } else if (exproto->state == CLOSED) {
            // existing was closed meanwhile: close the new socket on the
            // center it belongs to and give up.
            auto back_to_close = std::bind(
                [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
            new_center->submit_to(new_center->get_id(),
                                  std::move(back_to_close), true);
            return;
          } else {
            ceph_abort();
          }
        }

        // Events may already have been queued on the old existing->center
        // before it was changed above. If `existing` were then marked down,
        // cleanup would run in another thread and those earlier events would
        // segfault. So finish the transfer from the new center's thread.
        auto transfer_existing = [existing, exproto]() mutable {
          std::lock_guard<std::mutex> l(existing->lock);
          if (exproto->state == CLOSED) return;
          ceph_assert(exproto->state == NONE);

          exproto->state = SESSION_ACCEPTING;
          // we have called shutdown_socket above
          ceph_assert(existing->last_tick_id == 0);
          // restart timer since we are going to re-build connection
          existing->last_connect_started = ceph::coarse_mono_clock::now();
          existing->last_tick_id = existing->center->create_time_event(
            existing->connect_timeout_us, existing->tick_handler);
          existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
          existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
                                              existing->read_handler);
          if (!exproto->reconnecting) {
            exproto->run_continuation(exproto->send_server_ident());
          } else {
            exproto->run_continuation(exproto->send_reconnect_ok());
          }
        };
        if (existing->center->in_thread())
          transfer_existing();
        else
          existing->center->submit_to(existing->center->get_id(),
                                      std::move(transfer_existing), true);
      },
      std::move(temp_cs));

  existing->center->submit_to(existing->center->get_id(),
                              std::move(deactivate_existing), true);
  return nullptr;
}
2781
// Accepting side: answer a ClientIdent with ServerIdent. Reached from
// handle_client_ident(), handle_existing_connection(), and (on the replaced
// connection) from reuse_connection()'s transfer step.
// - Drops requeued output and resets in_seq (needed when this connection is
//   replacing another).
// - For non-lossy policies draws a new server_cookie from [1, UINT64_MAX]:
//   -1ll converts to the maximum uint64_t, so the cookie is never 0.
//   handle_reconnect() treats server_cookie == 0 as "server_ident never
//   received".
// - Registers the connection with the messenger (accept_conn) with
//   connection->lock released. Faults if that fails or the state changed
//   meanwhile.
// - Applies the negotiated features, queues the accept notifications, and
//   writes ServerIdent, continuing with server_ready().
CtPtr ProtocolV2::send_server_ident() {
  ldout(cct, 20) << __func__ << dendl;

  // this is required for the case when this connection is being replaced
  out_seq = discard_requeued_up_to(out_seq, 0);
  in_seq = 0;

  if (!connection->policy.lossy) {
    server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
  }

  uint64_t flags = 0;
  if (connection->policy.lossy) {
    flags = flags | CEPH_MSG_CONNECT_LOSSY;
  }

  uint64_t gs = messenger->get_global_seq();
  auto server_ident = ServerIdentFrame::Encode(
          messenger->get_myaddrs(),
          messenger->get_myname().num(),
          gs,
          connection->policy.features_supported,
          connection->policy.features_required | msgr2_required,
          flags,
          server_cookie);

  ldout(cct, 5) << __func__ << " sending identification:"
                << " addrs=" << messenger->get_myaddrs()
                << " gid=" << messenger->get_myname().num()
                << " global_seq=" << gs << " features_supported=" << std::hex
                << connection->policy.features_supported
                << " features_required="
                << (connection->policy.features_required | msgr2_required)
                << " flags=" << flags << " cookie=" << std::dec << server_cookie
                << dendl;

  connection->lock.unlock();
  // Because "replacing" will prevent other connections preempt this addr,
  // it's safe that here we don't acquire Connection's lock
  ssize_t r = messenger->accept_conn(connection);

  connection->inject_delay();

  connection->lock.lock();

  if (r < 0) {
    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
                  << connection->peer_addrs->msgr2_addr()
                  << " just fail later one(this)" << dendl;
    // NOTE(review): connection->lock is held here, so this delay (if any)
    // happens under the lock; see the Coverity note below.
    connection->inject_delay();
    return _fault();
  }
  if (state != SESSION_ACCEPTING) {
    ldout(cct, 1) << __func__
                  << " state changed while accept_conn, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED || state == NONE);
    // Undo the accept_conn() registration above before faulting.
    messenger->unregister_conn(connection);
    // NOTE(review): static analysis (Coverity SLEEP) reports that
    // inject_delay() might sleep while holding connection->lock (re-acquired
    // just above). It appears to be a deliberate delay-injection hook for
    // race testing — the same call also follows accept_conn() with the lock
    // released — so it is left in place. Confirm whether the delay is meant
    // to happen under the lock.
    connection->inject_delay();
    return _fault();
  }

  connection->set_features(connection_features);

  // notify
  connection->dispatch_queue->queue_accept(connection);
  messenger->ms_deliver_handle_fast_accept(connection);

  INTERCEPT(12);

  return WRITE(server_ident, "server ident", server_ready);
}
2854
2855 CtPtr ProtocolV2::server_ready() {
2856 ldout(cct, 20) << __func__ << dendl;
2857
2858 if (connection->delay_state) {
2859 ceph_assert(connection->delay_state->ready());
2860 }
2861
2862 return ready();
2863 }
2864
// Accepting side: finish resuming a session after reuse_connection()
// transferred the socket to this (existing) connection with reconnecting set.
// - Trims requeued output via discard_requeued_up_to() using message_seq.
//   handle_reconnect() set it from the peer's msg_seq.
// - Reports our in_seq to the peer in ReconnectOk.
// - Re-registers the connection via accept_conn() with connection->lock
//   released; faults if that fails or the state changed meanwhile.
// - Queues the accept notifications and continues with server_ready().
CtPtr ProtocolV2::send_reconnect_ok() {
  ldout(cct, 20) << __func__ << dendl;

  out_seq = discard_requeued_up_to(out_seq, message_seq);

  uint64_t ms = in_seq;
  auto reconnect_ok = ReconnectOkFrame::Encode(ms);

  ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl;

  connection->lock.unlock();
  // Because "replacing" will prevent other connections preempt this addr,
  // it's safe that here we don't acquire Connection's lock
  ssize_t r = messenger->accept_conn(connection);

  connection->inject_delay();

  connection->lock.lock();

  if (r < 0) {
    ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
                  << connection->peer_addrs->msgr2_addr()
                  << " just fail later one(this)" << dendl;
    // NOTE(review): called with connection->lock held, as in
    // send_server_ident(); any injected delay happens under the lock.
    connection->inject_delay();
    return _fault();
  }
  if (state != SESSION_ACCEPTING) {
    ldout(cct, 1) << __func__
                  << " state changed while accept_conn, it must be mark_down"
                  << dendl;
    ceph_assert(state == CLOSED || state == NONE);
    // Undo the accept_conn() registration above before faulting.
    messenger->unregister_conn(connection);
    // NOTE(review): also runs with connection->lock held (same pattern that
    // static analysis flags in send_server_ident()).
    connection->inject_delay();
    return _fault();
  }

  // notify
  connection->dispatch_queue->queue_accept(connection);
  messenger->ms_deliver_handle_fast_accept(connection);

  INTERCEPT(14);

  return WRITE(reconnect_ok, "reconnect ok", server_ready);
}
2909