1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * This is free software; you can redistribute it and/or
7    	 * modify it under the terms of the GNU Lesser General Public
8    	 * License version 2.1, as published by the Free Software
9    	 * Foundation.  See file COPYING.
10   	 * Copyright 2013 Inktank
11   	 */
13   	#include "TrackedOp.h"
15   	#define dout_context cct
16   	#define dout_subsys ceph_subsys_optracker
17   	#undef dout_prefix
18   	#define dout_prefix _prefix(_dout)
// Writes the fixed tag that starts every optracker debug line (this is what
// the dout_prefix macro above expands to) and hands back the same stream.
static ostream& _prefix(std::ostream* _dout)
{
  std::ostream& out = *_dout;
  out << "-- op tracker -- ";
  return out;
}
25   	void OpHistoryServiceThread::break_thread() {
26   	  queue_spinlock.lock();
27   	  _external_queue.clear();
28   	  _break_thread = true;
29   	  queue_spinlock.unlock();
30   	}
32   	void* OpHistoryServiceThread::entry() {
33   	  int sleep_time = 1000;
34   	  list<pair<utime_t, TrackedOpRef>> internal_queue;
35   	  while (1) {
36   	    queue_spinlock.lock();
37   	    if (_break_thread) {
38   	      queue_spinlock.unlock();
39   	      break;
40   	    }
41   	    internal_queue.swap(_external_queue);
42   	    queue_spinlock.unlock();
43   	    if (internal_queue.empty()) {
44   	      usleep(sleep_time);
45   	      if (sleep_time < 128000) {
46   	        sleep_time <<= 2;
47   	      }
48   	    } else {
49   	      sleep_time = 1000;
50   	    }
52   	    while (!internal_queue.empty()) {
53   	      pair<utime_t, TrackedOpRef> op = internal_queue.front();
54   	      _ophistory->_insert_delayed(op.first, op.second);
55   	      internal_queue.pop_front();
56   	    }
57   	  }
58   	  return nullptr;
59   	}
62   	void OpHistory::on_shutdown()
63   	{
64   	  opsvc.break_thread();
65   	  opsvc.join();
66   	  std::lock_guard history_lock(ops_history_lock);
67   	  arrived.clear();
68   	  duration.clear();
69   	  slow_op.clear();
70   	  shutdown = true;
71   	}
73   	void OpHistory::_insert_delayed(const utime_t& now, TrackedOpRef op)
74   	{
75   	  std::lock_guard history_lock(ops_history_lock);
76   	  if (shutdown)
77   	    return;
78   	  double opduration = op->get_duration();
79   	  duration.insert(make_pair(opduration, op));
80   	  arrived.insert(make_pair(op->get_initiated(), op));
81   	  if (opduration >= history_slow_op_threshold.load())
82   	    slow_op.insert(make_pair(op->get_initiated(), op));
83   	  cleanup(now);
84   	}
86   	void OpHistory::cleanup(utime_t now)
87   	{
88   	  while (arrived.size() &&
89   		 (now - arrived.begin()->first >
90   		  (double)(history_duration.load()))) {
91   	    duration.erase(make_pair(
92   		arrived.begin()->second->get_duration(),
93   		arrived.begin()->second));
94   	    arrived.erase(arrived.begin());
95   	  }
97   	  while (duration.size() > history_size.load()) {
98   	    arrived.erase(make_pair(
99   		duration.begin()->second->get_initiated(),
100  		duration.begin()->second));
101  	    duration.erase(duration.begin());
102  	  }
104  	  while (slow_op.size() > history_slow_op_size.load()) {
105  	    slow_op.erase(make_pair(
106  		slow_op.begin()->second->get_initiated(),
107  		slow_op.begin()->second));
108  	  }
109  	}
111  	void OpHistory::dump_ops(utime_t now, Formatter *f, set<string> filters, bool by_duration)
112  	{
113  	  std::lock_guard history_lock(ops_history_lock);
114  	  cleanup(now);
115  	  f->open_object_section("op_history");
116  	  f->dump_int("size", history_size.load());
117  	  f->dump_int("duration", history_duration.load());
118  	  {
119  	    f->open_array_section("ops");
120  	    auto dump_fn = [&f, &now, &filters](auto begin_iter, auto end_iter) {
121  	      for (auto i=begin_iter; i!=end_iter; ++i) {
122  		if (!i->second->filter_out(filters))
123  		  continue;
124  		f->open_object_section("op");
125  		i->second->dump(now, f);
126  		f->close_section();
127  	      }
128  	    };
130  	    if (by_duration) {
131  	      dump_fn(duration.rbegin(), duration.rend());
132  	    } else {
133  	      dump_fn(arrived.begin(), arrived.end());
134  	    }
135  	    f->close_section();
136  	  }
137  	  f->close_section();
138  	}
140  	struct ShardedTrackingData {
141  	  ceph::mutex ops_in_flight_lock_sharded;
142  	  TrackedOp::tracked_op_list_t ops_in_flight_sharded;
143  	  explicit ShardedTrackingData(string lock_name)
144  	    : ops_in_flight_lock_sharded(ceph::make_mutex(lock_name)) {}
145  	};
147  	OpTracker::OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards):
148  	  seq(0),
149  	  num_optracker_shards(num_shards),
150  	  complaint_time(0), log_threshold(0),
151  	  tracking_enabled(tracking),
152  	  cct(cct_) {
153  	    for (uint32_t i = 0; i < num_optracker_shards; i++) {
154  	      char lock_name[32] = {0};
155  	      snprintf(lock_name, sizeof(lock_name), "%s:%" PRIu32, "OpTracker::ShardedLock", i);
156  	      ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
157  	      sharded_in_flight_list.push_back(one_shard);
158  	    }
159  	}
161  	OpTracker::~OpTracker() {
162  	  while (!sharded_in_flight_list.empty()) {
163  	    ceph_assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
164  	    delete sharded_in_flight_list.back();
165  	    sharded_in_flight_list.pop_back();
166  	  }
167  	}
169  	bool OpTracker::dump_historic_ops(Formatter *f, bool by_duration, set<string> filters)
170  	{
171  	  if (!tracking_enabled)
172  	    return false;
174  	  std::shared_lock l{lock};
175  	  utime_t now = ceph_clock_now();
176  	  history.dump_ops(now, f, filters, by_duration);
177  	  return true;
178  	}
180  	void OpHistory::dump_slow_ops(utime_t now, Formatter *f, set<string> filters)
181  	{
182  	  std::lock_guard history_lock(ops_history_lock);
183  	  cleanup(now);
184  	  f->open_object_section("OpHistory slow ops");
185  	  f->dump_int("num to keep", history_slow_op_size.load());
186  	  f->dump_int("threshold to keep", history_slow_op_threshold.load());
187  	  {
188  	    f->open_array_section("Ops");
189  	    for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
190  		   slow_op.begin();
191  		 i != slow_op.end();
192  		 ++i) {
193  	      if (!i->second->filter_out(filters))
194  	        continue;
195  	      f->open_object_section("Op");
196  	      i->second->dump(now, f);
197  	      f->close_section();
198  	    }
199  	    f->close_section();
200  	  }
201  	  f->close_section();
202  	}
204  	bool OpTracker::dump_historic_slow_ops(Formatter *f, set<string> filters)
205  	{
206  	  if (!tracking_enabled)
207  	    return false;
209  	  std::shared_lock l{lock};
210  	  utime_t now = ceph_clock_now();
211  	  history.dump_slow_ops(now, f, filters);
212  	  return true;
213  	}
215  	bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked, set<string> filters)
216  	{
217  	  if (!tracking_enabled)
218  	    return false;
220  	  std::shared_lock l{lock};
221  	  f->open_object_section("ops_in_flight"); // overall dump
222  	  uint64_t total_ops_in_flight = 0;
223  	  f->open_array_section("ops"); // list of TrackedOps
224  	  utime_t now = ceph_clock_now();
225  	  for (uint32_t i = 0; i < num_optracker_shards; i++) {
226  	    ShardedTrackingData* sdata = sharded_in_flight_list[i];
227  	    ceph_assert(NULL != sdata); 
228  	    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
229  	    for (auto& op : sdata->ops_in_flight_sharded) {
230  	      if (print_only_blocked && (now - op.get_initiated() <= complaint_time))
231  	        break;
232  	      if (!op.filter_out(filters))
233  	        continue;
234  	      f->open_object_section("op");
235  	      op.dump(now, f);
236  	      f->close_section(); // this TrackedOp
237  	      total_ops_in_flight++;
238  	    }
239  	  }
240  	  f->close_section(); // list of TrackedOps
241  	  if (print_only_blocked) {
242  	    f->dump_float("complaint_time", complaint_time);
243  	    f->dump_int("num_blocked_ops", total_ops_in_flight);
244  	  } else
245  	    f->dump_int("num_ops", total_ops_in_flight);
246  	  f->close_section(); // overall dump
247  	  return true;
248  	}
250  	bool OpTracker::register_inflight_op(TrackedOp *i)
251  	{
252  	  if (!tracking_enabled)
253  	    return false;
255  	  std::shared_lock l{lock};
256  	  uint64_t current_seq = ++seq;
257  	  uint32_t shard_index = current_seq % num_optracker_shards;
258  	  ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
259  	  ceph_assert(NULL != sdata);
260  	  {
261  	    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
262  	    sdata->ops_in_flight_sharded.push_back(*i);
263  	    i->seq = current_seq;
264  	  }
265  	  return true;
266  	}
268  	void OpTracker::unregister_inflight_op(TrackedOp* const i)
269  	{
270  	  // caller checks;
271  	  ceph_assert(i->state);
273  	  uint32_t shard_index = i->seq % num_optracker_shards;
274  	  ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
275  	  ceph_assert(NULL != sdata);
276  	  {
277  	    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
278  	    auto p = sdata->ops_in_flight_sharded.iterator_to(*i);
279  	    sdata->ops_in_flight_sharded.erase(p);
280  	  }
281  	}
283  	void OpTracker::record_history_op(TrackedOpRef&& i)
284  	{
285  	  std::shared_lock l{lock};
286  	  history.insert(ceph_clock_now(), std::move(i));
287  	}
289  	bool OpTracker::visit_ops_in_flight(utime_t* oldest_secs,
290  					    std::function<bool(TrackedOp&)>&& visit)
291  	{
292  	  if (!tracking_enabled)
293  	    return false;
295  	  const utime_t now = ceph_clock_now();
296  	  utime_t oldest_op = now;
297  	  uint64_t total_ops_in_flight = 0;
299  	  std::shared_lock l{lock};
300  	  for (const auto sdata : sharded_in_flight_list) {
301  	    ceph_assert(sdata);
302  	    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
303  	    if (!sdata->ops_in_flight_sharded.empty()) {
304  	      utime_t oldest_op_tmp =
305  		sdata->ops_in_flight_sharded.front().get_initiated();
306  	      if (oldest_op_tmp < oldest_op) {
307  	        oldest_op = oldest_op_tmp;
308  	      }
309  	    }
310  	    total_ops_in_flight += sdata->ops_in_flight_sharded.size();
311  	  }
312  	  if (!total_ops_in_flight)
313  	    return false;
314  	  *oldest_secs = now - oldest_op;
315  	  dout(10) << "ops_in_flight.size: " << total_ops_in_flight
316  	           << "; oldest is " << *oldest_secs
317  	           << " seconds old" << dendl;
319  	  if (*oldest_secs < complaint_time)
320  	    return false;
322  	  for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
323  	    ShardedTrackingData* sdata = sharded_in_flight_list[iter];
324  	    ceph_assert(NULL != sdata);
325  	    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
326  	    for (auto& op : sdata->ops_in_flight_sharded) {
327  	      if (!visit(op))
328  		break;
329  	    }
330  	  }
331  	  return true;
332  	}
334  	bool OpTracker::with_slow_ops_in_flight(utime_t* oldest_secs,
335  						int* num_slow_ops,
336  						int* num_warned_ops,
337  						std::function<void(TrackedOp&)>&& on_warn)
338  	{
339  	  const utime_t now = ceph_clock_now();
340  	  auto too_old = now;
341  	  too_old -= complaint_time;
342  	  int slow = 0;
343  	  int warned = 0;
344  	  auto check = [&](TrackedOp& op) {
345  	    if (op.get_initiated() >= too_old) {
346  	      // no more slow ops in flight
347  	      return false;
348  	    }
349  	    if (!op.warn_interval_multiplier)
350  	      return true;
351  	    slow++;
352  	    if (warned >= log_threshold) {
353  	      // enough samples of slow ops
354  	      return true;
355  	    }
356  	    auto time_to_complain = (op.get_initiated() +
357  				     complaint_time * op.warn_interval_multiplier);
358  	    if (time_to_complain >= now) {
359  	      // complain later if the op is still in flight
360  	      return true;
361  	    }
362  	    // will warn, increase counter
363  	    warned++;
364  	    on_warn(op);
365  	    return true;
366  	  };
367  	  if (visit_ops_in_flight(oldest_secs, check)) {
368  	    if (num_slow_ops) {
369  	      *num_slow_ops = slow;
370  	      *num_warned_ops = warned;
371  	    }
372  	    return true;
373  	  } else {
374  	    return false;
375  	  }
376  	}
378  	bool OpTracker::check_ops_in_flight(std::string* summary,
379  					    std::vector<string> &warnings,
380  					    int *num_slow_ops)
381  	{
382  	  const utime_t now = ceph_clock_now();
383  	  auto too_old = now;
384  	  too_old -= complaint_time;
385  	  int warned = 0;
386  	  utime_t oldest_secs;
387  	  auto warn_on_slow_op = [&](TrackedOp& op) {
388  	    stringstream ss;
389  	    utime_t age = now - op.get_initiated();
390  	    ss << "slow request " << age << " seconds old, received at "
391  	       << op.get_initiated() << ": " << op.get_desc()
392  	       << " currently "
393  	       << op.state_string();
394  	    warnings.push_back(ss.str());
395  	    // only those that have been shown will backoff
396  	    op.warn_interval_multiplier *= 2;
397  	  };
398  	  int slow = 0;
399  	  if (with_slow_ops_in_flight(&oldest_secs, &slow, &warned, warn_on_slow_op) &&
400  	      slow > 0) {
401  	    stringstream ss;
402  	    ss << slow << " slow requests, "
403  	       << warned << " included below; oldest blocked for > "
404  	       << oldest_secs << " secs";
405  	    *summary = ss.str();
406  	    if (num_slow_ops) {
407  	      *num_slow_ops = slow;
408  	    }
409  	    return true;
410  	  } else {
411  	    return false;
412  	  }
413  	}
415  	void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
416  	{
417  	  h->clear();
418  	  utime_t now = ceph_clock_now();
420  	  for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
421  	    ShardedTrackingData* sdata = sharded_in_flight_list[iter];
422  	    ceph_assert(NULL != sdata);
423  	    std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
425  	    for (auto& i : sdata->ops_in_flight_sharded) {
426  	      utime_t age = now - i.get_initiated();
427  	      uint32_t ms = (long)(age * 1000.0);
428  	      h->add(ms);
429  	    }
430  	  }
431  	}
434  	#undef dout_context
435  	#define dout_context tracker->cct
437  	void TrackedOp::mark_event(std::string_view event, utime_t stamp)
438  	{
439  	  if (!state)
440  	    return;
442  	  {
443  	    std::lock_guard l(lock);
444  	    events.emplace_back(stamp, event);
445  	  }
446  	  dout(6) << " seq: " << seq
447  		  << ", time: " << stamp
448  		  << ", event: " << event
449  		  << ", op: " << get_desc()
450  		  << dendl;
451  	  _event_marked();
452  	}
454  	void TrackedOp::dump(utime_t now, Formatter *f) const
455  	{
456  	  // Ignore if still in the constructor
457  	  if (!state)
458  	    return;
459  	  f->dump_string("description", get_desc());
460  	  f->dump_stream("initiated_at") << get_initiated();
461  	  f->dump_float("age", now - get_initiated());
462  	  f->dump_float("duration", get_duration());
463  	  {
464  	    f->open_object_section("type_data");
465  	    _dump(f);
466  	    f->close_section();
467  	  }
468  	}