1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7    	 *
8    	 * Author: Haomai Wang <haomaiwang@gmail.com>
9    	 *
10   	 * This is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License version 2.1, as published by the Free Software
13   	 * Foundation.  See file COPYING.
14   	 *
15   	 */
16   	
17   	#include <poll.h>
18   	#include <errno.h>
19   	#include <sys/time.h>
20   	#include <sys/resource.h>
21   	
22   	#include "include/str_list.h"
23   	#include "include/compat.h"
24   	#include "common/Cycles.h"
25   	#include "common/deleter.h"
26   	#include "common/Tub.h"
27   	#include "RDMAStack.h"
28   	
29   	#define dout_subsys ceph_subsys_ms
30   	#undef dout_prefix
31   	#define dout_prefix *_dout << "RDMAStack "
32   	
33   	RDMADispatcher::~RDMADispatcher()
34   	{
35   	  ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
36   	  polling_stop();
37   	
38   	  ceph_assert(qp_conns.empty());
39   	  ceph_assert(num_qp_conn == 0);
40   	  ceph_assert(dead_queue_pairs.empty());
41   	}
42   	
43   	RDMADispatcher::RDMADispatcher(CephContext* c, shared_ptr<Infiniband>& ib)
44   	  : cct(c), ib(ib)
45   	{
46   	  PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
47   	
48   	  plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
49   	  plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
50   	  plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
51   	  plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers");
52   	
53   	  plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
54   	  plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
55   	  plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
56   	  plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
57   	
58   	  plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion");
59   	  plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion");
60   	  plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request");
61   	
62   	  plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
63   	  plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");
64   	
65   	  plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");
66   	
67   	
68   	  plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Active queue pair number");
69   	  plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Created queue pair number");
70   	
71   	  perf_logger = plb.create_perf_counters();
72   	  cct->get_perfcounters_collection()->add(perf_logger);
73   	  Cycles::init();
74   	}
75   	
76   	void RDMADispatcher::polling_start()
77   	{
78   	  // take lock because listen/connect can happen from different worker threads
79   	  std::lock_guard l{lock};
80   	
81   	  if (t.joinable()) 
82   	    return; // dispatcher thread already running 
83   	
84   	  ib->get_memory_manager()->set_rx_stat_logger(perf_logger);
85   	
86   	  tx_cc = ib->create_comp_channel(cct);
87   	  ceph_assert(tx_cc);
88   	  rx_cc = ib->create_comp_channel(cct);
89   	  ceph_assert(rx_cc);
90   	  tx_cq = ib->create_comp_queue(cct, tx_cc);
91   	  ceph_assert(tx_cq);
92   	  rx_cq = ib->create_comp_queue(cct, rx_cc);
93   	  ceph_assert(rx_cq);
94   	
95   	  t = std::thread(&RDMADispatcher::polling, this);
96   	  ceph_pthread_setname(t.native_handle(), "rdma-polling");
97   	}
98   	
99   	void RDMADispatcher::polling_stop()
100  	{
101  	  {
102  	    std::lock_guard l{lock};
103  	    done = true;
104  	  }
105  	
106  	  if (!t.joinable())
107  	    return;
108  	
109  	  t.join();
110  	
111  	  tx_cc->ack_events();
112  	  rx_cc->ack_events();
113  	  delete tx_cq;
114  	  delete rx_cq;
115  	  delete tx_cc;
116  	  delete rx_cc;
117  	}
118  	
119  	void RDMADispatcher::handle_async_event()
120  	{
121  	  ldout(cct, 30) << __func__ << dendl;
122  	  while (1) {
123  	    ibv_async_event async_event;
124  	    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
125  	      if (errno != EAGAIN)
126  	       lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
127  	                  << " " << cpp_strerror(errno) << ")" << dendl;
128  	      return;
129  	    }
130  	    perf_logger->inc(l_msgr_rdma_total_async_events);
131  	    ldout(cct, 1) << __func__ << "Event : " << ibv_event_type_str(async_event.event_type) << dendl;
132  	
133  	    switch (async_event.event_type) {
134  	      /***********************CQ events********************/
135  	      case IBV_EVENT_CQ_ERR:
136  	        lderr(cct) << __func__ << " Fatal Error, effect all QP bound with same CQ, "
137  	                   << " CQ Overflow, dev = " << ib->get_device()->ctxt
138  	                   << " Need destroy and recreate resource " << dendl;
139  	        break;
140  	      /***********************QP events********************/
141  	      case IBV_EVENT_QP_FATAL:
142  	        {
143  	          /* Error occurred on a QP and it transitioned to error state */
144  	          ibv_qp* ib_qp = async_event.element.qp;
145  	          uint32_t qpn = ib_qp->qp_num;
146  	          QueuePair* qp = get_qp(qpn);
147  	          lderr(cct) << __func__ << " Fatal Error, event associate qp number: " << qpn
148  	                     << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
149  	                     << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
150  	        }
151  	        break;
152  	      case IBV_EVENT_QP_LAST_WQE_REACHED:
153  	        {
154  	          /*
155  	           * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP
156  	           *    Reason: QP is force switched into Error before posting Beacon WR.
157  	           *            The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status
158  	           *            For SRQ, only WRs on the QP which is switched into Error status will be flushed.
159  	           *    Handle: Only confirm that qp enter into dead queue pairs
160  	           * 2. The CQE with error was generated for the last WQE
161  	           *    Handle: output error log
162  	           */
163  	          perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
164  	          ibv_qp* ib_qp = async_event.element.qp;
165  	          uint32_t qpn = ib_qp->qp_num;
166  	          std::lock_guard l{lock};
167  	          RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
168  	          QueuePair* qp = get_qp_lockless(qpn);
169  	
170  	          if (qp && !qp->is_dead()) {
171  	            lderr(cct) << __func__ << " QP not dead, event associate qp number: " << qpn
172  	                       << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
173  	                       << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
174  	          }
175  	          if (!conn) {
176  	            ldout(cct, 20) << __func__ << " Connection's QP maybe entered into dead status. "
177  	                           << " qp number: " << qpn << dendl;
178  	          } else {
179  	             conn->fault();
180  	             if (qp) {
181  	                if (!cct->_conf->ms_async_rdma_cm)
182  	                enqueue_dead_qp(qpn);
183  	             }
184  	          }
185  	        }
186  	        break;
187  	      case IBV_EVENT_QP_REQ_ERR:
188  	        /* Invalid Request Local Work Queue Error */
189  	        [[fallthrough]];
190  	      case IBV_EVENT_QP_ACCESS_ERR:
191  	        /* Local access violation error */
192  	        [[fallthrough]];
193  	      case IBV_EVENT_COMM_EST:
194  	        /* Communication was established on a QP */
195  	        [[fallthrough]];
196  	      case IBV_EVENT_SQ_DRAINED:
197  	        /* Send Queue was drained of outstanding messages in progress */
198  	        [[fallthrough]];
199  	      case IBV_EVENT_PATH_MIG:
200  	        /* A connection has migrated to the alternate path */
201  	        [[fallthrough]];
202  	      case IBV_EVENT_PATH_MIG_ERR:
203  	        /* A connection failed to migrate to the alternate path */
204  	        break;
205  	      /***********************SRQ events*******************/
206  	      case IBV_EVENT_SRQ_ERR:
207  	        /* Error occurred on an SRQ */
208  	        [[fallthrough]];
209  	      case IBV_EVENT_SRQ_LIMIT_REACHED:
210  	        /* SRQ limit was reached */
211  	        break;
212  	      /***********************Port events******************/
213  	      case IBV_EVENT_PORT_ACTIVE:
214  	        /* Link became active on a port */
215  	        [[fallthrough]];
216  	      case IBV_EVENT_PORT_ERR:
217  	        /* Link became unavailable on a port */
218  	        [[fallthrough]];
219  	      case IBV_EVENT_LID_CHANGE:
220  	        /* LID was changed on a port */
221  	        [[fallthrough]];
222  	      case IBV_EVENT_PKEY_CHANGE:
223  	        /* P_Key table was changed on a port */
224  	        [[fallthrough]];
225  	      case IBV_EVENT_SM_CHANGE:
226  	        /* SM was changed on a port */
227  	        [[fallthrough]];
228  	      case IBV_EVENT_CLIENT_REREGISTER:
229  	        /* SM sent a CLIENT_REREGISTER request to a port */
230  	        [[fallthrough]];
231  	      case IBV_EVENT_GID_CHANGE:
232  	        /* GID table was changed on a port */
233  	        break;
234  	
235  	      /***********************CA events******************/
236  	      //CA events:
237  	      case IBV_EVENT_DEVICE_FATAL:
238  	        /* CA is in FATAL state */
239  	        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
240  	                   << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
241  	        break;
242  	      default:
243  	        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
244  	                   << " unknown event: " << async_event.event_type << dendl;
245  	        break;
246  	    }
247  	    ibv_ack_async_event(&async_event);
248  	  }
249  	}
250  	
251  	void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
252  	{
253  	  std::lock_guard l{lock};
254  	  ib->post_chunk_to_pool(chunk);
255  	  perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
256  	}
257  	
258  	int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
259  	{
260  	  std::lock_guard l{lock};
261  	  return ib->post_chunks_to_rq(num, qp);
262  	}
263  	
264  	void RDMADispatcher::polling()
265  	{
266  	  static int MAX_COMPLETIONS = 32;
267  	  ibv_wc wc[MAX_COMPLETIONS];
268  	
269  	  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
270  	  std::vector<ibv_wc> tx_cqe;
271  	  ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
272  	  uint64_t last_inactive = Cycles::rdtsc();
273  	  bool rearmed = false;
274  	  int r = 0;
275  	
276  	  while (true) {
277  	    int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
278  	    if (tx_ret > 0) {
279  	      ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
280  	                     << " responses."<< dendl;
281  	      handle_tx_event(wc, tx_ret);
282  	    }
283  	
284  	    int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
285  	    if (rx_ret > 0) {
286  	      ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
287  	                     << " responses."<< dendl;
288  	      handle_rx_event(wc, rx_ret);
289  	    }
290  	
291  	    if (!tx_ret && !rx_ret) {
292  	      perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
293  	      //
294  	      // Clean up dead QPs when rx/tx CQs are in idle. The thing is that
295  	      // we can destroy QPs even earlier, just when beacon has been received,
296  	      // but we have two CQs (rx & tx), thus beacon WC can be poped from tx
297  	      // CQ before other WCs are fully consumed from rx CQ. For safety, we
298  	      // wait for beacon and then "no-events" from CQs.
299  	      //
300  	      // Calling size() on vector without locks is totally fine, since we
301  	      // use it as a hint (accuracy is not important here)
302  	      //
303  	      if (!dead_queue_pairs.empty()) {
304  	        decltype(dead_queue_pairs) dead_qps;
305  	        {
306  	          std::lock_guard l{lock};
307  	          dead_queue_pairs.swap(dead_qps);
308  	        }
309  	
310  	        for (auto& qp: dead_qps) {
311  	          perf_logger->dec(l_msgr_rdma_active_queue_pair);
312  	          ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
313  	          delete qp;
314  	        }
315  	      }
316  	
317  	      if (!num_qp_conn && done && dead_queue_pairs.empty())
318  	        break;
319  	
320  	      uint64_t now = Cycles::rdtsc();
321  	      if (Cycles::to_microseconds(now - last_inactive) > cct->_conf->ms_async_rdma_polling_us) {
322  	        handle_async_event();
323  	        if (!rearmed) {
324  	          // Clean up cq events after rearm notify ensure no new incoming event
325  	          // arrived between polling and rearm
326  	          tx_cq->rearm_notify();
327  	          rx_cq->rearm_notify();
328  	          rearmed = true;
329  	          continue;
330  	        }
331  	
332  	        struct pollfd channel_poll[2];
333  	        channel_poll[0].fd = tx_cc->get_fd();
334  	        channel_poll[0].events = POLLIN;
335  	        channel_poll[0].revents = 0;
336  	        channel_poll[1].fd = rx_cc->get_fd();
337  	        channel_poll[1].events = POLLIN;
338  	        channel_poll[1].revents = 0;
339  	        r = 0;
340  	        perf_logger->set(l_msgr_rdma_polling, 0);
341  	        while (!done && r == 0) {
342  	          r = TEMP_FAILURE_RETRY(poll(channel_poll, 2, 100));
343  	          if (r < 0) {
344  	            r = -errno;
345  	            lderr(cct) << __func__ << " poll failed " << r << dendl;
346  	            ceph_abort();
347  	          }
348  	        }
349  	        if (r > 0 && tx_cc->get_cq_event())
350  	          ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
351  	        if (r > 0 && rx_cc->get_cq_event())
352  	          ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
353  	        last_inactive = Cycles::rdtsc();
354  	        perf_logger->set(l_msgr_rdma_polling, 1);
355  	        rearmed = false;
356  	      }
357  	    }
358  	  }
359  	}
360  	
361  	void RDMADispatcher::notify_pending_workers() {
362  	  if (num_pending_workers) {
363  	    RDMAWorker *w = nullptr;
364  	    {
365  	      std::lock_guard l{w_lock};
366  	      if (!pending_workers.empty()) {
367  	        w = pending_workers.front();
368  	        pending_workers.pop_front();
369  	        --num_pending_workers;
370  	      }
371  	    }
372  	    if (w)
373  	      w->notify_worker();
374  	  }
375  	}
376  	
377  	void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
378  	{
379  	  std::lock_guard l{lock};
380  	  ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
381  	  qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
382  	  ++num_qp_conn;
383  	}
384  	
385  	RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
386  	{
387  	  auto it = qp_conns.find(qp);
388  	  if (it == qp_conns.end())
389  	    return nullptr;
390  	  if (it->second.first->is_dead())
391  	    return nullptr;
392  	  return it->second.second;
393  	}
394  	
395  	Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp)
396  	{
397  	  // Try to find the QP in qp_conns firstly.
398  	  auto it = qp_conns.find(qp);
399  	  if (it != qp_conns.end())
400  	    return it->second.first;
401  	
402  	  // Try again in dead_queue_pairs.
403  	  for (auto &i: dead_queue_pairs)
404  	    if (i->get_local_qp_number() == qp)
405  	      return i;
406  	
407  	  return nullptr;
408  	}
409  	
410  	Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
411  	{
412  	  std::lock_guard l{lock};
413  	  return get_qp_lockless(qp);
414  	}
415  	
416  	void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
417  	{
418  	  std::lock_guard l{lock};
419  	  auto it = qp_conns.find(qpn);
420  	  if (it == qp_conns.end()) {
421  	    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
422  	    return ;
423  	  }
424  	  QueuePair *qp = it->second.first;
425  	  dead_queue_pairs.push_back(qp);
426  	  qp_conns.erase(it);
427  	  --num_qp_conn;
428  	}
429  	
430  	void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
431  	{
(1) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
432  	  std::lock_guard l{lock};
433  	  auto it = qp_conns.find(qpn);
434  	  if (it == qp_conns.end()) {
435  	    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
436  	    return;
437  	  }
438  	  QueuePair *qp = it->second.first;
439  	  if (qp->to_dead()) {
440  	    //
441  	    // Failed to switch to dead. This is abnormal, but we can't
442  	    // do anything, so just destroy QP.
443  	    //
444  	    dead_queue_pairs.push_back(qp);
445  	    qp_conns.erase(it);
446  	    --num_qp_conn;
447  	  } else {
448  	    //
449  	    // Successfully switched to dead, thus keep entry in the map.
450  	    // But only zero out socked pointer in order to return null from
451  	    // get_conn_lockless();
452  	    it->second.second = nullptr;
453  	  }
454  	}
455  	
456  	void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
457  	{
458  	  std::vector<Chunk*> tx_chunks;
459  	
460  	  for (int i = 0; i < n; ++i) {
461  	    ibv_wc* response = &cqe[i];
462  	
463  	    // If it's beacon WR, enqueue the QP to be destroyed later
464  	    if (response->wr_id == BEACON_WRID) {
465  	      enqueue_dead_qp(response->qp_num);
466  	      continue;
467  	    }
468  	
469  	    ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
470  	                   << " status: " << ib->wc_status_to_string(response->status) << dendl;
471  	
472  	    if (response->status != IBV_WC_SUCCESS) {
473  	      switch(response->status) {
474  	        case IBV_WC_RETRY_EXC_ERR:
475  	          {
476  	            perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
477  	
478  	            ldout(cct, 1) << __func__ << " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
479  	                          << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
480  	                          << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
481  	                          << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
482  	
483  	            std::lock_guard l{lock};
484  	            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
485  	            if (conn) {
486  	              ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
487  	                            << conn->get_peer_qpn() << dendl;
488  	            }
489  	          }
490  	          break;
491  	        case IBV_WC_WR_FLUSH_ERR:
492  	          {
493  	            perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
494  	
495  	            std::lock_guard l{lock};
496  	            QueuePair *qp = get_qp_lockless(response->qp_num);
497  	            if (qp) {
498  	              ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
499  	            }
500  	            if (qp && qp->is_dead()) {
501  	              ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
502  	            } else {
503  	              lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR,"
504  	                         << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
505  	                         << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
506  	                         << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
507  	
508  	              RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
509  	              if (conn) {
510  	                ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
511  	                              << conn->get_peer_qpn() << dendl;
512  	              }
513  	            }
514  	          }
515  	          break;
516  	
517  	        default:
518  	          {
519  	            lderr(cct) << __func__ << " SQ WR return error,"
520  	                       << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
521  	                       << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
522  	                       << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
523  	
524  	            std::lock_guard l{lock};
525  	            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
526  	            if (conn && conn->is_connected()) {
527  	              ldout(cct, 20) << __func__ << " SQ WR return error Queue Pair error state is : " << conn->get_qp_state()
528  	                             << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
529  	              conn->fault();
530  	            } else {
531  	              ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl;
532  	            }
533  	          }
534  	          break;
535  	      }
536  	    }
537  	
538  	    auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
539  	    //TX completion may come either from
540  	    // 1) regular send message, WCE wr_id points to chunk
541  	    // 2) 'fin' message, wr_id points to the QP
542  	    if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
543  	      tx_chunks.push_back(chunk);
544  	    } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
545  	      ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
546  	    } else {
547  	      ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
548  	      ceph_abort();
549  	    }
550  	  }
551  	
552  	  perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
553  	  post_tx_buffer(tx_chunks);
554  	}
555  	
556  	/**
557  	 * Add the given Chunks to the given free queue.
558  	 *
559  	 * \param[in] chunks
560  	 *      The Chunks to enqueue.
561  	 * \return
562  	 *      0 if success or -1 for failure
563  	 */
564  	void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
565  	{
566  	  if (chunks.empty())
567  	    return ;
568  	
569  	  inflight -= chunks.size();
570  	  ib->get_memory_manager()->return_tx(chunks);
571  	  ldout(cct, 30) << __func__ << " release " << chunks.size()
572  	                 << " chunks, inflight " << inflight << dendl;
573  	  notify_pending_workers();
574  	}
575  	
/*
 * Process `rx_number` receive-side work completions polled from the rx CQ.
 *
 * The whole batch runs under the dispatcher lock. Successful receives on a
 * live connection are grouped per socket and handed over with pass_wc()
 * after the loop. Every other completion, and receives whose connection is
 * gone, returns its chunk to the pool via ib->post_chunk_to_pool() and
 * decrements rx_bufs_in_use. Error completions other than WR flush fault a
 * still-connected socket.
 */
