1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7    	 *
8    	 * Author: Haomai Wang <haomaiwang@gmail.com>
9    	 *
10   	 * This is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License version 2.1, as published by the Free Software
13   	 * Foundation.  See file COPYING.
14   	 *
15   	 */
16   	
17   	#ifndef CEPH_MSG_EVENT_H
18   	#define CEPH_MSG_EVENT_H
19   	
20   	#ifdef __APPLE__
21   	#include <AvailabilityMacros.h>
22   	#endif
23   	
24   	// We use epoll, kqueue, evport, select in descending order by performance.
25   	#if defined(__linux__)
26   	#define HAVE_EPOLL 1
27   	#endif
28   	
29   	#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
30   	#define HAVE_KQUEUE 1
31   	#endif
32   	
33   	#ifdef __sun
34   	#include <sys/feature_tests.h>
35   	#ifdef _DTRACE_VERSION
36   	#define HAVE_EVPORT 1
37   	#endif
38   	#endif
39   	
40   	#include <atomic>
41   	#include <mutex>
42   	#include <condition_variable>
43   	
44   	#include "common/ceph_time.h"
45   	#include "common/dout.h"
46   	#include "net_handler.h"
47   	
48   	#define EVENT_NONE 0
49   	#define EVENT_READABLE 1
50   	#define EVENT_WRITABLE 2
51   	
52   	class EventCenter;
53   	
/**
 * Abstract callback interface used throughout this header.
 *
 * Implementations override do_request(); instances are handled through
 * plain (non-owning at the type level) EventCallback pointers.
 */
class EventCallback {
 public:
  /// Run the callback. The single 64-bit argument is, going by its name,
  /// either a file descriptor or an event id.
  virtual void do_request(uint64_t fd_or_id) = 0;

  /// Virtual so that implementations can be destroyed through a base pointer.
  virtual ~EventCallback() = default;
};

