1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software
11   	 * Foundation.  See file COPYING.
12   	 */
13   	
14   	#ifndef TRACKEDREQUEST_H_
15   	#define TRACKEDREQUEST_H_
16   	
#include <atomic>
#include <mutex>
#include <utility>

#include "common/ceph_mutex.h"
#include "common/histogram.h"
#include "common/Thread.h"
#include "common/Clock.h"
#include "include/spinlock.h"
#include "msg/Message.h"
24   	
25   	#define OPTRACKER_PREALLOC_EVENTS 20
26   	
27   	class TrackedOp;
28   	class OpHistory;
29   	
30   	typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
31   	
32   	class OpHistoryServiceThread : public Thread
33   	{
34   	private:
35   	  std::list<std::pair<utime_t, TrackedOpRef>> _external_queue;
36   	  OpHistory* _ophistory;
37   	  mutable ceph::spinlock queue_spinlock;
38   	  bool _break_thread;
39   	public:
40   	  explicit OpHistoryServiceThread(OpHistory* parent)
41   	    : _ophistory(parent),
42   	      _break_thread(false) { }
43   	
44   	  void break_thread();
45   	  void insert_op(const utime_t& now, TrackedOpRef op) {
46   	    queue_spinlock.lock();
47   	    _external_queue.emplace_back(now, op);
48   	    queue_spinlock.unlock();
49   	  }
50   	
51   	  void *entry() override;
52   	};
53   	
54   	
55   	class OpHistory {
56   	  std::set<std::pair<utime_t, TrackedOpRef> > arrived;
57   	  std::set<std::pair<double, TrackedOpRef> > duration;
58   	  std::set<std::pair<utime_t, TrackedOpRef> > slow_op;
59   	  ceph::mutex ops_history_lock = ceph::make_mutex("OpHistory::ops_history_lock");
60   	  void cleanup(utime_t now);
61   	  std::atomic_size_t history_size{0};
62   	  std::atomic_uint32_t history_duration{0};
63   	  std::atomic_size_t history_slow_op_size{0};
64   	  std::atomic_uint32_t history_slow_op_threshold{0};
65   	  std::atomic_bool shutdown{false};
66   	  OpHistoryServiceThread opsvc;
67   	  friend class OpHistoryServiceThread;
68   	
69   	public:
70   	  OpHistory() : opsvc(this) {
71   	    opsvc.create("OpHistorySvc");
72   	  }
73   	  ~OpHistory() {
74   	    ceph_assert(arrived.empty());
75   	    ceph_assert(duration.empty());
76   	    ceph_assert(slow_op.empty());
77   	  }
78   	  void insert(const utime_t& now, TrackedOpRef op)
79   	  {
80   	    if (shutdown)
81   	      return;
82   	
83   	    opsvc.insert_op(now, op);
84   	  }
85   	
86   	  void _insert_delayed(const utime_t& now, TrackedOpRef op);
87   	  void dump_ops(utime_t now, ceph::Formatter *f, std::set<std::string> filters = {""}, bool by_duration=false);
88   	  void dump_slow_ops(utime_t now, ceph::Formatter *f, std::set<std::string> filters = {""});
89   	  void on_shutdown();
90   	  void set_size_and_duration(size_t new_size, uint32_t new_duration) {
91   	    history_size = new_size;
92   	    history_duration = new_duration;
93   	  }
94   	  void set_slow_op_size_and_threshold(size_t new_size, uint32_t new_threshold) {
95   	    history_slow_op_size = new_size;
96   	    history_slow_op_threshold = new_threshold;
97   	  }
98   	};
99   	
100  	struct ShardedTrackingData;
101  	class OpTracker {
102  	  friend class OpHistory;
103  	  std::atomic<int64_t> seq = { 0 };
104  	  std::vector<ShardedTrackingData*> sharded_in_flight_list;
105  	  OpHistory history;
106  	  uint32_t num_optracker_shards;
107  	  float complaint_time;
108  	  int log_threshold;
109  	  std::atomic<bool> tracking_enabled;
110  	  ceph::shared_mutex lock = ceph::make_shared_mutex("OpTracker::lock");
111  	
112  	public:
113  	  CephContext *cct;
114  	  OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards);
115  	      
116  	  void set_complaint_and_threshold(float time, int threshold) {
117  	    complaint_time = time;
118  	    log_threshold = threshold;
119  	  }
120  	  void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
121  	    history.set_size_and_duration(new_size, new_duration);
122  	  }
123  	  void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
124  	    history.set_slow_op_size_and_threshold(new_size, new_threshold);
125  	  }
126  	  bool is_tracking() const {
127  	    return tracking_enabled;
128  	  }
129  	  void set_tracking(bool enable) {
130  	    tracking_enabled = enable;
131  	  }
132  	  bool dump_ops_in_flight(ceph::Formatter *f, bool print_only_blocked = false, std::set<std::string> filters = {""});
133  	  bool dump_historic_ops(ceph::Formatter *f, bool by_duration = false, std::set<std::string> filters = {""});
134  	  bool dump_historic_slow_ops(ceph::Formatter *f, std::set<std::string> filters = {""});
135  	  bool register_inflight_op(TrackedOp *i);
136  	  void unregister_inflight_op(TrackedOp *i);
137  	  void record_history_op(TrackedOpRef&& i);
138  	
139  	  void get_age_ms_histogram(pow2_hist_t *h);
140  	
141  	  /**
142  	   * walk through ops in flight
143  	   *
144  	   * @param oldest_sec the amount of time since the oldest op was initiated
145  	   * @param check a function consuming tracked ops, the function returns
146  	   *              false if it don't want to be fed with more ops
147  	   * @return True if there are any Ops to warn on, false otherwise
148  	   */
149  	  bool visit_ops_in_flight(utime_t* oldest_secs,
150  				   std::function<bool(TrackedOp&)>&& visit);
151  	  /**
152  	   * walk through slow ops in flight
153  	   *
154  	   * @param[out] oldest_sec the amount of time since the oldest op was initiated
155  	   * @param[out] num_slow_ops total number of slow ops
156  	   * @param[out] num_warned_ops total number of warned ops
157  	   * @param on_warn a function consuming tracked ops, the function returns
158  	   *                false if it don't want to be fed with more ops
159  	   * @return True if there are any Ops to warn on, false otherwise
160  	   */
161  	  bool with_slow_ops_in_flight(utime_t* oldest_secs,
162  				       int* num_slow_ops,
163  				       int* num_warned_ops,
164  				       std::function<void(TrackedOp&)>&& on_warn);
165  	  /**
166  	   * Look for Ops which are too old, and insert warning
167  	   * strings for each Op that is too old.
168  	   *
169  	   * @param summary[out] a std::string summarizing slow Ops.
170  	   * @param warning_strings[out] A std::vector<std::string> reference which is filled
171  	   * with a warning std::string for each old Op.
172  	   * @param slow[out] total number of slow ops
173  	   * @return True if there are any Ops to warn on, false otherwise.
174  	   */
175  	  bool check_ops_in_flight(std::string* summary,
176  				   std::vector<std::string> &warning_strings,
177  				   int* slow = nullptr);
178  	
179  	  void on_shutdown() {
180  	    history.on_shutdown();
181  	  }
182  	  ~OpTracker();
183  	
184  	  template <typename T, typename U>
185  	  typename T::Ref create_request(U params)
186  	  {
187  	    typename T::Ref retval(new T(params, this));
188  	    retval->tracking_start();
189  	    if (is_tracking()) {
190  	      retval->mark_event("throttled", params->get_throttle_stamp());
191  	      retval->mark_event("header_read", params->get_recv_stamp());
192  	      retval->mark_event("all_read", params->get_recv_complete_stamp());
193  	      retval->mark_event("dispatched", params->get_dispatch_stamp());
194  	    }
195  	
196  	    return retval;
197  	  }
198  	};
199  	
200  	class TrackedOp : public boost::intrusive::list_base_hook<> {
201  	private:
202  	  friend class OpHistory;
203  	  friend class OpTracker;
204  	
205  	  boost::intrusive::list_member_hook<> tracker_item;
206  	
207  	public:
208  	  typedef boost::intrusive::list<
209  	  TrackedOp,
210  	  boost::intrusive::member_hook<
211  	    TrackedOp,
212  	    boost::intrusive::list_member_hook<>,
213  	    &TrackedOp::tracker_item> > tracked_op_list_t;
214  	
215  	  // for use when clearing lists.  e.g.,
216  	  //   ls.clear_and_dispose(TrackedOp::Putter());
217  	  struct Putter {
218  	    void operator()(TrackedOp *op) {
219  	      op->put();
220  	    }
221  	  };
222  	
223  	protected:
224  	  OpTracker *tracker;          ///< the tracker we are associated with
225  	  std::atomic_int nref = {0};  ///< ref count
226  	
227  	  utime_t initiated_at;
228  	
229  	  struct Event {
230  	    utime_t stamp;
231  	    std::string str;
232  	
233  	    Event(utime_t t, std::string_view s) : stamp(t), str(s) {}
234  	
235  	    int compare(const char *s) const {
236  	      return str.compare(s);
237  	    }
238  	
239  	    const char *c_str() const {
240  	      return str.c_str();
241  	    }
242  	
243  	    void dump(ceph::Formatter *f) const {
244  	      f->dump_stream("time") << stamp;
245  	      f->dump_string("event", str);
246  	    }
247  	  };
248  	
249  	  std::vector<Event> events;    ///< std::list of events and their times
250  	  mutable ceph::mutex lock = ceph::make_mutex("TrackedOp::lock"); ///< to protect the events list
251  	  uint64_t seq = 0;        ///< a unique value std::set by the OpTracker
252  	
253  	  uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning
254  	
255  	  enum {
256  	    STATE_UNTRACKED = 0,
257  	    STATE_LIVE,
258  	    STATE_HISTORY
259  	  };
260  	  std::atomic<int> state = {STATE_UNTRACKED};
261  	
262  	  mutable std::string desc_str;   ///< protected by lock
263  	  mutable const char *desc = nullptr;  ///< readable without lock
264  	  mutable std::atomic<bool> want_new_desc = {false};
265  	
266  	  TrackedOp(OpTracker *_tracker, const utime_t& initiated) :
267  	    tracker(_tracker),
268  	    initiated_at(initiated)
269  	  {
270  	    events.reserve(OPTRACKER_PREALLOC_EVENTS);
271  	  }
272  	
273  	  /// output any type-specific data you want to get when dump() is called
274  	  virtual void _dump(ceph::Formatter *f) const {}
275  	  /// if you want something else to happen when events are marked, implement
276  	  virtual void _event_marked() {}
277  	  /// return a unique descriptor of the Op; eg the message it's attached to
278  	  virtual void _dump_op_descriptor_unlocked(std::ostream& stream) const = 0;
279  	  /// called when the last non-OpTracker reference is dropped
280  	  virtual void _unregistered() {}
281  	
282  	  virtual bool filter_out(const std::set<std::string>& filters) { return true; }
283  	
284  	public:
285  	  ZTracer::Trace osd_trace;
286  	  ZTracer::Trace pg_trace;
287  	  ZTracer::Trace store_trace;
288  	  ZTracer::Trace journal_trace;
289  	
290  	  virtual ~TrackedOp() {}
291  	
292  	  void get() {
293  	    ++nref;
294  	  }
295  	  void put() {
296  	  again:
297  	    auto nref_snap = nref.load();
298  	    if (nref_snap == 1) {
299  	      switch (state.load()) {
300  	      case STATE_UNTRACKED:
301  		_unregistered();
302  		delete this;
303  		break;
304  	
305  	      case STATE_LIVE:
(1) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
306  		mark_event("done");
307  		tracker->unregister_inflight_op(this);
308  		_unregistered();
309  		if (!tracker->is_tracking()) {
310  		  delete this;
311  		} else {
312  		  state = TrackedOp::STATE_HISTORY;
313  		  tracker->record_history_op(
314  		    TrackedOpRef(this, /* add_ref = */ false));
315  		}
316  		break;
317  	
318  	      case STATE_HISTORY:
319  		delete this;
320  		break;
321  	
322  	      default:
323  		ceph_abort();
324  	      }
325  	    } else if (!nref.compare_exchange_weak(nref_snap, nref_snap - 1)) {
326  	      goto again;
327  	    }
328  	  }
329  	
330  	  const char *get_desc() const {
331  	    if (!desc || want_new_desc.load()) {
332  	      std::lock_guard l(lock);
333  	      _gen_desc();
334  	    }
335  	    return desc;
336  	  }
337  	private:
338  	  void _gen_desc() const {
339  	    std::ostringstream ss;
340  	    _dump_op_descriptor_unlocked(ss);
341  	    desc_str = ss.str();
342  	    desc = desc_str.c_str();
343  	    want_new_desc = false;
344  	  }
345  	public:
346  	  void reset_desc() {
347  	    want_new_desc = true;
348  	  }
349  	
350  	  const utime_t& get_initiated() const {
351  	    return initiated_at;
352  	  }
353  	
354  	  double get_duration() const {
355  	    std::lock_guard l(lock);
356  	    if (!events.empty() && events.rbegin()->compare("done") == 0)
357  	      return events.rbegin()->stamp - get_initiated();
358  	    else
359  	      return ceph_clock_now() - get_initiated();
360  	  }
361  	
362  	  void mark_event(std::string_view event, utime_t stamp=ceph_clock_now());
363  	
364  	  void mark_nowarn() {
365  	    warn_interval_multiplier = 0;
366  	  }
367  	
368  	  virtual std::string_view state_string() const {
369  	    std::lock_guard l(lock);
370  	    return events.empty() ? std::string_view() : std::string_view(events.rbegin()->str);
371  	  }
372  	
373  	  void dump(utime_t now, ceph::Formatter *f) const;
374  	
375  	  void tracking_start() {
376  	    if (tracker->register_inflight_op(this)) {
377  	      events.emplace_back(initiated_at, "initiated");
378  	      state = STATE_LIVE;
379  	    }
380  	  }
381  	
382  	  // ref counting via intrusive_ptr, with special behavior on final
383  	  // put for historical op tracking
384  	  friend void intrusive_ptr_add_ref(TrackedOp *o) {
385  	    o->get();
386  	  }
387  	  friend void intrusive_ptr_release(TrackedOp *o) {
(1) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
388  	    o->put();
389  	  }
390  	};
391  	
392  	
393  	#endif
394