1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7    	 *
8    	 * Author: Haomai Wang <haomaiwang@gmail.com>
9    	 *
10   	 * This is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License version 2.1, as published by the Free Software
13   	 * Foundation.  See file COPYING.
14   	 *
15   	 */
16   	
17   	#include "Infiniband.h"
18   	#include "common/errno.h"
19   	#include "common/debug.h"
20   	#include "RDMAStack.h"
21   	#include <sys/time.h>
22   	#include <sys/resource.h>
23   	
24   	#define dout_subsys ceph_subsys_ms
25   	#undef dout_prefix
26   	#define dout_prefix *_dout << "Infiniband "
27   	
28   	static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
29   	static const uint32_t MAX_INLINE_DATA = 0;
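     	// Fixed length of the out-of-band TCP handshake message exchanged by
     	// send_cm_meta()/recv_cm_meta(): "lid:local_qpn:psn:peer_qpn:gid" encoded
     	// in hex; sizeof() of the template string includes the trailing NUL.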
30   	static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
31   	static const uint32_t CQ_DEPTH = 30000;
32   	
33   	Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), gid_idx(0)
34   	{
35   	  int r = ibv_query_port(ctxt, port_num, &port_attr);
36   	  if (r != 0) {
37   	    lderr(cct) << __func__ << " query port failed: " << cpp_strerror(errno) << dendl;
38   	    ceph_abort();
39   	  }
40   	
41   	  lid = port_attr.lid;
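     	  // The LID addresses the port within an IB subnet; for RoCE the GID
     	  // (resolved below) is what actually identifies the endpoint.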
42   	
43   	#ifdef HAVE_IBV_EXP
44   	  union ibv_gid cgid;
45   	  struct ibv_exp_gid_attr gid_attr;
46   	  bool malformed = false;
47   	
48   	  ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
49   	
50   	
51   	  // search for requested GID in GIDs table
52   	  ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
53   	    << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
54   	  r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
55   		     "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
56   		     ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
57   		     &cgid.raw[ 0], &cgid.raw[ 1],
58   		     &cgid.raw[ 2], &cgid.raw[ 3],
59   		     &cgid.raw[ 4], &cgid.raw[ 5],
60   		     &cgid.raw[ 6], &cgid.raw[ 7],
61   		     &cgid.raw[ 8], &cgid.raw[ 9],
62   		     &cgid.raw[10], &cgid.raw[11],
63   		     &cgid.raw[12], &cgid.raw[13],
64   		     &cgid.raw[14], &cgid.raw[15]);
65   	
66   	  if (r != 16) {
67   	    ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
68   	    malformed = true;
69   	  }
70   	
71   	  gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
72   	
73   	  for (gid_idx = 0; gid_idx < port_attr.gid_tbl_len; gid_idx++) {
74   	    r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
75   	    if (r) {
76   	      lderr(cct) << __func__  << " query gid of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
77   	      ceph_abort();
78   	    }
79   	    r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
80   	    if (r) {
81   	      lderr(cct) << __func__  << " query gid attributes of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
82   	      ceph_abort();
83   	    }
84   	
85   	    if (malformed) break; // stay with gid_idx=0
86   	    if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
87   		 (memcmp(&gid, &cgid, 16) == 0) ) {
88   	      ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
89   	      break;
90   	    }
91   	  }
92   	
93   	  if (gid_idx == port_attr.gid_tbl_len) {
94   	    lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
95   	    ceph_abort();
96   	  }
97   	#else
98   	  r = ibv_query_gid(ctxt, port_num, 0, &gid);
99   	  if (r) {
100  	    lderr(cct) << __func__  << " query gid failed  " << cpp_strerror(errno) << dendl;
101  	    ceph_abort();
102  	  }
103  	#endif
104  	}
105  	
106  	Device::Device(CephContext *cct, ibv_device* ib_dev): device(ib_dev), active_port(nullptr)
107  	{
108  	  ceph_assert(device);
109  	  ctxt = ibv_open_device(device);
110  	  ceph_assert(ctxt);
111  	
112  	  name = ibv_get_device_name(device);
113  	
114  	  int r = ibv_query_device(ctxt, &device_attr);
115  	  if (r) {
116  	    lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
117  	    ceph_abort();
118  	  }
119  	}
120  	
121  	Device::Device(CephContext *cct, struct ibv_context *ib_ctx): device(ib_ctx->device),
122  	                                                              active_port(nullptr)
123  	{
124  	  ceph_assert(device);
125  	  ctxt = ib_ctx;
126  	  ceph_assert(ctxt);
127  	
128  	  name = ibv_get_device_name(device);
129  	
130  	  int r = ibv_query_device(ctxt, &device_attr);
131  	  if (r) {
132  	    lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
133  	    ceph_abort();
134  	  }
135  	}
136  	
137  	void Device::binding_port(CephContext *cct, int port_num) {
138  	  port_cnt = device_attr.phys_port_cnt;
139  	  for (uint8_t port_id = 1; port_id <= port_cnt; ++port_id) {
140  	    Port *port = new Port(cct, ctxt, port_id);
141  	    if (port_id == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
142  	      active_port = port;
143  	      ldout(cct, 1) << __func__ << " found active port " << static_cast<int>(port_id) << dendl;
144  	      break;
145  	    } else {
146  	      ldout(cct, 10) << __func__ << " port " << static_cast<int>(port_id) << " is not what we want. state: "
147  	                     << ibv_port_state_str(port->get_port_attr()->state) << dendl;
148  	      delete port;
149  	    }
150  	  }
151  	  if (nullptr == active_port) {
152  	    lderr(cct) << __func__ << " active port " << port_num << " not found" << dendl;
153  	    ceph_assert(active_port);
154  	  }
155  	}
156  	
157  	
158  	Infiniband::QueuePair::QueuePair(
159  	    CephContext *c, Infiniband& infiniband, ibv_qp_type type,
160  	    int port, ibv_srq *srq,
161  	    Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
162  	    uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key)
163  	: cct(c), infiniband(infiniband),
164  	  type(type),
165  	  ctxt(infiniband.device->ctxt),
166  	  ib_physical_port(port),
167  	  pd(infiniband.pd->pd),
168  	  srq(srq),
169  	  qp(NULL),
170  	  cm_id(cid), peer_cm_meta{0}, local_cm_meta{0},
171  	  txcq(txcq),
172  	  rxcq(rxcq),
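     	  // IB packet sequence numbers are 24 bits wide; PSN_MSK keeps the random
     	  // initial value within that range.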
173  	  initial_psn(lrand48() & PSN_MSK),
174  	  // One extra WR for beacon
175  	  max_send_wr(tx_queue_len + 1),
176  	  max_recv_wr(rx_queue_len),
177  	  q_key(q_key),
178  	  dead(false)
179  	{
180  	  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
181  	    lderr(cct) << __func__ << " invalid queue pair type: " << type << dendl;
182  	    ceph_abort();
183  	  }
184  	}
185  	
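     	// During connection setup a QP is walked through RESET -> INIT -> RTR -> RTS
     	// via modify_qp_to_init()/modify_qp_to_rtr()/modify_qp_to_rts() below;
     	// to_dead() later forces it into ERR so outstanding WRs get flushed before
     	// the QP is destroyed.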
186  	int Infiniband::QueuePair::modify_qp_to_error(void)
187  	{
188  	    ibv_qp_attr qpa;
189  	    memset(&qpa, 0, sizeof(qpa));
190  	    qpa.qp_state = IBV_QPS_ERR;
191  	    if (ibv_modify_qp(qp, &qpa, IBV_QP_STATE)) {
192  	      lderr(cct) << __func__ << " failed to transition to ERROR state: " << cpp_strerror(errno) << dendl;
193  	      return -1;
194  	    }
195  	    ldout(cct, 20) << __func__ << " transitioned to ERROR state successfully." << dendl;
196  	    return 0;
197  	}
198  	
199  	int Infiniband::QueuePair::modify_qp_to_rts(void)
200  	{
201  	  // move from RTR state RTS
202  	  ibv_qp_attr qpa;
203  	  memset(&qpa, 0, sizeof(qpa));
204  	  qpa.qp_state = IBV_QPS_RTS;
205  	  /*
206  	   * How long to wait before retrying if a packet is lost or the server is
207  	   * dead.  The timeout is 4.096us * 2^timeout (or 2^(timeout+1) on some
208  	   * HCAs), so the value below (0x12 == 18) gives a timeout of roughly one
209  	   * to two seconds.
210  	   */
211  	  qpa.timeout = 0x12;
212  	  // How many times to retry after timeouts before giving up.
213  	  qpa.retry_cnt = 7;
214  	  /*
215  	   * How many times to retry after RNR (receiver not ready) condition
216  	   * before giving up. Occurs when the remote side has not yet posted
217  	   * a receive request.
218  	   */
219  	  qpa.rnr_retry = 7; // 7 is infinite retry.
220  	  qpa.sq_psn = local_cm_meta.psn;
221  	  qpa.max_rd_atomic = 1;
222  	
223  	  int attr_mask = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
224  	  int r = ibv_modify_qp(qp, &qpa, attr_mask);
225  	  if (r) {
226  	    lderr(cct) << __func__ << " failed to transition to RTS state: " << cpp_strerror(errno) << dendl;
227  	    return -1;
228  	  }
229  	  ldout(cct, 20) << __func__ << " transitioned to RTS state successfully." << dendl;
230  	  return 0;
231  	}
232  	
233  	int Infiniband::QueuePair::modify_qp_to_rtr(void)
234  	{
235  	  // move from INIT to RTR state
236  	  ibv_qp_attr qpa;
237  	  memset(&qpa, 0, sizeof(qpa));
238  	  qpa.qp_state = IBV_QPS_RTR;
239  	  qpa.path_mtu = IBV_MTU_1024;
240  	  qpa.dest_qp_num = peer_cm_meta.local_qpn;
241  	  qpa.rq_psn = peer_cm_meta.psn;
242  	  qpa.max_dest_rd_atomic = 1;
243  	  qpa.min_rnr_timer = 0x12;
244  	  qpa.ah_attr.is_global = 1;
245  	  qpa.ah_attr.grh.hop_limit = 6;
246  	  qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
247  	  qpa.ah_attr.grh.sgid_index = infiniband.get_device()->get_gid_idx();
248  	  qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
249  	  //qpa.ah_attr.grh.flow_label = 0;
250  	
251  	  qpa.ah_attr.dlid = peer_cm_meta.lid;
252  	  qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
253  	  qpa.ah_attr.src_path_bits = 0;
254  	  qpa.ah_attr.port_num = (uint8_t)(ib_physical_port);
255  	
256  	  ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
257  	
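     	  // These are the attributes verbs requires for the INIT -> RTR transition
     	  // of an RC QP.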
258  	  int attr_mask = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC;
259  	
260  	  int r = ibv_modify_qp(qp, &qpa, attr_mask);
261  	  if (r) {
262  	    lderr(cct) << __func__ << " failed to transition to RTR state: " << cpp_strerror(errno) << dendl;
263  	    return -1;
264  	  }
265  	  ldout(cct, 20) << __func__ << " transitioned to RTR state successfully." << dendl;
266  	  return 0;
267  	}
268  	
269  	int Infiniband::QueuePair::modify_qp_to_init(void)
270  	{
271  	  // move from RESET to INIT state
272  	  ibv_qp_attr qpa;
273  	  memset(&qpa, 0, sizeof(qpa));
274  	  qpa.qp_state   = IBV_QPS_INIT;
275  	  qpa.pkey_index = 0;
276  	  qpa.port_num   = (uint8_t)(ib_physical_port);
277  	  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
278  	  qpa.qkey       = q_key;
279  	
280  	  int mask = IBV_QP_STATE | IBV_QP_PORT;
281  	  switch (type) {
282  	    case IBV_QPT_RC:
283  	      mask |= IBV_QP_ACCESS_FLAGS;
284  	      mask |= IBV_QP_PKEY_INDEX;
285  	      break;
286  	    case IBV_QPT_UD:
287  	      mask |= IBV_QP_QKEY;
288  	      mask |= IBV_QP_PKEY_INDEX;
289  	      break;
290  	    case IBV_QPT_RAW_PACKET:
291  	      break;
292  	    default:
293  	      ceph_abort();
294  	  }
295  	
296  	  if (ibv_modify_qp(qp, &qpa, mask)) {
297  	    lderr(cct) << __func__ << " failed to switch Queue Pair to INIT state, qp number: " << qp->qp_num
298  	               << " Error: " << cpp_strerror(errno) << dendl;
299  	    return -1;
300  	  }
301  	  ldout(cct, 20) << __func__ << " successfully switched Queue Pair to INIT state, qp number: " << qp->qp_num << dendl;
302  	  return 0;
303  	}
304  	
305  	int Infiniband::QueuePair::init()
306  	{
307  	  ldout(cct, 20) << __func__ << " started." << dendl;
308  	  ibv_qp_init_attr qpia;
309  	  memset(&qpia, 0, sizeof(qpia));
310  	  qpia.send_cq = txcq->get_cq();
311  	  qpia.recv_cq = rxcq->get_cq();
312  	  if (srq) {
313  	    qpia.srq = srq;                      // use the same shared receive queue
314  	  } else {
315  	    qpia.cap.max_recv_wr = max_recv_wr;
316  	    qpia.cap.max_recv_sge = 1;
317  	  }
318  	  qpia.cap.max_send_wr  = max_send_wr; // max outstanding send requests
319  	  qpia.cap.max_send_sge = 1;           // max send scatter-gather elements
320  	  qpia.cap.max_inline_data = MAX_INLINE_DATA;          // max bytes of immediate data on send q
321  	  qpia.qp_type = type;                 // RC, UC, UD, or XRC
322  	  qpia.sq_sig_all = 0;                 // only generate CQEs on requested WQEs
323  	
324  	  if (!cct->_conf->ms_async_rdma_cm) {
325  	    qp = ibv_create_qp(pd, &qpia);
326  	    if (qp == NULL) {
327  	      lderr(cct) << __func__ << " failed to create queue pair: " << cpp_strerror(errno) << dendl;
328  	      if (errno == ENOMEM) {
329  	        lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_len,"
330  	                                  " ms_async_rdma_send_buffers or"
331  	                                  " ms_async_rdma_buffer_size" << dendl;
332  	      }
333  	      return -1;
334  	    }
335  	    if (modify_qp_to_init() != 0) {
336  	      ibv_destroy_qp(qp);
337  	      return -1;
338  	    }
339  	  } else {
340  	    ceph_assert(cm_id->verbs == pd->context);
341  	    if (rdma_create_qp(cm_id, pd, &qpia)) {
342  	      lderr(cct) << __func__ << " failed to create queue pair with rdmacm library: "
343  	                 << cpp_strerror(errno) << dendl;
344  	      return -1;
345  	    }
346  	    qp = cm_id->qp;
347  	  }
348  	  ldout(cct, 20) << __func__ << " successfully created queue pair: "
349  	                 << "qp=" << qp << dendl;
350  	  local_cm_meta.local_qpn = get_local_qp_number();
351  	  local_cm_meta.psn = get_initial_psn();
352  	  local_cm_meta.lid = infiniband.get_lid();
353  	  local_cm_meta.peer_qpn = 0;
354  	  local_cm_meta.gid = infiniband.get_gid();
355  	  if (!srq) {
356  	    int rq_wrs = infiniband.post_chunks_to_rq(max_recv_wr, this);
357  	    if (rq_wrs == 0) {
358  	      lderr(cct) << __func__ << " initialize no-SRQ Queue Pair, qp number: " << qp->qp_num
359  	                 << " fatal error: can't post RQ WRs" << dendl;
360  	      return -1;
361  	    }
362  	    ldout(cct, 20) << __func__ << " initialized no-SRQ Queue Pair, qp number: "
363  	                   << qp->qp_num << ", posted RQ WRs: " << rq_wrs << dendl;
364  	  }
365  	  return 0;
366  	}
367  	
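     	// The GID travels over the TCP handshake as 32 hex characters: four 8-char
     	// groups, each group converted with htonl()/ntohl() by the helpers below.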
368  	void Infiniband::QueuePair::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data)
369  	{
370  	  char tmp[9];
371  	  uint32_t v32;
372  	  int i;
373  	
374  	  for (tmp[8] = 0, i = 0; i < 4; ++i) {
375  	    memcpy(tmp, wgid + i * 8, 8);
376  	    sscanf(tmp, "%x", &v32);
377  	    *(uint32_t *)(&cm_meta_data->gid.raw[i * 4]) = ntohl(v32);
378  	  }
379  	}
380  	
381  	void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[])
382  	{
383  	  for (int i = 0; i < 4; ++i)
384  	    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(cm_meta_data.gid.raw + i * 4)));
385  	}
386  	
387  	/*
388  	 * return value
389  	 *  > 0: number of bytes of connection metadata read from the peer
390  	 *    0: peer sent a zero-length message, i.e. a valid disconnect
391  	 *  < 0: error (negative errno)
392  	 */
393  	int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd)
394  	{
395  	  char msg[TCP_MSG_LEN];
396  	  char gid[33];
397  	  ssize_t r = ::read(socket_fd, &msg, sizeof(msg));
398  	  // optionally inject a socket failure (ms_inject_socket_failures) to exercise the error path
399  	  if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
400  	    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
401  	      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
402  	      return -EINVAL;
403  	    }
404  	  }
405  	  if (r < 0) {
406  	    r = -errno;
407  	    lderr(cct) << __func__ << " got error " << r << ": "
408  	               << cpp_strerror(r) << dendl;
409  	  } else if (r == 0) { // valid disconnect message of length 0
410  	    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
411  	  } else if ((size_t)r != sizeof(msg)) { // invalid message
412  	    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
413  	    r = -EINVAL;
414  	  } else { // valid message
415  	    sscanf(msg, "%hx:%x:%x:%x:%s", &(peer_cm_meta.lid), &(peer_cm_meta.local_qpn), &(peer_cm_meta.psn), &(peer_cm_meta.peer_qpn), gid);
416  	    wire_gid_to_gid(gid, &peer_cm_meta);
417  	    ldout(cct, 5) << __func__ << " received: " << peer_cm_meta.lid << ", " << peer_cm_meta.local_qpn
418  	                  << ", " << peer_cm_meta.psn << ", " << peer_cm_meta.peer_qpn << ", " << gid << dendl;
419  	  }
420  	  return r;
421  	}
422  	
423  	int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd)
424  	{
425  	  int retry = 0;
426  	  ssize_t r;
427  	
428  	  char msg[TCP_MSG_LEN];
429  	  char gid[33];
430  	retry:
431  	  gid_to_wire_gid(local_cm_meta, gid);
432  	  sprintf(msg, "%04x:%08x:%08x:%08x:%s", local_cm_meta.lid, local_cm_meta.local_qpn, local_cm_meta.psn, local_cm_meta.peer_qpn, gid);
433  	  ldout(cct, 10) << __func__ << " sending: " << local_cm_meta.lid << ", " << local_cm_meta.local_qpn
434  	                 << ", " << local_cm_meta.psn << ", " << local_cm_meta.peer_qpn << ", "  << gid  << dendl;
435  	  r = ::write(socket_fd, msg, sizeof(msg));
436  	  // optionally inject a socket failure (ms_inject_socket_failures) to exercise the error path
437  	  if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
438  	    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
439  	      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
440  	      return -EINVAL;
441  	    }
442  	  }
443  	
444  	  if ((size_t)r != sizeof(msg)) {
445  	    // FIXME need to handle EAGAIN instead of retry
446  	    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
447  	      retry++;
448  	      goto retry;
449  	    }
450  	    if (r < 0)
451  	      lderr(cct) << __func__ << " send returned error " << errno << ": "
452  	                 << cpp_strerror(errno) << dendl;
453  	    else
454  	      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
455  	    return -errno;
456  	  }
457  	  return 0;
458  	}
459  	
460  	/**
461  	 * Switch QP to ERROR state and then post a beacon to be able to drain all
462  	 * WCEs and then safely destroy QP. See RDMADispatcher::handle_tx_event()
463  	 * for details.
464  	 *
465  	 * \return
466  	 *      -errno if the QueuePair can't switch to ERROR
467  	 *      0 for success.
468  	 */
469  	int Infiniband::QueuePair::to_dead()
470  	{
471  	  if (dead)
472  	    return 0;
473  	
474  	  if (modify_qp_to_error()) {
475  	    return -1;
476  	  }
477  	  ldout(cct, 20) << __func__ << " force trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn
478  	                 << " bound remote QueuePair, qp number: " << local_cm_meta.peer_qpn << dendl;
479  	
480  	  struct ibv_send_wr *bad_wr = nullptr, beacon;
481  	  memset(&beacon, 0, sizeof(beacon));
482  	  beacon.wr_id = BEACON_WRID;
483  	  beacon.opcode = IBV_WR_SEND;
484  	  beacon.send_flags = IBV_SEND_SIGNALED;
485  	  if (ibv_post_send(qp, &beacon, &bad_wr)) {
486  	    lderr(cct) << __func__ << " failed to send a beacon: " << cpp_strerror(errno) << dendl;
487  	    return -errno;
488  	  }
489  	  ldout(cct, 20) << __func__ << " trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn << " Beacon sent " << dendl;
490  	  dead = true;
491  	
492  	  return 0;
493  	}
494  	
495  	int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
496  	{
497  	  ibv_qp_attr qpa;
498  	  ibv_qp_init_attr qpia;
499  	
500  	  int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
501  	  if (r) {
502  	    lderr(cct) << __func__ << " failed to query qp: "
503  	      << cpp_strerror(errno) << dendl;
504  	    return -1;
505  	  }
506  	
507  	  if (rqp)
508  	    *rqp = qpa.dest_qp_num;
509  	  return 0;
510  	}
511  	
512  	/**
513  	 * Get the remote infiniband address for this QueuePair, as set in #plumb().
514  	 * LIDs are "local IDs" in infiniband terminology. They are short, locally
515  	 * routable addresses.
516  	 */
517  	int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
518  	{
519  	  ibv_qp_attr qpa;
520  	  ibv_qp_init_attr qpia;
521  	
522  	  int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
523  	  if (r) {
524  	    lderr(cct) << __func__ << " failed to query qp: "
525  	      << cpp_strerror(errno) << dendl;
526  	    return -1;
527  	  }
528  	
529  	  if (lid)
530  	    *lid = qpa.ah_attr.dlid;
531  	  return 0;
532  	}
533  	
534  	/**
535  	 * Get the state of a QueuePair.
536  	 */
537  	int Infiniband::QueuePair::get_state() const
538  	{
539  	  ibv_qp_attr qpa;
540  	  ibv_qp_init_attr qpia;
541  	
542  	  int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
543  	  if (r) {
544  	    lderr(cct) << __func__ << " failed to get state: "
545  	      << cpp_strerror(errno) << dendl;
546  	    return -1;
547  	  }
548  	  return qpa.qp_state;
549  	}
550  	
551  	Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
552  	  : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
553  	{
554  	}
555  	
556  	Infiniband::CompletionChannel::~CompletionChannel()
557  	{
558  	  if (channel) {
559  	    int r = ibv_destroy_comp_channel(channel);
560  	    if (r < 0)
561  	      lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
562  	    ceph_assert(r == 0);
563  	  }
564  	}
565  	
566  	int Infiniband::CompletionChannel::init()
567  	{
568  	  ldout(cct, 20) << __func__ << " started." << dendl;
569  	  channel = ibv_create_comp_channel(infiniband.device->ctxt);
570  	  if (!channel) {
571  	    lderr(cct) << __func__ << " failed to create completion channel: "
572  	                          << cpp_strerror(errno) << dendl;
573  	    return -1;
574  	  }
575  	  int rc = NetHandler(cct).set_nonblock(channel->fd);
576  	  if (rc < 0) {
577  	    ibv_destroy_comp_channel(channel);
     	    channel = NULL;
578  	    return -1;
579  	  }
580  	  return 0;
581  	}
582  	
583  	void Infiniband::CompletionChannel::ack_events()
584  	{
585  	  ibv_ack_cq_events(cq, cq_events_that_need_ack);
586  	  cq_events_that_need_ack = 0;
587  	}
588  	
589  	bool Infiniband::CompletionChannel::get_cq_event()
590  	{
591  	  ibv_cq *cq = NULL;
592  	  void *ev_ctx;
593  	  if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
594  	    if (errno != EAGAIN && errno != EINTR)
595  	      lderr(cct) << __func__ << " failed to retrieve CQ event: "
596  	                 << cpp_strerror(errno) << dendl;
597  	    return false;
598  	  }
599  	
600  	  /* accumulate number of cq events that need to
601  	   * be acked, and periodically ack them
602  	   */
603  	  if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
604  	    ldout(cct, 20) << __func__ << " ack cq events." << dendl;
605  	    ibv_ack_cq_events(cq, MAX_ACK_EVENT);
606  	    cq_events_that_need_ack = 0;
607  	  }
608  	
609  	  return true;
610  	}
611  	
612  	
613  	Infiniband::CompletionQueue::~CompletionQueue()
614  	{
615  	  if (cq) {
616  	    int r = ibv_destroy_cq(cq);
617  	    if (r < 0)
618  	      lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
619  	    ceph_assert(r == 0);
620  	  }
621  	}
622  	
623  	int Infiniband::CompletionQueue::init()
624  	{
625  	  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
626  	  if (!cq) {
627  	    lderr(cct) << __func__ << " failed to create completion queue: "
628  	      << cpp_strerror(errno) << dendl;
629  	    return -1;
630  	  }
631  	
632  	  if (ibv_req_notify_cq(cq, 0)) {
633  	    lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
634  	    ibv_destroy_cq(cq);
635  	    cq = nullptr;
636  	    return -1;
637  	  }
638  	
639  	  channel->bind_cq(cq);
640  	  ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
641  	  return 0;
642  	}
643  	
644  	int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
645  	{
646  	  ldout(cct, 20) << __func__ << " started." << dendl;
647  	  int r = ibv_req_notify_cq(cq, 0);
648  	  if (r)
649  	    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
650  	  return r;
651  	}
652  	
653  	int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
654  	  int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
655  	  if (r < 0) {
656  	    lderr(cct) << __func__ << " failed to poll completion queue: "
657  	      << cpp_strerror(errno) << dendl;
658  	    return -1;
659  	  }
660  	  return r;
661  	}
662  	
663  	
664  	Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
665  	  : pd(ibv_alloc_pd(device->ctxt))
666  	{
667  	  if (pd == NULL) {
668  	    lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
669  	    ceph_abort();
670  	  }
671  	}
672  	
673  	Infiniband::ProtectionDomain::~ProtectionDomain()
674  	{
675  	  ibv_dealloc_pd(pd);
676  	}
677  	
678  	
679  	Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t bytes, char* buffer,
680  	    uint32_t offset, uint32_t bound, uint32_t lkey, QueuePair* qp)
681  	  : mr(m), qp(qp), lkey(lkey), bytes(bytes), offset(offset), bound(bound), buffer(buffer)
682  	{
683  	}
684  	
685  	Infiniband::MemoryManager::Chunk::~Chunk()
686  	{
687  	}
688  	
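     	// Chunk bookkeeping: for receive chunks prepare_read() records the number of
     	// valid bytes in `bound` and read() advances `offset` as data is consumed;
     	// for send chunks write() fills up to the capacity `bytes` and full() reports
     	// when no more space is left.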
689  	uint32_t Infiniband::MemoryManager::Chunk::get_offset()
690  	{
691  	  return offset;
692  	}
693  	
694  	uint32_t Infiniband::MemoryManager::Chunk::get_size() const
695  	{
696  	  return bound - offset;
697  	}
698  	
699  	void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
700  	{
701  	  offset = 0;
702  	  bound = b;
703  	}
704  	
705  	uint32_t Infiniband::MemoryManager::Chunk::get_bound()
706  	{
707  	  return bound;
708  	}
709  	
710  	uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
711  	{
712  	  uint32_t left = get_size();
713  	  uint32_t read_len = left <= len ? left : len;
714  	  memcpy(buf, buffer + offset, read_len);
715  	  offset += read_len;
716  	  return read_len;
717  	}
718  	
719  	uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
720  	{
721  	  uint32_t write_len = (bytes - offset) <= len ? (bytes - offset) : len;
722  	  memcpy(buffer + offset, buf, write_len);
723  	  offset += write_len;
724  	  return write_len;
725  	}
726  	
727  	bool Infiniband::MemoryManager::Chunk::full()
728  	{
729  	  return offset == bytes;
730  	}
731  	
732  	void Infiniband::MemoryManager::Chunk::reset_read_chunk()
733  	{
734  	  offset = 0;
735  	  bound = 0;
736  	}
737  	
738  	void Infiniband::MemoryManager::Chunk::reset_write_chunk()
739  	{
740  	  offset = 0;
741  	  bound = bytes;
742  	}
743  	
744  	Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
745  	  : manager(m), buffer_size(s)
746  	{
747  	}
748  	
749  	Infiniband::MemoryManager::Cluster::~Cluster()
750  	{
751  	  int r = ibv_dereg_mr(chunk_base->mr);
752  	  ceph_assert(r == 0);
753  	  const auto chunk_end = chunk_base + num_chunk;
754  	  for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
755  	    chunk->~Chunk();
756  	  }
757  	
758  	  ::free(chunk_base);
759  	  manager.free(base);
760  	}
761  	
762  	int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
763  	{
764  	  ceph_assert(!base);
765  	  num_chunk = num;
766  	  uint32_t bytes = buffer_size * num;
767  	
768  	  base = (char*)manager.malloc(bytes);
769  	  end = base + bytes;
770  	  ceph_assert(base);
771  	  chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
772  	  memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num);
773  	  free_chunks.reserve(num);
774  	  ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
775  	  ceph_assert(m);
776  	  Chunk* chunk = chunk_base;
777  	  for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
778  	    new(chunk) Chunk(m, buffer_size, base + offset, 0, buffer_size, m->lkey);
779  	    free_chunks.push_back(chunk);
780  	    chunk++;
781  	  }
782  	  return 0;
783  	}
784  	
785  	void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
786  	{
787  	  std::lock_guard l{lock};
788  	  for (auto c : ck) {
789  	    c->reset_write_chunk();
790  	    free_chunks.push_back(c);
791  	  }
792  	}
793  	
794  	int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t block_size)
795  	{
796  	  std::lock_guard l{lock};
797  	  uint32_t chunk_buffer_number = (block_size + buffer_size - 1) / buffer_size;
798  	  chunk_buffer_number = free_chunks.size() < chunk_buffer_number ? free_chunks.size(): chunk_buffer_number;
799  	  uint32_t r = 0;
800  	
801  	  for (r = 0; r < chunk_buffer_number; ++r) {
802  	    chunks.push_back(free_chunks.back());
803  	    free_chunks.pop_back();
804  	  }
805  	  return r;
806  	}
807  	
808  	bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs)
809  	{
810  	  /* unlimited */
811  	  if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0)
812  	    return true;
813  	
814  	  if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) {
815  	    lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " <<
816  	        n_bufs_allocated << " requested: " << nbufs <<
817  	        " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl;
818  	    return false;
819  	  }
820  	
821  	  return true;
822  	}
823  	
824  	void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) {
825  	  perf_logger = logger;
826  	  if (perf_logger != nullptr)
827  	    perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
828  	}
829  	
830  	void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs)
831  	{
832  	  n_bufs_allocated += nbufs;
833  	
834  	  if (!perf_logger)
835  	    return;
836  	
837  	  if (nbufs > 0) {
838  	    perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
839  	  } else {
840  	    perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs);
841  	  }
842  	}
843  	
844  	void *Infiniband::MemoryManager::mem_pool::slow_malloc()
845  	{
846  	  void *p;
847  	
848  	  std::lock_guard l{PoolAllocator::lock};
849  	  PoolAllocator::g_ctx = ctx;
850  	  // this will trigger pool expansion via PoolAllocator::malloc()
851  	  p = boost::pool<PoolAllocator>::malloc();
852  	  PoolAllocator::g_ctx = nullptr;
853  	  return p;
854  	}
855  	Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
856  	ceph::mutex Infiniband::MemoryManager::PoolAllocator::lock =
857  				    ceph::make_mutex("pool-alloc-lock");
858  	
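     	// Each boost::pool block handed out by PoolAllocator::malloc() starts with a
     	// mem_info header (registered MR, buffer count, owning context) followed by
     	// the Chunk array; PoolAllocator::free() recovers the header again via
     	// offsetof(mem_info, chunks).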
859  	// lock is taken by mem_pool::slow_malloc()
860  	char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type block_size)
861  	{
862  	  ceph_assert(g_ctx);
863  	  MemoryManager *manager = g_ctx->manager;
864  	  CephContext *cct = manager->cct;
865  	  size_t chunk_buffer_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
866  	  size_t chunk_buffer_number = block_size / chunk_buffer_size;
867  	
868  	  if (!g_ctx->can_alloc(chunk_buffer_number))
869  	    return NULL;
870  	
871  	  mem_info *minfo= static_cast<mem_info *>(manager->malloc(block_size + sizeof(mem_info)));
872  	  if (!minfo) {
873  	    lderr(cct) << __func__ << " failed to allocate " << chunk_buffer_number << " buffers;"
874  	      " total block size: " << block_size + sizeof(mem_info) << " bytes." << dendl;
875  	    return NULL;
876  	  }
877  	
878  	  minfo->mr = ibv_reg_mr(manager->pd->pd, minfo->chunks, block_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
879  	  if (minfo->mr == NULL) {
880  	    lderr(cct) << __func__ << " failed to register " << block_size << " bytes of rdma memory;"
881  	      " releasing the allocated memory now." << dendl;
882  	    manager->free(minfo);
883  	    return NULL;
884  	  }
885  	
886  	  minfo->nbufs = chunk_buffer_number;
887  	  // save this chunk context
888  	  minfo->ctx   = g_ctx;
889  	
890  	  // note that the memory can be allocated before perf logger is set
891  	  g_ctx->update_stats(chunk_buffer_number);
892  	
893  	  /* initialize chunks */
894  	  Chunk *chunk = minfo->chunks;
895  	  for (unsigned i = 0; i < chunk_buffer_number; i++) {
896  	    new(chunk) Chunk(minfo->mr, cct->_conf->ms_async_rdma_buffer_size, chunk->data, 0, 0, minfo->mr->lkey);
897  	    chunk = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(chunk) + chunk_buffer_size);
898  	  }
899  	
900  	  return reinterpret_cast<char *>(minfo->chunks);
901  	}
902  	
903  	
904  	void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
905  	{
906  	  mem_info *m;
907  	  std::lock_guard l{lock};
908  	    
909  	  Chunk *mem_info_chunk = reinterpret_cast<Chunk *>(block);
910  	  m = reinterpret_cast<mem_info *>(reinterpret_cast<char *>(mem_info_chunk) - offsetof(mem_info, chunks));
911  	  m->ctx->update_stats(-m->nbufs);
912  	  ibv_dereg_mr(m->mr);
913  	  m->ctx->manager->free(m);
914  	}
915  	
916  	Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
917  	  : cct(c), device(d), pd(p),
918  	    rxbuf_pool_ctx(this),
919  	    rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
920  	               c->_conf->ms_async_rdma_receive_buffers > 0 ?
921  	                  // if possible make initial pool size 2 * receive_queue_len
922  	                  // that way there will be no pool expansion upon receive of the
923  	                  // first packet.
924  	                  (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
925  	                   c->_conf->ms_async_rdma_receive_buffers :  2 * c->_conf->ms_async_rdma_receive_queue_len) :
926  	                  // rx pool is infinite, we can set any initial size that we want
927  	                   2 * c->_conf->ms_async_rdma_receive_queue_len,
928  	                   device->device_attr.max_mr_size / (sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size))
929  	{
930  	}
931  	
932  	Infiniband::MemoryManager::~MemoryManager()
933  	{
934  	  if (send)
935  	    delete send;
936  	}
937  	
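     	// Allocate one extra 2MB page in front of the user buffer and stash the
     	// mapped size there, so huge_pages_free() can tell whether to munmap() the
     	// region or fall back to std::free() (a stored size of 0 means mmap failed
     	// and plain malloc was used).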
938  	void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
939  	{
940  	  size_t real_size = ALIGN_TO_PAGE_2MB(size) + HUGE_PAGE_SIZE_2MB;
941  	  char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0);
942  	  if (ptr == MAP_FAILED) {
943  	    ptr = (char *)std::malloc(real_size);
944  	    if (ptr == NULL) return NULL;
945  	    real_size = 0;
946  	  }
947  	  *((size_t *)ptr) = real_size;
948  	  return ptr + HUGE_PAGE_SIZE_2MB;
949  	}
950  	
951  	void Infiniband::MemoryManager::huge_pages_free(void *ptr)
952  	{
953  	  if (ptr == NULL) return;
954  	  void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE_2MB;
955  	  size_t real_size = *((size_t *)real_ptr);
956  	  ceph_assert(real_size % HUGE_PAGE_SIZE_2MB == 0);
957  	  if (real_size != 0)
958  	    munmap(real_ptr, real_size);
959  	  else
960  	    std::free(real_ptr);
961  	}
962  	
963  	
964  	void* Infiniband::MemoryManager::malloc(size_t size)
965  	{
966  	  if (cct->_conf->ms_async_rdma_enable_hugepage)
967  	    return huge_pages_malloc(size);
968  	  else
969  	    return std::malloc(size);
970  	}
971  	
972  	void Infiniband::MemoryManager::free(void *ptr)
973  	{
974  	  if (cct->_conf->ms_async_rdma_enable_hugepage)
975  	    huge_pages_free(ptr);
976  	  else
977  	    std::free(ptr);
978  	}
979  	
980  	void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
981  	{
982  	  ceph_assert(device);
983  	  ceph_assert(pd);
984  	
985  	  send = new Cluster(*this, size);
986  	  send->fill(tx_num);
987  	}
988  	
989  	void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
990  	{
991  	  send->take_back(chunks);
992  	}
993  	
994  	int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
995  	{
996  	  return send->get_buffers(c, bytes);
997  	}
998  	
999  	static std::atomic<bool> init_prereq = {false};
1000 	
1001 	void Infiniband::verify_prereq(CephContext *cct) {
1002 	   int rc = 0;
1003 	   ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage <<  dendl;
1004 	   if (cct->_conf->ms_async_rdma_enable_hugepage){
1005 	     rc =  setenv("RDMAV_HUGEPAGES_SAFE","1",1);
1006 	     ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") <<  dendl;
1007 	     if (rc) {
1008 	       lderr(cct) << __func__ << " failed to set RDMAV_HUGEPAGES_SAFE. It must be set before huge pages can be used with RDMA. Application aborts." << dendl;
1009 	       ceph_abort();
1010 	     }
1011 	   }
1012 	
1013 	  // ibv_fork_init() MUST be called before fork() when using RDMA
1014 	   rc = ibv_fork_init();
1015 	   if (rc) {
1016 	      lderr(cct) << __func__ << " failed to call ibv_fork_init(). It must be called before fork() when using RDMA. Application aborts." << dendl;
1017 	      ceph_abort();
1018 	   }
1019 	
1020 	   //Check ulimit
1021 	   struct rlimit limit;
1022 	   getrlimit(RLIMIT_MEMLOCK, &limit);
1023 	   if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
1024 	      lderr(cct) << __func__ << " !!! WARNING !!! For RDMA to work properly, user memlock (ulimit -l) must be large enough to allow a large amount of registered memory."
1025 	                 " We recommend setting this parameter to infinity." << dendl;
1026 	   }
1027 	   init_prereq = true;
1028 	}
1029 	
1030 	Infiniband::Infiniband(CephContext *cct)
1031 	  : cct(cct),
1032 	    device_name(cct->_conf->ms_async_rdma_device_name),
1033 	    port_num( cct->_conf->ms_async_rdma_port_num)
1034 	{
1035 	  if (!init_prereq)
1036 	    verify_prereq(cct);
1037 	  ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
1038 	}
1039 	
1040 	void Infiniband::init()
1041 	{
1042 	  std::lock_guard l{lock};
1043 	
1044 	  if (initialized)
1045 	    return;
1046 	
1047 	  device_list = new DeviceList(cct);
1048 	  initialized = true;
1049 	
1050 	  device = device_list->get_device(device_name.c_str());
1051 	  ceph_assert(device);
1052 	  device->binding_port(cct, port_num);
1053 	  ib_physical_port = device->active_port->get_port_num();
1054 	  pd = new ProtectionDomain(cct, device);
1055 	  ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
1056 	
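     	  // With an SRQ every QP shares the single receive queue sized here and
     	  // filled below; without SRQ support each QueuePair posts its own
     	  // receive WRs in QueuePair::init().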
1057 	  support_srq = cct->_conf->ms_async_rdma_support_srq;
1058 	  if (support_srq) {
1059 	    ceph_assert(device->device_attr.max_srq);
1060 	    rx_queue_len = device->device_attr.max_srq_wr;
1061 	  }
1062 	  else
1063 	    rx_queue_len = device->device_attr.max_qp_wr;
1064 	  if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
1065 	    rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
1066 	    ldout(cct, 1) << __func__ << " assigning: " << rx_queue_len << " receive buffers" << dendl;
1067 	  } else {
1068 	    ldout(cct, 0) << __func__ << " using the max allowed receive buffers: " << rx_queue_len << dendl;
1069 	  }
1070 	
1071 	  // check for the misconfiguration
1072 	  if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
1073 	      rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
1074 	    lderr(cct) << __func__ << " ms_async_rdma_receive_queue_len (" <<
1075 	                  rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
1076 	                  cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
1077 	    ceph_abort();
1078 	  }
1079 	
1080 	  // Keep extra one WR for a beacon to indicate all WCEs were consumed
1081 	  tx_queue_len = device->device_attr.max_qp_wr - 1;
1082 	  if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
1083 	    tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
1084 	    ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers"  << dendl;
1085 	  } else {
1086 	    ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
1087 	  }
1088 	
1089 	  //check for the memory region size misconfiguration
1090 	  if ((uint64_t)cct->_conf->ms_async_rdma_buffer_size * tx_queue_len > device->device_attr.max_mr_size) {
1091 	    lderr(cct) << __func__ << " tx buffer pool size exceeds the device max memory region size" << dendl;
1092 	    ceph_abort();
1093 	  }
1094 	
1095 	  ldout(cct, 1) << __func__ << " device allows " << device->device_attr.max_cqe
1096 	                << " completion entries" << dendl;
1097 	
1098 	  memory_manager = new MemoryManager(cct, device, pd);
1099 	  memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
1100 	
1101 	  if (support_srq) {
1102 	    srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
1103 	    post_chunks_to_rq(rx_queue_len, NULL); //add to srq
1104 	  }
1105 	}
1106 	
1107 	Infiniband::~Infiniband()
1108 	{
1109 	  if (!initialized)
1110 	    return;
1111 	  if (support_srq)
1112 	    ibv_destroy_srq(srq);
1113 	  delete memory_manager;
1114 	  delete pd;
1115 	  delete device_list;
1116 	}
1117 	
1118 	/**
1119 	 * Create a shared receive queue. This basically wraps the verbs call. 
1120 	 *
1121 	 * \param[in] max_wr
1122 	 *      The max number of outstanding work requests in the SRQ.
1123 	 * \param[in] max_sge
1124 	 *      The max number of scatter elements per WR.
1125 	 * \return
1126 	 *      A valid ibv_srq pointer, or NULL on error.
1127 	 */
1128 	ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
1129 	{
1130 	  ibv_srq_init_attr sia;
1131 	  memset(&sia, 0, sizeof(sia));
1132 	  sia.srq_context = device->ctxt;
1133 	  sia.attr.max_wr = max_wr;
1134 	  sia.attr.max_sge = max_sge;
1135 	  return ibv_create_srq(pd->pd, &sia);
1136 	}
1137 	
1138 	int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
1139 	{
1140 	  return memory_manager->get_send_buffers(c, bytes);
1141 	}
1142 	
1143 	/**
1144 	 * Create a new QueuePair. This factory should be used in preference to
1145 	 * the QueuePair constructor directly, since this lets derivatives of
1146 	 * Infiniband, e.g. MockInfiniband (if it existed),
1147 	 * return mocked out QueuePair derivatives.
1148 	 *
1149 	 * \return
1150 	 *      QueuePair on success or NULL if init fails
1151 	 * See QueuePair::QueuePair for parameter documentation.
1152 	 */
1153 	Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx,
1154 	    CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id)
1155 	{
1156 	  Infiniband::QueuePair *qp = new QueuePair(
1157 	      cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id);
1158 	  if (qp->init()) {
1159 	    delete qp;
1160 	    return NULL;
1161 	  }
1162 	  return qp;
1163 	}
1164 	
1165 	int Infiniband::post_chunks_to_rq(int rq_wr_num, QueuePair *qp)
1166 	{
1167 	  int ret = 0;
1168 	  Chunk *chunk = nullptr;
1169 	
1170 	  ibv_recv_wr *rx_work_request = static_cast<ibv_recv_wr*>(::calloc(rq_wr_num, sizeof(ibv_recv_wr)));
1171 	  ibv_sge *isge = static_cast<ibv_sge*>(::calloc(rq_wr_num, sizeof(ibv_sge)));
1172 	  ceph_assert(rx_work_request);
1173 	  ceph_assert(isge);
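     	  // Chain the receive WRs into a singly linked list; calloc() above zeroes
     	  // the final `next` pointer, which terminates the list for ibv_post_recv()
     	  // / ibv_post_srq_recv().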
1174 	
1175 	  int i = 0;
1176 	  while (i < rq_wr_num) {
1177 	    chunk = get_memory_manager()->get_rx_buffer();
1178 	    if (chunk == nullptr) {
1179 	      lderr(cct) << __func__ << " WARNING: out of memory. Requested " << rq_wr_num <<
1180 	                 " rx buffers, but only got " << i << "." << dendl;
1181 	      if (i == 0) {
1182 	        ::free(rx_work_request);
1183 	        ::free(isge);
1184 	        return 0;
1185 	      }
1186 	      break; // got some buffers, so post them to the receive queue
1187 	    }
1188 	
1189 	    isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
1190 	    isge[i].length = chunk->bytes;
1191 	    isge[i].lkey = chunk->lkey;
1192 	
1193 	    rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// assign chunk address as work request id
1194 	
1195 	    if (i != 0) {
1196 	      rx_work_request[i - 1].next = &rx_work_request[i];
1197 	    }
1198 	    rx_work_request[i].sg_list = &isge[i];
1199 	    rx_work_request[i].num_sge = 1;
1200 	
1201 	    if (qp && !qp->get_srq()) {
1202 	       chunk->set_qp(qp);
1203 	       qp->add_rq_wr(chunk);
1204 	    }
1205 	    i++;
1206 	  }
1207 	
1208 	  ibv_recv_wr *badworkrequest = nullptr;
1209 	  if (support_srq) {
1210 	    ret = ibv_post_srq_recv(srq, rx_work_request, &badworkrequest);
1211 	  } else {
1212 	    ceph_assert(qp);
1213 	    ret = ibv_post_recv(qp->get_qp(), rx_work_request, &badworkrequest);
1214 	  }
1215 	
1216 	  ::free(rx_work_request);
1217 	  ::free(isge);
1218 	  ceph_assert(badworkrequest == nullptr && ret == 0);
1219 	  return i;
1220 	}
1221 	
1222 	Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
1223 	{
1224 	  Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
1225 	  if (cc->init()) {
1226 	    delete cc;
1227 	    return NULL;
1228 	  }
1229 	  return cc;
1230 	}
1231 	
1232 	Infiniband::CompletionQueue* Infiniband::create_comp_queue(
1233 	    CephContext *cct, CompletionChannel *cc)
1234 	{
1235 	  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
1236 	      cct, *this, CQ_DEPTH, cc);
1237 	  if (cq->init()) {
1238 	    delete cq;
1239 	    return NULL;
1240 	  }
1241 	  return cq;
1242 	}
1243 	
1244 	Infiniband::QueuePair::~QueuePair()
1245 	{
1246 	  ldout(cct, 20) << __func__ << " destroy Queue Pair, qp number: " << (qp ? qp->qp_num : 0) << " left RQ WR " << recv_queue.size() << dendl;
1247 	  if (qp) {
1248 	    ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
1249 	    ceph_assert(!ibv_destroy_qp(qp));
1250 	  }
1251 	
1252 	  for (auto& chunk: recv_queue) {
1253 	    infiniband.get_memory_manager()->release_rx_buffer(chunk);
1254 	  }
1255 	  recv_queue.clear();
1256 	}
1257 	
1258 	/**
1259 	 * Return a string representation of the given `status' field from the
1260 	 * Verbs struct `ibv_wc'.
1261 	 *
1262 	 * \param[in] status
1263 	 *      The integer status obtained in ibv_wc.status.
1264 	 * \return
1265 	 *      A string corresponding to the given status.
1266 	 */
1267 	const char* Infiniband::wc_status_to_string(int status)
1268 	{
1269 	  static const char *lookup[] = {
1270 	      "SUCCESS",
1271 	      "LOC_LEN_ERR",
1272 	      "LOC_QP_OP_ERR",
1273 	      "LOC_EEC_OP_ERR",
1274 	      "LOC_PROT_ERR",
1275 	      "WR_FLUSH_ERR",
1276 	      "MW_BIND_ERR",
1277 	      "BAD_RESP_ERR",
1278 	      "LOC_ACCESS_ERR",
1279 	      "REM_INV_REQ_ERR",
1280 	      "REM_ACCESS_ERR",
1281 	      "REM_OP_ERR",
1282 	      "RETRY_EXC_ERR",
1283 	      "RNR_RETRY_EXC_ERR",
1284 	      "LOC_RDD_VIOL_ERR",
1285 	      "REM_INV_RD_REQ_ERR",
1286 	      "REM_ABORT_ERR",
1287 	      "INV_EECN_ERR",
1288 	      "INV_EEC_STATE_ERR",
1289 	      "FATAL_ERR",
1290 	      "RESP_TIMEOUT_ERR",
1291 	      "GENERAL_ERR"
1292 	  };
1293 	
1294 	  if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
1295 	    return "<status out of range!>";
1296 	  return lookup[status];
1297 	}
1298 	
1299 	const char* Infiniband::qp_state_string(int status) {
1300 	  switch(status) {
1301 	    case IBV_QPS_RESET : return "IBV_QPS_RESET";
1302 	    case IBV_QPS_INIT  : return "IBV_QPS_INIT";
1303 	    case IBV_QPS_RTR   : return "IBV_QPS_RTR";
1304 	    case IBV_QPS_RTS   : return "IBV_QPS_RTS";
1305 	    case IBV_QPS_SQD   : return "IBV_QPS_SQD";
1306 	    case IBV_QPS_SQE   : return "IBV_QPS_SQE";
1307 	    case IBV_QPS_ERR   : return "IBV_QPS_ERR";
1308 	    default: return " out of range.";
1309 	  }
1310 	}
1311