1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include "Infiniband.h"
18 #include "common/errno.h"
19 #include "common/debug.h"
20 #include "RDMAStack.h"
21 #include <sys/time.h>
22 #include <sys/resource.h>
23
24 #define dout_subsys ceph_subsys_ms
25 #undef dout_prefix
26 #define dout_prefix *_dout << "Infiniband "
27
28 static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
29 static const uint32_t MAX_INLINE_DATA = 0;
30 static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
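// The connection metadata exchanged over TCP is a fixed-width hex string of the form
// "<lid>:<local_qpn>:<psn>:<peer_qpn>:<gid>" (e.g. "0001:000004d2:00a1b2c3:00000000:<32 hex gid chars>");
// TCP_MSG_LEN must stay in sync with the sprintf()/sscanf() formats in send_cm_meta()/recv_cm_meta().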
31 static const uint32_t CQ_DEPTH = 30000;
32
33 Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), gid_idx(0)
34 {
35 int r = ibv_query_port(ctxt, port_num, &port_attr);
36 if (r != 0) {
37 lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
38 ceph_abort();
39 }
40
41 lid = port_attr.lid;
42
43 #ifdef HAVE_IBV_EXP
44 union ibv_gid cgid;
45 struct ibv_exp_gid_attr gid_attr;
46 bool malformed = false;
47
48 ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
49
50
51 // search for requested GID in GIDs table
52 ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
53 << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
54 r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
55 "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
56 ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
57 &cgid.raw[ 0], &cgid.raw[ 1],
58 &cgid.raw[ 2], &cgid.raw[ 3],
59 &cgid.raw[ 4], &cgid.raw[ 5],
60 &cgid.raw[ 6], &cgid.raw[ 7],
61 &cgid.raw[ 8], &cgid.raw[ 9],
62 &cgid.raw[10], &cgid.raw[11],
63 &cgid.raw[12], &cgid.raw[13],
64 &cgid.raw[14], &cgid.raw[15]);
65
66 if (r != 16) {
67 ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
68 malformed = true;
69 }
70
71 gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
72
73 for (gid_idx = 0; gid_idx < port_attr.gid_tbl_len; gid_idx++) {
74 r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
75 if (r) {
76 lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
77 ceph_abort();
78 }
79 r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
80 if (r) {
81 lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
82 ceph_abort();
83 }
84
85 if (malformed) break; // stay with gid_idx=0
86 if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
87 (memcmp(&gid, &cgid, 16) == 0) ) {
88 ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
89 break;
90 }
91 }
92
93 if (gid_idx == port_attr.gid_tbl_len) {
94 lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
95 ceph_abort();
96 }
97 #else
98 r = ibv_query_gid(ctxt, port_num, 0, &gid);
99 if (r) {
100 lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl;
101 ceph_abort();
102 }
103 #endif
104 }
105
106 Device::Device(CephContext *cct, ibv_device* ib_dev): device(ib_dev), active_port(nullptr)
107 {
108 ceph_assert(device);
109 ctxt = ibv_open_device(device);
110 ceph_assert(ctxt);
111
112 name = ibv_get_device_name(device);
113
114 int r = ibv_query_device(ctxt, &device_attr);
115 if (r) {
116 lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
117 ceph_abort();
118 }
119 }
120
121 Device::Device(CephContext *cct, struct ibv_context *ib_ctx): device(ib_ctx->device),
122 active_port(nullptr)
123 {
124 ceph_assert(device);
125 ctxt = ib_ctx;
126 ceph_assert(ctxt);
127
128 name = ibv_get_device_name(device);
129
130 int r = ibv_query_device(ctxt, &device_attr);
131 if (r) {
132 lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
133 ceph_abort();
134 }
135 }
136
137 void Device::binding_port(CephContext *cct, int port_num) {
138 port_cnt = device_attr.phys_port_cnt;
139 for (uint8_t port_id = 1; port_id <= port_cnt; ++port_id) {
140 Port *port = new Port(cct, ctxt, port_id);
141 if (port_id == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
142 active_port = port;
143 ldout(cct, 1) << __func__ << " found active port " << static_cast<int>(port_id) << dendl;
144 break;
145 } else {
146 ldout(cct, 10) << __func__ << " port " << static_cast<int>(port_id) << " is not what we want. state: "
147 << ibv_port_state_str(port->get_port_attr()->state) << dendl;
148 delete port;
149 }
150 }
151 if (nullptr == active_port) {
152 lderr(cct) << __func__ << " port not found" << dendl;
153 ceph_assert(active_port);
154 }
155 }
156
157
158 Infiniband::QueuePair::QueuePair(
159 CephContext *c, Infiniband& infiniband, ibv_qp_type type,
160 int port, ibv_srq *srq,
161 Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
162 uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key)
163 : cct(c), infiniband(infiniband),
164 type(type),
165 ctxt(infiniband.device->ctxt),
166 ib_physical_port(port),
167 pd(infiniband.pd->pd),
168 srq(srq),
169 qp(NULL),
170 cm_id(cid), peer_cm_meta{0}, local_cm_meta{0},
171 txcq(txcq),
172 rxcq(rxcq),
// Note (static analysis): lrand48() is not a cryptographically secure RNG; it is
// used here only to pick a pseudo-random initial packet sequence number.
173 initial_psn(lrand48() & PSN_MSK),
174 // One extra WR for beacon
175 max_send_wr(tx_queue_len + 1),
176 max_recv_wr(rx_queue_len),
177 q_key(q_key),
178 dead(false)
179 {
180 if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
181 lderr(cct) << __func__ << " invalid queue pair type " << type << dendl;
182 ceph_abort();
183 }
184 }
185
186 int Infiniband::QueuePair::modify_qp_to_error(void)
187 {
188 ibv_qp_attr qpa;
189 memset(&qpa, 0, sizeof(qpa));
190 qpa.qp_state = IBV_QPS_ERR;
191 if (ibv_modify_qp(qp, &qpa, IBV_QP_STATE)) {
192 lderr(cct) << __func__ << " failed to transition to ERROR state: " << cpp_strerror(errno) << dendl;
193 return -1;
194 }
195 ldout(cct, 20) << __func__ << " transitioned to ERROR state successfully." << dendl;
196 return 0;
197 }
198
199 int Infiniband::QueuePair::modify_qp_to_rts(void)
200 {
201 // move from RTR state to RTS
202 ibv_qp_attr qpa;
203 memset(&qpa, 0, sizeof(qpa));
204 qpa.qp_state = IBV_QPS_RTS;
205 /*
206 * How long to wait before retrying if a packet is lost or the server is dead.
207 * The IB spec defines the timeout as 4.096us * 2^timeout, though in practice it
208 * appears to behave like 4.096us * 2^(timeout+1); with the value below
209 * (0x12 = 18) that works out to a retransmit timeout of roughly 1-2 seconds.
210 */
211 qpa.timeout = 0x12;
212 // How many times to retry after timeouts before giving up.
213 qpa.retry_cnt = 7;
214 /*
215 * How many times to retry after RNR (receiver not ready) condition
216 * before giving up. Occurs when the remote side has not yet posted
217 * a receive request.
218 */
219 qpa.rnr_retry = 7; // 7 is infinite retry.
220 qpa.sq_psn = local_cm_meta.psn;
221 qpa.max_rd_atomic = 1;
222
223 int attr_mask = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
224 int r = ibv_modify_qp(qp, &qpa, attr_mask);
225 if (r) {
226 lderr(cct) << __func__ << " failed to transition to RTS state: " << cpp_strerror(errno) << dendl;
227 return -1;
228 }
229 ldout(cct, 20) << __func__ << " transitioned to RTS state successfully." << dendl;
230 return 0;
231 }
232
233 int Infiniband::QueuePair::modify_qp_to_rtr(void)
234 {
235 // move from INIT to RTR state
236 ibv_qp_attr qpa;
237 memset(&qpa, 0, sizeof(qpa));
238 qpa.qp_state = IBV_QPS_RTR;
239 qpa.path_mtu = IBV_MTU_1024;
240 qpa.dest_qp_num = peer_cm_meta.local_qpn;
241 qpa.rq_psn = peer_cm_meta.psn;
242 qpa.max_dest_rd_atomic = 1;
243 qpa.min_rnr_timer = 0x12;
244 qpa.ah_attr.is_global = 1;
245 qpa.ah_attr.grh.hop_limit = 6;
246 qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
247 qpa.ah_attr.grh.sgid_index = infiniband.get_device()->get_gid_idx();
248 qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
249 //qpa.ah_attr.grh.flow_label = 0;
250
251 qpa.ah_attr.dlid = peer_cm_meta.lid;
252 qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
253 qpa.ah_attr.src_path_bits = 0;
254 qpa.ah_attr.port_num = (uint8_t)(ib_physical_port);
255
256 ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
257
258 int attr_mask = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC;
259
260 int r = ibv_modify_qp(qp, &qpa, attr_mask);
261 if (r) {
262 lderr(cct) << __func__ << " failed to transition to RTR state: " << cpp_strerror(errno) << dendl;
263 return -1;
264 }
265 ldout(cct, 20) << __func__ << " transitioned to RTR state successfully." << dendl;
266 return 0;
267 }
268
269 int Infiniband::QueuePair::modify_qp_to_init(void)
270 {
271 // move from RESET to INIT state
272 ibv_qp_attr qpa;
273 memset(&qpa, 0, sizeof(qpa));
274 qpa.qp_state = IBV_QPS_INIT;
275 qpa.pkey_index = 0;
276 qpa.port_num = (uint8_t)(ib_physical_port);
277 qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
278 qpa.qkey = q_key;
279
280 int mask = IBV_QP_STATE | IBV_QP_PORT;
281 switch (type) {
282 case IBV_QPT_RC:
283 mask |= IBV_QP_ACCESS_FLAGS;
284 mask |= IBV_QP_PKEY_INDEX;
285 break;
286 case IBV_QPT_UD:
287 mask |= IBV_QP_QKEY;
288 mask |= IBV_QP_PKEY_INDEX;
289 break;
290 case IBV_QPT_RAW_PACKET:
291 break;
292 default:
293 ceph_abort();
294 }
295
296 if (ibv_modify_qp(qp, &qpa, mask)) {
297 lderr(cct) << __func__ << " failed to switch Queue Pair to INIT state, qp number: " << qp->qp_num
298 << " Error: " << cpp_strerror(errno) << dendl;
299 return -1;
300 }
301 ldout(cct, 20) << __func__ << " successfully switched Queue Pair to INIT state, qp number: " << qp->qp_num << dendl;
302 return 0;
303 }
304
305 int Infiniband::QueuePair::init()
306 {
307 ldout(cct, 20) << __func__ << " started." << dendl;
308 ibv_qp_init_attr qpia;
309 memset(&qpia, 0, sizeof(qpia));
310 qpia.send_cq = txcq->get_cq();
311 qpia.recv_cq = rxcq->get_cq();
312 if (srq) {
313 qpia.srq = srq; // use the same shared receive queue
314 } else {
315 qpia.cap.max_recv_wr = max_recv_wr;
316 qpia.cap.max_recv_sge = 1;
317 }
318 qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
319 qpia.cap.max_send_sge = 1; // max send scatter-gather elements
320 qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
321 qpia.qp_type = type; // RC, UC, UD, or XRC
322 qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs
323
324 if (!cct->_conf->ms_async_rdma_cm) {
325 qp = ibv_create_qp(pd, &qpia);
326 if (qp == NULL) {
327 lderr(cct) << __func__ << " failed to create queue pair: " << cpp_strerror(errno) << dendl;
328 if (errno == ENOMEM) {
329 lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_len, "
330 " ms_async_rdma_send_buffers or"
331 " ms_async_rdma_buffer_size" << dendl;
332 }
333 return -1;
334 }
335 if (modify_qp_to_init() != 0) {
336 ibv_destroy_qp(qp);
337 return -1;
338 }
339 } else {
340 ceph_assert(cm_id->verbs == pd->context);
341 if (rdma_create_qp(cm_id, pd, &qpia)) {
342 lderr(cct) << __func__ << " failed to create queue pair with rdmacm library"
343 << cpp_strerror(errno) << dendl;
344 return -1;
345 }
346 qp = cm_id->qp;
347 }
348 ldout(cct, 20) << __func__ << " successfully created queue pair: "
349 << "qp=" << qp << dendl;
350 local_cm_meta.local_qpn = get_local_qp_number();
351 local_cm_meta.psn = get_initial_psn();
352 local_cm_meta.lid = infiniband.get_lid();
353 local_cm_meta.peer_qpn = 0;
354 local_cm_meta.gid = infiniband.get_gid();
355 if (!srq) {
356 int rq_wrs = infiniband.post_chunks_to_rq(max_recv_wr, this);
357 if (rq_wrs == 0) {
358 lderr(cct) << __func__ << " failed to initialize Queue Pair without SRQ, qp number: " << qp->qp_num
359 << " fatal error: can't post RQ WRs " << dendl;
360 return -1;
361 }
362 ldout(cct, 20) << __func__ << " initialized Queue Pair without SRQ, qp number: "
363 << qp->qp_num << " posted RQ WRs " << rq_wrs << dendl;
364 }
365 return 0;
366 }
367
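// On the wire a GID is encoded as 32 hex characters with no separators: the 16 raw bytes
// are written as four 8-character groups, each converted to/from a 32-bit value in network
// byte order. wire_gid_to_gid() parses that encoding and gid_to_wire_gid() produces it.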
368 void Infiniband::QueuePair::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data)
369 {
370 char tmp[9];
371 uint32_t v32;
372 int i;
373
374 for (tmp[8] = 0, i = 0; i < 4; ++i) {
375 memcpy(tmp, wgid + i * 8, 8);
376 sscanf(tmp, "%x", &v32);
377 *(uint32_t *)(&cm_meta_data->gid.raw[i * 4]) = ntohl(v32);
378 }
379 }
380
381 void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[])
382 {
383 for (int i = 0; i < 4; ++i)
384 sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(cm_meta_data.gid.raw + i * 4)));
385 }
386
387 /*
388 * return value
389 * > 0: received the full connection metadata message (returns its length)
390 * 0: peer disconnected (read of length 0)
391 * < 0: error
392 */
393 int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd)
394 {
395 char msg[TCP_MSG_LEN];
396 char gid[33];
397 ssize_t r = ::read(socket_fd, &msg, sizeof(msg));
398 // fault injection: randomly drop the incoming connection metadata
399 if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
400 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
401 ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
402 return -EINVAL;
403 }
404 }
405 if (r < 0) {
406 r = -errno;
407 lderr(cct) << __func__ << " got error " << r << ": "
408 << cpp_strerror(r) << dendl;
409 } else if (r == 0) { // valid disconnect message of length 0
410 ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
411 } else if ((size_t)r != sizeof(msg)) { // invalid message
412 ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
413 r = -EINVAL;
414 } else { // valid message
415 sscanf(msg, "%hx:%x:%x:%x:%s", &(peer_cm_meta.lid), &(peer_cm_meta.local_qpn), &(peer_cm_meta.psn), &(peer_cm_meta.peer_qpn), gid);
416 wire_gid_to_gid(gid, &peer_cm_meta);
417 ldout(cct, 5) << __func__ << " received: " << peer_cm_meta.lid << ", " << peer_cm_meta.local_qpn
418 << ", " << peer_cm_meta.psn << ", " << peer_cm_meta.peer_qpn << ", " << gid << dendl;
419 }
420 return r;
421 }
422
423 int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd)
424 {
425 int retry = 0;
426 ssize_t r;
427
428 char msg[TCP_MSG_LEN];
429 char gid[33];
430 retry:
431 gid_to_wire_gid(local_cm_meta, gid);
432 sprintf(msg, "%04x:%08x:%08x:%08x:%s", local_cm_meta.lid, local_cm_meta.local_qpn, local_cm_meta.psn, local_cm_meta.peer_qpn, gid);
433 ldout(cct, 10) << __func__ << " sending: " << local_cm_meta.lid << ", " << local_cm_meta.local_qpn
434 << ", " << local_cm_meta.psn << ", " << local_cm_meta.peer_qpn << ", " << gid << dendl;
435 r = ::write(socket_fd, msg, sizeof(msg));
436 // fault injection: randomly drop the outgoing connection metadata
437 if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
438 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
439 ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
440 return -EINVAL;
441 }
442 }
443
444 if ((size_t)r != sizeof(msg)) {
445 // FIXME need to handle EAGAIN instead of retry
446 if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
447 retry++;
448 goto retry;
449 }
450 if (r < 0)
451 lderr(cct) << __func__ << " send returned error " << errno << ": "
452 << cpp_strerror(errno) << dendl;
453 else
454 lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
455 return -errno;
456 }
457 return 0;
458 }
459
460 /**
461 * Switch QP to ERROR state and then post a beacon to be able to drain all
462 * WCEs and then safely destroy QP. See RDMADispatcher::handle_tx_event()
463 * for details.
464 *
465 * \return
466 * -errno if the QueuePair can't switch to ERROR
467 * 0 for success.
468 */
469 int Infiniband::QueuePair::to_dead()
470 {
471 if (dead)
472 return 0;
473
474 if (modify_qp_to_error()) {
475 return -1;
476 }
477 ldout(cct, 20) << __func__ << " forced Queue Pair into ERROR state, qp number: " << local_cm_meta.local_qpn
478 << " bound remote QueuePair, qp number: " << local_cm_meta.peer_qpn << dendl;
479
480 struct ibv_send_wr *bad_wr = nullptr, beacon;
481 memset(&beacon, 0, sizeof(beacon));
482 beacon.wr_id = BEACON_WRID;
483 beacon.opcode = IBV_WR_SEND;
484 beacon.send_flags = IBV_SEND_SIGNALED;
485 if (ibv_post_send(qp, &beacon, &bad_wr)) {
486 lderr(cct) << __func__ << " failed to send a beacon: " << cpp_strerror(errno) << dendl;
487 return -errno;
488 }
489 ldout(cct, 20) << __func__ << " Queue Pair switched to ERROR state, qp number: " << local_cm_meta.local_qpn << ", beacon sent " << dendl;
490 dead = true;
491
492 return 0;
493 }
494
495 int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
496 {
497 ibv_qp_attr qpa;
498 ibv_qp_init_attr qpia;
499
500 int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
501 if (r) {
502 lderr(cct) << __func__ << " failed to query qp: "
503 << cpp_strerror(errno) << dendl;
504 return -1;
505 }
506
507 if (rqp)
508 *rqp = qpa.dest_qp_num;
509 return 0;
510 }
511
512 /**
513 * Get the remote InfiniBand address (LID) for this QueuePair.
514 * LIDs are "local IDs" in infiniband terminology. They are short, locally
515 * routable addresses.
516 */
517 int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
518 {
519 ibv_qp_attr qpa;
520 ibv_qp_init_attr qpia;
521
522 int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
523 if (r) {
524 lderr(cct) << __func__ << " failed to query qp: "
525 << cpp_strerror(errno) << dendl;
526 return -1;
527 }
528
529 if (lid)
530 *lid = qpa.ah_attr.dlid;
531 return 0;
532 }
533
534 /**
535 * Get the state of a QueuePair.
536 */
537 int Infiniband::QueuePair::get_state() const
538 {
539 ibv_qp_attr qpa;
540 ibv_qp_init_attr qpia;
541
542 int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
543 if (r) {
544 lderr(cct) << __func__ << " failed to get state: "
545 << cpp_strerror(errno) << dendl;
546 return -1;
547 }
548 return qpa.qp_state;
549 }
550
551 Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
552 : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
553 {
554 }
555
556 Infiniband::CompletionChannel::~CompletionChannel()
557 {
558 if (channel) {
559 int r = ibv_destroy_comp_channel(channel);
560 if (r < 0)
561 lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
562 ceph_assert(r == 0);
563 }
564 }
565
566 int Infiniband::CompletionChannel::init()
567 {
568 ldout(cct, 20) << __func__ << " started." << dendl;
569 channel = ibv_create_comp_channel(infiniband.device->ctxt);
570 if (!channel) {
571 lderr(cct) << __func__ << " failed to create receive completion channel: "
572 << cpp_strerror(errno) << dendl;
573 return -1;
574 }
575 int rc = NetHandler(cct).set_nonblock(channel->fd);
576 if (rc < 0) {
577 ibv_destroy_comp_channel(channel);
578 return -1;
579 }
580 return 0;
581 }
582
583 void Infiniband::CompletionChannel::ack_events()
584 {
585 ibv_ack_cq_events(cq, cq_events_that_need_ack);
586 cq_events_that_need_ack = 0;
587 }
588
589 bool Infiniband::CompletionChannel::get_cq_event()
590 {
591 ibv_cq *cq = NULL;
592 void *ev_ctx;
593 if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
594 if (errno != EAGAIN && errno != EINTR)
595 lderr(cct) << __func__ << " failed to retrieve CQ event: "
596 << cpp_strerror(errno) << dendl;
597 return false;
598 }
599
600 /* accumulate the number of cq events that need to
601 * be acked, and periodically ack them
602 */
603 if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
604 ldout(cct, 20) << __func__ << " ack cq events." << dendl;
605 ibv_ack_cq_events(cq, MAX_ACK_EVENT);
606 cq_events_that_need_ack = 0;
607 }
608
609 return true;
610 }
611
612
613 Infiniband::CompletionQueue::~CompletionQueue()
614 {
615 if (cq) {
616 int r = ibv_destroy_cq(cq);
617 if (r < 0)
618 lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
619 ceph_assert(r == 0);
620 }
621 }
622
623 int Infiniband::CompletionQueue::init()
624 {
625 cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
626 if (!cq) {
627 lderr(cct) << __func__ << " failed to create receive completion queue: "
628 << cpp_strerror(errno) << dendl;
629 return -1;
630 }
631
632 if (ibv_req_notify_cq(cq, 0)) {
633 lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
634 ibv_destroy_cq(cq);
635 cq = nullptr;
636 return -1;
637 }
638
639 channel->bind_cq(cq);
640 ldout(cct, 20) << __func__ << " successfully created cq=" << cq << dendl;
641 return 0;
642 }
643
644 int Infiniband::CompletionQueue::rearm_notify(bool solicit_only)
645 {
646 ldout(cct, 20) << __func__ << " started." << dendl;
647 int r = ibv_req_notify_cq(cq, 0);
648 if (r < 0)
649 lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
650 return r;
651 }
652
653 int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
654 int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
655 if (r < 0) {
656 lderr(cct) << __func__ << " failed to poll completion queue: "
657 << cpp_strerror(errno) << dendl;
658 return -1;
659 }
660 return r;
661 }
662
663
664 Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
665 : pd(ibv_alloc_pd(device->ctxt))
666 {
667 if (pd == NULL) {
668 lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
669 ceph_abort();
670 }
671 }
672
673 Infiniband::ProtectionDomain::~ProtectionDomain()
674 {
675 ibv_dealloc_pd(pd);
676 }
677
678
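// A Chunk is a fixed-size slice of a registered memory region: 'bytes' is the capacity,
// 'offset' is the current read/write cursor, and 'bound' marks the end of valid data for
// reads. prepare_read()/reset_read_chunk() manage the read window; reset_write_chunk()
// rewinds the cursor and restores bound to the full capacity.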
679 Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t bytes, char* buffer,
680 uint32_t offset, uint32_t bound, uint32_t lkey, QueuePair* qp)
681 : mr(m), qp(qp), lkey(lkey), bytes(bytes), offset(offset), bound(bound), buffer(buffer)
682 {
683 }
684
685 Infiniband::MemoryManager::Chunk::~Chunk()
686 {
687 }
688
689 uint32_t Infiniband::MemoryManager::Chunk::get_offset()
690 {
691 return offset;
692 }
693
694 uint32_t Infiniband::MemoryManager::Chunk::get_size() const
695 {
696 return bound - offset;
697 }
698
699 void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
700 {
701 offset = 0;
702 bound = b;
703 }
704
705 uint32_t Infiniband::MemoryManager::Chunk::get_bound()
706 {
707 return bound;
708 }
709
710 uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
711 {
712 uint32_t left = get_size();
713 uint32_t read_len = left <= len ? left : len;
714 memcpy(buf, buffer + offset, read_len);
715 offset += read_len;
716 return read_len;
717 }
718
719 uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
720 {
721 uint32_t write_len = (bytes - offset) <= len ? (bytes - offset) : len;
722 memcpy(buffer + offset, buf, write_len);
723 offset += write_len;
724 return write_len;
725 }
726
727 bool Infiniband::MemoryManager::Chunk::full()
728 {
729 return offset == bytes;
730 }
731
732 void Infiniband::MemoryManager::Chunk::reset_read_chunk()
733 {
734 offset = 0;
735 bound = 0;
736 }
737
738 void Infiniband::MemoryManager::Chunk::reset_write_chunk()
739 {
740 offset = 0;
741 bound = bytes;
742 }
743
744 Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
745 : manager(m), buffer_size(s)
746 {
747 }
748
749 Infiniband::MemoryManager::Cluster::~Cluster()
750 {
751 int r = ibv_dereg_mr(chunk_base->mr);
752 ceph_assert(r == 0);
753 const auto chunk_end = chunk_base + num_chunk;
754 for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
755 chunk->~Chunk();
756 }
757
758 ::free(chunk_base);
759 manager.free(base);
760 }
761
762 int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
763 {
764 ceph_assert(!base);
765 num_chunk = num;
766 uint32_t bytes = buffer_size * num;
767
768 base = (char*)manager.malloc(bytes);
769 end = base + bytes;
770 ceph_assert(base);
771 chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
772 memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num);
773 free_chunks.reserve(num);
774 ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
775 ceph_assert(m);
776 Chunk* chunk = chunk_base;
777 for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
778 new(chunk) Chunk(m, buffer_size, base + offset, 0, buffer_size, m->lkey);
779 free_chunks.push_back(chunk);
780 chunk++;
781 }
782 return 0;
783 }
784
785 void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
786 {
787 std::lock_guard l{lock};
788 for (auto c : ck) {
789 c->reset_write_chunk();
790 free_chunks.push_back(c);
791 }
792 }
793
794 int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t block_size)
795 {
796 std::lock_guard l{lock};
797 uint32_t chunk_buffer_number = (block_size + buffer_size - 1) / buffer_size;
798 chunk_buffer_number = free_chunks.size() < chunk_buffer_number ? free_chunks.size(): chunk_buffer_number;
799 uint32_t r = 0;
800
801 for (r = 0; r < chunk_buffer_number; ++r) {
802 chunks.push_back(free_chunks.back());
803 free_chunks.pop_back();
804 }
805 return r;
806 }
807
808 bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs)
809 {
810 /* unlimited */
811 if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0)
812 return true;
813
814 if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) {
815 lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " <<
816 n_bufs_allocated << " requested: " << nbufs <<
817 " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl;
818 return false;
819 }
820
821 return true;
822 }
823
824 void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) {
825 perf_logger = logger;
826 if (perf_logger != nullptr)
827 perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
828 }
829
830 void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs)
831 {
832 n_bufs_allocated += nbufs;
833
834 if (!perf_logger)
835 return;
836
837 if (nbufs > 0) {
838 perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
839 } else {
840 perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs);
841 }
842 }
843
844 void *Infiniband::MemoryManager::mem_pool::slow_malloc()
845 {
846 void *p;
847
848 std::lock_guard l{PoolAllocator::lock};
849 PoolAllocator::g_ctx = ctx;
850 // this will trigger pool expansion via PoolAllocator::malloc()
851 p = boost::pool<PoolAllocator>::malloc();
852 PoolAllocator::g_ctx = nullptr;
853 return p;
854 }
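// boost::pool invokes PoolAllocator::malloc()/free() through a static interface, so the
// per-pool MemPoolContext cannot be passed as an argument; slow_malloc() stashes it in
// g_ctx (under PoolAllocator::lock) for the duration of the pool expansion.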
855 Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
856 ceph::mutex Infiniband::MemoryManager::PoolAllocator::lock =
857 ceph::make_mutex("pool-alloc-lock");
858
859 // lock is taken by mem_pool::slow_malloc()
860 char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type block_size)
861 {
862 ceph_assert(g_ctx);
863 MemoryManager *manager = g_ctx->manager;
864 CephContext *cct = manager->cct;
865 size_t chunk_buffer_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
866 size_t chunk_buffer_number = block_size / chunk_buffer_size;
867
868 if (!g_ctx->can_alloc(chunk_buffer_number))
869 return NULL;
870
871 mem_info *minfo= static_cast<mem_info *>(manager->malloc(block_size + sizeof(mem_info)));
872 if (!minfo) {
873 lderr(cct) << __func__ << " failed to allocate " << chunk_buffer_number << " buffers,"
874 " block size: " << block_size + sizeof(mem_info) << dendl;
875 return NULL;
876 }
877
878 minfo->mr = ibv_reg_mr(manager->pd->pd, minfo->chunks, block_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
879 if (minfo->mr == NULL) {
880 lderr(cct) << __func__ << " failed to do rdma memory registration of " << block_size << " bytes,"
881 " releasing allocated memory now." << dendl;
882 manager->free(minfo);
883 return NULL;
884 }
885
886 minfo->nbufs = chunk_buffer_number;
887 // save this chunk context
888 minfo->ctx = g_ctx;
889
890 // note that the memory can be allocated before perf logger is set
891 g_ctx->update_stats(chunk_buffer_number);
892
893 /* initialize chunks */
894 Chunk *chunk = minfo->chunks;
895 for (unsigned i = 0; i < chunk_buffer_number; i++) {
896 new(chunk) Chunk(minfo->mr, cct->_conf->ms_async_rdma_buffer_size, chunk->data, 0, 0, minfo->mr->lkey);
897 chunk = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(chunk) + chunk_buffer_size);
898 }
899
900 return reinterpret_cast<char *>(minfo->chunks);
901 }
902
903
904 void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
905 {
906 mem_info *m;
907 std::lock_guard l{lock};
908
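// 'block' points at the first Chunk, which lives at mem_info::chunks; step back by
// offsetof(mem_info, chunks) to recover the enclosing mem_info header created in malloc().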
909 Chunk *mem_info_chunk = reinterpret_cast<Chunk *>(block);
910 m = reinterpret_cast<mem_info *>(reinterpret_cast<char *>(mem_info_chunk) - offsetof(mem_info, chunks));
911 m->ctx->update_stats(-m->nbufs);
912 ibv_dereg_mr(m->mr);
913 m->ctx->manager->free(m);
914 }
915
916 Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
917 : cct(c), device(d), pd(p),
918 rxbuf_pool_ctx(this),
919 rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
920 c->_conf->ms_async_rdma_receive_buffers > 0 ?
921 // if possible make initial pool size 2 * receive_queue_len
922 // that way there will be no pool expansion upon receive of the
923 // first packet.
924 (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
925 c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) :
926 // rx pool is infinite, we can set any initial size that we want
927 2 * c->_conf->ms_async_rdma_receive_queue_len,
928 device->device_attr.max_mr_size / (sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size))
929 {
930 }
931
932 Infiniband::MemoryManager::~MemoryManager()
933 {
934 if (send)
935 delete send;
936 }
937
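// Hugepage allocation layout: the request is rounded up to a 2MB boundary and one extra
// 2MB header page is prepended. The real mapping size is stored at the start of the header
// (0 when mmap() failed and plain malloc() was used as a fallback), and the caller receives
// a pointer just past the header; huge_pages_free() reads the stored size to decide between
// munmap() and free().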
938 void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
939 {
940 size_t real_size = ALIGN_TO_PAGE_2MB(size) + HUGE_PAGE_SIZE_2MB;
941 char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0);
942 if (ptr == MAP_FAILED) {
943 ptr = (char *)std::malloc(real_size);
944 if (ptr == NULL) return NULL;
945 real_size = 0;
946 }
947 *((size_t *)ptr) = real_size;
948 return ptr + HUGE_PAGE_SIZE_2MB;
949 }
950
951 void Infiniband::MemoryManager::huge_pages_free(void *ptr)
952 {
953 if (ptr == NULL) return;
954 void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE_2MB;
955 size_t real_size = *((size_t *)real_ptr);
956 ceph_assert(real_size % HUGE_PAGE_SIZE_2MB == 0);
957 if (real_size != 0)
958 munmap(real_ptr, real_size);
959 else
960 std::free(real_ptr);
961 }
962
963
964 void* Infiniband::MemoryManager::malloc(size_t size)
965 {
966 if (cct->_conf->ms_async_rdma_enable_hugepage)
967 return huge_pages_malloc(size);
968 else
969 return std::malloc(size);
970 }
971
972 void Infiniband::MemoryManager::free(void *ptr)
973 {
974 if (cct->_conf->ms_async_rdma_enable_hugepage)
975 huge_pages_free(ptr);
976 else
977 std::free(ptr);
978 }
979
980 void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
981 {
982 ceph_assert(device);
983 ceph_assert(pd);
984
985 send = new Cluster(*this, size);
986 send->fill(tx_num);
987 }
988
989 void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
990 {
991 send->take_back(chunks);
992 }
993
994 int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
995 {
996 return send->get_buffers(c, bytes);
997 }
998
999 static std::atomic<bool> init_prereq = {false};
1000
1001 void Infiniband::verify_prereq(CephContext *cct) {
1002 int rc = 0;
1003 ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
1004 if (cct->_conf->ms_async_rdma_enable_hugepage){
1005 rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
1006 ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
1007 if (rc) {
1008 lderr(cct) << __func__ << " failed to set RDMAV_HUGEPAGES_SAFE. It must be set before using huge pages with RDMA. Application aborts." << dendl;
1009 ceph_abort();
1010 }
1011 }
1012
1013 //On RDMA MUST be called before fork
1014 rc = ibv_fork_init();
1015 if (rc) {
1016 lderr(cct) << __func__ << " ibv_fork_init() failed. It must be called before fork when using RDMA. Application aborts." << dendl;
1017 ceph_abort();
1018 }
1019
1020 //Check ulimit
1021 struct rlimit limit;
1022 getrlimit(RLIMIT_MEMLOCK, &limit);
1023 if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
1024 lderr(cct) << __func__ << " !!! WARNING !!! For RDMA to work properly the user memlock limit (ulimit -l) must be big enough to allow a large amount of registered memory."
1025 " We recommend setting this parameter to infinity" << dendl;
1026 }
1027 init_prereq = true;
1028 }
1029
1030 Infiniband::Infiniband(CephContext *cct)
1031 : cct(cct),
1032 device_name(cct->_conf->ms_async_rdma_device_name),
1033 port_num( cct->_conf->ms_async_rdma_port_num)
1034 {
1035 if (!init_prereq)
1036 verify_prereq(cct);
1037 ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
1038 }
1039
1040 void Infiniband::init()
1041 {
1042 std::lock_guard l{lock};
1043
1044 if (initialized)
1045 return;
1046
1047 device_list = new DeviceList(cct);
1048 initialized = true;
1049
1050 device = device_list->get_device(device_name.c_str());
1051 ceph_assert(device);
1052 device->binding_port(cct, port_num);
1053 ib_physical_port = device->active_port->get_port_num();
1054 pd = new ProtectionDomain(cct, device);
1055 ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
1056
1057 support_srq = cct->_conf->ms_async_rdma_support_srq;
1058 if (support_srq) {
1059 ceph_assert(device->device_attr.max_srq);
1060 rx_queue_len = device->device_attr.max_srq_wr;
1061 }
1062 else
1063 rx_queue_len = device->device_attr.max_qp_wr;
1064 if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
1065 rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
1066 ldout(cct, 1) << __func__ << " assigning: " << rx_queue_len << " receive buffers" << dendl;
1067 } else {
1068 ldout(cct, 0) << __func__ << " using the max allowed receive buffers: " << rx_queue_len << dendl;
1069 }
1070
1071 // check for the misconfiguration
1072 if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
1073 rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
1074 lderr(cct) << __func__ << " rdma_receive_queue_len (" <<
1075 rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
1076 cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
1077 ceph_abort();
1078 }
1079
1080 // Keep extra one WR for a beacon to indicate all WCEs were consumed
1081 tx_queue_len = device->device_attr.max_qp_wr - 1;
1082 if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
1083 tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
1084 ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
1085 } else {
1086 ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
1087 }
1088
1089 //check for the memory region size misconfiguration
1090 if ((uint64_t)cct->_conf->ms_async_rdma_buffer_size * tx_queue_len > device->device_attr.max_mr_size) {
1091 lderr(cct) << __func__ << " total send buffer size exceeds the device max memory region size " << dendl;
1092 ceph_abort();
1093 }
1094
1095 ldout(cct, 1) << __func__ << " device allow " << device->device_attr.max_cqe
1096 << " completion entries" << dendl;
1097
1098 memory_manager = new MemoryManager(cct, device, pd);
1099 memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
1100
1101 if (support_srq) {
1102 srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
1103 post_chunks_to_rq(rx_queue_len, NULL); //add to srq
1104 }
1105 }
1106
1107 Infiniband::~Infiniband()
1108 {
1109 if (!initialized)
1110 return;
1111 if (support_srq)
1112 ibv_destroy_srq(srq);
1113 delete memory_manager;
1114 delete pd;
1115 delete device_list;
1116 }
1117
1118 /**
1119 * Create a shared receive queue. This basically wraps the verbs call.
1120 *
1121 * \param[in] max_wr
1122 * The max number of outstanding work requests in the SRQ.
1123 * \param[in] max_sge
1124 * The max number of scatter elements per WR.
1125 * \return
1126 * A valid ibv_srq pointer, or NULL on error.
1127 */
1128 ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
1129 {
1130 ibv_srq_init_attr sia;
1131 memset(&sia, 0, sizeof(sia));
1132 sia.srq_context = device->ctxt;
1133 sia.attr.max_wr = max_wr;
1134 sia.attr.max_sge = max_sge;
1135 return ibv_create_srq(pd->pd, &sia);
1136 }
1137
1138 int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
1139 {
1140 return memory_manager->get_send_buffers(c, bytes);
1141 }
1142
1143 /**
1144 * Create a new QueuePair. This factory should be used in preference to
1145 * the QueuePair constructor directly, since this lets derivatives of
1146 * Infiniband, e.g. MockInfiniband (if it existed),
1147 * return mocked out QueuePair derivatives.
1148 *
1149 * \return
1150 * QueuePair on success or NULL if init fails
1151 * See QueuePair::QueuePair for parameter documentation.
1152 */
1153 Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx,
1154 CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id)
1155 {
1156 Infiniband::QueuePair *qp = new QueuePair(
1157 cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id);
1158 if (qp->init()) {
1159 delete qp;
1160 return NULL;
1161 }
1162 return qp;
1163 }
1164
1165 int Infiniband::post_chunks_to_rq(int rq_wr_num, QueuePair *qp)
1166 {
1167 int ret = 0;
1168 Chunk *chunk = nullptr;
1169
1170 ibv_recv_wr *rx_work_request = static_cast<ibv_recv_wr*>(::calloc(rq_wr_num, sizeof(ibv_recv_wr)));
1171 ibv_sge *isge = static_cast<ibv_sge*>(::calloc(rq_wr_num, sizeof(ibv_sge)));
1172 ceph_assert(rx_work_request);
1173 ceph_assert(isge);
1174
1175 int i = 0;
1176 while (i < rq_wr_num) {
1177 chunk = get_memory_manager()->get_rx_buffer();
1178 if (chunk == nullptr) {
1179 lderr(cct) << __func__ << " WARNING: out of memory. Requested " << rq_wr_num <<
1180 " rx buffers, only got " << i << "." << dendl;
1181 if (i == 0) {
1182 ::free(rx_work_request);
1183 ::free(isge);
1184 return 0;
1185 }
1186 break; // got some buffers, so post what we have to the receive queue
1187 }
1188
1189 isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
1190 isge[i].length = chunk->bytes;
1191 isge[i].lkey = chunk->lkey;
1192
1193 rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// assign chunk address as work request id
1194
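// chain the WRs into a singly linked list so that a single ibv_post_recv()/ibv_post_srq_recv()
// call below posts the whole batch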
1195 if (i != 0) {
1196 rx_work_request[i - 1].next = &rx_work_request[i];
1197 }
1198 rx_work_request[i].sg_list = &isge[i];
1199 rx_work_request[i].num_sge = 1;
1200
1201 if (qp && !qp->get_srq()) {
1202 chunk->set_qp(qp);
1203 qp->add_rq_wr(chunk);
1204 }
1205 i++;
1206 }
1207
1208 ibv_recv_wr *badworkrequest = nullptr;
1209 if (support_srq) {
1210 ret = ibv_post_srq_recv(srq, rx_work_request, &badworkrequest);
1211 } else {
1212 ceph_assert(qp);
1213 ret = ibv_post_recv(qp->get_qp(), rx_work_request, &badworkrequest);
1214 }
1215
1216 ::free(rx_work_request);
1217 ::free(isge);
1218 ceph_assert(badworkrequest == nullptr && ret == 0);
1219 return i;
1220 }
1221
1222 Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
1223 {
1224 Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
1225 if (cc->init()) {
1226 delete cc;
1227 return NULL;
1228 }
1229 return cc;
1230 }
1231
1232 Infiniband::CompletionQueue* Infiniband::create_comp_queue(
1233 CephContext *cct, CompletionChannel *cc)
1234 {
1235 Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
1236 cct, *this, CQ_DEPTH, cc);
1237 if (cq->init()) {
1238 delete cq;
1239 return NULL;
1240 }
1241 return cq;
1242 }
1243
1244 Infiniband::QueuePair::~QueuePair()
1245 {
1246 if (qp) {
1247 ldout(cct, 20) << __func__ << " destroy Queue Pair, qp number: " << qp->qp_num << " left RQ WR " << recv_queue.size() << dendl;
1248 ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
1249 ceph_assert(!ibv_destroy_qp(qp));
1250 }
1251
1252 for (auto& chunk: recv_queue) {
1253 infiniband.get_memory_manager()->release_rx_buffer(chunk);
1254 }
1255 recv_queue.clear();
1256 }
1257
1258 /**
1259 * Return a string representation of the `status' field from the Verbs
1260 * struct `ibv_wc'.
1261 *
1262 * \param[in] status
1263 * The integer status obtained in ibv_wc.status.
1264 * \return
1265 * A string corresponding to the given status.
1266 */
1267 const char* Infiniband::wc_status_to_string(int status)
1268 {
1269 static const char *lookup[] = {
1270 "SUCCESS",
1271 "LOC_LEN_ERR",
1272 "LOC_QP_OP_ERR",
1273 "LOC_EEC_OP_ERR",
1274 "LOC_PROT_ERR",
1275 "WR_FLUSH_ERR",
1276 "MW_BIND_ERR",
1277 "BAD_RESP_ERR",
1278 "LOC_ACCESS_ERR",
1279 "REM_INV_REQ_ERR",
1280 "REM_ACCESS_ERR",
1281 "REM_OP_ERR",
1282 "RETRY_EXC_ERR",
1283 "RNR_RETRY_EXC_ERR",
1284 "LOC_RDD_VIOL_ERR",
1285 "REM_INV_RD_REQ_ERR",
1286 "REM_ABORT_ERR",
1287 "INV_EECN_ERR",
1288 "INV_EEC_STATE_ERR",
1289 "FATAL_ERR",
1290 "RESP_TIMEOUT_ERR",
1291 "GENERAL_ERR"
1292 };
1293
1294 if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
1295 return "<status out of range!>";
1296 return lookup[status];
1297 }
1298
1299 const char* Infiniband::qp_state_string(int status) {
1300 switch(status) {
1301 case IBV_QPS_RESET : return "IBV_QPS_RESET";
1302 case IBV_QPS_INIT : return "IBV_QPS_INIT";
1303 case IBV_QPS_RTR : return "IBV_QPS_RTR";
1304 case IBV_QPS_RTS : return "IBV_QPS_RTS";
1305 case IBV_QPS_SQD : return "IBV_QPS_SQD";
1306 case IBV_QPS_SQE : return "IBV_QPS_SQE";
1307 case IBV_QPS_ERR : return "IBV_QPS_ERR";
1308 default: return " out of range.";
1309 }
1310 }
1311