1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "LeaderWatcher.h"
5 #include "common/Cond.h"
6 #include "common/Timer.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "include/stringify.h"
11 #include "librbd/Utils.h"
12 #include "librbd/watcher/Types.h"
13 #include "Threads.h"
14
15 #define dout_context g_ceph_context
16 #define dout_subsys ceph_subsys_rbd_mirror
17 #undef dout_prefix
18 #define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
19 << this << " " << __func__ << ": "
20 namespace rbd {
21 namespace mirror {
22
23 using namespace leader_watcher;
24
25 using librbd::util::create_async_context_callback;
26 using librbd::util::create_context_callback;
27 using librbd::util::create_rados_callback;
28
29 template <typename I>
30 LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
31 leader_watcher::Listener *listener)
32 : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
33 m_threads(threads), m_listener(listener), m_instances_listener(this),
34 m_lock(ceph::make_mutex("rbd::mirror::LeaderWatcher " +
35 io_ctx.get_pool_name())),
36 m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
37 m_instance_id(stringify(m_notifier_id)),
38 m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
39 m_cct->_conf.get_val<uint64_t>(
40 "rbd_blacklist_expire_seconds"))) {
41 }
42
43 template <typename I>
44 LeaderWatcher<I>::~LeaderWatcher() {
45 ceph_assert(m_instances == nullptr);
46 ceph_assert(m_timer_task == nullptr);
47
48 delete m_leader_lock;
49 }
50
51 template <typename I>
52 std::string LeaderWatcher<I>::get_instance_id() {
53 return m_instance_id;
54 }
55
56 template <typename I>
57 int LeaderWatcher<I>::init() {
58 C_SaferCond init_ctx;
59 init(&init_ctx);
60 return init_ctx.wait();
61 }
62
63 template <typename I>
64 void LeaderWatcher<I>::init(Context *on_finish) {
65 dout(10) << "notifier_id=" << m_notifier_id << dendl;
66
67 std::lock_guard locker{m_lock};
68
69 ceph_assert(m_on_finish == nullptr);
70 m_on_finish = on_finish;
71
72 create_leader_object();
73 }
74
75 template <typename I>
76 void LeaderWatcher<I>::create_leader_object() {
77 dout(10) << dendl;
78
79 ceph_assert(ceph_mutex_is_locked(m_lock));
80
81 librados::ObjectWriteOperation op;
82 op.create(false);
83
84 librados::AioCompletion *aio_comp = create_rados_callback<
85 LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
86 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
87 ceph_assert(r == 0);
88 aio_comp->release();
89 }
90
91 template <typename I>
92 void LeaderWatcher<I>::handle_create_leader_object(int r) {
93 dout(10) << "r=" << r << dendl;
94
95 Context *on_finish = nullptr;
96 {
97 std::lock_guard locker{m_lock};
98
99 if (r == 0) {
100 register_watch();
101 return;
102 }
103
104 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
105 << dendl;
106
107 std::swap(on_finish, m_on_finish);
108 }
109 on_finish->complete(r);
110 }
111
112 template <typename I>
113 void LeaderWatcher<I>::register_watch() {
114 dout(10) << dendl;
115
116 ceph_assert(ceph_mutex_is_locked(m_lock));
117
118 Context *ctx = create_async_context_callback(
119 m_work_queue, create_context_callback<
120 LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
121
122 librbd::Watcher::register_watch(ctx);
123 }
124
125 template <typename I>
126 void LeaderWatcher<I>::handle_register_watch(int r) {
127 dout(10) << "r=" << r << dendl;
128
129 Context *on_finish = nullptr;
130 {
131 std::lock_guard timer_locker(m_threads->timer_lock);
132 std::lock_guard locker{m_lock};
133
134 if (r < 0) {
135 derr << "error registering leader watcher for " << m_oid << " object: "
136 << cpp_strerror(r) << dendl;
137 } else {
138 schedule_acquire_leader_lock(0);
139 }
140
141 ceph_assert(m_on_finish != nullptr);
142 std::swap(on_finish, m_on_finish);
143 }
144
145 on_finish->complete(r);
146 }
147
148 template <typename I>
149 void LeaderWatcher<I>::shut_down() {
150 C_SaferCond shut_down_ctx;
151 shut_down(&shut_down_ctx);
152 int r = shut_down_ctx.wait();
153 ceph_assert(r == 0);
154 }
155
156 template <typename I>
157 void LeaderWatcher<I>::shut_down(Context *on_finish) {
158 dout(10) << dendl;
159
160 std::scoped_lock locker{m_threads->timer_lock, m_lock};
161
162 ceph_assert(m_on_shut_down_finish == nullptr);
163 m_on_shut_down_finish = on_finish;
164 cancel_timer_task();
165 shut_down_leader_lock();
166 }
167
168 template <typename I>
169 void LeaderWatcher<I>::shut_down_leader_lock() {
170 dout(10) << dendl;
171
172 ceph_assert(ceph_mutex_is_locked(m_lock));
173
174 Context *ctx = create_async_context_callback(
175 m_work_queue, create_context_callback<
176 LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
177
178 m_leader_lock->shut_down(ctx);
179 }
180
181 template <typename I>
182 void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
183 dout(10) << "r=" << r << dendl;
184
185 std::lock_guard locker{m_lock};
186
187 if (r < 0) {
188 derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
189 }
190
191 unregister_watch();
192 }
193
194 template <typename I>
195 void LeaderWatcher<I>::unregister_watch() {
196 dout(10) << dendl;
197
198 ceph_assert(ceph_mutex_is_locked(m_lock));
199
200 Context *ctx = create_async_context_callback(
201 m_work_queue, create_context_callback<
202 LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
203
204 librbd::Watcher::unregister_watch(ctx);
205 }
206
207 template <typename I>
208 void LeaderWatcher<I>::handle_unregister_watch(int r) {
209 dout(10) << "r=" << r << dendl;
210
211 if (r < 0) {
212 derr << "error unregistering leader watcher for " << m_oid << " object: "
213 << cpp_strerror(r) << dendl;
214 }
215 wait_for_tasks();
216 }
217
218 template <typename I>
219 void LeaderWatcher<I>::wait_for_tasks() {
220 dout(10) << dendl;
221
222 std::scoped_lock locker{m_threads->timer_lock, m_lock};
223 schedule_timer_task("wait for tasks", 0, false,
224 &LeaderWatcher<I>::handle_wait_for_tasks, true);
225 }
226
227 template <typename I>
228 void LeaderWatcher<I>::handle_wait_for_tasks() {
229 dout(10) << dendl;
230
231 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
232 ceph_assert(ceph_mutex_is_locked(m_lock));
233 ceph_assert(m_on_shut_down_finish != nullptr);
234
235 ceph_assert(!m_timer_op_tracker.empty());
236 m_timer_op_tracker.finish_op();
237
238 auto ctx = new LambdaContext([this](int r) {
239 Context *on_finish;
240 {
241 // ensure lock isn't held when completing shut down
242 std::lock_guard locker{m_lock};
243 ceph_assert(m_on_shut_down_finish != nullptr);
244 on_finish = m_on_shut_down_finish;
245 }
246 on_finish->complete(0);
247 });
248 m_work_queue->queue(ctx, 0);
249 }
250
251 template <typename I>
252 bool LeaderWatcher<I>::is_leader() const {
253 std::lock_guard locker{m_lock};
254 return is_leader(m_lock);
255 }
256
257 template <typename I>
258 bool LeaderWatcher<I>::is_leader(ceph::mutex &lock) const {
259 ceph_assert(ceph_mutex_is_locked(m_lock));
260
261 bool leader = m_leader_lock->is_leader();
262 dout(10) << leader << dendl;
263 return leader;
264 }
265
266 template <typename I>
267 bool LeaderWatcher<I>::is_releasing_leader() const {
268 std::lock_guard locker{m_lock};
269 return is_releasing_leader(m_lock);
270 }
271
272 template <typename I>
273 bool LeaderWatcher<I>::is_releasing_leader(ceph::mutex &lock) const {
274 ceph_assert(ceph_mutex_is_locked(m_lock));
275
276 bool releasing = m_leader_lock->is_releasing_leader();
277 dout(10) << releasing << dendl;
278 return releasing;
279 }
280
281 template <typename I>
282 bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
283 dout(10) << dendl;
284
285 std::lock_guard locker{m_lock};
286
287 if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
288 *instance_id = m_instance_id;
289 return true;
290 }
291
292 if (!m_locker.cookie.empty()) {
293 *instance_id = stringify(m_locker.entity.num());
294 return true;
295 }
296
297 return false;
298 }
299
300 template <typename I>
301 void LeaderWatcher<I>::release_leader() {
302 dout(10) << dendl;
303
304 std::lock_guard locker{m_lock};
305 if (!is_leader(m_lock)) {
306 return;
307 }
308
309 release_leader_lock();
310 }
311
312 template <typename I>
313 void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
314 dout(10) << dendl;
315
316 std::lock_guard locker{m_lock};
317
318 instance_ids->clear();
319 if (m_instances != nullptr) {
320 m_instances->list(instance_ids);
321 }
322 }
323
324 template <typename I>
325 void LeaderWatcher<I>::cancel_timer_task() {
326 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
327 ceph_assert(ceph_mutex_is_locked(m_lock));
328
329 if (m_timer_task == nullptr) {
330 return;
331 }
332
333 dout(10) << m_timer_task << dendl;
334 bool canceled = m_threads->timer->cancel_event(m_timer_task);
335 ceph_assert(canceled);
336 m_timer_task = nullptr;
337 }
338
339 template <typename I>
340 void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
341 int delay_factor, bool leader,
342 TimerCallback timer_callback,
343 bool shutting_down) {
344 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
345 ceph_assert(ceph_mutex_is_locked(m_lock));
346
347 if (!shutting_down && m_on_shut_down_finish != nullptr) {
348 return;
349 }
350
351 cancel_timer_task();
352
353 m_timer_task = new LambdaContext(
354 [this, leader, timer_callback](int r) {
355 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
356 m_timer_task = nullptr;
357
358 if (m_timer_op_tracker.empty()) {
359 std::lock_guard locker{m_lock};
360 execute_timer_task(leader, timer_callback);
361 return;
362 }
363
364 // old timer task is still running -- do not start next
365 // task until the previous task completes
366 if (m_timer_gate == nullptr) {
367 m_timer_gate = new C_TimerGate(this);
368 m_timer_op_tracker.wait_for_ops(m_timer_gate);
369 }
370 m_timer_gate->leader = leader;
371 m_timer_gate->timer_callback = timer_callback;
372 });
373
374 int after = delay_factor * m_cct->_conf.get_val<uint64_t>(
375 "rbd_mirror_leader_heartbeat_interval");
376
377 dout(10) << "scheduling " << name << " after " << after << " sec (task "
378 << m_timer_task << ")" << dendl;
379 m_threads->timer->add_event_after(after, m_timer_task);
380 }
381
382 template <typename I>
383 void LeaderWatcher<I>::execute_timer_task(bool leader,
384 TimerCallback timer_callback) {
385 dout(10) << dendl;
386
387 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
388 ceph_assert(ceph_mutex_is_locked(m_lock));
389 ceph_assert(m_timer_op_tracker.empty());
390
391 if (is_leader(m_lock) != leader) {
392 return;
393 }
394
395 m_timer_op_tracker.start_op();
396 (this->*timer_callback)();
397 }
398
399 template <typename I>
400 void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
401 Context *on_finish) {
402 dout(10) << "r=" << r << dendl;
403
404 if (r < 0) {
405 if (r == -EAGAIN) {
406 dout(10) << "already locked" << dendl;
407 } else {
408 derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
409 }
410 on_finish->complete(r);
411 return;
412 }
413
414 std::lock_guard locker{m_lock};
415 ceph_assert(m_on_finish == nullptr);
416 m_on_finish = on_finish;
417 m_ret_val = 0;
418
419 init_instances();
420 }
421
422 template <typename I>
423 void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
424 dout(10) << dendl;
425
426 std::lock_guard locker{m_lock};
427 ceph_assert(m_on_finish == nullptr);
428 m_on_finish = on_finish;
429 m_ret_val = 0;
430
431 notify_listener();
432 }
433
434 template <typename I>
435 void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
436 Context *on_finish) {
437 dout(10) << "r=" << r << dendl;
438
439 if (r < 0) {
440 on_finish->complete(r);
441 return;
442 }
443
444 std::lock_guard locker{m_lock};
445 ceph_assert(m_on_finish == nullptr);
446 m_on_finish = on_finish;
447
448 notify_lock_released();
449 }
450
451 template <typename I>
452 void LeaderWatcher<I>::break_leader_lock() {
453 dout(10) << dendl;
454
455 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
456 ceph_assert(ceph_mutex_is_locked(m_lock));
457 ceph_assert(!m_timer_op_tracker.empty());
458
459 if (m_locker.cookie.empty()) {
460 get_locker();
461 return;
462 }
463
464 Context *ctx = create_async_context_callback(
465 m_work_queue, create_context_callback<
466 LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
467
468 m_leader_lock->break_lock(m_locker, true, ctx);
469 }
470
471 template <typename I>
472 void LeaderWatcher<I>::handle_break_leader_lock(int r) {
473 dout(10) << "r=" << r << dendl;
474
475 std::scoped_lock locker{m_threads->timer_lock, m_lock};
476 ceph_assert(!m_timer_op_tracker.empty());
477
478 if (m_leader_lock->is_shutdown()) {
479 dout(10) << "canceling due to shutdown" << dendl;
480 m_timer_op_tracker.finish_op();
481 return;
482 }
483
484 if (r < 0 && r != -ENOENT) {
485 derr << "error breaking leader lock: " << cpp_strerror(r) << dendl;
486 schedule_acquire_leader_lock(1);
487 m_timer_op_tracker.finish_op();
488 return;
489 }
490
491 m_locker = {};
492 m_acquire_attempts = 0;
493 acquire_leader_lock();
494 }
495
496 template <typename I>
497 void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
498 uint32_t delay_factor) {
499 dout(10) << dendl;
500
501 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
502 ceph_assert(ceph_mutex_is_locked(m_lock));
503
504 if (reset_leader) {
505 m_locker = {};
506 m_acquire_attempts = 0;
507 }
508
509 schedule_timer_task("get locker", delay_factor, false,
510 &LeaderWatcher<I>::get_locker, false);
511 }
512
513 template <typename I>
514 void LeaderWatcher<I>::get_locker() {
515 dout(10) << dendl;
516
517 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
518 ceph_assert(ceph_mutex_is_locked(m_lock));
519 ceph_assert(!m_timer_op_tracker.empty());
520
521 C_GetLocker *get_locker_ctx = new C_GetLocker(this);
522 Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
523
524 m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
525 }
526
527 template <typename I>
528 void LeaderWatcher<I>::handle_get_locker(int r,
529 librbd::managed_lock::Locker& locker) {
530 dout(10) << "r=" << r << dendl;
531
532 std::scoped_lock l{m_threads->timer_lock, m_lock};
533 ceph_assert(!m_timer_op_tracker.empty());
534
535 if (m_leader_lock->is_shutdown()) {
536 dout(10) << "canceling due to shutdown" << dendl;
537 m_timer_op_tracker.finish_op();
538 return;
539 }
540
541 if (is_leader(m_lock)) {
542 m_locker = {};
543 m_timer_op_tracker.finish_op();
544 return;
545 }
546
547 if (r == -ENOENT) {
548 m_locker = {};
549 m_acquire_attempts = 0;
550 acquire_leader_lock();
551 return;
552 } else if (r < 0) {
553 derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
554 schedule_get_locker(true, 1);
555 m_timer_op_tracker.finish_op();
556 return;
557 }
558
559 bool notify_listener = false;
560 if (m_locker != locker) {
561 m_locker = locker;
562 notify_listener = true;
563 if (m_acquire_attempts > 1) {
564 dout(10) << "new lock owner detected -- resetting heartbeat counter"
565 << dendl;
566 m_acquire_attempts = 0;
567 }
568 }
569
570 if (m_acquire_attempts >= m_cct->_conf.get_val<uint64_t>(
571 "rbd_mirror_leader_max_acquire_attempts_before_break")) {
572 dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
573 << "failed attempts to acquire" << dendl;
574 break_leader_lock();
575 return;
576 }
577
578 schedule_acquire_leader_lock(1);
579
580 if (!notify_listener) {
581 m_timer_op_tracker.finish_op();
582 return;
583 }
584
585 auto ctx = new LambdaContext(
586 [this](int r) {
587 std::string instance_id;
588 if (get_leader_instance_id(&instance_id)) {
589 m_listener->update_leader_handler(instance_id);
590 }
(1) Event parameter_hidden: |
declaration hides parameter "locker" (declared at line 529) |
(2) Event caretline: |
^ |
591 std::scoped_lock locker{m_threads->timer_lock, m_lock};
592 m_timer_op_tracker.finish_op();
593 });
594 m_work_queue->queue(ctx, 0);
595 }
596
597 template <typename I>
598 void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
599 dout(10) << dendl;
600
601 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
602 ceph_assert(ceph_mutex_is_locked(m_lock));
603
604 schedule_timer_task("acquire leader lock",
605 delay_factor *
606 m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_missed_heartbeats"),
607 false, &LeaderWatcher<I>::acquire_leader_lock, false);
608 }
609
610 template <typename I>
611 void LeaderWatcher<I>::acquire_leader_lock() {
612 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
613 ceph_assert(ceph_mutex_is_locked(m_lock));
614 ceph_assert(!m_timer_op_tracker.empty());
615
616 ++m_acquire_attempts;
617 dout(10) << "acquire_attempts=" << m_acquire_attempts << dendl;
618
619 Context *ctx = create_async_context_callback(
620 m_work_queue, create_context_callback<
621 LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
622 m_leader_lock->try_acquire_lock(ctx);
623 }
624
625 template <typename I>
626 void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
627 dout(10) << "r=" << r << dendl;
628
629 std::scoped_lock locker{m_threads->timer_lock, m_lock};
630 ceph_assert(!m_timer_op_tracker.empty());
631
632 if (m_leader_lock->is_shutdown()) {
633 dout(10) << "canceling due to shutdown" << dendl;
634 m_timer_op_tracker.finish_op();
635 return;
636 }
637
638 if (r < 0) {
639 if (r == -EAGAIN) {
640 dout(10) << "already locked" << dendl;
641 } else {
642 derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
643 }
644
645 get_locker();
646 return;
647 }
648
649 m_locker = {};
650 m_acquire_attempts = 0;
651
652 if (m_ret_val) {
653 dout(5) << "releasing due to error on notify" << dendl;
654 release_leader_lock();
655 m_timer_op_tracker.finish_op();
656 return;
657 }
658
659 notify_heartbeat();
660 }
661
662 template <typename I>
663 void LeaderWatcher<I>::release_leader_lock() {
664 dout(10) << dendl;
665
666 ceph_assert(ceph_mutex_is_locked(m_lock));
667
668 Context *ctx = create_async_context_callback(
669 m_work_queue, create_context_callback<
670 LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
671
672 m_leader_lock->release_lock(ctx);
673 }
674
675 template <typename I>
676 void LeaderWatcher<I>::handle_release_leader_lock(int r) {
677 dout(10) << "r=" << r << dendl;
678
679 std::scoped_lock locker{m_threads->timer_lock, m_lock};
680
681 if (r < 0) {
682 derr << "error releasing lock: " << cpp_strerror(r) << dendl;
683 return;
684 }
685
686 schedule_acquire_leader_lock(1);
687 }
688
689 template <typename I>
690 void LeaderWatcher<I>::init_instances() {
691 dout(10) << dendl;
692
693 ceph_assert(ceph_mutex_is_locked(m_lock));
694 ceph_assert(m_instances == nullptr);
695
696 m_instances = Instances<I>::create(m_threads, m_ioctx, m_instance_id,
697 m_instances_listener);
698
699 Context *ctx = create_context_callback<
700 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
701
702 m_instances->init(ctx);
703 }
704
705 template <typename I>
706 void LeaderWatcher<I>::handle_init_instances(int r) {
707 dout(10) << "r=" << r << dendl;
708
709 Context *on_finish = nullptr;
710 if (r < 0) {
711 std::lock_guard locker{m_lock};
712 derr << "error initializing instances: " << cpp_strerror(r) << dendl;
713 m_instances->destroy();
714 m_instances = nullptr;
715
716 ceph_assert(m_on_finish != nullptr);
717 std::swap(m_on_finish, on_finish);
718 } else {
719 std::lock_guard locker{m_lock};
720 notify_listener();
721 return;
722 }
723
724 on_finish->complete(r);
725 }
726
727 template <typename I>
728 void LeaderWatcher<I>::shut_down_instances() {
729 dout(10) << dendl;
730
731 ceph_assert(ceph_mutex_is_locked(m_lock));
732 ceph_assert(m_instances != nullptr);
733
734 Context *ctx = create_async_context_callback(
735 m_work_queue, create_context_callback<LeaderWatcher<I>,
736 &LeaderWatcher<I>::handle_shut_down_instances>(this));
737
738 m_instances->shut_down(ctx);
739 }
740
741 template <typename I>
742 void LeaderWatcher<I>::handle_shut_down_instances(int r) {
743 dout(10) << "r=" << r << dendl;
744 ceph_assert(r == 0);
745
746 Context *on_finish = nullptr;
747 {
748 std::lock_guard locker{m_lock};
749
750 m_instances->destroy();
751 m_instances = nullptr;
752
753 ceph_assert(m_on_finish != nullptr);
754 std::swap(m_on_finish, on_finish);
755 }
756 on_finish->complete(r);
757 }
758
759 template <typename I>
760 void LeaderWatcher<I>::notify_listener() {
761 dout(10) << dendl;
762
763 ceph_assert(ceph_mutex_is_locked(m_lock));
764
765 Context *ctx = create_async_context_callback(
766 m_work_queue, create_context_callback<
767 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
768
769 if (is_leader(m_lock)) {
770 ctx = new LambdaContext(
771 [this, ctx](int r) {
772 m_listener->post_acquire_handler(ctx);
773 });
774 } else {
775 ctx = new LambdaContext(
776 [this, ctx](int r) {
777 m_listener->pre_release_handler(ctx);
778 });
779 }
780 m_work_queue->queue(ctx, 0);
781 }
782
783 template <typename I>
784 void LeaderWatcher<I>::handle_notify_listener(int r) {
785 dout(10) << "r=" << r << dendl;
786
787 std::lock_guard locker{m_lock};
788
789 if (r < 0) {
790 derr << "error notifying listener: " << cpp_strerror(r) << dendl;
791 m_ret_val = r;
792 }
793
794 if (is_leader(m_lock)) {
795 notify_lock_acquired();
796 } else {
797 shut_down_instances();
798 }
799 }
800
801 template <typename I>
802 void LeaderWatcher<I>::notify_lock_acquired() {
803 dout(10) << dendl;
804
805 ceph_assert(ceph_mutex_is_locked(m_lock));
806
807 Context *ctx = create_context_callback<
808 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
809
810 bufferlist bl;
811 encode(NotifyMessage{LockAcquiredPayload{}}, bl);
812
813 send_notify(bl, nullptr, ctx);
814 }
815
816 template <typename I>
817 void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
818 dout(10) << "r=" << r << dendl;
819
820 Context *on_finish = nullptr;
821 {
822 std::lock_guard locker{m_lock};
823 if (r < 0 && r != -ETIMEDOUT) {
824 derr << "error notifying leader lock acquired: " << cpp_strerror(r)
825 << dendl;
826 m_ret_val = r;
827 }
828
829 ceph_assert(m_on_finish != nullptr);
830 std::swap(m_on_finish, on_finish);
831
832 if (m_ret_val == 0) {
833 // listener should be ready for instance add/remove events now
834 m_instances->unblock_listener();
835 }
836 }
837 on_finish->complete(0);
838 }
839
840 template <typename I>
841 void LeaderWatcher<I>::notify_lock_released() {
842 dout(10) << dendl;
843
844 ceph_assert(ceph_mutex_is_locked(m_lock));
845
846 Context *ctx = create_context_callback<
847 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
848
849 bufferlist bl;
850 encode(NotifyMessage{LockReleasedPayload{}}, bl);
851
852 send_notify(bl, nullptr, ctx);
853 }
854
855 template <typename I>
856 void LeaderWatcher<I>::handle_notify_lock_released(int r) {
857 dout(10) << "r=" << r << dendl;
858
859 Context *on_finish = nullptr;
860 {
861 std::lock_guard locker{m_lock};
862 if (r < 0 && r != -ETIMEDOUT) {
863 derr << "error notifying leader lock released: " << cpp_strerror(r)
864 << dendl;
865 }
866
867 ceph_assert(m_on_finish != nullptr);
868 std::swap(m_on_finish, on_finish);
869 }
870 on_finish->complete(r);
871 }
872
873 template <typename I>
874 void LeaderWatcher<I>::notify_heartbeat() {
875 dout(10) << dendl;
876
877 ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
878 ceph_assert(ceph_mutex_is_locked(m_lock));
879 ceph_assert(!m_timer_op_tracker.empty());
880
881 if (!is_leader(m_lock)) {
882 dout(5) << "not leader, canceling" << dendl;
883 m_timer_op_tracker.finish_op();
884 return;
885 }
886
887 Context *ctx = create_context_callback<
888 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
889
890 bufferlist bl;
891 encode(NotifyMessage{HeartbeatPayload{}}, bl);
892
893 m_heartbeat_response.acks.clear();
894 send_notify(bl, &m_heartbeat_response, ctx);
895 }
896
897 template <typename I>
898 void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
899 dout(10) << "r=" << r << dendl;
900
901 std::scoped_lock locker{m_threads->timer_lock, m_lock};
902 ceph_assert(!m_timer_op_tracker.empty());
903
904 m_timer_op_tracker.finish_op();
905 if (m_leader_lock->is_shutdown()) {
906 dout(10) << "canceling due to shutdown" << dendl;
907 return;
908 } else if (!is_leader(m_lock)) {
909 return;
910 }
911
912 if (r < 0 && r != -ETIMEDOUT) {
913 derr << "error notifying heartbeat: " << cpp_strerror(r)
914 << ", releasing leader" << dendl;
915 release_leader_lock();
916 return;
917 }
918
919 dout(10) << m_heartbeat_response.acks.size() << " acks received, "
920 << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
921
922 std::vector<std::string> instance_ids;
923 for (auto &it: m_heartbeat_response.acks) {
924 uint64_t notifier_id = it.first.gid;
925 instance_ids.push_back(stringify(notifier_id));
926 }
927 if (!instance_ids.empty()) {
928 m_instances->acked(instance_ids);
929 }
930
931 schedule_timer_task("heartbeat", 1, true,
932 &LeaderWatcher<I>::notify_heartbeat, false);
933 }
934
935 template <typename I>
936 void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
937 dout(10) << dendl;
938
939 {
940 std::scoped_lock locker{m_threads->timer_lock, m_lock};
941 if (is_leader(m_lock)) {
942 dout(5) << "got another leader heartbeat, ignoring" << dendl;
943 } else {
944 cancel_timer_task();
945 m_acquire_attempts = 0;
946 schedule_acquire_leader_lock(1);
947 }
948 }
949
950 on_notify_ack->complete(0);
951 }
952
953 template <typename I>
954 void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
955 dout(10) << dendl;
956
957 {
958 std::scoped_lock locker{m_threads->timer_lock, m_lock};
959 if (is_leader(m_lock)) {
960 dout(5) << "got another leader lock_acquired, ignoring" << dendl;
961 } else {
962 cancel_timer_task();
963 schedule_get_locker(true, 0);
964 }
965 }
966
967 on_notify_ack->complete(0);
968 }
969
970 template <typename I>
971 void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
972 dout(10) << dendl;
973
974 {
975 std::scoped_lock locker{m_threads->timer_lock, m_lock};
976 if (is_leader(m_lock)) {
977 dout(5) << "got another leader lock_released, ignoring" << dendl;
978 } else {
979 cancel_timer_task();
980 schedule_get_locker(true, 0);
981 }
982 }
983
984 on_notify_ack->complete(0);
985 }
986
987 template <typename I>
988 void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
989 uint64_t notifier_id, bufferlist &bl) {
990 dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", "
991 << "notifier_id=" << notifier_id << dendl;
992
993 Context *ctx = new C_NotifyAck(this, notify_id, handle);
994
995 if (notifier_id == m_notifier_id) {
996 dout(10) << "our own notification, ignoring" << dendl;
997 ctx->complete(0);
998 return;
999 }
1000
1001 NotifyMessage notify_message;
1002 try {
1003 auto iter = bl.cbegin();
1004 decode(notify_message, iter);
1005 } catch (const buffer::error &err) {
1006 derr << ": error decoding image notification: " << err.what() << dendl;
1007 ctx->complete(0);
1008 return;
1009 }
1010
1011 apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
1012 }
1013
1014 template <typename I>
1015 void LeaderWatcher<I>::handle_rewatch_complete(int r) {
1016 dout(5) << "r=" << r << dendl;
1017
1018 if (r != -EBLACKLISTED) {
1019 m_leader_lock->reacquire_lock(nullptr);
1020 }
1021 }
1022
1023 template <typename I>
1024 void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
1025 Context *on_notify_ack) {
1026 dout(10) << "heartbeat" << dendl;
1027
1028 handle_heartbeat(on_notify_ack);
1029 }
1030
1031 template <typename I>
1032 void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
1033 Context *on_notify_ack) {
1034 dout(10) << "lock_acquired" << dendl;
1035
1036 handle_lock_acquired(on_notify_ack);
1037 }
1038
1039 template <typename I>
1040 void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
1041 Context *on_notify_ack) {
1042 dout(10) << "lock_released" << dendl;
1043
1044 handle_lock_released(on_notify_ack);
1045 }
1046
1047 template <typename I>
1048 void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
1049 Context *on_notify_ack) {
1050 dout(10) << "unknown" << dendl;
1051
1052 on_notify_ack->complete(0);
1053 }
1054
1055 } // namespace mirror
1056 } // namespace rbd
1057
1058 template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;
1059