void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
{
  // Count the whole batch up front; each chunk returned to the pool below
  // decrements rx_bufs_in_use again.
  perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
  perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);

  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
  std::lock_guard l{lock};//make sure connected socket alive when pass wc

  for (int i = 0; i < rx_number; ++i) {
    ibv_wc* response = &cqe[i];
    // For receives, wr_id carries the Chunk that was posted to the RQ.
    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
    // conn is nullptr for unknown or dead QPs; qp may still be found in the
    // dead queue.
    RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
    QueuePair *qp = get_qp_lockless(response->qp_num);

    switch (response->status) {
      case IBV_WC_SUCCESS:
        ceph_assert(response->opcode == IBV_WC_RECV);
        if (!conn) {
          ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
                        << std::hex << chunk << " will be back." << std::dec << dendl;
          ib->post_chunk_to_pool(chunk);
          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        } else {
          // Presumably reposts one receive buffer for this connection.
          conn->post_chunks_to_rq(1);
          polled[conn].push_back(*response);

          // Without an SRQ the chunk is tracked on the QP itself; detach it.
          if (qp != nullptr && !qp->get_srq()) {
            qp->remove_rq_wr(chunk);
            chunk->clear_qp();
          }
        }
        break;

      case IBV_WC_WR_FLUSH_ERR:
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        if (qp) {
          ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
        }
        // Flushes are expected once a QP has been switched to dead.
        if (qp && qp->is_dead()) {
          ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
        } else {
          ldout(cct, 1) << __func__ << " RQ WR return error,"
                     << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                     << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                     << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
          if (conn) {
            ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
                       << conn->get_peer_qpn() << dendl;
          }
        }

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;

      default:
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        ldout(cct, 1) << __func__ << " RQ WR return error,"
                      << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                      << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                      << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
        if (conn && conn->is_connected())
          conn->fault();

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;
    }
  }

  // Deliver each connection's completions in one call, still under `lock`.
  for (auto &i : polled)
    i.first->pass_wc(std::move(i.second));
  polled.clear();
}
652  	
653  	RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id)
654  	  : Worker(c, worker_id),
655  	    tx_handler(new C_handle_cq_tx(this))
656  	{
657  	  // initialize perf_logger
658  	  char name[128];
659  	  sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
660  	  PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
661  	
662  	  plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
663  	  plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
664  	  plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
665  	
666  	  plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
667  	  plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
668  	  plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted");
669  	  plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
670  	  plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");
671  	
672  	  perf_logger = plb.create_perf_counters();
673  	  cct->get_perfcounters_collection()->add(perf_logger);
674  	}
675  	
676  	RDMAWorker::~RDMAWorker()
677  	{
678  	  delete tx_handler;
679  	}
680  	
681  	void RDMAWorker::initialize()
682  	{
683  	  ceph_assert(dispatcher);
684  	}
685  	
686  	int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
687  			       const SocketOptions &opt,ServerSocket *sock)
688  	{
689  	  ib->init();
690  	  dispatcher->polling_start();
691  	
692  	  RDMAServerSocketImpl *p;
693  	  if (cct->_conf->ms_async_rdma_type == "iwarp") {
694  	    p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
695  	  } else {
696  	    p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
697  	  }
698  	  int r = p->listen(sa, opt);
699  	  if (r < 0) {
700  	    delete p;
701  	    return r;
702  	  }
703  	
704  	  *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
705  	  return 0;
706  	}
707  	
708  	int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
709  	{
710  	  ib->init();
711  	  dispatcher->polling_start();
712  	
713  	  RDMAConnectedSocketImpl* p;
714  	  if (cct->_conf->ms_async_rdma_type == "iwarp") {
715  	    p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
716  	  } else {
717  	    p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
718  	  }
719  	  int r = p->try_connect(addr, opts);
720  	
721  	  if (r < 0) {
722  	    ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
723  	    delete p;
724  	    return r;
725  	  }
726  	  std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
727  	  *socket = ConnectedSocket(std::move(csi));
728  	  return 0;
729  	}
730  	
731  	int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
732  	{
733  	  ceph_assert(center.in_thread());
734  	  int r = ib->get_tx_buffers(c, bytes);
735  	  size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
736  	  ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
737  	  dispatcher->inflight += r;
738  	  if (got >= bytes)
739  	    return r;
740  	
741  	  if (o) {
742  	    if (!o->is_pending()) {
743  	      pending_sent_conns.push_back(o);
744  	      perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
745  	      o->set_pending(1);
746  	    }
747  	    dispatcher->make_pending_worker(this);
748  	  }
749  	  return r;
750  	}
751  	
752  	
753  	void RDMAWorker::handle_pending_message()
754  	{
755  	  ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
756  	  while (!pending_sent_conns.empty()) {
757  	    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
758  	    pending_sent_conns.pop_front();
759  	    ssize_t r = o->submit(false);
760  	    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
761  	    if (r < 0) {
762  	      if (r == -EAGAIN) {
763  	        pending_sent_conns.push_back(o);
764  	        dispatcher->make_pending_worker(this);
765  	        return ;
766  	      }
767  	      o->fault();
768  	    }
769  	    o->set_pending(0);
770  	    perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
771  	  }
772  	  dispatcher->notify_pending_workers();
773  	}
774  	
775  	RDMAStack::RDMAStack(CephContext *cct, const string &t)
776  	  : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)),
777  	    rdma_dispatcher(make_shared<RDMADispatcher>(cct, ib))
778  	{
779  	  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
780  	
781  	  unsigned num = get_num_worker();
782  	  for (unsigned i = 0; i < num; ++i) {
783  	    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
784  	    w->set_dispatcher(rdma_dispatcher);
785  	    w->set_ib(ib);
786  	  }
787  	  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
788  	}
789  	
790  	RDMAStack::~RDMAStack()
791  	{
792  	  if (cct->_conf->ms_async_rdma_enable_hugepage) {
793  	    unsetenv("RDMAV_HUGEPAGES_SAFE");	//remove env variable on destruction
794  	  }
795  	}
796  	
797  	void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
798  	{
799  	  threads.resize(i+1);
800  	  threads[i] = std::thread(func);
801  	}
802  	
803  	void RDMAStack::join_worker(unsigned i)
804  	{
805  	  ceph_assert(threads.size() > i && threads[i].joinable());
806  	  threads[i].join();
807  	}
808