1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software 
11   	 * Foundation.  See file COPYING.
12   	 * 
13   	 */
14   	
15   	#include "WorkQueue.h"
16   	#include "include/compat.h"
17   	#include "common/errno.h"
18   	
19   	#define dout_subsys ceph_subsys_tp
20   	#undef dout_prefix
21   	#define dout_prefix *_dout << name << " "
22   	
23   	
24   	ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
25   	  : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
26   	    lockname(name + "::lock"),
27   	    _lock(ceph::make_mutex(lockname)),  // this should be safe due to declaration order
28   	    _stop(false),
29   	    _pause(0),
30   	    _draining(0),
31   	    _num_threads(n),
32   	    processing(0)
33   	{
34   	  if (option) {
35   	    _thread_num_option = option;
36   	    // set up conf_keys
37   	    _conf_keys = new const char*[2];
38   	    _conf_keys[0] = _thread_num_option.c_str();
39   	    _conf_keys[1] = NULL;
40   	  } else {
41   	    _conf_keys = new const char*[1];
42   	    _conf_keys[0] = NULL;
43   	  }
44   	}
45   	
46   	void ThreadPool::TPHandle::suspend_tp_timeout()
47   	{
48   	  cct->get_heartbeat_map()->clear_timeout(hb);
49   	}
50   	
51   	void ThreadPool::TPHandle::reset_tp_timeout()
52   	{
53   	  cct->get_heartbeat_map()->reset_timeout(
54   	    hb, grace, suicide_grace);
55   	}
56   	
// Coverity (1) Event exn_spec_violation: an exception of type
// boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<std::ios_base::failure[abi:cxx11]> >
// (mangled: "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE")
// is thrown but the throw list "throw()" doesn't allow it to be thrown. This will
// cause a call to unexpected(), which usually calls terminate().
// Also see events: [fun_call_w_exception]
// Destroy the pool.  All worker threads must already be gone (asserted
// below); the NULL-terminated _conf_keys array allocated by the constructor
// is released here.
ThreadPool::~ThreadPool()
{
  // Coverity (2) Event fun_call_w_exception: the call below is reported as
  // able to throw a boost-wrapped std::ios_base::failure, which the
  // destructor's "throw()" exception specification does not permit
  // (see event exn_spec_violation).
  ceph_assert(_threads.empty());
  delete[] _conf_keys;
}
62   	
63   	void ThreadPool::handle_conf_change(const ConfigProxy& conf,
64   					    const std::set <std::string> &changed)
65   	{
66   	  if (changed.count(_thread_num_option)) {
67   	    char *buf;
68   	    int r = conf.get_val(_thread_num_option.c_str(), &buf, -1);
69   	    ceph_assert(r >= 0);
70   	    int v = atoi(buf);
71   	    free(buf);
72   	    if (v >= 0) {
73   	      _lock.lock();
74   	      _num_threads = v;
75   	      start_threads();
76   	      _cond.notify_all();
77   	      _lock.unlock();
78   	    }
79   	  }
80   	}
81   	
// Main loop executed by each ThreadPool worker thread.
//
// `wt` is the WorkThread object representing the calling thread; it is only
// used to move this thread from _threads to _old_threads when the pool has
// more threads than _num_threads.
//
// _lock is held through `ul` for the whole function, except while a work
// item is being processed (explicit unlock/lock below) and while `ul` is
// handed to _cond.wait_for().
void ThreadPool::worker(WorkThread *wt)
{
  std::unique_lock ul(_lock);
  ldout(cct,10) << "worker start" << dendl;
  
  // Register this thread with the heartbeat map under
  // "<pool name> thread <pthread id>".
  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!_stop) {

    // manage dynamic thread pool
    // First reap any threads that retired earlier, then retire ourselves if
    // the pool is still over its target size.  A retiring thread cannot join
    // itself, so it parks its WorkThread in _old_threads for a later
    // join_old_threads() call (from another worker or from stop()).
    join_old_threads();
    if (_threads.size() > _num_threads) {
      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
      _threads.erase(wt);
      _old_threads.push_back(wt);
      break;
    }

    if (!_pause && !work_queues.empty()) {
      WorkQueue_* wq;
      // Visit the queues round-robin starting at next_work_queue, giving up
      // after two full passes without finding an item.
      int tries = 2 * work_queues.size();
      bool did = false;
      while (tries--) {
	next_work_queue %= work_queues.size();
	wq = work_queues[next_work_queue++];
	
	void *item = wq->_void_dequeue();
	if (item) {
	  processing++;
	  ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
			<< " (" << processing << " active)" << dendl;
	  // Process the item without holding _lock; the heartbeat is armed
	  // with this queue's timeout_interval / suicide_interval first.
	  ul.unlock();
	  TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
	  tp_handle.reset_tp_timeout();
	  wq->_void_process(item, tp_handle);
	  // Completion bookkeeping happens with _lock held again.
	  ul.lock();
	  wq->_void_process_finish(item);
	  processing--;
	  ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
			<< " (" << processing << " active)" << dendl;
	  // pause() and drain() wait on _wait_cond for `processing` to drop.
	  if (_pause || _draining)
	    _wait_cond.notify_all();
	  did = true;
	  break;
	}
      }
      // Did some work: loop again right away instead of waiting.
      if (did)
	continue;
    }

    // Nothing to do (or paused): reset the heartbeat to the default pool
    // timeout (third argument 0; TPHandle passes its suicide grace there) and
    // wait on _cond for at most threadpool_empty_queue_max_wait seconds.
    ldout(cct,20) << "worker waiting" << dendl;
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      cct->_conf->threadpool_default_timeout,
      0);
    auto wait = std::chrono::seconds(
      cct->_conf->threadpool_empty_queue_max_wait);
    _cond.wait_for(ul, wait);
  }
  ldout(cct,1) << "worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);
}
147  	
148  	void ThreadPool::start_threads()
149  	{
150  	  ceph_assert(ceph_mutex_is_locked(_lock));
151  	  while (_threads.size() < _num_threads) {
152  	    WorkThread *wt = new WorkThread(this);
153  	    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
154  	    _threads.insert(wt);
155  	
156  	    wt->create(thread_name.c_str());
157  	  }
158  	}
159  	
160  	void ThreadPool::join_old_threads()
161  	{
162  	  ceph_assert(ceph_mutex_is_locked(_lock));
163  	  while (!_old_threads.empty()) {
164  	    ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
165  	    _old_threads.front()->join();
166  	    delete _old_threads.front();
167  	    _old_threads.pop_front();
168  	  }
169  	}
170  	
171  	void ThreadPool::start()
172  	{
173  	  ldout(cct,10) << "start" << dendl;
174  	
175  	  if (_thread_num_option.length()) {
176  	    ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
177  	    cct->_conf.add_observer(this);
178  	  }
179  	
180  	  _lock.lock();
181  	  start_threads();
182  	  _lock.unlock();
183  	  ldout(cct,15) << "started" << dendl;
184  	}
185  	
186  	void ThreadPool::stop(bool clear_after)
187  	{
188  	  ldout(cct,10) << "stop" << dendl;
189  	
190  	  if (_thread_num_option.length()) {
191  	    ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
192  	    cct->_conf.remove_observer(this);
193  	  }
194  	
195  	  _lock.lock();
196  	  _stop = true;
197  	  _cond.notify_all();
198  	  join_old_threads();
199  	  _lock.unlock();
200  	  for (set<WorkThread*>::iterator p = _threads.begin();
201  	       p != _threads.end();
202  	       ++p) {
203  	    (*p)->join();
204  	    delete *p;
205  	  }
206  	  _threads.clear();
207  	  _lock.lock();
208  	  for (unsigned i=0; i<work_queues.size(); i++)
209  	    work_queues[i]->_clear();
210  	  _stop = false;
211  	  _lock.unlock();
212  	  ldout(cct,15) << "stopped" << dendl;
213  	}
214  	
215  	void ThreadPool::pause()
216  	{
217  	  std::unique_lock ul(_lock);
218  	  ldout(cct,10) << "pause" << dendl;
219  	  _pause++;
220  	  while (processing) {
221  	    _wait_cond.wait(ul);
222  	  }
223  	  ldout(cct,15) << "paused" << dendl;
224  	}
225  	
226  	void ThreadPool::pause_new()
227  	{
228  	  ldout(cct,10) << "pause_new" << dendl;
229  	  _lock.lock();
230  	  _pause++;
231  	  _lock.unlock();
232  	}
233  	
234  	void ThreadPool::unpause()
235  	{
236  	  ldout(cct,10) << "unpause" << dendl;
237  	  _lock.lock();
238  	  ceph_assert(_pause > 0);
239  	  _pause--;
240  	  _cond.notify_all();
241  	  _lock.unlock();
242  	}
243  	
244  	void ThreadPool::drain(WorkQueue_* wq)
245  	{
246  	  std::unique_lock ul(_lock);
247  	  ldout(cct,10) << "drain" << dendl;
248  	  _draining++;
249  	  while (processing || (wq != NULL && !wq->_empty())) {
250  	    _wait_cond.wait(ul);
251  	  }
252  	  _draining--;
253  	}
254  	
255  	ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
256  	  uint32_t pnum_threads):
257  	  cct(pcct_),
258  	  name(std::move(nm)),
259  	  thread_name(std::move(tn)),
260  	  lockname(name + "::lock"),
261  	  shardedpool_lock(ceph::make_mutex(lockname)),
262  	  num_threads(pnum_threads),
263  	  num_paused(0),
264  	  num_drained(0),
265  	  wq(NULL) {}
266  	
// Main loop executed by each ShardedThreadPool worker thread.
//
// `thread_index` identifies this worker; it is passed to the work queue's
// is_shard_empty() and _process() calls.
//
// Each iteration honours a pending pause request, then a pending drain
// request, and finally asks the work queue to process work for this index.
//
// NOTE(review): stop_threads / pause_threads / drain_threads are tested
// here before shardedpool_lock is taken; presumably they are atomics
// declared in the header — confirm.
void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
{
  ceph_assert(wq != NULL);
  ldout(cct,10) << "worker start" << dendl;

  // Register this thread with the heartbeat map under
  // "<pool name> thread <pthread id>".
  std::stringstream ss;
  ss << name << " thread " << (void *)pthread_self();
  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());

  while (!stop_threads) {
    if (pause_threads) {
      std::unique_lock ul(shardedpool_lock);
      // Report ourselves as paused; pause() waits on wait_cond until
      // num_paused reaches num_threads.
      ++num_paused;
      wait_cond.notify_all();
      while (pause_threads) {
       // Keep the heartbeat fresh while parked, waking up at least every
       // threadpool_empty_queue_max_wait seconds to do so.
       cct->get_heartbeat_map()->reset_timeout(
	        hb,
	        wq->timeout_interval, wq->suicide_interval);
       shardedpool_cond.wait_for(
	 ul,
	 std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
      }
      --num_paused;
    }
    if (drain_threads) {
      std::unique_lock ul(shardedpool_lock);
      // Only count as drained once this worker's shard is empty; otherwise
      // fall through and keep processing.
      if (wq->is_shard_empty(thread_index)) {
        ++num_drained;
        wait_cond.notify_all();
        // drain() clears drain_threads and notifies shardedpool_cond once
        // every worker has reported in.
        while (drain_threads) {
	  cct->get_heartbeat_map()->reset_timeout(
	    hb,
	    wq->timeout_interval, wq->suicide_interval);
          shardedpool_cond.wait_for(
	    ul,
	    std::chrono::seconds(cct->_conf->threadpool_empty_queue_max_wait));
        }
        --num_drained;
      }
    }

    // Re-arm the heartbeat with the queue's timeouts, then let the queue do
    // the work for this thread index.
    cct->get_heartbeat_map()->reset_timeout(
      hb,
      wq->timeout_interval, wq->suicide_interval);
    wq->_process(thread_index, hb);

  }

  ldout(cct,10) << "sharded worker finish" << dendl;

  cct->get_heartbeat_map()->remove_worker(hb);

}
320  	
321  	void ShardedThreadPool::start_threads()
322  	{
323  	  ceph_assert(ceph_mutex_is_locked(shardedpool_lock));
324  	  int32_t thread_index = 0;
325  	  while (threads_shardedpool.size() < num_threads) {
326  	
327  	    WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
328  	    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
329  	    threads_shardedpool.push_back(wt);
330  	    wt->create(thread_name.c_str());
331  	    thread_index++;
332  	  }
333  	}
334  	
335  	void ShardedThreadPool::start()
336  	{
337  	  ldout(cct,10) << "start" << dendl;
338  	
339  	  shardedpool_lock.lock();
340  	  start_threads();
341  	  shardedpool_lock.unlock();
342  	  ldout(cct,15) << "started" << dendl;
343  	}
344  	
345  	void ShardedThreadPool::stop()
346  	{
347  	  ldout(cct,10) << "stop" << dendl;
348  	  stop_threads = true;
349  	  ceph_assert(wq != NULL);
350  	  wq->return_waiting_threads();
351  	  for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
352  	       p != threads_shardedpool.end();
353  	       ++p) {
354  	    (*p)->join();
355  	    delete *p;
356  	  }
357  	  threads_shardedpool.clear();
358  	  ldout(cct,15) << "stopped" << dendl;
359  	}
360  	
361  	void ShardedThreadPool::pause()
362  	{
363  	  std::unique_lock ul(shardedpool_lock);
364  	  ldout(cct,10) << "pause" << dendl;
365  	  pause_threads = true;
366  	  ceph_assert(wq != NULL);
367  	  wq->return_waiting_threads();
368  	  while (num_threads != num_paused){
369  	    wait_cond.wait(ul);
370  	  }
371  	  ldout(cct,10) << "paused" << dendl; 
372  	}
373  	
374  	void ShardedThreadPool::pause_new()
375  	{
376  	  ldout(cct,10) << "pause_new" << dendl;
377  	  shardedpool_lock.lock();
378  	  pause_threads = true;
379  	  ceph_assert(wq != NULL);
380  	  wq->return_waiting_threads();
381  	  shardedpool_lock.unlock();
382  	  ldout(cct,10) << "paused_new" << dendl;
383  	}
384  	
385  	void ShardedThreadPool::unpause()
386  	{
387  	  ldout(cct,10) << "unpause" << dendl;
388  	  shardedpool_lock.lock();
389  	  pause_threads = false;
390  	  wq->stop_return_waiting_threads();
391  	  shardedpool_cond.notify_all();
392  	  shardedpool_lock.unlock();
393  	  ldout(cct,10) << "unpaused" << dendl;
394  	}
395  	
396  	void ShardedThreadPool::drain()
397  	{
398  	  std::unique_lock ul(shardedpool_lock);
399  	  ldout(cct,10) << "drain" << dendl;
400  	  drain_threads = true;
401  	  ceph_assert(wq != NULL);
402  	  wq->return_waiting_threads();
403  	  while (num_threads != num_drained) {
404  	    wait_cond.wait(ul);
405  	  }
406  	  drain_threads = false;
407  	  wq->stop_return_waiting_threads();
408  	  shardedpool_cond.notify_all();
409  	  ldout(cct,10) << "drained" << dendl;
410  	}
411  	
412