// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "include/stringify.h"
#include "common/Cond.h"
#include "common/Timer.h"
#include "common/debug.h"
#include "common/errno.h"
#include "librbd/Utils.h"
#include "ImageReplayer.h"
#include "InstanceReplayer.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
                           << this << " " << __func__ << ": "

namespace rbd {
namespace mirror {

namespace {

const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");

} // anonymous namespace

using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;

template <typename I>
InstanceReplayer<I>::InstanceReplayer(
    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
    Threads<I> *threads, ServiceDaemon<I>* service_daemon,
    journal::CacheManagerHandler *cache_manager_handler)
  : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
    m_threads(threads), m_service_daemon(service_daemon),
    m_cache_manager_handler(cache_manager_handler),
    m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
                            stringify(local_io_ctx.get_id()))) {
}

template <typename I>
// Coverity event (1), exn_spec_violation: an exception of type
// "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE"
// (which demangles to boost::exception_detail::clone_impl<
//  boost::exception_detail::error_info_injector<std::ios_base::failure>>)
// is thrown, but the throw list "throw()" doesn't allow it to be thrown.
// This will cause a call to unexpected(), which usually calls terminate().
// Also see event (2), fun_call_w_exception, below.
InstanceReplayer<I>::~InstanceReplayer() {
  ceph_assert(m_image_state_check_task == nullptr);
  ceph_assert(m_async_op_tracker.empty());
  // Coverity event (2), fun_call_w_exception: the called function throws an
  // exception of type
  // "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE"
  // (the same boost-wrapped std::ios_base::failure as above).
  // Also see event (1), exn_spec_violation, above.
  ceph_assert(m_image_replayers.empty());
}

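// Synchronous wrapper around the asynchronous init(): blocks on a
// C_SaferCond until initialization completes and returns its result.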
template <typename I>
int InstanceReplayer<I>::init() {
  C_SaferCond init_ctx;
  init(&init_ctx);
  return init_ctx.wait();
}

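// Asynchronous init: schedules the periodic image state check task on the
// timer and then completes on_finish from the work queue.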
template <typename I>
void InstanceReplayer<I>::init(Context *on_finish) {
  dout(10) << dendl;

  Context *ctx = new LambdaContext(
    [this, on_finish] (int r) {
      {
        std::lock_guard timer_locker{m_threads->timer_lock};
        schedule_image_state_check_task();
      }
      on_finish->complete(0);
    });

  m_threads->work_queue->queue(ctx, 0);
}

template <typename I>
void InstanceReplayer<I>::shut_down() {
  C_SaferCond shut_down_ctx;
  shut_down(&shut_down_ctx);
  int r = shut_down_ctx.wait();
  ceph_assert(r == 0);
}

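// Asynchronous shut down: cancels the image state check task, waits for
// in-flight async operations, and then stops and destroys all image
// replayers before completing on_finish.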
template <typename I>
void InstanceReplayer<I>::shut_down(Context *on_finish) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  ceph_assert(m_on_shut_down == nullptr);
  m_on_shut_down = on_finish;

  Context *ctx = new LambdaContext(
    [this] (int r) {
      cancel_image_state_check_task();
      wait_for_ops();
    });

  m_threads->work_queue->queue(ctx, 0);
}

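// Registers a remote peer; each peer uuid may be added at most once.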
template <typename I>
void InstanceReplayer<I>::add_peer(std::string peer_uuid,
                                   librados::IoCtx io_ctx) {
  dout(10) << peer_uuid << dendl;

  std::lock_guard locker{m_lock};
  auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
  ceph_assert(result);
}

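// Stops and destroys every image replayer, completing on_finish once all of
// them have been released.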
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
  for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
       it = m_image_replayers.erase(it)) {
    auto image_replayer = it->second;
    auto ctx = gather_ctx->new_sub();
    ctx = new LambdaContext(
      [image_replayer, ctx] (int r) {
        image_replayer->destroy();
        ctx->complete(0);
      });
    stop_image_replayer(image_replayer, ctx);
  }
  gather_ctx->activate();
}

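// Handles an "image acquired" notification: creates and starts a new image
// replayer for the global image id, or restarts the existing replayer if the
// image was already assigned to this instance.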
template <typename I>
void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
                                        const std::string &global_image_id,
                                        Context *on_finish) {
  dout(10) << "global_image_id=" << global_image_id << dendl;

  std::lock_guard locker{m_lock};

  ceph_assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    auto image_replayer = ImageReplayer<I>::create(
      m_local_io_ctx, m_local_mirror_uuid, global_image_id,
      m_threads, instance_watcher, m_cache_manager_handler);

    dout(10) << global_image_id << ": creating replayer " << image_replayer
             << dendl;

    it = m_image_replayers.insert(std::make_pair(global_image_id,
                                                 image_replayer)).first;

    // TODO only a single peer is currently supported
    ceph_assert(m_peers.size() == 1);
    auto peer = *m_peers.begin();
    image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
    start_image_replayer(image_replayer);
  } else {
    // A duplicate acquire notification implies (1) connection hiccup or
    // (2) new leader election. For the second case, restart the replayer to
    // detect if the image has been deleted while the leader was offline
    auto& image_replayer = it->second;
    image_replayer->set_finished(false);
    image_replayer->restart();
  }

  m_threads->work_queue->queue(on_finish, 0);
}

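// Handles an "image released" notification: removes the image replayer from
// the map, stops it, and destroys it before completing on_finish.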
template <typename I>
void InstanceReplayer<I>::release_image(const std::string &global_image_id,
                                        Context *on_finish) {
  dout(10) << "global_image_id=" << global_image_id << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it == m_image_replayers.end()) {
    dout(5) << global_image_id << ": not found" << dendl;
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  auto image_replayer = it->second;
  m_image_replayers.erase(it);

  on_finish = new LambdaContext(
    [image_replayer, on_finish] (int r) {
      image_replayer->destroy();
      on_finish->complete(0);
    });
  stop_image_replayer(image_replayer, on_finish);
}

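// Handles removal of a peer image: restarts the corresponding image replayer
// so it can detect the missing peer image and decide whether a delete needs
// to be propagated.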
template <typename I>
void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
                                            const std::string &peer_mirror_uuid,
                                            Context *on_finish) {
  dout(10) << "global_image_id=" << global_image_id << ", "
           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_on_shut_down == nullptr);

  auto it = m_image_replayers.find(global_image_id);
  if (it != m_image_replayers.end()) {
    // TODO only a single peer is currently supported, therefore
    // we can just interrupt the current image replayer and
    // it will eventually detect that the peer image is missing and
    // determine if a delete propagation is required.
    auto image_replayer = it->second;
    image_replayer->restart();
  }
  m_threads->work_queue->queue(on_finish, 0);
}

template <typename I>
void InstanceReplayer<I>::print_status(Formatter *f) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  f->open_array_section("image_replayers");
  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->print_status(f);
  }
  f->close_section();
}

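// Clears the manual-stop flag and starts all image replayers.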
template <typename I>
void InstanceReplayer<I>::start()
{
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  m_manual_stop = false;

  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->start(nullptr, true);
  }
}

template <typename I>
void InstanceReplayer<I>::stop()
{
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  m_manual_stop = true;

  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->stop(nullptr, true);
  }
}

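// Asynchronous variant of stop(): completes on_finish once every image
// replayer has reported that it has stopped.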
template <typename I>
void InstanceReplayer<I>::stop(Context *on_finish)
{
  dout(10) << dendl;

  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
  auto gather_ctx = new C_Gather(cct, on_finish);
  {
    std::lock_guard locker{m_lock};

    m_manual_stop = true;

    for (auto &kv : m_image_replayers) {
      auto &image_replayer = kv.second;
      image_replayer->stop(gather_ctx->new_sub(), true);
    }
  }

  gather_ctx->activate();
}

template <typename I>
void InstanceReplayer<I>::restart()
{
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  m_manual_stop = false;

  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->restart();
  }
}

template <typename I>
void InstanceReplayer<I>::flush()
{
  dout(10) << dendl;

  std::lock_guard locker{m_lock};

  for (auto &kv : m_image_replayers) {
    auto &image_replayer = kv.second;
    image_replayer->flush();
  }
}

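// Starts a single image replayer unless it is already running, blacklisted,
// finished, or the instance has been manually stopped.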
template <typename I>
void InstanceReplayer<I>::start_image_replayer(
    ImageReplayer<I> *image_replayer) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  std::string global_image_id = image_replayer->get_global_image_id();
  if (!image_replayer->is_stopped()) {
    return;
  } else if (image_replayer->is_blacklisted()) {
    derr << "global_image_id=" << global_image_id << ": blacklisting detected "
         << "during image replay" << dendl;
    return;
  } else if (image_replayer->is_finished()) {
    // TODO temporary until policy integrated
    dout(5) << "removing image replayer for global_image_id="
            << global_image_id << dendl;
    m_image_replayers.erase(image_replayer->get_global_image_id());
    image_replayer->destroy();
    return;
  } else if (m_manual_stop) {
    return;
  }

  dout(10) << "global_image_id=" << global_image_id << dendl;
  image_replayer->start(nullptr, false);
}

template <typename I>
void InstanceReplayer<I>::queue_start_image_replayers() {
  dout(10) << dendl;

  Context *ctx = create_context_callback<
    InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
  m_async_op_tracker.start_op();
  m_threads->work_queue->queue(ctx, 0);
}

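// Callback for queue_start_image_replayers(): walks all image replayers,
// tallies their health states for the service daemon, and (re)starts any
// replayer that is eligible to run.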
template <typename I>
void InstanceReplayer<I>::start_image_replayers(int r) {
  dout(10) << dendl;

  std::lock_guard locker{m_lock};
  if (m_on_shut_down != nullptr) {
    return;
  }

  uint64_t image_count = 0;
  uint64_t warning_count = 0;
  uint64_t error_count = 0;
  for (auto it = m_image_replayers.begin();
       it != m_image_replayers.end();) {
    auto current_it(it);
    ++it;

    ++image_count;
    auto health_state = current_it->second->get_health_state();
    if (health_state == image_replayer::HEALTH_STATE_WARNING) {
      ++warning_count;
    } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
      ++error_count;
    }

    start_image_replayer(current_it->second);
  }

  // TODO: add namespace support to service daemon
  if (m_local_io_ctx.get_namespace().empty()) {
    m_service_daemon->add_or_update_attribute(
      m_local_io_ctx.get_id(), SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
    m_service_daemon->add_or_update_attribute(
      m_local_io_ctx.get_id(), SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
    m_service_daemon->add_or_update_attribute(
      m_local_io_ctx.get_id(), SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
  }

  m_async_op_tracker.finish_op();
}

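// Stops a single image replayer; if it is not yet running, the stop attempt
// is retried via the timer until the replayer reports it has stopped, at
// which point on_finish is completed.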
template <typename I>
void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
                                              Context *on_finish) {
  dout(10) << image_replayer << " global_image_id="
           << image_replayer->get_global_image_id() << ", on_finish="
           << on_finish << dendl;

  if (image_replayer->is_stopped()) {
    m_threads->work_queue->queue(on_finish, 0);
    return;
  }

  m_async_op_tracker.start_op();
  Context *ctx = create_async_context_callback(
    m_threads->work_queue, new LambdaContext(
      [this, image_replayer, on_finish] (int r) {
        stop_image_replayer(image_replayer, on_finish);
        m_async_op_tracker.finish_op();
      }));

  if (image_replayer->is_running()) {
    image_replayer->stop(ctx, false);
  } else {
    int after = 1;
    dout(10) << "scheduling image replayer " << image_replayer << " stop after "
             << after << " sec (task " << ctx << ")" << dendl;
    ctx = new LambdaContext(
      [this, after, ctx] (int r) {
        std::lock_guard timer_locker{m_threads->timer_lock};
        m_threads->timer->add_event_after(after, ctx);
      });
    m_threads->work_queue->queue(ctx, 0);
  }
}

template <typename I>
void InstanceReplayer<I>::wait_for_ops() {
  dout(10) << dendl;

  Context *ctx = create_context_callback<
    InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);

  m_async_op_tracker.wait_for_ops(ctx);
}

template <typename I>
void InstanceReplayer<I>::handle_wait_for_ops(int r) {
  dout(10) << "r=" << r << dendl;

  ceph_assert(r == 0);

  std::lock_guard locker{m_lock};
  stop_image_replayers();
}

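// Shut-down helper: stops all remaining image replayers and then invokes
// handle_stop_image_replayers() once they have all stopped.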
template <typename I>
void InstanceReplayer<I>::stop_image_replayers() {
  dout(10) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  Context *ctx = create_async_context_callback(
    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
      &InstanceReplayer<I>::handle_stop_image_replayers>(this));

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
  for (auto &it : m_image_replayers) {
    stop_image_replayer(it.second, gather_ctx->new_sub());
  }
  gather_ctx->activate();
}

template <typename I>
void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
  dout(10) << "r=" << r << dendl;

  ceph_assert(r == 0);

  Context *on_finish = nullptr;
  {
    std::lock_guard locker{m_lock};

    for (auto &it : m_image_replayers) {
      ceph_assert(it.second->is_stopped());
      it.second->destroy();
    }
    m_image_replayers.clear();

    ceph_assert(m_on_shut_down != nullptr);
    std::swap(on_finish, m_on_shut_down);
  }
  on_finish->complete(r);
}

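// Cancels the pending image state check timer task, if one is scheduled.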
template <typename I>
void InstanceReplayer<I>::cancel_image_state_check_task() {
  std::lock_guard timer_locker{m_threads->timer_lock};

  if (m_image_state_check_task == nullptr) {
    return;
  }

  dout(10) << m_image_state_check_task << dendl;
  bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
  ceph_assert(canceled);
  m_image_state_check_task = nullptr;
}

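// Schedules the periodic image state check: when the timer fires, the task
// re-schedules itself and queues start_image_replayers() on the work queue.
// The interval comes from the rbd_mirror_image_state_check_interval option.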
template <typename I>
void InstanceReplayer<I>::schedule_image_state_check_task() {
  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
  ceph_assert(m_image_state_check_task == nullptr);

  m_image_state_check_task = new LambdaContext(
    [this](int r) {
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      m_image_state_check_task = nullptr;
      schedule_image_state_check_task();
      queue_start_image_replayers();
    });

  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
  int after = cct->_conf.get_val<uint64_t>(
    "rbd_mirror_image_state_check_interval");

  dout(10) << "scheduling image state check after " << after << " sec (task "
           << m_image_state_check_task << ")" << dendl;
  m_threads->timer->add_event_after(after, m_image_state_check_task);
}

} // namespace mirror
} // namespace rbd

template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;