// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_MSG_RDMASTACK_H
#define CEPH_MSG_RDMASTACK_H

#include <sys/eventfd.h>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "msg/async/Stack.h"
#include "Infiniband.h"

class RDMAConnectedSocketImpl;
class RDMAServerSocketImpl;
class RDMAStack;
class RDMAWorker;

class RDMADispatcher {
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::QueuePair QueuePair;

  std::thread t;
  CephContext *cct;
  std::shared_ptr<Infiniband> ib;
  Infiniband::CompletionQueue* tx_cq = nullptr;
  Infiniband::CompletionQueue* rx_cq = nullptr;
  Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
  bool done = false;
  std::atomic<uint64_t> num_qp_conn = {0};
  // protect `qp_conns`, `dead_queue_pairs`
  ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock");
  // qp_num -> InfRcConnection
  // The main usage of `qp_conns` is to look up a connection by qp_num,
  // so the lifecycle of an element in `qp_conns` is the lifecycle of its qp.
  ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;

  /// If a queue pair is closed while transmit buffers are still active on
  /// it, those buffers never get returned via tx_cq. To work around this,
  /// don't delete queue pairs immediately; save them in this vector and
  /// delete them at a safe time, when there are no outstanding transmit
  /// buffers left to lose.
  std::vector<QueuePair*> dead_queue_pairs;

  std::atomic<uint64_t> num_pending_workers = {0};
  // protect pending workers
  ceph::mutex w_lock =
    ceph::make_mutex("RDMADispatcher::for worker pending list");
  // fixme: lockfree
  std::list<RDMAWorker*> pending_workers;

  /**
   * Move a queue pair into the dead state:
   * 1. The connection calls mark_down.
   * 2. The queue pair is moved into the Error state (QueuePair::to_dead).
   * 3. A beacon is posted.
   * 4. The returned beacon indicates both queues are drained.
   * 5. The QP is destroyed by calling ibv_destroy_qp().
   *
   * @param qp the qp_num of the queue pair to retire
   */
  void enqueue_dead_qp(uint32_t qp);
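  // A minimal sketch (not the authoritative implementation in the .cc file)
  // of how the dispatcher could retire a QP per the steps above; it assumes
  // QueuePair::to_dead() performs the Error-state transition and beacon post:
  //
  //   void RDMADispatcher::enqueue_dead_qp(uint32_t qpn) {
  //     std::lock_guard l{lock};
  //     auto it = qp_conns.find(qpn);
  //     if (it == qp_conns.end())
  //       return;
  //     QueuePair *qp = it->second.first;
  //     qp->to_dead();                   // steps 2 + 3: Error state + beacon
  //     qp_conns.erase(it);
  //     dead_queue_pairs.push_back(qp);  // reaped once tx_cq is drained
  //   }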

 public:
  PerfCounters *perf_logger;

  explicit RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib);
  virtual ~RDMADispatcher();
  void handle_async_event();

  void polling_start();
  void polling_stop();
  void polling();
  void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
  void make_pending_worker(RDMAWorker* w) {
    std::lock_guard l{w_lock};
    auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
    if (it != pending_workers.end())
      return;
    pending_workers.push_back(w);
    ++num_pending_workers;
  }
  RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
  QueuePair* get_qp_lockless(uint32_t qp);
  QueuePair* get_qp(uint32_t qp);
  void schedule_qp_destroy(uint32_t qp);
  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
  void notify_pending_workers();
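  // A sketch of the wake-up half, assuming the implementation pops one
  // parked worker per call (the exact policy lives in the .cc file):
  //
  //   void RDMADispatcher::notify_pending_workers() {
  //     if (num_pending_workers) {
  //       RDMAWorker *w = nullptr;
  //       {
  //         std::lock_guard l{w_lock};
  //         if (!pending_workers.empty()) {
  //           w = pending_workers.front();
  //           pending_workers.pop_front();
  //           --num_pending_workers;
  //         }
  //       }
  //       if (w)
  //         w->notify_worker();
  //     }
  //   }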
  void handle_tx_event(ibv_wc *cqe, int n);
  void post_tx_buffer(std::vector<Chunk*> &chunks);
  void handle_rx_event(ibv_wc *cqe, int rx_number);

  std::atomic<uint64_t> inflight = {0};

  void post_chunk_to_pool(Chunk* chunk);
  int post_chunks_to_rq(int num, QueuePair *qp = nullptr);
};

class RDMAWorker : public Worker {
  typedef Infiniband::CompletionQueue CompletionQueue;
  typedef Infiniband::CompletionChannel CompletionChannel;
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::MemoryManager MemoryManager;
  typedef std::vector<Chunk*>::iterator ChunkIter;
  std::shared_ptr<Infiniband> ib;
  EventCallbackRef tx_handler;
  std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
  std::shared_ptr<RDMADispatcher> dispatcher;
  ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");

  class C_handle_cq_tx : public EventCallback {
    RDMAWorker *worker;
   public:
    explicit C_handle_cq_tx(RDMAWorker *w): worker(w) {}
    void do_request(uint64_t fd) override {
      worker->handle_pending_message();
    }
  };

 public:
  PerfCounters *perf_logger;
  explicit RDMAWorker(CephContext *c, unsigned i);
  virtual ~RDMAWorker();
  virtual int listen(entity_addr_t &addr,
                     unsigned addr_slot,
                     const SocketOptions &opts, ServerSocket *) override;
  virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
  virtual void initialize() override;
  int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
  void remove_pending_conn(RDMAConnectedSocketImpl *o) {
    ceph_assert(center.in_thread());
    pending_sent_conns.remove(o);
  }
  void handle_pending_message();
  void set_dispatcher(std::shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; }
  void set_ib(std::shared_ptr<Infiniband> &ib) { this->ib = ib; }
  void notify_worker() {
    center.dispatch_event_external(tx_handler);
  }
};
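
// Illustrative tx backpressure flow (a sketch of the intended interaction,
// not a verbatim excerpt): when the registered-memory pool runs dry,
// get_reged_mem() hands back fewer chunks than requested and the worker
// parks itself via dispatcher->make_pending_worker(this). Once
// handle_tx_event() returns buffers to the pool, the dispatcher calls
// notify_pending_workers(), which fires tx_handler in this worker's event
// center and re-runs handle_pending_message() for pending_sent_conns.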

struct RDMACMInfo {
  RDMACMInfo(rdma_cm_id *cid, rdma_event_channel *cm_channel_, uint32_t qp_num_)
    : cm_id(cid), cm_channel(cm_channel_), qp_num(qp_num_) {}
  rdma_cm_id *cm_id;
  rdma_event_channel *cm_channel;
  uint32_t qp_num;
};
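
// Illustrative construction of an RDMACMInfo from librdmacm primitives
// (error handling elided; rdma_create_event_channel() and rdma_create_id()
// are standard librdmacm calls, the latter returning 0 on success):
//
//   rdma_event_channel *channel = rdma_create_event_channel();
//   rdma_cm_id *id = nullptr;
//   if (channel && rdma_create_id(channel, &id, nullptr, RDMA_PS_TCP) == 0) {
//     RDMACMInfo info(id, channel, /*qp_num_=*/0);  // qp_num filled in later
//   }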

class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
 public:
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::CompletionChannel CompletionChannel;
  typedef Infiniband::CompletionQueue CompletionQueue;

 protected:
  CephContext *cct;
  Infiniband::QueuePair *qp;
  uint32_t peer_qpn = 0;
  uint32_t local_qpn = 0;
  int connected;
  int error;
  std::shared_ptr<Infiniband> ib;
  std::shared_ptr<RDMADispatcher> dispatcher;
  RDMAWorker* worker;
  std::vector<Chunk*> buffers;
  int notify_fd = -1;
  bufferlist pending_bl;

  ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock");
  std::vector<ibv_wc> wc;
  bool is_server;
  EventCallbackRef con_handler;
  int tcp_fd = -1;
  bool active;  // is the qp in the active state?
  bool pending;
  int post_backlog = 0;

  void notify();
  void buffer_prefetch(void);
  ssize_t read_buffers(char* buf, size_t len);
  int post_work_request(std::vector<Chunk*>&);
  size_t tx_copy_chunk(std::vector<Chunk*> &tx_buffers, size_t req_copy_len,
                       decltype(std::cbegin(pending_bl.buffers()))& start,
                       const decltype(std::cbegin(pending_bl.buffers()))& end);

 public:
  RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
                          std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w);
  virtual ~RDMAConnectedSocketImpl();

  void pass_wc(std::vector<ibv_wc> &&v);
  void get_wc(std::vector<ibv_wc> &w);
  virtual int is_connected() override { return connected; }

  virtual ssize_t read(char* buf, size_t len) override;
  virtual ssize_t zero_copy_read(bufferptr &data) override;
  virtual ssize_t send(bufferlist &bl, bool more) override;
  virtual void shutdown() override;
  virtual void close() override;
  virtual int fd() const override { return notify_fd; }
  void fault();
  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
  uint32_t get_peer_qpn() const { return peer_qpn; }
  uint32_t get_local_qpn() const { return local_qpn; }
  ssize_t submit(bool more);
  int activate();
  void fin();
  void handle_connection();
  void cleanup();
  void set_accept_fd(int sd);
  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
  bool is_pending() { return pending; }
  void set_pending(bool val) { pending = val; }
  void post_chunks_to_rq(int num);
  void update_post_backlog();

  class C_handle_connection : public EventCallback {
    RDMAConnectedSocketImpl *csi;
    bool active;
   public:
    explicit C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
    void do_request(uint64_t fd) override {
      if (active)
        csi->handle_connection();
    }
    void close() {
      active = false;
    }
  };
};
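
// notify_fd is an eventfd (hence the <sys/eventfd.h> include at the top of
// this file): the polling thread hands completed receive work to the socket
// via pass_wc() and then signals readability, so the event center ends up
// calling read(). A minimal sketch of what notify() is assumed to do:
//
//   void RDMAConnectedSocketImpl::notify() {
//     uint64_t i = 1;
//     [[maybe_unused]] ssize_t r = ::write(notify_fd, &i, sizeof(i));
//   }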

enum RDMA_CM_STATUS {
  IDLE = 1,
  RDMA_ID_CREATED,
  CHANNEL_FD_CREATED,
  RESOURCE_ALLOCATED,
  ADDR_RESOLVED,
  ROUTE_RESOLVED,
  CONNECTED,
  DISCONNECTED,
  ERROR
};
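
// Typical progression of RDMA_CM_STATUS for an active iWARP connect, driven
// by librdmacm events (a sketch; the authoritative transitions live in
// RDMAIWARPConnectedSocketImpl):
//
//   IDLE -> RDMA_ID_CREATED -> CHANNEL_FD_CREATED -> RESOURCE_ALLOCATED
//        -> ADDR_RESOLVED   (RDMA_CM_EVENT_ADDR_RESOLVED)
//        -> ROUTE_RESOLVED  (RDMA_CM_EVENT_ROUTE_RESOLVED)
//        -> CONNECTED       (RDMA_CM_EVENT_ESTABLISHED)
//        -> DISCONNECTED    (RDMA_CM_EVENT_DISCONNECTED)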

class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
 public:
  RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
                               std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, RDMACMInfo *info = nullptr);
  ~RDMAIWARPConnectedSocketImpl();
  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
  virtual void close() override;
  virtual void shutdown() override;
  virtual void handle_cm_connection();
  void activate();
  int alloc_resource();
  void close_notify();

 private:
  rdma_cm_id *cm_id = nullptr;
  rdma_event_channel *cm_channel = nullptr;
  EventCallbackRef cm_con_handler;
  std::mutex close_mtx;
  std::condition_variable close_condition;
  bool closed = false;
  RDMA_CM_STATUS status = IDLE;

  class C_handle_cm_connection : public EventCallback {
    RDMAIWARPConnectedSocketImpl *csi;
   public:
    explicit C_handle_cm_connection(RDMAIWARPConnectedSocketImpl *w): csi(w) {}
    void do_request(uint64_t fd) override {
      csi->handle_cm_connection();
    }
  };
};
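
// A hedged sketch of the event pump handle_cm_connection() is expected to
// implement with librdmacm (the calls shown are real librdmacm APIs; the
// actual per-event handling lives in the .cc file):
//
//   rdma_cm_event *ev = nullptr;
//   if (rdma_get_cm_event(cm_channel, &ev) == 0) {
//     switch (ev->event) {
//     case RDMA_CM_EVENT_ADDR_RESOLVED:
//       rdma_resolve_route(cm_id, /*timeout_ms=*/2000);
//       break;
//     case RDMA_CM_EVENT_ROUTE_RESOLVED:
//       // allocate QP/CQ resources, then rdma_connect(cm_id, &conn_param)
//       break;
//     case RDMA_CM_EVENT_ESTABLISHED:
//       // status = CONNECTED; activate();
//       break;
//     case RDMA_CM_EVENT_DISCONNECTED:
//       // status = DISCONNECTED; tear down and close_notify()
//       break;
//     default:
//       break;
//     }
//     rdma_ack_cm_event(ev);
//   }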

class RDMAServerSocketImpl : public ServerSocketImpl {
 protected:
  CephContext *cct;
  NetHandler net;
  int server_setup_socket;
  std::shared_ptr<Infiniband> ib;
  std::shared_ptr<RDMADispatcher> dispatcher;
  RDMAWorker *worker;
  entity_addr_t sa;

 public:
  RDMAServerSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
                       std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
                       RDMAWorker *w, entity_addr_t& a, unsigned slot);

  virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
  virtual void abort_accept() override;
  virtual int fd() const override { return server_setup_socket; }
};

class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
 public:
  RDMAIWARPServerSocketImpl(
    CephContext *cct, std::shared_ptr<Infiniband>& ib,
    std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
    RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot);
  virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
  virtual void abort_accept() override;
 private:
  rdma_cm_id *cm_id = nullptr;
  rdma_event_channel *cm_channel = nullptr;
};

class RDMAStack : public NetworkStack {
  std::vector<std::thread> threads;
  PerfCounters *perf_counter = nullptr;
  std::shared_ptr<Infiniband> ib;
  std::shared_ptr<RDMADispatcher> rdma_dispatcher;

  std::atomic<bool> fork_finished = {false};

 public:
  explicit RDMAStack(CephContext *cct, const std::string &t);
  virtual ~RDMAStack();
  virtual bool support_zero_copy_read() const override { return false; }
  virtual bool nonblock_connect_need_writable_event() const override { return false; }

  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
  virtual void join_worker(unsigned i) override;
  virtual bool is_ready() override { return fork_finished.load(); }
  virtual void ready() override { fork_finished = true; }
};
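
// A minimal sketch of the thread plumbing spawn_worker()/join_worker() are
// assumed to provide (one std::thread per worker slot):
//
//   void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func) {
//     threads.resize(std::max<size_t>(threads.size(), i + 1));
//     threads[i] = std::thread(std::move(func));
//   }
//
//   void RDMAStack::join_worker(unsigned i) {
//     ceph_assert(threads.size() > i && threads[i].joinable());
//     threads[i].join();
//   }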

#endif