1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software 
11   	 * Foundation.  See file COPYING.
12   	 * 
13   	 */
14   	
15   	#ifndef CEPH_DISPATCHQUEUE_H
16   	#define CEPH_DISPATCHQUEUE_H
17   	
#include <atomic>
#include <cstdint>
#include <map>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <utility>
#include <boost/intrusive_ptr.hpp>
#include "include/ceph_assert.h"
#include "common/Throttle.h"
#include "common/ceph_mutex.h"
#include "common/Thread.h"
#include "common/PrioritizedQueue.h"

#include "Message.h"
29   	
30   	class CephContext;
31   	class Messenger;
32   	struct Connection;
33   	
/**
 * The DispatchQueue contains all the connections which have Messages
 * they want to be dispatched, carefully organized by Message priority
 * and permitted to deliver in a round-robin fashion.
 * See DispatchQueue::entry() for details.
 */
40   	class DispatchQueue {
41   	  class QueueItem {
42   	    int type;
43   	    ConnectionRef con;
44   	    ref_t<Message> m;
45   	  public:
46   	    explicit QueueItem(const ref_t<Message>& m) : type(-1), con(0), m(m) {}
47   	    QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
48   	    bool is_code() const {
49   	      return type != -1;
50   	    }
51   	    int get_code () const {
52   	      ceph_assert(is_code());
53   	      return type;
54   	    }
55   	    const ref_t<Message>& get_message() {
56   	      ceph_assert(!is_code());
57   	      return m;
58   	    }
59   	    Connection *get_connection() {
60   	      ceph_assert(is_code());
61   	      return con.get();
62   	    }
63   	  };
64   	    
65   	  CephContext *cct;
66   	  Messenger *msgr;
67   	  mutable ceph::mutex lock;
68   	  ceph::condition_variable cond;
69   	
70   	  PrioritizedQueue<QueueItem, uint64_t> mqueue;
71   	
72   	  std::set<pair<double, ref_t<Message>>> marrival;
73   	  map<ref_t<Message>, decltype(marrival)::iterator> marrival_map;
74   	  void add_arrival(const ref_t<Message>& m) {
75   	    marrival_map.insert(
76   	      make_pair(
77   		m,
78   		marrival.insert(make_pair(m->get_recv_stamp(), m)).first
79   		)
80   	      );
81   	  }
82   	  void remove_arrival(const ref_t<Message>& m) {
83   	    auto it = marrival_map.find(m);
84   	    ceph_assert(it != marrival_map.end());
85   	    marrival.erase(it->second);
86   	    marrival_map.erase(it);
87   	  }
88   	
89   	  std::atomic<uint64_t> next_id;
90   	    
91   	  enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };
92   	
93   	  /**
94   	   * The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
95   	   */
96   	  class DispatchThread : public Thread {
97   	    DispatchQueue *dq;
98   	  public:
99   	    explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
100  	    void *entry() override {
101  	      dq->entry();
102  	      return 0;
103  	    }
104  	  } dispatch_thread;
105  	
106  	  ceph::mutex local_delivery_lock;
107  	  ceph::condition_variable local_delivery_cond;
108  	  bool stop_local_delivery;
109  	  std::queue<pair<ref_t<Message>, int>> local_messages;
110  	  class LocalDeliveryThread : public Thread {
111  	    DispatchQueue *dq;
112  	  public:
113  	    explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
114  	    void *entry() override {
115  	      dq->run_local_delivery();
116  	      return 0;
117  	    }
118  	  } local_delivery_thread;
119  	
120  	  uint64_t pre_dispatch(const ref_t<Message>& m);
121  	  void post_dispatch(const ref_t<Message>& m, uint64_t msize);
122  	
123  	 public:
124  	
125  	  /// Throttle preventing us from building up a big backlog waiting for dispatch
126  	  Throttle dispatch_throttler;
127  	
128  	  bool stop;
129  	  void local_delivery(const ref_t<Message>& m, int priority);
130  	  void local_delivery(Message* m, int priority) {
131  	    return local_delivery(ref_t<Message>(m, false), priority); /* consume ref */
132  	  }
133  	  void run_local_delivery();
134  	
135  	  double get_max_age(utime_t now) const;
136  	
137  	  int get_queue_len() const {
138  	    std::lock_guard l{lock};
139  	    return mqueue.length();
140  	  }
141  	
142  	  /**
143  	   * Release memory accounting back to the dispatch throttler.
144  	   *
145  	   * @param msize The amount of memory to release.
146  	   */
147  	  void dispatch_throttle_release(uint64_t msize);
148  	
149  	  void queue_connect(Connection *con) {
150  	    std::lock_guard l{lock};
151  	    if (stop)
152  	      return;
153  	    mqueue.enqueue_strict(
154  	      0,
155  	      CEPH_MSG_PRIO_HIGHEST,
156  	      QueueItem(D_CONNECT, con));
157  	    cond.notify_all();
158  	  }
159  	  void queue_accept(Connection *con) {
160  	    std::lock_guard l{lock};
161  	    if (stop)
162  	      return;
163  	    mqueue.enqueue_strict(
164  	      0,
165  	      CEPH_MSG_PRIO_HIGHEST,
166  	      QueueItem(D_ACCEPT, con));
167  	    cond.notify_all();
168  	  }
169  	  void queue_remote_reset(Connection *con) {
170  	    std::lock_guard l{lock};
171  	    if (stop)
172  	      return;
173  	    mqueue.enqueue_strict(
174  	      0,
175  	      CEPH_MSG_PRIO_HIGHEST,
176  	      QueueItem(D_BAD_REMOTE_RESET, con));
177  	    cond.notify_all();
178  	  }
179  	  void queue_reset(Connection *con) {
(1) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
180  	    std::lock_guard l{lock};
181  	    if (stop)
182  	      return;
183  	    mqueue.enqueue_strict(
184  	      0,
185  	      CEPH_MSG_PRIO_HIGHEST,
186  	      QueueItem(D_BAD_RESET, con));
187  	    cond.notify_all();
188  	  }
189  	  void queue_refused(Connection *con) {
190  	    std::lock_guard l{lock};
191  	    if (stop)
192  	      return;
193  	    mqueue.enqueue_strict(
194  	      0,
195  	      CEPH_MSG_PRIO_HIGHEST,
196  	      QueueItem(D_CONN_REFUSED, con));
197  	    cond.notify_all();
198  	  }
199  	
200  	  bool can_fast_dispatch(const cref_t<Message> &m) const;
201  	  void fast_dispatch(const ref_t<Message>& m);
202  	  void fast_dispatch(Message* m) {
203  	    return fast_dispatch(ref_t<Message>(m, false)); /* consume ref */
204  	  }
205  	  void fast_preprocess(const ref_t<Message>& m);
206  	  void enqueue(const ref_t<Message>& m, int priority, uint64_t id);
207  	  void enqueue(Message* m, int priority, uint64_t id) {
208  	    return enqueue(ref_t<Message>(m, false), priority, id); /* consume ref */
209  	  }
210  	  void discard_queue(uint64_t id);
211  	  void discard_local();
212  	  uint64_t get_id() {
213  	    return next_id++;
214  	  }
215  	  void start();
216  	  void entry();
217  	  void wait();
218  	  void shutdown();
219  	  bool is_started() const {return dispatch_thread.is_started();}
220  	
221  	  DispatchQueue(CephContext *cct, Messenger *msgr, string &name)
222  	    : cct(cct), msgr(msgr),
223  	      lock(ceph::make_mutex("Messenger::DispatchQueue::lock" + name)),
224  	      mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
225  		     cct->_conf->ms_pq_min_cost),
226  	      next_id(1),
227  	      dispatch_thread(this),
228  	      local_delivery_lock(ceph::make_mutex("Messenger::DispatchQueue::local_delivery_lock" + name)),
229  	      stop_local_delivery(false),
230  	      local_delivery_thread(this),
231  	      dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name,
232  	                         cct->_conf->ms_dispatch_throttle_bytes),
233  	      stop(false)
234  	    {}
235  	  ~DispatchQueue() {
236  	    ceph_assert(mqueue.empty());
237  	    ceph_assert(marrival.empty());
238  	    ceph_assert(local_messages.empty());
239  	  }
240  	};
241  	
242  	#endif
243