1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7    	 *
8    	 * Author: Haomai Wang <haomaiwang@gmail.com>
9    	 *
10   	 * This is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License version 2.1, as published by the Free Software
13   	 * Foundation.  See file COPYING.
14   	 *
15   	 */
16   	
17   	#ifndef CEPH_MSG_RDMASTACK_H
18   	#define CEPH_MSG_RDMASTACK_H
19   	
20   	#include <sys/eventfd.h>
21   	
22   	#include <list>
23   	#include <vector>
24   	#include <thread>
25   	
26   	#include "common/ceph_context.h"
27   	#include "common/debug.h"
28   	#include "common/errno.h"
29   	#include "msg/async/Stack.h"
30   	#include "Infiniband.h"
31   	
32   	class RDMAConnectedSocketImpl;
33   	class RDMAServerSocketImpl;
34   	class RDMAStack;
35   	class RDMAWorker;
36   	
/**
 * RDMADispatcher owns the tx/rx completion queues and completion
 * channels shared by all RDMA connections, and runs the polling thread
 * that reaps work completions and routes them to the owning sockets
 * and workers.
 */
class RDMADispatcher {
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::QueuePair QueuePair;

  std::thread t;  // polling thread; started/stopped via polling_start()/polling_stop()
  CephContext *cct;
  shared_ptr<Infiniband> ib;
  Infiniband::CompletionQueue* tx_cq = nullptr;
  Infiniband::CompletionQueue* rx_cq = nullptr;
  Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
  bool done = false;  // set by polling_stop() so polling() can exit
  std::atomic<uint64_t> num_qp_conn = {0};
  // protect `qp_conns`, `dead_queue_pairs`
  ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock");
  // qp_num -> InfRcConnection
  // The main usage of `qp_conns` is looking up connection by qp_num,
  // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
  ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;

  /// if a queue pair is closed when transmit buffers are active
  /// on it, the transmit buffers never get returned via tx_cq.  To
  /// work around this problem, don't delete queue pairs immediately. Instead,
  /// save them in this vector and delete them at a safe time, when there are
  /// no outstanding transmit buffers to be lost.
  std::vector<QueuePair*> dead_queue_pairs;

  std::atomic<uint64_t> num_pending_workers = {0};
  // protect pending workers
  ceph::mutex w_lock =
    ceph::make_mutex("RDMADispatcher::for worker pending list");
  // fixme: lockfree
  std::list<RDMAWorker*> pending_workers;
  //// make qp queue into dead state
  /**
   * 1. Connection call mark_down
   * 2. Move the Queue Pair into the Error state(QueuePair::to_dead)
   * 3. Post a beacon
   * 4. Wait for beacon which indicates queues are drained
   * 5. Destroy the QP by calling ibv_destroy_qp()
   *
   * @param qp The qp needed to dead
   */
  void enqueue_dead_qp(uint32_t qp);

 public:
  PerfCounters *perf_logger;

  explicit RDMADispatcher(CephContext* c, shared_ptr<Infiniband>& ib);
  virtual ~RDMADispatcher();
  void handle_async_event();

  void polling_start();
  void polling_stop();
  void polling();
  /// associate a newly created qp with its socket so completions can be routed
  void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
  /// queue `w` to be notified when resources free up; no-op if already queued
  void make_pending_worker(RDMAWorker* w) {
    std::lock_guard l{w_lock};
    auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
    if (it != pending_workers.end())
      return;
    pending_workers.push_back(w);
    ++num_pending_workers;
  }
  // *_lockless variants assume the caller already holds `lock`
  RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
  QueuePair* get_qp_lockless(uint32_t qp);
  QueuePair* get_qp(uint32_t qp);
  void schedule_qp_destroy(uint32_t qp);
  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
  void notify_pending_workers();
  void handle_tx_event(ibv_wc *cqe, int n);
  void post_tx_buffer(std::vector<Chunk*> &chunks);
  void handle_rx_event(ibv_wc *cqe, int rx_number);

  std::atomic<uint64_t> inflight = {0};