/// Raw-pointer handle to an EventCallback.
using EventCallbackRef = EventCallback*;
62   	
/// One entry of the list filled in by EventDriver::event_wait(): a file
/// descriptor together with an event mask.
struct FiredFileEvent {
  /// The file descriptor this entry refers to.
  int fd;
  /// Event mask for fd; presumably a combination of EVENT_READABLE and
  /// EVENT_WRITABLE -- confirm against the driver implementations.
  int mask;
};
67   	
68   	/*
69   	 * EventDriver is a wrap of event mechanisms depends on different OS.
70   	 * For example, Linux will use epoll(2), BSD will use kqueue(2) and select will
71   	 * be used for worst condition.
72   	 */
73   	class EventDriver {
74   	 public:
75   	  virtual ~EventDriver() {}       // we want a virtual destructor!!!
76   	  virtual int init(EventCenter *center, int nevent) = 0;
77   	  virtual int add_event(int fd, int cur_mask, int mask) = 0;
78   	  virtual int del_event(int fd, int cur_mask, int del_mask) = 0;
79   	  virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
80   	  virtual int resize_events(int newsize) = 0;
81   	  virtual bool need_wakeup() { return true; }
82   	};
83   	
84   	/*
85   	 * EventCenter maintain a set of file descriptor and handle registered events.
86   	 */
87   	class EventCenter {
88   	 public:
89   	  // should be enough;
90   	  static const int MAX_EVENTCENTER = 24;
91   	
92   	 private:
93   	  using clock_type = ceph::coarse_mono_clock;
94   	
95   	  struct AssociatedCenters {
96   	    EventCenter *centers[MAX_EVENTCENTER];
97   	    AssociatedCenters() {
98   	      memset(centers, 0, MAX_EVENTCENTER * sizeof(EventCenter*));
99   	    }
100  	  };
101  	
102  	  struct FileEvent {
103  	    int mask;
104  	    EventCallbackRef read_cb;
105  	    EventCallbackRef write_cb;
106  	    FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
107  	  };
108  	
109  	  struct TimeEvent {
110  	    uint64_t id;
111  	    EventCallbackRef time_cb;
112  	
113  	    TimeEvent(): id(0), time_cb(NULL) {}
114  	  };
115  	
116  	 public:
117  	  /**
118  	     * A Poller object is invoked once each time through the dispatcher's
119  	     * inner polling loop.
120  	     */
121  	  class Poller {
122  	   public:
123  	    explicit Poller(EventCenter* center, const string& pollerName);
124  	    virtual ~Poller();
125  	
126  	    /**
127  	     * This method is defined by a subclass and invoked once by the
128  	     * center during each pass through its inner polling loop.
129  	     *
130  	     * \return
131  	     *      1 means that this poller did useful work during this call.
132  	     *      0 means that the poller found no work to do.
133  	     */
134  	    virtual int poll() = 0;
135  	
136  	   private:
137  	    /// The EventCenter object that owns this Poller.  NULL means the
138  	    /// EventCenter has been deleted.
139  	    EventCenter* owner;
140  	
141  	    /// Human-readable string name given to the poller to make it
142  	    /// easy to identify for debugging. For most pollers just passing
143  	    /// in the subclass name probably makes sense.
144  	    string poller_name;
145  	
146  	    /// Index of this Poller in EventCenter::pollers.  Allows deletion
147  	    /// without having to scan all the entries in pollers. -1 means
148  	    /// this poller isn't currently in EventCenter::pollers (happens
149  	    /// after EventCenter::reset).
150  	    int slot;
151  	  };
152  	
153  	 private:
154  	  CephContext *cct;
155  	  std::string type;
156  	  int nevent;
157  	  // Used only to external event
158  	  pthread_t owner = 0;
159  	  std::mutex external_lock;
160  	  std::atomic_ulong external_num_events;
161  	  deque<EventCallbackRef> external_events;
162  	  vector<FileEvent> file_events;
163  	  EventDriver *driver;
164  	  std::multimap<clock_type::time_point, TimeEvent> time_events;
165  	  // Keeps track of all of the pollers currently defined.  We don't
166  	  // use an intrusive list here because it isn't reentrant: we need
167  	  // to add/remove elements while the center is traversing the list.
168  	  std::vector<Poller*> pollers;
169  	  std::map<uint64_t, std::multimap<clock_type::time_point, TimeEvent>::iterator> event_map;
170  	  uint64_t time_event_next_id;
171  	  int notify_receive_fd;
172  	  int notify_send_fd;
173  	  NetHandler net;
174  	  EventCallbackRef notify_handler;
175  	  unsigned center_id;
176  	  AssociatedCenters *global_centers = nullptr;
177  	
178  	  int process_time_events();
179  	  FileEvent *_get_file_event(int fd) {
180  	    ceph_assert(fd < nevent);
181  	    return &file_events[fd];
182  	  }
183  	
184  	 public:
185  	  explicit EventCenter(CephContext *c):
186  	    cct(c), nevent(0),
187  	    external_num_events(0),
188  	    driver(NULL), time_event_next_id(1),
189  	    notify_receive_fd(-1), notify_send_fd(-1), net(c),
190  	    notify_handler(NULL), center_id(0) { }
191  	  ~EventCenter();
192  	  ostream& _event_prefix(std::ostream *_dout);
193  	
194  	  int init(int nevent, unsigned center_id, const std::string &type);
195  	  void set_owner();
196  	  pthread_t get_owner() const { return owner; }
197  	  unsigned get_id() const { return center_id; }
198  	
199  	  EventDriver *get_driver() { return driver; }
200  	
201  	  // Used by internal thread
202  	  int create_file_event(int fd, int mask, EventCallbackRef ctxt);
203  	  uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
204  	  void delete_file_event(int fd, int mask);
205  	  void delete_time_event(uint64_t id);
206  	  int process_events(unsigned timeout_microseconds, ceph::timespan *working_dur = nullptr);
207  	  void wakeup();
208  	
209  	  // Used by external thread
210  	  void dispatch_event_external(EventCallbackRef e);
211  	  inline bool in_thread() const {
212  	    return pthread_equal(pthread_self(), owner);
213  	  }
214  	
215  	 private:
216  	  template <typename func>
217  	  class C_submit_event : public EventCallback {
218  	    std::mutex lock;
219  	    std::condition_variable cond;
220  	    bool done = false;
221  	    func f;
222  	    bool nonwait;
223  	   public:
224  	    C_submit_event(func &&_f, bool nowait)
225  	      : f(std::move(_f)), nonwait(nowait) {}
226  	    void do_request(uint64_t id) override {
227  	      f();
228  	      lock.lock();
229  	      cond.notify_all();
230  	      done = true;
231  	      bool del = nonwait;
232  	      lock.unlock();
233  	      if (del)
234  	        delete this;
235  	    }
236  	    void wait() {
237  	      ceph_assert(!nonwait);
238  	      std::unique_lock<std::mutex> l(lock);
239  	      while (!done)
240  	        cond.wait(l);
241  	    }
242  	  };
243  	
244  	 public:
245  	  template <typename func>
246  	  void submit_to(int i, func &&f, bool nowait = false) {
(1) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
247  	    ceph_assert(i < MAX_EVENTCENTER && global_centers);
248  	    EventCenter *c = global_centers->centers[i];
249  	    ceph_assert(c);
250  	    if (nowait) {
251  	      C_submit_event<func> *event = new C_submit_event<func>(std::move(f), true);
252  	      c->dispatch_event_external(event);
253  	    } else if (c->in_thread()) {
254  	      f();
255  	      return;
256  	    } else {
257  	      C_submit_event<func> event(std::move(f), false);
258  	      c->dispatch_event_external(&event);
259  	      event.wait();
260  	    }
261  	  };
262  	};
263  	
264  	#endif
265