1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "msg/Message.h"
16 #include "DispatchQueue.h"
17 #include "Messenger.h"
18 #include "common/ceph_context.h"
19
20 #define dout_subsys ceph_subsys_ms
21 #include "common/debug.h"
22
23
24 /*******************
25 * DispatchQueue
26 */
27
28 #undef dout_prefix
29 #define dout_prefix *_dout << "-- " << msgr->get_myaddrs() << " "
30
31 double DispatchQueue::get_max_age(utime_t now) const {
32 std::lock_guard l{lock};
33 if (marrival.empty())
34 return 0;
35 else
36 return (now - marrival.begin()->first);
37 }
38
39 uint64_t DispatchQueue::pre_dispatch(const ref_t<Message>& m)
40 {
41 ldout(cct,1) << "<== " << m->get_source_inst()
42 << " " << m->get_seq()
43 << " ==== " << *m
44 << " ==== " << m->get_payload().length()
45 << "+" << m->get_middle().length()
46 << "+" << m->get_data().length()
47 << " (" << ceph_con_mode_name(m->get_connection()->get_con_mode())
48 << " " << m->get_footer().front_crc << " "
49 << m->get_footer().middle_crc
50 << " " << m->get_footer().data_crc << ")"
51 << " " << m << " con " << m->get_connection()
52 << dendl;
53 uint64_t msize = m->get_dispatch_throttle_size();
54 m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
55 return msize;
56 }
57
58 void DispatchQueue::post_dispatch(const ref_t<Message>& m, uint64_t msize)
59 {
60 dispatch_throttle_release(msize);
61 ldout(cct,20) << "done calling dispatch on " << m << dendl;
62 }
63
64 bool DispatchQueue::can_fast_dispatch(const cref_t<Message> &m) const
65 {
66 return msgr->ms_can_fast_dispatch(m);
67 }
68
69 void DispatchQueue::fast_dispatch(const ref_t<Message>& m)
70 {
71 uint64_t msize = pre_dispatch(m);
72 msgr->ms_fast_dispatch(m);
73 post_dispatch(m, msize);
74 }
75
76 void DispatchQueue::fast_preprocess(const ref_t<Message>& m)
77 {
78 msgr->ms_fast_preprocess(m);
79 }
80
81 void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
82 {
83 std::lock_guard l{lock};
84 if (stop) {
85 return;
86 }
87 ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
88 add_arrival(m);
89 if (priority >= CEPH_MSG_PRIO_LOW) {
90 mqueue.enqueue_strict(id, priority, QueueItem(m));
91 } else {
92 mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
93 }
94 cond.notify_all();
95 }
96
97 void DispatchQueue::local_delivery(const ref_t<Message>& m, int priority)
98 {
99 m->set_recv_stamp(ceph_clock_now());
100 std::lock_guard l{local_delivery_lock};
101 if (local_messages.empty())
102 local_delivery_cond.notify_all();
103 local_messages.emplace(m, priority);
104 return;
105 }
106
107 void DispatchQueue::run_local_delivery()
108 {
109 std::unique_lock l{local_delivery_lock};
110 while (true) {
111 if (stop_local_delivery)
112 break;
113 if (local_messages.empty()) {
114 local_delivery_cond.wait(l);
115 continue;
116 }
117 auto p = std::move(local_messages.front());
118 local_messages.pop();
119 l.unlock();
120 const ref_t<Message>& m = p.first;
121 int priority = p.second;
122 fast_preprocess(m);
123 if (can_fast_dispatch(m)) {
124 fast_dispatch(m);
125 } else {
126 enqueue(m, priority, 0);
127 }
128 l.lock();
129 }
130 }
131
132 void DispatchQueue::dispatch_throttle_release(uint64_t msize)
133 {
134 if (msize) {
135 ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler "
136 << dispatch_throttler.get_current() << "/"
137 << dispatch_throttler.get_max() << dendl;
138 dispatch_throttler.put(msize);
139 }
140 }
141
/*
 * This function delivers incoming messages to the Messenger.
 * Connections with messages are kept in queues; when beginning a message
 * delivery, the highest-priority queue is selected, the connection at the
 * front of the queue is removed, and its message is read. If the connection
 * has remaining messages at that priority level, it is placed back on the
 * end of the queue. If the queue is empty, it is removed.
 * The message is then delivered and the process starts again.
 */