  void post_chunk_to_pool(Chunk* chunk);
  int post_chunks_to_rq(int num, QueuePair *qp = nullptr);
};
116  	
117  	class RDMAWorker : public Worker {
118  	  typedef Infiniband::CompletionQueue CompletionQueue;
119  	  typedef Infiniband::CompletionChannel CompletionChannel;
120  	  typedef Infiniband::MemoryManager::Chunk Chunk;
121  	  typedef Infiniband::MemoryManager MemoryManager;
122  	  typedef std::vector<Chunk*>::iterator ChunkIter;
123  	  shared_ptr<Infiniband> ib;
124  	  EventCallbackRef tx_handler;
125  	  std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
126  	  shared_ptr<RDMADispatcher> dispatcher;
127  	  ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");
128  	
129  	  class C_handle_cq_tx : public EventCallback {
130  	    RDMAWorker *worker;
131  	    public:
132  	    explicit C_handle_cq_tx(RDMAWorker *w): worker(w) {}
133  	    void do_request(uint64_t fd) {
134  	      worker->handle_pending_message();
135  	    }
136  	  };
137  	
138  	 public:
139  	  PerfCounters *perf_logger;
140  	  explicit RDMAWorker(CephContext *c, unsigned i);
141  	  virtual ~RDMAWorker();
142  	  virtual int listen(entity_addr_t &addr,
143  			     unsigned addr_slot,
144  			     const SocketOptions &opts, ServerSocket *) override;
145  	  virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
146  	  virtual void initialize() override;
147  	  int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
148  	  void remove_pending_conn(RDMAConnectedSocketImpl *o) {
149  	    ceph_assert(center.in_thread());
150  	    pending_sent_conns.remove(o);
151  	  }
152  	  void handle_pending_message();
(1) Event deref_parm_field_in_call: Function "operator =" dereferences an offset off "this". [details]
153  	  void set_dispatcher(shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; }
154  	  void set_ib(shared_ptr<Infiniband> &ib) {this->ib = ib;}
155  	  void notify_worker() {
156  	    center.dispatch_event_external(tx_handler);
157  	  }
158  	};
159  	
160  	struct RDMACMInfo {
161  	  RDMACMInfo(rdma_cm_id *cid, rdma_event_channel *cm_channel_, uint32_t qp_num_)
162  	    : cm_id(cid), cm_channel(cm_channel_), qp_num(qp_num_) {}
163  	  rdma_cm_id *cm_id;
164  	  rdma_event_channel *cm_channel;
165  	  uint32_t qp_num;
166  	};
167  	
168  	class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
169  	 public:
170  	  typedef Infiniband::MemoryManager::Chunk Chunk;
171  	  typedef Infiniband::CompletionChannel CompletionChannel;
172  	  typedef Infiniband::CompletionQueue CompletionQueue;
173  	
174  	 protected:
175  	  CephContext *cct;
176  	  Infiniband::QueuePair *qp;
177  	  uint32_t peer_qpn = 0;
178  	  uint32_t local_qpn = 0;
179  	  int connected;
180  	  int error;
181  	  shared_ptr<Infiniband> ib;
182  	  shared_ptr<RDMADispatcher> dispatcher;
183  	  RDMAWorker* worker;
184  	  std::vector<Chunk*> buffers;
185  	  int notify_fd = -1;
186  	  bufferlist pending_bl;
187  	
188  	  ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock");
189  	  std::vector<ibv_wc> wc;
190  	  bool is_server;
191  	  EventCallbackRef con_handler;
192  	  int tcp_fd = -1;
193  	  bool active;// qp is active ?
194  	  bool pending;
195  	  int post_backlog = 0;
196  	
197  	  void notify();
198  	  void buffer_prefetch(void);
199  	  ssize_t read_buffers(char* buf, size_t len);
200  	  int post_work_request(std::vector<Chunk*>&);
201  	  size_t tx_copy_chunk(std::vector<Chunk*> &tx_buffers, size_t req_copy_len,
202  	      decltype(std::cbegin(pending_bl.buffers()))& start,
203  	      const decltype(std::cbegin(pending_bl.buffers()))& end);
204  	
205  	 public:
206  	  RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
207  	      shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w);
208  	  virtual ~RDMAConnectedSocketImpl();
209  	
210  	  void pass_wc(std::vector<ibv_wc> &&v);
211  	  void get_wc(std::vector<ibv_wc> &w);
212  	  virtual int is_connected() override { return connected; }
213  	
214  	  virtual ssize_t read(char* buf, size_t len) override;
215  	  virtual ssize_t zero_copy_read(bufferptr &data) override;
216  	  virtual ssize_t send(bufferlist &bl, bool more) override;
217  	  virtual void shutdown() override;
218  	  virtual void close() override;
219  	  virtual int fd() const override { return notify_fd; }
220  	  void fault();
221  	  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
222  	  uint32_t get_peer_qpn () const { return peer_qpn; }
223  	  uint32_t get_local_qpn () const { return local_qpn; }
224  	  ssize_t submit(bool more);
225  	  int activate();
226  	  void fin();
227  	  void handle_connection();
228  	  void cleanup();
229  	  void set_accept_fd(int sd);
230  	  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
231  	  bool is_pending() {return pending;}
232  	  void set_pending(bool val) {pending = val;}
233  	  void post_chunks_to_rq(int num);
234  	  void update_post_backlog();
235  	
236  	  class C_handle_connection : public EventCallback {
237  	    RDMAConnectedSocketImpl *csi;
238  	    bool active;
239  	   public:
240  	    explicit C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
241  	    void do_request(uint64_t fd) {
242  	      if (active)
243  	        csi->handle_connection();
244  	    }
245  	    void close() {
246  	      active = false;
247  	    }
248  	  };
249  	};
250  	
251  	enum RDMA_CM_STATUS {
252  	  IDLE = 1,
253  	  RDMA_ID_CREATED,
254  	  CHANNEL_FD_CREATED,
255  	  RESOURCE_ALLOCATED,
256  	  ADDR_RESOLVED,
257  	  ROUTE_RESOLVED,
258  	  CONNECTED,
259  	  DISCONNECTED,
260  	  ERROR
261  	};
262  	
263  	class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
264  	  public:
265  	    RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
266  	        shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, RDMACMInfo *info = nullptr);
267  	    ~RDMAIWARPConnectedSocketImpl();
268  	    virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
269  	    virtual void close() override;
270  	    virtual void shutdown() override;
271  	    virtual void handle_cm_connection();
272  	    void activate();
273  	    int alloc_resource();
274  	    void close_notify();
275  	
276  	  private:
277  	    rdma_cm_id *cm_id = nullptr;
278  	    rdma_event_channel *cm_channel = nullptr;
279  	    EventCallbackRef cm_con_handler;
280  	    std::mutex close_mtx;
281  	    std::condition_variable close_condition;
282  	    bool closed = false;
283  	    RDMA_CM_STATUS status = IDLE;
284  	
285  	
286  	  class C_handle_cm_connection : public EventCallback {
287  	    RDMAIWARPConnectedSocketImpl *csi;
288  	    public:
289  	      C_handle_cm_connection(RDMAIWARPConnectedSocketImpl *w): csi(w) {}
290  	      void do_request(uint64_t fd) {
291  	        csi->handle_cm_connection();
292  	      }
293  	  };
294  	};
295  	
296  	class RDMAServerSocketImpl : public ServerSocketImpl {
297  	  protected:
298  	    CephContext *cct;
299  	    NetHandler net;
300  	    int server_setup_socket;
301  	    shared_ptr<Infiniband> ib;
302  	    shared_ptr<RDMADispatcher> dispatcher;
303  	    RDMAWorker *worker;
304  	    entity_addr_t sa;
305  	
306  	 public:
307  	  RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
308  	                       shared_ptr<RDMADispatcher>& rdma_dispatcher,
309  			       RDMAWorker *w, entity_addr_t& a, unsigned slot);
310  	
311  	  virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
312  	  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
313  	  virtual void abort_accept() override;
314  	  virtual int fd() const override { return server_setup_socket; }
315  	};
316  	
317  	class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
318  	  public:
319  	    RDMAIWARPServerSocketImpl(
320  	      CephContext *cct, shared_ptr<Infiniband>& ib,
321  	      shared_ptr<RDMADispatcher>& rdma_dispatcher,
322  	      RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot);
323  	    virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
324  	    virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
325  	    virtual void abort_accept() override;
326  	  private:
327  	    rdma_cm_id *cm_id = nullptr;
328  	    rdma_event_channel *cm_channel = nullptr;
329  	};
330  	
331  	class RDMAStack : public NetworkStack {
332  	  vector<std::thread> threads;
333  	  PerfCounters *perf_counter;
334  	  shared_ptr<Infiniband> ib;
335  	  shared_ptr<RDMADispatcher> rdma_dispatcher;
336  	
337  	  std::atomic<bool> fork_finished = {false};
338  	
339  	 public:
340  	  explicit RDMAStack(CephContext *cct, const string &t);
341  	  virtual ~RDMAStack();
342  	  virtual bool support_zero_copy_read() const override { return false; }
343  	  virtual bool nonblock_connect_need_writable_event() const override { return false; }
344  	
345  	  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
346  	  virtual void join_worker(unsigned i) override;
347  	  virtual bool is_ready() override { return fork_finished.load(); };
348  	  virtual void ready() override { fork_finished = true; };
349  	};
350  	
351  	
352  	#endif
353