1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "ProtocolV1.h"
5
6 #include "common/errno.h"
7
8 #include "AsyncConnection.h"
9 #include "AsyncMessenger.h"
10 #include "common/EventTrace.h"
11 #include "include/random.h"
12 #include "auth/AuthClient.h"
13 #include "auth/AuthServer.h"
14
15 #define dout_subsys ceph_subsys_ms
16 #undef dout_prefix
17 #define dout_prefix _conn_prefix(_dout)
18 ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
19 return *_dout << "--1- " << messenger->get_myaddrs() << " >> "
20 << *connection->peer_addrs
21 << " conn("
22 << connection << " " << this
23 << " :" << connection->port << " s=" << get_state_name(state)
24 << " pgs=" << peer_global_seq << " cs=" << connect_seq
25 << " l=" << connection->policy.lossy << ").";
26 }
27
28 #define WRITE(B, C) write(CONTINUATION(C), B)
29
30 #define READ(L, C) read(CONTINUATION(C), L)
31
32 #define READB(L, B, C) read(CONTINUATION(C), L, B)
33
34 // Constant to limit starting sequence number to 2^31. Nothing special about
35 // it, just a big number. PLR
36 #define SEQ_MASK 0x7fffffff
37
38 const int ASYNC_COALESCE_THRESHOLD = 256;
39
40 using namespace std;
41
42 static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
43 // create a buffer to read into that matches the data alignment
44 unsigned alloc_len = 0;
45 unsigned left = len;
46 unsigned head = 0;
47 if (off & ~CEPH_PAGE_MASK) {
48 // head
49 alloc_len += CEPH_PAGE_SIZE;
50 head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
51 left -= head;
52 }
53 alloc_len += left;
54 bufferptr ptr(buffer::create_small_page_aligned(alloc_len));
55 if (head) ptr.set_offset(CEPH_PAGE_SIZE - head);
56 data.push_back(std::move(ptr));
57 }
58
59 /**
60 * Protocol V1
61 **/
62
63 ProtocolV1::ProtocolV1(AsyncConnection *connection)
64 : Protocol(1, connection),
65 temp_buffer(nullptr),
66 can_write(WriteStatus::NOWRITE),
67 keepalive(false),
68 connect_seq(0),
69 peer_global_seq(0),
70 msg_left(0),
71 cur_msg_size(0),
72 replacing(false),
73 is_reset_from_peer(false),
74 once_ready(false),
75 state(NONE),
76 global_seq(0),
77 wait_for_seq(false) {
78 temp_buffer = new char[4096];
79 }
80
81 ProtocolV1::~ProtocolV1() {
82 ceph_assert(out_q.empty());
83 ceph_assert(sent.empty());
84
85 delete[] temp_buffer;
86 }
87
88 void ProtocolV1::connect() {
89 this->state = START_CONNECT;
90
91 // reset connect state variables
92 authorizer_buf.clear();
93 memset(&connect_msg, 0, sizeof(connect_msg));
94 memset(&connect_reply, 0, sizeof(connect_reply));
95
96 global_seq = messenger->get_global_seq();
97 }
98
99 void ProtocolV1::accept() { this->state = START_ACCEPT; }
100
101 bool ProtocolV1::is_connected() {
102 return can_write.load() == WriteStatus::CANWRITE;
103 }
104
105 void ProtocolV1::stop() {
106 ldout(cct, 20) << __func__ << dendl;
107 if (state == CLOSED) {
108 return;
109 }
110
111 if (connection->delay_state) connection->delay_state->flush();
112
113 ldout(cct, 2) << __func__ << dendl;
114 std::lock_guard<std::mutex> l(connection->write_lock);
115
116 reset_recv_state();
117 discard_out_queue();
118
119 connection->_stop();
120
121 can_write = WriteStatus::CLOSED;
122 state = CLOSED;
123 }
124
125 void ProtocolV1::fault() {
126 ldout(cct, 20) << __func__ << dendl;
127
128 if (state == CLOSED || state == NONE) {
129 ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
130 return;
131 }
132
133 if (connection->policy.lossy && state != START_CONNECT &&
134 state != CONNECTING) {
135 ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
136 stop();
137 connection->dispatch_queue->queue_reset(connection);
138 return;
139 }
140
141 connection->write_lock.lock();
142 can_write = WriteStatus::NOWRITE;
143 is_reset_from_peer = false;
144
145 // requeue sent items
146 requeue_sent();
147
148 if (!once_ready && out_q.empty() && state >= START_ACCEPT &&
149 state <= ACCEPTING_WAIT_CONNECT_MSG_AUTH && !replacing) {
150 ldout(cct, 10) << __func__ << " with nothing to send and in the half "
151 << " accept state just closed" << dendl;
152 connection->write_lock.unlock();
153 stop();
154 connection->dispatch_queue->queue_reset(connection);
155 return;
156 }
157 replacing = false;
158
159 connection->fault();
160
161 reset_recv_state();
162
163 if (connection->policy.standby && out_q.empty() && !keepalive &&
164 state != WAIT) {
165 ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
166 << dendl;
167 state = STANDBY;
168 connection->write_lock.unlock();
169 return;
170 }
171
172 connection->write_lock.unlock();
173
174 if ((state >= START_CONNECT && state <= CONNECTING_SEND_CONNECT_MSG) ||
175 state == WAIT) {
176 // backoff!
177 if (state == WAIT) {
178 backoff.set_from_double(cct->_conf->ms_max_backoff);
179 } else if (backoff == utime_t()) {
180 backoff.set_from_double(cct->_conf->ms_initial_backoff);
181 } else {
182 backoff += backoff;
183 if (backoff > cct->_conf->ms_max_backoff)
184 backoff.set_from_double(cct->_conf->ms_max_backoff);
185 }
186
187 global_seq = messenger->get_global_seq();
188 state = START_CONNECT;
189 connection->state = AsyncConnection::STATE_CONNECTING;
190 ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
191 // woke up again;
192 connection->register_time_events.insert(
193 connection->center->create_time_event(backoff.to_nsec() / 1000,
194 connection->wakeup_handler));
195 } else {
196 // policy maybe empty when state is in accept
197 if (connection->policy.server) {
198 ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
199 state = STANDBY;
200 } else {
201 ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
202 connect_seq++;
203 global_seq = messenger->get_global_seq();
204 state = START_CONNECT;
205 connection->state = AsyncConnection::STATE_CONNECTING;
206 }
207 backoff = utime_t();
208 connection->center->dispatch_event_external(connection->read_handler);
209 }
210 }
211
212 void ProtocolV1::send_message(Message *m) {
213 bufferlist bl;
214 uint64_t f = connection->get_features();
215
216 // TODO: Currently not all messages supports reencode like MOSDMap, so here
217 // only let fast dispatch support messages prepare message
218 bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
219 if (can_fast_prepare) {
220 prepare_send_message(f, m, bl);
221 }
222
223 std::lock_guard<std::mutex> l(connection->write_lock);
224 // "features" changes will change the payload encoding
225 if (can_fast_prepare &&
226 (can_write == WriteStatus::NOWRITE || connection->get_features() != f)) {
227 // ensure the correctness of message encoding
228 bl.clear();
229 m->clear_payload();
230 ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
231 << " != " << connection->get_features() << dendl;
232 }
233 if (can_write == WriteStatus::CLOSED) {
234 ldout(cct, 10) << __func__ << " connection closed."
235 << " Drop message " << m << dendl;
236 m->put();
237 } else {
238 m->queue_start = ceph::mono_clock::now();
239 m->trace.event("async enqueueing message");
240 out_q[m->get_priority()].emplace_back(std::move(bl), m);
241 ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
242 << dendl;
243 if (can_write != WriteStatus::REPLACING && !write_in_progress) {
244 write_in_progress = true;
245 connection->center->dispatch_event_external(connection->write_handler);
246 }
247 }
248 }
249
250 void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
251 bufferlist &bl) {
252 ldout(cct, 20) << __func__ << " m " << *m << dendl;
253
254 // associate message with Connection (for benefit of encode_payload)
255 ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
256 << features << " " << m << " " << *m << dendl;
257
258 // encode and copy out of *m
259 // in write_message we update header.seq and need recalc crc
260 // so skip calc header in encode function.
261 m->encode(features, messenger->crcflags, true);
262
263 bl.append(m->get_payload());
264 bl.append(m->get_middle());
265 bl.append(m->get_data());
266 }
267
268 void ProtocolV1::send_keepalive() {
269 ldout(cct, 10) << __func__ << dendl;
270 std::lock_guard<std::mutex> l(connection->write_lock);
271 if (can_write != WriteStatus::CLOSED) {
272 keepalive = true;
273 connection->center->dispatch_event_external(connection->write_handler);
274 }
275 }
276
277 void ProtocolV1::read_event() {
278 ldout(cct, 20) << __func__ << dendl;
279 switch (state) {
280 case START_CONNECT:
281 CONTINUATION_RUN(CONTINUATION(send_client_banner));
282 break;
283 case START_ACCEPT:
284 CONTINUATION_RUN(CONTINUATION(send_server_banner));
285 break;
286 case OPENED:
287 CONTINUATION_RUN(CONTINUATION(wait_message));
288 break;
289 case THROTTLE_MESSAGE:
290 CONTINUATION_RUN(CONTINUATION(throttle_message));
291 break;
292 case THROTTLE_BYTES:
293 CONTINUATION_RUN(CONTINUATION(throttle_bytes));
294 break;
295 case THROTTLE_DISPATCH_QUEUE:
296 CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue));
297 break;
298 default:
299 break;
300 }
301 }
302
303 void ProtocolV1::write_event() {
304 ldout(cct, 10) << __func__ << dendl;
305 ssize_t r = 0;
306
307 connection->write_lock.lock();
308 if (can_write == WriteStatus::CANWRITE) {
309 if (keepalive) {
310 append_keepalive_or_ack();
311 keepalive = false;
312 }
313
314 auto start = ceph::mono_clock::now();
315 bool more;
316 do {
317 bufferlist data;
318 Message *m = _get_next_outgoing(&data);
319 if (!m) {
320 break;
321 }
322
323 if (!connection->policy.lossy) {
324 // put on sent list
325 sent.push_back(m);
326 m->get();
327 }
328 more = !out_q.empty();
329 connection->write_lock.unlock();
330
331 // send_message or requeue messages may not encode message
332 if (!data.length()) {
333 prepare_send_message(connection->get_features(), m, data);
334 }
335
336 if (m->queue_start != ceph::mono_time()) {
337 connection->logger->tinc(l_msgr_send_messages_queue_lat,
338 ceph::mono_clock::now() - m->queue_start);
339 }
340
341 r = write_message(m, data, more);
342
343 connection->write_lock.lock();
344 if (r == 0) {
345 ;
346 } else if (r < 0) {
347 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
348 break;
349 } else if (r > 0) {
350 // Outbound message in-progress, thread will be re-awoken
351 // when the outbound socket is writeable again
352 break;
353 }
354 } while (can_write == WriteStatus::CANWRITE);
355 write_in_progress = false;
356 connection->write_lock.unlock();
357
358 // if r > 0 mean data still lefted, so no need _try_send.
359 if (r == 0) {
360 uint64_t left = ack_left;
361 if (left) {
362 ceph_le64 s;
363 s = in_seq;
364 connection->outgoing_bl.append(CEPH_MSGR_TAG_ACK);
365 connection->outgoing_bl.append((char *)&s, sizeof(s));
366 ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
367 << " messages" << dendl;
368 ack_left -= left;
369 left = ack_left;
370 r = connection->_try_send(left);
371 } else if (is_queued()) {
372 r = connection->_try_send();
373 }
374 }
375
376 connection->logger->tinc(l_msgr_running_send_time,
377 ceph::mono_clock::now() - start);
378 if (r < 0) {
379 ldout(cct, 1) << __func__ << " send msg failed" << dendl;
380 connection->lock.lock();
381 fault();
382 connection->lock.unlock();
383 return;
384 }
385 } else {
386 write_in_progress = false;
387 connection->write_lock.unlock();
388 connection->lock.lock();
389 connection->write_lock.lock();
390 if (state == STANDBY && !connection->policy.server && is_queued()) {
391 ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
392 connection->_connect();
393 } else if (connection->cs && state != NONE && state != CLOSED &&
394 state != START_CONNECT) {
395 r = connection->_try_send();
396 if (r < 0) {
397 ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
398 connection->write_lock.unlock();
399 fault();
400 connection->lock.unlock();
401 return;
402 }
403 }
404 connection->write_lock.unlock();
405 connection->lock.unlock();
406 }
407 }
408
409 bool ProtocolV1::is_queued() {
410 return !out_q.empty() || connection->is_queued();
411 }
412
413 void ProtocolV1::run_continuation(CtPtr pcontinuation) {
414 if (pcontinuation) {
415 CONTINUATION_RUN(*pcontinuation);
416 }
417 }
418
419 CtPtr ProtocolV1::read(CONTINUATION_RX_TYPE<ProtocolV1> &next,
420 int len, char *buffer) {
421 if (!buffer) {
422 buffer = temp_buffer;
423 }
424 ssize_t r = connection->read(len, buffer,
425 [&next, this](char *buffer, int r) {
426 next.setParams(buffer, r);
427 CONTINUATION_RUN(next);
428 });
429 if (r <= 0) {
430 next.setParams(buffer, r);
431 return &next;
432 }
433
434 return nullptr;
435 }
436
437 CtPtr ProtocolV1::write(CONTINUATION_TX_TYPE<ProtocolV1> &next,
438 bufferlist &buffer) {
439 ssize_t r = connection->write(buffer, [&next, this](int r) {
440 next.setParams(r);
441 CONTINUATION_RUN(next);
442 });
443 if (r <= 0) {
444 next.setParams(r);
445 return &next;
446 }
447
448 return nullptr;
449 }
450
451 CtPtr ProtocolV1::ready() {
452 ldout(cct, 25) << __func__ << dendl;
453
454 // make sure no pending tick timer
455 if (connection->last_tick_id) {
456 connection->center->delete_time_event(connection->last_tick_id);
457 }
458 connection->last_tick_id = connection->center->create_time_event(
459 connection->inactive_timeout_us, connection->tick_handler);
460
461 connection->write_lock.lock();
462 can_write = WriteStatus::CANWRITE;
463 if (is_queued()) {
464 connection->center->dispatch_event_external(connection->write_handler);
465 }
466 connection->write_lock.unlock();
467 connection->maybe_start_delay_thread();
468
469 state = OPENED;
470 return wait_message();
471 }
472
473 CtPtr ProtocolV1::wait_message() {
474 if (state != OPENED) { // must have changed due to a replace
475 return nullptr;
476 }
477
478 ldout(cct, 20) << __func__ << dendl;
479
480 return READ(sizeof(char), handle_message);
481 }
482
483 CtPtr ProtocolV1::handle_message(char *buffer, int r) {
484 ldout(cct, 20) << __func__ << " r=" << r << dendl;
485
486 if (r < 0) {
487 ldout(cct, 1) << __func__ << " read tag failed" << dendl;
488 return _fault();
489 }
490
491 char tag = buffer[0];
492 ldout(cct, 20) << __func__ << " process tag " << (int)tag << dendl;
493
494 if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
495 ldout(cct, 20) << __func__ << " got KEEPALIVE" << dendl;
496 connection->set_last_keepalive(ceph_clock_now());
497 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
498 return READ(sizeof(ceph_timespec), handle_keepalive2);
499 } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
500 return READ(sizeof(ceph_timespec), handle_keepalive2_ack);
501 } else if (tag == CEPH_MSGR_TAG_ACK) {
502 return READ(sizeof(ceph_le64), handle_tag_ack);
503 } else if (tag == CEPH_MSGR_TAG_MSG) {
504 recv_stamp = ceph_clock_now();
505 ldout(cct, 20) << __func__ << " begin MSG" << dendl;
506 return READ(sizeof(ceph_msg_header), handle_message_header);
507 } else if (tag == CEPH_MSGR_TAG_CLOSE) {
508 ldout(cct, 20) << __func__ << " got CLOSE" << dendl;
509 stop();
510 } else {
511 ldout(cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
512 return _fault();
513 }
514 return nullptr;
515 }
516
517 CtPtr ProtocolV1::handle_keepalive2(char *buffer, int r) {
518 ldout(cct, 20) << __func__ << " r=" << r << dendl;
519
520 if (r < 0) {
521 ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
522 return _fault();
523 }
524
525 ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
526
527 ceph_timespec *t;
528 t = (ceph_timespec *)buffer;
529 utime_t kp_t = utime_t(*t);
530 connection->write_lock.lock();
531 append_keepalive_or_ack(true, &kp_t);
532 connection->write_lock.unlock();
533
534 ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
535 connection->set_last_keepalive(ceph_clock_now());
536
537 if (is_connected()) {
538 connection->center->dispatch_event_external(connection->write_handler);
539 }
540
541 return CONTINUE(wait_message);
542 }
543
544 void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
545 ldout(cct, 10) << __func__ << dendl;
546 if (ack) {
547 ceph_assert(tp);
548 struct ceph_timespec ts;
549 tp->encode_timeval(&ts);
550 connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
551 connection->outgoing_bl.append((char *)&ts, sizeof(ts));
552 } else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
553 struct ceph_timespec ts;
554 utime_t t = ceph_clock_now();
555 t.encode_timeval(&ts);
556 connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
557 connection->outgoing_bl.append((char *)&ts, sizeof(ts));
558 } else {
559 connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
560 }
561 }
562
563 CtPtr ProtocolV1::handle_keepalive2_ack(char *buffer, int r) {
564 ldout(cct, 20) << __func__ << " r=" << r << dendl;
565
566 if (r < 0) {
567 ldout(cct, 1) << __func__ << " read keeplive timespec failed" << dendl;
568 return _fault();
569 }
570
571 ceph_timespec *t;
572 t = (ceph_timespec *)buffer;
573 connection->set_last_keepalive_ack(utime_t(*t));
574 ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
575
576 return CONTINUE(wait_message);
577 }
578
579 CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) {
580 ldout(cct, 20) << __func__ << " r=" << r << dendl;
581
582 if (r < 0) {
583 ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
584 return _fault();
585 }
586
587 ceph_le64 seq;
588 seq = *(ceph_le64 *)buffer;
589 ldout(cct, 20) << __func__ << " got ACK" << dendl;
590
591 ldout(cct, 15) << __func__ << " got ack seq " << seq << dendl;
592 // trim sent list
593 static const int max_pending = 128;
594 int i = 0;
595 auto now = ceph::mono_clock::now();
596 Message *pending[max_pending];
597 connection->write_lock.lock();
598 while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
599 Message *m = sent.front();
600 sent.pop_front();
601 pending[i++] = m;
602 ldout(cct, 10) << __func__ << " got ack seq " << seq
603 << " >= " << m->get_seq() << " on " << m << " " << *m
604 << dendl;
605 }
606 connection->write_lock.unlock();
607 connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
608 for (int k = 0; k < i; k++) {
609 pending[k]->put();
610 }
611
612 return CONTINUE(wait_message);
613 }
614
615 CtPtr ProtocolV1::handle_message_header(char *buffer, int r) {
616 ldout(cct, 20) << __func__ << " r=" << r << dendl;
617
618 if (r < 0) {
619 ldout(cct, 1) << __func__ << " read message header failed" << dendl;
620 return _fault();
621 }
622
623 ldout(cct, 20) << __func__ << " got MSG header" << dendl;
624
625 current_header = *((ceph_msg_header *)buffer);
626
627 ldout(cct, 20) << __func__ << " got envelope type=" << current_header.type << " src "
628 << entity_name_t(current_header.src) << " front=" << current_header.front_len
629 << " data=" << current_header.data_len << " off " << current_header.data_off
630 << dendl;
631
632 if (messenger->crcflags & MSG_CRC_HEADER) {
633 __u32 header_crc = 0;
634 header_crc = ceph_crc32c(0, (unsigned char *)¤t_header,
635 sizeof(current_header) - sizeof(current_header.crc));
636 // verify header crc
637 if (header_crc != current_header.crc) {
638 ldout(cct, 0) << __func__ << " got bad header crc " << header_crc
639 << " != " << current_header.crc << dendl;
640 return _fault();
641 }
642 }
643
644 // Reset state
645 data_buf.clear();
646 front.clear();
647 middle.clear();
648 data.clear();
649
650 state = THROTTLE_MESSAGE;
651 return CONTINUE(throttle_message);
652 }
653
654 CtPtr ProtocolV1::throttle_message() {
655 ldout(cct, 20) << __func__ << dendl;
656
657 if (connection->policy.throttler_messages) {
658 ldout(cct, 10) << __func__ << " wants " << 1
659 << " message from policy throttler "
660 << connection->policy.throttler_messages->get_current()
661 << "/" << connection->policy.throttler_messages->get_max()
662 << dendl;
663 if (!connection->policy.throttler_messages->get_or_fail()) {
664 ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
665 << connection->policy.throttler_messages->get_current()
666 << "/" << connection->policy.throttler_messages->get_max()
667 << " failed, just wait." << dendl;
668 // following thread pool deal with th full message queue isn't a
669 // short time, so we can wait a ms.
670 if (connection->register_time_events.empty()) {
671 connection->register_time_events.insert(
672 connection->center->create_time_event(1000,
673 connection->wakeup_handler));
674 }
675 return nullptr;
676 }
677 }
678
679 state = THROTTLE_BYTES;
680 return CONTINUE(throttle_bytes);
681 }
682
683 CtPtr ProtocolV1::throttle_bytes() {
684 ldout(cct, 20) << __func__ << dendl;
685
686 cur_msg_size = current_header.front_len + current_header.middle_len +
687 current_header.data_len;
688 if (cur_msg_size) {
689 if (connection->policy.throttler_bytes) {
690 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
691 << " bytes from policy throttler "
692 << connection->policy.throttler_bytes->get_current() << "/"
693 << connection->policy.throttler_bytes->get_max() << dendl;
694 if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
695 ldout(cct, 10) << __func__ << " wants " << cur_msg_size
696 << " bytes from policy throttler "
697 << connection->policy.throttler_bytes->get_current()
698 << "/" << connection->policy.throttler_bytes->get_max()
699 << " failed, just wait." << dendl;
700 // following thread pool deal with th full message queue isn't a
701 // short time, so we can wait a ms.
702 if (connection->register_time_events.empty()) {
703 connection->register_time_events.insert(
704 connection->center->create_time_event(
705 1000, connection->wakeup_handler));
706 }
707 return nullptr;
708 }
709 }
710 }
711
712 state = THROTTLE_DISPATCH_QUEUE;
713 return CONTINUE(throttle_dispatch_queue);
714 }
715
716 CtPtr ProtocolV1::throttle_dispatch_queue() {
717 ldout(cct, 20) << __func__ << dendl;
718
719 if (cur_msg_size) {
720 if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
721 cur_msg_size)) {
722 ldout(cct, 10)
723 << __func__ << " wants " << cur_msg_size
724 << " bytes from dispatch throttle "
725 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
726 << connection->dispatch_queue->dispatch_throttler.get_max()
727 << " failed, just wait." << dendl;
728 // following thread pool deal with th full message queue isn't a
729 // short time, so we can wait a ms.
730 if (connection->register_time_events.empty()) {
731 connection->register_time_events.insert(
732 connection->center->create_time_event(1000,
733 connection->wakeup_handler));
734 }
735 return nullptr;
736 }
737 }
738
739 throttle_stamp = ceph_clock_now();
740
741 state = READ_MESSAGE_FRONT;
742 return read_message_front();
743 }
744
745 CtPtr ProtocolV1::read_message_front() {
746 ldout(cct, 20) << __func__ << dendl;
747
748 unsigned front_len = current_header.front_len;
749 if (front_len) {
750 if (!front.length()) {
751 front.push_back(buffer::create(front_len));
752 }
753 return READB(front_len, front.c_str(), handle_message_front);
754 }
755 return read_message_middle();
756 }
757
758 CtPtr ProtocolV1::handle_message_front(char *buffer, int r) {
759 ldout(cct, 20) << __func__ << " r=" << r << dendl;
760
761 if (r < 0) {
762 ldout(cct, 1) << __func__ << " read message front failed" << dendl;
763 return _fault();
764 }
765
766 ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
767
768 return read_message_middle();
769 }
770
771 CtPtr ProtocolV1::read_message_middle() {
772 ldout(cct, 20) << __func__ << dendl;
773
774 if (current_header.middle_len) {
775 if (!middle.length()) {
776 middle.push_back(buffer::create(current_header.middle_len));
777 }
778 return READB(current_header.middle_len, middle.c_str(),
779 handle_message_middle);
780 }
781
782 return read_message_data_prepare();
783 }
784
785 CtPtr ProtocolV1::handle_message_middle(char *buffer, int r) {
786 ldout(cct, 20) << __func__ << " r" << r << dendl;
787
788 if (r < 0) {
789 ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
790 return _fault();
791 }
792
793 ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
794
795 return read_message_data_prepare();
796 }
797
798 CtPtr ProtocolV1::read_message_data_prepare() {
799 ldout(cct, 20) << __func__ << dendl;
800
801 unsigned data_len = current_header.data_len;
802 unsigned data_off = current_header.data_off;
803
804 if (data_len) {
805 // get a buffer
806 #if 0
807 // rx_buffers is broken by design... see
808 // http://tracker.ceph.com/issues/22480
809 map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
810 connection->rx_buffers.find(current_header.tid);
811 if (p != connection->rx_buffers.end()) {
812 ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second
813 << " at offset " << data_off << " len "
814 << p->second.first.length() << dendl;
815 data_buf = p->second.first;
816 // make sure it's big enough
817 if (data_buf.length() < data_len)
818 data_buf.push_back(buffer::create(data_len - data_buf.length()));
819 data_blp = data_buf.begin();
820 } else {
821 ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
822 << data_off << dendl;
823 alloc_aligned_buffer(data_buf, data_len, data_off);
824 data_blp = data_buf.begin();
825 }
826 #else
827 ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
828 << data_off << dendl;
829 alloc_aligned_buffer(data_buf, data_len, data_off);
830 data_blp = data_buf.begin();
831 #endif
832 }
833
834 msg_left = data_len;
835
836 return CONTINUE(read_message_data);
837 }
838
839 CtPtr ProtocolV1::read_message_data() {
840 ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl;
841
842 if (msg_left > 0) {
843 bufferptr bp = data_blp.get_current_ptr();
844 unsigned read_len = std::min(bp.length(), msg_left);
845
846 return READB(read_len, bp.c_str(), handle_message_data);
847 }
848
849 return read_message_footer();
850 }
851
852 CtPtr ProtocolV1::handle_message_data(char *buffer, int r) {
853 ldout(cct, 20) << __func__ << " r=" << r << dendl;
854
855 if (r < 0) {
856 ldout(cct, 1) << __func__ << " read data error " << dendl;
857 return _fault();
858 }
859
860 bufferptr bp = data_blp.get_current_ptr();
861 unsigned read_len = std::min(bp.length(), msg_left);
862 ceph_assert(read_len <
863 static_cast<unsigned>(std::numeric_limits<int>::max()));
864 data_blp.advance(read_len);
865 data.append(bp, 0, read_len);
866 msg_left -= read_len;
867
868 return CONTINUE(read_message_data);
869 }
870
871 CtPtr ProtocolV1::read_message_footer() {
872 ldout(cct, 20) << __func__ << dendl;
873
874 state = READ_FOOTER_AND_DISPATCH;
875
876 unsigned len;
877 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
878 len = sizeof(ceph_msg_footer);
879 } else {
880 len = sizeof(ceph_msg_footer_old);
881 }
882
883 return READ(len, handle_message_footer);
884 }
885
886 CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
887 ldout(cct, 20) << __func__ << " r=" << r << dendl;
888
889 if (r < 0) {
890 ldout(cct, 1) << __func__ << " read footer data error " << dendl;
891 return _fault();
892 }
893
894 ceph_msg_footer footer;
895 ceph_msg_footer_old old_footer;
896
897 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
898 footer = *((ceph_msg_footer *)buffer);
899 } else {
900 old_footer = *((ceph_msg_footer_old *)buffer);
901 footer.front_crc = old_footer.front_crc;
902 footer.middle_crc = old_footer.middle_crc;
903 footer.data_crc = old_footer.data_crc;
904 footer.sig = 0;
905 footer.flags = old_footer.flags;
906 }
907
908 int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
909 ldout(cct, 10) << __func__ << " aborted = " << aborted << dendl;
910 if (aborted) {
911 ldout(cct, 0) << __func__ << " got " << front.length() << " + "
912 << middle.length() << " + " << data.length()
913 << " byte message.. ABORTED" << dendl;
914 return _fault();
915 }
916
917 ldout(cct, 20) << __func__ << " got " << front.length() << " + "
918 << middle.length() << " + " << data.length() << " byte message"
919 << dendl;
920 Message *message = decode_message(cct, messenger->crcflags, current_header,
921 footer, front, middle, data, connection);
922 if (!message) {
923 ldout(cct, 1) << __func__ << " decode message failed " << dendl;
924 return _fault();
925 }
926
927 //
928 // Check the signature if one should be present. A zero return indicates
929 // success. PLR
930 //
931
932 if (session_security.get() == NULL) {
933 ldout(cct, 10) << __func__ << " no session security set" << dendl;
934 } else {
935 if (session_security->check_message_signature(message)) {
936 ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
937 message->put();
938 return _fault();
939 }
940 }
941 message->set_byte_throttler(connection->policy.throttler_bytes);
942 message->set_message_throttler(connection->policy.throttler_messages);
943
944 // store reservation size in message, so we don't get confused
945 // by messages entering the dispatch queue through other paths.
946 message->set_dispatch_throttle_size(cur_msg_size);
947
948 message->set_recv_stamp(recv_stamp);
949 message->set_throttle_stamp(throttle_stamp);
950 message->set_recv_complete_stamp(ceph_clock_now());
951
952 // check received seq#. if it is old, drop the message.
953 // note that incoming messages may skip ahead. this is convenient for the
954 // client side queueing because messages can't be renumbered, but the (kernel)
955 // client will occasionally pull a message out of the sent queue to send
956 // elsewhere. in that case it doesn't matter if we "got" it or not.
957 uint64_t cur_seq = in_seq;
958 if (message->get_seq() <= cur_seq) {
959 ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
960 << " <= " << cur_seq << " " << message << " " << *message
961 << ", discarding" << dendl;
962 message->put();
963 if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
964 cct->_conf->ms_die_on_old_message) {
965 ceph_assert(0 == "old msgs despite reconnect_seq feature");
966 }
967 return nullptr;
968 }
969 if (message->get_seq() > cur_seq + 1) {
970 ldout(cct, 0) << __func__ << " missed message? skipped from seq "
971 << cur_seq << " to " << message->get_seq() << dendl;
972 if (cct->_conf->ms_die_on_skipped_message) {
973 ceph_assert(0 == "skipped incoming seq");
974 }
975 }
976
977 #if defined(WITH_EVENTTRACE)
978 if (message->get_type() == CEPH_MSG_OSD_OP ||
979 message->get_type() == CEPH_MSG_OSD_OPREPLY) {
980 utime_t ltt_processed_stamp = ceph_clock_now();
981 double usecs_elapsed =
982 ((double)(ltt_processed_stamp.to_nsec() - recv_stamp.to_nsec())) / 1000;
983 ostringstream buf;
984 if (message->get_type() == CEPH_MSG_OSD_OP)
985 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
986 false);
987 else
988 OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY",
989 false);
990 }
991 #endif
992
993 // note last received message.
994 in_seq = message->get_seq();
995 ldout(cct, 5) << " rx " << message->get_source() << " seq "
996 << message->get_seq() << " " << message << " " << *message
997 << dendl;
998
999 bool need_dispatch_writer = false;
1000 if (!connection->policy.lossy) {
1001 ack_left++;
1002 need_dispatch_writer = true;
1003 }
1004
1005 state = OPENED;
1006
1007 ceph::mono_time fast_dispatch_time;
1008
1009 if (connection->is_blackhole()) {
1010 ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
1011 message->put();
1012 goto out;
1013 }
1014
1015 connection->logger->inc(l_msgr_recv_messages);
1016 connection->logger->inc(
1017 l_msgr_recv_bytes,
1018 cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
1019
1020 messenger->ms_fast_preprocess(message);
1021 fast_dispatch_time = ceph::mono_clock::now();
1022 connection->logger->tinc(l_msgr_running_recv_time,
1023 fast_dispatch_time - connection->recv_start_time);
1024 if (connection->delay_state) {
1025 double delay_period = 0;
1026 if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
1027 delay_period =
1028 cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1029 ldout(cct, 1) << "queue_received will delay after "
1030 << (ceph_clock_now() + delay_period) << " on " << message
1031 << " " << *message << dendl;
1032 }
1033 connection->delay_state->queue(delay_period, message);
1034 } else if (messenger->ms_can_fast_dispatch(message)) {
1035 connection->lock.unlock();
1036 connection->dispatch_queue->fast_dispatch(message);
1037 connection->recv_start_time = ceph::mono_clock::now();
1038 connection->logger->tinc(l_msgr_running_fast_dispatch_time,
1039 connection->recv_start_time - fast_dispatch_time);
1040 connection->lock.lock();
1041 } else {
1042 connection->dispatch_queue->enqueue(message, message->get_priority(),
1043 connection->conn_id);
1044 }
1045
1046 out:
1047 // clean up local buffer references
1048 data_buf.clear();
1049 front.clear();
1050 middle.clear();
1051 data.clear();
1052
1053 if (need_dispatch_writer && connection->is_connected()) {
1054 connection->center->dispatch_event_external(connection->write_handler);
1055 }
1056
1057 return CONTINUE(wait_message);
1058 }
1059
1060 void ProtocolV1::session_reset() {
1061 ldout(cct, 10) << __func__ << " started" << dendl;
1062
1063 std::lock_guard<std::mutex> l(connection->write_lock);
1064 if (connection->delay_state) {
1065 connection->delay_state->discard();
1066 }
1067
1068 connection->dispatch_queue->discard_queue(connection->conn_id);
1069 discard_out_queue();
1070 // note: we need to clear outgoing_bl here, but session_reset may be
1071 // called by other thread, so let caller clear this itself!
1072 // outgoing_bl.clear();
1073
1074 connection->dispatch_queue->queue_remote_reset(connection);
1075
1076 randomize_out_seq();
1077
1078 in_seq = 0;
1079 connect_seq = 0;
1080 // it's safe to directly set 0, double locked
1081 ack_left = 0;
1082 once_ready = false;
1083 can_write = WriteStatus::NOWRITE;
1084 }
1085
1086 void ProtocolV1::randomize_out_seq() {
1087 if (connection->get_features() & CEPH_FEATURE_MSG_AUTH) {
1088 // Set out_seq to a random value, so CRC won't be predictable.
1089 auto rand_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
1090 ldout(cct, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
1091 out_seq = rand_seq;
1092 } else {
1093 // previously, seq #'s always started at 0.
1094 out_seq = 0;
1095 }
1096 }
1097
1098 ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
1099 FUNCTRACE(cct);
1100 ceph_assert(connection->center->in_thread());
1101 m->set_seq(++out_seq);
1102
1103 if (messenger->crcflags & MSG_CRC_HEADER) {
1104 m->calc_header_crc();
1105 }
1106
1107 ceph_msg_header &header = m->get_header();
1108 ceph_msg_footer &footer = m->get_footer();
1109
1110 // TODO: let sign_message could be reentry?
1111 // Now that we have all the crcs calculated, handle the
1112 // digital signature for the message, if the AsyncConnection has session
1113 // security set up. Some session security options do not
1114 // actually calculate and check the signature, but they should
1115 // handle the calls to sign_message and check_signature. PLR
1116 if (session_security.get() == NULL) {
1117 ldout(cct, 20) << __func__ << " no session security" << dendl;
1118 } else {
1119 if (session_security->sign_message(m)) {
1120 ldout(cct, 20) << __func__ << " failed to sign m=" << m
1121 << "): sig = " << footer.sig << dendl;
1122 } else {
1123 ldout(cct, 20) << __func__ << " signed m=" << m
1124 << "): sig = " << footer.sig << dendl;
1125 }
1126 }
1127
1128 connection->outgoing_bl.append(CEPH_MSGR_TAG_MSG);
1129 connection->outgoing_bl.append((char *)&header, sizeof(header));
1130
1131 ldout(cct, 20) << __func__ << " sending message type=" << header.type
1132 << " src " << entity_name_t(header.src)
1133 << " front=" << header.front_len << " data=" << header.data_len
1134 << " off " << header.data_off << dendl;
1135
1136 if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
1137 for (const auto &pb : bl.buffers()) {
1138 connection->outgoing_bl.append((char *)pb.c_str(), pb.length());
1139 }
1140 } else {
1141 connection->outgoing_bl.claim_append(bl);
1142 }
1143
1144 // send footer; if receiver doesn't support signatures, use the old footer
1145 // format
1146 ceph_msg_footer_old old_footer;
1147 if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
1148 connection->outgoing_bl.append((char *)&footer, sizeof(footer));
1149 } else {
1150 if (messenger->crcflags & MSG_CRC_HEADER) {
1151 old_footer.front_crc = footer.front_crc;
1152 old_footer.middle_crc = footer.middle_crc;
1153 } else {
1154 old_footer.front_crc = old_footer.middle_crc = 0;
1155 }
1156 old_footer.data_crc =
1157 messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
1158 old_footer.flags = footer.flags;
1159 connection->outgoing_bl.append((char *)&old_footer, sizeof(old_footer));
1160 }
1161
1162 m->trace.event("async writing message");
1163 ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
1164 << dendl;
1165 ssize_t total_send_size = connection->outgoing_bl.length();
1166 ssize_t rc = connection->_try_send(more);
1167 if (rc < 0) {
1168 ldout(cct, 1) << __func__ << " error sending " << m << ", "
1169 << cpp_strerror(rc) << dendl;
1170 } else {
1171 connection->logger->inc(
1172 l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
1173 ldout(cct, 10) << __func__ << " sending " << m
1174 << (rc ? " continuely." : " done.") << dendl;
1175 }
1176
1177 #if defined(WITH_EVENTTRACE)
1178 if (m->get_type() == CEPH_MSG_OSD_OP)
1179 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
1180 else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
1181 OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
1182 #endif
1183 m->put();
1184
1185 return rc;
1186 }
1187
1188 void ProtocolV1::requeue_sent() {
1189 write_in_progress = false;
1190 if (sent.empty()) {
1191 return;
1192 }
1193
1194 list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1195 out_seq -= sent.size();
1196 while (!sent.empty()) {
1197 Message *m = sent.back();
1198 sent.pop_back();
1199 ldout(cct, 10) << __func__ << " " << *m << " for resend "
1200 << " (" << m->get_seq() << ")" << dendl;
1201 m->clear_payload();
1202 rq.push_front(make_pair(bufferlist(), m));
1203 }
1204 }
1205
1206 uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
1207 ldout(cct, 10) << __func__ << " " << seq << dendl;
1208 std::lock_guard<std::mutex> l(connection->write_lock);
1209 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
1210 return seq;
1211 }
1212 list<pair<bufferlist, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1213 uint64_t count = out_seq;
1214 while (!rq.empty()) {
1215 pair<bufferlist, Message *> p = rq.front();
1216 if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
1217 ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
1218 << p.second->get_seq() << " <= " << seq << ", discarding"
1219 << dendl;
1220 p.second->put();
1221 rq.pop_front();
1222 count++;
1223 }
1224 if (rq.empty()) out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1225 return count;
1226 }
1227
1228 /*
1229 * Tears down the message queues, and removes them from the
1230 * DispatchQueue Must hold write_lock prior to calling.
1231 */
1232 void ProtocolV1::discard_out_queue() {
1233 ldout(cct, 10) << __func__ << " started" << dendl;
1234
1235 for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
1236 ldout(cct, 20) << __func__ << " discard " << *p << dendl;
1237 (*p)->put();
1238 }
1239 sent.clear();
1240 for (map<int, list<pair<bufferlist, Message *> > >::iterator p =
1241 out_q.begin();
1242 p != out_q.end(); ++p) {
1243 for (list<pair<bufferlist, Message *> >::iterator r = p->second.begin();
1244 r != p->second.end(); ++r) {
1245 ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
1246 r->second->put();
1247 }
1248 }
1249 out_q.clear();
1250 write_in_progress = false;
1251 }
1252
1253 void ProtocolV1::reset_recv_state()
1254 {
1255 // clean up state internal variables and states
1256 auth_meta.reset(new AuthConnectionMeta);
1257 authorizer_more.clear();
1258 session_security.reset();
1259
1260 // clean read and write callbacks
1261 connection->pendingReadLen.reset();
1262 connection->writeCallback.reset();
1263
1264 if (state > THROTTLE_MESSAGE && state <= READ_FOOTER_AND_DISPATCH &&
1265 connection->policy.throttler_messages) {
1266 ldout(cct, 10) << __func__ << " releasing " << 1
1267 << " message to policy throttler "
1268 << connection->policy.throttler_messages->get_current()
1269 << "/" << connection->policy.throttler_messages->get_max()
1270 << dendl;
1271 connection->policy.throttler_messages->put();
1272 }
1273 if (state > THROTTLE_BYTES && state <= READ_FOOTER_AND_DISPATCH) {
1274 if (connection->policy.throttler_bytes) {
1275 ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
1276 << " bytes to policy throttler "
1277 << connection->policy.throttler_bytes->get_current() << "/"
1278 << connection->policy.throttler_bytes->get_max() << dendl;
1279 connection->policy.throttler_bytes->put(cur_msg_size);
1280 }
1281 }
1282 if (state > THROTTLE_DISPATCH_QUEUE && state <= READ_FOOTER_AND_DISPATCH) {
1283 ldout(cct, 10)
1284 << __func__ << " releasing " << cur_msg_size
1285 << " bytes to dispatch_queue throttler "
1286 << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
1287 << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
1288 connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
1289 }
1290 }
1291
1292 Message *ProtocolV1::_get_next_outgoing(bufferlist *bl) {
1293 Message *m = 0;
1294 if (!out_q.empty()) {
1295 map<int, list<pair<bufferlist, Message *> > >::reverse_iterator it =
1296 out_q.rbegin();
1297 ceph_assert(!it->second.empty());
1298 list<pair<bufferlist, Message *> >::iterator p = it->second.begin();
1299 m = p->second;
1300 if (p->first.length() && bl) {
1301 assert(bl->length() == 0);
1302 bl->swap(p->first);
1303 }
1304 it->second.erase(p);
1305 if (it->second.empty()) out_q.erase(it->first);
1306 }
1307 return m;
1308 }
1309
1310 /**
1311 * Client Protocol V1
1312 **/
1313
1314 CtPtr ProtocolV1::send_client_banner() {
1315 ldout(cct, 20) << __func__ << dendl;
1316 state = CONNECTING;
1317
1318 bufferlist bl;
1319 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1320 return WRITE(bl, handle_client_banner_write);
1321 }
1322
1323 CtPtr ProtocolV1::handle_client_banner_write(int r) {
1324 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1325
1326 if (r < 0) {
1327 ldout(cct, 1) << __func__ << " write client banner failed" << dendl;
1328 return _fault();
1329 }
1330 ldout(cct, 10) << __func__ << " connect write banner done: "
1331 << connection->get_peer_addr() << dendl;
1332
1333 return wait_server_banner();
1334 }
1335
1336 CtPtr ProtocolV1::wait_server_banner() {
1337 state = CONNECTING_WAIT_BANNER_AND_IDENTIFY;
1338
1339 ldout(cct, 20) << __func__ << dendl;
1340
1341 bufferlist myaddrbl;
1342 unsigned banner_len = strlen(CEPH_BANNER);
1343 unsigned need_len = banner_len + sizeof(ceph_entity_addr) * 2;
1344 return READ(need_len, handle_server_banner_and_identify);
1345 }
1346
1347 CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
1348 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1349
1350 if (r < 0) {
1351 ldout(cct, 1) << __func__ << " read banner and identify addresses failed"
1352 << dendl;
1353 return _fault();
1354 }
1355
1356 unsigned banner_len = strlen(CEPH_BANNER);
1357 if (memcmp(buffer, CEPH_BANNER, banner_len)) {
1358 ldout(cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
1359 << connection->get_peer_addr() << dendl;
1360 return _fault();
1361 }
1362
1363 bufferlist bl;
1364 entity_addr_t paddr, peer_addr_for_me;
1365
1366 bl.append(buffer + banner_len, sizeof(ceph_entity_addr) * 2);
1367 auto p = bl.cbegin();
1368 try {
1369 decode(paddr, p);
1370 decode(peer_addr_for_me, p);
1371 } catch (const buffer::error &e) {
1372 lderr(cct) << __func__ << " decode peer addr failed " << dendl;
1373 return _fault();
1374 }
1375 ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
1376 << " on socket " << connection->cs.fd() << dendl;
1377
1378 entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
1379 if (peer_addr != paddr) {
1380 if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
1381 peer_addr.get_nonce() == paddr.get_nonce()) {
1382 ldout(cct, 0) << __func__ << " connect claims to be " << paddr << " not "
1383 << peer_addr << " - presumably this is the same node!"
1384 << dendl;
1385 } else {
1386 ldout(cct, 10) << __func__ << " connect claims to be " << paddr << " not "
1387 << peer_addr << dendl;
1388 return _fault();
1389 }
1390 }
1391
1392 ldout(cct, 20) << __func__ << " connect peer addr for me is "
1393 << peer_addr_for_me << dendl;
1394 if (messenger->get_myaddrs().empty() ||
1395 messenger->get_myaddrs().front().is_blank_ip()) {
1396 sockaddr_storage ss;
1397 socklen_t len = sizeof(ss);
1398 getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
1399 entity_addr_t a;
1400 if (cct->_conf->ms_learn_addr_from_peer) {
1401 ldout(cct, 1) << __func__ << " peer " << connection->target_addr
1402 << " says I am " << peer_addr_for_me << " (socket says "
1403 << (sockaddr*)&ss << ")" << dendl;
1404 a = peer_addr_for_me;
1405 } else {
1406 ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
1407 << " says I am " << (sockaddr*)&ss
1408 << " (peer says " << peer_addr_for_me << ")" << dendl;
1409 a.set_sockaddr((sockaddr *)&ss);
1410 }
1411 a.set_type(entity_addr_t::TYPE_LEGACY); // anything but NONE; learned_addr ignores this
1412 a.set_port(0);
1413 connection->lock.unlock();
1414 messenger->learned_addr(a);
1415 if (cct->_conf->ms_inject_internal_delays &&
1416 cct->_conf->ms_inject_socket_failures) {
1417 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
1418 ldout(cct, 10) << __func__ << " sleep for "
1419 << cct->_conf->ms_inject_internal_delays << dendl;
1420 utime_t t;
1421 t.set_from_double(cct->_conf->ms_inject_internal_delays);
1422 t.sleep();
1423 }
1424 }
1425 connection->lock.lock();
1426 if (state != CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
1427 ldout(cct, 1) << __func__
1428 << " state changed while learned_addr, mark_down or "
1429 << " replacing must be happened just now" << dendl;
1430 return nullptr;
1431 }
1432 }
1433
1434 bufferlist myaddrbl;
1435 encode(messenger->get_myaddr_legacy(), myaddrbl, 0); // legacy
1436 return WRITE(myaddrbl, handle_my_addr_write);
1437 }
1438
1439 CtPtr ProtocolV1::handle_my_addr_write(int r) {
1440 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1441
1442 if (r < 0) {
1443 ldout(cct, 2) << __func__ << " connect couldn't write my addr, "
1444 << cpp_strerror(r) << dendl;
1445 return _fault();
1446 }
1447 ldout(cct, 10) << __func__ << " connect sent my addr "
1448 << messenger->get_myaddr_legacy() << dendl;
1449
1450 return CONTINUE(send_connect_message);
1451 }
1452
1453 CtPtr ProtocolV1::send_connect_message()
1454 {
1455 state = CONNECTING_SEND_CONNECT_MSG;
1456
1457 ldout(cct, 20) << __func__ << dendl;
1458 ceph_assert(messenger->auth_client);
1459
1460 bufferlist auth_bl;
1461 vector<uint32_t> preferred_modes;
1462
1463 if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
1464 messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) {
1465 if (authorizer_more.length()) {
1466 ldout(cct,10) << __func__ << " using augmented (challenge) auth payload"
1467 << dendl;
1468 auth_bl = authorizer_more;
1469 } else {
1470 auto am = auth_meta;
1471 authorizer_more.clear();
1472 connection->lock.unlock();
1473 int r = messenger->auth_client->get_auth_request(
1474 connection, am.get(),
1475 &am->auth_method, &preferred_modes, &auth_bl);
1476 connection->lock.lock();
1477 if (r < 0) {
1478 return _fault();
1479 }
1480 if (state != CONNECTING_SEND_CONNECT_MSG) {
1481 ldout(cct, 1) << __func__ << " state changed!" << dendl;
1482 return _fault();
1483 }
1484 }
1485 }
1486
1487 ceph_msg_connect connect;
1488 connect.features = connection->policy.features_supported;
1489 connect.host_type = messenger->get_myname().type();
1490 connect.global_seq = global_seq;
1491 connect.connect_seq = connect_seq;
1492 connect.protocol_version =
1493 messenger->get_proto_version(connection->peer_type, true);
1494 if (auth_bl.length()) {
1495 ldout(cct, 10) << __func__
1496 << " connect_msg.authorizer_len=" << auth_bl.length()
1497 << " protocol=" << auth_meta->auth_method << dendl;
1498 connect.authorizer_protocol = auth_meta->auth_method;
1499 connect.authorizer_len = auth_bl.length();
1500 } else {
1501 connect.authorizer_protocol = 0;
1502 connect.authorizer_len = 0;
1503 }
1504
1505 connect.flags = 0;
1506 if (connection->policy.lossy) {
1507 connect.flags |=
1508 CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
1509 }
1510
1511 bufferlist bl;
1512 bl.append((char *)&connect, sizeof(connect));
1513 if (auth_bl.length()) {
1514 bl.append(auth_bl.c_str(), auth_bl.length());
1515 }
1516
1517 ldout(cct, 10) << __func__ << " connect sending gseq=" << global_seq
1518 << " cseq=" << connect_seq
1519 << " proto=" << connect.protocol_version << dendl;
1520
1521 return WRITE(bl, handle_connect_message_write);
1522 }
1523
1524 CtPtr ProtocolV1::handle_connect_message_write(int r) {
1525 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1526
1527 if (r < 0) {
1528 ldout(cct, 2) << __func__ << " connect couldn't send reply "
1529 << cpp_strerror(r) << dendl;
1530 return _fault();
1531 }
1532
1533 ldout(cct, 20) << __func__
1534 << " connect wrote (self +) cseq, waiting for reply" << dendl;
1535
1536 return wait_connect_reply();
1537 }
1538
1539 CtPtr ProtocolV1::wait_connect_reply() {
1540 ldout(cct, 20) << __func__ << dendl;
1541
1542 memset(&connect_reply, 0, sizeof(connect_reply));
1543 return READ(sizeof(connect_reply), handle_connect_reply_1);
1544 }
1545
1546 CtPtr ProtocolV1::handle_connect_reply_1(char *buffer, int r) {
1547 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1548
1549 if (r < 0) {
1550 ldout(cct, 1) << __func__ << " read connect reply failed" << dendl;
1551 return _fault();
1552 }
1553
1554 connect_reply = *((ceph_msg_connect_reply *)buffer);
1555
1556 ldout(cct, 20) << __func__ << " connect got reply tag "
1557 << (int)connect_reply.tag << " connect_seq "
1558 << connect_reply.connect_seq << " global_seq "
1559 << connect_reply.global_seq << " proto "
1560 << connect_reply.protocol_version << " flags "
1561 << (int)connect_reply.flags << " features "
1562 << connect_reply.features << dendl;
1563
1564 if (connect_reply.authorizer_len) {
1565 return wait_connect_reply_auth();
1566 }
1567
1568 return handle_connect_reply_2();
1569 }
1570
1571 CtPtr ProtocolV1::wait_connect_reply_auth() {
1572 ldout(cct, 20) << __func__ << dendl;
1573
1574 ldout(cct, 10) << __func__
1575 << " reply.authorizer_len=" << connect_reply.authorizer_len
1576 << dendl;
1577
1578 ceph_assert(connect_reply.authorizer_len < 4096);
1579
1580 return READ(connect_reply.authorizer_len, handle_connect_reply_auth);
1581 }
1582
1583 CtPtr ProtocolV1::handle_connect_reply_auth(char *buffer, int r) {
1584 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1585
1586 if (r < 0) {
1587 ldout(cct, 1) << __func__ << " read connect reply authorizer failed"
1588 << dendl;
1589 return _fault();
1590 }
1591
1592 bufferlist authorizer_reply;
1593 authorizer_reply.append(buffer, connect_reply.authorizer_len);
1594
1595 if (connection->peer_type != CEPH_ENTITY_TYPE_MON ||
1596 messenger->get_myname().type() == CEPH_ENTITY_TYPE_MON) {
1597 auto am = auth_meta;
1598 bool more = (connect_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER);
1599 bufferlist auth_retry_bl;
1600 int r;
1601 connection->lock.unlock();
1602 if (more) {
1603 r = messenger->auth_client->handle_auth_reply_more(
1604 connection, am.get(), authorizer_reply, &auth_retry_bl);
1605 } else {
1606 // these aren't used for v1
1607 CryptoKey skey;
1608 string con_secret;
1609 r = messenger->auth_client->handle_auth_done(
1610 connection, am.get(),
1611 0 /* global id */, 0 /* con mode */,
1612 authorizer_reply,
1613 &skey, &con_secret);
1614 }
1615 connection->lock.lock();
1616 if (state != CONNECTING_SEND_CONNECT_MSG) {
1617 ldout(cct, 1) << __func__ << " state changed" << dendl;
1618 return _fault();
1619 }
1620 if (r < 0) {
1621 return _fault();
1622 }
1623 if (more && r == 0) {
1624 authorizer_more = auth_retry_bl;
1625 return CONTINUE(send_connect_message);
1626 }
1627 }
1628
1629 return handle_connect_reply_2();
1630 }
1631
1632 CtPtr ProtocolV1::handle_connect_reply_2() {
1633 ldout(cct, 20) << __func__ << dendl;
1634
1635 if (connect_reply.tag == CEPH_MSGR_TAG_FEATURES) {
1636 ldout(cct, 0) << __func__ << " connect protocol feature mismatch, my "
1637 << std::hex << connection->policy.features_supported
1638 << " < peer " << connect_reply.features << " missing "
1639 << (connect_reply.features &
1640 ~connection->policy.features_supported)
1641 << std::dec << dendl;
1642 return _fault();
1643 }
1644
1645 if (connect_reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1646 ldout(cct, 0) << __func__ << " connect protocol version mismatch, my "
1647 << messenger->get_proto_version(connection->peer_type, true)
1648 << " != " << connect_reply.protocol_version << dendl;
1649 return _fault();
1650 }
1651
1652 if (connect_reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1653 ldout(cct, 0) << __func__ << " connect got BADAUTHORIZER" << dendl;
1654 authorizer_more.clear();
1655 return _fault();
1656 }
1657
1658 if (connect_reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1659 ldout(cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
1660 session_reset();
1661 connect_seq = 0;
1662
1663 // see session_reset
1664 connection->outgoing_bl.clear();
1665
1666 return CONTINUE(send_connect_message);
1667 }
1668
1669 if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1670 global_seq = messenger->get_global_seq(connect_reply.global_seq);
1671 ldout(cct, 5) << __func__ << " connect got RETRY_GLOBAL "
1672 << connect_reply.global_seq << " chose new " << global_seq
1673 << dendl;
1674 return CONTINUE(send_connect_message);
1675 }
1676
1677 if (connect_reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1678 ceph_assert(connect_reply.connect_seq > connect_seq);
1679 ldout(cct, 5) << __func__ << " connect got RETRY_SESSION " << connect_seq
1680 << " -> " << connect_reply.connect_seq << dendl;
1681 connect_seq = connect_reply.connect_seq;
1682 return CONTINUE(send_connect_message);
1683 }
1684
1685 if (connect_reply.tag == CEPH_MSGR_TAG_WAIT) {
1686 ldout(cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
1687 state = WAIT;
1688 return _fault();
1689 }
1690
1691 uint64_t feat_missing;
1692 feat_missing =
1693 connection->policy.features_required & ~(uint64_t)connect_reply.features;
1694 if (feat_missing) {
1695 ldout(cct, 1) << __func__ << " missing required features " << std::hex
1696 << feat_missing << std::dec << dendl;
1697 return _fault();
1698 }
1699
1700 if (connect_reply.tag == CEPH_MSGR_TAG_SEQ) {
1701 ldout(cct, 10)
1702 << __func__
1703 << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq"
1704 << dendl;
1705
1706 return wait_ack_seq();
1707 }
1708
1709 if (connect_reply.tag == CEPH_MSGR_TAG_READY) {
1710 ldout(cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
1711 }
1712
1713 return client_ready();
1714 }
1715
1716 CtPtr ProtocolV1::wait_ack_seq() {
1717 ldout(cct, 20) << __func__ << dendl;
1718
1719 return READ(sizeof(uint64_t), handle_ack_seq);
1720 }
1721
1722 CtPtr ProtocolV1::handle_ack_seq(char *buffer, int r) {
1723 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1724
1725 if (r < 0) {
1726 ldout(cct, 1) << __func__ << " read connect ack seq failed" << dendl;
1727 return _fault();
1728 }
1729
1730 uint64_t newly_acked_seq = 0;
1731
1732 newly_acked_seq = *((uint64_t *)buffer);
1733 ldout(cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
1734 << " vs out_seq " << out_seq << dendl;
1735 out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
1736
1737 bufferlist bl;
1738 uint64_t s = in_seq;
1739 bl.append((char *)&s, sizeof(s));
1740
1741 return WRITE(bl, handle_in_seq_write);
1742 }
1743
1744 CtPtr ProtocolV1::handle_in_seq_write(int r) {
1745 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1746
1747 if (r < 0) {
1748 ldout(cct, 10) << __func__ << " failed to send in_seq " << dendl;
1749 return _fault();
1750 }
1751
1752 ldout(cct, 10) << __func__ << " send in_seq done " << dendl;
1753
1754 return client_ready();
1755 }
1756
1757 CtPtr ProtocolV1::client_ready() {
1758 ldout(cct, 20) << __func__ << dendl;
1759
1760 // hooray!
1761 peer_global_seq = connect_reply.global_seq;
1762 connection->policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY;
1763
1764 once_ready = true;
1765 connect_seq += 1;
1766 ceph_assert(connect_seq == connect_reply.connect_seq);
1767 backoff = utime_t();
1768 connection->set_features((uint64_t)connect_reply.features &
1769 (uint64_t)connection->policy.features_supported);
1770 ldout(cct, 10) << __func__ << " connect success " << connect_seq
1771 << ", lossy = " << connection->policy.lossy << ", features "
1772 << connection->get_features() << dendl;
1773
1774 // If we have an authorizer, get a new AuthSessionHandler to deal with
1775 // ongoing security of the connection. PLR
1776 if (auth_meta->authorizer) {
1777 ldout(cct, 10) << __func__ << " setting up session_security with auth "
1778 << auth_meta->authorizer.get() << dendl;
1779 session_security.reset(get_auth_session_handler(
1780 cct, auth_meta->authorizer->protocol,
1781 auth_meta->session_key,
1782 connection->get_features()));
1783 } else {
1784 // We have no authorizer, so we shouldn't be applying security to messages
1785 // in this AsyncConnection. PLR
1786 ldout(cct, 10) << __func__ << " no authorizer, clearing session_security"
1787 << dendl;
1788 session_security.reset();
1789 }
1790
1791 if (connection->delay_state) {
1792 ceph_assert(connection->delay_state->ready());
1793 }
1794 connection->dispatch_queue->queue_connect(connection);
1795 messenger->ms_deliver_handle_fast_connect(connection);
1796
1797 return ready();
1798 }
1799
1800 /**
1801 * Server Protocol V1
1802 **/
1803
1804 CtPtr ProtocolV1::send_server_banner() {
1805 ldout(cct, 20) << __func__ << dendl;
1806 state = ACCEPTING;
1807
1808 bufferlist bl;
1809
1810 bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
1811
1812 // as a server, we should have a legacy addr if we accepted this connection.
1813 auto legacy = messenger->get_myaddrs().legacy_addr();
1814 encode(legacy, bl, 0); // legacy
1815 connection->port = legacy.get_port();
1816 encode(connection->target_addr, bl, 0); // legacy
1817
1818 ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd()
1819 << " legacy " << legacy
1820 << " socket_addr " << connection->socket_addr
1821 << " target_addr " << connection->target_addr
1822 << dendl;
1823
1824 return WRITE(bl, handle_server_banner_write);
1825 }
1826
1827 CtPtr ProtocolV1::handle_server_banner_write(int r) {
1828 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1829
1830 if (r < 0) {
1831 ldout(cct, 1) << " write server banner failed" << dendl;
1832 return _fault();
1833 }
1834 ldout(cct, 10) << __func__ << " write banner and addr done: "
1835 << connection->get_peer_addr() << dendl;
1836
1837 return wait_client_banner();
1838 }
1839
1840 CtPtr ProtocolV1::wait_client_banner() {
1841 ldout(cct, 20) << __func__ << dendl;
1842
1843 return READ(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr),
1844 handle_client_banner);
1845 }
1846
1847 CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
1848 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1849
1850 if (r < 0) {
1851 ldout(cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
1852 return _fault();
1853 }
1854
1855 if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
1856 ldout(cct, 1) << __func__ << " accept peer sent bad banner '" << buffer
1857 << "' (should be '" << CEPH_BANNER << "')" << dendl;
1858 return _fault();
1859 }
1860
1861 bufferlist addr_bl;
1862 entity_addr_t peer_addr;
1863
1864 addr_bl.append(buffer + strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
1865 try {
1866 auto ti = addr_bl.cbegin();
1867 decode(peer_addr, ti);
1868 } catch (const buffer::error &e) {
1869 lderr(cct) << __func__ << " decode peer_addr failed " << dendl;
1870 return _fault();
1871 }
1872
1873 ldout(cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
1874 if (peer_addr.is_blank_ip()) {
1875 // peer apparently doesn't know what ip they have; figure it out for them.
1876 int port = peer_addr.get_port();
1877 peer_addr.set_sockaddr(connection->target_addr.get_sockaddr());
1878 peer_addr.set_port(port);
1879
1880 ldout(cct, 0) << __func__ << " accept peer addr is really " << peer_addr
1881 << " (socket is " << connection->target_addr << ")" << dendl;
1882 }
1883 connection->set_peer_addr(peer_addr); // so that connection_state gets set up
1884 connection->target_addr = peer_addr;
1885
1886 return CONTINUE(wait_connect_message);
1887 }
1888
1889 CtPtr ProtocolV1::wait_connect_message() {
1890 ldout(cct, 20) << __func__ << dendl;
1891
1892 memset(&connect_msg, 0, sizeof(connect_msg));
1893 return READ(sizeof(connect_msg), handle_connect_message_1);
1894 }
1895
1896 CtPtr ProtocolV1::handle_connect_message_1(char *buffer, int r) {
1897 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1898
1899 if (r < 0) {
1900 ldout(cct, 1) << __func__ << " read connect msg failed" << dendl;
1901 return _fault();
1902 }
1903
1904 connect_msg = *((ceph_msg_connect *)buffer);
1905
1906 state = ACCEPTING_WAIT_CONNECT_MSG_AUTH;
1907
1908 if (connect_msg.authorizer_len) {
1909 return wait_connect_message_auth();
1910 }
1911
1912 return handle_connect_message_2();
1913 }
1914
1915 CtPtr ProtocolV1::wait_connect_message_auth() {
1916 ldout(cct, 20) << __func__ << dendl;
1917 authorizer_buf.clear();
1918 authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));
1919 return READB(connect_msg.authorizer_len, authorizer_buf.c_str(),
1920 handle_connect_message_auth);
1921 }
1922
1923 CtPtr ProtocolV1::handle_connect_message_auth(char *buffer, int r) {
1924 ldout(cct, 20) << __func__ << " r=" << r << dendl;
1925
1926 if (r < 0) {
1927 ldout(cct, 1) << __func__ << " read connect authorizer failed" << dendl;
1928 return _fault();
1929 }
1930
1931 return handle_connect_message_2();
1932 }
1933
1934 CtPtr ProtocolV1::handle_connect_message_2() {
(1) Event cond_true: |
Condition "should_gather", taking true branch. |
1935 ldout(cct, 20) << __func__ << dendl;
1936
(2) Event cond_true: |
Condition "should_gather", taking true branch. |
1937 ldout(cct, 20) << __func__ << " accept got peer connect_seq "
1938 << connect_msg.connect_seq << " global_seq "
1939 << connect_msg.global_seq << dendl;
1940
1941 connection->set_peer_type(connect_msg.host_type);
1942 connection->policy = messenger->get_policy(connect_msg.host_type);
1943
(3) Event cond_true: |
Condition "should_gather", taking true branch. |
1944 ldout(cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
1945 << ", policy.lossy=" << connection->policy.lossy
1946 << " policy.server=" << connection->policy.server
1947 << " policy.standby=" << connection->policy.standby
1948 << " policy.resetcheck=" << connection->policy.resetcheck
1949 << " features 0x" << std::hex << (uint64_t)connect_msg.features
1950 << std::dec
1951 << dendl;
1952
1953 ceph_msg_connect_reply reply;
1954 bufferlist authorizer_reply;
1955
1956 memset(&reply, 0, sizeof(reply));
1957 reply.protocol_version =
1958 messenger->get_proto_version(connection->peer_type, false);
1959
1960 // mismatch?
(4) Event cond_true: |
Condition "should_gather", taking true branch. |
1961 ldout(cct, 10) << __func__ << " accept my proto " << reply.protocol_version
1962 << ", their proto " << connect_msg.protocol_version << dendl;
1963
(5) Event cond_false: |
Condition "this->connect_msg.protocol_version.operator __u32() != reply.protocol_version.operator __u32()", taking false branch. |
1964 if (connect_msg.protocol_version != reply.protocol_version) {
1965 return send_connect_message_reply(CEPH_MSGR_TAG_BADPROTOVER, reply,
1966 authorizer_reply);
(6) Event if_end: |
End of if statement. |
1967 }
1968
1969 // require signatures for cephx?
(7) Event cond_true: |
Condition "this->connect_msg.authorizer_protocol.operator __u32() == 2", taking true branch. |
1970 if (connect_msg.authorizer_protocol == CEPH_AUTH_CEPHX) {
(8) Event cond_true: |
Condition "this->connection->peer_type == 4", taking true branch. |
1971 if (connection->peer_type == CEPH_ENTITY_TYPE_OSD ||
1972 connection->peer_type == CEPH_ENTITY_TYPE_MDS) {
(9) Event cond_true: |
Condition "this->cct->_conf->cephx_require_signatures", taking true branch. |
1973 if (cct->_conf->cephx_require_signatures ||
1974 cct->_conf->cephx_cluster_require_signatures) {
(10) Event cond_true: |
Condition "should_gather", taking true branch. |
1975 ldout(cct, 10)
1976 << __func__
1977 << " using cephx, requiring MSG_AUTH feature bit for cluster"
1978 << dendl;
1979 connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1980 }
(11) Event if_fallthrough: |
Falling through to end of if statement. |
1981 } else {
1982 if (cct->_conf->cephx_require_signatures ||
1983 cct->_conf->cephx_service_require_signatures) {
1984 ldout(cct, 10)
1985 << __func__
1986 << " using cephx, requiring MSG_AUTH feature bit for service"
1987 << dendl;
1988 connection->policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1989 }
(12) Event if_end: |
End of if statement. |
1990 }
1991 }
1992
1993 uint64_t feat_missing =
1994 connection->policy.features_required & ~(uint64_t)connect_msg.features;
(13) Event cond_false: |
Condition "feat_missing", taking false branch. |
1995 if (feat_missing) {
1996 ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
1997 << feat_missing << std::dec << dendl;
1998 return send_connect_message_reply(CEPH_MSGR_TAG_FEATURES, reply,
1999 authorizer_reply);
(14) Event if_end: |
End of if statement. |
2000 }
2001
2002 bufferlist auth_bl_copy = authorizer_buf;
2003 auto am = auth_meta;
2004 am->auth_method = connect_msg.authorizer_protocol;
2005 connection->lock.unlock();
(15) Event cond_true: |
Condition "should_gather", taking true branch. |
2006 ldout(cct,10) << __func__ << " authorizor_protocol "
2007 << connect_msg.authorizer_protocol
2008 << " len " << auth_bl_copy.length()
2009 << dendl;
2010 bool more = (bool)auth_meta->authorizer_challenge;
2011 int r = messenger->auth_server->handle_auth_request(
2012 connection,
2013 am.get(),
2014 more,
2015 am->auth_method,
2016 auth_bl_copy,
2017 &authorizer_reply);
(16) Event cond_false: |
Condition "r < 0", taking false branch. |
2018 if (r < 0) {
2019 connection->lock.lock();
2020 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2021 ldout(cct, 1) << __func__ << " state changed" << dendl;
2022 return _fault();
2023 }
2024 ldout(cct, 0) << __func__ << ": got bad authorizer, auth_reply_len="
2025 << authorizer_reply.length() << dendl;
2026 session_security.reset();
2027 return send_connect_message_reply(CEPH_MSGR_TAG_BADAUTHORIZER, reply,
2028 authorizer_reply);
(17) Event if_end: |
End of if statement. |
2029 }
(18) Event cond_false: |
Condition "r == 0", taking false branch. |
2030 if (r == 0) {
2031 connection->lock.lock();
2032 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2033 ldout(cct, 1) << __func__ << " state changed" << dendl;
2034 return _fault();
2035 }
2036 ldout(cct, 10) << __func__ << ": challenging authorizer" << dendl;
2037 ceph_assert(authorizer_reply.length());
2038 return send_connect_message_reply(CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER,
2039 reply, authorizer_reply);
(19) Event if_end: |
End of if statement. |
2040 }
2041
2042 // We've verified the authorizer for this AsyncConnection, so set up the
2043 // session security structure. PLR
(20) Event cond_true: |
Condition "should_gather", taking true branch. |
2044 ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
2045
(21) Event cond_true: |
Condition "this->connection->policy.server", taking true branch. |
(22) Event cond_false: |
Condition "this->connection->policy.lossy", taking false branch. |
2046 if (connection->policy.server &&
2047 connection->policy.lossy) {
2048 // incoming lossy client, no need to register this connection
2049 // new session
2050 ldout(cct, 10) << __func__ << " accept new session" << dendl;
2051 return open(reply, authorizer_reply);
(23) Event if_end: |
End of if statement. |
2052 }
2053
2054 AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
2055
2056 connection->inject_delay();
2057
(24) Event lock: |
"lock" locks "this->connection->lock". [Note: The source code implementation of the function has been overridden by a builtin model.] |
Also see events: |
[missing_unlock] |
2058 connection->lock.lock();
(25) Event cond_false: |
Condition "this->state != ProtocolV1::ACCEPTING_WAIT_CONNECT_MSG_AUTH", taking false branch. |
2059 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2060 ldout(cct, 1) << __func__ << " state changed" << dendl;
2061 return _fault();
(26) Event if_end: |
End of if statement. |
2062 }
2063
(27) Event cond_true: |
Condition "existing == this->connection", taking true branch. |
2064 if (existing == connection) {
2065 existing = nullptr;
2066 }
(28) Event cond_true: |
Condition "existing.operator bool()", taking true branch. |
(29) Event cond_true: |
Condition "existing->protocol->proto_type != 1", taking true branch. |
2067 if (existing && existing->protocol->proto_type != 1) {
(30) Event cond_true: |
Condition "should_gather", taking true branch. |
2068 ldout(cct,1) << __func__ << " existing " << existing << " proto "
2069 << existing->protocol.get() << " version is "
2070 << existing->protocol->proto_type << ", marking down" << dendl;
2071 existing->mark_down();
2072 existing = nullptr;
2073 }
2074
(31) Event cond_true: |
Condition "existing.operator bool()", taking true branch. |
2075 if (existing) {
2076 // There is no possible that existing connection will acquire this
2077 // connection's lock
2078 existing->lock.lock(); // skip lockdep check (we are locking a second
2079 // AsyncConnection here)
2080
(32) Event cond_true: |
Condition "should_gather", taking true branch. |
2081 ldout(cct,10) << __func__ << " existing=" << existing << " exproto="
2082 << existing->protocol.get() << dendl;
2083 ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
(33) Event cond_true: |
Condition "exproto", taking true branch. |
2084 ceph_assert(exproto);
(34) Event cond_true: |
Condition "exproto->proto_type == 1", taking true branch. |
2085 ceph_assert(exproto->proto_type == 1);
2086
(35) Event cond_true: |
Condition "exproto->state == ProtocolV1::CLOSED", taking true branch. |
2087 if (exproto->state == CLOSED) {
(36) Event cond_true: |
Condition "should_gather", taking true branch. |
2088 ldout(cct, 1) << __func__ << " existing " << existing
2089 << " already closed." << dendl;
2090 existing->lock.unlock();
2091 existing = nullptr;
2092
(37) Event missing_unlock: |
Returning without unlocking "this->connection->lock". |
Also see events: |
[lock] |
2093 return open(reply, authorizer_reply);
2094 }
2095
2096 if (exproto->replacing) {
2097 ldout(cct, 1) << __func__
2098 << " existing racing replace happened while replacing."
2099 << " existing_state="
2100 << connection->get_state_name(existing->state) << dendl;
2101 reply.global_seq = exproto->peer_global_seq;
2102 existing->lock.unlock();
2103 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
2104 authorizer_reply);
2105 }
2106
2107 if (connect_msg.global_seq < exproto->peer_global_seq) {
2108 ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
2109 << exproto->peer_global_seq << " > "
2110 << connect_msg.global_seq << ", RETRY_GLOBAL" << dendl;
2111 reply.global_seq = exproto->peer_global_seq; // so we can send it below..
2112 existing->lock.unlock();
2113 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_GLOBAL, reply,
2114 authorizer_reply);
2115 } else {
2116 ldout(cct, 10) << __func__ << " accept existing " << existing << ".gseq "
2117 << exproto->peer_global_seq
2118 << " <= " << connect_msg.global_seq << ", looks ok"
2119 << dendl;
2120 }
2121
2122 if (existing->policy.lossy) {
2123 ldout(cct, 0)
2124 << __func__
2125 << " accept replacing existing (lossy) channel (new one lossy="
2126 << connection->policy.lossy << ")" << dendl;
2127 exproto->session_reset();
2128 return replace(existing, reply, authorizer_reply);
2129 }
2130
2131 ldout(cct, 1) << __func__ << " accept connect_seq "
2132 << connect_msg.connect_seq
2133 << " vs existing csq=" << exproto->connect_seq
2134 << " existing_state="
2135 << connection->get_state_name(existing->state) << dendl;
2136
2137 if (connect_msg.connect_seq == 0 && exproto->connect_seq > 0) {
2138 ldout(cct, 0)
2139 << __func__
2140 << " accept peer reset, then tried to connect to us, replacing"
2141 << dendl;
2142 // this is a hard reset from peer
2143 is_reset_from_peer = true;
2144 if (connection->policy.resetcheck) {
2145 exproto->session_reset(); // this resets out_queue, msg_ and
2146 // connect_seq #'s
2147 }
2148 return replace(existing, reply, authorizer_reply);
2149 }
2150
2151 if (connect_msg.connect_seq < exproto->connect_seq) {
2152 // old attempt, or we sent READY but they didn't get it.
2153 ldout(cct, 10) << __func__ << " accept existing " << existing << ".cseq "
2154 << exproto->connect_seq << " > " << connect_msg.connect_seq
2155 << ", RETRY_SESSION" << dendl;
2156 reply.connect_seq = exproto->connect_seq + 1;
2157 existing->lock.unlock();
2158 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
2159 authorizer_reply);
2160 }
2161
2162 if (connect_msg.connect_seq == exproto->connect_seq) {
2163 // if the existing connection successfully opened, and/or
2164 // subsequently went to standby, then the peer should bump
2165 // their connect_seq and retry: this is not a connection race
2166 // we need to resolve here.
2167 if (exproto->state == OPENED || exproto->state == STANDBY) {
2168 ldout(cct, 10) << __func__ << " accept connection race, existing "
2169 << existing << ".cseq " << exproto->connect_seq
2170 << " == " << connect_msg.connect_seq
2171 << ", OPEN|STANDBY, RETRY_SESSION " << dendl;
2172 // if connect_seq both zero, dont stuck into dead lock. it's ok to
2173 // replace
2174 if (connection->policy.resetcheck && exproto->connect_seq == 0) {
2175 return replace(existing, reply, authorizer_reply);
2176 }
2177
2178 reply.connect_seq = exproto->connect_seq + 1;
2179 existing->lock.unlock();
2180 return send_connect_message_reply(CEPH_MSGR_TAG_RETRY_SESSION, reply,
2181 authorizer_reply);
2182 }
2183
2184 // connection race?
2185 if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr_legacy() ||
2186 existing->policy.server) {
2187 // incoming wins
2188 ldout(cct, 10) << __func__ << " accept connection race, existing "
2189 << existing << ".cseq " << exproto->connect_seq
2190 << " == " << connect_msg.connect_seq
2191 << ", or we are server, replacing my attempt" << dendl;
2192 return replace(existing, reply, authorizer_reply);
2193 } else {
2194 // our existing outgoing wins
2195 ldout(messenger->cct, 10)
2196 << __func__ << " accept connection race, existing " << existing
2197 << ".cseq " << exproto->connect_seq
2198 << " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
2199 ceph_assert(connection->peer_addrs->legacy_addr() >
2200 messenger->get_myaddr_legacy());
2201 existing->lock.unlock();
2202 // make sure we follow through with opening the existing
2203 // connection (if it isn't yet open) since we know the peer
2204 // has something to send to us.
2205 existing->send_keepalive();
2206 return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
2207 authorizer_reply);
2208 }
2209 }
2210
2211 ceph_assert(connect_msg.connect_seq > exproto->connect_seq);
2212 ceph_assert(connect_msg.global_seq >= exproto->peer_global_seq);
2213 if (connection->policy.resetcheck && // RESETSESSION only used by servers;
2214 // peers do not reset each other
2215 exproto->connect_seq == 0) {
2216 ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
2217 << connect_msg.connect_seq << ", " << existing
2218 << ".cseq = " << exproto->connect_seq
2219 << "), sending RESETSESSION " << dendl;
2220 existing->lock.unlock();
2221 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
2222 authorizer_reply);
2223 }
2224
2225 // reconnect
2226 ldout(cct, 10) << __func__ << " accept peer sent cseq "
2227 << connect_msg.connect_seq << " > " << exproto->connect_seq
2228 << dendl;
2229 return replace(existing, reply, authorizer_reply);
2230 } // existing
2231 else if (!replacing && connect_msg.connect_seq > 0) {
2232 // we reset, and they are opening a new session
2233 ldout(cct, 0) << __func__ << " accept we reset (peer sent cseq "
2234 << connect_msg.connect_seq << "), sending RESETSESSION"
2235 << dendl;
2236 return send_connect_message_reply(CEPH_MSGR_TAG_RESETSESSION, reply,
2237 authorizer_reply);
2238 } else {
2239 // new session
2240 ldout(cct, 10) << __func__ << " accept new session" << dendl;
2241 existing = nullptr;
2242 return open(reply, authorizer_reply);
2243 }
2244 }
2245
2246 CtPtr ProtocolV1::send_connect_message_reply(char tag,
2247 ceph_msg_connect_reply &reply,
2248 bufferlist &authorizer_reply) {
2249 ldout(cct, 20) << __func__ << dendl;
2250 bufferlist reply_bl;
2251 reply.tag = tag;
2252 reply.features =
2253 ((uint64_t)connect_msg.features & connection->policy.features_supported) |
2254 connection->policy.features_required;
2255 reply.authorizer_len = authorizer_reply.length();
2256 reply_bl.append((char *)&reply, sizeof(reply));
2257
2258 ldout(cct, 10) << __func__ << " reply features 0x" << std::hex
2259 << reply.features << " = (policy sup 0x"
2260 << connection->policy.features_supported
2261 << " & connect 0x" << (uint64_t)connect_msg.features
2262 << ") | policy req 0x"
2263 << connection->policy.features_required
2264 << dendl;
2265
2266 if (reply.authorizer_len) {
2267 reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
2268 authorizer_reply.clear();
2269 }
2270
2271 return WRITE(reply_bl, handle_connect_message_reply_write);
2272 }
2273
2274 CtPtr ProtocolV1::handle_connect_message_reply_write(int r) {
2275 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2276
2277 if (r < 0) {
2278 ldout(cct, 1) << " write connect message reply failed" << dendl;
2279 connection->inject_delay();
2280 return _fault();
2281 }
2282
2283 return CONTINUE(wait_connect_message);
2284 }
2285
2286 CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
2287 ceph_msg_connect_reply &reply,
2288 bufferlist &authorizer_reply) {
2289 ldout(cct, 10) << __func__ << " accept replacing " << existing << dendl;
2290
2291 connection->inject_delay();
2292 if (existing->policy.lossy) {
2293 // disconnect from the Connection
2294 ldout(cct, 1) << __func__ << " replacing on lossy channel, failing existing"
2295 << dendl;
2296 existing->protocol->stop();
2297 existing->dispatch_queue->queue_reset(existing.get());
2298 } else {
2299 ceph_assert(can_write == WriteStatus::NOWRITE);
2300 existing->write_lock.lock();
2301
2302 ProtocolV1 *exproto = dynamic_cast<ProtocolV1 *>(existing->protocol.get());
2303
2304 // reset the in_seq if this is a hard reset from peer,
2305 // otherwise we respect our original connection's value
2306 if (is_reset_from_peer) {
2307 exproto->is_reset_from_peer = true;
2308 }
2309
2310 connection->center->delete_file_event(connection->cs.fd(),
2311 EVENT_READABLE | EVENT_WRITABLE);
2312
2313 if (existing->delay_state) {
2314 existing->delay_state->flush();
2315 ceph_assert(!connection->delay_state);
2316 }
2317 exproto->reset_recv_state();
2318
2319 exproto->connect_msg.features = connect_msg.features;
2320
2321 auto temp_cs = std::move(connection->cs);
2322 EventCenter *new_center = connection->center;
2323 Worker *new_worker = connection->worker;
2324 // avoid _stop shutdown replacing socket
2325 // queue a reset on the new connection, which we're dumping for the old
2326 stop();
2327
2328 connection->dispatch_queue->queue_reset(connection);
2329 ldout(messenger->cct, 1)
2330 << __func__ << " stop myself to swap existing" << dendl;
2331 exproto->can_write = WriteStatus::REPLACING;
2332 exproto->replacing = true;
2333 exproto->write_in_progress = false;
2334 existing->state_offset = 0;
2335 // avoid previous thread modify event
2336 exproto->state = NONE;
2337 existing->state = AsyncConnection::STATE_NONE;
2338 // Discard existing prefetch buffer in `recv_buf`
2339 existing->recv_start = existing->recv_end = 0;
2340 // there shouldn't exist any buffer
2341 ceph_assert(connection->recv_start == connection->recv_end);
2342
2343 auto deactivate_existing = std::bind(
2344 [existing, new_worker, new_center, exproto, reply,
2345 authorizer_reply](ConnectedSocket &cs) mutable {
2346 // we need to delete time event in original thread
2347 {
2348 std::lock_guard<std::mutex> l(existing->lock);
2349 existing->write_lock.lock();
2350 exproto->requeue_sent();
2351 existing->outgoing_bl.clear();
2352 existing->open_write = false;
2353 existing->write_lock.unlock();
2354 if (exproto->state == NONE) {
2355 existing->shutdown_socket();
2356 existing->cs = std::move(cs);
2357 existing->worker->references--;
2358 new_worker->references++;
2359 existing->logger = new_worker->get_perf_counter();
2360 existing->worker = new_worker;
2361 existing->center = new_center;
2362 if (existing->delay_state)
2363 existing->delay_state->set_center(new_center);
2364 } else if (exproto->state == CLOSED) {
2365 auto back_to_close =
2366 std::bind([](ConnectedSocket &cs) mutable { cs.close(); },
2367 std::move(cs));
2368 new_center->submit_to(new_center->get_id(),
2369 std::move(back_to_close), true);
2370 return;
2371 } else {
2372 ceph_abort();
2373 }
2374 }
2375
2376 // Before changing existing->center, it may already exists some
2377 // events in existing->center's queue. Then if we mark down
2378 // `existing`, it will execute in another thread and clean up
2379 // connection. Previous event will result in segment fault
2380 auto transfer_existing = [existing, exproto, reply,
2381 authorizer_reply]() mutable {
2382 std::lock_guard<std::mutex> l(existing->lock);
2383 if (exproto->state == CLOSED) return;
2384 ceph_assert(exproto->state == NONE);
2385
2386 // we have called shutdown_socket above
2387 ceph_assert(existing->last_tick_id == 0);
2388 // restart timer since we are going to re-build connection
2389 existing->last_connect_started = ceph::coarse_mono_clock::now();
2390 existing->last_tick_id = existing->center->create_time_event(
2391 existing->connect_timeout_us, existing->tick_handler);
2392 existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
2393 exproto->state = ACCEPTING;
2394
2395 existing->center->create_file_event(
2396 existing->cs.fd(), EVENT_READABLE, existing->read_handler);
2397 reply.global_seq = exproto->peer_global_seq;
2398 exproto->run_continuation(exproto->send_connect_message_reply(
2399 CEPH_MSGR_TAG_RETRY_GLOBAL, reply, authorizer_reply));
2400 };
2401 if (existing->center->in_thread())
2402 transfer_existing();
2403 else
2404 existing->center->submit_to(existing->center->get_id(),
2405 std::move(transfer_existing), true);
2406 },
2407 std::move(temp_cs));
2408
2409 existing->center->submit_to(existing->center->get_id(),
2410 std::move(deactivate_existing), true);
2411 existing->write_lock.unlock();
2412 existing->lock.unlock();
2413 return nullptr;
2414 }
2415 existing->lock.unlock();
2416
2417 return open(reply, authorizer_reply);
2418 }
2419
2420 CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
2421 bufferlist &authorizer_reply) {
2422 ldout(cct, 20) << __func__ << dendl;
2423
2424 connect_seq = connect_msg.connect_seq + 1;
2425 peer_global_seq = connect_msg.global_seq;
2426 ldout(cct, 10) << __func__ << " accept success, connect_seq = " << connect_seq
2427 << " in_seq=" << in_seq << ", sending READY" << dendl;
2428
2429 // if it is a hard reset from peer, we don't need a round-trip to negotiate
2430 // in/out sequence
2431 if ((connect_msg.features & CEPH_FEATURE_RECONNECT_SEQ) &&
2432 !is_reset_from_peer) {
2433 reply.tag = CEPH_MSGR_TAG_SEQ;
2434 wait_for_seq = true;
2435 } else {
2436 reply.tag = CEPH_MSGR_TAG_READY;
2437 wait_for_seq = false;
2438 out_seq = discard_requeued_up_to(out_seq, 0);
2439 is_reset_from_peer = false;
2440 in_seq = 0;
2441 }
2442
2443 // send READY reply
2444 reply.features = connection->policy.features_supported;
2445 reply.global_seq = messenger->get_global_seq();
2446 reply.connect_seq = connect_seq;
2447 reply.flags = 0;
2448 reply.authorizer_len = authorizer_reply.length();
2449 if (connection->policy.lossy) {
2450 reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
2451 }
2452
2453 connection->set_features((uint64_t)reply.features &
2454 (uint64_t)connect_msg.features);
2455 ldout(cct, 10) << __func__ << " accept features "
2456 << connection->get_features()
2457 << " authorizer_protocol "
2458 << connect_msg.authorizer_protocol << dendl;
2459
2460 session_security.reset(
2461 get_auth_session_handler(cct, auth_meta->auth_method,
2462 auth_meta->session_key,
2463 connection->get_features()));
2464
2465 bufferlist reply_bl;
2466 reply_bl.append((char *)&reply, sizeof(reply));
2467
2468 if (reply.authorizer_len) {
2469 reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
2470 }
2471
2472 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
2473 uint64_t s = in_seq;
2474 reply_bl.append((char *)&s, sizeof(s));
2475 }
2476
2477 connection->lock.unlock();
2478 // Because "replacing" will prevent other connections preempt this addr,
2479 // it's safe that here we don't acquire Connection's lock
2480 ssize_t r = messenger->accept_conn(connection);
2481
2482 connection->inject_delay();
2483
2484 connection->lock.lock();
2485 replacing = false;
2486 if (r < 0) {
2487 ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
2488 << connection->peer_addrs->legacy_addr()
2489 << " just fail later one(this)" << dendl;
2490 ldout(cct, 10) << "accept fault after register" << dendl;
2491 connection->inject_delay();
2492 return _fault();
2493 }
2494 if (state != ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2495 ldout(cct, 1) << __func__
2496 << " state changed while accept_conn, it must be mark_down"
2497 << dendl;
2498 ceph_assert(state == CLOSED || state == NONE);
2499 ldout(cct, 10) << "accept fault after register" << dendl;
2500 messenger->unregister_conn(connection);
2501 connection->inject_delay();
2502 return _fault();
2503 }
2504
2505 return WRITE(reply_bl, handle_ready_connect_message_reply_write);
2506 }
2507
2508 CtPtr ProtocolV1::handle_ready_connect_message_reply_write(int r) {
2509 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2510
2511 if (r < 0) {
2512 ldout(cct, 1) << __func__ << " write ready connect message reply failed"
2513 << dendl;
2514 return _fault();
2515 }
2516
2517 // notify
2518 connection->dispatch_queue->queue_accept(connection);
2519 messenger->ms_deliver_handle_fast_accept(connection);
2520 once_ready = true;
2521
2522 state = ACCEPTING_HANDLED_CONNECT_MSG;
2523
2524 if (wait_for_seq) {
2525 return wait_seq();
2526 }
2527
2528 return server_ready();
2529 }
2530
2531 CtPtr ProtocolV1::wait_seq() {
2532 ldout(cct, 20) << __func__ << dendl;
2533
2534 return READ(sizeof(uint64_t), handle_seq);
2535 }
2536
2537 CtPtr ProtocolV1::handle_seq(char *buffer, int r) {
2538 ldout(cct, 20) << __func__ << " r=" << r << dendl;
2539
2540 if (r < 0) {
2541 ldout(cct, 1) << __func__ << " read ack seq failed" << dendl;
2542 return _fault();
2543 }
2544
2545 uint64_t newly_acked_seq = *(uint64_t *)buffer;
2546 ldout(cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq
2547 << dendl;
2548 out_seq = discard_requeued_up_to(out_seq, newly_acked_seq);
2549
2550 return server_ready();
2551 }
2552
2553 CtPtr ProtocolV1::server_ready() {
2554 ldout(cct, 20) << __func__ << " session_security is "
2555 << session_security
2556 << dendl;
2557
2558 ldout(cct, 20) << __func__ << " accept done" << dendl;
2559 memset(&connect_msg, 0, sizeof(connect_msg));
2560
2561 if (connection->delay_state) {
2562 ceph_assert(connection->delay_state->ready());
2563 }
2564
2565 return ready();
2566 }
2567