1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef _MSG_ASYNC_PROTOCOL_V2_
5 #define _MSG_ASYNC_PROTOCOL_V2_
6
7 #include <boost/container/static_vector.hpp>
8
9 #include "Protocol.h"
10 #include "crypto_onwire.h"
11 #include "frames_v2.h"
12
13 class ProtocolV2 : public Protocol {
14 private:
15 enum State {
16 NONE,
17 START_CONNECT,
18 BANNER_CONNECTING,
19 HELLO_CONNECTING,
20 AUTH_CONNECTING,
21 AUTH_CONNECTING_SIGN,
22 SESSION_CONNECTING,
23 SESSION_RECONNECTING,
24 START_ACCEPT,
25 BANNER_ACCEPTING,
26 HELLO_ACCEPTING,
27 AUTH_ACCEPTING,
28 AUTH_ACCEPTING_MORE,
29 AUTH_ACCEPTING_SIGN,
30 SESSION_ACCEPTING,
31 READY,
32 THROTTLE_MESSAGE,
33 THROTTLE_BYTES,
34 THROTTLE_DISPATCH_QUEUE,
35 THROTTLE_DONE,
36 READ_MESSAGE_COMPLETE,
37 STANDBY,
38 WAIT,
39 CLOSED
40 };
41
42 static const char *get_state_name(int state) {
43 const char *const statenames[] = {"NONE",
44 "START_CONNECT",
45 "BANNER_CONNECTING",
46 "HELLO_CONNECTING",
47 "AUTH_CONNECTING",
48 "AUTH_CONNECTING_SIGN",
49 "SESSION_CONNECTING",
50 "SESSION_RECONNECTING",
51 "START_ACCEPT",
52 "BANNER_ACCEPTING",
53 "HELLO_ACCEPTING",
54 "AUTH_ACCEPTING",
55 "AUTH_ACCEPTING_MORE",
56 "AUTH_ACCEPTING_SIGN",
57 "SESSION_ACCEPTING",
58 "READY",
59 "THROTTLE_MESSAGE",
60 "THROTTLE_BYTES",
61 "THROTTLE_DISPATCH_QUEUE",
62 "THROTTLE_DONE",
63 "READ_MESSAGE_COMPLETE",
64 "STANDBY",
65 "WAIT",
66 "CLOSED"};
67 return statenames[state];
68 }
69
70 public:
71 // TODO: move into auth_meta?
72 ceph::crypto::onwire::rxtx_t session_stream_handlers;
73 private:
74 entity_name_t peer_name;
75 State state;
76 uint64_t peer_required_features;
77
78 uint64_t client_cookie;
79 uint64_t server_cookie;
80 uint64_t global_seq;
81 uint64_t connect_seq;
82 uint64_t peer_global_seq;
83 uint64_t message_seq;
84 bool reconnecting;
85 bool replacing;
86 bool can_write;
87 struct out_queue_entry_t {
88 bool is_prepared {false};
89 Message* m {nullptr};
90 };
91 std::map<int, std::list<out_queue_entry_t>> out_queue;
92 std::list<Message *> sent;
93 std::atomic<uint64_t> out_seq{0};
94 std::atomic<uint64_t> in_seq{0};
95 std::atomic<uint64_t> ack_left{0};
96
97 using ProtFuncPtr = void (ProtocolV2::*)();
98 Ct<ProtocolV2> *bannerExchangeCallback;
99
100 boost::container::static_vector<ceph::msgr::v2::segment_t,
101 ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
102 boost::container::static_vector<ceph::bufferlist,
103 ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_data;
104 ceph::msgr::v2::Tag next_tag;
105 utime_t backoff; // backoff time
106 utime_t recv_stamp;
107 utime_t throttle_stamp;
108
109 struct {
110 ceph::bufferlist rxbuf;
111 ceph::bufferlist txbuf;
112 bool enabled {true};
113 } pre_auth;
114
115 bool keepalive;
116 bool write_in_progress = false;
117
118 ostream &_conn_prefix(std::ostream *_dout);
119 void run_continuation(Ct<ProtocolV2> *pcontinuation);
120 void run_continuation(Ct<ProtocolV2> &continuation);
121
122 Ct<ProtocolV2> *read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
123 rx_buffer_t&& buffer);
124 template <class F>
125 Ct<ProtocolV2> *write(const std::string &desc,
126 CONTINUATION_TYPE<ProtocolV2> &next,
127 F &frame);
128 Ct<ProtocolV2> *write(const std::string &desc,
129 CONTINUATION_TYPE<ProtocolV2> &next,
130 bufferlist &buffer);
131
132 void requeue_sent();
133 uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
134 void reset_recv_state();
135 void reset_throttle();
136 Ct<ProtocolV2> *_fault();
137 void discard_out_queue();
138 void reset_session();
139 void prepare_send_message(uint64_t features, Message *m);
140 out_queue_entry_t _get_next_outgoing();
141 ssize_t write_message(Message *m, bool more);
142 void append_keepalive();
143 void append_keepalive_ack(utime_t ×tamp);
144 void handle_message_ack(uint64_t seq);
145
146 CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
147 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner);
148 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner_payload);
149
150 Ct<ProtocolV2> *_banner_exchange(Ct<ProtocolV2> &callback);
151 Ct<ProtocolV2> *_wait_for_peer_banner();
152 Ct<ProtocolV2> *_handle_peer_banner(rx_buffer_t &&buffer, int r);
153 Ct<ProtocolV2> *_handle_peer_banner_payload(rx_buffer_t &&buffer, int r);
154 Ct<ProtocolV2> *handle_hello(ceph::bufferlist &payload);
155
156 CONTINUATION_DECL(ProtocolV2, read_frame);
157 CONTINUATION_DECL(ProtocolV2, finish_auth);
158 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main);
159 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_segment);
160 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_epilogue_main);
161 CONTINUATION_DECL(ProtocolV2, throttle_message);
162 CONTINUATION_DECL(ProtocolV2, throttle_bytes);
163 CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
164
165 Ct<ProtocolV2> *read_frame();
166 Ct<ProtocolV2> *finish_auth();
167 Ct<ProtocolV2> *finish_client_auth();
168 Ct<ProtocolV2> *handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r);
169 Ct<ProtocolV2> *read_frame_segment();
170 Ct<ProtocolV2> *handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r);
171 Ct<ProtocolV2> *handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r);
172 Ct<ProtocolV2> *handle_read_frame_dispatch();
173 Ct<ProtocolV2> *handle_frame_payload();
174
175 Ct<ProtocolV2> *ready();
176
177 Ct<ProtocolV2> *handle_message();
178 Ct<ProtocolV2> *throttle_message();
179 Ct<ProtocolV2> *throttle_bytes();
180 Ct<ProtocolV2> *throttle_dispatch_queue();
181 Ct<ProtocolV2> *read_message_data_prepare();
182
183 Ct<ProtocolV2> *handle_keepalive2(ceph::bufferlist &payload);
184 Ct<ProtocolV2> *handle_keepalive2_ack(ceph::bufferlist &payload);
185
186 Ct<ProtocolV2> *handle_message_ack(ceph::bufferlist &payload);
187
188 public:
(1) Event member_decl: |
Class member declaration for "connection_features". |
Also see events: |
[uninit_member] |
189 uint64_t connection_features;
190
191 ProtocolV2(AsyncConnection *connection);
192 virtual ~ProtocolV2();
193
194 virtual void connect() override;
195 virtual void accept() override;
196 virtual bool is_connected() override;
197 virtual void stop() override;
198 virtual void fault() override;
199 virtual void send_message(Message *m) override;
200 virtual void send_keepalive() override;
201
202 virtual void read_event() override;
203 virtual void write_event() override;
204 virtual bool is_queued() override;
205
206 private:
207 // Client Protocol
208 CONTINUATION_DECL(ProtocolV2, start_client_banner_exchange);
209 CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange);
210
211 Ct<ProtocolV2> *start_client_banner_exchange();
212 Ct<ProtocolV2> *post_client_banner_exchange();
213 inline Ct<ProtocolV2> *send_auth_request() {
214 std::vector<uint32_t> empty;
215 return send_auth_request(empty);
216 }
217 Ct<ProtocolV2> *send_auth_request(std::vector<uint32_t> &allowed_methods);
218 Ct<ProtocolV2> *handle_auth_bad_method(ceph::bufferlist &payload);
219 Ct<ProtocolV2> *handle_auth_reply_more(ceph::bufferlist &payload);
220 Ct<ProtocolV2> *handle_auth_done(ceph::bufferlist &payload);
221 Ct<ProtocolV2> *handle_auth_signature(ceph::bufferlist &payload);
222 Ct<ProtocolV2> *send_client_ident();
223 Ct<ProtocolV2> *send_reconnect();
224 Ct<ProtocolV2> *handle_ident_missing_features(ceph::bufferlist &payload);
225 Ct<ProtocolV2> *handle_session_reset(ceph::bufferlist &payload);
226 Ct<ProtocolV2> *handle_session_retry(ceph::bufferlist &payload);
227 Ct<ProtocolV2> *handle_session_retry_global(ceph::bufferlist &payload);
228 Ct<ProtocolV2> *handle_wait(ceph::bufferlist &payload);
229 Ct<ProtocolV2> *handle_reconnect_ok(ceph::bufferlist &payload);
230 Ct<ProtocolV2> *handle_server_ident(ceph::bufferlist &payload);
231
232 // Server Protocol
233 CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange);
234 CONTINUATION_DECL(ProtocolV2, post_server_banner_exchange);
235 CONTINUATION_DECL(ProtocolV2, server_ready);
236
237 Ct<ProtocolV2> *start_server_banner_exchange();
238 Ct<ProtocolV2> *post_server_banner_exchange();
239 Ct<ProtocolV2> *handle_auth_request(ceph::bufferlist &payload);
240 Ct<ProtocolV2> *handle_auth_request_more(ceph::bufferlist &payload);
241 Ct<ProtocolV2> *_handle_auth_request(bufferlist& auth_payload, bool more);
242 Ct<ProtocolV2> *_auth_bad_method(int r);
243 Ct<ProtocolV2> *handle_client_ident(ceph::bufferlist &payload);
244 Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
245 Ct<ProtocolV2> *handle_reconnect(ceph::bufferlist &payload);
246 Ct<ProtocolV2> *handle_existing_connection(const AsyncConnectionRef& existing);
247 Ct<ProtocolV2> *reuse_connection(const AsyncConnectionRef& existing,
248 ProtocolV2 *exproto);
249 Ct<ProtocolV2> *send_server_ident();
250 Ct<ProtocolV2> *send_reconnect_ok();
251 Ct<ProtocolV2> *server_ready();
252
253 uint32_t get_onwire_size(uint32_t logical_size) const;
254 uint32_t get_epilogue_size() const;
255 size_t get_current_msg_size() const;
256 };
257
258 #endif /* _MSG_ASYNC_PROTOCOL_V2_ */
259