1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software
11   	 * Foundation.  See file COPYING.
12   	 *
13   	 */
14   	
15   	#ifndef COMMON_CEPH_TIMER_H
16   	#define COMMON_CEPH_TIMER_H
17   	
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <system_error>
#include <thread>
#include <utility>

#include <boost/intrusive/set.hpp>
21   	
22   	namespace ceph {
23   	
24   	  /// Newly constructed timer should be suspended at point of
25   	  /// construction.
26   	
27   	  struct construct_suspended_t { };
28   	  constexpr construct_suspended_t construct_suspended { };
29   	
30   	  namespace timer_detail {
31   	    using boost::intrusive::member_hook;
32   	    using boost::intrusive::set_member_hook;
33   	    using boost::intrusive::link_mode;
34   	    using boost::intrusive::normal_link;
35   	    using boost::intrusive::set;
36   	    using boost::intrusive::constant_time_size;
37   	    using boost::intrusive::compare;
38   	
    // Compared to the SafeTimer this does fewer allocations (you
    // don't have to allocate a new Context every time you
    // want to queue the next tick.)
42   	    //
43   	    // It also does not share a lock with the caller. If you call
44   	    // cancel event, it either cancels the event (and returns true) or
45   	    // you missed it. If this does not work for you, you can set up a
46   	    // flag and mutex of your own.
47   	    //
48   	    // You get to pick your clock. I like mono_clock, since I usually
49   	    // want to wait FOR a given duration. real_clock is worthwhile if
50   	    // you want to wait UNTIL a specific moment of wallclock time.  If
51   	    // you want you can set up a timer that executes a function after
52   	    // you use up ten seconds of CPU time.
53   	
54   	    template <class TC>
55   	    class timer {
56   	      using sh = set_member_hook<link_mode<normal_link> >;
57   	
58   	      struct event {
59   		typename TC::time_point t;
60   		uint64_t id;
61   		std::function<void()> f;
62   	
63   		sh schedule_link;
64   		sh event_link;
65   	
66   		event() : t(TC::time_point::min()), id(0) {}
67   		event(uint64_t _id) : t(TC::time_point::min()), id(_id) {}
68   		event(typename TC::time_point _t, uint64_t _id,
69   		      std::function<void()>&& _f) : t(_t), id(_id), f(_f) {}
70   		event(typename TC::time_point _t, uint64_t _id,
71   		      const std::function<void()>& _f) : t(_t), id(_id), f(_f) {}
72   		bool operator <(const event& e) {
73   		  return t == e.t ? id < e.id : t < e.t;
74   		}
75   	      };
76   	      struct SchedCompare {
77   		bool operator()(const event& e1, const event& e2) const {
78   		  return e1.t == e2.t ? e1.id < e2.id : e1.t < e2.t;
79   		}
80   	      };
81   	      struct EventCompare {
82   		bool operator()(const event& e1, const event& e2) const {
83   		  return e1.id < e2.id;
84   		}
85   	      };
86   	
87   	      using schedule_type = set<event,
88   					member_hook<event, sh, &event::schedule_link>,
89   					constant_time_size<false>,
90   					compare<SchedCompare> >;
91   	
92   	      schedule_type schedule;
93   	
94   	      using event_set_type = set<event,
95   					 member_hook<event, sh, &event::event_link>,
96   					 constant_time_size<false>,
97   					 compare<EventCompare> >;
98   	
99   	      event_set_type events;
100  	
101  	      std::mutex lock;
102  	      using lock_guard = std::lock_guard<std::mutex>;
103  	      using unique_lock = std::unique_lock<std::mutex>;
104  	      std::condition_variable cond;
105  	
106  	      event* running{ nullptr };
107  	      uint64_t next_id{ 0 };
108  	
109  	      bool suspended;
110  	      std::thread thread;
111  	
112  	      void timer_thread() {
113  		unique_lock l(lock);
114  		while (!suspended) {
115  		  typename TC::time_point now = TC::now();
116  	
117  		  while (!schedule.empty()) {
118  		    auto p = schedule.begin();
119  		    // Should we wait for the future?
120  		    if (p->t > now)
121  		      break;
122  	
123  		    event& e = *p;
124  		    schedule.erase(e);
125  		    events.erase(e);
126  	
127  		    // Since we have only one thread it is impossible to have more
128  		    // than one running event
129  		    running = &e;
130  	
131  		    l.unlock();
132  		    e.f();
133  		    l.lock();
134  	
135  		    if (running) {
136  		      running = nullptr;
137  		      delete &e;
138  		    } // Otherwise the event requeued itself
139  		  }
140  	
141  	          if (suspended)
142  	            break;
143  		  if (schedule.empty())
144  		    cond.wait(l);
145  		  else
146  		    cond.wait_until(l, schedule.begin()->t);
147  		}
148  	      }
149  	
150  	  public:
151  	      timer() {
152  		lock_guard l(lock);
153  		suspended = false;
154  		thread = std::thread(&timer::timer_thread, this);
155  	      }
156  	
157  	      // Create a suspended timer, jobs will be executed in order when
158  	      // it is resumed.
159  	      timer(construct_suspended_t) {
160  		lock_guard l(lock);
161  		suspended = true;
162  	      }
163  	
164  	      timer(const timer &) = delete;
165  	      timer& operator=(const timer &) = delete;
166  	
167  	      ~timer() {
168  		suspend();
169  		cancel_all_events();
170  	      }
171  	
172  	      // Suspend operation of the timer (and let its thread die).
173  	      void suspend() {
174  		unique_lock l(lock);
175  		if (suspended)
176  		  return;
177  	
178  		suspended = true;
179  		cond.notify_one();
180  		l.unlock();
181  		thread.join();
182  	      }
183  	
184  	
185  	      // Resume operation of the timer. (Must have been previously
186  	      // suspended.)
187  	      void resume() {
188  		unique_lock l(lock);
189  		  if (!suspended)
190  		  return;
191  	
192  		suspended = false;
193  		ceph_assert(!thread.joinable());
194  		thread = std::thread(&timer::timer_thread, this);
195  	      }
196  	
197  	      // Schedule an event in the relative future
198  	      template<typename Callable, typename... Args>
199  	      uint64_t add_event(typename TC::duration duration,
200  				 Callable&& f, Args&&... args) {
201  		typename TC::time_point when = TC::now();
202  		when += duration;
203  		return add_event(when,
204  				 std::forward<Callable>(f),
205  				 std::forward<Args>(args)...);
206  	      }
207  	
208  	      // Schedule an event in the absolute future
209  	      template<typename Callable, typename... Args>
210  	      uint64_t add_event(typename TC::time_point when,
211  				 Callable&& f, Args&&... args) {
212  		std::lock_guard l(lock);
(1) Event alloc_fn: Storage is returned from allocation function "operator new".
(2) Event var_assign: Assigning: "e" = storage returned from "new ceph::timer_detail::timer<ceph::time_detail::coarse_mono_clock>::event(when, ++this->next_id, std::forward(std::remove_reference<std::function<void ()> >::type(std::_Bind_helper<false, Objecter::get_fs_stats(ceph_statfs &, boost::optional<long>, Context *)::[lambda() (instance 1)]>::type(std::bind(std::forward(f))))))".
Also see events: [noescape][noescape][leaked_storage]
213  		event& e = *(new event(
214  			       when, ++next_id,
215  			       std::forward<std::function<void()> >(
216  				 std::bind(std::forward<Callable>(f),
217  					   std::forward<Args>(args)...))));
(3) Event noescape: Resource "e" is not freed or pointed-to in "insert". [details]
Also see events: [alloc_fn][var_assign][noescape][leaked_storage]
218  		auto i = schedule.insert(e);
(4) Event noescape: Resource "e" is not freed or pointed-to in "insert". [details]
Also see events: [alloc_fn][var_assign][noescape][leaked_storage]
219  		events.insert(e);
220  	
221  		/* If the event we have just inserted comes before everything
222  		 * else, we need to adjust our timeout. */
(5) Event cond_true: Condition "i.first == boost::intrusive::bstbase3<boost::intrusive::mhtraits<ceph::timer_detail::timer<ceph::time_detail::coarse_mono_clock>::event, boost::intrusive::set_member_hook<boost::intrusive::link_mode<(boost::intrusive::link_mode_type)0>, void, void, void>, &ceph::timer_detail::timer<ceph::time_detail::coarse_mono_clock>::event::schedule_link>, (boost::intrusive::algo_types)5, void>::iterator(this->schedule.begin())", taking true branch.
223  		if (i.first == schedule.begin())
224  		  cond.notify_one();
225  	
226  		// Previously each event was a context, identified by a
227  		// pointer, and each context to be called only once. Since you
228  		// can queue the same function pointer, member function,
229  		// lambda, or functor up multiple times, identifying things by
230  		// function for the purposes of cancellation is no longer
231  		// suitable. Thus:
(6) Event leaked_storage: Variable "e" going out of scope leaks the storage it points to.
Also see events: [alloc_fn][var_assign][noescape][noescape]
232  		return e.id;
233  	      }
234  	
235  	      // Adjust the timeout of a currently-scheduled event (relative)
236  	      bool adjust_event(uint64_t id, typename TC::duration duration) {
237  		return adjust_event(id, TC::now() + duration);
238  	      }
239  	
240  	      // Adjust the timeout of a currently-scheduled event (absolute)
241  	      bool adjust_event(uint64_t id, typename TC::time_point when) {
242  		std::lock_guard l(lock);
243  	
244  		event key(id);
245  		typename event_set_type::iterator it = events.find(key);
246  	
247  		if (it == events.end())
248  		  return false;
249  	
250  		event& e = *it;
251  	
252  		schedule.erase(e);
253  		e.t = when;
254  		schedule.insert(e);
255  	
256  		return true;
257  	      }
258  	
259  	      // Cancel an event. If the event has already come and gone (or you
260  	      // never submitted it) you will receive false. Otherwise you will
261  	      // receive true and it is guaranteed the event will not execute.
262  	      bool cancel_event(const uint64_t id) {
263  		std::lock_guard l(lock);
264  		event dummy(id);
265  		auto p = events.find(dummy);
266  		if (p == events.end()) {
267  		  return false;
268  		}
269  	
270  		event& e = *p;
271  		events.erase(e);
272  		schedule.erase(e);
273  		delete &e;
274  	
275  		return true;
276  	      }
277  	
278  	      // Reschedules a currently running event in the relative
279  	      // future. Must be called only from an event executed by this
280  	      // timer. If you have a function that can be called either from
281  	      // this timer or some other way, it is your responsibility to make
282  	      // sure it can tell the difference only does not call
283  	      // reschedule_me in the non-timer case.
284  	      //
285  	      // Returns an event id. If you had an event_id from the first
286  	      // scheduling, replace it with this return value.
287  	      uint64_t reschedule_me(typename TC::duration duration) {
288  		return reschedule_me(TC::now() + duration);
289  	      }
290  	
291  	      // Reschedules a currently running event in the absolute
292  	      // future. Must be called only from an event executed by this
293  	      // timer. if you have a function that can be called either from
294  	      // this timer or some other way, it is your responsibility to make
295  	      // sure it can tell the difference only does not call
296  	      // reschedule_me in the non-timer case.
297  	      //
298  	      // Returns an event id. If you had an event_id from the first
299  	      // scheduling, replace it with this return value.
300  	      uint64_t reschedule_me(typename TC::time_point when) {
301  		if (std::this_thread::get_id() != thread.get_id())
302  		  throw std::make_error_condition(std::errc::operation_not_permitted);
303  		std::lock_guard l(lock);
304  		running->t = when;
305  		uint64_t id = ++next_id;
306  		running->id = id;
307  		schedule.insert(*running);
308  		events.insert(*running);
309  	
310  		// Hacky, but keeps us from being deleted
311  		running = nullptr;
312  	
313  		// Same function, but you get a new ID.
314  		return id;
315  	      }
316  	
317  	      // Remove all events from the queue.
318  	      void cancel_all_events() {
319  		std::lock_guard l(lock);
320  		while (!events.empty()) {
321  		  auto p = events.begin();
322  		  event& e = *p;
323  		  schedule.erase(e);
324  		  events.erase(e);
325  		  delete &e;
326  		}
327  	      }
328  	    }; // timer
329  	  }; // timer_detail
330  	
331  	  using timer_detail::timer;
332  	}; // ceph
333  	
334  	#endif
335