1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "Cond.h"
16 #include "Timer.h"
17
18
19 #define dout_subsys ceph_subsys_timer
20 #undef dout_prefix
21 #define dout_prefix *_dout << "timer(" << this << ")."
22
23
24
25 class SafeTimerThread : public Thread {
26 SafeTimer *parent;
27 public:
28 explicit SafeTimerThread(SafeTimer *s) : parent(s) {}
29 void *entry() override {
30 parent->timer_thread();
31 return NULL;
32 }
33 };
34
35
36 SafeTimer::SafeTimer(CephContext *cct_, ceph::mutex &l, bool safe_callbacks)
37 : cct(cct_), lock(l),
38 safe_callbacks(safe_callbacks),
39 thread(NULL),
40 stopping(false)
41 {
42 }
43
44 SafeTimer::~SafeTimer()
45 {
46 ceph_assert(thread == NULL);
47 }
48
49 void SafeTimer::init()
50 {
51 ldout(cct,10) << "init" << dendl;
52 thread = new SafeTimerThread(this);
53 thread->create("safe_timer");
54 }
55
56 void SafeTimer::shutdown()
57 {
58 ldout(cct,10) << "shutdown" << dendl;
59 if (thread) {
60 ceph_assert(ceph_mutex_is_locked(lock));
61 cancel_all_events();
62 stopping = true;
63 cond.notify_all();
64 lock.unlock();
65 thread->join();
66 lock.lock();
67 delete thread;
68 thread = NULL;
69 }
70 }
71
72 void SafeTimer::timer_thread()
73 {
74 std::unique_lock l{lock};
75 ldout(cct,10) << "timer_thread starting" << dendl;
76 while (!stopping) {
77 auto now = clock_t::now();
78
79 while (!schedule.empty()) {
80 auto p = schedule.begin();
81
82 // is the future now?
83 if (p->first > now)
84 break;
85
86 Context *callback = p->second;
87 events.erase(callback);
88 schedule.erase(p);
89 ldout(cct,10) << "timer_thread executing " << callback << dendl;
90
91 if (!safe_callbacks) {
92 l.unlock();
93 callback->complete(0);
94 l.lock();
95 } else {
96 callback->complete(0);
97 }
98 }
99
100 // recheck stopping if we dropped the lock
101 if (!safe_callbacks && stopping)
102 break;
103
104 ldout(cct,20) << "timer_thread going to sleep" << dendl;
105 if (schedule.empty()) {
106 cond.wait(l);
107 } else {
108 cond.wait_until(l, schedule.begin()->first);
109 }
110 ldout(cct,20) << "timer_thread awake" << dendl;
111 }
112 ldout(cct,10) << "timer_thread exiting" << dendl;
113 }
114
115 Context* SafeTimer::add_event_after(double seconds, Context *callback)
116 {
117 ceph_assert(ceph_mutex_is_locked(lock));
118
119 auto when = clock_t::now() + ceph::make_timespan(seconds);
120 return add_event_at(when, callback);
121 }
122
123 Context* SafeTimer::add_event_at(SafeTimer::clock_t::time_point when, Context *callback)
124 {
125 ceph_assert(ceph_mutex_is_locked(lock));
126 ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl;
127 if (stopping) {
128 ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl;
129 delete callback;
130 return nullptr;
131 }
132 scheduled_map_t::value_type s_val(when, callback);
133 scheduled_map_t::iterator i = schedule.insert(s_val);
134
135 event_lookup_map_t::value_type e_val(callback, i);
136 pair < event_lookup_map_t::iterator, bool > rval(events.insert(e_val));
137
138 /* If you hit this, you tried to insert the same Context* twice. */
139 ceph_assert(rval.second);
140
141 /* If the event we have just inserted comes before everything else, we need to
142 * adjust our timeout. */
143 if (i == schedule.begin())
144 cond.notify_all();
145 return callback;
146 }
147
148 bool SafeTimer::cancel_event(Context *callback)
149 {
(1) Event fun_call_w_exception: |
Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details] |
150 ceph_assert(ceph_mutex_is_locked(lock));
151
152 auto p = events.find(callback);
153 if (p == events.end()) {
154 ldout(cct,10) << "cancel_event " << callback << " not found" << dendl;
155 return false;
156 }
157
158 ldout(cct,10) << "cancel_event " << p->second->first << " -> " << callback << dendl;
159 delete p->first;
160
161 schedule.erase(p->second);
162 events.erase(p);
163 return true;
164 }
165
166 void SafeTimer::cancel_all_events()
167 {
168 ldout(cct,10) << "cancel_all_events" << dendl;
169 ceph_assert(ceph_mutex_is_locked(lock));
170
171 while (!events.empty()) {
172 auto p = events.begin();
173 ldout(cct,10) << " cancelled " << p->second->first << " -> " << p->first << dendl;
174 delete p->first;
175 schedule.erase(p->second);
176 events.erase(p);
177 }
178 }
179
180 void SafeTimer::dump(const char *caller) const
181 {
182 if (!caller)
183 caller = "";
184 ldout(cct,10) << "dump " << caller << dendl;
185
186 for (scheduled_map_t::const_iterator s = schedule.begin();
187 s != schedule.end();
188 ++s)
189 ldout(cct,10) << " " << s->first << "->" << s->second << dendl;
190 }
191