1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7    	 *
8    	 * Author: Haomai Wang <haomaiwang@gmail.com>
9    	 *
10   	 * This is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License version 2.1, as published by the Free Software
13   	 * Foundation.  See file COPYING.
14   	 *
15   	 */
16   	
17   	#ifndef CEPH_MSG_ASYNCCONNECTION_H
18   	#define CEPH_MSG_ASYNCCONNECTION_H
19   	
20   	#include <atomic>
21   	#include <pthread.h>
22   	#include <climits>
23   	#include <list>
24   	#include <mutex>
25   	#include <map>
26   	#include <functional>
27   	#include <optional>
28   	
29   	#include "auth/AuthSessionHandler.h"
30   	#include "common/ceph_time.h"
31   	#include "common/perf_counters.h"
32   	#include "include/buffer.h"
33   	#include "msg/Connection.h"
34   	#include "msg/Messenger.h"
35   	
36   	#include "Event.h"
37   	#include "Stack.h"
38   	
39   	class AsyncMessenger;
40   	class DispatchQueue;
41   	class Worker;
42   	class Protocol;
43   	
// Cap derived from the platform's IOV_MAX: IOV_MAX itself when it is below
// 1024, otherwise one quarter of it.
// NOTE(review): presumably bounds the number of iovec entries handed to a
// single vectored send -- confirm at the use sites.
static constexpr int ASYNC_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : (IOV_MAX / 4);
45   	
/*
 * AsyncConnection maintains a logical session between two endpoints. In other
 * words, a pair of addresses maps to exactly one AsyncConnection.
 * AsyncConnection handles network faults and read/write transactions. If a
 * file descriptor breaks, AsyncConnection preserves the message queue and
 * sequence, and tries to reconnect to the peer endpoint.
 */
