1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>

#include <algorithm>
#include <memory>
#include <vector>

#include "PosixStack.h"

#include "include/buffer.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/strtol.h"
#include "common/dout.h"
#include "msg/Messenger.h"
#include "include/compat.h"
#include "include/sock_compat.h"
35
36 #define dout_subsys ceph_subsys_ms
37 #undef dout_prefix
38 #define dout_prefix *_dout << "PosixStack "
39
40 class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
41 NetHandler &handler;
42 int _fd;
43 entity_addr_t sa;
44 bool connected;
45
46 public:
47 explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
48 : handler(h), _fd(f), sa(sa), connected(connected) {}
49
50 int is_connected() override {
51 if (connected)
52 return 1;
53
54 int r = handler.reconnect(sa, _fd);
55 if (r == 0) {
56 connected = true;
57 return 1;
58 } else if (r < 0) {
59 return r;
60 } else {
61 return 0;
62 }
63 }
64
65 ssize_t zero_copy_read(bufferptr&) override {
66 return -EOPNOTSUPP;
67 }
68
69 ssize_t read(char *buf, size_t len) override {
70 ssize_t r = ::read(_fd, buf, len);
71 if (r < 0)
72 r = -errno;
73 return r;
74 }
75
76 // return the sent length
77 // < 0 means error occurred
78 static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
79 {
80 size_t sent = 0;
81 while (1) {
82 MSGR_SIGPIPE_STOPPER;
83 ssize_t r;
84 r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
85 if (r < 0) {
86 if (errno == EINTR) {
87 continue;
88 } else if (errno == EAGAIN) {
89 break;
90 }
91 return -errno;
92 }
93
94 sent += r;
95 if (len == sent) break;
96
97 while (r > 0) {
98 if (msg.msg_iov[0].iov_len <= (size_t)r) {
99 // drain this whole item
100 r -= msg.msg_iov[0].iov_len;
101 msg.msg_iov++;
102 msg.msg_iovlen--;
103 } else {
104 msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
105 msg.msg_iov[0].iov_len -= r;
106 break;
107 }
108 }
109 }
110 return (ssize_t)sent;
111 }
112
113 ssize_t send(bufferlist &bl, bool more) override {
114 size_t sent_bytes = 0;
115 auto pb = std::cbegin(bl.buffers());
116 uint64_t left_pbrs = std::size(bl.buffers());
117 while (left_pbrs) {
118 struct msghdr msg;
(1) Event stack_use_local_overflow: |
Local variable "msgvec" uses 16384 bytes of stack space, which exceeds the maximum single use of 10000 bytes. |
119 struct iovec msgvec[IOV_MAX];
120 uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX);
121 left_pbrs -= size;
122 memset(&msg, 0, sizeof(msg));
123 msg.msg_iovlen = size;
124 msg.msg_iov = msgvec;
125 unsigned msglen = 0;
126 for (auto iov = msgvec; iov != msgvec + size; iov++) {
127 iov->iov_base = (void*)(pb->c_str());
128 iov->iov_len = pb->length();
129 msglen += pb->length();
130 ++pb;
131 }
132 ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
133 if (r < 0)
134 return r;
135
136 // "r" is the remaining length
137 sent_bytes += r;
138 if (static_cast<unsigned>(r) < msglen)
139 break;
140 // only "r" == 0 continue
141 }
142
143 if (sent_bytes) {
144 bufferlist swapped;
145 if (sent_bytes < bl.length()) {
146 bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
147 bl.swap(swapped);
148 } else {
149 bl.clear();
150 }
151 }
152
153 return static_cast<ssize_t>(sent_bytes);
154 }
155 void shutdown() override {
156 ::shutdown(_fd, SHUT_RDWR);
157 }
158 void close() override {
159 ::close(_fd);
160 }
161 int fd() const override {
162 return _fd;
163 }
164 friend class PosixServerSocketImpl;
165 friend class PosixNetworkStack;
166 };
167
168 class PosixServerSocketImpl : public ServerSocketImpl {
169 NetHandler &handler;
170 int _fd;
171
172 public:
173 explicit PosixServerSocketImpl(NetHandler &h, int f,
174 const entity_addr_t& listen_addr, unsigned slot)
175 : ServerSocketImpl(listen_addr.get_type(), slot),
176 handler(h), _fd(f) {}
177 int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
178 void abort_accept() override {
179 ::close(_fd);
180 _fd = -1;
181 }
182 int fd() const override {
183 return _fd;
184 }
185 };
186
187 int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
188 ceph_assert(sock);
189 sockaddr_storage ss;
190 socklen_t slen = sizeof(ss);
191 int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
192 if (sd < 0) {
193 return -errno;
194 }
195
196 int r = handler.set_nonblock(sd);
197 if (r < 0) {
198 ::close(sd);
199 return -errno;
200 }
201
202 r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
203 if (r < 0) {
204 ::close(sd);
205 return -errno;
206 }
207
208 ceph_assert(NULL != out); //out should not be NULL in accept connection
209
210 out->set_type(addr_type);
211 out->set_sockaddr((sockaddr*)&ss);
212 handler.set_priority(sd, opt.priority, out->get_family());
213
214 std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
215 *sock = ConnectedSocket(std::move(csi));
216 return 0;
217 }
218
219 void PosixWorker::initialize()
220 {
221 }
222
223 int PosixWorker::listen(entity_addr_t &sa,
224 unsigned addr_slot,
225 const SocketOptions &opt,
226 ServerSocket *sock)
227 {
228 int listen_sd = net.create_socket(sa.get_family(), true);
229 if (listen_sd < 0) {
230 return -errno;
231 }
232
233 int r = net.set_nonblock(listen_sd);
234 if (r < 0) {
235 ::close(listen_sd);
236 return -errno;
237 }
238
239 r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
240 if (r < 0) {
241 ::close(listen_sd);
242 return -errno;
243 }
244
245 r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
246 if (r < 0) {
247 r = -errno;
248 ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
249 << ": " << cpp_strerror(r) << dendl;
250 ::close(listen_sd);
251 return r;
252 }
253
254 r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
255 if (r < 0) {
256 r = -errno;
257 lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
258 ::close(listen_sd);
259 return r;
260 }
261
262 *sock = ServerSocket(
263 std::unique_ptr<PosixServerSocketImpl>(
264 new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
265 return 0;
266 }
267
268 int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
269 int sd;
270
271 if (opts.nonblock) {
272 sd = net.nonblock_connect(addr, opts.connect_bind_addr);
273 } else {
274 sd = net.connect(addr, opts.connect_bind_addr);
275 }
276
277 if (sd < 0) {
278 return -errno;
279 }
280
281 net.set_priority(sd, opts.priority, addr.get_family());
282 *socket = ConnectedSocket(
283 std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
284 return 0;
285 }
286
287 PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
288 : NetworkStack(c, t)
289 {
290 }
291