1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2014 CohortFS, LLC
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software
11   	 * Foundation.  See file COPYING.
12   	 *
13   	 */
14   	#include <string>
15   	#include "QueueStrategy.h"
16   	#define dout_subsys ceph_subsys_ms
17   	#include "common/debug.h"
18   	
19   	QueueStrategy::QueueStrategy(int _n_threads)
20   	  : n_threads(_n_threads),
21   	    stop(false),
22   	    mqueue(),
23   	    disp_threads()
24   	{
25   	}
26   	
27   	void QueueStrategy::ds_dispatch(Message *m) {
28   	  msgr->ms_fast_preprocess(m);
29   	  if (msgr->ms_can_fast_dispatch(m)) {
30   	    msgr->ms_fast_dispatch(m);
31   	    return;
32   	  }
33   	  std::lock_guard l{lock};
34   	  mqueue.push_back(*m);
35   	  if (disp_threads.size()) {
36   	    if (! disp_threads.empty()) {
37   	      QSThread *thrd = &disp_threads.front();
38   	      disp_threads.pop_front();
39   	      thrd->cond.notify_all();
40   	    }
41   	  }
42   	}
43   	
44   	void QueueStrategy::entry(QSThread *thrd)
45   	{
46   	  for (;;) {
47   	    ref_t<Message> m;
48   	    std::unique_lock l{lock};
49   	    for (;;) {
50   	      if (! mqueue.empty()) {
51   		m = ref_t<Message>(&mqueue.front(), false);
52   		mqueue.pop_front();
53   		break;
54   	      }
55   	      if (stop)
56   		break;
57   	      disp_threads.push_front(*thrd);
58   	      thrd->cond.wait(l);
59   	    }
60   	    l.unlock();
61   	    if (stop) {
62   		if (!m) break;
63   		continue;
64   	    }
65   	    get_messenger()->ms_deliver_dispatch(m);
66   	  }
67   	}
68   	
69   	void QueueStrategy::shutdown()
70   	{
71   	  QSThread *thrd;
72   	  std::lock_guard l{lock};
73   	  stop = true;
(1) Event cond_true: Condition "this->disp_threads.size()", taking true branch.
74   	  while (disp_threads.size()) {
(2) Event extract: Calling "front" which extracts wrapped state from "this->disp_threads".
(3) Event assign: Assigning: "thrd" = "this->disp_threads.front()".
Also see events: [invalidate][use_after_free]
75   	    thrd = &(disp_threads.front());
(4) Event invalidate: Calling "pop_front" invalidates the internal representation of "this->disp_threads".
Also see events: [extract][assign][use_after_free]
76   	    disp_threads.pop_front();
(5) Event use_after_free: Using invalidated internal representation of "this->disp_threads".
Also see events: [extract][assign][invalidate]
77   	    thrd->cond.notify_all();
78   	  }
79   	}
80   	
81   	void QueueStrategy::wait()
82   	{
83   	  std::unique_lock l{lock};
84   	  ceph_assert(stop);
85   	  for (auto& thread : threads) {
86   	    l.unlock();
87   	
88   	    // join outside of lock
89   	    thread->join();
90   	
91   	    l.lock();
92   	  }
93   	}
94   	
95   	void QueueStrategy::start()
96   	{
97   	  ceph_assert(!stop);
98   	  std::lock_guard l{lock};
99   	  threads.reserve(n_threads);
100  	  for (int ix = 0; ix < n_threads; ++ix) {
101  	    string thread_name = "ms_qs_";
102  	    thread_name.append(std::to_string(ix));
103  	    auto thrd = std::make_unique<QSThread>(this);
104  	    thrd->create(thread_name.c_str());
105  	    threads.emplace_back(std::move(thrd));
106  	  }
107  	}
108