/**
 * Main loop of the dispatch queue.
 *
 * Holding `lock`, repeatedly drains `mqueue`. For every dequeued item the
 * lock is released while the item is handled and re-acquired afterwards.
 * Message items are dispatched through the messenger (or discarded with a
 * log line when `stop` is set); code items are translated into the matching
 * `ms_deliver_handle_*` call. Once the queue is empty the loop exits if
 * `stop` is set, otherwise it waits on `cond`.
 */
void DispatchQueue::entry()
{
  std::unique_lock l{lock};
  for (;;) {
    while (!mqueue.empty()) {
      QueueItem item = mqueue.dequeue();
      const bool is_code = item.is_code();
      // Arrival tracking is only kept for message items; drop it while the
      // lock is still held.
      if (!is_code) {
        remove_arrival(item.get_message());
      }
      l.unlock();

      if (!is_code) {
        const ref_t<Message>& m = item.get_message();
        if (!stop) {
          const uint64_t throttle_size = pre_dispatch(m);
          msgr->ms_deliver_dispatch(m);
          post_dispatch(m, throttle_size);
        } else {
          ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
        }
      } else {
        // Optional fault injection: sleep before delivering a code item.
        // NOTE: a static-analysis report flagged rand() on this line as
        // unsuitable for security-related use (recommending an OS random
        // source instead); here it only decides whether an artificial
        // delay is injected.
        if (cct->_conf->ms_inject_internal_delays &&
            cct->_conf->ms_inject_delay_probability &&
            (rand() % 10000)/10000.0 < cct->_conf->ms_inject_delay_probability) {
          utime_t t;
          t.set_from_double(cct->_conf->ms_inject_internal_delays);
          ldout(cct, 1) << "DispatchQueue::entry inject delay of " << t
                        << dendl;
          t.sleep();
        }

        switch (item.get_code()) {
        case D_BAD_REMOTE_RESET:
          msgr->ms_deliver_handle_remote_reset(item.get_connection());
          break;
        case D_CONNECT:
          msgr->ms_deliver_handle_connect(item.get_connection());
          break;
        case D_ACCEPT:
          msgr->ms_deliver_handle_accept(item.get_connection());
          break;
        case D_BAD_RESET:
          msgr->ms_deliver_handle_reset(item.get_connection());
          break;
        case D_CONN_REFUSED:
          msgr->ms_deliver_handle_refused(item.get_connection());
          break;
        default:
          // Any other code is treated as fatal.
          ceph_abort();
        }
      }

      // Re-take the lock before looking at the queue again.
      l.lock();
    }

    if (stop) {
      break;
    }

    // wait for something to be put on queue
    cond.wait(l);
  }
}
210
211 void DispatchQueue::discard_queue(uint64_t id) {
212 std::lock_guard l{lock};
213 list<QueueItem> removed;
214 mqueue.remove_by_class(id, &removed);
215 for (list<QueueItem>::iterator i = removed.begin();
216 i != removed.end();
217 ++i) {
218 ceph_assert(!(i->is_code())); // We don't discard id 0, ever!
219 const ref_t<Message>& m = i->get_message();
220 remove_arrival(m);
221 dispatch_throttle_release(m->get_dispatch_throttle_size());
222 }
223 }
224
225 void DispatchQueue::start()
226 {
227 ceph_assert(!stop);
228 ceph_assert(!dispatch_thread.is_started());
229 dispatch_thread.create("ms_dispatch");
230 local_delivery_thread.create("ms_local");
231 }
232
233 void DispatchQueue::wait()
234 {
235 local_delivery_thread.join();
236 dispatch_thread.join();
237 }
238
239 void DispatchQueue::discard_local()
240 {
241 decltype(local_messages)().swap(local_messages);
242 }
243
244 void DispatchQueue::shutdown()
245 {
246 // stop my local delivery thread
247 {
248 std::scoped_lock l{local_delivery_lock};
249 stop_local_delivery = true;
250 local_delivery_cond.notify_all();
251 }
252 // stop my dispatch thread
253 {
254 std::scoped_lock l{lock};
255 stop = true;
256 cond.notify_all();
257 }
258 }
259