1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#ifndef _MSG_ASYNC_PROTOCOL_V2_
5    	#define _MSG_ASYNC_PROTOCOL_V2_
6    	
7    	#include <boost/container/static_vector.hpp>
8    	
9    	#include "Protocol.h"
10   	#include "crypto_onwire.h"
11   	#include "frames_v2.h"
12   	
13   	class ProtocolV2 : public Protocol {
14   	private:
15   	  enum State {
16   	    NONE,
17   	    START_CONNECT,
18   	    BANNER_CONNECTING,
19   	    HELLO_CONNECTING,
20   	    AUTH_CONNECTING,
21   	    AUTH_CONNECTING_SIGN,
22   	    SESSION_CONNECTING,
23   	    SESSION_RECONNECTING,
24   	    START_ACCEPT,
25   	    BANNER_ACCEPTING,
26   	    HELLO_ACCEPTING,
27   	    AUTH_ACCEPTING,
28   	    AUTH_ACCEPTING_MORE,
29   	    AUTH_ACCEPTING_SIGN,
30   	    SESSION_ACCEPTING,
31   	    READY,
32   	    THROTTLE_MESSAGE,
33   	    THROTTLE_BYTES,
34   	    THROTTLE_DISPATCH_QUEUE,
35   	    THROTTLE_DONE,
36   	    READ_MESSAGE_COMPLETE,
37   	    STANDBY,
38   	    WAIT,
39   	    CLOSED
40   	  };
41   	
42   	  static const char *get_state_name(int state) {
43   	    const char *const statenames[] = {"NONE",
44   	                                      "START_CONNECT",
45   	                                      "BANNER_CONNECTING",
46   	                                      "HELLO_CONNECTING",
47   	                                      "AUTH_CONNECTING",
48   	                                      "AUTH_CONNECTING_SIGN",
49   	                                      "SESSION_CONNECTING",
50   	                                      "SESSION_RECONNECTING",
51   	                                      "START_ACCEPT",
52   	                                      "BANNER_ACCEPTING",
53   	                                      "HELLO_ACCEPTING",
54   	                                      "AUTH_ACCEPTING",
55   	                                      "AUTH_ACCEPTING_MORE",
56   	                                      "AUTH_ACCEPTING_SIGN",
57   	                                      "SESSION_ACCEPTING",
58   	                                      "READY",
59   	                                      "THROTTLE_MESSAGE",
60   	                                      "THROTTLE_BYTES",
61   	                                      "THROTTLE_DISPATCH_QUEUE",
62   	                                      "THROTTLE_DONE",
63   	                                      "READ_MESSAGE_COMPLETE",
64   	                                      "STANDBY",
65   	                                      "WAIT",
66   	                                      "CLOSED"};
67   	    return statenames[state];
68   	  }
69   	
70   	public:
71   	  // TODO: move into auth_meta?
72   	  ceph::crypto::onwire::rxtx_t session_stream_handlers;
73   	private:
74   	  entity_name_t peer_name;
75   	  State state;
76   	  uint64_t peer_required_features;
77   	
78   	  uint64_t client_cookie;
79   	  uint64_t server_cookie;
80   	  uint64_t global_seq;
81   	  uint64_t connect_seq;
82   	  uint64_t peer_global_seq;
83   	  uint64_t message_seq;
84   	  bool reconnecting;
85   	  bool replacing;
86   	  bool can_write;
87   	  struct out_queue_entry_t {
88   	    bool is_prepared {false};
89   	    Message* m {nullptr};
90   	  };
91   	  std::map<int, std::list<out_queue_entry_t>> out_queue;
92   	  std::list<Message *> sent;
93   	  std::atomic<uint64_t> out_seq{0};
94   	  std::atomic<uint64_t> in_seq{0};
95   	  std::atomic<uint64_t> ack_left{0};
96   	
97   	  using ProtFuncPtr = void (ProtocolV2::*)();
98   	  Ct<ProtocolV2> *bannerExchangeCallback;
99   	
100  	  boost::container::static_vector<ceph::msgr::v2::segment_t,
101  					  ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
102  	  boost::container::static_vector<ceph::bufferlist,
103  					  ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_data;
104  	  ceph::msgr::v2::Tag next_tag;
105  	  utime_t backoff;  // backoff time
106  	  utime_t recv_stamp;
107  	  utime_t throttle_stamp;
108  	
109  	  struct {
110  	    ceph::bufferlist rxbuf;
111  	    ceph::bufferlist txbuf;
112  	    bool enabled {true};
113  	  } pre_auth;
114  	
115  	  bool keepalive;
116  	  bool write_in_progress = false;
117  	
118  	  ostream &_conn_prefix(std::ostream *_dout);
119  	  void run_continuation(Ct<ProtocolV2> *pcontinuation);
120  	  void run_continuation(Ct<ProtocolV2> &continuation);
121  	
122  	  Ct<ProtocolV2> *read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
123  	                       rx_buffer_t&& buffer);
124  	  template <class F>
125  	  Ct<ProtocolV2> *write(const std::string &desc,
126  	                        CONTINUATION_TYPE<ProtocolV2> &next,
127  				F &frame);
128  	  Ct<ProtocolV2> *write(const std::string &desc,
129  	                        CONTINUATION_TYPE<ProtocolV2> &next,
130  	                        bufferlist &buffer);
131  	
132  	  void requeue_sent();
133  	  uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
134  	  void reset_recv_state();
135  	  void reset_throttle();
136  	  Ct<ProtocolV2> *_fault();
137  	  void discard_out_queue();
138  	  void reset_session();
139  	  void prepare_send_message(uint64_t features, Message *m);
140  	  out_queue_entry_t _get_next_outgoing();
141  	  ssize_t write_message(Message *m, bool more);
142  	  void append_keepalive();
143  	  void append_keepalive_ack(utime_t &timestamp);
144  	  void handle_message_ack(uint64_t seq);
145  	
146  	  CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
147  	  READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner);
148  	  READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner_payload);
149  	
150  	  Ct<ProtocolV2> *_banner_exchange(Ct<ProtocolV2> &callback);
151  	  Ct<ProtocolV2> *_wait_for_peer_banner();
152  	  Ct<ProtocolV2> *_handle_peer_banner(rx_buffer_t &&buffer, int r);
153  	  Ct<ProtocolV2> *_handle_peer_banner_payload(rx_buffer_t &&buffer, int r);
154  	  Ct<ProtocolV2> *handle_hello(ceph::bufferlist &payload);
155  	
156  	  CONTINUATION_DECL(ProtocolV2, read_frame);
157  	  CONTINUATION_DECL(ProtocolV2, finish_auth);
158  	  READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main);
159  	  READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_segment);
160  	  READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_epilogue_main);
161  	  CONTINUATION_DECL(ProtocolV2, throttle_message);
162  	  CONTINUATION_DECL(ProtocolV2, throttle_bytes);
163  	  CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
164  	
165  	  Ct<ProtocolV2> *read_frame();
166  	  Ct<ProtocolV2> *finish_auth();
167  	  Ct<ProtocolV2> *finish_client_auth();
168  	  Ct<ProtocolV2> *handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r);
169  	  Ct<ProtocolV2> *read_frame_segment();
170  	  Ct<ProtocolV2> *handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r);
171  	  Ct<ProtocolV2> *handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r);
172  	  Ct<ProtocolV2> *handle_read_frame_dispatch();
173  	  Ct<ProtocolV2> *handle_frame_payload();
174  	
175  	  Ct<ProtocolV2> *ready();
176  	
177  	  Ct<ProtocolV2> *handle_message();
178  	  Ct<ProtocolV2> *throttle_message();
179  	  Ct<ProtocolV2> *throttle_bytes();
180  	  Ct<ProtocolV2> *throttle_dispatch_queue();
181  	  Ct<ProtocolV2> *read_message_data_prepare();
182  	
183  	  Ct<ProtocolV2> *handle_keepalive2(ceph::bufferlist &payload);
184  	  Ct<ProtocolV2> *handle_keepalive2_ack(ceph::bufferlist &payload);
185  	
186  	  Ct<ProtocolV2> *handle_message_ack(ceph::bufferlist &payload);
187  	
188  	public:
(1) Event member_decl: Class member declaration for "connection_features".
Also see events: [uninit_member]
189  	  uint64_t connection_features;
190  	
191  	  ProtocolV2(AsyncConnection *connection);
192  	  virtual ~ProtocolV2();
193  	
194  	  virtual void connect() override;
195  	  virtual void accept() override;
196  	  virtual bool is_connected() override;
197  	  virtual void stop() override;
198  	  virtual void fault() override;
199  	  virtual void send_message(Message *m) override;
200  	  virtual void send_keepalive() override;
201  	
202  	  virtual void read_event() override;
203  	  virtual void write_event() override;
204  	  virtual bool is_queued() override;
205  	
206  	private:
207  	  // Client Protocol
208  	  CONTINUATION_DECL(ProtocolV2, start_client_banner_exchange);
209  	  CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange);
210  	
211  	  Ct<ProtocolV2> *start_client_banner_exchange();
212  	  Ct<ProtocolV2> *post_client_banner_exchange();
213  	  inline Ct<ProtocolV2> *send_auth_request() {
214  	    std::vector<uint32_t> empty;
215  	    return send_auth_request(empty);
216  	  }
217  	  Ct<ProtocolV2> *send_auth_request(std::vector<uint32_t> &allowed_methods);
218  	  Ct<ProtocolV2> *handle_auth_bad_method(ceph::bufferlist &payload);
219  	  Ct<ProtocolV2> *handle_auth_reply_more(ceph::bufferlist &payload);
220  	  Ct<ProtocolV2> *handle_auth_done(ceph::bufferlist &payload);
221  	  Ct<ProtocolV2> *handle_auth_signature(ceph::bufferlist &payload);
222  	  Ct<ProtocolV2> *send_client_ident();
223  	  Ct<ProtocolV2> *send_reconnect();
224  	  Ct<ProtocolV2> *handle_ident_missing_features(ceph::bufferlist &payload);
225  	  Ct<ProtocolV2> *handle_session_reset(ceph::bufferlist &payload);
226  	  Ct<ProtocolV2> *handle_session_retry(ceph::bufferlist &payload);
227  	  Ct<ProtocolV2> *handle_session_retry_global(ceph::bufferlist &payload);
228  	  Ct<ProtocolV2> *handle_wait(ceph::bufferlist &payload);
229  	  Ct<ProtocolV2> *handle_reconnect_ok(ceph::bufferlist &payload);
230  	  Ct<ProtocolV2> *handle_server_ident(ceph::bufferlist &payload);
231  	
232  	  // Server Protocol
233  	  CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange);
234  	  CONTINUATION_DECL(ProtocolV2, post_server_banner_exchange);
235  	  CONTINUATION_DECL(ProtocolV2, server_ready);
236  	
237  	  Ct<ProtocolV2> *start_server_banner_exchange();
238  	  Ct<ProtocolV2> *post_server_banner_exchange();
239  	  Ct<ProtocolV2> *handle_auth_request(ceph::bufferlist &payload);
240  	  Ct<ProtocolV2> *handle_auth_request_more(ceph::bufferlist &payload);
241  	  Ct<ProtocolV2> *_handle_auth_request(bufferlist& auth_payload, bool more);
242  	  Ct<ProtocolV2> *_auth_bad_method(int r);
243  	  Ct<ProtocolV2> *handle_client_ident(ceph::bufferlist &payload);
244  	  Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
245  	  Ct<ProtocolV2> *handle_reconnect(ceph::bufferlist &payload);
246  	  Ct<ProtocolV2> *handle_existing_connection(const AsyncConnectionRef& existing);
247  	  Ct<ProtocolV2> *reuse_connection(const AsyncConnectionRef& existing,
248  	                                   ProtocolV2 *exproto);
249  	  Ct<ProtocolV2> *send_server_ident();
250  	  Ct<ProtocolV2> *send_reconnect_ok();
251  	  Ct<ProtocolV2> *server_ready();
252  	
253  	  uint32_t get_onwire_size(uint32_t logical_size) const;
254  	  uint32_t get_epilogue_size() const;
255  	  size_t get_current_msg_size() const;
256  	};
257  	
258  	#endif /* _MSG_ASYNC_PROTOCOL_V2_ */
259