1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * This is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License version 2.1, as published by the Free Software
9 * Foundation. See file COPYING.
10 * Copyright 2013 Inktank
11 */
12
13 #include "TrackedOp.h"
14
15 #define dout_context cct
16 #define dout_subsys ceph_subsys_optracker
17 #undef dout_prefix
18 #define dout_prefix _prefix(_dout)
19
// Writes the fixed op-tracker log prefix to the given stream and hands the
// same stream back so the caller can keep chaining insertions.
static ostream& _prefix(std::ostream* _dout)
{
  std::ostream& out = *_dout;
  out << "-- op tracker -- ";
  return out;
}
24
25 void OpHistoryServiceThread::break_thread() {
26 queue_spinlock.lock();
27 _external_queue.clear();
28 _break_thread = true;
29 queue_spinlock.unlock();
30 }
31
32 void* OpHistoryServiceThread::entry() {
33 int sleep_time = 1000;
34 list<pair<utime_t, TrackedOpRef>> internal_queue;
35 while (1) {
36 queue_spinlock.lock();
37 if (_break_thread) {
38 queue_spinlock.unlock();
39 break;
40 }
41 internal_queue.swap(_external_queue);
42 queue_spinlock.unlock();
43 if (internal_queue.empty()) {
44 usleep(sleep_time);
45 if (sleep_time < 128000) {
46 sleep_time <<= 2;
47 }
48 } else {
49 sleep_time = 1000;
50 }
51
52 while (!internal_queue.empty()) {
53 pair<utime_t, TrackedOpRef> op = internal_queue.front();
54 _ophistory->_insert_delayed(op.first, op.second);
55 internal_queue.pop_front();
56 }
57 }
58 return nullptr;
59 }
60
61
62 void OpHistory::on_shutdown()
63 {
64 opsvc.break_thread();
65 opsvc.join();
66 std::lock_guard history_lock(ops_history_lock);
67 arrived.clear();
68 duration.clear();
69 slow_op.clear();
70 shutdown = true;
71 }
72
73 void OpHistory::_insert_delayed(const utime_t& now, TrackedOpRef op)
74 {
75 std::lock_guard history_lock(ops_history_lock);
76 if (shutdown)
77 return;
78 double opduration = op->get_duration();
79 duration.insert(make_pair(opduration, op));
80 arrived.insert(make_pair(op->get_initiated(), op));
81 if (opduration >= history_slow_op_threshold.load())
82 slow_op.insert(make_pair(op->get_initiated(), op));
83 cleanup(now);
84 }
85
86 void OpHistory::cleanup(utime_t now)
87 {
88 while (arrived.size() &&
89 (now - arrived.begin()->first >
90 (double)(history_duration.load()))) {
91 duration.erase(make_pair(
92 arrived.begin()->second->get_duration(),
93 arrived.begin()->second));
94 arrived.erase(arrived.begin());
95 }
96
97 while (duration.size() > history_size.load()) {
98 arrived.erase(make_pair(
99 duration.begin()->second->get_initiated(),
100 duration.begin()->second));
101 duration.erase(duration.begin());
102 }
103
104 while (slow_op.size() > history_slow_op_size.load()) {
105 slow_op.erase(make_pair(
106 slow_op.begin()->second->get_initiated(),
107 slow_op.begin()->second));
108 }
109 }
110
111 void OpHistory::dump_ops(utime_t now, Formatter *f, set<string> filters, bool by_duration)
112 {
(1) Event getlock: |
Acquiring lock named "_ZN4ceph18mutex_debug_detail16mutex_debug_implILb0EEE.m". [details] |
113 std::lock_guard history_lock(ops_history_lock);
114 cleanup(now);
115 f->open_object_section("op_history");
116 f->dump_int("size", history_size.load());
117 f->dump_int("duration", history_duration.load());
118 {
119 f->open_array_section("ops");
120 auto dump_fn = [&f, &now, &filters](auto begin_iter, auto end_iter) {
121 for (auto i=begin_iter; i!=end_iter; ++i) {
122 if (!i->second->filter_out(filters))
123 continue;
124 f->open_object_section("op");
125 i->second->dump(now, f);
126 f->close_section();
127 }
128 };
129
(2) Event cond_true: |
Condition "by_duration", taking true branch. |
130 if (by_duration) {
131 dump_fn(duration.rbegin(), duration.rend());
(3) Event if_fallthrough: |
Falling through to end of if statement. |
132 } else {
133 dump_fn(arrived.begin(), arrived.end());
(4) Event if_end: |
End of if statement. |
134 }
135 f->close_section();
136 }
137 f->close_section();
138 }
139
140 struct ShardedTrackingData {
141 ceph::mutex ops_in_flight_lock_sharded;
142 TrackedOp::tracked_op_list_t ops_in_flight_sharded;
143 explicit ShardedTrackingData(string lock_name)
144 : ops_in_flight_lock_sharded(ceph::make_mutex(lock_name)) {}
145 };
146
147 OpTracker::OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards):
148 seq(0),
149 num_optracker_shards(num_shards),
150 complaint_time(0), log_threshold(0),
151 tracking_enabled(tracking),
152 cct(cct_) {
153 for (uint32_t i = 0; i < num_optracker_shards; i++) {
154 char lock_name[32] = {0};
155 snprintf(lock_name, sizeof(lock_name), "%s:%" PRIu32, "OpTracker::ShardedLock", i);
156 ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
157 sharded_in_flight_list.push_back(one_shard);
158 }
159 }
160
161 OpTracker::~OpTracker() {
162 while (!sharded_in_flight_list.empty()) {
163 ceph_assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
164 delete sharded_in_flight_list.back();
165 sharded_in_flight_list.pop_back();
166 }
167 }
168
169 bool OpTracker::dump_historic_ops(Formatter *f, bool by_duration, set<string> filters)
170 {
171 if (!tracking_enabled)
172 return false;
173
174 std::shared_lock l{lock};
175 utime_t now = ceph_clock_now();
176 history.dump_ops(now, f, filters, by_duration);
177 return true;
178 }
179
180 void OpHistory::dump_slow_ops(utime_t now, Formatter *f, set<string> filters)
181 {
(1) Event getlock: |
Acquiring lock named "_ZN4ceph18mutex_debug_detail16mutex_debug_implILb0EEE.m". [details] |
182 std::lock_guard history_lock(ops_history_lock);
183 cleanup(now);
184 f->open_object_section("OpHistory slow ops");
185 f->dump_int("num to keep", history_slow_op_size.load());
186 f->dump_int("threshold to keep", history_slow_op_threshold.load());
187 {
188 f->open_array_section("Ops");
(2) Event cond_false: |
Condition "i != this->slow_op.end()", taking false branch. |
189 for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
190 slow_op.begin();
191 i != slow_op.end();
192 ++i) {
193 if (!i->second->filter_out(filters))
194 continue;
195 f->open_object_section("Op");
196 i->second->dump(now, f);
197 f->close_section();
(3) Event loop_end: |
Reached end of loop. |
198 }
199 f->close_section();
200 }
201 f->close_section();
202 }
203
204 bool OpTracker::dump_historic_slow_ops(Formatter *f, set<string> filters)
205 {
206 if (!tracking_enabled)
207 return false;
208
209 std::shared_lock l{lock};
210 utime_t now = ceph_clock_now();
211 history.dump_slow_ops(now, f, filters);
212 return true;
213 }
214
215 bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked, set<string> filters)
216 {
217 if (!tracking_enabled)
218 return false;
219
220 std::shared_lock l{lock};
221 f->open_object_section("ops_in_flight"); // overall dump
222 uint64_t total_ops_in_flight = 0;
223 f->open_array_section("ops"); // list of TrackedOps
224 utime_t now = ceph_clock_now();
225 for (uint32_t i = 0; i < num_optracker_shards; i++) {
226 ShardedTrackingData* sdata = sharded_in_flight_list[i];
227 ceph_assert(NULL != sdata);
228 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
229 for (auto& op : sdata->ops_in_flight_sharded) {
230 if (print_only_blocked && (now - op.get_initiated() <= complaint_time))
231 break;
232 if (!op.filter_out(filters))
233 continue;
234 f->open_object_section("op");
235 op.dump(now, f);
236 f->close_section(); // this TrackedOp
237 total_ops_in_flight++;
238 }
239 }
240 f->close_section(); // list of TrackedOps
241 if (print_only_blocked) {
242 f->dump_float("complaint_time", complaint_time);
243 f->dump_int("num_blocked_ops", total_ops_in_flight);
244 } else
245 f->dump_int("num_ops", total_ops_in_flight);
246 f->close_section(); // overall dump
247 return true;
248 }
249
250 bool OpTracker::register_inflight_op(TrackedOp *i)
251 {
252 if (!tracking_enabled)
253 return false;
254
255 std::shared_lock l{lock};
256 uint64_t current_seq = ++seq;
257 uint32_t shard_index = current_seq % num_optracker_shards;
258 ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
259 ceph_assert(NULL != sdata);
260 {
261 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
262 sdata->ops_in_flight_sharded.push_back(*i);
263 i->seq = current_seq;
264 }
265 return true;
266 }
267
268 void OpTracker::unregister_inflight_op(TrackedOp* const i)
269 {
270 // caller checks;
271 ceph_assert(i->state);
272
273 uint32_t shard_index = i->seq % num_optracker_shards;
274 ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
275 ceph_assert(NULL != sdata);
276 {
277 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
278 auto p = sdata->ops_in_flight_sharded.iterator_to(*i);
279 sdata->ops_in_flight_sharded.erase(p);
280 }
281 }
282
283 void OpTracker::record_history_op(TrackedOpRef&& i)
284 {
285 std::shared_lock l{lock};
286 history.insert(ceph_clock_now(), std::move(i));
287 }
288
289 bool OpTracker::visit_ops_in_flight(utime_t* oldest_secs,
290 std::function<bool(TrackedOp&)>&& visit)
291 {
292 if (!tracking_enabled)
293 return false;
294
295 const utime_t now = ceph_clock_now();
296 utime_t oldest_op = now;
297 uint64_t total_ops_in_flight = 0;
298
299 std::shared_lock l{lock};
300 for (const auto sdata : sharded_in_flight_list) {
301 ceph_assert(sdata);
302 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
303 if (!sdata->ops_in_flight_sharded.empty()) {
304 utime_t oldest_op_tmp =
305 sdata->ops_in_flight_sharded.front().get_initiated();
306 if (oldest_op_tmp < oldest_op) {
307 oldest_op = oldest_op_tmp;
308 }
309 }
310 total_ops_in_flight += sdata->ops_in_flight_sharded.size();
311 }
312 if (!total_ops_in_flight)
313 return false;
314 *oldest_secs = now - oldest_op;
315 dout(10) << "ops_in_flight.size: " << total_ops_in_flight
316 << "; oldest is " << *oldest_secs
317 << " seconds old" << dendl;
318
319 if (*oldest_secs < complaint_time)
320 return false;
321
322 for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
323 ShardedTrackingData* sdata = sharded_in_flight_list[iter];
324 ceph_assert(NULL != sdata);
325 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
326 for (auto& op : sdata->ops_in_flight_sharded) {
327 if (!visit(op))
328 break;
329 }
330 }
331 return true;
332 }
333
334 bool OpTracker::with_slow_ops_in_flight(utime_t* oldest_secs,
335 int* num_slow_ops,
336 int* num_warned_ops,
337 std::function<void(TrackedOp&)>&& on_warn)
338 {
339 const utime_t now = ceph_clock_now();
340 auto too_old = now;
341 too_old -= complaint_time;
342 int slow = 0;
343 int warned = 0;
344 auto check = [&](TrackedOp& op) {
345 if (op.get_initiated() >= too_old) {
346 // no more slow ops in flight
347 return false;
348 }
349 if (!op.warn_interval_multiplier)
350 return true;
351 slow++;
352 if (warned >= log_threshold) {
353 // enough samples of slow ops
354 return true;
355 }
356 auto time_to_complain = (op.get_initiated() +
357 complaint_time * op.warn_interval_multiplier);
358 if (time_to_complain >= now) {
359 // complain later if the op is still in flight
360 return true;
361 }
362 // will warn, increase counter
363 warned++;
364 on_warn(op);
365 return true;
366 };
367 if (visit_ops_in_flight(oldest_secs, check)) {
368 if (num_slow_ops) {
369 *num_slow_ops = slow;
370 *num_warned_ops = warned;
371 }
372 return true;
373 } else {
374 return false;
375 }
376 }
377
378 bool OpTracker::check_ops_in_flight(std::string* summary,
379 std::vector<string> &warnings,
380 int *num_slow_ops)
381 {
382 const utime_t now = ceph_clock_now();
383 auto too_old = now;
384 too_old -= complaint_time;
385 int warned = 0;
386 utime_t oldest_secs;
387 auto warn_on_slow_op = [&](TrackedOp& op) {
388 stringstream ss;
389 utime_t age = now - op.get_initiated();
390 ss << "slow request " << age << " seconds old, received at "
391 << op.get_initiated() << ": " << op.get_desc()
392 << " currently "
393 << op.state_string();
394 warnings.push_back(ss.str());
395 // only those that have been shown will backoff
396 op.warn_interval_multiplier *= 2;
397 };
398 int slow = 0;
399 if (with_slow_ops_in_flight(&oldest_secs, &slow, &warned, warn_on_slow_op) &&
400 slow > 0) {
401 stringstream ss;
402 ss << slow << " slow requests, "
403 << warned << " included below; oldest blocked for > "
404 << oldest_secs << " secs";
405 *summary = ss.str();
406 if (num_slow_ops) {
407 *num_slow_ops = slow;
408 }
409 return true;
410 } else {
411 return false;
412 }
413 }
414
415 void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
416 {
417 h->clear();
418 utime_t now = ceph_clock_now();
419
420 for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
421 ShardedTrackingData* sdata = sharded_in_flight_list[iter];
422 ceph_assert(NULL != sdata);
423 std::lock_guard locker(sdata->ops_in_flight_lock_sharded);
424
425 for (auto& i : sdata->ops_in_flight_sharded) {
426 utime_t age = now - i.get_initiated();
427 uint32_t ms = (long)(age * 1000.0);
428 h->add(ms);
429 }
430 }
431 }
432
433
434 #undef dout_context
435 #define dout_context tracker->cct
436
437 void TrackedOp::mark_event(std::string_view event, utime_t stamp)
438 {
439 if (!state)
440 return;
441
442 {
443 std::lock_guard l(lock);
444 events.emplace_back(stamp, event);
445 }
446 dout(6) << " seq: " << seq
447 << ", time: " << stamp
448 << ", event: " << event
449 << ", op: " << get_desc()
450 << dendl;
451 _event_marked();
452 }
453
454 void TrackedOp::dump(utime_t now, Formatter *f) const
455 {
456 // Ignore if still in the constructor
(1) Event cond_false: |
Condition "!this->state.operator std::__atomic_base<int>::__int_type()", taking false branch. |
457 if (!state)
(2) Event if_end: |
End of if statement. |
458 return;
(3) Event getlock: |
Acquiring lock named "_ZN4ceph18mutex_debug_detail16mutex_debug_implILb0EEE.m". [details] |
459 f->dump_string("description", get_desc());
460 f->dump_stream("initiated_at") << get_initiated();
461 f->dump_float("age", now - get_initiated());
462 f->dump_float("duration", get_duration());
463 {
464 f->open_object_section("type_data");
465 _dump(f);
466 f->close_section();
467 }
468 }
469