53   	class AsyncConnection : public Connection {
54   	  ssize_t read(unsigned len, char *buffer,
55   	               std::function<void(char *, ssize_t)> callback);
56   	  ssize_t read_until(unsigned needed, char *p);
57   	  ssize_t read_bulk(char *buf, unsigned len);
58   	
59   	  ssize_t write(bufferlist &bl, std::function<void(ssize_t)> callback,
60   	                bool more=false);
61   	  ssize_t _try_send(bool more=false);
62   	
63   	  void _connect();
64   	  void _stop();
65   	  void fault();
66   	  void inject_delay();
67   	
68   	  bool is_queued() const;
69   	  void shutdown_socket();
70   	
71   	   /**
72   	   * The DelayedDelivery is for injecting delays into Message delivery off
73   	   * the socket. It is only enabled if delays are requested, and if they
74   	   * are then it pulls Messages off the DelayQueue and puts them into the
75   	   * AsyncMessenger event queue.
76   	   */
77   	  class DelayedDelivery : public EventCallback {
78   	    std::set<uint64_t> register_time_events; // need to delete it if stop
79   	    std::deque<Message*> delay_queue;
80   	    std::mutex delay_lock;
81   	    AsyncMessenger *msgr;
82   	    EventCenter *center;
83   	    DispatchQueue *dispatch_queue;
84   	    uint64_t conn_id;
85   	    std::atomic_bool stop_dispatch;
86   	
87   	   public:
88   	    explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
89   	                             DispatchQueue *q, uint64_t cid)
90   	      : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
91   	        stop_dispatch(false) { }
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
92   	    ~DelayedDelivery() override {
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
93   	      ceph_assert(register_time_events.empty());
94   	      ceph_assert(delay_queue.empty());
95   	    }
96   	    void set_center(EventCenter *c) { center = c; }
97   	    void do_request(uint64_t id) override;
98   	    void queue(double delay_period, Message *m) {
99   	      std::lock_guard<std::mutex> l(delay_lock);
100  	      delay_queue.push_back(m);
101  	      register_time_events.insert(center->create_time_event(delay_period*1000000, this));
102  	    }
103  	    void discard();
104  	    bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
105  	    void flush();
106  	  } *delay_state;
107  	
108  	private:
109  	  FRIEND_MAKE_REF(AsyncConnection);
110  	  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
111  			  Worker *w, bool is_msgr2, bool local);
112  	  ~AsyncConnection() override;
113  	public:
114  	  void maybe_start_delay_thread();
115  	
116  	  ostream& _conn_prefix(std::ostream *_dout);
117  	
118  	  bool is_connected() override;
119  	
120  	  // Only call when AsyncConnection first construct
121  	  void connect(const entity_addrvec_t& addrs, int type, entity_addr_t& target);
122  	
123  	  // Only call when AsyncConnection first construct
124  	  void accept(ConnectedSocket socket,
125  		      const entity_addr_t &listen_addr,
126  		      const entity_addr_t &peer_addr);
127  	  int send_message(Message *m) override;
128  	
129  	  void send_keepalive() override;
130  	  void mark_down() override;
131  	  void mark_disposable() override {
132  	    std::lock_guard<std::mutex> l(lock);
133  	    policy.lossy = true;
134  	  }
135  	
136  	  entity_addr_t get_peer_socket_addr() const override {
137  	    return target_addr;
138  	  }
139  	
140  	  int get_con_mode() const override;
141  	
142  	 private:
143  	  enum {
144  	    STATE_NONE,
145  	    STATE_CONNECTING,
146  	    STATE_CONNECTING_RE,
147  	    STATE_ACCEPTING,
148  	    STATE_CONNECTION_ESTABLISHED,
149  	    STATE_CLOSED
150  	  };
151  	
152  	  static const uint32_t TCP_PREFETCH_MIN_SIZE;
153  	  static const char *get_state_name(int state) {
154  	      const char* const statenames[] = {"STATE_NONE",
155  	                                        "STATE_CONNECTING",
156  	                                        "STATE_CONNECTING_RE",
157  	                                        "STATE_ACCEPTING",
158  	                                        "STATE_CONNECTION_ESTABLISHED",
159  	                                        "STATE_CLOSED"};
160  	      return statenames[state];
161  	  }
162  	
163  	  AsyncMessenger *async_msgr;
164  	  uint64_t conn_id;
165  	  PerfCounters *logger;
166  	  int state;
167  	  ConnectedSocket cs;
168  	  int port;
169  	public:
170  	  Messenger::Policy policy;
171  	private:
172  	
173  	  DispatchQueue *dispatch_queue;
174  	
175  	  // lockfree, only used in own thread
176  	  bufferlist outgoing_bl;
177  	  bool open_write = false;
178  	
179  	  std::mutex write_lock;
180  	
181  	  std::mutex lock;
182  	  EventCallbackRef read_handler;
183  	  EventCallbackRef write_handler;
184  	  EventCallbackRef write_callback_handler;
185  	  EventCallbackRef wakeup_handler;
186  	  EventCallbackRef tick_handler;
187  	  char *recv_buf;
188  	  uint32_t recv_max_prefetch;
189  	  uint32_t recv_start;
190  	  uint32_t recv_end;
191  	  set<uint64_t> register_time_events; // need to delete it if stop
192  	  ceph::coarse_mono_clock::time_point last_connect_started;
193  	  ceph::coarse_mono_clock::time_point last_active;
194  	  ceph::mono_clock::time_point recv_start_time;
195  	  uint64_t last_tick_id = 0;
196  	  const uint64_t connect_timeout_us;
197  	  const uint64_t inactive_timeout_us;
198  	
199  	  // Tis section are temp variables used by state transition
200  	
201  	  // Accepting state
202  	  bool msgr2 = false;
203  	  entity_addr_t socket_addr;  ///< local socket addr
204  	  entity_addr_t target_addr;  ///< which of the peer_addrs we're connecting to (as clienet) or should reconnect to (as peer)
205  	
206  	  entity_addr_t _infer_target_addr(const entity_addrvec_t& av);
207  	
208  	  // used only by "read_until"
209  	  uint64_t state_offset;
210  	  Worker *worker;
211  	  EventCenter *center;
212  	
213  	  std::unique_ptr<Protocol> protocol;
214  	
215  	  std::optional<std::function<void(ssize_t)>> writeCallback;
216  	  std::function<void(char *, ssize_t)> readCallback;
217  	  std::optional<unsigned> pendingReadLen;
218  	  char *read_buffer;
219  	
220  	 public:
221  	  // used by eventcallback
222  	  void handle_write();
223  	  void handle_write_callback();
224  	  void process();
225  	  void wakeup_from(uint64_t id);
226  	  void tick(uint64_t id);
227  	  void local_deliver();
228  	  void stop(bool queue_reset);
229  	  void cleanup();
230  	  PerfCounters *get_perf_counter() {
231  	    return logger;
232  	  }
233  	
234  	  bool is_msgr2() const override;
235  	
236  	  friend class Protocol;
237  	  friend class ProtocolV1;
238  	  friend class ProtocolV2;
239  	}; /* AsyncConnection */
240  	
241  	using AsyncConnectionRef = ceph::ref_t<AsyncConnection>;
242  	
243  	#endif
244