1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software 
11   	 * Foundation.  See file COPYING.
12   	 * 
13   	 */
14   	
15   	#ifndef CEPH_WORKQUEUE_H
16   	#define CEPH_WORKQUEUE_H
17   	
18   	#ifdef WITH_SEASTAR
// Placeholder used when building WITH_SEASTAR: ObjectStore.h only needs the
// names ThreadPool and ThreadPool::TPHandle to exist, so both are empty.
struct ThreadPool {
  struct TPHandle {};
};
24   	
25   	#else
26   	
27   	#include <atomic>
28   	#include <list>
29   	#include <set>
30   	#include <string>
31   	#include <vector>
32   	
33   	#include "common/ceph_mutex.h"
34   	#include "include/unordered_map.h"
35   	#include "common/config_obs.h"
36   	#include "common/HeartbeatMap.h"
37   	#include "common/Thread.h"
38   	#include "include/Context.h"
39   	#include "common/HBHandle.h"
40   	
41   	class CephContext;
42   	
43   	/// Pool of threads that share work submitted to multiple work queues.
44   	class ThreadPool : public md_config_obs_t {
45   	protected:
46   	  CephContext *cct;
47   	  std::string name;
48   	  std::string thread_name;
49   	  std::string lockname;
50   	  ceph::mutex _lock;
51   	  ceph::condition_variable _cond;
52   	  bool _stop;
53   	  int _pause;
54   	  int _draining;
55   	  ceph::condition_variable _wait_cond;
56   	
57   	public:
58   	  class TPHandle : public HBHandle {
59   	    friend class ThreadPool;
60   	    CephContext *cct;
61   	    ceph::heartbeat_handle_d *hb;
62   	    ceph::coarse_mono_clock::rep grace;
63   	    ceph::coarse_mono_clock::rep suicide_grace;
64   	  public:
65   	    TPHandle(
66   	      CephContext *cct,
67   	      ceph::heartbeat_handle_d *hb,
68   	      time_t grace,
69   	      time_t suicide_grace)
70   	      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
71   	    void reset_tp_timeout() override final;
72   	    void suspend_tp_timeout() override final;
73   	  };
74   	protected:
75   	
76   	  /// Basic interface to a work queue used by the worker threads.
77   	  struct WorkQueue_ {
78   	    std::string name;
79   	    time_t timeout_interval, suicide_interval;
80   	    WorkQueue_(std::string n, time_t ti, time_t sti)
81   	      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
82   	    { }
83   	    virtual ~WorkQueue_() {}
84   	    /// Remove all work items from the queue.
85   	    virtual void _clear() = 0;
86   	    /// Check whether there is anything to do.
87   	    virtual bool _empty() = 0;
88   	    /// Get the next work item to process.
89   	    virtual void *_void_dequeue() = 0;
90   	    /** @brief Process the work item.
91   	     * This function will be called several times in parallel
92   	     * and must therefore be thread-safe. */
93   	    virtual void _void_process(void *item, TPHandle &handle) = 0;
94   	    /** @brief Synchronously finish processing a work item.
95   	     * This function is called after _void_process with the global thread pool lock held,
96   	     * so at most one copy will execute simultaneously for a given thread pool.
97   	     * It can be used for non-thread-safe finalization. */
98   	    virtual void _void_process_finish(void *) = 0;
99   	  };
100  	
101  	  // track thread pool size changes
102  	  unsigned _num_threads;
103  	  std::string _thread_num_option;
104  	  const char **_conf_keys;
105  	
106  	  const char **get_tracked_conf_keys() const override {
107  	    return _conf_keys;
108  	  }
109  	  void handle_conf_change(const ConfigProxy& conf,
110  				  const std::set <std::string> &changed) override;
111  	
112  	public:
113  	  /** @brief Work queue that processes several submitted items at once.
114  	   * The queue will automatically add itself to the thread pool on construction
115  	   * and remove itself on destruction. */
116  	  template<class T>
117  	  class BatchWorkQueue : public WorkQueue_ {
118  	    ThreadPool *pool;
119  	
120  	    virtual bool _enqueue(T *) = 0;
121  	    virtual void _dequeue(T *) = 0;
122  	    virtual void _dequeue(std::list<T*> *) = 0;
123  	    virtual void _process_finish(const std::list<T*> &) {}
124  	
125  	    // virtual methods from WorkQueue_ below
126  	    void *_void_dequeue() override {
127  	      std::list<T*> *out(new std::list<T*>);
128  	      _dequeue(out);
129  	      if (!out->empty()) {
130  		return (void *)out;
131  	      } else {
132  		delete out;
133  		return 0;
134  	      }
135  	    }
136  	    void _void_process(void *p, TPHandle &handle) override {
137  	      _process(*((std::list<T*>*)p), handle);
138  	    }
139  	    void _void_process_finish(void *p) override {
140  	      _process_finish(*(std::list<T*>*)p);
141  	      delete (std::list<T*> *)p;
142  	    }
143  	
144  	  protected:
145  	    virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0;
146  	
147  	  public:
148  	    BatchWorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
149  	      : WorkQueue_(std::move(n), ti, sti), pool(p) {
150  	      pool->add_work_queue(this);
151  	    }
152  	    ~BatchWorkQueue() override {
153  	      pool->remove_work_queue(this);
154  	    }
155  	
156  	    bool queue(T *item) {
157  	      pool->_lock.lock();
158  	      bool r = _enqueue(item);
159  	      pool->_cond.notify_one();
160  	      pool->_lock.unlock();
161  	      return r;
162  	    }
163  	    void dequeue(T *item) {
164  	      pool->_lock.lock();
165  	      _dequeue(item);
166  	      pool->_lock.unlock();
167  	    }
168  	    void clear() {
169  	      pool->_lock.lock();
170  	      _clear();
171  	      pool->_lock.unlock();
172  	    }
173  	
174  	    void lock() {
175  	      pool->lock();
176  	    }
177  	    void unlock() {
178  	      pool->unlock();
179  	    }
180  	    void wake() {
181  	      pool->wake();
182  	    }
183  	    void _wake() {
184  	      pool->_wake();
185  	    }
186  	    void drain() {
187  	      pool->drain(this);
188  	    }
189  	
190  	  };
191  	
192  	  /** @brief Templated by-value work queue.
193  	   * Skeleton implementation of a queue that processes items submitted by value.
194  	   * This is useful if the items are single primitive values or very small objects
195  	   * (a few bytes). The queue will automatically add itself to the thread pool on
196  	   * construction and remove itself on destruction. */
197  	  template<typename T, typename U = T>
198  	  class WorkQueueVal : public WorkQueue_ {
199  	    ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
200  	    ThreadPool *pool;
201  	    std::list<U> to_process;
202  	    std::list<U> to_finish;
203  	    virtual void _enqueue(T) = 0;
204  	    virtual void _enqueue_front(T) = 0;
205  	    bool _empty() override = 0;
206  	    virtual U _dequeue() = 0;
207  	    virtual void _process_finish(U) {}
208  	
209  	    void *_void_dequeue() override {
210  	      {
211  		std::lock_guard l(_lock);
212  		if (_empty())
213  		  return 0;
214  		U u = _dequeue();
215  		to_process.push_back(u);
216  	      }
217  	      return ((void*)1); // Not used
218  	    }
219  	    void _void_process(void *, TPHandle &handle) override {
220  	      _lock.lock();
221  	      ceph_assert(!to_process.empty());
222  	      U u = to_process.front();
223  	      to_process.pop_front();
224  	      _lock.unlock();
225  	
226  	      _process(u, handle);
227  	
228  	      _lock.lock();
229  	      to_finish.push_back(u);
230  	      _lock.unlock();
231  	    }
232  	
233  	    void _void_process_finish(void *) override {
234  	      _lock.lock();
235  	      ceph_assert(!to_finish.empty());
236  	      U u = to_finish.front();
237  	      to_finish.pop_front();
238  	      _lock.unlock();
239  	
240  	      _process_finish(u);
241  	    }
242  	
243  	    void _clear() override {}
244  	
245  	  public:
246  	    WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
247  	      : WorkQueue_(std::move(n), ti, sti), pool(p) {
248  	      pool->add_work_queue(this);
249  	    }
250  	    ~WorkQueueVal() override {
251  	      pool->remove_work_queue(this);
252  	    }
253  	    void queue(T item) {
254  	      std::lock_guard l(pool->_lock);
255  	      _enqueue(item);
256  	      pool->_cond.notify_one();
257  	    }
258  	    void queue_front(T item) {
259  	      std::lock_guard l(pool->_lock);
260  	      _enqueue_front(item);
261  	      pool->_cond.notify_one();
262  	    }
263  	    void drain() {
264  	      pool->drain(this);
265  	    }
266  	  protected:
267  	    void lock() {
268  	      pool->lock();
269  	    }
270  	    void unlock() {
271  	      pool->unlock();
272  	    }
273  	    virtual void _process(U u, TPHandle &) = 0;
274  	  };
275  	
276  	  /** @brief Template by-pointer work queue.
277  	   * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
278  	   * This is useful when the work item are large or include dynamically allocated memory. The queue
279  	   * will automatically add itself to the thread pool on construction and remove itself on
280  	   * destruction. */
281  	  template<class T>
282  	  class WorkQueue : public WorkQueue_ {
283  	    ThreadPool *pool;
284  	    
285  	    /// Add a work item to the queue.
286  	    virtual bool _enqueue(T *) = 0;
287  	    /// Dequeue a previously submitted work item.
288  	    virtual void _dequeue(T *) = 0;
289  	    /// Dequeue a work item and return the original submitted pointer.
290  	    virtual T *_dequeue() = 0;
291  	    virtual void _process_finish(T *) {}
292  	
293  	    // implementation of virtual methods from WorkQueue_
294  	    void *_void_dequeue() override {
295  	      return (void *)_dequeue();
296  	    }
297  	    void _void_process(void *p, TPHandle &handle) override {
298  	      _process(static_cast<T *>(p), handle);
299  	    }
300  	    void _void_process_finish(void *p) override {
301  	      _process_finish(static_cast<T *>(p));
302  	    }
303  	
304  	  protected:
305  	    /// Process a work item. Called from the worker threads.
306  	    virtual void _process(T *t, TPHandle &) = 0;
307  	
308  	  public:
309  	    WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
310  	      : WorkQueue_(std::move(n), ti, sti), pool(p) {
311  	      pool->add_work_queue(this);
312  	    }
313  	    ~WorkQueue() override {
314  	      pool->remove_work_queue(this);
315  	    }
316  	    
317  	    bool queue(T *item) {
318  	      pool->_lock.lock();
319  	      bool r = _enqueue(item);
320  	      pool->_cond.notify_one();
321  	      pool->_lock.unlock();
322  	      return r;
323  	    }
324  	    void dequeue(T *item) {
325  	      pool->_lock.lock();
326  	      _dequeue(item);
327  	      pool->_lock.unlock();
328  	    }
329  	    void clear() {
330  	      pool->_lock.lock();
331  	      _clear();
332  	      pool->_lock.unlock();
333  	    }
334  	
335  	    void lock() {
336  	      pool->lock();
337  	    }
338  	    void unlock() {
339  	      pool->unlock();
340  	    }
341  	    /// wake up the thread pool (without lock held)
342  	    void wake() {
343  	      pool->wake();
344  	    }
345  	    /// wake up the thread pool (with lock already held)
346  	    void _wake() {
347  	      pool->_wake();
348  	    }
349  	    void _wait() {
350  	      pool->_wait();
351  	    }
352  	    void drain() {
353  	      pool->drain(this);
354  	    }
355  	
356  	  };
357  	
358  	  template<typename T>
359  	  class PointerWQ : public WorkQueue_ {
360  	  public:
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
361  	    ~PointerWQ() override {
362  	      m_pool->remove_work_queue(this);
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
363  	      ceph_assert(m_processing == 0);
364  	    }
365  	    void drain() {
366  	      {
367  	        // if this queue is empty and not processing, don't wait for other
368  	        // queues to finish processing
369  	        std::lock_guard l(m_pool->_lock);
370  	        if (m_processing == 0 && m_items.empty()) {
371  	          return;
372  	        }
373  	      }
374  	      m_pool->drain(this);
375  	    }
376  	    void queue(T *item) {
377  	      std::lock_guard l(m_pool->_lock);
378  	      m_items.push_back(item);
379  	      m_pool->_cond.notify_one();
380  	    }
381  	    bool empty() {
382  	      std::lock_guard l(m_pool->_lock);
383  	      return _empty();
384  	    }
385  	  protected:
386  	    PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
387  	      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
388  	    }
389  	    void register_work_queue() {
390  	      m_pool->add_work_queue(this);
391  	    }
392  	    void _clear() override {
393  	      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
394  	      m_items.clear();
395  	    }
396  	    bool _empty() override {
397  	      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
398  	      return m_items.empty();
399  	    }
400  	    void *_void_dequeue() override {
401  	      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
402  	      if (m_items.empty()) {
403  	        return NULL;
404  	      }
405  	
406  	      ++m_processing;
407  	      T *item = m_items.front();
408  	      m_items.pop_front();
409  	      return item;
410  	    }
411  	    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
412  	      process(reinterpret_cast<T *>(item));
413  	    }
414  	    void _void_process_finish(void *item) override {
415  	      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
416  	      ceph_assert(m_processing > 0);
417  	      --m_processing;
418  	    }
419  	
420  	    virtual void process(T *item) = 0;
421  	    void process_finish() {
422  	      std::lock_guard locker(m_pool->_lock);
423  	      _void_process_finish(nullptr);
424  	    }
425  	
426  	    T *front() {
427  	      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
428  	      if (m_items.empty()) {
429  	        return NULL;
430  	      }
431  	      return m_items.front();
432  	    }
433  	    void requeue_front(T *item) {
434  	      std::lock_guard pool_locker(m_pool->_lock);
435  	      _void_process_finish(nullptr);
436  	      m_items.push_front(item);
437  	    }
438  	    void requeue_back(T *item) {
439  	      std::lock_guard pool_locker(m_pool->_lock);
440  	      _void_process_finish(nullptr);
441  	      m_items.push_back(item);
442  	    }
443  	    void signal() {
444  	      std::lock_guard pool_locker(m_pool->_lock);
445  	      m_pool->_cond.notify_one();
446  	    }
447  	    ceph::mutex &get_pool_lock() {
448  	      return m_pool->_lock;
449  	    }
450  	  private:
451  	    ThreadPool *m_pool;
452  	    std::list<T *> m_items;
453  	    uint32_t m_processing;
454  	  };
455  	protected:
456  	  std::vector<WorkQueue_*> work_queues;
457  	  int next_work_queue = 0;
458  	 
459  	
460  	  // threads
461  	  struct WorkThread : public Thread {
462  	    ThreadPool *pool;
463  	    // cppcheck-suppress noExplicitConstructor
464  	    WorkThread(ThreadPool *p) : pool(p) {}
465  	    void *entry() override {
466  	      pool->worker(this);
467  	      return 0;
468  	    }
469  	  };
470  	  
471  	  std::set<WorkThread*> _threads;
472  	  std::list<WorkThread*> _old_threads;  ///< need to be joined
473  	  int processing;
474  	
475  	  void start_threads();
476  	  void join_old_threads();
477  	  virtual void worker(WorkThread *wt);
478  	
479  	public:
480  	  ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option = NULL);
481  	  ~ThreadPool() override;
482  	
483  	  /// return number of threads currently running
484  	  int get_num_threads() {
485  	    std::lock_guard l(_lock);
486  	    return _num_threads;
487  	  }
488  	  
489  	  /// assign a work queue to this thread pool
490  	  void add_work_queue(WorkQueue_* wq) {
491  	    std::lock_guard l(_lock);
492  	    work_queues.push_back(wq);
493  	  }
494  	  /// remove a work queue from this thread pool
495  	  void remove_work_queue(WorkQueue_* wq) {
496  	    std::lock_guard l(_lock);
497  	    unsigned i = 0;
498  	    while (work_queues[i] != wq)
499  	      i++;
500  	    for (i++; i < work_queues.size(); i++) 
501  	      work_queues[i-1] = work_queues[i];
502  	    ceph_assert(i == work_queues.size());
503  	    work_queues.resize(i-1);
504  	  }
505  	
506  	  /// take thread pool lock
507  	  void lock() {
508  	    _lock.lock();
509  	  }
510  	  /// release thread pool lock
511  	  void unlock() {
512  	    _lock.unlock();
513  	  }
514  	
515  	  /// wait for a kick on this thread pool
516  	  void wait(ceph::condition_variable &c) {
517  	    std::unique_lock l(_lock, std::adopt_lock);
518  	    c.wait(l);
519  	  }
520  	
521  	  /// wake up a waiter (with lock already held)
522  	  void _wake() {
523  	    _cond.notify_all();
524  	  }
525  	  /// wake up a waiter (without lock held)
526  	  void wake() {
527  	    std::lock_guard l(_lock);
528  	    _cond.notify_all();
529  	  }
530  	  void _wait() {
531  	    std::unique_lock l(_lock, std::adopt_lock);
532  	    _cond.wait(l);
533  	  }
534  	
535  	  /// start thread pool thread
536  	  void start();
537  	  /// stop thread pool thread
538  	  void stop(bool clear_after=true);
539  	  /// pause thread pool (if it not already paused)
540  	  void pause();
541  	  /// pause initiation of new work
542  	  void pause_new();
543  	  /// resume work in thread pool.  must match each pause() call 1:1 to resume.
544  	  void unpause();
545  	  /** @brief Wait until work completes.
546  	   * If the parameter is NULL, blocks until all threads are idle.
547  	   * If it is not NULL, blocks until the given work queue does not have
548  	   * any items left to process. */
549  	  void drain(WorkQueue_* wq = 0);
550  	};
551  	
552  	class GenContextWQ :
553  	  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
554  	  std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
555  	public:
556  	  GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
557  	    : ThreadPool::WorkQueueVal<
558  	      GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
559  	  
560  	  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
561  	    _queue.push_back(c);
562  	  }
563  	  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
564  	    _queue.push_front(c);
565  	  }
566  	  bool _empty() override {
567  	    return _queue.empty();
568  	  }
569  	  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
570  	    ceph_assert(!_queue.empty());
571  	    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
572  	    _queue.pop_front();
573  	    return c;
574  	  }
575  	  void _process(GenContext<ThreadPool::TPHandle&> *c,
576  			ThreadPool::TPHandle &tp) override {
577  	    c->complete(tp);
578  	  }
579  	};
580  	
581  	class C_QueueInWQ : public Context {
582  	  GenContextWQ *wq;
583  	  GenContext<ThreadPool::TPHandle&> *c;
584  	public:
585  	  C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle &> *c)
586  	    : wq(wq), c(c) {}
587  	  void finish(int) override {
588  	    wq->queue(c);
589  	  }
590  	};
591  	
592  	/// Work queue that asynchronously completes contexts (executes callbacks).
593  	/// @see Finisher
594  	class ContextWQ : public ThreadPool::PointerWQ<Context> {
595  	public:
596  	  ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
597  	    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
598  	    this->register_work_queue();
599  	  }
600  	
601  	  void queue(Context *ctx, int result = 0) {
602  	    if (result != 0) {
603  	      std::lock_guard locker(m_lock);
604  	      m_context_results[ctx] = result;
605  	    }
606  	    ThreadPool::PointerWQ<Context>::queue(ctx);
607  	  }
608  	protected:
609  	  void _clear() override {
610  	    ThreadPool::PointerWQ<Context>::_clear();
611  	
612  	    std::lock_guard locker(m_lock);
613  	    m_context_results.clear();
614  	  }
615  	
616  	  void process(Context *ctx) override {
617  	    int result = 0;
618  	    {
619  	      std::lock_guard locker(m_lock);
620  	      ceph::unordered_map<Context *, int>::iterator it =
621  	        m_context_results.find(ctx);
622  	      if (it != m_context_results.end()) {
623  	        result = it->second;
624  	        m_context_results.erase(it);
625  	      }
626  	    }
627  	    ctx->complete(result);
628  	  }
629  	private:
630  	  ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
631  	  ceph::unordered_map<Context*, int> m_context_results;
632  	};
633  	
634  	class ShardedThreadPool {
635  	
636  	  CephContext *cct;
637  	  std::string name;
638  	  std::string thread_name;
639  	  std::string lockname;
640  	  ceph::mutex shardedpool_lock;
641  	  ceph::condition_variable shardedpool_cond;
642  	  ceph::condition_variable wait_cond;
643  	  uint32_t num_threads;
644  	
645  	  std::atomic<bool> stop_threads = { false };
646  	  std::atomic<bool> pause_threads = { false };
647  	  std::atomic<bool> drain_threads = { false };
648  	
649  	  uint32_t num_paused;
650  	  uint32_t num_drained;
651  	
652  	public:
653  	
654  	  class BaseShardedWQ {
655  	  
656  	  public:
657  	    time_t timeout_interval, suicide_interval;
658  	    BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
659  	    virtual ~BaseShardedWQ() {}
660  	
661  	    virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb ) = 0;
662  	    virtual void return_waiting_threads() = 0;
663  	    virtual void stop_return_waiting_threads() = 0;
664  	    virtual bool is_shard_empty(uint32_t thread_index) = 0;
665  	  };
666  	
667  	  template <typename T>
668  	  class ShardedWQ: public BaseShardedWQ {
669  	  
670  	    ShardedThreadPool* sharded_pool;
671  	
672  	  protected:
673  	    virtual void _enqueue(T&&) = 0;
674  	    virtual void _enqueue_front(T&&) = 0;
675  	
676  	
677  	  public:
678  	    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti), 
679  	                                                                 sharded_pool(tp) {
680  	      tp->set_wq(this);
681  	    }
682  	    ~ShardedWQ() override {}
683  	
684  	    void queue(T&& item) {
685  	      _enqueue(std::move(item));
686  	    }
687  	    void queue_front(T&& item) {
688  	      _enqueue_front(std::move(item));
689  	    }
690  	    void drain() {
691  	      sharded_pool->drain();
692  	    }
693  	    
694  	  };
695  	
696  	private:
697  	
698  	  BaseShardedWQ* wq;
699  	  // threads
700  	  struct WorkThreadSharded : public Thread {
701  	    ShardedThreadPool *pool;
702  	    uint32_t thread_index;
703  	    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
704  	      thread_index(pthread_index) {}
705  	    void *entry() override {
706  	      pool->shardedthreadpool_worker(thread_index);
707  	      return 0;
708  	    }
709  	  };
710  	
711  	  std::vector<WorkThreadSharded*> threads_shardedpool;
712  	  void start_threads();
713  	  void shardedthreadpool_worker(uint32_t thread_index);
714  	  void set_wq(BaseShardedWQ* swq) {
715  	    wq = swq;
716  	  }
717  	
718  	
719  	
720  	public:
721  	
722  	  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);
723  	
724  	  ~ShardedThreadPool(){};
725  	
726  	  /// start thread pool thread
727  	  void start();
728  	  /// stop thread pool thread
729  	  void stop();
730  	  /// pause thread pool (if it not already paused)
731  	  void pause();
732  	  /// pause initiation of new work
733  	  void pause_new();
734  	  /// resume work in thread pool.  must match each pause() call 1:1 to resume.
735  	  void unpause();
736  	  /// wait for all work to complete
737  	  void drain();
738  	
739  	};
740  	
741  	#endif
742  	
743  	#endif
744