1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2011 New Dream Network
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software 
11   	 * Foundation.  See file COPYING.
12   	 * 
13   	 */
14   	
15   	#include <poll.h>
16   	#include <sys/un.h>
17   	#include <unistd.h>
18   	
19   	#include "common/OutputDataSocket.h"
20   	#include "common/errno.h"
21   	#include "common/debug.h"
22   	#include "common/safe_io.h"
23   	#include "include/compat.h"
24   	#include "include/sock_compat.h"
25   	
26   	// re-include our assert to clobber the system one; fix dout:
27   	#include "include/ceph_assert.h"
28   	
29   	#define dout_subsys ceph_subsys_asok  // log subsystem for this file; presumably consumed by the ldout()/lderr() macros -- confirm in common/debug.h
30   	#undef dout_prefix  // discard any prefix defined by an earlier header before installing the one below
31   	#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "  // prefix streamed into _dout: the m_cct pointer value, so it needs m_cct in scope
32   	
33   	using std::ostringstream;
34   	
35   	/*
36   	 * UNIX domain sockets created by an application persist even after that
37   	 * application closes, unless they're explicitly unlinked. This is because the
38   	 * directory containing the socket keeps a reference to the socket.
39   	 *
40   	 * This code makes things a little nicer by unlinking those dead sockets when
41   	 * the application exits normally.
42   	 */
43   	static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;  // held while cleanup_files / cleanup_atexit are read or modified
44   	static std::vector <const char*> cleanup_files;  // strdup()'d socket paths still to be unlinked; each entry is free()'d when removed
45   	static bool cleanup_atexit = false;  // set once remove_all_cleanup_files has been registered with atexit()
46   	
47   	static void remove_cleanup_file(const char *file)
48   	{
49   	  pthread_mutex_lock(&cleanup_lock);
50   	  VOID_TEMP_FAILURE_RETRY(unlink(file));
51   	  for (std::vector <const char*>::iterator i = cleanup_files.begin();
52   	       i != cleanup_files.end(); ++i) {
53   	    if (strcmp(file, *i) == 0) {
54   	      free((void*)*i);
55   	      cleanup_files.erase(i);
56   	      break;
57   	    }
58   	  }
59   	  pthread_mutex_unlock(&cleanup_lock);
60   	}
61   	
62   	static void remove_all_cleanup_files()
63   	{
64   	  pthread_mutex_lock(&cleanup_lock);
65   	  for (std::vector <const char*>::iterator i = cleanup_files.begin();
66   	       i != cleanup_files.end(); ++i) {
67   	    VOID_TEMP_FAILURE_RETRY(unlink(*i));
68   	    free((void*)*i);
69   	  }
70   	  cleanup_files.clear();
71   	  pthread_mutex_unlock(&cleanup_lock);
72   	}
73   	
74   	static void add_cleanup_file(const char *file)
75   	{
76   	  char *fname = strdup(file);
77   	  if (!fname)
78   	    return;
79   	  pthread_mutex_lock(&cleanup_lock);
80   	  cleanup_files.push_back(fname);
81   	  if (!cleanup_atexit) {
82   	    atexit(remove_all_cleanup_files);
83   	    cleanup_atexit = true;
84   	  }
85   	  pthread_mutex_unlock(&cleanup_lock);
86   	}
87   	
88   	
89   	OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog)
90   	  : m_cct(cct),
91   	    data_max_backlog(_backlog),
92   	    m_sock_fd(-1),
93   	    m_shutdown_rd_fd(-1),
94   	    m_shutdown_wr_fd(-1),
95   	    going_down(false),
96   	    data_size(0),
97   	    skipped(0)
98   	{
99   	}
100  	
(1) Event exn_spec_violation: An exception of type "std::length_error" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
101  	OutputDataSocket::~OutputDataSocket()
102  	{
(2) Event fun_call_w_exception: Called function throws an exception of type "std::length_error". [details]
Also see events: [exn_spec_violation]
103  	  shutdown();
104  	}
105  	
106  	/*
107  	 * This thread listens on the UNIX domain socket for incoming connections.
108  	 * It only handles one connection at a time at the moment. All I/O is nonblocking,
109  	 * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
110  	 *
111  	 * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
112  	 * pipe, the thread terminates itself gracefully, allowing
113  	 * OutputDataSocket::shutdown() to join() it.
114  	 */
115  	
116  	#define PFL_SUCCESS ((void*)(intptr_t)0)  // value entry() returns when asked to shut down
117  	#define PFL_FAIL ((void*)(intptr_t)1)  // value entry() returns when poll() fails with anything but EINTR
118  	
119  	std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
120  	{
121  	  int pipefd[2];
122  	  if (pipe_cloexec(pipefd, 0) < 0) {
123  	    int e = errno;
124  	    ostringstream oss;
125  	    oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(e);
126  	    return oss.str();
127  	  }
128  	  
129  	  *pipe_rd = pipefd[0];
130  	  *pipe_wr = pipefd[1];
131  	  return "";
132  	}
133  	
134  	std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd)
135  	{
136  	  ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
137  	
138  	  struct sockaddr_un address;
139  	  if (sock_path.size() > sizeof(address.sun_path) - 1) {
140  	    ostringstream oss;
141  	    oss << "OutputDataSocket::bind_and_listen: "
142  		<< "The UNIX domain socket path " << sock_path << " is too long! The "
143  		<< "maximum length on this system is "
144  		<< (sizeof(address.sun_path) - 1);
145  	    return oss.str();
146  	  }
147  	  int sock_fd = socket_cloexec(PF_UNIX, SOCK_STREAM, 0);
148  	  if (sock_fd < 0) {
149  	    int err = errno;
150  	    ostringstream oss;
151  	    oss << "OutputDataSocket::bind_and_listen: "
152  		<< "failed to create socket: " << cpp_strerror(err);
153  	    return oss.str();
154  	  }
155  	  memset(&address, 0, sizeof(struct sockaddr_un));
156  	  address.sun_family = AF_UNIX;
157  	  snprintf(address.sun_path, sizeof(address.sun_path),
158  		   "%s", sock_path.c_str());
159  	  if (::bind(sock_fd, (struct sockaddr*)&address,
160  		   sizeof(struct sockaddr_un)) != 0) {
161  	    int err = errno;
162  	    if (err == EADDRINUSE) {
163  	      // The old UNIX domain socket must still be there.
164  	      // Let's unlink it and try again.
165  	      VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
166  	      if (::bind(sock_fd, (struct sockaddr*)&address,
167  		       sizeof(struct sockaddr_un)) == 0) {
168  		err = 0;
169  	      }
170  	      else {
171  		err = errno;
172  	      }
173  	    }
174  	    if (err != 0) {
175  	      ostringstream oss;
176  	      oss << "OutputDataSocket::bind_and_listen: "
177  		  << "failed to bind the UNIX domain socket to '" << sock_path
178  		  << "': " << cpp_strerror(err);
179  	      close(sock_fd);
180  	      return oss.str();
181  	    }
182  	  }
183  	  if (listen(sock_fd, 5) != 0) {
184  	    int err = errno;
185  	    ostringstream oss;
186  	    oss << "OutputDataSocket::bind_and_listen: "
187  		  << "failed to listen to socket: " << cpp_strerror(err);
188  	    close(sock_fd);
189  	    VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
190  	    return oss.str();
191  	  }
192  	  *fd = sock_fd;
193  	  return "";
194  	}
195  	
196  	void* OutputDataSocket::entry()
197  	{
198  	  ldout(m_cct, 5) << "entry start" << dendl;
199  	  while (true) {
200  	    struct pollfd fds[2];
201  	    memset(fds, 0, sizeof(fds));
202  	    fds[0].fd = m_sock_fd;
203  	    fds[0].events = POLLIN | POLLRDBAND;
204  	    fds[1].fd = m_shutdown_rd_fd;
205  	    fds[1].events = POLLIN | POLLRDBAND;
206  	
207  	    int ret = poll(fds, 2, -1);
208  	    if (ret < 0) {
209  	      int err = errno;
210  	      if (err == EINTR) {
211  		continue;
212  	      }
213  	      lderr(m_cct) << "OutputDataSocket: poll(2) error: '"
214  			   << cpp_strerror(err) << dendl;
215  	      return PFL_FAIL;
216  	    }
217  	
218  	    if (fds[0].revents & POLLIN) {
219  	      // Send out some data
220  	      do_accept();
221  	    }
222  	    if (fds[1].revents & POLLIN) {
223  	      // Parent wants us to shut down
224  	      return PFL_SUCCESS;
225  	    }
226  	  }
227  	  ldout(m_cct, 5) << "entry exit" << dendl;
228  	
229  	  return PFL_SUCCESS; // unreachable
230  	}
231  	
232  	
233  	bool OutputDataSocket::do_accept()
234  	{
235  	  struct sockaddr_un address;
236  	  socklen_t address_length = sizeof(address);
237  	  ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl;
238  	  int connection_fd = accept_cloexec(m_sock_fd, (struct sockaddr*) &address,
239  				     &address_length);
240  	  if (connection_fd < 0) {
241  	    int err = errno;
242  	    lderr(m_cct) << "OutputDataSocket: do_accept error: '"
243  				   << cpp_strerror(err) << dendl;
244  	    return false;
245  	  }
246  	  ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl;
247  	
248  	  handle_connection(connection_fd);
249  	  close_connection(connection_fd);
250  	
251  	  return 0;
252  	}
253  	
254  	void OutputDataSocket::handle_connection(int fd)
255  	{
256  	  bufferlist bl;
257  	
258  	  m_lock.lock();
259  	  init_connection(bl);
260  	  m_lock.unlock();
261  	
262  	  if (bl.length()) {
263  	    /* need to special case the connection init buffer output, as it needs
264  	     * to be dumped before any data, including older data that was sent
265  	     * before the connection was established, or before we identified
266  	     * older connection was broken
267  	     */
268  	    int ret = safe_write(fd, bl.c_str(), bl.length());
269  	    if (ret < 0) {
270  	      return;
271  	    }
272  	  }
273  	
274  	  int ret = dump_data(fd);
275  	  if (ret < 0)
276  	    return;
277  	
278  	  do {
279  	    {
280  	      std::unique_lock l(m_lock);
281  	      if (!going_down) {
282  		cond.wait(l);
283  	      }
284  	      if (going_down) {
285  		break;
286  	      }
287  	    }
288  	    ret = dump_data(fd);
289  	  } while (ret >= 0);
290  	}
291  	
292  	int OutputDataSocket::dump_data(int fd)
293  	{
294  	  m_lock.lock();
295  	  vector<buffer::list> l = std::move(data);
296  	  data.clear();
297  	  data_size = 0;
298  	  m_lock.unlock();
299  	
300  	  for (auto iter = l.begin(); iter != l.end(); ++iter) {
301  	    bufferlist& bl = *iter;
302  	    int ret = safe_write(fd, bl.c_str(), bl.length());
303  	    if (ret >= 0) {
304  	      ret = safe_write(fd, delim.c_str(), delim.length());
305  	    }
306  	    if (ret < 0) {
307  	      std::scoped_lock lock(m_lock);
308  	      for (; iter != l.end(); ++iter) {
309  	        bufferlist& bl = *iter;
310  		data.push_back(bl);
311  		data_size += bl.length();
312  	      }
313  	      return ret;
314  	    }
315  	  }
316  	
317  	  return 0;
318  	}
319  	
320  	void OutputDataSocket::close_connection(int fd)
321  	{
322  	  VOID_TEMP_FAILURE_RETRY(close(fd));
323  	}
324  	
325  	bool OutputDataSocket::init(const std::string &path)
326  	{
327  	  ldout(m_cct, 5) << "init " << path << dendl;
328  	
329  	  /* Set up things for the new thread */
330  	  std::string err;
331  	  int pipe_rd = -1, pipe_wr = -1;
332  	  err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
333  	  if (!err.empty()) {
334  	    lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl;
335  	    return false;
336  	  }
337  	  int sock_fd;
338  	  err = bind_and_listen(path, &sock_fd);
339  	  if (!err.empty()) {
340  	    lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl;
341  	    close(pipe_rd);
342  	    close(pipe_wr);
343  	    return false;
344  	  }
345  	
346  	  /* Create new thread */
347  	  m_sock_fd = sock_fd;
348  	  m_shutdown_rd_fd = pipe_rd;
349  	  m_shutdown_wr_fd = pipe_wr;
350  	  m_path = path;
351  	  create("out_data_socket");
352  	  add_cleanup_file(m_path.c_str());
353  	  return true;
354  	}
355  	
356  	void OutputDataSocket::shutdown()
357  	{
358  	  m_lock.lock();
359  	  going_down = true;
360  	  cond.notify_all();
361  	  m_lock.unlock();
362  	
363  	  if (m_shutdown_wr_fd < 0)
364  	    return;
365  	
(1) Event fun_call_w_exception: Called function throws an exception of type "std::length_error". [details]
366  	  ldout(m_cct, 5) << "shutdown" << dendl;
367  	
368  	  // Send a byte to the shutdown pipe that the thread is listening to
369  	  char buf[1] = { 0x0 };
370  	  int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
371  	  VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
372  	  m_shutdown_wr_fd = -1;
373  	
374  	  if (ret == 0) {
375  	    join();
376  	  } else {
377  	    lderr(m_cct) << "OutputDataSocket::shutdown: failed to write "
378  	      "to thread shutdown pipe: error " << ret << dendl;
379  	  }
380  	
381  	  remove_cleanup_file(m_path.c_str());
382  	  m_path.clear();
383  	}
384  	
385  	void OutputDataSocket::append_output(bufferlist& bl)
386  	{
387  	  std::lock_guard l(m_lock);
388  	
389  	  if (data_size + bl.length() > data_max_backlog) {
390  	    if (skipped % 100 == 0) {
391  	      ldout(m_cct, 0) << "dropping data output, max backlog reached (skipped=="
392  			      << skipped << ")"
393  			      << dendl;
394  	      skipped = 1;
395  	    } else
396  	      ++skipped;
397  	    return;
398  	  }
399  	
400  	  data.push_back(bl);
401  	  data_size += bl.length();
402  	  cond.notify_all();
403  	}
404