1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2011 New Dream Network
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software
11   	 * Foundation.  See file COPYING.
12   	 *
13   	 */
14   	
15   	#include <signal.h>
16   	#include <unistd.h>
17   	#ifdef __linux__
18   	#include <sys/syscall.h>   /* For SYS_xxx definitions */
19   	#endif
20   	
21   	#include "common/Thread.h"
22   	#include "common/code_environment.h"
23   	#include "common/debug.h"
24   	#include "common/signal.h"
25   	
26   	#ifdef HAVE_SCHED
27   	#include <sched.h>
28   	#endif
29   	
30   	
31   	pid_t ceph_gettid(void)
32   	{
33   	#ifdef __linux__
34   	  return syscall(SYS_gettid);
35   	#else
36   	  return -ENOSYS;
37   	#endif
38   	}
39   	
40   	static int _set_affinity(int id)
41   	{
42   	#ifdef HAVE_SCHED
43   	  if (id >= 0 && id < CPU_SETSIZE) {
44   	    cpu_set_t cpuset;
45   	    CPU_ZERO(&cpuset);
46   	
47   	    CPU_SET(id, &cpuset);
48   	
49   	    if (sched_setaffinity(0, sizeof(cpuset), &cpuset) < 0)
50   	      return -errno;
51   	    /* guaranteed to take effect immediately */
52   	    sched_yield();
53   	  }
54   	#endif
55   	  return 0;
56   	}
57   	
58   	Thread::Thread()
59   	  : thread_id(0),
60   	    pid(0),
61   	    cpuid(-1),
62   	    thread_name(NULL)
63   	{
64   	}
65   	
66   	Thread::~Thread()
67   	{
68   	}
69   	
70   	void *Thread::_entry_func(void *arg) {
71   	  void *r = ((Thread*)arg)->entry_wrapper();
72   	  return r;
73   	}
74   	
75   	void *Thread::entry_wrapper()
76   	{
77   	  int p = ceph_gettid(); // may return -ENOSYS on other platforms
78   	  if (p > 0)
79   	    pid = p;
80   	  if (pid && cpuid >= 0)
81   	    _set_affinity(cpuid);
82   	
83   	  ceph_pthread_setname(pthread_self(), thread_name);
84   	  return entry();
85   	}
86   	
87   	const pthread_t &Thread::get_thread_id() const
88   	{
89   	  return thread_id;
90   	}
91   	
92   	bool Thread::is_started() const
93   	{
94   	  return thread_id != 0;
95   	}
96   	
97   	bool Thread::am_self() const
98   	{
99   	  return (pthread_self() == thread_id);
100  	}
101  	
102  	int Thread::kill(int signal)
103  	{
104  	  if (thread_id)
105  	    return pthread_kill(thread_id, signal);
106  	  else
107  	    return -EINVAL;
108  	}
109  	
110  	int Thread::try_create(size_t stacksize)
111  	{
112  	  pthread_attr_t *thread_attr = NULL;
113  	  pthread_attr_t thread_attr_loc;
114  	  
115  	  stacksize &= CEPH_PAGE_MASK;  // must be multiple of page
116  	  if (stacksize) {
117  	    thread_attr = &thread_attr_loc;
118  	    pthread_attr_init(thread_attr);
119  	    pthread_attr_setstacksize(thread_attr, stacksize);
120  	  }
121  	
122  	  int r;
123  	
124  	  // The child thread will inherit our signal mask.  Set our signal mask to
125  	  // the set of signals we want to block.  (It's ok to block signals more
126  	  // signals than usual for a little while-- they will just be delivered to
127  	  // another thread or delieverd to this thread later.)
128  	  sigset_t old_sigset;
129  	  if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
130  	    block_signals(NULL, &old_sigset);
131  	  }
132  	  else {
133  	    int to_block[] = { SIGPIPE , 0 };
134  	    block_signals(to_block, &old_sigset);
135  	  }
136  	  r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
137  	  restore_sigset(&old_sigset);
138  	
139  	  if (thread_attr) {
140  	    pthread_attr_destroy(thread_attr);	
141  	  }
142  	
143  	  return r;
144  	}
145  	
146  	void Thread::create(const char *name, size_t stacksize)
147  	{
148  	  ceph_assert(strlen(name) < 16);
149  	  thread_name = name;
150  	
151  	  int ret = try_create(stacksize);
152  	  if (ret != 0) {
153  	    char buf[256];
154  	    snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
155  		     "failed with error %d", ret);
156  	    dout_emergency(buf);
157  	    ceph_assert(ret == 0);
158  	  }
159  	}
160  	
161  	int Thread::join(void **prval)
162  	{
163  	  if (thread_id == 0) {
(1) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
164  	    ceph_abort_msg("join on thread that was never started");
165  	    return -EINVAL;
166  	  }
167  	
168  	  int status = pthread_join(thread_id, prval);
169  	  if (status != 0) {
170  	    char buf[256];
171  	    snprintf(buf, sizeof(buf), "Thread::join(): pthread_join "
172  	             "failed with error %d\n", status);
173  	    dout_emergency(buf);
174  	    ceph_assert(status == 0);
175  	  }
176  	
177  	  thread_id = 0;
178  	  return status;
179  	}
180  	
181  	int Thread::detach()
182  	{
183  	  return pthread_detach(thread_id);
184  	}
185  	
186  	int Thread::set_affinity(int id)
187  	{
188  	  int r = 0;
189  	  cpuid = id;
190  	  if (pid && ceph_gettid() == pid)
191  	    r = _set_affinity(id);
192  	  return r;
193  	}
194  	
195  	// Functions for std::thread
196  	// =========================
197  	
198  	void set_thread_name(std::thread& t, const std::string& s) {
199  	  int r = ceph_pthread_setname(t.native_handle(), s.c_str());
200  	  if (r != 0) {
201  	    throw std::system_error(r, std::generic_category());
202  	  }
203  	}
204  	std::string get_thread_name(const std::thread& t) {
205  	  std::string s(256, '\0');
206  	
207  	  int r = ceph_pthread_getname(const_cast<std::thread&>(t).native_handle(),
208  				       s.data(), s.length());
209  	  if (r != 0) {
210  	    throw std::system_error(r, std::generic_category());
211  	  }
212  	  s.resize(std::strlen(s.data()));
213  	  return s;
214  	}
215  	
216  	void kill(std::thread& t, int signal)
217  	{
218  	  auto r = pthread_kill(t.native_handle(), signal);
219  	  if (r != 0) {
220  	    throw std::system_error(r, std::generic_category());
221  	  }
222  	}
223