1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab ft=cpp
3    	
4    	#include "common/async/completion.h"
5    	#include "rgw_dmclock_async_scheduler.h"
6    	#include "rgw_dmclock_scheduler.h"
7    	
8    	namespace rgw::dmclock {
9    	
(1) Event exn_spec_violation: An exception of type "boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
10   	AsyncScheduler::~AsyncScheduler()
11   	{
(2) Event fun_call_w_exception: Called function throws an exception of type "boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >". [details]
Also see events: [exn_spec_violation]
12   	  cancel();
13   	  if (observer) {
14   	    cct->_conf.remove_observer(this);
15   	  }
16   	}
17   	
18   	const char** AsyncScheduler::get_tracked_conf_keys() const
19   	{
20   	  if (observer) {
21   	    return observer->get_tracked_conf_keys();
22   	  }
23   	  static const char* keys[] = { "rgw_max_concurrent_requests", nullptr };
24   	  return keys;
25   	}
26   	
27   	void AsyncScheduler::handle_conf_change(const ConfigProxy& conf,
28   	                                        const std::set<std::string>& changed)
29   	{
30   	  if (observer) {
31   	    observer->handle_conf_change(conf, changed);
32   	  }
33   	  if (changed.count("rgw_max_concurrent_requests")) {
34   	    auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests");
35   	    max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max();
36   	  }
37   	  queue.update_client_infos();
38   	  schedule(crimson::dmclock::TimeZero);
39   	}
40   	
41   	int AsyncScheduler::schedule_request_impl(const client_id& client,
42   	                                          const ReqParams& params,
43   	                                          const Time& time, const Cost& cost,
44   	                                          optional_yield yield_ctx)
45   	{
46   	    ceph_assert(yield_ctx);
47   	
48   	    auto &yield = yield_ctx.get_yield_context();
49   	    boost::system::error_code ec;
50   	    async_request(client, params, time, cost, yield[ec]);
51   	
52   	    if (ec){
53   	      if (ec == boost::system::errc::resource_unavailable_try_again)
54   	        return -EAGAIN;
55   	      else
56   	        return -ec.value();
57   	    }
58   	
59   	    return 0;
60   	}
61   	
62   	void AsyncScheduler::request_complete()
63   	{
64   	  --outstanding_requests;
65   	  schedule(crimson::dmclock::TimeZero);
66   	}
67   	
68   	void AsyncScheduler::cancel()
69   	{
70   	  ClientSums sums;
71   	
72   	  queue.remove_by_req_filter([&] (RequestRef&& request) {
73   	      inc(sums, request->client, request->cost);
74   	      auto c = static_cast<Completion*>(request.release());
75   	      Completion::dispatch(std::unique_ptr<Completion>{c},
76   	                           boost::asio::error::operation_aborted,
77   	                           PhaseType::priority);
78   	      return true;
79   	    });
(1) Event fun_call_w_exception: Called function throws an exception of type "boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >". [details]
80   	  timer.cancel();
81   	
82   	  for (size_t i = 0; i < client_count; i++) {
83   	    if (auto c = counters(static_cast<client_id>(i))) {
84   	      on_cancel(c, sums[i]);
85   	    }
86   	  }
87   	}
88   	
89   	void AsyncScheduler::cancel(const client_id& client)
90   	{
91   	  ClientSum sum;
92   	
93   	  queue.remove_by_client(client, false, [&] (RequestRef&& request) {
94   	      sum.count++;
95   	      sum.cost += request->cost;
96   	      auto c = static_cast<Completion*>(request.release());
97   	      Completion::dispatch(std::unique_ptr<Completion>{c},
98   	                           boost::asio::error::operation_aborted,
99   	                           PhaseType::priority);
100  	    });
101  	  if (auto c = counters(client)) {
102  	    on_cancel(c, sum);
103  	  }
104  	  schedule(crimson::dmclock::TimeZero);
105  	}
106  	
107  	void AsyncScheduler::schedule(const Time& time)
108  	{
109  	  timer.expires_at(Clock::from_double(time));
110  	  timer.async_wait([this] (boost::system::error_code ec) {
111  	      // process requests unless the wait was canceled. note that a canceled
112  	      // wait may execute after this AsyncScheduler destructs
113  	      if (ec != boost::asio::error::operation_aborted) {
114  	        process(get_time());
115  	      }
116  	    });
117  	}
118  	
119  	void AsyncScheduler::process(const Time& now)
120  	{
121  	  // must run in the executor. we should only invoke completion handlers if the
122  	  // executor is running
123  	  assert(get_executor().running_in_this_thread());
124  	
125  	  ClientSums rsums, psums;
126  	
127  	  while (outstanding_requests < max_requests) {
128  	    auto pull = queue.pull_request(now);
129  	
130  	    if (pull.is_none()) {
131  	      // no pending requests, cancel the timer
132  	      timer.cancel();
133  	      break;
134  	    }
135  	    if (pull.is_future()) {
136  	      // update the timer based on the future time
137  	      schedule(pull.getTime());
138  	      break;
139  	    }
140  	    ++outstanding_requests;
141  	
142  	    // complete the request
143  	    auto& r = pull.get_retn();
144  	    auto client = r.client;
145  	    auto phase = r.phase;
146  	    auto started = r.request->started;
147  	    auto cost = r.request->cost;
148  	    auto c = static_cast<Completion*>(r.request.release());
149  	    Completion::post(std::unique_ptr<Completion>{c},
150  	                     boost::system::error_code{}, phase);
151  	
152  	    if (auto c = counters(client)) {
153  	      auto lat = Clock::from_double(now) - Clock::from_double(started);
154  	      if (phase == PhaseType::reservation) {
155  	        inc(rsums, client, cost);
156  	        c->tinc(queue_counters::l_res_latency, lat);
157  	      } else {
158  	        inc(psums, client, cost);
159  	        c->tinc(queue_counters::l_prio_latency, lat);
160  	      }
161  	    }
162  	  }
163  	
164  	  if (outstanding_requests >= max_requests) {
165  	    if(auto c = counters(client_id::count)){
166  	      c->inc(throttle_counters::l_throttle);
167  	    }
168  	  }
169  	
170  	  for (size_t i = 0; i < client_count; i++) {
171  	    if (auto c = counters(static_cast<client_id>(i))) {
172  	      on_process(c, rsums[i], psums[i]);
173  	    }
174  	  }
175  	}
176  	
177  	} // namespace rgw::dmclock
178