1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software 
11   	 * Foundation.  See file COPYING.
12   	 * 
13   	 */
14   	
15   	#include "msg/Message.h"
16   	#include "DispatchQueue.h"
17   	#include "Messenger.h"
18   	#include "common/ceph_context.h"
19   	
20   	#define dout_subsys ceph_subsys_ms
21   	#include "common/debug.h"
22   	
23   	
24   	/*******************
25   	 * DispatchQueue
26   	 */
27   	
28   	#undef dout_prefix
29   	#define dout_prefix *_dout << "-- " << msgr->get_myaddrs() << " "
30   	
31   	double DispatchQueue::get_max_age(utime_t now) const {
32   	  std::lock_guard l{lock};
33   	  if (marrival.empty())
34   	    return 0;
35   	  else
36   	    return (now - marrival.begin()->first);
37   	}
38   	
39   	uint64_t DispatchQueue::pre_dispatch(const ref_t<Message>& m)
40   	{
41   	  ldout(cct,1) << "<== " << m->get_source_inst()
42   		       << " " << m->get_seq()
43   		       << " ==== " << *m
44   		       << " ==== " << m->get_payload().length()
45   		       << "+" << m->get_middle().length()
46   		       << "+" << m->get_data().length()
47   		       << " (" << ceph_con_mode_name(m->get_connection()->get_con_mode())
48   		       << " " << m->get_footer().front_crc << " "
49   		       << m->get_footer().middle_crc
50   		       << " " << m->get_footer().data_crc << ")"
51   		       << " " << m << " con " << m->get_connection()
52   		       << dendl;
53   	  uint64_t msize = m->get_dispatch_throttle_size();
54   	  m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
55   	  return msize;
56   	}
57   	
58   	void DispatchQueue::post_dispatch(const ref_t<Message>& m, uint64_t msize)
59   	{
60   	  dispatch_throttle_release(msize);
61   	  ldout(cct,20) << "done calling dispatch on " << m << dendl;
62   	}
63   	
64   	bool DispatchQueue::can_fast_dispatch(const cref_t<Message> &m) const
65   	{
66   	  return msgr->ms_can_fast_dispatch(m);
67   	}
68   	
69   	void DispatchQueue::fast_dispatch(const ref_t<Message>& m)
70   	{
71   	  uint64_t msize = pre_dispatch(m);
72   	  msgr->ms_fast_dispatch(m);
73   	  post_dispatch(m, msize);
74   	}
75   	
76   	void DispatchQueue::fast_preprocess(const ref_t<Message>& m)
77   	{
78   	  msgr->ms_fast_preprocess(m);
79   	}
80   	
81   	void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
82   	{
83   	  std::lock_guard l{lock};
84   	  if (stop) {
85   	    return;
86   	  }
87   	  ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
88   	  add_arrival(m);
89   	  if (priority >= CEPH_MSG_PRIO_LOW) {
90   	    mqueue.enqueue_strict(id, priority, QueueItem(m));
91   	  } else {
92   	    mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
93   	  }
94   	  cond.notify_all();
95   	}
96   	
97   	void DispatchQueue::local_delivery(const ref_t<Message>& m, int priority)
98   	{
99   	  m->set_recv_stamp(ceph_clock_now());
100  	  std::lock_guard l{local_delivery_lock};
101  	  if (local_messages.empty())
102  	    local_delivery_cond.notify_all();
103  	  local_messages.emplace(m, priority);
104  	  return;
105  	}
106  	
107  	void DispatchQueue::run_local_delivery()
108  	{
109  	  std::unique_lock l{local_delivery_lock};
110  	  while (true) {
111  	    if (stop_local_delivery)
112  	      break;
113  	    if (local_messages.empty()) {
114  	      local_delivery_cond.wait(l);
115  	      continue;
116  	    }
117  	    auto p = std::move(local_messages.front());
118  	    local_messages.pop();
119  	    l.unlock();
120  	    const ref_t<Message>& m = p.first;
121  	    int priority = p.second;
122  	    fast_preprocess(m);
123  	    if (can_fast_dispatch(m)) {
124  	      fast_dispatch(m);
125  	    } else {
126  	      enqueue(m, priority, 0);
127  	    }
128  	    l.lock();
129  	  }
130  	}
131  	
132  	void DispatchQueue::dispatch_throttle_release(uint64_t msize)
133  	{
134  	  if (msize) {
135  	    ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler "
136  		    << dispatch_throttler.get_current() << "/"
137  		    << dispatch_throttler.get_max() << dendl;
138  	    dispatch_throttler.put(msize);
139  	  }
140  	}
141  	
/*
 * This function delivers incoming messages to the Messenger.
 * Connections with messages are kept in queues; when beginning a message
 * delivery the highest-priority queue is selected, the connection from the
 * front of the queue is removed, and its message read. If the connection
 * has remaining messages at that priority level, it is placed back at the
 * end of the queue. If the queue is empty, it's removed.
 * The message is then delivered and the process starts again.
 *
 * Locking: `lock` is held while an item is dequeued (and, for messages,
 * while its arrival record is removed), released for the actual delivery,
 * and re-acquired before the queue is examined again.  The loop exits once
 * the queue has been drained and `stop` is set.
 */
