1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <poll.h>
18 #include <errno.h>
19 #include <sys/time.h>
20 #include <sys/resource.h>
21
22 #include "include/str_list.h"
23 #include "include/compat.h"
24 #include "common/Cycles.h"
25 #include "common/deleter.h"
26 #include "common/Tub.h"
27 #include "RDMAStack.h"
28
29 #define dout_subsys ceph_subsys_ms
30 #undef dout_prefix
31 #define dout_prefix *_dout << "RDMAStack "
32
(1) Event exn_spec_violation:
An exception of type "boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<std::ios_base::failure[abi:cxx11]>>" (mangled: "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE") is thrown, but the destructor's implicit "throw()" (noexcept) specification doesn't allow it to be thrown. This will cause a call to std::unexpected(), which usually calls std::terminate().
Also see events: [fun_call_w_exception]
33 RDMADispatcher::~RDMADispatcher()
34 {
35 ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
(2) Event fun_call_w_exception:
The called function throws an exception of type "boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<std::ios_base::failure[abi:cxx11]>>". [details]
Also see events: [exn_spec_violation]
36 polling_stop();
37
38 ceph_assert(qp_conns.empty());
39 ceph_assert(num_qp_conn == 0);
40 ceph_assert(dead_queue_pairs.empty());
41 }
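Note on events (1) and (2): C++11 destructors are implicitly noexcept(true), so if polling_stop() lets the boost-wrapped std::ios_base::failure escape, the runtime calls std::terminate(). A minimal sketch of one way to contain it, assuming swallowing the error in a destructor is acceptable here; this wrapper is an illustration, not the fix adopted upstream:

    RDMADispatcher::~RDMADispatcher()
    {
      ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
      try {
        polling_stop();
      } catch (const std::exception &e) {
        // swallow: letting an exception escape a noexcept destructor aborts the process
        lderr(cct) << __func__ << " polling_stop threw: " << e.what() << dendl;
      }
      ceph_assert(qp_conns.empty());
      ceph_assert(num_qp_conn == 0);
      ceph_assert(dead_queue_pairs.empty());
    }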
42
43 RDMADispatcher::RDMADispatcher(CephContext* c, shared_ptr<Infiniband>& ib)
44 : cct(c), ib(ib)
45 {
46 PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
47
48 plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
49 plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
50 plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
51 plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers");
52
53 plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
54 plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
55 plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
56 plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
57
58 plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The total number of rx work completions");
59 plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The total number of rx error work completions");
60 plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work requests");
61
62 plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
63 plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");
64
65 plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");
66
67
68 plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Created queue pair number");
69 plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Active queue pair number");
70
71 perf_logger = plb.create_perf_counters();
72 cct->get_perfcounters_collection()->add(perf_logger);
73 Cycles::init();
74 }
75
76 void RDMADispatcher::polling_start()
77 {
78 // take lock because listen/connect can happen from different worker threads
79 std::lock_guard l{lock};
80
81 if (t.joinable())
82 return; // dispatcher thread already running
83
84 ib->get_memory_manager()->set_rx_stat_logger(perf_logger);
85
86 tx_cc = ib->create_comp_channel(cct);
87 ceph_assert(tx_cc);
88 rx_cc = ib->create_comp_channel(cct);
89 ceph_assert(rx_cc);
90 tx_cq = ib->create_comp_queue(cct, tx_cc);
91 ceph_assert(tx_cq);
92 rx_cq = ib->create_comp_queue(cct, rx_cc);
93 ceph_assert(rx_cq);
94
95 t = std::thread(&RDMADispatcher::polling, this);
96 ceph_pthread_setname(t.native_handle(), "rdma-polling");
97 }
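Note: create_comp_channel()/create_comp_queue() are assumed to be thin wrappers over the libibverbs primitives; a sketch of the underlying calls, where ctxt is ib->get_device()->ctxt, cq_depth is a placeholder for whatever CQ size the wrapper picks, and error handling is elided:

    ibv_comp_channel *cc = ibv_create_comp_channel(ctxt);  // event fd, later watched by poll(2)
    ibv_cq *cq = ibv_create_cq(ctxt, cq_depth, nullptr /* cq_context */, cc, 0 /* comp_vector */);
    ibv_req_notify_cq(cq, 0);  // arm: the next completion raises an event on cc->fd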
98
99 void RDMADispatcher::polling_stop()
100 {
101 {
(1) Event fun_call_w_exception: |
Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details] |
102 std::lock_guard l{lock};
103 done = true;
104 }
105
106 if (!t.joinable())
107 return;
108
109 t.join();
110
111 tx_cc->ack_events();
112 rx_cc->ack_events();
113 delete tx_cq;
114 delete rx_cq;
115 delete tx_cc;
116 delete rx_cc;
117 }
118
119 void RDMADispatcher::handle_async_event()
120 {
121 ldout(cct, 30) << __func__ << dendl;
122 while (1) {
123 ibv_async_event async_event;
124 if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
125 if (errno != EAGAIN)
126 lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
127 << " " << cpp_strerror(errno) << ")" << dendl;
128 return;
129 }
130 perf_logger->inc(l_msgr_rdma_total_async_events);
131 ldout(cct, 1) << __func__ << " Event: " << ibv_event_type_str(async_event.event_type) << dendl;
132
133 switch (async_event.event_type) {
134 /***********************CQ events********************/
135 case IBV_EVENT_CQ_ERR:
136 lderr(cct) << __func__ << " Fatal error affecting all QPs bound to the same CQ:"
137 << " CQ overflow, dev = " << ib->get_device()->ctxt
138 << "; need to destroy and recreate the resource" << dendl;
139 break;
140 /***********************QP events********************/
141 case IBV_EVENT_QP_FATAL:
142 {
143 /* Error occurred on a QP and it transitioned to error state */
144 ibv_qp* ib_qp = async_event.element.qp;
145 uint32_t qpn = ib_qp->qp_num;
146 QueuePair* qp = get_qp(qpn);
147 lderr(cct) << __func__ << " Fatal error, event associated qp number: " << qpn
148 << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
149 << " Event: " << ibv_event_type_str(async_event.event_type) << dendl;
150 }
151 break;
152 case IBV_EVENT_QP_LAST_WQE_REACHED:
153 {
154 /*
155 * 1. A QP bound to an SRQ is in IBV_QPS_ERR state and has no more WQEs on its RQ.
156 * Reason: the QP was forcibly switched into the error state before posting the beacon WR.
157 * Its WRs are flushed into the CQ with IBV_WC_WR_FLUSH_ERR status.
158 * With an SRQ, only the WRs of the QP that entered the error state are flushed.
159 * Handling: just confirm that the qp is enqueued into the dead queue pairs.
160 * 2. The CQE with error was generated for the last WQE.
161 * Handling: output an error log.
162 */
163 perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
164 ibv_qp* ib_qp = async_event.element.qp;
165 uint32_t qpn = ib_qp->qp_num;
166 std::lock_guard l{lock};
167 RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
168 QueuePair* qp = get_qp_lockless(qpn);
169
170 if (qp && !qp->is_dead()) {
171 lderr(cct) << __func__ << " QP not dead, event associated qp number: " << qpn
172 << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
173 << " Event: " << ibv_event_type_str(async_event.event_type) << dendl;
174 }
175 if (!conn) {
176 ldout(cct, 20) << __func__ << " Connection's QP may have entered dead state. "
177 << " qp number: " << qpn << dendl;
178 } else {
179 conn->fault();
180 if (qp) {
181 if (!cct->_conf->ms_async_rdma_cm)
182 enqueue_dead_qp(qpn);
183 }
184 }
185 }
186 break;
187 case IBV_EVENT_QP_REQ_ERR:
188 /* Invalid Request Local Work Queue Error */
189 [[fallthrough]];
190 case IBV_EVENT_QP_ACCESS_ERR:
191 /* Local access violation error */
192 [[fallthrough]];
193 case IBV_EVENT_COMM_EST:
194 /* Communication was established on a QP */
195 [[fallthrough]];
196 case IBV_EVENT_SQ_DRAINED:
197 /* Send Queue was drained of outstanding messages in progress */
198 [[fallthrough]];
199 case IBV_EVENT_PATH_MIG:
200 /* A connection has migrated to the alternate path */
201 [[fallthrough]];
202 case IBV_EVENT_PATH_MIG_ERR:
203 /* A connection failed to migrate to the alternate path */
204 break;
205 /***********************SRQ events*******************/
206 case IBV_EVENT_SRQ_ERR:
207 /* Error occurred on an SRQ */
208 [[fallthrough]];
209 case IBV_EVENT_SRQ_LIMIT_REACHED:
210 /* SRQ limit was reached */
211 break;
212 /***********************Port events******************/
213 case IBV_EVENT_PORT_ACTIVE:
214 /* Link became active on a port */
215 [[fallthrough]];
216 case IBV_EVENT_PORT_ERR:
217 /* Link became unavailable on a port */
218 [[fallthrough]];
219 case IBV_EVENT_LID_CHANGE:
220 /* LID was changed on a port */
221 [[fallthrough]];
222 case IBV_EVENT_PKEY_CHANGE:
223 /* P_Key table was changed on a port */
224 [[fallthrough]];
225 case IBV_EVENT_SM_CHANGE:
226 /* SM was changed on a port */
227 [[fallthrough]];
228 case IBV_EVENT_CLIENT_REREGISTER:
229 /* SM sent a CLIENT_REREGISTER request to a port */
230 [[fallthrough]];
231 case IBV_EVENT_GID_CHANGE:
232 /* GID table was changed on a port */
233 break;
234
235 /***********************CA events******************/
236 //CA events:
237 case IBV_EVENT_DEVICE_FATAL:
238 /* CA is in FATAL state */
239 lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
240 << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
241 break;
242 default:
243 lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
244 << " unknown event: " << async_event.event_type << dendl;
245 break;
246 }
247 ibv_ack_async_event(&async_event);
248 }
249 }
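Note: this EAGAIN-driven drain loop only terminates if the device's async event fd is non-blocking; by default ibv_get_async_event() blocks. A sketch of the setup this loop is assumed to rely on (done elsewhere, e.g. at device initialization; needs <fcntl.h>):

    // make ibv_get_async_event() fail with EAGAIN instead of blocking
    int flags = fcntl(ctxt->async_fd, F_GETFL);
    if (flags < 0 || fcntl(ctxt->async_fd, F_SETFL, flags | O_NONBLOCK) < 0)
      lderr(cct) << " failed to set async fd non-blocking: " << cpp_strerror(errno) << dendl;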
250
251 void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
252 {
253 std::lock_guard l{lock};
254 ib->post_chunk_to_pool(chunk);
255 perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
256 }
257
258 int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
259 {
260 std::lock_guard l{lock};
261 return ib->post_chunks_to_rq(num, qp);
262 }
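Note: Infiniband::post_chunks_to_rq() is assumed to post one recv WR per chunk to either the shared receive queue or the QP's own RQ; a sketch of the two underlying posting paths, where chunk_size, lkey, and raw_qp (the underlying ibv_qp*) stand in for the memory manager's and queue pair's values:

    ibv_sge sge = { reinterpret_cast<uint64_t>(chunk->buffer), chunk_size, lkey };
    ibv_recv_wr wr = {};
    wr.wr_id = reinterpret_cast<uint64_t>(chunk);  // wr_id carries the chunk, read back in handle_rx_event()
    wr.sg_list = &sge;
    wr.num_sge = 1;
    ibv_recv_wr *bad = nullptr;
    if (srq)
      ibv_post_srq_recv(srq, &wr, &bad);  // SRQ: buffers shared by all QPs
    else
      ibv_post_recv(raw_qp, &wr, &bad);   // per-QP RQ: tracked and released via remove_rq_wr()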
263
264 void RDMADispatcher::polling()
265 {
266 static const int MAX_COMPLETIONS = 32;
267 ibv_wc wc[MAX_COMPLETIONS];
268
269 std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
270 std::vector<ibv_wc> tx_cqe;
271 ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
272 uint64_t last_inactive = Cycles::rdtsc();
273 bool rearmed = false;
274 int r = 0;
275
276 while (true) {
277 int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
278 if (tx_ret > 0) {
279 ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
280 << " responses."<< dendl;
281 handle_tx_event(wc, tx_ret);
282 }
283
284 int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
285 if (rx_ret > 0) {
286 ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
287 << " responses."<< dendl;
288 handle_rx_event(wc, rx_ret);
289 }
290
291 if (!tx_ret && !rx_ret) {
292 perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
293 //
294 // Clean up dead QPs while the rx/tx CQs are idle. We could destroy
295 // QPs even earlier, as soon as the beacon has been received, but we
296 // have two CQs (rx & tx), so the beacon WC can be popped from the tx
297 // CQ before the other WCs are fully consumed from the rx CQ. For
298 // safety, we wait for the beacon and then for "no events" from the CQs.
299 //
300 // Checking the vector for emptiness without the lock is fine, since
301 // we use it only as a hint (accuracy is not important here).
302 //
303 if (!dead_queue_pairs.empty()) {
304 decltype(dead_queue_pairs) dead_qps;
305 {
306 std::lock_guard l{lock};
307 dead_queue_pairs.swap(dead_qps);
308 }
309
310 for (auto& qp: dead_qps) {
311 perf_logger->dec(l_msgr_rdma_active_queue_pair);
312 ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
313 delete qp;
314 }
315 }
316
317 if (!num_qp_conn && done && dead_queue_pairs.empty())
318 break;
319
320 uint64_t now = Cycles::rdtsc();
321 if (Cycles::to_microseconds(now - last_inactive) > cct->_conf->ms_async_rdma_polling_us) {
322 handle_async_event();
323 if (!rearmed) {
324 // Rearm the notification first, then poll once more: this drains any
325 // completion that arrived between the last poll and the rearm.
326 tx_cq->rearm_notify();
327 rx_cq->rearm_notify();
328 rearmed = true;
329 continue;
330 }
331
332 struct pollfd channel_poll[2];
333 channel_poll[0].fd = tx_cc->get_fd();
334 channel_poll[0].events = POLLIN;
335 channel_poll[0].revents = 0;
336 channel_poll[1].fd = rx_cc->get_fd();
337 channel_poll[1].events = POLLIN;
338 channel_poll[1].revents = 0;
339 r = 0;
340 perf_logger->set(l_msgr_rdma_polling, 0);
341 while (!done && r == 0) {
342 r = TEMP_FAILURE_RETRY(poll(channel_poll, 2, 100));
343 if (r < 0) {
344 r = -errno;
345 lderr(cct) << __func__ << " poll failed " << r << dendl;
346 ceph_abort();
347 }
348 }
349 if (r > 0 && tx_cc->get_cq_event())
350 ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
351 if (r > 0 && rx_cc->get_cq_event())
352 ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
353 last_inactive = Cycles::rdtsc();
354 perf_logger->set(l_msgr_rdma_polling, 1);
355 rearmed = false;
356 }
357 }
358 }
359 }
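Note on the idle path above: rearm_notify()/get_cq_event()/ack_events() are assumed to wrap the canonical libibverbs completion-channel handshake; a sketch of the raw sequence for one CQ:

    ibv_req_notify_cq(cq, 0);                    // rearm_notify(): request an event for the next completion
    while (ibv_poll_cq(cq, MAX_COMPLETIONS, wc) > 0)
      ; /* drain completions that raced with the rearm */
    ibv_cq *ev_cq;
    void *ev_ctx;
    ibv_get_cq_event(channel, &ev_cq, &ev_ctx);  // get_cq_event(): blocks on the channel fd
    ibv_ack_cq_events(ev_cq, 1);                 // ack_events(): events must be acked before ibv_destroy_cq()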
360
361 void RDMADispatcher::notify_pending_workers() {
362 if (num_pending_workers) {
363 RDMAWorker *w = nullptr;
364 {
365 std::lock_guard l{w_lock};
366 if (!pending_workers.empty()) {
367 w = pending_workers.front();
368 pending_workers.pop_front();
369 --num_pending_workers;
370 }
371 }
372 if (w)
373 w->notify_worker();
374 }
375 }
376
377 void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
378 {
379 std::lock_guard l{lock};
380 ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
381 qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
382 ++num_qp_conn;
383 }
384
385 RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
386 {
387 auto it = qp_conns.find(qp);
388 if (it == qp_conns.end())
389 return nullptr;
390 if (it->second.first->is_dead())
391 return nullptr;
392 return it->second.second;
393 }
394
395 Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp)
396 {
397 // Try to find the QP in qp_conns first.
398 auto it = qp_conns.find(qp);
399 if (it != qp_conns.end())
400 return it->second.first;
401
402 // Try again in dead_queue_pairs.
403 for (auto &i: dead_queue_pairs)
404 if (i->get_local_qp_number() == qp)
405 return i;
406
407 return nullptr;
408 }
409
410 Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
411 {
412 std::lock_guard l{lock};
413 return get_qp_lockless(qp);
414 }
415
416 void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
417 {
418 std::lock_guard l{lock};
419 auto it = qp_conns.find(qpn);
420 if (it == qp_conns.end()) {
421 lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
422 return;
423 }
424 QueuePair *qp = it->second.first;
425 dead_queue_pairs.push_back(qp);
426 qp_conns.erase(it);
427 --num_qp_conn;
428 }
429
430 void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
431 {
432 std::lock_guard l{lock};
433 auto it = qp_conns.find(qpn);
434 if (it == qp_conns.end()) {
435 lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
436 return;
437 }
438 QueuePair *qp = it->second.first;
439 if (qp->to_dead()) {
440 //
441 // Failed to switch to dead. This is abnormal, but we can't
442 // do anything, so just destroy the QP.
443 //
444 dead_queue_pairs.push_back(qp);
445 qp_conns.erase(it);
446 --num_qp_conn;
447 } else {
448 //
449 // Successfully switched to dead, thus keep the entry in the map,
450 // but zero out the socket pointer so that get_conn_lockless()
451 // returns null.
452 it->second.second = nullptr;
453 }
454 }
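Note: to_dead() is assumed to switch the QP into the error state via ibv_modify_qp() and to return non-zero on failure, matching the branch above; a sketch of the transition, where raw_qp is the underlying ibv_qp*:

    ibv_qp_attr attr = {};
    attr.qp_state = IBV_QPS_ERR;        // force-flush every outstanding WR with IBV_WC_WR_FLUSH_ERR
    if (ibv_modify_qp(raw_qp, &attr, IBV_QP_STATE))
      return -1;                        // failed to switch; the caller destroys the QP instead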
455
456 void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
457 {
458 std::vector<Chunk*> tx_chunks;
459
460 for (int i = 0; i < n; ++i) {
461 ibv_wc* response = &cqe[i];
462
463 // If it's beacon WR, enqueue the QP to be destroyed later
464 if (response->wr_id == BEACON_WRID) {
465 enqueue_dead_qp(response->qp_num);
466 continue;
467 }
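Note: BEACON_WRID is a sentinel wr_id; the beacon is assumed to be a signaled zero-length send posted while tearing the QP down, so its completion marks the point after which no further tx WCs can arrive for that QP. A sketch of the assumed posting side, where raw_qp is the underlying ibv_qp*:

    ibv_send_wr beacon = {};
    beacon.wr_id = BEACON_WRID;             // sentinel, no buffer attached
    beacon.opcode = IBV_WR_SEND;
    beacon.send_flags = IBV_SEND_SIGNALED;  // guarantee a WC is generated
    ibv_send_wr *bad_wr = nullptr;
    ibv_post_send(raw_qp, &beacon, &bad_wr);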
468
469 ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
470 << " status: " << ib->wc_status_to_string(response->status) << dendl;
471
472 if (response->status != IBV_WC_SUCCESS) {
473 switch(response->status) {
474 case IBV_WC_RETRY_EXC_ERR:
475 {
476 perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
477
478 ldout(cct, 1) << __func__ << " Responder ACK timeout: possible disconnect, or remote QP in a bad state;"
479 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
480 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
481 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
482
483 std::lock_guard l{lock};
484 RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
485 if (conn) {
486 ldout(cct, 1) << __func__ << " SQ WR returned an error; remote queue pair number: "
487 << conn->get_peer_qpn() << dendl;
488 }
489 }
490 break;
491 case IBV_WC_WR_FLUSH_ERR:
492 {
493 perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
494
495 std::lock_guard l{lock};
496 QueuePair *qp = get_qp_lockless(response->qp_num);
497 if (qp) {
498 ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
499 }
500 if (qp && qp->is_dead()) {
501 ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
502 } else {
503 lderr(cct) << __func__ << " Invalid/unsupported request to consume an outstanding SQ WR,"
504 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
505 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
506 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
507
508 RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
509 if (conn) {
510 ldout(cct, 1) << __func__ << " SQ WR returned an error; remote queue pair number: "
511 << conn->get_peer_qpn() << dendl;
512 }
513 }
514 }
515 break;
516
517 default:
518 {
519 lderr(cct) << __func__ << " SQ WR returned an error,"
520 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
521 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
522 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
523
524 std::lock_guard l{lock};
525 RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
526 if (conn && conn->is_connected()) {
527 ldout(cct, 20) << __func__ << " SQ WR returned an error; queue pair error state is: " << conn->get_qp_state()
528 << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
529 conn->fault();
530 } else {
531 ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl;
532 }
533 }
534 break;
535 }
536 }
537
538 auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
539 // TX completion may come either from
540 // 1) a regular send message, where the WCE wr_id points to a chunk, or
541 // 2) a 'fin' message, where the wr_id points to the QP
542 if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
543 tx_chunks.push_back(chunk);
544 } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num) {
545 ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
546 } else {
547 ldout(cct, 1) << __func__ << " not a tx buffer, chunk " << chunk << dendl;
548 ceph_abort();
549 }
550 }
551
552 perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
553 post_tx_buffer(tx_chunks);
554 }
555
556 /**
557 * Return the given Chunks to the tx buffer pool and notify any
558 * workers that are waiting for tx buffers.
559 *
560 * \param[in] chunks
561 * The Chunks to return; also decrements the inflight counter by
562 * the number of chunks returned.
563 */
564 void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
565 {
566 if (chunks.empty())
567 return;
568
569 inflight -= chunks.size();
570 ib->get_memory_manager()->return_tx(chunks);
571 ldout(cct, 30) << __func__ << " release " << chunks.size()
572 << " chunks, inflight " << inflight << dendl;
573 notify_pending_workers();
574 }
575
576 void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
577 {
578 perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
579 perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);
580
581 std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
582 std::lock_guard l{lock}; // make sure the connected socket stays alive while we pass WCs to it
583
584 for (int i = 0; i < rx_number; ++i) {
585 ibv_wc* response = &cqe[i];
586 Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
587 RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
588 QueuePair *qp = get_qp_lockless(response->qp_num);
589
590 switch (response->status) {
591 case IBV_WC_SUCCESS:
592 ceph_assert(response->opcode == IBV_WC_RECV);
593 if (!conn) {
594 ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
595 << std::hex << chunk << " will be returned to the pool." << std::dec << dendl;
596 ib->post_chunk_to_pool(chunk);
597 perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
598 } else {
599 conn->post_chunks_to_rq(1);
600 polled[conn].push_back(*response);
601
602 if (qp != nullptr && !qp->get_srq()) {
603 qp->remove_rq_wr(chunk);
604 chunk->clear_qp();
605 }
606 }
607 break;
608
609 case IBV_WC_WR_FLUSH_ERR:
610 perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
611
612 if (qp) {
613 ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
614 }
615 if (qp && qp->is_dead()) {
616 ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
617 } else {
618 ldout(cct, 1) << __func__ << " RQ WR returned an error,"
619 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
620 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
621 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
622 if (conn) {
623 ldout(cct, 1) << __func__ << " RQ WR returned an error; remote queue pair number: "
624 << conn->get_peer_qpn() << dendl;
625 }
626 }
627
628 ib->post_chunk_to_pool(chunk);
629 perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
630 break;
631
632 default:
633 perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
634
635 ldout(cct, 1) << __func__ << " RQ WR returned an error,"
636 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
637 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
638 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
639 if (conn && conn->is_connected())
640 conn->fault();
641
642 ib->post_chunk_to_pool(chunk);
643 perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
644 break;
645 }
646 }
647
648 for (auto &i : polled)
649 i.first->pass_wc(std::move(i.second));
650 polled.clear();
651 }
652
653 RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id)
654 : Worker(c, worker_id),
655 tx_handler(new C_handle_cq_tx(this))
656 {
657 // initialize perf_logger
658 char name[128];
659 snprintf(name, sizeof(name), "AsyncMessenger::RDMAWorker-%u", id);
660 PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
661
662 plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
663 plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffers");
664 plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of failed tx posts");
665
666 plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
667 plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
668 plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks received");
669 plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks received", NULL, 0, unit_t(UNIT_BYTES));
670 plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");
671
672 perf_logger = plb.create_perf_counters();
673 cct->get_perfcounters_collection()->add(perf_logger);
674 }
675
676 RDMAWorker::~RDMAWorker()
677 {
678 delete tx_handler;
679 }
680
681 void RDMAWorker::initialize()
682 {
683 ceph_assert(dispatcher);
684 }
685
686 int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
687 const SocketOptions &opt, ServerSocket *sock)
688 {
689 ib->init();
690 dispatcher->polling_start();
691
692 RDMAServerSocketImpl *p;
693 if (cct->_conf->ms_async_rdma_type == "iwarp") {
694 p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
695 } else {
696 p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
697 }
698 int r = p->listen(sa, opt);
699 if (r < 0) {
700 delete p;
701 return r;
702 }
703
704 *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
705 return 0;
706 }
707
708 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
709 {
710 ib->init();
711 dispatcher->polling_start();
712
713 RDMAConnectedSocketImpl* p;
714 if (cct->_conf->ms_async_rdma_type == "iwarp") {
715 p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
716 } else {
717 p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
718 }
719 int r = p->try_connect(addr, opts);
720
721 if (r < 0) {
722 ldout(cct, 1) << __func__ << " failed to connect." << dendl;
723 delete p;
724 return r;
725 }
726 std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
727 *socket = ConnectedSocket(std::move(csi));
728 return 0;
729 }
730
731 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
732 {
733 ceph_assert(center.in_thread());
734 int r = ib->get_tx_buffers(c, bytes);
735 size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
736 ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
737 dispatcher->inflight += r;
738 if (got >= bytes)
739 return r;
740
741 if (o) {
742 if (!o->is_pending()) {
743 pending_sent_conns.push_back(o);
744 perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
745 o->set_pending(1);
746 }
747 dispatcher->make_pending_worker(this);
748 }
749 return r;
750 }
751
752
753 void RDMAWorker::handle_pending_message()
754 {
755 ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
756 while (!pending_sent_conns.empty()) {
757 RDMAConnectedSocketImpl *o = pending_sent_conns.front();
758 pending_sent_conns.pop_front();
759 ssize_t r = o->submit(false);
760 ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
761 if (r < 0) {
762 if (r == -EAGAIN) {
763 pending_sent_conns.push_back(o);
764 dispatcher->make_pending_worker(this);
765 return;
766 }
767 o->fault();
768 }
769 o->set_pending(0);
770 perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
771 }
772 dispatcher->notify_pending_workers();
773 }
774
775 RDMAStack::RDMAStack(CephContext *cct, const string &t)
776 : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)),
777 rdma_dispatcher(make_shared<RDMADispatcher>(cct, ib))
778 {
779 ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
780
781 unsigned num = get_num_worker();
782 for (unsigned i = 0; i < num; ++i) {
783 RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
784 w->set_dispatcher(rdma_dispatcher);
785 w->set_ib(ib);
786 }
787 ldout(cct, 20) << __func__ << " created RDMAStack: " << this << " with dispatcher: " << rdma_dispatcher.get() << dendl;
788 }
789
790 RDMAStack::~RDMAStack()
791 {
792 if (cct->_conf->ms_async_rdma_enable_hugepage) {
793 unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
794 }
795 }
796
797 void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
798 {
799 threads.resize(i+1);
800 threads[i] = std::thread(func);
801 }
802
803 void RDMAStack::join_worker(unsigned i)
804 {
805 ceph_assert(threads.size() > i && threads[i].joinable());
806 threads[i].join();
807 }
808