1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2011 New Dream Network
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <signal.h>
16 #include <unistd.h>
17 #ifdef __linux__
18 #include <sys/syscall.h> /* For SYS_xxx definitions */
19 #endif
20
21 #include "common/Thread.h"
22 #include "common/code_environment.h"
23 #include "common/debug.h"
24 #include "common/signal.h"
25
26 #ifdef HAVE_SCHED
27 #include <sched.h>
28 #endif
29
30
/*
 * Return the kernel-level id of the calling thread.
 *
 * On Linux this invokes the gettid system call directly.  On every other
 * platform there is no equivalent here, so -ENOSYS is returned instead;
 * callers must treat a non-positive result as "no tid available".
 */
pid_t ceph_gettid(void)
{
#ifndef __linux__
  return -ENOSYS;
#else
  return syscall(SYS_gettid);
#endif
}
39
/*
 * Pin the calling thread to the single CPU `id`.
 *
 * Only has an effect when built with HAVE_SCHED and when `id` lies in
 * [0, CPU_SETSIZE); any other id is accepted and ignored.
 *
 * Returns 0 on success (or when nothing was done) and -errno when
 * sched_setaffinity() fails.
 */
static int _set_affinity(int id)
{
#ifdef HAVE_SCHED
  if (id < 0 || id >= CPU_SETSIZE)
    return 0;

  // build a mask containing exactly the requested CPU
  cpu_set_t mask;
  CPU_ZERO(&mask);
  CPU_SET(id, &mask);

  // a pid argument of 0 addresses the caller itself
  if (sched_setaffinity(0, sizeof(mask), &mask) < 0)
    return -errno;

  /* yield so the new mask is guaranteed to take effect immediately */
  sched_yield();
#endif
  return 0;
}
57
58 Thread::Thread()
59 : thread_id(0),
60 pid(0),
61 cpuid(-1),
62 thread_name(NULL)
63 {
64 }
65
66 Thread::~Thread()
67 {
68 }
69
70 void *Thread::_entry_func(void *arg) {
71 void *r = ((Thread*)arg)->entry_wrapper();
72 return r;
73 }
74
75 void *Thread::entry_wrapper()
76 {
77 int p = ceph_gettid(); // may return -ENOSYS on other platforms
78 if (p > 0)
79 pid = p;
80 if (pid && cpuid >= 0)
81 _set_affinity(cpuid);
82
83 ceph_pthread_setname(pthread_self(), thread_name);
84 return entry();
85 }
86
87 const pthread_t &Thread::get_thread_id() const
88 {
89 return thread_id;
90 }
91
92 bool Thread::is_started() const
93 {
94 return thread_id != 0;
95 }
96
97 bool Thread::am_self() const
98 {
99 return (pthread_self() == thread_id);
100 }
101
102 int Thread::kill(int signal)
103 {
104 if (thread_id)
105 return pthread_kill(thread_id, signal);
106 else
107 return -EINVAL;
108 }
109
110 int Thread::try_create(size_t stacksize)
111 {
112 pthread_attr_t *thread_attr = NULL;
113 pthread_attr_t thread_attr_loc;
114
115 stacksize &= CEPH_PAGE_MASK; // must be multiple of page
116 if (stacksize) {
117 thread_attr = &thread_attr_loc;
118 pthread_attr_init(thread_attr);
119 pthread_attr_setstacksize(thread_attr, stacksize);
120 }
121
122 int r;
123
124 // The child thread will inherit our signal mask. Set our signal mask to
125 // the set of signals we want to block. (It's ok to block signals more
126 // signals than usual for a little while-- they will just be delivered to
127 // another thread or delieverd to this thread later.)
128 sigset_t old_sigset;
129 if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
130 block_signals(NULL, &old_sigset);
131 }
132 else {
133 int to_block[] = { SIGPIPE , 0 };
134 block_signals(to_block, &old_sigset);
135 }
136 r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
137 restore_sigset(&old_sigset);
138
139 if (thread_attr) {
140 pthread_attr_destroy(thread_attr);
141 }
142
143 return r;
144 }
145
146 void Thread::create(const char *name, size_t stacksize)
147 {
148 ceph_assert(strlen(name) < 16);
149 thread_name = name;
150
151 int ret = try_create(stacksize);
152 if (ret != 0) {
153 char buf[256];
154 snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
155 "failed with error %d", ret);
156 dout_emergency(buf);
157 ceph_assert(ret == 0);
158 }
159 }
160
161 int Thread::join(void **prval)
162 {
163 if (thread_id == 0) {
(1) Event fun_call_w_exception: |
Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details] |
164 ceph_abort_msg("join on thread that was never started");
165 return -EINVAL;
166 }
167
168 int status = pthread_join(thread_id, prval);
169 if (status != 0) {
170 char buf[256];
171 snprintf(buf, sizeof(buf), "Thread::join(): pthread_join "
172 "failed with error %d\n", status);
173 dout_emergency(buf);
174 ceph_assert(status == 0);
175 }
176
177 thread_id = 0;
178 return status;
179 }
180
181 int Thread::detach()
182 {
183 return pthread_detach(thread_id);
184 }
185
186 int Thread::set_affinity(int id)
187 {
188 int r = 0;
189 cpuid = id;
190 if (pid && ceph_gettid() == pid)
191 r = _set_affinity(id);
192 return r;
193 }
194
195 // Functions for std::thread
196 // =========================
197
198 void set_thread_name(std::thread& t, const std::string& s) {
199 int r = ceph_pthread_setname(t.native_handle(), s.c_str());
200 if (r != 0) {
201 throw std::system_error(r, std::generic_category());
202 }
203 }
204 std::string get_thread_name(const std::thread& t) {
205 std::string s(256, '\0');
206
207 int r = ceph_pthread_getname(const_cast<std::thread&>(t).native_handle(),
208 s.data(), s.length());
209 if (r != 0) {
210 throw std::system_error(r, std::generic_category());
211 }
212 s.resize(std::strlen(s.data()));
213 return s;
214 }
215
// Deliver `signal` to the thread behind `t`.
// Throws std::system_error (generic category) carrying the error number
// returned by pthread_kill() on failure.
void kill(std::thread& t, int signal)
{
  if (const int err = pthread_kill(t.native_handle(), signal); err != 0) {
    throw std::system_error(err, std::generic_category());
  }
}
223