// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H

#ifdef WITH_SEASTAR
// for ObjectStore.h
struct ThreadPool {
  struct TPHandle {
  };
};

#else

#include <atomic>
#include <list>
#include <set>
#include <string>
#include <vector>

#include "common/ceph_mutex.h"
#include "include/unordered_map.h"
#include "common/config_obs.h"
#include "common/HeartbeatMap.h"
#include "common/Thread.h"
#include "include/Context.h"
#include "common/HBHandle.h"

class CephContext;

/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
protected:
  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex _lock;
  ceph::condition_variable _cond;
  bool _stop;
  int _pause;
  int _draining;
  ceph::condition_variable _wait_cond;

public:
  class TPHandle : public HBHandle {
    friend class ThreadPool;
    CephContext *cct;
    ceph::heartbeat_handle_d *hb;
    ceph::coarse_mono_clock::rep grace;
    ceph::coarse_mono_clock::rep suicide_grace;
  public:
    TPHandle(
      CephContext *cct,
      ceph::heartbeat_handle_d *hb,
      time_t grace,
      time_t suicide_grace)
      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
    void reset_tp_timeout() override final;
    void suspend_tp_timeout() override final;
  };
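  // Illustrative sketch (not part of this header): a work queue whose
  // _process() can run for a long time should call reset_tp_timeout() on the
  // TPHandle it is given, so the HeartbeatMap does not flag the worker thread
  // as stuck (or, past the suicide grace, abort the process).  The Item and
  // ItemWQ names below are hypothetical.
  //
  //   void ItemWQ::_process(Item *item, TPHandle &handle) {
  //     for (auto &chunk : item->chunks) {
  //       do_slow_work(chunk);        // hypothetical helper
  //       handle.reset_tp_timeout();  // push the timeout deadline out again
  //     }
  //   }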
protected:

  /// Basic interface to a work queue used by the worker threads.
  struct WorkQueue_ {
    std::string name;
    time_t timeout_interval, suicide_interval;
    WorkQueue_(std::string n, time_t ti, time_t sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
  };
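  // Rough shape of how ThreadPool::worker() (implemented in WorkQueue.cc)
  // drives the WorkQueue_ hooks above -- a simplified sketch of the documented
  // contract, not the actual implementation:
  //
  //   // with _lock held, pick the next non-empty queue
  //   void *item = wq->_void_dequeue();
  //   if (item) {
  //     unlock();
  //     wq->_void_process(item, tp_handle);  // may run in many threads at once
  //     lock();
  //     wq->_void_process_finish(item);      // serialized under the pool lock
  //   }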

  // track thread pool size changes
  unsigned _num_threads;
  std::string _thread_num_option;
  const char **_conf_keys;

  const char **get_tracked_conf_keys() const override {
    return _conf_keys;
  }
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set <std::string> &changed) override;

public:
  /** @brief Work queue that processes several submitted items at once.
   * The queue will automatically add itself to the thread pool on construction
   * and remove itself on destruction. */
  template<class T>
  class BatchWorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    virtual bool _enqueue(T *) = 0;
    virtual void _dequeue(T *) = 0;
    virtual void _dequeue(std::list<T*> *) = 0;
    virtual void _process_finish(const std::list<T*> &) {}

    // virtual methods from WorkQueue_ below
    void *_void_dequeue() override {
      std::list<T*> *out(new std::list<T*>);
      _dequeue(out);
      if (!out->empty()) {
        return (void *)out;
      } else {
        delete out;
        return 0;
      }
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(*((std::list<T*>*)p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(*(std::list<T*>*)p);
      delete (std::list<T*> *)p;
    }

  protected:
    virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0;

  public:
    BatchWorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~BatchWorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.lock();
      bool r = _enqueue(item);
      pool->_cond.notify_one();
      pool->_lock.unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.lock();
      _dequeue(item);
      pool->_lock.unlock();
    }
    void clear() {
      pool->_lock.lock();
      _clear();
      pool->_lock.unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    void wake() {
      pool->wake();
    }
    void _wake() {
      pool->_wake();
    }
    void drain() {
      pool->drain(this);
    }

  };
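  // Illustrative sketch (not part of this header): a minimal BatchWorkQueue
  // subclass.  Flush, FlushWQ and do_flush() are hypothetical; the internal
  // list is only touched through the _enqueue/_dequeue/_empty/_clear hooks,
  // which the pool calls with ThreadPool::_lock held.
  //
  //   struct Flush;  // hypothetical work item
  //
  //   class FlushWQ : public ThreadPool::BatchWorkQueue<Flush> {
  //     std::list<Flush*> items;
  //   public:
  //     FlushWQ(ThreadPool *tp)
  //       : ThreadPool::BatchWorkQueue<Flush>("FlushWQ", 60, 0, tp) {}
  //     bool _enqueue(Flush *f) override { items.push_back(f); return true; }
  //     void _dequeue(Flush *f) override { items.remove(f); }
  //     void _dequeue(std::list<Flush*> *out) override { out->swap(items); }
  //     bool _empty() override { return items.empty(); }
  //     void _clear() override { items.clear(); }
  //     void _process(const std::list<Flush*> &batch,
  //                   ThreadPool::TPHandle &handle) override {
  //       for (Flush *f : batch) { do_flush(f); }  // hypothetical helper
  //     }
  //   };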

  /** @brief Templated by-value work queue.
   * Skeleton implementation of a queue that processes items submitted by value.
   * This is useful if the items are single primitive values or very small objects
   * (a few bytes). The queue will automatically add itself to the thread pool on
   * construction and remove itself on destruction. */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
    ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
    ThreadPool *pool;
    std::list<U> to_process;
    std::list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
    virtual U _dequeue() = 0;
    virtual void _process_finish(U) {}

    void *_void_dequeue() override {
      {
        std::lock_guard l(_lock);
        if (_empty())
          return 0;
        U u = _dequeue();
        to_process.push_back(u);
      }
      return ((void*)1); // Not used
    }
    void _void_process(void *, TPHandle &handle) override {
      _lock.lock();
      ceph_assert(!to_process.empty());
      U u = to_process.front();
      to_process.pop_front();
      _lock.unlock();

      _process(u, handle);

      _lock.lock();
      to_finish.push_back(u);
      _lock.unlock();
    }

    void _void_process_finish(void *) override {
      _lock.lock();
      ceph_assert(!to_finish.empty());
      U u = to_finish.front();
      to_finish.pop_front();
      _lock.unlock();

      _process_finish(u);
    }

    void _clear() override {}

  public:
    WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
      pool->remove_work_queue(this);
    }
    void queue(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue(item);
      pool->_cond.notify_one();
    }
    void queue_front(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue_front(item);
      pool->_cond.notify_one();
    }
    void drain() {
      pool->drain(this);
    }
  protected:
    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    virtual void _process(U u, TPHandle &) = 0;
  };
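  // Illustrative sketch (not part of this header): a WorkQueueVal that passes
  // small values (here event ids) by value.  EventWQ and handle_event() are
  // hypothetical.
  //
  //   class EventWQ : public ThreadPool::WorkQueueVal<uint64_t> {
  //     std::list<uint64_t> q;
  //   public:
  //     EventWQ(ThreadPool *tp)
  //       : ThreadPool::WorkQueueVal<uint64_t>("EventWQ", 60, 0, tp) {}
  //     void _enqueue(uint64_t v) override { q.push_back(v); }
  //     void _enqueue_front(uint64_t v) override { q.push_front(v); }
  //     bool _empty() override { return q.empty(); }
  //     uint64_t _dequeue() override {
  //       uint64_t v = q.front();
  //       q.pop_front();
  //       return v;
  //     }
  //     void _process(uint64_t v, ThreadPool::TPHandle &) override {
  //       handle_event(v);  // hypothetical helper
  //     }
  //   };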

  /** @brief Templated by-pointer work queue.
   * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
   * This is useful when the work items are large or include dynamically allocated memory. The queue
   * will automatically add itself to the thread pool on construction and remove itself on
   * destruction. */
  template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() override {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.lock();
      bool r = _enqueue(item);
      pool->_cond.notify_one();
      pool->_lock.unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.lock();
      _dequeue(item);
      pool->_lock.unlock();
    }
    void clear() {
      pool->_lock.lock();
      _clear();
      pool->_lock.unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    /// wake up the thread pool (without lock held)
    void wake() {
      pool->wake();
    }
    /// wake up the thread pool (with lock already held)
    void _wake() {
      pool->_wake();
    }
    void _wait() {
      pool->_wait();
    }
    void drain() {
      pool->drain(this);
    }

  };
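  // Illustrative sketch (not part of this header): a minimal by-pointer
  // WorkQueue.  IndexUpdate, IndexWQ and apply_update() are hypothetical.
  //
  //   struct IndexUpdate;  // hypothetical work item
  //
  //   class IndexWQ : public ThreadPool::WorkQueue<IndexUpdate> {
  //     std::list<IndexUpdate*> items;
  //   public:
  //     IndexWQ(ThreadPool *tp)
  //       : ThreadPool::WorkQueue<IndexUpdate>("IndexWQ", 60, 0, tp) {}
  //     bool _enqueue(IndexUpdate *u) override { items.push_back(u); return true; }
  //     void _dequeue(IndexUpdate *u) override { items.remove(u); }
  //     IndexUpdate *_dequeue() override {
  //       if (items.empty())
  //         return nullptr;
  //       IndexUpdate *u = items.front();
  //       items.pop_front();
  //       return u;
  //     }
  //     bool _empty() override { return items.empty(); }
  //     void _clear() override { items.clear(); }
  //     void _process(IndexUpdate *u, ThreadPool::TPHandle &) override {
  //       apply_update(u);  // hypothetical helper; frees u as appropriate
  //     }
  //   };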

  template<typename T>
  class PointerWQ : public WorkQueue_ {
  public:
    // Static analysis (Coverity) note: this destructor is implicitly
    // noexcept, but the remove_work_queue() call below can let a
    // boost::exception_detail::clone_impl<
    //   boost::exception_detail::error_info_injector<std::ios_base::failure>>
    // escape, which would end in a call to std::terminate().
    ~PointerWQ() override {
      m_pool->remove_work_queue(this);
      ceph_assert(m_processing == 0);
    }
    void drain() {
      {
        // if this queue is empty and not processing, don't wait for other
        // queues to finish processing
        std::lock_guard l(m_pool->_lock);
        if (m_processing == 0 && m_items.empty()) {
          return;
        }
      }
      m_pool->drain(this);
    }
    void queue(T *item) {
      std::lock_guard l(m_pool->_lock);
      m_items.push_back(item);
      m_pool->_cond.notify_one();
    }
    bool empty() {
      std::lock_guard l(m_pool->_lock);
      return _empty();
    }
  protected:
    PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void register_work_queue() {
      m_pool->add_work_queue(this);
    }
    void _clear() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      m_items.clear();
    }
    bool _empty() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      return m_items.empty();
    }
    void *_void_dequeue() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
        return NULL;
      }

      ++m_processing;
      T *item = m_items.front();
      m_items.pop_front();
      return item;
    }
    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      ceph_assert(m_processing > 0);
      --m_processing;
    }

    virtual void process(T *item) = 0;
    void process_finish() {
      std::lock_guard locker(m_pool->_lock);
      _void_process_finish(nullptr);
    }

    T *front() {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
        return NULL;
      }
      return m_items.front();
    }
    void requeue_front(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_front(item);
    }
    void requeue_back(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_back(item);
    }
    void signal() {
      std::lock_guard pool_locker(m_pool->_lock);
      m_pool->_cond.notify_one();
    }
    ceph::mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
    ThreadPool *m_pool;
    std::list<T *> m_items;
    uint32_t m_processing;
  };
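  // Illustrative sketch (not part of this header): unlike the queues above,
  // PointerWQ already stores the submitted pointers, so a subclass only has
  // to implement process().  Note that registration with the pool is explicit
  // via register_work_queue().  Request and handle_request() are hypothetical.
  //
  //   class RequestWQ : public ThreadPool::PointerWQ<Request> {
  //   public:
  //     RequestWQ(ThreadPool *tp)
  //       : ThreadPool::PointerWQ<Request>("RequestWQ", 60, 0, tp) {
  //       register_work_queue();  // PointerWQ does not add itself in its ctor
  //     }
  //   protected:
  //     void process(Request *r) override {
  //       handle_request(r);  // hypothetical helper
  //     }
  //   };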
protected:
  std::vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;


  // threads
  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() override {
      pool->worker(this);
      return 0;
    }
  };

  std::set<WorkThread*> _threads;
  std::list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;

  void start_threads();
  void join_old_threads();
  virtual void worker(WorkThread *wt);

public:
  ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option = NULL);
  ~ThreadPool() override;

  /// return number of threads currently running
  int get_num_threads() {
    std::lock_guard l(_lock);
    return _num_threads;
  }

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++)
      work_queues[i-1] = work_queues[i];
    ceph_assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

  /// take thread pool lock
  void lock() {
    _lock.lock();
  }
  /// release thread pool lock
  void unlock() {
    _lock.unlock();
  }

  /// wait for a kick on this thread pool
  void wait(ceph::condition_variable &c) {
    std::unique_lock l(_lock, std::adopt_lock);
    c.wait(l);
  }

  /// wake up a waiter (with lock already held)
  void _wake() {
    _cond.notify_all();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    std::lock_guard l(_lock);
    _cond.notify_all();
  }
  void _wait() {
    std::unique_lock l(_lock, std::adopt_lock);
    _cond.wait(l);
  }

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop(bool clear_after=true);
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /** @brief Wait until work completes.
   * If the parameter is NULL, blocks until all threads are idle.
   * If it is not NULL, blocks until the given work queue does not have
   * any items left to process. */
  void drain(WorkQueue_* wq = 0);
};
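// Illustrative lifecycle sketch (not part of this header), assuming a valid
// CephContext *cct and a work queue subclass such as the hypothetical
// RequestWQ above:
//
//   ThreadPool tp(cct, "example_tp", "tp_example", 4);
//   RequestWQ wq(&tp);       // work queues attach themselves to the pool
//   tp.start();              // spawn the worker threads
//   wq.queue(new Request);   // hand work to the pool
//   wq.drain();              // wait until this queue has nothing in flight
//   tp.stop();               // stop and join the workers (clears queues by default)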

class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
  std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
  GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
        GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_back(c);
  }
  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_front(c);
  }
  bool _empty() override {
    return _queue.empty();
  }
  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
    ceph_assert(!_queue.empty());
    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
    _queue.pop_front();
    return c;
  }
  void _process(GenContext<ThreadPool::TPHandle&> *c,
                ThreadPool::TPHandle &tp) override {
    c->complete(tp);
  }
};
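// Illustrative sketch (not part of this header): GenContext<TPHandle&>
// callbacks (see include/Context.h) are completed with the worker's TPHandle,
// so a slow callback can keep its own heartbeat alive.  C_SlowCallback and
// do_slow_thing() are hypothetical; tp is a started ThreadPool.
//
//   struct C_SlowCallback : public GenContext<ThreadPool::TPHandle&> {
//     void finish(ThreadPool::TPHandle &handle) override {
//       do_slow_thing();            // hypothetical long-running work
//       handle.reset_tp_timeout();
//     }
//   };
//
//   GenContextWQ gwq("gen_wq", 60, &tp);  // suicide interval defaults to ti*10
//   gwq.queue(new C_SlowCallback);        // complete() deletes the callback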
580
581 class C_QueueInWQ : public Context {
582 GenContextWQ *wq;
583 GenContext<ThreadPool::TPHandle&> *c;
584 public:
585 C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle &> *c)
586 : wq(wq), c(c) {}
587 void finish(int) override {
588 wq->queue(c);
589 }
590 };

/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
  ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
    this->register_work_queue();
  }

  void queue(Context *ctx, int result = 0) {
    if (result != 0) {
      std::lock_guard locker(m_lock);
      m_context_results[ctx] = result;
    }
    ThreadPool::PointerWQ<Context>::queue(ctx);
  }
protected:
  void _clear() override {
    ThreadPool::PointerWQ<Context>::_clear();

    std::lock_guard locker(m_lock);
    m_context_results.clear();
  }

  void process(Context *ctx) override {
    int result = 0;
    {
      std::lock_guard locker(m_lock);
      ceph::unordered_map<Context *, int>::iterator it =
        m_context_results.find(ctx);
      if (it != m_context_results.end()) {
        result = it->second;
        m_context_results.erase(it);
      }
    }
    ctx->complete(result);
  }
private:
  ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
  ceph::unordered_map<Context*, int> m_context_results;
};
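// Illustrative sketch (not part of this header): ContextWQ completes ordinary
// Contexts on the pool's worker threads, optionally with a non-zero result
// code.  C_NotifyDone is hypothetical; tp is a started ThreadPool.
//
//   struct C_NotifyDone : public Context {
//     void finish(int r) override { /* runs in a worker thread */ }
//   };
//
//   ContextWQ cwq("ctx_wq", 60, &tp);
//   cwq.queue(new C_NotifyDone);         // completed with r == 0
//   cwq.queue(new C_NotifyDone, -EIO);   // completed with r == -EIO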

class ShardedThreadPool {

  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex shardedpool_lock;
  ceph::condition_variable shardedpool_cond;
  ceph::condition_variable wait_cond;
  uint32_t num_threads;

  std::atomic<bool> stop_threads = { false };
  std::atomic<bool> pause_threads = { false };
  std::atomic<bool> drain_threads = { false };

  uint32_t num_paused;
  uint32_t num_drained;

public:

  class BaseShardedWQ {

  public:
    time_t timeout_interval, suicide_interval;
    BaseShardedWQ(time_t ti, time_t sti) : timeout_interval(ti), suicide_interval(sti) {}
    virtual ~BaseShardedWQ() {}

    virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) = 0;
    virtual void return_waiting_threads() = 0;
    virtual void stop_return_waiting_threads() = 0;
    virtual bool is_shard_empty(uint32_t thread_index) = 0;
  };

  template <typename T>
  class ShardedWQ : public BaseShardedWQ {

    ShardedThreadPool* sharded_pool;

  protected:
    virtual void _enqueue(T&&) = 0;
    virtual void _enqueue_front(T&&) = 0;


  public:
    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp)
      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
      tp->set_wq(this);
    }
    ~ShardedWQ() override {}

    void queue(T&& item) {
      _enqueue(std::move(item));
    }
    void queue_front(T&& item) {
      _enqueue_front(std::move(item));
    }
    void drain() {
      sharded_pool->drain();
    }

  };

private:

  BaseShardedWQ* wq;
  // threads
  struct WorkThreadSharded : public Thread {
    ShardedThreadPool *pool;
    uint32_t thread_index;
    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index)
      : pool(p), thread_index(pthread_index) {}
    void *entry() override {
      pool->shardedthreadpool_worker(thread_index);
      return 0;
    }
  };

  std::vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
    wq = swq;
  }



public:

  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);

  ~ShardedThreadPool() {}

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop();
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /// wait for all work to complete
  void drain();

};
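// Illustrative sketch (not part of this header): a concrete ShardedWQ usually
// keeps one lock and queue per shard, selected from the worker's thread_index,
// so threads rarely contend on a single mutex.  Op and OpWQ are hypothetical
// and the member bodies are elided; real users (e.g. the OSD op queue) are
// considerably more involved.
//
//   class OpWQ : public ShardedThreadPool::ShardedWQ<Op> {
//     // per-shard state: mutex, condition variable, deque<Op>
//     void _enqueue(Op&& op) override;        // hash op to a shard, push back, notify
//     void _enqueue_front(Op&& op) override;  // requeue at the head of its shard
//     void _process(uint32_t thread_index,
//                   ceph::heartbeat_handle_d *hb) override;  // pop and run one op
//     void return_waiting_threads() override;       // wake threads blocked in _process
//     void stop_return_waiting_threads() override;
//     bool is_shard_empty(uint32_t thread_index) override;
//   public:
//     OpWQ(time_t ti, time_t sti, ShardedThreadPool *tp)
//       : ShardedThreadPool::ShardedWQ<Op>(ti, sti, tp) {}
//   };
//
//   ShardedThreadPool stp(cct, "example_sharded_tp", "tp_shard", 8);
//   OpWQ wq(60, 0, &stp);  // the constructor registers the queue via set_wq()
//   stp.start();
//   wq.queue(Op());
//   stp.drain();           // wait until every shard reports empty
//   stp.stop();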

#endif // WITH_SEASTAR

#endif // CEPH_WORKQUEUE_H