1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "Finisher.h"
5    	
6    	#define dout_subsys ceph_subsys_finisher
7    	#undef dout_prefix
8    	#define dout_prefix *_dout << "finisher(" << this << ") "
9    	
10   	void Finisher::start()
11   	{
12   	  ldout(cct, 10) << __func__ << dendl;
13   	  finisher_thread.create(thread_name.c_str());
14   	}
15   	
16   	void Finisher::stop()
17   	{
18   	  ldout(cct, 10) << __func__ << dendl;
(1) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
19   	  finisher_lock.lock();
20   	  finisher_stop = true;
21   	  // we don't have any new work to do, but we want the worker to wake up anyway
22   	  // to process the stop condition.
23   	  finisher_cond.notify_all();
24   	  finisher_lock.unlock();
25   	  finisher_thread.join(); // wait until the worker exits completely
26   	  ldout(cct, 10) << __func__ << " finish" << dendl;
27   	}
28   	
29   	void Finisher::wait_for_empty()
30   	{
31   	  std::unique_lock ul(finisher_lock);
32   	  while (!finisher_queue.empty() || finisher_running) {
33   	    ldout(cct, 10) << "wait_for_empty waiting" << dendl;
34   	    finisher_empty_wait = true;
35   	    finisher_empty_cond.wait(ul);
36   	  }
37   	  ldout(cct, 10) << "wait_for_empty empty" << dendl;
38   	  finisher_empty_wait = false;
39   	}
40   	
41   	void *Finisher::finisher_thread_entry()
42   	{
43   	  std::unique_lock ul(finisher_lock);
44   	  ldout(cct, 10) << "finisher_thread start" << dendl;
45   	
46   	  utime_t start;
47   	  uint64_t count = 0;
48   	  while (!finisher_stop) {
49   	    /// Every time we are woken up, we process the queue until it is empty.
50   	    while (!finisher_queue.empty()) {
51   	      // To reduce lock contention, we swap out the queue to process.
52   	      // This way other threads can submit new contexts to complete
53   	      // while we are working.
54   	      in_progress_queue.swap(finisher_queue);
55   	      finisher_running = true;
56   	      ul.unlock();
57   	      ldout(cct, 10) << "finisher_thread doing " << in_progress_queue << dendl;
58   	
59   	      if (logger) {
60   		start = ceph_clock_now();
61   		count = in_progress_queue.size();
62   	      }
63   	
64   	      // Now actually process the contexts.
65   	      for (auto p : in_progress_queue) {
66   		p.first->complete(p.second);
67   	      }
68   	      ldout(cct, 10) << "finisher_thread done with " << in_progress_queue
69   	                     << dendl;
70   	      in_progress_queue.clear();
71   	      if (logger) {
72   		logger->dec(l_finisher_queue_len, count);
73   		logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start);
74   	      }
75   	
76   	      ul.lock();
77   	      finisher_running = false;
78   	    }
79   	    ldout(cct, 10) << "finisher_thread empty" << dendl;
80   	    if (unlikely(finisher_empty_wait))
81   	      finisher_empty_cond.notify_all();
82   	    if (finisher_stop)
83   	      break;
84   	    
85   	    ldout(cct, 10) << "finisher_thread sleeping" << dendl;
86   	    finisher_cond.wait(ul);
87   	  }
88   	  // If we are exiting, we signal the thread waiting in stop(),
89   	  // otherwise it would never unblock
90   	  finisher_empty_cond.notify_all();
91   	
92   	  ldout(cct, 10) << "finisher_thread stop" << dendl;
93   	  finisher_stop = false;
94   	  return 0;
95   	}
96   	
97