1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7    	 *
8    	 * Author: Haomai Wang <haomaiwang@gmail.com>
9    	 *
10   	 * This is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License version 2.1, as published by the Free Software
13   	 * Foundation.  See file COPYING.
14   	 *
15   	 */
16   	#include "RDMAStack.h"
17   	
18   	#define dout_subsys ceph_subsys_ms
19   	#undef dout_prefix
20   	#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
21   	
22   	RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib,
23   	                                                 shared_ptr<RDMADispatcher>& rdma_dispatcher,
24   	                                                 RDMAWorker *w)
25   	  : cct(cct), connected(0), error(0), ib(ib),
26   	    dispatcher(rdma_dispatcher), worker(w),
27   	    is_server(false), con_handler(new C_handle_connection(this)),
28   	    active(false), pending(false)
29   	{
30   	  if (!cct->_conf->ms_async_rdma_cm) {
31   	    qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
32   	    local_qpn = qp->get_local_qp_number();
33   	    notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
34   	    dispatcher->register_qp(qp, this);
35   	    dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
36   	    dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
37   	  }
38   	}
39   	
40   	RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
41   	{
42   	  ldout(cct, 20) << __func__ << " destruct." << dendl;
43   	  cleanup();
44   	  worker->remove_pending_conn(this);
45   	  dispatcher->schedule_qp_destroy(local_qpn);
46   	
47   	  for (unsigned i=0; i < wc.size(); ++i) {
48   	    dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
49   	  }
50   	  for (unsigned i=0; i < buffers.size(); ++i) {
51   	    dispatcher->post_chunk_to_pool(buffers[i]);
52   	  }
53   	
54   	  std::lock_guard l{lock};
55   	  if (notify_fd >= 0)
56   	    ::close(notify_fd);
57   	  if (tcp_fd >= 0)
58   	    ::close(tcp_fd);
59   	  error = ECONNRESET;
60   	}
61   	
62   	void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
63   	{
64   	  std::lock_guard l{lock};
65   	  if (wc.empty())
66   	    wc = std::move(v);
67   	  else
68   	    wc.insert(wc.end(), v.begin(), v.end());
69   	  notify();
70   	}
71   	
72   	void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
73   	{
74   	  std::lock_guard l{lock};
75   	  if (wc.empty())
76   	    return ;
77   	  w.swap(wc);
78   	}
79   	
80   	int RDMAConnectedSocketImpl::activate()
81   	{
82   	  qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn;
83   	  if (qp->modify_qp_to_rtr() != 0)
84   	    return -1;
85   	
86   	  if (qp->modify_qp_to_rts() != 0)
87   	    return -1;
88   	
89   	  if (!is_server) {
90   	    connected = 1; //indicate successfully
91   	    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
92   	    submit(false);
93   	  }
94   	  active = true;
95   	  peer_qpn = qp->get_local_cm_meta().peer_qpn;
96   	
97   	  return 0;
98   	}
99   	
100  	int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
101  	  ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
102  	                 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
103  	  NetHandler net(cct);
104  	  tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
105  	
106  	  if (tcp_fd < 0) {
107  	    return -errno;
108  	  }
109  	
110  	  int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
111  	  if (r < 0) {
112  	    ::close(tcp_fd);
113  	    tcp_fd = -1;
114  	    return -errno;
115  	  }
116  	
117  	  ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
118  	  net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
119  	  qp->get_local_cm_meta().peer_qpn = 0;
120  	  r = qp->send_cm_meta(cct, tcp_fd);
121  	  if (r < 0)
122  	    return r;
123  	
124  	  worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
125  	  return 0;
126  	}
127  	
128  	void RDMAConnectedSocketImpl::handle_connection() {
129  	  ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
130  	  int r = qp->recv_cm_meta(cct, tcp_fd);
131  	  if (r <= 0) {
132  	    if (r != -EAGAIN) {
133  	      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
134  	      ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
135  	      fault();
136  	    }
137  	    return;
138  	  }
139  	
140  	  if (1 == connected) {
141  	    ldout(cct, 1) << __func__ << " warnning: logic failed: read len: " << r << dendl;
142  	    fault();
143  	    return;
144  	  }
145  	
146  	  if (!is_server) {// first time: cm meta sync + ack from server
147  	    if (!connected) {
148  	      r = activate();
149  	      ceph_assert(!r);
150  	    }
151  	    notify();
152  	    r = qp->send_cm_meta(cct, tcp_fd);
153  	    if (r < 0) {
154  	      ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
155  	      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
156  	      fault();
157  	    }
158  	  } else {
159  	    if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
160  	      if (active) {
161  	        ldout(cct, 10) << __func__ << " server is already active." << dendl;
162  	        return ;
163  	      }
164  	      r = activate();
165  	      ceph_assert(!r);
166  	      r = qp->send_cm_meta(cct, tcp_fd);
167  	      if (r < 0) {
168  	        ldout(cct, 1) << __func__ << " server ack failed." << dendl;
169  	        dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
170  	        fault();
171  	        return ;
172  	      }
173  	    } else { // second time: cm meta ack from client
174  	      connected = 1;
175  	      ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
176  	      //cleanup();
177  	      submit(false);
178  	      notify();
179  	    }
180  	  }
181  	}
182  	
183  	ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
184  	{
185  	  eventfd_t event_val = 0;
186  	  int r = eventfd_read(notify_fd, &event_val);
187  	  ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
188  	                 << " r = " << r << dendl;
189  	
190  	  if (!active) {
191  	    ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
192  	    return -EAGAIN;
193  	  }
194  	
195  	  if (0 == connected) {
196  	    ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
197  	    return -EAGAIN;
198  	  }
199  	  ssize_t read = 0;
200  	  read = read_buffers(buf,len);
201  	
202  	  if (is_server && connected == 0) {
203  	    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
204  	    connected = 1; //if so, we don't need the last handshake
205  	    cleanup();
206  	    submit(false);
207  	  }
208  	
209  	  if (!buffers.empty()) {
210  	    notify();
211  	  }
212  	
213  	  if (read == 0 && error)
214  	    return -error;
215  	  return read == 0 ? -EAGAIN : read;
216  	}
217  	
218  	void RDMAConnectedSocketImpl::buffer_prefetch(void)
219  	{
220  	  std::vector<ibv_wc> cqe;
221  	  get_wc(cqe);
222  	  if(cqe.empty())
223  	    return;
224  	
225  	  for(size_t i = 0; i < cqe.size(); ++i) {
226  	    ibv_wc* response = &cqe[i];
227  	    ceph_assert(response->status == IBV_WC_SUCCESS);
228  	    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
229  	    chunk->prepare_read(response->byte_len);
230  	
231  	    if (chunk->get_size() == 0) {
232  	      chunk->reset_read_chunk();
233  	      dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
234  	      if (connected) {
235  	        error = ECONNRESET;
236  	        ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
237  	      }
238  	      dispatcher->post_chunk_to_pool(chunk);
239  	      continue;
240  	    } else {
241  	      buffers.push_back(chunk);
242  	      ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
243  	    }
244  	  }
245  	  worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
246  	}
247  	
248  	ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
249  	{
250  	  size_t read_size = 0, tmp = 0;
251  	  buffer_prefetch();
252  	  auto pchunk = buffers.begin();
253  	  while (pchunk != buffers.end()) {
254  	    tmp = (*pchunk)->read(buf + read_size, len - read_size);
255  	    read_size += tmp;
256  	    ldout(cct, 25) << __func__ << " read chunk " << *pchunk << " bytes length" << tmp << " offset: "
257  	                   << (*pchunk)->get_offset() << " ,bound: " << (*pchunk)->get_bound() << dendl;
258  	
259  	    if ((*pchunk)->get_size() == 0) {
260  	      (*pchunk)->reset_read_chunk();
261  	      dispatcher->post_chunk_to_pool(*pchunk);
262  	      update_post_backlog();
263  	      ldout(cct, 25) << __func__ << " read over one chunk " << dendl;
264  	      pchunk++;
265  	    }
266  	
267  	    if (read_size == len) {
268  	      break;
269  	    }
270  	  }
271  	
272  	  buffers.erase(buffers.begin(), pchunk);
273  	  ldout(cct, 25) << __func__ << " got " << read_size  << " bytes, buffers size: " << buffers.size() << dendl;
274  	  worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
275  	  return read_size;
276  	}
277  	
278  	ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
279  	{
(1) Event cond_false: Condition "this->error", taking false branch.
280  	  if (error)
(2) Event if_end: End of if statement.
281  	    return -error;
282  	  static const int MAX_COMPLETIONS = 16;
283  	  ibv_wc wc[MAX_COMPLETIONS];
284  	  ssize_t size = 0;
285  	
286  	  ibv_wc*  response;
287  	  Chunk* chunk;
288  	  bool loaded = false;
289  	  auto iter = buffers.begin();
(3) Event cond_true: Condition "iter != this->buffers.end()", taking true branch.
290  	  if (iter != buffers.end()) {
291  	    chunk = *iter;
292  	    // FIXME need to handle release
293  	    // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
(4) Event erase_iterator: "erase" invalidates iterator "iter".
Also see events: [use_iterator]
294  	    buffers.erase(iter);
295  	    loaded = true;
296  	    size = chunk->bound;
297  	  }
298  	
299  	  std::vector<ibv_wc> cqe;
300  	  get_wc(cqe);
(5) Event cond_false: Condition "cqe.empty()", taking false branch.
301  	  if (cqe.empty())
(6) Event if_end: End of if statement.
302  	    return size == 0 ? -EAGAIN : size;
303  	
(7) Event cond_true: Condition "should_gather", taking true branch.
304  	  ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
305  	
(8) Event cond_true: Condition "i < cqe.size()", taking true branch.
306  	  for (size_t i = 0; i < cqe.size(); ++i) {
307  	    response = &wc[i];
308  	    chunk = reinterpret_cast<Chunk*>(response->wr_id);
309  	    chunk->prepare_read(response->byte_len);
(9) Event cond_false: Condition "!loaded", taking false branch.
310  	    if (!loaded && i == 0) {
311  	      // FIXME need to handle release
312  	      // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
313  	      size = chunk->bound;
314  	      continue;
(10) Event if_end: End of if statement.
315  	    }
316  	    buffers.push_back(chunk);
(11) Event use_iterator: Using invalid iterator "iter".
Also see events: [erase_iterator]
317  	    iter++;
318  	  }
319  	
320  	  if (size == 0)
321  	    return -EAGAIN;
322  	  return size;
323  	}
324  	
325  	ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
326  	{
327  	  if (error) {
328  	    if (!active)
329  	      return -EPIPE;
330  	    return -error;
331  	  }
332  	  size_t bytes = bl.length();
333  	  if (!bytes)
334  	    return 0;
335  	  {
336  	    std::lock_guard l{lock};
337  	    pending_bl.claim_append(bl);
338  	    if (!connected) {
339  	      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
340  	      return bytes;
341  	    }
342  	  }
343  	  ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
344  	  ssize_t r = submit(more);
345  	  if (r < 0 && r != -EAGAIN)
346  	    return r;
347  	  return bytes;
348  	}
349  	
350  	size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers,
351  	    size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start,
352  	    const decltype(std::cbegin(pending_bl.buffers()))& end)
353  	{
354  	  ceph_assert(start != end);
355  	  auto chunk_idx = tx_buffers.size();
356  	  if (0 == worker->get_reged_mem(this, tx_buffers, req_copy_len)) {
357  	    ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
358  	    worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
359  	    return 0;
360  	  }
361  	
362  	  Chunk *current_chunk = tx_buffers[chunk_idx];
363  	  size_t write_len = 0;
364  	  while (start != end) {
365  	    const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
366  	
367  	    size_t slice_write_len = 0;
368  	    while (slice_write_len < start->length()) {
369  	      size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len);
370  	
371  	      slice_write_len += real_len;
372  	      write_len += real_len;
373  	      req_copy_len -= real_len;
374  	
375  	      if (current_chunk->full()) {
376  	        if (++chunk_idx == tx_buffers.size())
377  	          return write_len;
378  	        current_chunk = tx_buffers[chunk_idx];
379  	      }
380  	    }
381  	
382  	    ++start;
383  	  }
384  	  ceph_assert(req_copy_len == 0);
385  	  return write_len;
386  	}
387  	
388  	ssize_t RDMAConnectedSocketImpl::submit(bool more)
389  	{
390  	  if (error)
391  	    return -error;
392  	  std::lock_guard l{lock};
393  	  size_t bytes = pending_bl.length();
394  	  ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
395  	                 << pending_bl.buffers().size() << dendl;
396  	  if (!bytes)
397  	    return 0;
398  	
399  	  std::vector<Chunk*> tx_buffers;
400  	  auto it = std::cbegin(pending_bl.buffers());
401  	  auto copy_start = it;
402  	  size_t total_copied = 0, wait_copy_len = 0;
403  	  while (it != pending_bl.buffers().end()) {
404  	    if (ib->is_tx_buffer(it->raw_c_str())) {
405  	      if (wait_copy_len) {
406  	        size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
407  	        total_copied += copied;
408  	        if (copied < wait_copy_len)
409  	          goto sending;
410  	        wait_copy_len = 0;
411  	      }
412  	      ceph_assert(copy_start == it);
413  	      tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()));
414  	      total_copied += it->length();
415  	      ++copy_start;
416  	    } else {
417  	      wait_copy_len += it->length();
418  	    }
419  	    ++it;
420  	  }
421  	  if (wait_copy_len)
422  	    total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
423  	
424  	 sending:
425  	  if (total_copied == 0)
426  	    return -EAGAIN;
427  	  ceph_assert(total_copied <= pending_bl.length());
428  	  bufferlist swapped;
429  	  if (total_copied < pending_bl.length()) {
430  	    worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
431  	    pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped);
432  	    pending_bl.swap(swapped);
433  	  } else {
434  	    pending_bl.clear();
435  	  }
436  	
437  	  ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
438  	                 << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
439  	
440  	  int r = post_work_request(tx_buffers);
441  	  if (r < 0)
442  	    return r;
443  	
444  	  ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl;
445  	  return pending_bl.length() ? -EAGAIN : 0;
446  	}
447  	
448  	int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
449  	{
450  	  ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
451  	  vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
452  	  ibv_sge isge[tx_buffers.size()];
453  	  uint32_t current_sge = 0;
454  	  ibv_send_wr iswr[tx_buffers.size()];
455  	  uint32_t current_swr = 0;
456  	  ibv_send_wr* pre_wr = NULL;
457  	  uint32_t num = 0; 
458  	
459  	  memset(iswr, 0, sizeof(iswr));
460  	  memset(isge, 0, sizeof(isge));
461  	 
462  	  while (current_buffer != tx_buffers.end()) {
463  	    isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
464  	    isge[current_sge].length = (*current_buffer)->get_offset();
465  	    isge[current_sge].lkey = (*current_buffer)->mr->lkey;
466  	    ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length  << dendl;
467  	
468  	    iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
469  	    iswr[current_swr].next = NULL;
470  	    iswr[current_swr].sg_list = &isge[current_sge];
471  	    iswr[current_swr].num_sge = 1;
472  	    iswr[current_swr].opcode = IBV_WR_SEND;
473  	    iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
474  	
475  	    num++;
476  	    worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
477  	    if (pre_wr)
478  	      pre_wr->next = &iswr[current_swr];
479  	    pre_wr = &iswr[current_swr];
480  	    ++current_sge;
481  	    ++current_swr;
482  	    ++current_buffer;
483  	  }
484  	
485  	  ibv_send_wr *bad_tx_work_request = nullptr;
486  	  if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
487  	    ldout(cct, 1) << __func__ << " failed to send data"
488  	                  << " (most probably should be peer not ready): "
489  	                  << cpp_strerror(errno) << dendl;
490  	    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
491  	    return -errno;
492  	  }
493  	  worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
494  	  ldout(cct, 20) << __func__ << " qp state is " << get_qp_state() << dendl;
495  	  return 0;
496  	}
497  	
498  	void RDMAConnectedSocketImpl::fin() {
499  	  ibv_send_wr wr;
500  	  memset(&wr, 0, sizeof(wr));
501  	
502  	  wr.wr_id = reinterpret_cast<uint64_t>(qp);
503  	  wr.num_sge = 0;
504  	  wr.opcode = IBV_WR_SEND;
505  	  wr.send_flags = IBV_SEND_SIGNALED;
506  	  ibv_send_wr* bad_tx_work_request = nullptr;
507  	  if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
508  	    ldout(cct, 1) << __func__ << " failed to send message="
509  	                  << " ibv_post_send failed(most probably should be peer not ready): "
510  	                  << cpp_strerror(errno) << dendl;
511  	    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
512  	    return ;
513  	  }
514  	}
515  	
516  	void RDMAConnectedSocketImpl::cleanup() {
517  	  if (con_handler && tcp_fd >= 0) {
518  	    (static_cast<C_handle_connection*>(con_handler))->close();
519  	    worker->center.submit_to(worker->center.get_id(), [this]() {
520  	      worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
521  	    }, false);
522  	    delete con_handler;
523  	    con_handler = nullptr;
524  	  }
525  	}
526  	
527  	void RDMAConnectedSocketImpl::notify()
528  	{
529  	  eventfd_t event_val = 1;
530  	  int r = eventfd_write(notify_fd, event_val);
531  	  ceph_assert(r == 0);
532  	}
533  	
534  	void RDMAConnectedSocketImpl::shutdown()
535  	{
536  	  if (!error)
537  	    fin();
538  	  error = ECONNRESET;
539  	  active = false;
540  	}
541  	
542  	void RDMAConnectedSocketImpl::close()
543  	{
544  	  if (!error)
545  	    fin();
546  	  error = ECONNRESET;
547  	  active = false;
548  	}
549  	
550  	void RDMAConnectedSocketImpl::fault()
551  	{
552  	  ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
553  	  error = ECONNRESET;
554  	  connected = 1;
555  	  notify();
556  	}
557  	
558  	void RDMAConnectedSocketImpl::set_accept_fd(int sd)
559  	{
560  	  tcp_fd = sd;
561  	  is_server = true;
562  	  worker->center.submit_to(worker->center.get_id(), [this]() {
563  				   worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
564  				   }, true);
565  	}
566  	
567  	void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
568  	{
569  	  post_backlog += num - ib->post_chunks_to_rq(num, qp);
570  	}
571  	
572  	void RDMAConnectedSocketImpl::update_post_backlog()
573  	{
574  	  if (post_backlog)
575  	    post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp);
576  	}
577