1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
19
20 #include <atomic>
21 #include <pthread.h>
22 #include <climits>
23 #include <list>
24 #include <mutex>
25 #include <map>
26 #include <functional>
27 #include <optional>
28
29 #include "auth/AuthSessionHandler.h"
30 #include "common/ceph_time.h"
31 #include "common/perf_counters.h"
32 #include "include/buffer.h"
33 #include "msg/Connection.h"
34 #include "msg/Messenger.h"
35
36 #include "Event.h"
37 #include "Stack.h"
38
// Forward declarations of types this header names without needing their
// full definitions.
class AsyncMessenger;
class DispatchQueue;
class Worker;
class Protocol;

// A quarter of IOV_MAX when the platform allows at least 1024 iovec entries,
// otherwise IOV_MAX itself. constexpr guarantees the value is computed at
// compile time.
// NOTE(review): presumably the iovec cap used by the async messenger's send
// path -- confirm against the users of this constant.
static constexpr int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
45
46 /*
47 * AsyncConnection maintains a logic session between two endpoints. In other
48 * word, a pair of addresses can find the only AsyncConnection. AsyncConnection
49 * will handle with network fault or read/write transactions. If one file
50 * descriptor broken, AsyncConnection will maintain the message queue and
51 * sequence, try to reconnect peer endpoint.
52 */
53 class AsyncConnection : public Connection {
54 ssize_t read(unsigned len, char *buffer,
55 std::function<void(char *, ssize_t)> callback);
56 ssize_t read_until(unsigned needed, char *p);
57 ssize_t read_bulk(char *buf, unsigned len);
58
59 ssize_t write(bufferlist &bl, std::function<void(ssize_t)> callback,
60 bool more=false);
61 ssize_t _try_send(bool more=false);
62
63 void _connect();
64 void _stop();
65 void fault();
66 void inject_delay();
67
68 bool is_queued() const;
69 void shutdown_socket();
70
71 /**
72 * The DelayedDelivery is for injecting delays into Message delivery off
73 * the socket. It is only enabled if delays are requested, and if they
74 * are then it pulls Messages off the DelayQueue and puts them into the
75 * AsyncMessenger event queue.
76 */
77 class DelayedDelivery : public EventCallback {
78 std::set<uint64_t> register_time_events; // need to delete it if stop
79 std::deque<Message*> delay_queue;
80 std::mutex delay_lock;
81 AsyncMessenger *msgr;
82 EventCenter *center;
83 DispatchQueue *dispatch_queue;
84 uint64_t conn_id;
85 std::atomic_bool stop_dispatch;
86
87 public:
88 explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
89 DispatchQueue *q, uint64_t cid)
90 : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
91 stop_dispatch(false) { }
(1) Event exn_spec_violation: |
An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate(). |
Also see events: |
[fun_call_w_exception] |
92 ~DelayedDelivery() override {
(2) Event fun_call_w_exception: |
Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details] |
Also see events: |
[exn_spec_violation] |
93 ceph_assert(register_time_events.empty());
94 ceph_assert(delay_queue.empty());
95 }
96 void set_center(EventCenter *c) { center = c; }
97 void do_request(uint64_t id) override;
98 void queue(double delay_period, Message *m) {
99 std::lock_guard<std::mutex> l(delay_lock);
100 delay_queue.push_back(m);
101 register_time_events.insert(center->create_time_event(delay_period*1000000, this));
102 }
103 void discard();
104 bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
105 void flush();
106 } *delay_state;
107
108 private:
109 FRIEND_MAKE_REF(AsyncConnection);
110 AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
111 Worker *w, bool is_msgr2, bool local);
112 ~AsyncConnection() override;
113 public:
114 void maybe_start_delay_thread();
115
116 ostream& _conn_prefix(std::ostream *_dout);
117
118 bool is_connected() override;
119
120 // Only call when AsyncConnection first construct
121 void connect(const entity_addrvec_t& addrs, int type, entity_addr_t& target);
122
123 // Only call when AsyncConnection first construct
124 void accept(ConnectedSocket socket,
125 const entity_addr_t &listen_addr,
126 const entity_addr_t &peer_addr);
127 int send_message(Message *m) override;
128
129 void send_keepalive() override;
130 void mark_down() override;
131 void mark_disposable() override {
132 std::lock_guard<std::mutex> l(lock);
133 policy.lossy = true;
134 }
135
136 entity_addr_t get_peer_socket_addr() const override {
137 return target_addr;
138 }
139
140 int get_con_mode() const override;
141
142 private:
143 enum {
144 STATE_NONE,
145 STATE_CONNECTING,
146 STATE_CONNECTING_RE,
147 STATE_ACCEPTING,
148 STATE_CONNECTION_ESTABLISHED,
149 STATE_CLOSED
150 };
151
152 static const uint32_t TCP_PREFETCH_MIN_SIZE;
153 static const char *get_state_name(int state) {
154 const char* const statenames[] = {"STATE_NONE",
155 "STATE_CONNECTING",
156 "STATE_CONNECTING_RE",
157 "STATE_ACCEPTING",
158 "STATE_CONNECTION_ESTABLISHED",
159 "STATE_CLOSED"};
160 return statenames[state];
161 }
162
163 AsyncMessenger *async_msgr;
164 uint64_t conn_id;
165 PerfCounters *logger;
166 int state;
167 ConnectedSocket cs;
168 int port;
169 public:
170 Messenger::Policy policy;
171 private:
172
173 DispatchQueue *dispatch_queue;
174
175 // lockfree, only used in own thread
176 bufferlist outgoing_bl;
177 bool open_write = false;
178
179 std::mutex write_lock;
180
181 std::mutex lock;
182 EventCallbackRef read_handler;
183 EventCallbackRef write_handler;
184 EventCallbackRef write_callback_handler;
185 EventCallbackRef wakeup_handler;
186 EventCallbackRef tick_handler;
187 char *recv_buf;
188 uint32_t recv_max_prefetch;
189 uint32_t recv_start;
190 uint32_t recv_end;
191 set<uint64_t> register_time_events; // need to delete it if stop
192 ceph::coarse_mono_clock::time_point last_connect_started;
193 ceph::coarse_mono_clock::time_point last_active;
194 ceph::mono_clock::time_point recv_start_time;
195 uint64_t last_tick_id = 0;
196 const uint64_t connect_timeout_us;
197 const uint64_t inactive_timeout_us;
198
199 // Tis section are temp variables used by state transition
200
201 // Accepting state
202 bool msgr2 = false;
203 entity_addr_t socket_addr; ///< local socket addr
204 entity_addr_t target_addr; ///< which of the peer_addrs we're connecting to (as clienet) or should reconnect to (as peer)
205
206 entity_addr_t _infer_target_addr(const entity_addrvec_t& av);
207
208 // used only by "read_until"
209 uint64_t state_offset;
210 Worker *worker;
211 EventCenter *center;
212
213 std::unique_ptr<Protocol> protocol;
214
215 std::optional<std::function<void(ssize_t)>> writeCallback;
216 std::function<void(char *, ssize_t)> readCallback;
217 std::optional<unsigned> pendingReadLen;
218 char *read_buffer;
219
220 public:
221 // used by eventcallback
222 void handle_write();
223 void handle_write_callback();
224 void process();
225 void wakeup_from(uint64_t id);
226 void tick(uint64_t id);
227 void local_deliver();
228 void stop(bool queue_reset);
229 void cleanup();
230 PerfCounters *get_perf_counter() {
231 return logger;
232 }
233
234 bool is_msgr2() const override;
235
236 friend class Protocol;
237 friend class ProtocolV1;
238 friend class ProtocolV2;
239 }; /* AsyncConnection */
240
241 using AsyncConnectionRef = ceph::ref_t<AsyncConnection>;
242
243 #endif
244