1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_DISPATCHQUEUE_H
16 #define CEPH_DISPATCHQUEUE_H
17
18 #include <atomic>
19 #include <map>
20 #include <queue>
21 #include <boost/intrusive_ptr.hpp>
22 #include "include/ceph_assert.h"
23 #include "common/Throttle.h"
24 #include "common/ceph_mutex.h"
25 #include "common/Thread.h"
26 #include "common/PrioritizedQueue.h"
27
28 #include "Message.h"
29
30 class CephContext;
31 class Messenger;
32 struct Connection;
33
34 /**
35 * The DispatchQueue contains all the connections which have Messages
36 * they want to be dispatched, carefully organized by Message priority
37 * and permitted to deliver in a round-robin fashion.
38 * See Messenger::dispatch_entry for details.
39 */
40 class DispatchQueue {
41 class QueueItem {
42 int type;
43 ConnectionRef con;
44 ref_t<Message> m;
45 public:
46 explicit QueueItem(const ref_t<Message>& m) : type(-1), con(0), m(m) {}
47 QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
48 bool is_code() const {
49 return type != -1;
50 }
51 int get_code () const {
52 ceph_assert(is_code());
53 return type;
54 }
55 const ref_t<Message>& get_message() {
56 ceph_assert(!is_code());
57 return m;
58 }
59 Connection *get_connection() {
60 ceph_assert(is_code());
61 return con.get();
62 }
63 };
64
65 CephContext *cct;
66 Messenger *msgr;
67 mutable ceph::mutex lock;
68 ceph::condition_variable cond;
69
70 PrioritizedQueue<QueueItem, uint64_t> mqueue;
71
72 std::set<pair<double, ref_t<Message>>> marrival;
73 map<ref_t<Message>, decltype(marrival)::iterator> marrival_map;
74 void add_arrival(const ref_t<Message>& m) {
75 marrival_map.insert(
76 make_pair(
77 m,
78 marrival.insert(make_pair(m->get_recv_stamp(), m)).first
79 )
80 );
81 }
82 void remove_arrival(const ref_t<Message>& m) {
83 auto it = marrival_map.find(m);
84 ceph_assert(it != marrival_map.end());
85 marrival.erase(it->second);
86 marrival_map.erase(it);
87 }
88
89 std::atomic<uint64_t> next_id;
90
91 enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };
92
93 /**
94 * The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
95 */
96 class DispatchThread : public Thread {
97 DispatchQueue *dq;
98 public:
99 explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
100 void *entry() override {
101 dq->entry();
102 return 0;
103 }
104 } dispatch_thread;
105
106 ceph::mutex local_delivery_lock;
107 ceph::condition_variable local_delivery_cond;
108 bool stop_local_delivery;
109 std::queue<pair<ref_t<Message>, int>> local_messages;
110 class LocalDeliveryThread : public Thread {
111 DispatchQueue *dq;
112 public:
113 explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
114 void *entry() override {
115 dq->run_local_delivery();
116 return 0;
117 }
118 } local_delivery_thread;
119
120 uint64_t pre_dispatch(const ref_t<Message>& m);
121 void post_dispatch(const ref_t<Message>& m, uint64_t msize);
122
123 public:
124
125 /// Throttle preventing us from building up a big backlog waiting for dispatch
126 Throttle dispatch_throttler;
127
128 bool stop;
129 void local_delivery(const ref_t<Message>& m, int priority);
130 void local_delivery(Message* m, int priority) {
131 return local_delivery(ref_t<Message>(m, false), priority); /* consume ref */
132 }
133 void run_local_delivery();
134
135 double get_max_age(utime_t now) const;
136
137 int get_queue_len() const {
138 std::lock_guard l{lock};
139 return mqueue.length();
140 }
141
142 /**
143 * Release memory accounting back to the dispatch throttler.
144 *
145 * @param msize The amount of memory to release.
146 */
147 void dispatch_throttle_release(uint64_t msize);
148
149 void queue_connect(Connection *con) {
150 std::lock_guard l{lock};
151 if (stop)
152 return;
153 mqueue.enqueue_strict(
154 0,
155 CEPH_MSG_PRIO_HIGHEST,
156 QueueItem(D_CONNECT, con));
157 cond.notify_all();
158 }
159 void queue_accept(Connection *con) {
160 std::lock_guard l{lock};
161 if (stop)
162 return;
163 mqueue.enqueue_strict(
164 0,
165 CEPH_MSG_PRIO_HIGHEST,
166 QueueItem(D_ACCEPT, con));
167 cond.notify_all();
168 }
169 void queue_remote_reset(Connection *con) {
170 std::lock_guard l{lock};
171 if (stop)
172 return;
173 mqueue.enqueue_strict(
174 0,
175 CEPH_MSG_PRIO_HIGHEST,
176 QueueItem(D_BAD_REMOTE_RESET, con));
177 cond.notify_all();
178 }
179 void queue_reset(Connection *con) {
180 std::lock_guard l{lock};
181 if (stop)
182 return;
183 mqueue.enqueue_strict(
184 0,
185 CEPH_MSG_PRIO_HIGHEST,
186 QueueItem(D_BAD_RESET, con));
187 cond.notify_all();
188 }
189 void queue_refused(Connection *con) {
190 std::lock_guard l{lock};
191 if (stop)
192 return;
193 mqueue.enqueue_strict(
194 0,
195 CEPH_MSG_PRIO_HIGHEST,
196 QueueItem(D_CONN_REFUSED, con));
197 cond.notify_all();
198 }
199
200 bool can_fast_dispatch(const cref_t<Message> &m) const;
201 void fast_dispatch(const ref_t<Message>& m);
202 void fast_dispatch(Message* m) {
203 return fast_dispatch(ref_t<Message>(m, false)); /* consume ref */
204 }
205 void fast_preprocess(const ref_t<Message>& m);
206 void enqueue(const ref_t<Message>& m, int priority, uint64_t id);
207 void enqueue(Message* m, int priority, uint64_t id) {
208 return enqueue(ref_t<Message>(m, false), priority, id); /* consume ref */
209 }
210 void discard_queue(uint64_t id);
211 void discard_local();
212 uint64_t get_id() {
213 return next_id++;
214 }
215 void start();
216 void entry();
217 void wait();
218 void shutdown();
219 bool is_started() const {return dispatch_thread.is_started();}
220
221 DispatchQueue(CephContext *cct, Messenger *msgr, string &name)
222 : cct(cct), msgr(msgr),
223 lock(ceph::make_mutex("Messenger::DispatchQueue::lock" + name)),
224 mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
225 cct->_conf->ms_pq_min_cost),
226 next_id(1),
227 dispatch_thread(this),
228 local_delivery_lock(ceph::make_mutex("Messenger::DispatchQueue::local_delivery_lock" + name)),
229 stop_local_delivery(false),
230 local_delivery_thread(this),
231 dispatch_throttler(cct, string("msgr_dispatch_throttler-") + name,
232 cct->_conf->ms_dispatch_throttle_bytes),
233 stop(false)
234 {}
(1) Event exn_spec_violation: |
An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate(). |
Also see events: |
[fun_call_w_exception] |
235 ~DispatchQueue() {
(2) Event fun_call_w_exception: |
Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details] |
Also see events: |
[exn_spec_violation] |
236 ceph_assert(mqueue.empty());
237 ceph_assert(marrival.empty());
238 ceph_assert(local_messages.empty());
239 }
240 };
241
242 #endif
243