1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7    	 *
8    	 * Author: Haomai Wang <haomaiwang@gmail.com>
9    	 *
10   	 * This is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License version 2.1, as published by the Free Software
13   	 * Foundation.  See file COPYING.
14   	 *
15   	 */
16   	
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>

#include <algorithm>
#include <vector>

#include "PosixStack.h"

#include "include/buffer.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/strtol.h"
#include "common/dout.h"
#include "msg/Messenger.h"
#include "include/compat.h"
#include "include/sock_compat.h"
35   	
// Logging configuration for this file: send ldout/lderr output to the
// ceph_subsys_ms debug subsystem and replace any previously defined prefix
// with "PosixStack ".
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "PosixStack "
39   	
40   	class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
41   	  NetHandler &handler;
42   	  int _fd;
43   	  entity_addr_t sa;
44   	  bool connected;
45   	
46   	 public:
47   	  explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
48   	      : handler(h), _fd(f), sa(sa), connected(connected) {}
49   	
50   	  int is_connected() override {
51   	    if (connected)
52   	      return 1;
53   	
54   	    int r = handler.reconnect(sa, _fd);
55   	    if (r == 0) {
56   	      connected = true;
57   	      return 1;
58   	    } else if (r < 0) {
59   	      return r;
60   	    } else {
61   	      return 0;
62   	    }
63   	  }
64   	
65   	  ssize_t zero_copy_read(bufferptr&) override {
66   	    return -EOPNOTSUPP;
67   	  }
68   	
69   	  ssize_t read(char *buf, size_t len) override {
70   	    ssize_t r = ::read(_fd, buf, len);
71   	    if (r < 0)
72   	      r = -errno;
73   	    return r;
74   	  }
75   	
76   	  // return the sent length
77   	  // < 0 means error occurred
78   	  static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
79   	  {
80   	    size_t sent = 0;
81   	    while (1) {
82   	      MSGR_SIGPIPE_STOPPER;
83   	      ssize_t r;
84   	      r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
85   	      if (r < 0) {
86   	        if (errno == EINTR) {
87   	          continue;
88   	        } else if (errno == EAGAIN) {
89   	          break;
90   	        }
91   	        return -errno;
92   	      }
93   	
94   	      sent += r;
95   	      if (len == sent) break;
96   	
97   	      while (r > 0) {
98   	        if (msg.msg_iov[0].iov_len <= (size_t)r) {
99   	          // drain this whole item
100  	          r -= msg.msg_iov[0].iov_len;
101  	          msg.msg_iov++;
102  	          msg.msg_iovlen--;
103  	        } else {
104  	          msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
105  	          msg.msg_iov[0].iov_len -= r;
106  	          break;
107  	        }
108  	      }
109  	    }
110  	    return (ssize_t)sent;
111  	  }
112  	
113  	  ssize_t send(bufferlist &bl, bool more) override {
114  	    size_t sent_bytes = 0;
115  	    auto pb = std::cbegin(bl.buffers());
116  	    uint64_t left_pbrs = std::size(bl.buffers());
117  	    while (left_pbrs) {
118  	      struct msghdr msg;
(1) Event stack_use_local_overflow: Local variable "msgvec" uses 16384 bytes of stack space, which exceeds the maximum single use of 10000 bytes.
119  	      struct iovec msgvec[IOV_MAX];
120  	      uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX);
121  	      left_pbrs -= size;
122  	      memset(&msg, 0, sizeof(msg));
123  	      msg.msg_iovlen = size;
124  	      msg.msg_iov = msgvec;
125  	      unsigned msglen = 0;
126  	      for (auto iov = msgvec; iov != msgvec + size; iov++) {
127  		iov->iov_base = (void*)(pb->c_str());
128  		iov->iov_len = pb->length();
129  		msglen += pb->length();
130  		++pb;
131  	      }
132  	      ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
133  	      if (r < 0)
134  	        return r;
135  	
136  	      // "r" is the remaining length
137  	      sent_bytes += r;
138  	      if (static_cast<unsigned>(r) < msglen)
139  	        break;
140  	      // only "r" == 0 continue
141  	    }
142  	
143  	    if (sent_bytes) {
144  	      bufferlist swapped;
145  	      if (sent_bytes < bl.length()) {
146  	        bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
147  	        bl.swap(swapped);
148  	      } else {
149  	        bl.clear();
150  	      }
151  	    }
152  	
153  	    return static_cast<ssize_t>(sent_bytes);
154  	  }
155  	  void shutdown() override {
156  	    ::shutdown(_fd, SHUT_RDWR);
157  	  }
158  	  void close() override {
159  	    ::close(_fd);
160  	  }
161  	  int fd() const override {
162  	    return _fd;
163  	  }
164  	  friend class PosixServerSocketImpl;
165  	  friend class PosixNetworkStack;
166  	};
167  	
168  	class PosixServerSocketImpl : public ServerSocketImpl {
169  	  NetHandler &handler;
170  	  int _fd;
171  	
172  	 public:
173  	  explicit PosixServerSocketImpl(NetHandler &h, int f,
174  					 const entity_addr_t& listen_addr, unsigned slot)
175  	    : ServerSocketImpl(listen_addr.get_type(), slot),
176  	      handler(h), _fd(f) {}
177  	  int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
178  	  void abort_accept() override {
179  	    ::close(_fd);
180  	    _fd = -1;
181  	  }
182  	  int fd() const override {
183  	    return _fd;
184  	  }
185  	};
186  	
187  	int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
188  	  ceph_assert(sock);
189  	  sockaddr_storage ss;
190  	  socklen_t slen = sizeof(ss);
191  	  int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
192  	  if (sd < 0) {
193  	    return -errno;
194  	  }
195  	
196  	  int r = handler.set_nonblock(sd);
197  	  if (r < 0) {
198  	    ::close(sd);
199  	    return -errno;
200  	  }
201  	
202  	  r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
203  	  if (r < 0) {
204  	    ::close(sd);
205  	    return -errno;
206  	  }
207  	
208  	  ceph_assert(NULL != out); //out should not be NULL in accept connection
209  	
210  	  out->set_type(addr_type);
211  	  out->set_sockaddr((sockaddr*)&ss);
212  	  handler.set_priority(sd, opt.priority, out->get_family());
213  	
214  	  std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
215  	  *sock = ConnectedSocket(std::move(csi));
216  	  return 0;
217  	}
218  	
219  	void PosixWorker::initialize()
220  	{
221  	}
222  	
223  	int PosixWorker::listen(entity_addr_t &sa,
224  				unsigned addr_slot,
225  				const SocketOptions &opt,
226  	                        ServerSocket *sock)
227  	{
228  	  int listen_sd = net.create_socket(sa.get_family(), true);
229  	  if (listen_sd < 0) {
230  	    return -errno;
231  	  }
232  	
233  	  int r = net.set_nonblock(listen_sd);
234  	  if (r < 0) {
235  	    ::close(listen_sd);
236  	    return -errno;
237  	  }
238  	
239  	  r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
240  	  if (r < 0) {
241  	    ::close(listen_sd);
242  	    return -errno;
243  	  }
244  	
245  	  r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
246  	  if (r < 0) {
247  	    r = -errno;
248  	    ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
249  	                   << ": " << cpp_strerror(r) << dendl;
250  	    ::close(listen_sd);
251  	    return r;
252  	  }
253  	
254  	  r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
255  	  if (r < 0) {
256  	    r = -errno;
257  	    lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
258  	    ::close(listen_sd);
259  	    return r;
260  	  }
261  	
262  	  *sock = ServerSocket(
263  	          std::unique_ptr<PosixServerSocketImpl>(
264  		    new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
265  	  return 0;
266  	}
267  	
268  	int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
269  	  int sd;
270  	
271  	  if (opts.nonblock) {
272  	    sd = net.nonblock_connect(addr, opts.connect_bind_addr);
273  	  } else {
274  	    sd = net.connect(addr, opts.connect_bind_addr);
275  	  }
276  	
277  	  if (sd < 0) {
278  	    return -errno;
279  	  }
280  	
281  	  net.set_priority(sd, opts.priority, addr.get_family());
282  	  *socket = ConnectedSocket(
283  	      std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
284  	  return 0;
285  	}
286  	
287  	PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
288  	    : NetworkStack(c, t)
289  	{
290  	}
291