1 #include "RDMAStack.h"
2
3 #define dout_subsys ceph_subsys_ms
4 #undef dout_prefix
5 #define dout_prefix *_dout << " RDMAIWARPConnectedSocketImpl "
6
7 #define TIMEOUT_MS 3000
8 #define RETRY_COUNT 7
9
10 RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
11 shared_ptr<RDMADispatcher>& rdma_dispatcher,
12 RDMAWorker *w, RDMACMInfo *info)
13 : RDMAConnectedSocketImpl(cct, ib, rdma_dispatcher, w), cm_con_handler(new C_handle_cm_connection(this))
14 {
15 status = IDLE;
16 notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
17 if (info) {
18 is_server = true;
19 cm_id = info->cm_id;
20 cm_channel = info->cm_channel;
21 status = RDMA_ID_CREATED;
22 peer_qpn = info->qp_num;
23 if (alloc_resource()) {
24 close_notify();
25 return;
26 }
27 worker->center.submit_to(worker->center.get_id(), [this]() {
28 worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
29 status = CHANNEL_FD_CREATED;
30 }, false);
31 status = RESOURCE_ALLOCATED;
32 qp->get_local_cm_meta().peer_qpn = peer_qpn;
33 qp->get_peer_cm_meta().local_qpn = peer_qpn;
34 } else {
35 is_server = false;
36 cm_channel = rdma_create_event_channel();
37 rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP);
38 status = RDMA_ID_CREATED;
39 ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl;
40 }
41 }
42
(1) Event exn_spec_violation: |
An exception of type "std::length_error" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate(). |
Also see events: |
[fun_call_w_exception] |
43 RDMAIWARPConnectedSocketImpl::~RDMAIWARPConnectedSocketImpl() {
(2) Event fun_call_w_exception: |
Called function throws an exception of type "std::length_error". [details] |
Also see events: |
[exn_spec_violation] |
44 ldout(cct, 20) << __func__ << " destruct." << dendl;
45 std::unique_lock l(close_mtx);
46 close_condition.wait(l, [&] { return closed; });
47 if (status >= RDMA_ID_CREATED) {
48 rdma_destroy_id(cm_id);
49 rdma_destroy_event_channel(cm_channel);
50 }
51 }
52
53 int RDMAIWARPConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
54 worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
55 status = CHANNEL_FD_CREATED;
56 if (rdma_resolve_addr(cm_id, NULL, const_cast<struct sockaddr*>(peer_addr.get_sockaddr()), TIMEOUT_MS)) {
57 lderr(cct) << __func__ << " failed to resolve addr" << dendl;
58 return -1;
59 }
60 return 0;
61 }
62
63 void RDMAIWARPConnectedSocketImpl::close() {
64 error = ECONNRESET;
65 active = false;
66 if (status >= CONNECTED) {
67 rdma_disconnect(cm_id);
68 }
69 close_notify();
70 }
71
72 void RDMAIWARPConnectedSocketImpl::shutdown() {
73 error = ECONNRESET;
74 active = false;
75 }
76
/**
 * Fetch one event from the cm event channel and advance the connection
 * state machine accordingly:
 *
 *  - ADDR_RESOLVED:  start route resolution.
 *  - ROUTE_RESOLVED: allocate the queue pair and issue rdma_connect().
 *  - ESTABLISHED:    mark CONNECTED; on the client side also record the
 *                    peer QP number, activate the socket and notify.
 *  - ADDR_ERROR / ROUTE_ERROR / CONNECT_ERROR / UNREACHABLE / REJECTED:
 *                    report -ECONNREFUSED and notify.
 *  - DISCONNECTED:   run close_notify() and, if no error was recorded yet,
 *                    record ECONNRESET and notify.
 *  - DEVICE_REMOVAL: ignored.
 *
 * Any other event aborts. Every failure path sets `connected` to
 * -ECONNREFUSED before calling notify(). The event is acknowledged before
 * returning.
 */
