// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "WorkQueue.h"
#include "include/compat.h"
#include "common/errno.h"

#define dout_subsys ceph_subsys_tp
#undef dout_prefix
#define dout_prefix *_dout << name << " "


ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
  : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
    lockname(name + "::lock"),
    _lock(ceph::make_mutex(lockname)),  // this should be safe due to declaration order
    _stop(false),
    _pause(0),
    _draining(0),
    _num_threads(n),
    processing(0)
{
  if (option) {
    _thread_num_option = option;
    // set up conf_keys
    _conf_keys = new const char*[2];
    _conf_keys[0] = _thread_num_option.c_str();
    _conf_keys[1] = NULL;
  } else {
    _conf_keys = new const char*[1];
    _conf_keys[0] = NULL;
  }
}

void ThreadPool::TPHandle::suspend_tp_timeout()
{
  cct->get_heartbeat_map()->clear_timeout(hb);
}

void ThreadPool::TPHandle::reset_tp_timeout()
{
  cct->get_heartbeat_map()->reset_timeout(
    hb, grace, suicide_grace);
}

ThreadPool::~ThreadPool()
{
  ceph_assert(_threads.empty());
  delete[] _conf_keys;
}
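
// Typical ThreadPool lifecycle, shown here only as an illustrative sketch:
// "ItemWQ" and its constructor arguments are hypothetical stand-ins for a
// concrete WorkQueue subclass declared in WorkQueue.h, not code from this
// file.
//
//   ThreadPool tp(cct, "tp_name", "tp_worker", 4, "some_threads_option");
//   ItemWQ wq(timeout, suicide_timeout, &tp);  // hypothetical queue type
//   tp.start();      // spawn _num_threads workers (see start_threads())
//   wq.queue(item);  // workers pull work via _void_dequeue()/_void_process()
//   tp.drain(&wq);   // block until the queue is empty and nothing is in flight
//   tp.stop();       // join workers and _clear() anything still queued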

void ThreadPool::handle_conf_change(const ConfigProxy& conf,
                                    const std::set <std::string> &changed)
{
  if (changed.count(_thread_num_option)) {
    char *buf;
    int r = conf.get_val(_thread_num_option.c_str(), &buf, -1);
    ceph_assert(r >= 0);
    int v = atoi(buf);
    free(buf);
    if (v >= 0) {
      _lock.lock();
      _num_threads = v;
      start_threads();
      _cond.notify_all();
      _lock.unlock();
    }
  }
}

void ThreadPool::worker(WorkThread *wt)
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "worker start" << dendl;

  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!_stop) {

    // manage dynamic thread pool
    join_old_threads();
    if (_threads.size() > _num_threads) {
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (!_pause && !work_queues.empty()) {
      WorkQueue_* wq;
      int tries = 2 * work_queues.size();
      bool did = false;
      while (tries--) {
        next_work_queue %= work_queues.size();
        wq = work_queues[next_work_queue++];

        void *item = wq->_void_dequeue();
        if (item) {
          processing++;
          ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
                        << " (" << processing << " active)" << dendl;
          ul.unlock();
          TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
          tp_handle.reset_tp_timeout();
          wq->_void_process(item, tp_handle);
          ul.lock();
          wq->_void_process_finish(item);
          processing--;
          ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
                        << " (" << processing << " active)" << dendl;
          if (_pause || _draining)
            _wait_cond.notify_all();
          did = true;
          break;
        }
      }
      if (did)
        continue;
    }

    ldout(cct,20) << "worker waiting" << dendl;
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      cct->_conf->threadpool_default_timeout,
      0);
    auto wait = std::chrono::seconds(
      cct->_conf->threadpool_empty_queue_max_wait);
    _cond.wait_for(ul, wait);
  }
  ldout(cct,1) << "worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);
}

void ThreadPool::start_threads()
{
  ceph_assert(ceph_mutex_is_locked(_lock));
  while (_threads.size() < _num_threads) {
    WorkThread *wt = new WorkThread(this);
    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
    _threads.insert(wt);

    wt->create(thread_name.c_str());
  }
}

void ThreadPool::join_old_threads()
{
  ceph_assert(ceph_mutex_is_locked(_lock));
  while (!_old_threads.empty()) {
    ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
    _old_threads.front()->join();
    delete _old_threads.front();
    _old_threads.pop_front();
  }
}

void ThreadPool::start()
{
  ldout(cct,10) << "start" << dendl;

  if (_thread_num_option.length()) {
    ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
    cct->_conf.add_observer(this);
  }

  _lock.lock();
  start_threads();
  _lock.unlock();
  ldout(cct,15) << "started" << dendl;
}

void ThreadPool::stop(bool clear_after)
{
  ldout(cct,10) << "stop" << dendl;

  if (_thread_num_option.length()) {
    ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
    cct->_conf.remove_observer(this);
  }

  _lock.lock();
  _stop = true;
  _cond.notify_all();
  join_old_threads();
  _lock.unlock();
  for (set<WorkThread*>::iterator p = _threads.begin();
       p != _threads.end();
       ++p) {
    (*p)->join();
    delete *p;
  }
  _threads.clear();
  _lock.lock();
  for (unsigned i=0; i<work_queues.size(); i++)
    work_queues[i]->_clear();
  _stop = false;
  _lock.unlock();
  ldout(cct,15) << "stopped" << dendl;
}

void ThreadPool::pause()
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "pause" << dendl;
  _pause++;
  while (processing) {
    _wait_cond.wait(ul);
  }
  ldout(cct,15) << "paused" << dendl;
}

void ThreadPool::pause_new()
{
  ldout(cct,10) << "pause_new" << dendl;
  _lock.lock();
  _pause++;
  _lock.unlock();
}

void ThreadPool::unpause()
{
  ldout(cct,10) << "unpause" << dendl;
  _lock.lock();
  ceph_assert(_pause > 0);
  _pause--;
  _cond.notify_all();
  _lock.unlock();
}

void ThreadPool::drain(WorkQueue_* wq)
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "drain" << dendl;
  _draining++;
  while (processing || (wq != NULL && !wq->_empty())) {
    _wait_cond.wait(ul);
  }
  _draining--;
}
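
// Note on quiescing, derived from the functions above: pause() bumps _pause
// and blocks until no worker is mid-item; pause_new() bumps _pause and returns
// immediately, so items already being processed keep running; drain(wq) waits
// for in-flight work and, when wq is non-NULL, also for that queue to report
// _empty(). None of these stop the worker threads themselves; that is stop()'s
// job.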

ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
                                     uint32_t pnum_threads):
  cct(pcct_),
  name(std::move(nm)),
  thread_name(std::move(tn)),
  lockname(name + "::lock"),
  shardedpool_lock(ceph::make_mutex(lockname)),
  num_threads(pnum_threads),
  num_paused(0),
  num_drained(0),
  wq(NULL) {}

void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
  ceph_assert(wq != NULL);
  ldout(cct,10) << "worker start" << dendl;

  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!stop_threads) {
    if (pause_threads) {
      std::unique_lock ul(shardedpool_lock);
      ++num_paused;
      wait_cond.notify_all();
      while (pause_threads) {
        cct->get_heartbeat_map()->reset_timeout(
          hb,
          wq->timeout_interval, wq->suicide_interval);
        shardedpool_cond.wait_for(
          ul,
          std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
      }
      --num_paused;
    }
    if (drain_threads) {
      std::unique_lock ul(shardedpool_lock);
      if (wq->is_shard_empty(thread_index)) {
        ++num_drained;
        wait_cond.notify_all();
        while (drain_threads) {
          cct->get_heartbeat_map()->reset_timeout(
            hb,
            wq->timeout_interval, wq->suicide_interval);
          shardedpool_cond.wait_for(
            ul,
            std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
        }
        --num_drained;
      }
    }

    cct->get_heartbeat_map()->reset_timeout(
      hb,
      wq->timeout_interval, wq->suicide_interval);
    wq->_process(thread_index, hb);
  }

  ldout(cct,10) << "sharded worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);
}

void ShardedThreadPool::start_threads()
{
  ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
  int32_t thread_index = 0;
  while (threads_shardedpool.size() < num_threads) {
    WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
    threads_shardedpool.push_back(wt);
    wt->create(thread_name.c_str());
    thread_index++;
  }
}

void ShardedThreadPool::start()
{
  ldout(cct,10) << "start" << dendl;

  shardedpool_lock.lock();
  start_threads();
  shardedpool_lock.unlock();
  ldout(cct,15) << "started" << dendl;
}

void ShardedThreadPool::stop()
{
  ldout(cct,10) << "stop" << dendl;
  stop_threads = true;
  ceph_assert(wq != NULL);
  wq->return_waiting_threads();
  for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
       p != threads_shardedpool.end();
       ++p) {
    (*p)->join();
    delete *p;
  }
  threads_shardedpool.clear();
  ldout(cct,15) << "stopped" << dendl;
}

void ShardedThreadPool::pause()
{
  std::unique_lock ul(shardedpool_lock);
  ldout(cct,10) << "pause" << dendl;
  pause_threads = true;
  ceph_assert(wq != NULL);
  wq->return_waiting_threads();
  while (num_threads != num_paused) {
    wait_cond.wait(ul);
  }
  ldout(cct,10) << "paused" << dendl;
}

void ShardedThreadPool::pause_new()
{
  ldout(cct,10) << "pause_new" << dendl;
  shardedpool_lock.lock();
  pause_threads = true;
  ceph_assert(wq != NULL);
  wq->return_waiting_threads();
  shardedpool_lock.unlock();
  ldout(cct,10) << "paused_new" << dendl;
}

void ShardedThreadPool::unpause()
{
  ldout(cct,10) << "unpause" << dendl;
  shardedpool_lock.lock();
  pause_threads = false;
  wq->stop_return_waiting_threads();
  shardedpool_cond.notify_all();
  shardedpool_lock.unlock();
  ldout(cct,10) << "unpaused" << dendl;
}

void ShardedThreadPool::drain()
{
  std::unique_lock ul(shardedpool_lock);
  ldout(cct,10) << "drain" << dendl;
  drain_threads = true;
  ceph_assert(wq != NULL);
  wq->return_waiting_threads();
  while (num_threads != num_drained) {
    wait_cond.wait(ul);
  }
  drain_threads = false;
  wq->stop_return_waiting_threads();
  shardedpool_cond.notify_all();
  ldout(cct,10) << "drained" << dendl;
}
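
// Illustrative ShardedThreadPool lifecycle; this is a sketch, not code from
// this file. "OpWQ" is a hypothetical stand-in for the sharded work queue type
// declared in WorkQueue.h, and how it is attached to the pool (so that
// wq != NULL) is defined there.
//
//   ShardedThreadPool stp(cct, "sharded_tp", "stp_worker", 8);
//   OpWQ op_wq(..., &stp);  // hypothetical queue, attached via the WorkQueue.h API
//   stp.start();            // one worker per shard index in [0, num_threads)
//   ...
//   stp.pause();            // workers park until unpause(); queued items stay put
//   stp.unpause();
//   stp.drain();            // wait until every shard reports is_shard_empty()
//   stp.stop();             // set stop_threads, return waiters, join all workers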