1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #ifndef TRACKEDREQUEST_H_
15 #define TRACKEDREQUEST_H_
16
#include <atomic>
#include <mutex>
#include <utility>

#include "common/ceph_mutex.h"
#include "common/histogram.h"
#include "common/Thread.h"
#include "common/Clock.h"
#include "include/spinlock.h"
#include "msg/Message.h"
24
25 #define OPTRACKER_PREALLOC_EVENTS 20
26
27 class TrackedOp;
28 class OpHistory;
29
30 typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
31
32 class OpHistoryServiceThread : public Thread
33 {
34 private:
35 std::list<std::pair<utime_t, TrackedOpRef>> _external_queue;
36 OpHistory* _ophistory;
37 mutable ceph::spinlock queue_spinlock;
38 bool _break_thread;
39 public:
40 explicit OpHistoryServiceThread(OpHistory* parent)
41 : _ophistory(parent),
42 _break_thread(false) { }
43
44 void break_thread();
45 void insert_op(const utime_t& now, TrackedOpRef op) {
46 queue_spinlock.lock();
47 _external_queue.emplace_back(now, op);
48 queue_spinlock.unlock();
49 }
50
51 void *entry() override;
52 };
53
54
55 class OpHistory {
56 std::set<std::pair<utime_t, TrackedOpRef> > arrived;
57 std::set<std::pair<double, TrackedOpRef> > duration;
58 std::set<std::pair<utime_t, TrackedOpRef> > slow_op;
59 ceph::mutex ops_history_lock = ceph::make_mutex("OpHistory::ops_history_lock");
60 void cleanup(utime_t now);
61 std::atomic_size_t history_size{0};
62 std::atomic_uint32_t history_duration{0};
63 std::atomic_size_t history_slow_op_size{0};
64 std::atomic_uint32_t history_slow_op_threshold{0};
65 std::atomic_bool shutdown{false};
66 OpHistoryServiceThread opsvc;
67 friend class OpHistoryServiceThread;
68
69 public:
70 OpHistory() : opsvc(this) {
71 opsvc.create("OpHistorySvc");
72 }
73 ~OpHistory() {
74 ceph_assert(arrived.empty());
75 ceph_assert(duration.empty());
76 ceph_assert(slow_op.empty());
77 }
78 void insert(const utime_t& now, TrackedOpRef op)
79 {
80 if (shutdown)
81 return;
82
83 opsvc.insert_op(now, op);
84 }
85
86 void _insert_delayed(const utime_t& now, TrackedOpRef op);
87 void dump_ops(utime_t now, ceph::Formatter *f, std::set<std::string> filters = {""}, bool by_duration=false);
88 void dump_slow_ops(utime_t now, ceph::Formatter *f, std::set<std::string> filters = {""});
89 void on_shutdown();
90 void set_size_and_duration(size_t new_size, uint32_t new_duration) {
91 history_size = new_size;
92 history_duration = new_duration;
93 }
94 void set_slow_op_size_and_threshold(size_t new_size, uint32_t new_threshold) {
95 history_slow_op_size = new_size;
96 history_slow_op_threshold = new_threshold;
97 }
98 };
99
100 struct ShardedTrackingData;
101 class OpTracker {
102 friend class OpHistory;
103 std::atomic<int64_t> seq = { 0 };
104 std::vector<ShardedTrackingData*> sharded_in_flight_list;
105 OpHistory history;
106 uint32_t num_optracker_shards;
107 float complaint_time;
108 int log_threshold;
109 std::atomic<bool> tracking_enabled;
110 ceph::shared_mutex lock = ceph::make_shared_mutex("OpTracker::lock");
111
112 public:
113 CephContext *cct;
114 OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards);
115
116 void set_complaint_and_threshold(float time, int threshold) {
117 complaint_time = time;
118 log_threshold = threshold;
119 }
120 void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
121 history.set_size_and_duration(new_size, new_duration);
122 }
123 void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
124 history.set_slow_op_size_and_threshold(new_size, new_threshold);
125 }
126 bool is_tracking() const {
127 return tracking_enabled;
128 }
129 void set_tracking(bool enable) {
130 tracking_enabled = enable;
131 }
132 bool dump_ops_in_flight(ceph::Formatter *f, bool print_only_blocked = false, std::set<std::string> filters = {""});
133 bool dump_historic_ops(ceph::Formatter *f, bool by_duration = false, std::set<std::string> filters = {""});
134 bool dump_historic_slow_ops(ceph::Formatter *f, std::set<std::string> filters = {""});
135 bool register_inflight_op(TrackedOp *i);
136 void unregister_inflight_op(TrackedOp *i);
137 void record_history_op(TrackedOpRef&& i);
138
139 void get_age_ms_histogram(pow2_hist_t *h);
140
141 /**
142 * walk through ops in flight
143 *
144 * @param oldest_sec the amount of time since the oldest op was initiated
145 * @param check a function consuming tracked ops, the function returns
146 * false if it don't want to be fed with more ops
147 * @return True if there are any Ops to warn on, false otherwise
148 */
149 bool visit_ops_in_flight(utime_t* oldest_secs,
150 std::function<bool(TrackedOp&)>&& visit);
151 /**
152 * walk through slow ops in flight
153 *
154 * @param[out] oldest_sec the amount of time since the oldest op was initiated
155 * @param[out] num_slow_ops total number of slow ops
156 * @param[out] num_warned_ops total number of warned ops
157 * @param on_warn a function consuming tracked ops, the function returns
158 * false if it don't want to be fed with more ops
159 * @return True if there are any Ops to warn on, false otherwise
160 */
161 bool with_slow_ops_in_flight(utime_t* oldest_secs,
162 int* num_slow_ops,
163 int* num_warned_ops,
164 std::function<void(TrackedOp&)>&& on_warn);
165 /**
166 * Look for Ops which are too old, and insert warning
167 * strings for each Op that is too old.
168 *
169 * @param summary[out] a std::string summarizing slow Ops.
170 * @param warning_strings[out] A std::vector<std::string> reference which is filled
171 * with a warning std::string for each old Op.
172 * @param slow[out] total number of slow ops
173 * @return True if there are any Ops to warn on, false otherwise.
174 */
175 bool check_ops_in_flight(std::string* summary,
176 std::vector<std::string> &warning_strings,
177 int* slow = nullptr);
178
179 void on_shutdown() {
180 history.on_shutdown();
181 }
182 ~OpTracker();
183
184 template <typename T, typename U>
185 typename T::Ref create_request(U params)
186 {
187 typename T::Ref retval(new T(params, this));
188 retval->tracking_start();
189 if (is_tracking()) {
190 retval->mark_event("throttled", params->get_throttle_stamp());
191 retval->mark_event("header_read", params->get_recv_stamp());
192 retval->mark_event("all_read", params->get_recv_complete_stamp());
193 retval->mark_event("dispatched", params->get_dispatch_stamp());
194 }
195
196 return retval;
197 }
198 };
199
200 class TrackedOp : public boost::intrusive::list_base_hook<> {
201 private:
202 friend class OpHistory;
203 friend class OpTracker;
204
205 boost::intrusive::list_member_hook<> tracker_item;
206
207 public:
208 typedef boost::intrusive::list<
209 TrackedOp,
210 boost::intrusive::member_hook<
211 TrackedOp,
212 boost::intrusive::list_member_hook<>,
213 &TrackedOp::tracker_item> > tracked_op_list_t;
214
215 // for use when clearing lists. e.g.,
216 // ls.clear_and_dispose(TrackedOp::Putter());
217 struct Putter {
218 void operator()(TrackedOp *op) {
219 op->put();
220 }
221 };
222
223 protected:
224 OpTracker *tracker; ///< the tracker we are associated with
225 std::atomic_int nref = {0}; ///< ref count
226
227 utime_t initiated_at;
228
229 struct Event {
230 utime_t stamp;
231 std::string str;
232
233 Event(utime_t t, std::string_view s) : stamp(t), str(s) {}
234
235 int compare(const char *s) const {
236 return str.compare(s);
237 }
238
239 const char *c_str() const {
240 return str.c_str();
241 }
242
243 void dump(ceph::Formatter *f) const {
244 f->dump_stream("time") << stamp;
245 f->dump_string("event", str);
246 }
247 };
248
249 std::vector<Event> events; ///< std::list of events and their times
250 mutable ceph::mutex lock = ceph::make_mutex("TrackedOp::lock"); ///< to protect the events list
251 uint64_t seq = 0; ///< a unique value std::set by the OpTracker
252
253 uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning
254
255 enum {
256 STATE_UNTRACKED = 0,
257 STATE_LIVE,
258 STATE_HISTORY
259 };
260 std::atomic<int> state = {STATE_UNTRACKED};
261
262 mutable std::string desc_str; ///< protected by lock
263 mutable const char *desc = nullptr; ///< readable without lock
264 mutable std::atomic<bool> want_new_desc = {false};
265
266 TrackedOp(OpTracker *_tracker, const utime_t& initiated) :
267 tracker(_tracker),
268 initiated_at(initiated)
269 {
270 events.reserve(OPTRACKER_PREALLOC_EVENTS);
271 }
272
273 /// output any type-specific data you want to get when dump() is called
274 virtual void _dump(ceph::Formatter *f) const {}
275 /// if you want something else to happen when events are marked, implement
276 virtual void _event_marked() {}
277 /// return a unique descriptor of the Op; eg the message it's attached to
278 virtual void _dump_op_descriptor_unlocked(std::ostream& stream) const = 0;
279 /// called when the last non-OpTracker reference is dropped
280 virtual void _unregistered() {}
281
282 virtual bool filter_out(const std::set<std::string>& filters) { return true; }
283
284 public:
285 ZTracer::Trace osd_trace;
286 ZTracer::Trace pg_trace;
287 ZTracer::Trace store_trace;
288 ZTracer::Trace journal_trace;
289
290 virtual ~TrackedOp() {}
291
292 void get() {
293 ++nref;
294 }
295 void put() {
296 again:
297 auto nref_snap = nref.load();
298 if (nref_snap == 1) {
299 switch (state.load()) {
300 case STATE_UNTRACKED:
301 _unregistered();
302 delete this;
303 break;
304
305 case STATE_LIVE:
306 mark_event("done");
307 tracker->unregister_inflight_op(this);
308 _unregistered();
309 if (!tracker->is_tracking()) {
310 delete this;
311 } else {
312 state = TrackedOp::STATE_HISTORY;
313 tracker->record_history_op(
314 TrackedOpRef(this, /* add_ref = */ false));
315 }
316 break;
317
318 case STATE_HISTORY:
319 delete this;
320 break;
321
322 default:
323 ceph_abort();
324 }
325 } else if (!nref.compare_exchange_weak(nref_snap, nref_snap - 1)) {
326 goto again;
327 }
328 }
329
330 const char *get_desc() const {
(1) Event cond_true: |
Condition "!this->desc", taking true branch. |
331 if (!desc || want_new_desc.load()) {
(2) Event getlock: |
Acquiring lock named "_ZN4ceph18mutex_debug_detail16mutex_debug_implILb0EEE.m". [details] |
332 std::lock_guard l(lock);
333 _gen_desc();
334 }
335 return desc;
336 }
337 private:
338 void _gen_desc() const {
339 std::ostringstream ss;
340 _dump_op_descriptor_unlocked(ss);
341 desc_str = ss.str();
342 desc = desc_str.c_str();
343 want_new_desc = false;
344 }
345 public:
346 void reset_desc() {
347 want_new_desc = true;
348 }
349
350 const utime_t& get_initiated() const {
351 return initiated_at;
352 }
353
354 double get_duration() const {
355 std::lock_guard l(lock);
356 if (!events.empty() && events.rbegin()->compare("done") == 0)
357 return events.rbegin()->stamp - get_initiated();
358 else
359 return ceph_clock_now() - get_initiated();
360 }
361
362 void mark_event(std::string_view event, utime_t stamp=ceph_clock_now());
363
364 void mark_nowarn() {
365 warn_interval_multiplier = 0;
366 }
367
368 virtual std::string_view state_string() const {
369 std::lock_guard l(lock);
370 return events.empty() ? std::string_view() : std::string_view(events.rbegin()->str);
371 }
372
373 void dump(utime_t now, ceph::Formatter *f) const;
374
375 void tracking_start() {
376 if (tracker->register_inflight_op(this)) {
377 events.emplace_back(initiated_at, "initiated");
378 state = STATE_LIVE;
379 }
380 }
381
382 // ref counting via intrusive_ptr, with special behavior on final
383 // put for historical op tracking
384 friend void intrusive_ptr_add_ref(TrackedOp *o) {
385 o->get();
386 }
387 friend void intrusive_ptr_release(TrackedOp *o) {
388 o->put();
389 }
390 };
391
392
393 #endif
394