1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "LeaderWatcher.h"
5    	#include "common/Cond.h"
6    	#include "common/Timer.h"
7    	#include "common/debug.h"
8    	#include "common/errno.h"
9    	#include "cls/rbd/cls_rbd_client.h"
10   	#include "include/stringify.h"
11   	#include "librbd/Utils.h"
12   	#include "librbd/watcher/Types.h"
13   	#include "Threads.h"
14   	
15   	#define dout_context g_ceph_context
16   	#define dout_subsys ceph_subsys_rbd_mirror
17   	#undef dout_prefix
18   	#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
19   	                           << this << " " << __func__ << ": "
20   	namespace rbd {
21   	namespace mirror {
22   	
23   	using namespace leader_watcher;
24   	
25   	using librbd::util::create_async_context_callback;
26   	using librbd::util::create_context_callback;
27   	using librbd::util::create_rados_callback;
28   	
// Watches the RBD_MIRROR_LEADER object in `io_ctx`'s pool and arbitrates
// leadership among the clients watching it through m_leader_lock.
//
// m_notifier_id is this client's RADOS instance id; its string form becomes
// this watcher's instance id. The leader lock is heap-allocated here (and
// deleted in the destructor) and is handed the configured
// "rbd_blacklist_expire_seconds" value.
// NOTE(review): m_instance_id is initialized from m_notifier_id, which relies
// on the header declaring m_notifier_id before m_instance_id -- confirm.
template <typename I>
LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
                                leader_watcher::Listener *listener)
  : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
    m_threads(threads), m_listener(listener), m_instances_listener(this),
    m_lock(ceph::make_mutex("rbd::mirror::LeaderWatcher " +
			    io_ctx.get_pool_name())),
    m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
    m_instance_id(stringify(m_notifier_id)),
    m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
                                 m_cct->_conf.get_val<uint64_t>(
                                   "rbd_blacklist_expire_seconds"))) {
}
42   	
43   	template <typename I>
44   	LeaderWatcher<I>::~LeaderWatcher() {
45   	  ceph_assert(m_instances == nullptr);
46   	  ceph_assert(m_timer_task == nullptr);
47   	
48   	  delete m_leader_lock;
49   	}
50   	
51   	template <typename I>
52   	std::string LeaderWatcher<I>::get_instance_id() {
53   	  return m_instance_id;
54   	}
55   	
56   	template <typename I>
57   	int LeaderWatcher<I>::init() {
58   	  C_SaferCond init_ctx;
59   	  init(&init_ctx);
60   	  return init_ctx.wait();
61   	}
62   	
63   	template <typename I>
64   	void LeaderWatcher<I>::init(Context *on_finish) {
65   	  dout(10) << "notifier_id=" << m_notifier_id << dendl;
66   	
67   	  std::lock_guard locker{m_lock};
68   	
69   	  ceph_assert(m_on_finish == nullptr);
70   	  m_on_finish = on_finish;
71   	
72   	  create_leader_object();
73   	}
74   	
75   	template <typename I>
76   	void LeaderWatcher<I>::create_leader_object() {
77   	  dout(10) << dendl;
78   	
79   	  ceph_assert(ceph_mutex_is_locked(m_lock));
80   	
81   	  librados::ObjectWriteOperation op;
82   	  op.create(false);
83   	
84   	  librados::AioCompletion *aio_comp = create_rados_callback<
85   	    LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
86   	  int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
87   	  ceph_assert(r == 0);
88   	  aio_comp->release();
89   	}
90   	
91   	template <typename I>
92   	void LeaderWatcher<I>::handle_create_leader_object(int r) {
93   	  dout(10) << "r=" << r << dendl;
94   	
95   	  Context *on_finish = nullptr;
96   	  {
97   	    std::lock_guard locker{m_lock};
98   	
99   	    if (r == 0) {
100  	      register_watch();
101  	      return;
102  	    }
103  	
104  	    derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
105  	         << dendl;
106  	
107  	    std::swap(on_finish, m_on_finish);
108  	  }
109  	  on_finish->complete(r);
110  	}
111  	
112  	template <typename I>
113  	void LeaderWatcher<I>::register_watch() {
114  	  dout(10) << dendl;
115  	
116  	  ceph_assert(ceph_mutex_is_locked(m_lock));
117  	
118  	  Context *ctx = create_async_context_callback(
119  	    m_work_queue, create_context_callback<
120  	      LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
121  	
122  	  librbd::Watcher::register_watch(ctx);
123  	}
124  	
125  	template <typename I>
126  	void LeaderWatcher<I>::handle_register_watch(int r) {
127  	  dout(10) << "r=" << r << dendl;
128  	
129  	  Context *on_finish = nullptr;
130  	  {
131  	    std::lock_guard timer_locker(m_threads->timer_lock);
132  	    std::lock_guard locker{m_lock};
133  	
134  	    if (r < 0) {
135  	      derr << "error registering leader watcher for " << m_oid << " object: "
136  	           << cpp_strerror(r) << dendl;
137  	    } else {
138  	      schedule_acquire_leader_lock(0);
139  	    }
140  	
141  	    ceph_assert(m_on_finish != nullptr);
142  	    std::swap(on_finish, m_on_finish);
143  	  }
144  	
145  	  on_finish->complete(r);
146  	}
147  	
148  	template <typename I>
149  	void LeaderWatcher<I>::shut_down() {
150  	  C_SaferCond shut_down_ctx;
151  	  shut_down(&shut_down_ctx);
152  	  int r = shut_down_ctx.wait();
153  	  ceph_assert(r == 0);
154  	}
155  	
156  	template <typename I>
157  	void LeaderWatcher<I>::shut_down(Context *on_finish) {
158  	  dout(10) << dendl;
159  	
160  	  std::scoped_lock locker{m_threads->timer_lock, m_lock};
161  	
162  	  ceph_assert(m_on_shut_down_finish == nullptr);
163  	  m_on_shut_down_finish = on_finish;
164  	  cancel_timer_task();
165  	  shut_down_leader_lock();
166  	}
167  	
168  	template <typename I>
169  	void LeaderWatcher<I>::shut_down_leader_lock() {
170  	  dout(10) << dendl;
171  	
172  	  ceph_assert(ceph_mutex_is_locked(m_lock));
173  	
174  	  Context *ctx = create_async_context_callback(
175  	    m_work_queue, create_context_callback<
176  	      LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
177  	
178  	  m_leader_lock->shut_down(ctx);
179  	}
180  	
181  	template <typename I>
182  	void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
183  	  dout(10) << "r=" << r << dendl;
184  	
185  	  std::lock_guard locker{m_lock};
186  	
187  	  if (r < 0) {
188  	    derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
189  	  }
190  	
191  	  unregister_watch();
192  	}
193  	
194  	template <typename I>
195  	void LeaderWatcher<I>::unregister_watch() {
196  	  dout(10) << dendl;
197  	
198  	  ceph_assert(ceph_mutex_is_locked(m_lock));
199  	
200  	  Context *ctx = create_async_context_callback(
201  	    m_work_queue, create_context_callback<
202  	      LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
203  	
204  	  librbd::Watcher::unregister_watch(ctx);
205  	}
206  	
207  	template <typename I>
208  	void LeaderWatcher<I>::handle_unregister_watch(int r) {
209  	  dout(10) << "r=" << r << dendl;
210  	
211  	  if (r < 0) {
212  	    derr << "error unregistering leader watcher for " << m_oid << " object: "
213  	         << cpp_strerror(r) << dendl;
214  	  }
215  	  wait_for_tasks();
216  	}
217  	
218  	template <typename I>
219  	void LeaderWatcher<I>::wait_for_tasks() {
220  	  dout(10) << dendl;
221  	
222  	  std::scoped_lock locker{m_threads->timer_lock, m_lock};
223  	  schedule_timer_task("wait for tasks", 0, false,
224  	                      &LeaderWatcher<I>::handle_wait_for_tasks, true);
225  	}
226  	
227  	template <typename I>
228  	void LeaderWatcher<I>::handle_wait_for_tasks() {
229  	  dout(10) << dendl;
230  	
231  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
232  	  ceph_assert(ceph_mutex_is_locked(m_lock));
233  	  ceph_assert(m_on_shut_down_finish != nullptr);
234  	
235  	  ceph_assert(!m_timer_op_tracker.empty());
236  	  m_timer_op_tracker.finish_op();
237  	
238  	  auto ctx = new LambdaContext([this](int r) {
239  	      Context *on_finish;
240  	      {
241  	        // ensure lock isn't held when completing shut down
242  		std::lock_guard locker{m_lock};
243  	        ceph_assert(m_on_shut_down_finish != nullptr);
244  	        on_finish = m_on_shut_down_finish;
245  	      }
246  	      on_finish->complete(0);
247  	    });
248  	  m_work_queue->queue(ctx, 0);
249  	}
250  	
251  	template <typename I>
252  	bool LeaderWatcher<I>::is_leader() const {
253  	  std::lock_guard locker{m_lock};
254  	  return is_leader(m_lock);
255  	}
256  	
257  	template <typename I>
258  	bool LeaderWatcher<I>::is_leader(ceph::mutex &lock) const {
259  	  ceph_assert(ceph_mutex_is_locked(m_lock));
260  	
261  	  bool leader = m_leader_lock->is_leader();
262  	  dout(10) << leader << dendl;
263  	  return leader;
264  	}
265  	
266  	template <typename I>
267  	bool LeaderWatcher<I>::is_releasing_leader() const {
268  	  std::lock_guard locker{m_lock};
269  	  return is_releasing_leader(m_lock);
270  	}
271  	
272  	template <typename I>
273  	bool LeaderWatcher<I>::is_releasing_leader(ceph::mutex &lock) const {
274  	  ceph_assert(ceph_mutex_is_locked(m_lock));
275  	
276  	  bool releasing = m_leader_lock->is_releasing_leader();
277  	  dout(10) << releasing << dendl;
278  	  return releasing;
279  	}
280  	
281  	template <typename I>
282  	bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
283  	  dout(10) << dendl;
284  	
285  	  std::lock_guard locker{m_lock};
286  	
287  	  if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
288  	    *instance_id = m_instance_id;
289  	    return true;
290  	  }
291  	
292  	  if (!m_locker.cookie.empty()) {
293  	    *instance_id = stringify(m_locker.entity.num());
294  	    return true;
295  	  }
296  	
297  	  return false;
298  	}
299  	
300  	template <typename I>
301  	void LeaderWatcher<I>::release_leader() {
302  	  dout(10) << dendl;
303  	
304  	  std::lock_guard locker{m_lock};
305  	  if (!is_leader(m_lock)) {
306  	    return;
307  	  }
308  	
309  	  release_leader_lock();
310  	}
311  	
312  	template <typename I>
313  	void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
314  	  dout(10) << dendl;
315  	
316  	  std::lock_guard locker{m_lock};
317  	
318  	  instance_ids->clear();
319  	  if (m_instances != nullptr) {
320  	    m_instances->list(instance_ids);
321  	  }
322  	}
323  	
324  	template <typename I>
325  	void LeaderWatcher<I>::cancel_timer_task() {
326  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
327  	  ceph_assert(ceph_mutex_is_locked(m_lock));
328  	
329  	  if (m_timer_task == nullptr) {
330  	    return;
331  	  }
332  	
333  	  dout(10) << m_timer_task << dendl;
334  	  bool canceled = m_threads->timer->cancel_event(m_timer_task);
335  	  ceph_assert(canceled);
336  	  m_timer_task = nullptr;
337  	}
338  	
// Arms the single timer slot (m_timer_task) to run `timer_callback` after
// delay_factor * rbd_mirror_leader_heartbeat_interval seconds, replacing any
// task that is already armed. Requires timer_lock and m_lock.
//
// @param name            label used only for logging
// @param delay_factor    multiplier applied to the heartbeat interval
// @param leader          leadership state the task is meant for; checked by
//                        execute_timer_task, which skips the callback when it
//                        no longer matches
// @param timer_callback  member function to invoke when the task runs
// @param shutting_down   when false, nothing is scheduled once shut down has
//                        started (m_on_shut_down_finish is set)
template <typename I>
void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
                                           int delay_factor, bool leader,
                                           TimerCallback timer_callback,
                                           bool shutting_down) {
  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
  ceph_assert(ceph_mutex_is_locked(m_lock));

  // Shut down in progress: only shut-down tasks may still be scheduled.
  if (!shutting_down && m_on_shut_down_finish != nullptr) {
    return;
  }

  cancel_timer_task();

  m_timer_task = new LambdaContext(
    [this, leader, timer_callback](int r) {
      // Fired by the timer with timer_lock held; the slot is now free.
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      m_timer_task = nullptr;

      if (m_timer_op_tracker.empty()) {
	std::lock_guard locker{m_lock};
        execute_timer_task(leader, timer_callback);
        return;
      }

      // old timer task is still running -- do not start next
      // task until the previous task completes
      // A single gate is shared: later tasks overwrite the pending
      // leader/callback, so presumably only the most recent one runs once
      // the in-flight op finishes (C_TimerGate is not visible here).
      if (m_timer_gate == nullptr) {
        m_timer_gate = new C_TimerGate(this);
        m_timer_op_tracker.wait_for_ops(m_timer_gate);
      }
      m_timer_gate->leader = leader;
      m_timer_gate->timer_callback = timer_callback;
    });

  // Delay in seconds: the heartbeat interval scaled by delay_factor.
  int after = delay_factor * m_cct->_conf.get_val<uint64_t>(
    "rbd_mirror_leader_heartbeat_interval");

  dout(10) << "scheduling " << name << " after " << after << " sec (task "
           << m_timer_task << ")" << dendl;
  m_threads->timer->add_event_after(after, m_timer_task);
}
381  	
382  	template <typename I>
383  	void LeaderWatcher<I>::execute_timer_task(bool leader,
384  	                                          TimerCallback timer_callback) {
385  	  dout(10) << dendl;
386  	
387  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
388  	  ceph_assert(ceph_mutex_is_locked(m_lock));
389  	  ceph_assert(m_timer_op_tracker.empty());
390  	
391  	  if (is_leader(m_lock) != leader) {
392  	    return;
393  	  }
394  	
395  	  m_timer_op_tracker.start_op();
396  	  (this->*timer_callback)();
397  	}
398  	
399  	template <typename I>
400  	void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
401  	                                                       Context *on_finish) {
402  	  dout(10) << "r=" << r << dendl;
403  	
404  	  if (r < 0) {
405  	    if (r == -EAGAIN) {
406  	      dout(10) << "already locked" << dendl;
407  	    } else {
408  	      derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
409  	    }
410  	    on_finish->complete(r);
411  	    return;
412  	  }
413  	
414  	  std::lock_guard locker{m_lock};
415  	  ceph_assert(m_on_finish == nullptr);
416  	  m_on_finish = on_finish;
417  	  m_ret_val = 0;
418  	
419  	  init_instances();
420  	}
421  	
422  	template <typename I>
423  	void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
424  	  dout(10) << dendl;
425  	
426  	  std::lock_guard locker{m_lock};
427  	  ceph_assert(m_on_finish == nullptr);
428  	  m_on_finish = on_finish;
429  	  m_ret_val = 0;
430  	
431  	  notify_listener();
432  	}
433  	
434  	template <typename I>
435  	void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
436  	                                                       Context *on_finish) {
437  	  dout(10) << "r=" << r << dendl;
438  	
439  	  if (r < 0) {
440  	    on_finish->complete(r);
441  	    return;
442  	  }
443  	
444  	  std::lock_guard locker{m_lock};
445  	  ceph_assert(m_on_finish == nullptr);
446  	  m_on_finish = on_finish;
447  	
448  	  notify_lock_released();
449  	}
450  	
451  	template <typename I>
452  	void LeaderWatcher<I>::break_leader_lock() {
453  	  dout(10) << dendl;
454  	
455  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
456  	  ceph_assert(ceph_mutex_is_locked(m_lock));
457  	  ceph_assert(!m_timer_op_tracker.empty());
458  	
459  	  if (m_locker.cookie.empty()) {
460  	    get_locker();
461  	    return;
462  	  }
463  	
464  	  Context *ctx = create_async_context_callback(
465  	    m_work_queue, create_context_callback<
466  	      LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
467  	
468  	  m_leader_lock->break_lock(m_locker, true, ctx);
469  	}
470  	
471  	template <typename I>
472  	void LeaderWatcher<I>::handle_break_leader_lock(int r) {
473  	  dout(10) << "r=" << r << dendl;
474  	
475  	  std::scoped_lock locker{m_threads->timer_lock, m_lock};
476  	  ceph_assert(!m_timer_op_tracker.empty());
477  	
478  	  if (m_leader_lock->is_shutdown()) {
479  	    dout(10) << "canceling due to shutdown" << dendl;
480  	    m_timer_op_tracker.finish_op();
481  	    return;
482  	  }
483  	
484  	  if (r < 0 && r != -ENOENT) {
485  	    derr << "error breaking leader lock: " << cpp_strerror(r)  << dendl;
486  	    schedule_acquire_leader_lock(1);
487  	    m_timer_op_tracker.finish_op();
488  	    return;
489  	  }
490  	
491  	  m_locker = {};
492  	  m_acquire_attempts = 0;
493  	  acquire_leader_lock();
494  	}
495  	
496  	template <typename I>
497  	void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
498  	                                           uint32_t delay_factor) {
499  	  dout(10) << dendl;
500  	
501  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
502  	  ceph_assert(ceph_mutex_is_locked(m_lock));
503  	
504  	  if (reset_leader) {
505  	    m_locker = {};
506  	    m_acquire_attempts = 0;
507  	  }
508  	
509  	  schedule_timer_task("get locker", delay_factor, false,
510  	                      &LeaderWatcher<I>::get_locker, false);
511  	}
512  	
513  	template <typename I>
514  	void LeaderWatcher<I>::get_locker() {
515  	  dout(10) << dendl;
516  	
517  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
518  	  ceph_assert(ceph_mutex_is_locked(m_lock));
519  	  ceph_assert(!m_timer_op_tracker.empty());
520  	
521  	  C_GetLocker *get_locker_ctx = new C_GetLocker(this);
522  	  Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
523  	
524  	  m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
525  	}
526  	
527  	template <typename I>
528  	void LeaderWatcher<I>::handle_get_locker(int r,
529  	                                         librbd::managed_lock::Locker& locker) {
530  	  dout(10) << "r=" << r << dendl;
531  	
532  	  std::scoped_lock l{m_threads->timer_lock, m_lock};
533  	  ceph_assert(!m_timer_op_tracker.empty());
534  	
535  	  if (m_leader_lock->is_shutdown()) {
536  	    dout(10) << "canceling due to shutdown" << dendl;
537  	    m_timer_op_tracker.finish_op();
538  	    return;
539  	  }
540  	
541  	  if (is_leader(m_lock)) {
542  	    m_locker = {};
543  	    m_timer_op_tracker.finish_op();
544  	    return;
545  	  }
546  	
547  	  if (r == -ENOENT) {
548  	    m_locker = {};
549  	    m_acquire_attempts = 0;
550  	    acquire_leader_lock();
551  	    return;
552  	  } else if (r < 0) {
553  	    derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
554  	    schedule_get_locker(true, 1);
555  	    m_timer_op_tracker.finish_op();
556  	    return;
557  	  }
558  	
559  	  bool notify_listener = false;
560  	  if (m_locker != locker) {
561  	    m_locker = locker;
562  	    notify_listener = true;
563  	    if (m_acquire_attempts > 1) {
564  	      dout(10) << "new lock owner detected -- resetting heartbeat counter"
565  	               << dendl;
566  	      m_acquire_attempts = 0;
567  	    }
568  	  }
569  	
570  	  if (m_acquire_attempts >= m_cct->_conf.get_val<uint64_t>(
571  	        "rbd_mirror_leader_max_acquire_attempts_before_break")) {
572  	    dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
573  	            << "failed attempts to acquire" << dendl;
574  	    break_leader_lock();
575  	    return;
576  	  }
577  	
578  	  schedule_acquire_leader_lock(1);
579  	
580  	  if (!notify_listener) {
581  	    m_timer_op_tracker.finish_op();
582  	    return;
583  	  }
584  	
585  	  auto ctx = new LambdaContext(
(1) Event parameter_hidden: declaration hides parameter "r" (declared at line 528)
(2) Event caretline: ^
586  	    [this](int r) {
587  	      std::string instance_id;
588  	      if (get_leader_instance_id(&instance_id)) {
589  	        m_listener->update_leader_handler(instance_id);
590  	      }
591  	      std::scoped_lock locker{m_threads->timer_lock, m_lock};
592  	      m_timer_op_tracker.finish_op();
593  	    });
594  	  m_work_queue->queue(ctx, 0);
595  	}
596  	
597  	template <typename I>
598  	void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
599  	  dout(10) << dendl;
600  	
601  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
602  	  ceph_assert(ceph_mutex_is_locked(m_lock));
603  	
604  	  schedule_timer_task("acquire leader lock",
605  	                      delay_factor *
606  	                        m_cct->_conf.get_val<uint64_t>("rbd_mirror_leader_max_missed_heartbeats"),
607  	                      false, &LeaderWatcher<I>::acquire_leader_lock, false);
608  	}
609  	
610  	template <typename I>
611  	void LeaderWatcher<I>::acquire_leader_lock() {
612  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
613  	  ceph_assert(ceph_mutex_is_locked(m_lock));
614  	  ceph_assert(!m_timer_op_tracker.empty());
615  	
616  	  ++m_acquire_attempts;
617  	  dout(10) << "acquire_attempts=" << m_acquire_attempts << dendl;
618  	
619  	  Context *ctx = create_async_context_callback(
620  	    m_work_queue, create_context_callback<
621  	      LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
622  	  m_leader_lock->try_acquire_lock(ctx);
623  	}
624  	
625  	template <typename I>
626  	void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
627  	  dout(10) << "r=" << r << dendl;
628  	
629  	  std::scoped_lock locker{m_threads->timer_lock, m_lock};
630  	  ceph_assert(!m_timer_op_tracker.empty());
631  	
632  	  if (m_leader_lock->is_shutdown()) {
633  	    dout(10) << "canceling due to shutdown" << dendl;
634  	    m_timer_op_tracker.finish_op();
635  	    return;
636  	  }
637  	
638  	  if (r < 0) {
639  	    if (r == -EAGAIN) {
640  	      dout(10) << "already locked" << dendl;
641  	    } else {
642  	      derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
643  	    }
644  	
645  	    get_locker();
646  	    return;
647  	  }
648  	
649  	  m_locker = {};
650  	  m_acquire_attempts = 0;
651  	
652  	  if (m_ret_val) {
653  	    dout(5) << "releasing due to error on notify" << dendl;
654  	    release_leader_lock();
655  	    m_timer_op_tracker.finish_op();
656  	    return;
657  	  }
658  	
659  	  notify_heartbeat();
660  	}
661  	
662  	template <typename I>
663  	void LeaderWatcher<I>::release_leader_lock() {
664  	  dout(10) << dendl;
665  	
666  	  ceph_assert(ceph_mutex_is_locked(m_lock));
667  	
668  	  Context *ctx = create_async_context_callback(
669  	    m_work_queue, create_context_callback<
670  	      LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
671  	
672  	  m_leader_lock->release_lock(ctx);
673  	}
674  	
675  	template <typename I>
676  	void LeaderWatcher<I>::handle_release_leader_lock(int r) {
677  	  dout(10) << "r=" << r << dendl;
678  	
679  	  std::scoped_lock locker{m_threads->timer_lock, m_lock};
680  	
681  	  if (r < 0) {
682  	    derr << "error releasing lock: " << cpp_strerror(r) << dendl;
683  	    return;
684  	  }
685  	
686  	  schedule_acquire_leader_lock(1);
687  	}
688  	
689  	template <typename I>
690  	void LeaderWatcher<I>::init_instances() {
691  	  dout(10) << dendl;
692  	
693  	  ceph_assert(ceph_mutex_is_locked(m_lock));
694  	  ceph_assert(m_instances == nullptr);
695  	
696  	  m_instances = Instances<I>::create(m_threads, m_ioctx, m_instance_id,
697  	                                     m_instances_listener);
698  	
699  	  Context *ctx = create_context_callback<
700  	    LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
701  	
702  	  m_instances->init(ctx);
703  	}
704  	
705  	template <typename I>
706  	void LeaderWatcher<I>::handle_init_instances(int r) {
707  	  dout(10) << "r=" << r << dendl;
708  	
709  	  Context *on_finish = nullptr;
710  	  if (r < 0) {
711  	    std::lock_guard locker{m_lock};
712  	    derr << "error initializing instances: " << cpp_strerror(r) << dendl;
713  	    m_instances->destroy();
714  	    m_instances = nullptr;
715  	
716  	    ceph_assert(m_on_finish != nullptr);
717  	    std::swap(m_on_finish, on_finish);
718  	  } else {
719  	    std::lock_guard locker{m_lock};
720  	    notify_listener();
721  	    return;
722  	  }
723  	
724  	  on_finish->complete(r);
725  	}
726  	
727  	template <typename I>
728  	void LeaderWatcher<I>::shut_down_instances() {
729  	  dout(10) << dendl;
730  	
731  	  ceph_assert(ceph_mutex_is_locked(m_lock));
732  	  ceph_assert(m_instances != nullptr);
733  	
734  	  Context *ctx = create_async_context_callback(
735  	    m_work_queue, create_context_callback<LeaderWatcher<I>,
736  	      &LeaderWatcher<I>::handle_shut_down_instances>(this));
737  	
738  	  m_instances->shut_down(ctx);
739  	}
740  	
741  	template <typename I>
742  	void LeaderWatcher<I>::handle_shut_down_instances(int r) {
743  	  dout(10) << "r=" << r << dendl;
744  	  ceph_assert(r == 0);
745  	
746  	  Context *on_finish = nullptr;
747  	  {
748  	    std::lock_guard locker{m_lock};
749  	
750  	    m_instances->destroy();
751  	    m_instances = nullptr;
752  	
753  	    ceph_assert(m_on_finish != nullptr);
754  	    std::swap(m_on_finish, on_finish);
755  	  }
756  	  on_finish->complete(r);
757  	}
758  	
759  	template <typename I>
760  	void LeaderWatcher<I>::notify_listener() {
761  	  dout(10) << dendl;
762  	
763  	  ceph_assert(ceph_mutex_is_locked(m_lock));
764  	
765  	  Context *ctx = create_async_context_callback(
766  	    m_work_queue, create_context_callback<
767  	      LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
768  	
769  	  if (is_leader(m_lock)) {
770  	    ctx = new LambdaContext(
771  	      [this, ctx](int r) {
772  	        m_listener->post_acquire_handler(ctx);
773  	      });
774  	  } else {
775  	    ctx = new LambdaContext(
776  	      [this, ctx](int r) {
777  	        m_listener->pre_release_handler(ctx);
778  	      });
779  	  }
780  	  m_work_queue->queue(ctx, 0);
781  	}
782  	
783  	template <typename I>
784  	void LeaderWatcher<I>::handle_notify_listener(int r) {
785  	  dout(10) << "r=" << r << dendl;
786  	
787  	  std::lock_guard locker{m_lock};
788  	
789  	  if (r < 0) {
790  	    derr << "error notifying listener: " << cpp_strerror(r) << dendl;
791  	    m_ret_val = r;
792  	  }
793  	
794  	  if (is_leader(m_lock)) {
795  	    notify_lock_acquired();
796  	  } else {
797  	    shut_down_instances();
798  	  }
799  	}
800  	
801  	template <typename I>
802  	void LeaderWatcher<I>::notify_lock_acquired() {
803  	  dout(10) << dendl;
804  	
805  	  ceph_assert(ceph_mutex_is_locked(m_lock));
806  	
807  	  Context *ctx = create_context_callback<
808  	    LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
809  	
810  	  bufferlist bl;
811  	  encode(NotifyMessage{LockAcquiredPayload{}}, bl);
812  	
813  	  send_notify(bl, nullptr, ctx);
814  	}
815  	
816  	template <typename I>
817  	void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
818  	  dout(10) << "r=" << r << dendl;
819  	
820  	  Context *on_finish = nullptr;
821  	  {
822  	    std::lock_guard locker{m_lock};
823  	    if (r < 0 && r != -ETIMEDOUT) {
824  	      derr << "error notifying leader lock acquired: " << cpp_strerror(r)
825  	           << dendl;
826  	      m_ret_val = r;
827  	    }
828  	
829  	    ceph_assert(m_on_finish != nullptr);
830  	    std::swap(m_on_finish, on_finish);
831  	
832  	    if (m_ret_val == 0) {
833  	      // listener should be ready for instance add/remove events now
834  	      m_instances->unblock_listener();
835  	    }
836  	  }
837  	  on_finish->complete(0);
838  	}
839  	
840  	template <typename I>
841  	void LeaderWatcher<I>::notify_lock_released() {
842  	  dout(10) << dendl;
843  	
844  	  ceph_assert(ceph_mutex_is_locked(m_lock));
845  	
846  	  Context *ctx = create_context_callback<
847  	    LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
848  	
849  	  bufferlist bl;
850  	  encode(NotifyMessage{LockReleasedPayload{}}, bl);
851  	
852  	  send_notify(bl, nullptr, ctx);
853  	}
854  	
855  	template <typename I>
856  	void LeaderWatcher<I>::handle_notify_lock_released(int r) {
857  	  dout(10) << "r=" << r << dendl;
858  	
859  	  Context *on_finish = nullptr;
860  	  {
861  	    std::lock_guard locker{m_lock};
862  	    if (r < 0 && r != -ETIMEDOUT) {
863  	      derr << "error notifying leader lock released: " << cpp_strerror(r)
864  	           << dendl;
865  	    }
866  	
867  	    ceph_assert(m_on_finish != nullptr);
868  	    std::swap(m_on_finish, on_finish);
869  	  }
870  	  on_finish->complete(r);
871  	}
872  	
873  	template <typename I>
874  	void LeaderWatcher<I>::notify_heartbeat() {
875  	  dout(10) << dendl;
876  	
877  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
878  	  ceph_assert(ceph_mutex_is_locked(m_lock));
879  	  ceph_assert(!m_timer_op_tracker.empty());
880  	
881  	  if (!is_leader(m_lock)) {
882  	    dout(5) << "not leader, canceling" << dendl;
883  	    m_timer_op_tracker.finish_op();
884  	    return;
885  	  }
886  	
887  	  Context *ctx = create_context_callback<
888  	    LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
889  	
890  	  bufferlist bl;
891  	  encode(NotifyMessage{HeartbeatPayload{}}, bl);
892  	
893  	  m_heartbeat_response.acks.clear();
894  	  send_notify(bl, &m_heartbeat_response, ctx);
895  	}
896  	
897  	template <typename I>
898  	void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
899  	  dout(10) << "r=" << r << dendl;
900  	
901  	  std::scoped_lock locker{m_threads->timer_lock, m_lock};
902  	  ceph_assert(!m_timer_op_tracker.empty());
903  	
904  	  m_timer_op_tracker.finish_op();
905  	  if (m_leader_lock->is_shutdown()) {
906  	    dout(10) << "canceling due to shutdown" << dendl;
907  	    return;
908  	  } else if (!is_leader(m_lock)) {
909  	    return;
910  	  }
911  	
912  	  if (r < 0 && r != -ETIMEDOUT) {
913  	    derr << "error notifying heartbeat: " << cpp_strerror(r)
914  	         <<  ", releasing leader" << dendl;
915  	    release_leader_lock();
916  	    return;
917  	  }
918  	
919  	  dout(10) << m_heartbeat_response.acks.size() << " acks received, "
920  	           << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
921  	
922  	  std::vector<std::string> instance_ids;
923  	  for (auto &it: m_heartbeat_response.acks) {
924  	    uint64_t notifier_id = it.first.gid;
925  	    instance_ids.push_back(stringify(notifier_id));
926  	  }
927  	  if (!instance_ids.empty()) {
928  	    m_instances->acked(instance_ids);
929  	  }
930  	
931  	  schedule_timer_task("heartbeat", 1, true,
932  	                      &LeaderWatcher<I>::notify_heartbeat, false);
933  	}
934  	
935  	template <typename I>
936  	void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
937  	  dout(10) << dendl;
938  	
939  	  {
940  	    std::scoped_lock locker{m_threads->timer_lock, m_lock};
941  	    if (is_leader(m_lock)) {
942  	      dout(5) << "got another leader heartbeat, ignoring" << dendl;
943  	    } else {
944  	      cancel_timer_task();
945  	      m_acquire_attempts = 0;
946  	      schedule_acquire_leader_lock(1);
947  	    }
948  	  }
949  	
950  	  on_notify_ack->complete(0);
951  	}
952  	
953  	template <typename I>
954  	void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
955  	  dout(10) << dendl;
956  	
957  	  {
958  	    std::scoped_lock locker{m_threads->timer_lock, m_lock};
959  	    if (is_leader(m_lock)) {
960  	      dout(5) << "got another leader lock_acquired, ignoring" << dendl;
961  	    } else {
962  	      cancel_timer_task();
963  	      schedule_get_locker(true, 0);
964  	    }
965  	  }
966  	
967  	  on_notify_ack->complete(0);
968  	}
969  	
970  	template <typename I>
971  	void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
972  	  dout(10) << dendl;
973  	
974  	  {
975  	    std::scoped_lock locker{m_threads->timer_lock, m_lock};
976  	    if (is_leader(m_lock)) {
977  	      dout(5) << "got another leader lock_released, ignoring" << dendl;
978  	    } else {
979  	      cancel_timer_task();
980  	      schedule_get_locker(true, 0);
981  	    }
982  	  }
983  	
984  	  on_notify_ack->complete(0);
985  	}
986  	
987  	template <typename I>
988  	void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
989  	                                     uint64_t notifier_id, bufferlist &bl) {
990  	  dout(10) << "notify_id=" << notify_id << ", handle=" << handle << ", "
991  	           << "notifier_id=" << notifier_id << dendl;
992  	
993  	  Context *ctx = new C_NotifyAck(this, notify_id, handle);
994  	
995  	  if (notifier_id == m_notifier_id) {
996  	    dout(10) << "our own notification, ignoring" << dendl;
997  	    ctx->complete(0);
998  	    return;
999  	  }
1000 	
1001 	  NotifyMessage notify_message;
1002 	  try {
1003 	    auto iter = bl.cbegin();
1004 	    decode(notify_message, iter);
1005 	  } catch (const buffer::error &err) {
1006 	    derr << ": error decoding image notification: " << err.what() << dendl;
1007 	    ctx->complete(0);
1008 	    return;
1009 	  }
1010 	
1011 	  apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
1012 	}
1013 	
1014 	template <typename I>
1015 	void LeaderWatcher<I>::handle_rewatch_complete(int r) {
1016 	  dout(5) << "r=" << r << dendl;
1017 	
1018 	  if (r != -EBLACKLISTED) {
1019 	    m_leader_lock->reacquire_lock(nullptr);
1020 	  }
1021 	}
1022 	
1023 	template <typename I>
1024 	void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
1025 	                                      Context *on_notify_ack) {
1026 	  dout(10) << "heartbeat" << dendl;
1027 	
1028 	  handle_heartbeat(on_notify_ack);
1029 	}
1030 	
1031 	template <typename I>
1032 	void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
1033 	                                      Context *on_notify_ack) {
1034 	  dout(10) << "lock_acquired" << dendl;
1035 	
1036 	  handle_lock_acquired(on_notify_ack);
1037 	}
1038 	
1039 	template <typename I>
1040 	void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
1041 	                                      Context *on_notify_ack) {
1042 	  dout(10) << "lock_released" << dendl;
1043 	
1044 	  handle_lock_released(on_notify_ack);
1045 	}
1046 	
1047 	template <typename I>
1048 	void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
1049 	                                      Context *on_notify_ack) {
1050 	  dout(10) << "unknown" << dendl;
1051 	
1052 	  on_notify_ack->complete(0);
1053 	}
1054 	
1055 	} // namespace mirror
1056 	} // namespace rbd
1057 	
// Explicitly instantiate the template for librbd::ImageCtx.
template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;
1059