void DispatchQueue::entry()
{
  std::unique_lock guard{lock};
  for (;;) {
    while (!mqueue.empty()) {
      QueueItem qitem = mqueue.dequeue();
      const bool is_event = qitem.is_code();
      if (!is_event) {
        remove_arrival(qitem.get_message());
      }
      guard.unlock();

      if (!is_event) {
        // Ordinary message: deliver it unless we are shutting down.
        const ref_t<Message>& m = qitem.get_message();
        if (!stop) {
          const uint64_t msize = pre_dispatch(m);
          msgr->ms_deliver_dispatch(m);
          post_dispatch(m, msize);
        } else {
          ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
        }
      } else {
        // Connection event.  Optionally sleep first when delay injection is
        // configured.  rand() only decides whether an artificial delay is
        // injected; it is not used to produce anything security-sensitive.
        if (cct->_conf->ms_inject_internal_delays &&
            cct->_conf->ms_inject_delay_probability &&
            (rand() % 10000)/10000.0 < cct->_conf->ms_inject_delay_probability) {
          utime_t t;
          t.set_from_double(cct->_conf->ms_inject_internal_delays);
          ldout(cct, 1) << "DispatchQueue::entry  inject delay of " << t
                        << dendl;
          t.sleep();
        }

        switch (qitem.get_code()) {
        case D_BAD_REMOTE_RESET:
          msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
          break;
        case D_CONNECT:
          msgr->ms_deliver_handle_connect(qitem.get_connection());
          break;
        case D_ACCEPT:
          msgr->ms_deliver_handle_accept(qitem.get_connection());
          break;
        case D_BAD_RESET:
          msgr->ms_deliver_handle_reset(qitem.get_connection());
          break;
        case D_CONN_REFUSED:
          msgr->ms_deliver_handle_refused(qitem.get_connection());
          break;
        default:
          // Unknown event code.
          ceph_abort();
        }
      }

      guard.lock();
    }

    if (stop) {
      break;
    }

    // wait for something to be put on queue
    cond.wait(guard);
  }
}
210  	
211  	void DispatchQueue::discard_queue(uint64_t id) {
212  	  std::lock_guard l{lock};
213  	  list<QueueItem> removed;
214  	  mqueue.remove_by_class(id, &removed);
215  	  for (list<QueueItem>::iterator i = removed.begin();
216  	       i != removed.end();
217  	       ++i) {
218  	    ceph_assert(!(i->is_code())); // We don't discard id 0, ever!
219  	    const ref_t<Message>& m = i->get_message();
220  	    remove_arrival(m);
221  	    dispatch_throttle_release(m->get_dispatch_throttle_size());
222  	  }
223  	}
224  	
225  	void DispatchQueue::start()
226  	{
227  	  ceph_assert(!stop);
228  	  ceph_assert(!dispatch_thread.is_started());
229  	  dispatch_thread.create("ms_dispatch");
230  	  local_delivery_thread.create("ms_local");
231  	}
232  	
233  	void DispatchQueue::wait()
234  	{
235  	  local_delivery_thread.join();
236  	  dispatch_thread.join();
237  	}
238  	
239  	void DispatchQueue::discard_local()
240  	{
241  	  decltype(local_messages)().swap(local_messages);
242  	}
243  	
244  	void DispatchQueue::shutdown()
245  	{
246  	  // stop my local delivery thread
247  	  {
248  	    std::scoped_lock l{local_delivery_lock};
249  	    stop_local_delivery = true;
250  	    local_delivery_cond.notify_all();
251  	  }
252  	  // stop my dispatch thread
253  	  {
254  	    std::scoped_lock l{lock};
255  	    stop = true;
256  	    cond.notify_all();
257  	  }
258  	}
259