1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "Finisher.h"
5
6 #define dout_subsys ceph_subsys_finisher
7 #undef dout_prefix
8 #define dout_prefix *_dout << "finisher(" << this << ") "
9
10 void Finisher::start()
11 {
12 ldout(cct, 10) << __func__ << dendl;
13 finisher_thread.create(thread_name.c_str());
14 }
15
16 void Finisher::stop()
17 {
18 ldout(cct, 10) << __func__ << dendl;
(6) Event example_lock: |
Example 1: Locking "ceph::mutex_debug_detail::mutex_debug_impl<false>.m". |
Also see events: |
[missing_lock][example_access] |
19 finisher_lock.lock();
(7) Event example_access: |
Example 1 (cont.): "Finisher.finisher_stop" is accessed with lock "ceph::mutex_debug_detail::mutex_debug_impl<false>.m" held. |
Also see events: |
[missing_lock][example_lock] |
20 finisher_stop = true;
21 // we don't have any new work to do, but we want the worker to wake up anyway
22 // to process the stop condition.
23 finisher_cond.notify_all();
24 finisher_lock.unlock();
25 finisher_thread.join(); // wait until the worker exits completely
26 ldout(cct, 10) << __func__ << " finish" << dendl;
27 }
28
29 void Finisher::wait_for_empty()
30 {
31 std::unique_lock ul(finisher_lock);
32 while (!finisher_queue.empty() || finisher_running) {
33 ldout(cct, 10) << "wait_for_empty waiting" << dendl;
34 finisher_empty_wait = true;
35 finisher_empty_cond.wait(ul);
36 }
37 ldout(cct, 10) << "wait_for_empty empty" << dendl;
38 finisher_empty_wait = false;
39 }
40
41 void *Finisher::finisher_thread_entry()
42 {
43 std::unique_lock ul(finisher_lock);
(1) Event cond_true: |
Condition "should_gather", taking true branch. |
44 ldout(cct, 10) << "finisher_thread start" << dendl;
45
46 utime_t start;
47 uint64_t count = 0;
(2) Event cond_false: |
Condition "!this->finisher_stop", taking false branch. |
48 while (!finisher_stop) {
49 /// Every time we are woken up, we process the queue until it is empty.
50 while (!finisher_queue.empty()) {
51 // To reduce lock contention, we swap out the queue to process.
52 // This way other threads can submit new contexts to complete
53 // while we are working.
54 in_progress_queue.swap(finisher_queue);
55 finisher_running = true;
56 ul.unlock();
57 ldout(cct, 10) << "finisher_thread doing " << in_progress_queue << dendl;
58
59 if (logger) {
60 start = ceph_clock_now();
61 count = in_progress_queue.size();
62 }
63
64 // Now actually process the contexts.
65 for (auto p : in_progress_queue) {
66 p.first->complete(p.second);
67 }
68 ldout(cct, 10) << "finisher_thread done with " << in_progress_queue
69 << dendl;
70 in_progress_queue.clear();
71 if (logger) {
72 logger->dec(l_finisher_queue_len, count);
73 logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start);
74 }
75
76 ul.lock();
77 finisher_running = false;
78 }
79 ldout(cct, 10) << "finisher_thread empty" << dendl;
80 if (unlikely(finisher_empty_wait))
81 finisher_empty_cond.notify_all();
82 if (finisher_stop)
83 break;
84
85 ldout(cct, 10) << "finisher_thread sleeping" << dendl;
86 finisher_cond.wait(ul);
(3) Event loop_end: |
Reached end of loop. |
87 }
88 // If we are exiting, we signal the thread waiting in stop(),
89 // otherwise it would never unblock
90 finisher_empty_cond.notify_all();
91
(4) Event cond_true: |
Condition "should_gather", taking true branch. |
92 ldout(cct, 10) << "finisher_thread stop" << dendl;
(5) Event missing_lock: |
Accessing "this->finisher_stop" without holding lock "ceph::mutex_debug_detail::mutex_debug_impl<false>.m". Elsewhere, "Finisher.finisher_stop" is accessed with "ceph::mutex_debug_detail::mutex_debug_impl<false>.m" held 1 out of 2 times (1 of these accesses strongly imply that it is necessary). |
Also see events: |
[example_lock][example_access] |
93 finisher_stop = false;
94 return 0;
95 }
96
97