void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
  struct rdma_cm_event *event = nullptr;
  if (rdma_get_cm_event(cm_channel, &event) || event == nullptr) {
    // Nothing was retrieved, so there is no event to inspect or acknowledge.
    lderr(cct) << __func__ << " failed to get cm event, errno=" << errno << dendl;
    return;
  }
  ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(event->event)
                 << " (cm id: " << cm_id << ")" << dendl;
  struct rdma_conn_param cm_params;
  switch (event->event) {
    case RDMA_CM_EVENT_ADDR_RESOLVED:
      status = ADDR_RESOLVED;
      if (rdma_resolve_route(cm_id, TIMEOUT_MS)) {
        lderr(cct) << __func__ << " failed to resolve rdma addr" << dendl;
        // Report the failure like every other error path; without this the
        // notified side still sees `connected` unchanged.
        connected = -ECONNREFUSED;
        notify();
      }
      break;

    case RDMA_CM_EVENT_ROUTE_RESOLVED:
      status = ROUTE_RESOLVED;
      if (alloc_resource()) {
        lderr(cct) << __func__ << " failed to alloc resource while resolving the route" << dendl;
        connected = -ECONNREFUSED;
        notify();
        break;
      }

      memset(&cm_params, 0, sizeof(cm_params));
      cm_params.retry_count = RETRY_COUNT;
      cm_params.qp_num = local_qpn;
      if (rdma_connect(cm_id, &cm_params)) {
        lderr(cct) << __func__ << " failed to connect remote rdma port" << dendl;
        connected = -ECONNREFUSED;
        notify();
      }
      break;

    case RDMA_CM_EVENT_ESTABLISHED:
      ldout(cct, 20) << __func__ << " qp_num=" << cm_id->qp->qp_num << dendl;
      status = CONNECTED;
      if (!is_server) {
        // The server side already received the peer QP number through the
        // RDMACMInfo handed to the constructor.
        peer_qpn = event->param.conn.qp_num;
        activate();
        qp->get_local_cm_meta().peer_qpn = peer_qpn;
        qp->get_peer_cm_meta().local_qpn = peer_qpn;
        notify();
      }
      break;

    case RDMA_CM_EVENT_ADDR_ERROR:
    case RDMA_CM_EVENT_ROUTE_ERROR:
    case RDMA_CM_EVENT_CONNECT_ERROR:
    case RDMA_CM_EVENT_UNREACHABLE:
    case RDMA_CM_EVENT_REJECTED:
      lderr(cct) << __func__ << " rdma connection rejected" << dendl;
      connected = -ECONNREFUSED;
      notify();
      break;

    case RDMA_CM_EVENT_DISCONNECTED:
      status = DISCONNECTED;
      close_notify();
      if (!error) {
        error = ECONNRESET;
        notify();
      }
      break;

    case RDMA_CM_EVENT_DEVICE_REMOVAL:
      break;

    default:
      ceph_abort_msg("unhandled event");
      break;
  }
  rdma_ack_cm_event(event);
}
151
152 void RDMAIWARPConnectedSocketImpl::activate() {
153 ldout(cct, 30) << __func__ << dendl;
154 active = true;
155 connected = 1;
156 }
157
158 int RDMAIWARPConnectedSocketImpl::alloc_resource() {
159 ldout(cct, 30) << __func__ << dendl;
160 qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(),
161 dispatcher->get_rx_cq(), IBV_QPT_RC, cm_id);
162 if (!qp) {
163 return -1;
164 }
165 local_qpn = qp->get_local_qp_number();
166 dispatcher->register_qp(qp, this);
167 dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
168 dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
169 return 0;
170 }
171
172 void RDMAIWARPConnectedSocketImpl::close_notify() {
173 ldout(cct, 30) << __func__ << dendl;
174 if (status >= CHANNEL_FD_CREATED) {
175 worker->center.delete_file_event(cm_channel->fd, EVENT_READABLE);
176 }
177 std::unique_lock l(close_mtx);
178 if (!closed) {
179 closed = true;
180 close_condition.notify_all();
181 }
182 }
183