// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <unistd.h>

#include "include/Context.h"
#include "include/random.h"
#include "common/errno.h"
#include "AsyncMessenger.h"
#include "AsyncConnection.h"

#include "ProtocolV1.h"
#include "ProtocolV2.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/EventTrace.h"

// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
#define SEQ_MASK 0x7fffffff

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
  return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
                << *peer_addrs << " conn(" << this
                << (msgr2 ? " msgr2=" : " legacy=")
                << protocol.get()
                << " " << ceph_con_mode_name(protocol->auth_meta->con_mode)
                << " :" << port
                << " s=" << get_state_name(state)
                << " l=" << policy.lossy
                << ").";
}

// Notes:
// 1. Don't dispatch any event when closed! Doing so may keep the
//    AsyncConnection alive even after the AsyncMessenger has been destroyed.

const uint32_t AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;

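// The EventCallback subclasses below are thin adapters: each one holds an
// AsyncConnectionRef (keeping the connection alive for the duration of the
// callback) and forwards the EventCenter's file/time event into the
// corresponding AsyncConnection method.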
class C_time_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->wakeup_from(fd_or_id);
  }
};

class C_handle_read : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->process();
  }
};

class C_handle_write : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd) override {
    conn->handle_write();
  }
};

class C_handle_write_callback : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write_callback(AsyncConnectionRef c) : conn(c) {}
  void do_request(uint64_t fd) override { conn->handle_write_callback(); }
};

class C_clean_handler : public EventCallback {
  AsyncConnectionRef conn;
 public:
  explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t id) override {
    conn->cleanup();
    delete this;
  }
};

class C_tick_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd_or_id) override {
    conn->tick(fd_or_id);
  }
};

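// Construction wires up the event handlers and selects the wire protocol:
// a loopback connection always uses LoopbackProtocolV1, otherwise msgr2
// connections get ProtocolV2 and legacy connections get ProtocolV1.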
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
                                 Worker *w, bool m2, bool local)
  : Connection(cct, m),
    delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
    logger(w->get_perf_counter()),
    state(STATE_NONE), port(-1),
    dispatch_queue(q), recv_buf(NULL),
    recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
    recv_start(0), recv_end(0),
    last_active(ceph::coarse_mono_clock::now()),
    connect_timeout_us(cct->_conf->ms_connection_ready_timeout*1000*1000),
    inactive_timeout_us(cct->_conf->ms_connection_idle_timeout*1000*1000),
    msgr2(m2), state_offset(0),
    worker(w), center(&w->center), read_buffer(nullptr)
{
#ifdef UNIT_TESTS_BUILT
  this->interceptor = m->interceptor;
#endif
  read_handler = new C_handle_read(this);
  write_handler = new C_handle_write(this);
  write_callback_handler = new C_handle_write_callback(this);
  wakeup_handler = new C_time_wakeup(this);
  tick_handler = new C_tick_wakeup(this);
  // allocate twice recv_max_prefetch; see "read_until"
  recv_buf = new char[2*recv_max_prefetch];
  if (local) {
    protocol = std::unique_ptr<Protocol>(new LoopbackProtocolV1(this));
  } else if (m2) {
    protocol = std::unique_ptr<Protocol>(new ProtocolV2(this));
  } else {
    protocol = std::unique_ptr<Protocol>(new ProtocolV1(this));
  }
  logger->inc(l_msgr_created_connections);
}

AsyncConnection::~AsyncConnection()
{
  if (recv_buf)
    delete[] recv_buf;
  ceph_assert(!delay_state);
}

int AsyncConnection::get_con_mode() const
{
  return protocol->get_con_mode();
}

bool AsyncConnection::is_msgr2() const
{
  return protocol->proto_type == 2;
}

void AsyncConnection::maybe_start_delay_thread()
{
  if (!delay_state) {
    async_msgr->cct->_conf.with_val<std::string>(
      "ms_inject_delay_type",
      [this](const string& s) {
        if (s.find(ceph_entity_type_name(peer_type)) != string::npos) {
          ldout(msgr->cct, 1) << __func__ << " setting up a delay queue"
                              << dendl;
          delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue,
                                            conn_id);
        }
      });
  }
}

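// Asynchronous read entry point used by the protocol implementations. If the
// requested bytes cannot all be read immediately, the callback is stored and
// later invoked from process() once more data arrives. A usage sketch
// (illustrative only; the caller shown here is hypothetical):
//
//   ssize_t r = conn->read(len, buf, [](char *b, ssize_t rc) {
//     // rc == 0: b now holds len bytes; rc < 0: the read failed
//   });
//   // r == 0: completed synchronously (the callback is not invoked)
//   // r >  0: pending; the callback fires later from process()
//   // r <  0: immediate error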
ssize_t AsyncConnection::read(unsigned len, char *buffer,
                              std::function<void(char *, ssize_t)> callback) {
  ldout(async_msgr->cct, 20) << __func__
                             << (pendingReadLen ? " continue" : " start")
                             << " len=" << len << dendl;
  ssize_t r = read_until(len, buffer);
  if (r > 0) {
    readCallback = callback;
    pendingReadLen = len;
    read_buffer = buffer;
  }
  return r;
}

// Because this function may be called multiple times to fill the target
// buffer, the same bufferptr must be passed in on every call. Normally only
// "read_message" passes an existing bufferptr in.
//
// It uses a readahead scheme to reduce the overhead of small reads;
// "recv_buf" holds the prefetched data.
//
// Returns the number of bytes still missing, 0 when the buffer is complete,
// or a value < 0 on error.
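// For example, assuming recv_max_prefetch is 512 (the minimum): a request for
// 100 bytes triggers a bulk read of up to 512 bytes into recv_buf, and the
// surplus satisfies subsequent small reads without another syscall. Requests
// larger than recv_max_prefetch bypass the prefetch buffer and read directly
// into the caller's buffer.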
ssize_t AsyncConnection::read_until(unsigned len, char *p)
{
  ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
                             << state_offset << dendl;

  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ssize_t r = 0;
  uint64_t left = len - state_offset;
  if (recv_end > recv_start) {
    uint64_t to_read = std::min<uint64_t>(recv_end - recv_start, left);
    memcpy(p, recv_buf+recv_start, to_read);
    recv_start += to_read;
    left -= to_read;
    ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer "
                               << " left is " << left << " buffer still has "
                               << recv_end - recv_start << dendl;
    if (left == 0) {
      return 0;
    }
    state_offset += to_read;
  }

  recv_end = recv_start = 0;
  /* nothing left in the prefetch buffer */
  if (left > (uint64_t)recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    do {
      r = read_bulk(p+state_offset, left);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      } else if (r == static_cast<int>(left)) {
        state_offset = 0;
        return 0;
      }
      state_offset += r;
      left -= r;
    } while (r > 0);
  } else {
    do {
      r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
                                 << " left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      }
      recv_end += r;
      if (r >= static_cast<int>(left)) {
        recv_start = len - state_offset;
        memcpy(p+state_offset, recv_buf, recv_start);
        state_offset = 0;
        return 0;
      }
      left -= r;
    } while (r > 0);
    memcpy(p+state_offset, recv_buf, recv_end-recv_start);
    state_offset += (recv_end - recv_start);
    recv_end = recv_start = 0;
  }
  ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
                             << len - state_offset << " bytes" << dendl;
  return len - state_offset;
}

/* Returns -1 when the socket hit an error or the peer closed it (the
 * connection should then be closed); returns 0 on EAGAIN or EINTR, otherwise
 * the number of bytes read. */
ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
{
  ssize_t nread;
 again:
  nread = cs.read(buf, len);
  if (nread < 0) {
    if (nread == -EAGAIN) {
      nread = 0;
    } else if (nread == -EINTR) {
      goto again;
    } else {
      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
                                << " : " << cpp_strerror(nread) << dendl;
      return -1;
    }
  } else if (nread == 0) {
    ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
                              << cs.fd() << dendl;
    return -1;
  }
  return nread;
}

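// Queues a bufferlist for sending: the data is appended to outgoing_bl under
// write_lock and a send is attempted immediately via _try_send(). If bytes
// remain queued, the callback is stored and later invoked (with 0) from
// handle_write_callback() once the backlog drains. Illustrative usage with a
// hypothetical caller:
//
//   ssize_t r = conn->write(bl, [](ssize_t rc) { /* backlog drained */ },
//                           false);
//   // r == 0: everything was sent synchronously; r > 0: bytes still queued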
ssize_t AsyncConnection::write(bufferlist &bl,
                               std::function<void(ssize_t)> callback,
                               bool more) {

  std::unique_lock<std::mutex> l(write_lock);
  outgoing_bl.claim_append(bl);
  ssize_t r = _try_send(more);
  if (r > 0) {
    writeCallback = callback;
  }
  return r;
}

// Returns the number of bytes still queued in outgoing_bl (which may be
// larger than the length of the bufferlist just appended), or < 0 on error.
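// Note on event registration below: while data remains queued after a send,
// an EVENT_WRITABLE handler stays registered so the remainder is flushed as
// the socket drains; once the queue empties the handler is removed and, if a
// write callback is pending, it is dispatched on the event center.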
ssize_t AsyncConnection::_try_send(bool more)
{
  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ceph_assert(center->in_thread());
  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
                             << " bytes" << dendl;
  ssize_t r = cs.send(outgoing_bl, more);
  if (r < 0) {
    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
    return r;
  }

  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
                             << " remaining bytes " << outgoing_bl.length() << dendl;

  if (!open_write && is_queued()) {
    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
    open_write = true;
  }

  if (open_write && !is_queued()) {
    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
    open_write = false;
    if (writeCallback) {
      center->dispatch_event_external(write_callback_handler);
    }
  }

  return outgoing_bl.length();
}

void AsyncConnection::inject_delay() {
  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
      async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
    utime_t t;
    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }
}

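// process() is the read-side state machine, driven by readable events:
// STATE_CONNECTING opens the socket, STATE_CONNECTING_RE waits for the
// non-blocking connect to finish, STATE_ACCEPTING arms the read event for an
// accepted socket, and STATE_CONNECTION_ESTABLISHED completes any pending
// read before handing control to protocol->read_event().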
void AsyncConnection::process() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();

  ldout(async_msgr->cct, 20) << __func__ << dendl;

  switch (state) {
    case STATE_NONE: {
      ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
      return;
    }
    case STATE_CLOSED: {
      ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
      return;
    }
    case STATE_CONNECTING: {
      ceph_assert(!policy.server);

      // clear timer (if any) since we are connecting/re-connecting
      if (last_tick_id) {
        center->delete_time_event(last_tick_id);
        last_tick_id = 0;
      }

      if (cs) {
        center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
        cs.close();
      }

      SocketOptions opts;
      opts.priority = async_msgr->get_socket_priority();
      opts.connect_bind_addr = msgr->get_myaddrs().front();
      ssize_t r = worker->connect(target_addr, opts, &cs);
      if (r < 0) {
        protocol->fault();
        return;
      }

      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTING_RE;
    }
    case STATE_CONNECTING_RE: {
      ssize_t r = cs.is_connected();
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " reconnect failed to "
                                  << target_addr << dendl;
        if (r == -ECONNREFUSED) {
          ldout(async_msgr->cct, 2)
              << __func__ << " connection refused!" << dendl;
          dispatch_queue->queue_refused(this);
        }
        protocol->fault();
        return;
      } else if (r == 0) {
        ldout(async_msgr->cct, 10)
            << __func__ << " nonblock connect inprogress" << dendl;
        if (async_msgr->get_stack()->nonblock_connect_need_writable_event()) {
          center->create_file_event(cs.fd(), EVENT_WRITABLE,
                                    read_handler);
        }
        logger->tinc(l_msgr_running_recv_time,
                     ceph::mono_clock::now() - recv_start_time);
        return;
      }

      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
      ldout(async_msgr->cct, 10)
          << __func__ << " connect successfully, ready to send banner" << dendl;
      state = STATE_CONNECTION_ESTABLISHED;
      ceph_assert(last_tick_id == 0);
      // exclude TCP nonblock connect time
      last_connect_started = ceph::coarse_mono_clock::now();
      last_tick_id = center->create_time_event(
          connect_timeout_us, tick_handler);
      break;
    }

    case STATE_ACCEPTING: {
      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTION_ESTABLISHED;

      break;
    }

    case STATE_CONNECTION_ESTABLISHED: {
      if (pendingReadLen) {
        ssize_t r = read(*pendingReadLen, read_buffer, readCallback);
        if (r <= 0) { // read completed, or an error occurred
          pendingReadLen.reset();
          char *buf_tmp = read_buffer;
          read_buffer = nullptr;
          readCallback(buf_tmp, r);
        }
        logger->tinc(l_msgr_running_recv_time,
                     ceph::mono_clock::now() - recv_start_time);
        return;
      }
      break;
    }
  }

  protocol->read_event();

  logger->tinc(l_msgr_running_recv_time,
               ceph::mono_clock::now() - recv_start_time);
}

bool AsyncConnection::is_connected() {
  return protocol->is_connected();
}

void AsyncConnection::connect(const entity_addrvec_t &addrs, int type,
                              entity_addr_t &target) {

  std::lock_guard<std::mutex> l(lock);
  set_peer_type(type);
  set_peer_addrs(addrs);
  policy = msgr->get_policy(type);
  target_addr = target;
  _connect();
}

void AsyncConnection::_connect()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;

  state = STATE_CONNECTING;
  protocol->connect();
  // Reschedule the connection on the event center to avoid lock dependencies;
  // this may be called from an external thread (e.g. send_message).
  center->dispatch_event_external(read_handler);
}

void AsyncConnection::accept(ConnectedSocket socket,
                             const entity_addr_t &listen_addr,
                             const entity_addr_t &peer_addr)
{
  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
                             << " listen_addr " << listen_addr
                             << " peer_addr " << peer_addr << dendl;
  ceph_assert(socket.fd() >= 0);

  std::lock_guard<std::mutex> l(lock);
  cs = std::move(socket);
  socket_addr = listen_addr;
  target_addr = peer_addr; // until we know better
  state = STATE_ACCEPTING;
  protocol->accept();
  // reschedule the connection on the event center to avoid lock dependencies
  center->dispatch_event_external(read_handler);
}

int AsyncConnection::send_message(Message *m)
{
  FUNCTRACE(async_msgr->cct);
  lgeneric_subdout(async_msgr->cct, ms,
                   1) << "-- " << async_msgr->get_myaddrs() << " --> "
                      << get_peer_addrs() << " -- "
                      << *m << " -- " << m << " con "
                      << this
                      << dendl;

  if (is_blackhole()) {
    lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
                                             << " blackhole " << *m << dendl;
    m->put();
    return 0;
  }

  // optimistically assume it is ok to encode (the connection may actually be broken by now)
  if (!m->get_priority())
    m->set_priority(async_msgr->get_default_send_priority());

  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
#endif

  if (is_loopback) { //loopback connection
    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
    std::lock_guard<std::mutex> l(write_lock);
    if (protocol->is_connected()) {
      dispatch_queue->local_delivery(m, m->get_priority());
    } else {
      ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                 << " Drop message " << m << dendl;
      m->put();
    }
    return 0;
  }

  // Loopback messages are not counted here; they are too lightweight and
  // counting them would skew the metric for users.
  logger->inc(l_msgr_send_messages);

  protocol->send_message(m);
  return 0;
}

entity_addr_t AsyncConnection::_infer_target_addr(const entity_addrvec_t& av)
{
  // pick the first addr of the same address family as socket_addr. it could be
  // an any: or v2: addr, we don't care. it should not be a v1 addr.
  for (auto& i : av.v) {
    if (i.is_legacy()) {
      continue;
    }
    if (i.get_family() == socket_addr.get_family()) {
      ldout(async_msgr->cct,10) << __func__ << " " << av << " -> " << i << dendl;
      return i;
    }
  }
  ldout(async_msgr->cct,10) << __func__ << " " << av << " -> nothing to match "
                            << socket_addr << dendl;
  return {};
}

void AsyncConnection::fault()
{
  shutdown_socket();
  open_write = false;

  // queue delayed items immediately
  if (delay_state)
    delay_state->flush();

  recv_start = recv_end = 0;
  state_offset = 0;
  outgoing_bl.clear();
}

void AsyncConnection::_stop() {
  writeCallback.reset();
  dispatch_queue->discard_queue(conn_id);
  async_msgr->unregister_conn(this);
  worker->release_worker();

  state = STATE_CLOSED;
  open_write = false;

  state_offset = 0;
  // make sure in-queue events will be processed
  center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
}

bool AsyncConnection::is_queued() const {
  return outgoing_bl.length();
}

void AsyncConnection::shutdown_socket() {
  for (auto &&t : register_time_events) center->delete_time_event(t);
  register_time_events.clear();
  if (last_tick_id) {
    center->delete_time_event(last_tick_id);
    last_tick_id = 0;
  }
  if (cs) {
    center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
    cs.shutdown();
    cs.close();
  }
}

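// DelayedDelivery backs the "ms_inject_delay_type" debugging option (see
// maybe_start_delay_thread): received messages sit in delay_queue and each
// timer event below delivers one of them, either via fast dispatch or the
// ordered dispatch queue.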
void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
{
  Message *m = nullptr;
  {
    std::lock_guard<std::mutex> l(delay_lock);
    register_time_events.erase(id);
    if (stop_dispatch)
      return;
    if (delay_queue.empty())
      return;
    m = delay_queue.front();
    delay_queue.pop_front();
  }
  if (msgr->ms_can_fast_dispatch(m)) {
    dispatch_queue->fast_dispatch(m);
  } else {
    dispatch_queue->enqueue(m, m->get_priority(), conn_id);
  }
}

void AsyncConnection::DelayedDelivery::discard() {
  stop_dispatch = true;
  center->submit_to(center->get_id(),
                    [this]() mutable {
                      std::lock_guard<std::mutex> l(delay_lock);
                      while (!delay_queue.empty()) {
                        Message *m = delay_queue.front();
                        dispatch_queue->dispatch_throttle_release(
                            m->get_dispatch_throttle_size());
                        m->put();
                        delay_queue.pop_front();
                      }
                      for (auto i : register_time_events)
                        center->delete_time_event(i);
                      register_time_events.clear();
                      stop_dispatch = false;
                    },
                    true);
}

void AsyncConnection::DelayedDelivery::flush() {
  stop_dispatch = true;
  center->submit_to(
      center->get_id(), [this] () mutable {
        std::lock_guard<std::mutex> l(delay_lock);
        while (!delay_queue.empty()) {
          Message *m = delay_queue.front();
          if (msgr->ms_can_fast_dispatch(m)) {
            dispatch_queue->fast_dispatch(m);
          } else {
            dispatch_queue->enqueue(m, m->get_priority(), conn_id);
          }
          delay_queue.pop_front();
        }
        for (auto i : register_time_events)
          center->delete_time_event(i);
        register_time_events.clear();
        stop_dispatch = false;
      }, true);
}

void AsyncConnection::send_keepalive()
{
  protocol->send_keepalive();
}

void AsyncConnection::mark_down()
{
  ldout(async_msgr->cct, 1) << __func__ << dendl;
  std::lock_guard<std::mutex> l(lock);
  protocol->stop();
}

void AsyncConnection::handle_write()
{
  ldout(async_msgr->cct, 10) << __func__ << dendl;
  protocol->write_event();
}

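// Runs on the event center after _try_send() fully drains outgoing_bl. Note
// the lock order: the connection lock is taken first, then write_lock, and
// write_lock is dropped before the stored callback runs, presumably so the
// callback can safely queue more data through write().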
void AsyncConnection::handle_write_callback() {
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  recv_start_time = ceph::mono_clock::now();
  write_lock.lock();
  if (writeCallback) {
    auto callback = *writeCallback;
    writeCallback.reset();
    write_lock.unlock();
    callback(0);
    return;
  }
  write_lock.unlock();
}

void AsyncConnection::stop(bool queue_reset) {
  lock.lock();
  bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
  protocol->stop();
  lock.unlock();
  if (need_queue_reset) dispatch_queue->queue_reset(this);
}

void AsyncConnection::cleanup() {
  shutdown_socket();
  delete read_handler;
  delete write_handler;
  delete write_callback_handler;
  delete wakeup_handler;
  delete tick_handler;
  if (delay_state) {
    delete delay_state;
    delay_state = NULL;
  }
}

void AsyncConnection::wakeup_from(uint64_t id)
{
  lock.lock();
  register_time_events.erase(id);
  lock.unlock();
  process();
}

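// Periodic timer: while a connection attempt is still in flight this enforces
// the connect timeout (ms_connection_ready_timeout); once connected it
// enforces the idle timeout (ms_connection_idle_timeout). In either case the
// timer is re-armed if the deadline has not been reached, otherwise the
// protocol is faulted.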
void AsyncConnection::tick(uint64_t id)
{
  auto now = ceph::coarse_mono_clock::now();
  ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
                             << " last_active=" << last_active << dendl;
  std::lock_guard<std::mutex> l(lock);
  last_tick_id = 0;
  if (!is_connected()) {
    if (connect_timeout_us <=
        (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>
          (now - last_connect_started).count()) {
      ldout(async_msgr->cct, 1) << __func__ << " see no progress in more than "
                                << connect_timeout_us
                                << " us during connecting, fault."
                                << dendl;
      protocol->fault();
    } else {
      last_tick_id = center->create_time_event(connect_timeout_us, tick_handler);
    }
  } else {
    auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>
      (now - last_active).count();
    if (inactive_timeout_us < (uint64_t)idle_period) {
      ldout(async_msgr->cct, 1) << __func__ << " idle (" << idle_period
                                << ") for more than " << inactive_timeout_us
                                << " us, fault."
                                << dendl;
      protocol->fault();
    } else {
      last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
    }
  }
}