1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16 #include "RDMAStack.h"
17
18 #define dout_subsys ceph_subsys_ms
19 #undef dout_prefix
20 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
21
22 RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib,
23 shared_ptr<RDMADispatcher>& rdma_dispatcher,
24 RDMAWorker *w)
25 : cct(cct), connected(0), error(0), ib(ib),
26 dispatcher(rdma_dispatcher), worker(w),
27 is_server(false), con_handler(new C_handle_connection(this)),
28 active(false), pending(false)
29 {
30 if (!cct->_conf->ms_async_rdma_cm) {
31 qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
32 local_qpn = qp->get_local_qp_number();
33 notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
34 dispatcher->register_qp(qp, this);
35 dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
36 dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
37 }
38 }
39
40 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
41 {
42 ldout(cct, 20) << __func__ << " destruct." << dendl;
43 cleanup();
44 worker->remove_pending_conn(this);
45 dispatcher->schedule_qp_destroy(local_qpn);
46
47 for (unsigned i=0; i < wc.size(); ++i) {
48 dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
49 }
50 for (unsigned i=0; i < buffers.size(); ++i) {
51 dispatcher->post_chunk_to_pool(buffers[i]);
52 }
53
54 std::lock_guard l{lock};
55 if (notify_fd >= 0)
56 ::close(notify_fd);
57 if (tcp_fd >= 0)
58 ::close(tcp_fd);
59 error = ECONNRESET;
60 }
61
62 void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
63 {
64 std::lock_guard l{lock};
65 if (wc.empty())
66 wc = std::move(v);
67 else
68 wc.insert(wc.end(), v.begin(), v.end());
69 notify();
70 }
71
72 void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
73 {
74 std::lock_guard l{lock};
75 if (wc.empty())
76 return ;
77 w.swap(wc);
78 }
79
80 int RDMAConnectedSocketImpl::activate()
81 {
82 qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn;
83 if (qp->modify_qp_to_rtr() != 0)
84 return -1;
85
86 if (qp->modify_qp_to_rts() != 0)
87 return -1;
88
89 if (!is_server) {
90 connected = 1; //indicate successfully
91 ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
92 submit(false);
93 }
94 active = true;
95 peer_qpn = qp->get_local_cm_meta().peer_qpn;
96
97 return 0;
98 }
99
100 int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
101 ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
102 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
103 NetHandler net(cct);
104 tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
105
106 if (tcp_fd < 0) {
107 return -errno;
108 }
109
110 int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
111 if (r < 0) {
112 ::close(tcp_fd);
113 tcp_fd = -1;
114 return -errno;
115 }
116
117 ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
118 net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
119 qp->get_local_cm_meta().peer_qpn = 0;
120 r = qp->send_cm_meta(cct, tcp_fd);
121 if (r < 0)
122 return r;
123
124 worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
125 return 0;
126 }
127
128 void RDMAConnectedSocketImpl::handle_connection() {
129 ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
130 int r = qp->recv_cm_meta(cct, tcp_fd);
131 if (r <= 0) {
132 if (r != -EAGAIN) {
133 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
134 ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
135 fault();
136 }
137 return;
138 }
139
140 if (1 == connected) {
141 ldout(cct, 1) << __func__ << " warnning: logic failed: read len: " << r << dendl;
142 fault();
143 return;
144 }
145
146 if (!is_server) {// first time: cm meta sync + ack from server
147 if (!connected) {
148 r = activate();
149 ceph_assert(!r);
150 }
151 notify();
152 r = qp->send_cm_meta(cct, tcp_fd);
153 if (r < 0) {
154 ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
155 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
156 fault();
157 }
158 } else {
159 if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
160 if (active) {
161 ldout(cct, 10) << __func__ << " server is already active." << dendl;
162 return ;
163 }
164 r = activate();
165 ceph_assert(!r);
166 r = qp->send_cm_meta(cct, tcp_fd);
167 if (r < 0) {
168 ldout(cct, 1) << __func__ << " server ack failed." << dendl;
169 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
170 fault();
171 return ;
172 }
173 } else { // second time: cm meta ack from client
174 connected = 1;
175 ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
176 //cleanup();
177 submit(false);
178 notify();
179 }
180 }
181 }
182
183 ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
184 {
185 eventfd_t event_val = 0;
186 int r = eventfd_read(notify_fd, &event_val);
187 ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
188 << " r = " << r << dendl;
189
190 if (!active) {
191 ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
192 return -EAGAIN;
193 }
194
195 if (0 == connected) {
196 ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
197 return -EAGAIN;
198 }
199 ssize_t read = 0;
200 read = read_buffers(buf,len);
201
202 if (is_server && connected == 0) {
203 ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
204 connected = 1; //if so, we don't need the last handshake
205 cleanup();
206 submit(false);
207 }
208
209 if (!buffers.empty()) {
210 notify();
211 }
212
213 if (read == 0 && error)
214 return -error;
215 return read == 0 ? -EAGAIN : read;
216 }
217
218 void RDMAConnectedSocketImpl::buffer_prefetch(void)
219 {
220 std::vector<ibv_wc> cqe;
221 get_wc(cqe);
222 if(cqe.empty())
223 return;
224
225 for(size_t i = 0; i < cqe.size(); ++i) {
226 ibv_wc* response = &cqe[i];
227 ceph_assert(response->status == IBV_WC_SUCCESS);
228 Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
229 chunk->prepare_read(response->byte_len);
230
231 if (chunk->get_size() == 0) {
232 chunk->reset_read_chunk();
233 dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
234 if (connected) {
235 error = ECONNRESET;
236 ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
237 }
238 dispatcher->post_chunk_to_pool(chunk);
239 continue;
240 } else {
241 buffers.push_back(chunk);
242 ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
243 }
244 }
245 worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
246 }
247
248 ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
249 {
250 size_t read_size = 0, tmp = 0;
251 buffer_prefetch();
252 auto pchunk = buffers.begin();
253 while (pchunk != buffers.end()) {
254 tmp = (*pchunk)->read(buf + read_size, len - read_size);
255 read_size += tmp;
256 ldout(cct, 25) << __func__ << " read chunk " << *pchunk << " bytes length" << tmp << " offset: "
257 << (*pchunk)->get_offset() << " ,bound: " << (*pchunk)->get_bound() << dendl;
258
259 if ((*pchunk)->get_size() == 0) {
260 (*pchunk)->reset_read_chunk();
261 dispatcher->post_chunk_to_pool(*pchunk);
262 update_post_backlog();
263 ldout(cct, 25) << __func__ << " read over one chunk " << dendl;
264 pchunk++;
265 }
266
267 if (read_size == len) {
268 break;
269 }
270 }
271
272 buffers.erase(buffers.begin(), pchunk);
273 ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl;
274 worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
275 return read_size;
276 }
277
278 ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
279 {
(1) Event cond_false: |
Condition "this->error", taking false branch. |
280 if (error)
(2) Event if_end: |
End of if statement. |
281 return -error;
282 static const int MAX_COMPLETIONS = 16;
283 ibv_wc wc[MAX_COMPLETIONS];
284 ssize_t size = 0;
285
286 ibv_wc* response;
287 Chunk* chunk;
288 bool loaded = false;
289 auto iter = buffers.begin();
(3) Event cond_true: |
Condition "iter != this->buffers.end()", taking true branch. |
290 if (iter != buffers.end()) {
291 chunk = *iter;
292 // FIXME need to handle release
293 // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
(4) Event erase_iterator: |
"erase" invalidates iterator "iter". |
Also see events: |
[use_iterator] |
294 buffers.erase(iter);
295 loaded = true;
296 size = chunk->bound;
297 }
298
299 std::vector<ibv_wc> cqe;
300 get_wc(cqe);
(5) Event cond_false: |
Condition "cqe.empty()", taking false branch. |
301 if (cqe.empty())
(6) Event if_end: |
End of if statement. |
302 return size == 0 ? -EAGAIN : size;
303
(7) Event cond_true: |
Condition "should_gather", taking true branch. |
304 ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
305
(8) Event cond_true: |
Condition "i < cqe.size()", taking true branch. |
306 for (size_t i = 0; i < cqe.size(); ++i) {
307 response = &wc[i];
308 chunk = reinterpret_cast<Chunk*>(response->wr_id);
309 chunk->prepare_read(response->byte_len);
(9) Event cond_false: |
Condition "!loaded", taking false branch. |
310 if (!loaded && i == 0) {
311 // FIXME need to handle release
312 // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
313 size = chunk->bound;
314 continue;
(10) Event if_end: |
End of if statement. |
315 }
316 buffers.push_back(chunk);
(11) Event use_iterator: |
Using invalid iterator "iter". |
Also see events: |
[erase_iterator] |
317 iter++;
318 }
319
320 if (size == 0)
321 return -EAGAIN;
322 return size;
323 }
324
325 ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
326 {
327 if (error) {
328 if (!active)
329 return -EPIPE;
330 return -error;
331 }
332 size_t bytes = bl.length();
333 if (!bytes)
334 return 0;
335 {
336 std::lock_guard l{lock};
337 pending_bl.claim_append(bl);
338 if (!connected) {
339 ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
340 return bytes;
341 }
342 }
343 ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
344 ssize_t r = submit(more);
345 if (r < 0 && r != -EAGAIN)
346 return r;
347 return bytes;
348 }
349
350 size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers,
351 size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start,
352 const decltype(std::cbegin(pending_bl.buffers()))& end)
353 {
354 ceph_assert(start != end);
355 auto chunk_idx = tx_buffers.size();
356 if (0 == worker->get_reged_mem(this, tx_buffers, req_copy_len)) {
357 ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
358 worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
359 return 0;
360 }
361
362 Chunk *current_chunk = tx_buffers[chunk_idx];
363 size_t write_len = 0;
364 while (start != end) {
365 const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
366
367 size_t slice_write_len = 0;
368 while (slice_write_len < start->length()) {
369 size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len);
370
371 slice_write_len += real_len;
372 write_len += real_len;
373 req_copy_len -= real_len;
374
375 if (current_chunk->full()) {
376 if (++chunk_idx == tx_buffers.size())
377 return write_len;
378 current_chunk = tx_buffers[chunk_idx];
379 }
380 }
381
382 ++start;
383 }
384 ceph_assert(req_copy_len == 0);
385 return write_len;
386 }
387
388 ssize_t RDMAConnectedSocketImpl::submit(bool more)
389 {
390 if (error)
391 return -error;
392 std::lock_guard l{lock};
393 size_t bytes = pending_bl.length();
394 ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
395 << pending_bl.buffers().size() << dendl;
396 if (!bytes)
397 return 0;
398
399 std::vector<Chunk*> tx_buffers;
400 auto it = std::cbegin(pending_bl.buffers());
401 auto copy_start = it;
402 size_t total_copied = 0, wait_copy_len = 0;
403 while (it != pending_bl.buffers().end()) {
404 if (ib->is_tx_buffer(it->raw_c_str())) {
405 if (wait_copy_len) {
406 size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
407 total_copied += copied;
408 if (copied < wait_copy_len)
409 goto sending;
410 wait_copy_len = 0;
411 }
412 ceph_assert(copy_start == it);
413 tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()));
414 total_copied += it->length();
415 ++copy_start;
416 } else {
417 wait_copy_len += it->length();
418 }
419 ++it;
420 }
421 if (wait_copy_len)
422 total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
423
424 sending:
425 if (total_copied == 0)
426 return -EAGAIN;
427 ceph_assert(total_copied <= pending_bl.length());
428 bufferlist swapped;
429 if (total_copied < pending_bl.length()) {
430 worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
431 pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped);
432 pending_bl.swap(swapped);
433 } else {
434 pending_bl.clear();
435 }
436
437 ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
438 << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
439
440 int r = post_work_request(tx_buffers);
441 if (r < 0)
442 return r;
443
444 ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl;
445 return pending_bl.length() ? -EAGAIN : 0;
446 }
447
448 int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
449 {
450 ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
451 vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
452 ibv_sge isge[tx_buffers.size()];
453 uint32_t current_sge = 0;
454 ibv_send_wr iswr[tx_buffers.size()];
455 uint32_t current_swr = 0;
456 ibv_send_wr* pre_wr = NULL;
457 uint32_t num = 0;
458
459 memset(iswr, 0, sizeof(iswr));
460 memset(isge, 0, sizeof(isge));
461
462 while (current_buffer != tx_buffers.end()) {
463 isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
464 isge[current_sge].length = (*current_buffer)->get_offset();
465 isge[current_sge].lkey = (*current_buffer)->mr->lkey;
466 ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;
467
468 iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
469 iswr[current_swr].next = NULL;
470 iswr[current_swr].sg_list = &isge[current_sge];
471 iswr[current_swr].num_sge = 1;
472 iswr[current_swr].opcode = IBV_WR_SEND;
473 iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
474
475 num++;
476 worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
477 if (pre_wr)
478 pre_wr->next = &iswr[current_swr];
479 pre_wr = &iswr[current_swr];
480 ++current_sge;
481 ++current_swr;
482 ++current_buffer;
483 }
484
485 ibv_send_wr *bad_tx_work_request = nullptr;
486 if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
487 ldout(cct, 1) << __func__ << " failed to send data"
488 << " (most probably should be peer not ready): "
489 << cpp_strerror(errno) << dendl;
490 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
491 return -errno;
492 }
493 worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
494 ldout(cct, 20) << __func__ << " qp state is " << get_qp_state() << dendl;
495 return 0;
496 }
497
498 void RDMAConnectedSocketImpl::fin() {
499 ibv_send_wr wr;
500 memset(&wr, 0, sizeof(wr));
501
502 wr.wr_id = reinterpret_cast<uint64_t>(qp);
503 wr.num_sge = 0;
504 wr.opcode = IBV_WR_SEND;
505 wr.send_flags = IBV_SEND_SIGNALED;
506 ibv_send_wr* bad_tx_work_request = nullptr;
507 if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
508 ldout(cct, 1) << __func__ << " failed to send message="
509 << " ibv_post_send failed(most probably should be peer not ready): "
510 << cpp_strerror(errno) << dendl;
511 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
512 return ;
513 }
514 }
515
516 void RDMAConnectedSocketImpl::cleanup() {
517 if (con_handler && tcp_fd >= 0) {
518 (static_cast<C_handle_connection*>(con_handler))->close();
519 worker->center.submit_to(worker->center.get_id(), [this]() {
520 worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
521 }, false);
522 delete con_handler;
523 con_handler = nullptr;
524 }
525 }
526
527 void RDMAConnectedSocketImpl::notify()
528 {
529 eventfd_t event_val = 1;
530 int r = eventfd_write(notify_fd, event_val);
531 ceph_assert(r == 0);
532 }
533
534 void RDMAConnectedSocketImpl::shutdown()
535 {
536 if (!error)
537 fin();
538 error = ECONNRESET;
539 active = false;
540 }
541
542 void RDMAConnectedSocketImpl::close()
543 {
544 if (!error)
545 fin();
546 error = ECONNRESET;
547 active = false;
548 }
549
550 void RDMAConnectedSocketImpl::fault()
551 {
552 ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
553 error = ECONNRESET;
554 connected = 1;
555 notify();
556 }
557
558 void RDMAConnectedSocketImpl::set_accept_fd(int sd)
559 {
560 tcp_fd = sd;
561 is_server = true;
562 worker->center.submit_to(worker->center.get_id(), [this]() {
563 worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
564 }, true);
565 }
566
567 void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
568 {
569 post_backlog += num - ib->post_chunks_to_rq(num, qp);
570 }
571
572 void RDMAConnectedSocketImpl::update_post_backlog()
573 {
574 if (post_backlog)
575 post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp);
576 }
577