// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include "common/Cond.h"
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/api/Config.h"
#include "librbd/api/Namespace.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

namespace rbd {
namespace mirror {

using ::operator<<;

namespace {

const std::string SERVICE_DAEMON_LEADER_KEY("leader");

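// cluster-specific connection settings: these are captured before and
// restored after applying environment/CLI overrides so that a remote
// peer connection never inherits them from the local cluster (see
// init_rados() below)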
const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
  {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};

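// base class for the admin socket commands: each subclass forwards a
// single "rbd mirror <verb>" request to the owning PoolReplayer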
template <typename I>
class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer<I> *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual int call(Formatter *f) = 0;
protected:
  PoolReplayer<I> *pool_replayer;
};

template <typename I>
class StatusCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StatusCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->print_status(f);
    return 0;
  }
};

template <typename I>
class StartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StartCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->start();
    return 0;
  }
};

template <typename I>
class StopCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StopCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->stop(true);
    return 0;
  }
};

template <typename I>
class RestartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit RestartCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->restart();
    return 0;
  }
};

template <typename I>
class FlushCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit FlushCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->flush();
    return 0;
  }
};

template <typename I>
class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit LeaderReleaseCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->release_leader();
    return 0;
  }
};

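// registers one admin socket command per verb under
// "rbd mirror <verb> <name>", where <name> combines the local pool and
// peer cluster names (see run()), and dispatches invocations to the
// matching command object above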
template <typename I>
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer<I> *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand<I>(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand<I>(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand<I>(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand<I>(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand<I>(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand<I>(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    (void)admin_socket->unregister_commands(this);
    for (auto i = commands.begin(); i != commands.end(); ++i) {
      delete i->second;
    }
  }

  int call(std::string_view command, const cmdmap_t& cmdmap,
           Formatter *f,
           std::ostream& ss,
           bufferlist& out) override {
    auto i = commands.find(command);
    ceph_assert(i != commands.end());
    return i->second->call(f);
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand<I>*,
                   std::less<>> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace

template <typename I>
PoolReplayer<I>::PoolReplayer(
    Threads<I> *threads, ServiceDaemon<I> *service_daemon,
    journal::CacheManagerHandler *cache_manager_handler, int64_t local_pool_id,
    const PeerSpec &peer, const std::vector<const char*> &args) :
  m_threads(threads),
  m_service_daemon(service_daemon),
  m_cache_manager_handler(cache_manager_handler),
  m_local_pool_id(local_pool_id),
  m_peer(peer),
  m_args(args),
  m_lock(ceph::make_mutex("rbd::mirror::PoolReplayer " + stringify(peer))),
  m_pool_replayer_thread(this),
  m_leader_listener(this) {
}

template <typename I>
PoolReplayer<I>::~PoolReplayer()
{
  shut_down();

  ceph_assert(m_asok_hook == nullptr);
}

template <typename I>
bool PoolReplayer<I>::is_blacklisted() const {
  std::lock_guard locker{m_lock};
  return m_blacklisted;
}

template <typename I>
bool PoolReplayer<I>::is_leader() const {
  std::lock_guard locker{m_lock};
  return m_leader_watcher && m_leader_watcher->is_leader();
}

template <typename I>
bool PoolReplayer<I>::is_running() const {
  return m_pool_replayer_thread.is_started();
}

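// bring-up sequence: connect to the local and remote clusters, open the
// pool IO contexts, fetch the local pool's mirror uuid, construct the
// image sync/deletion throttlers and the default namespace replayer,
// initialize the leader watcher, and finally spawn the run() thread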
template <typename I>
void PoolReplayer<I>::init() {
  ceph_assert(!m_pool_replayer_thread.is_started());

  // reset state
  m_stopping = false;
  m_blacklisted = false;

  dout(10) << "replaying for " << m_peer << dendl;
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "", "", "local cluster", &m_local_rados, false);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 m_peer.mon_host, m_peer.key,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados, true);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
  librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf);

  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &m_local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(10) << "connected to " << m_peer << dendl;

  m_image_sync_throttler.reset(
      Throttler<I>::create(cct, "rbd_mirror_concurrent_image_syncs"));

  m_image_deletion_throttler.reset(
      Throttler<I>::create(cct, "rbd_mirror_concurrent_image_deletions"));

  m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
      "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
      m_threads, m_image_sync_throttler.get(), m_image_deletion_throttler.get(),
      m_service_daemon, m_cache_manager_handler));

  C_SaferCond on_init;
  m_default_namespace_replayer->init(&on_init);
  r = on_init.wait();
  if (r < 0) {
    derr << "error initializing default namespace replayer: " << cpp_strerror(r)
         << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize default namespace replayer");
    m_default_namespace_replayer.reset();
    return;
  }

  m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx,
                                                  &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    m_leader_watcher.reset();
    return;
  }

  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_pool_replayer_thread.create("pool replayer");
}

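// tear down in the reverse order of init(): stop the run() thread, shut
// down the leader watcher and the default namespace replayer, then drop
// the throttlers and the cluster connections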
template <typename I>
void PoolReplayer<I>::shut_down() {
  {
    std::lock_guard l{m_lock};
    m_stopping = true;
    m_cond.notify_all();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }

  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }
  m_leader_watcher.reset();

  if (m_default_namespace_replayer) {
    C_SaferCond on_shut_down;
    m_default_namespace_replayer->shut_down(&on_shut_down);
    on_shut_down.wait();
  }
  m_default_namespace_replayer.reset();

  m_image_sync_throttler.reset();
  m_image_deletion_throttler.reset();

  m_local_rados.reset();
  m_remote_rados.reset();
}

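// connect to a cluster and populate *rados_ref. configuration is layered
// in the usual precedence order (config file, then environment, then
// command-line arguments), with explicit mon_host/key values from the
// peer spec applied last; when strip_cluster_overrides is set, any
// UNIQUE_PEER_CONFIG_KEYS values changed by the overrides are reverted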
template <typename I>
int PoolReplayer<I>::init_rados(const std::string &cluster_name,
                                const std::string &client_name,
                                const std::string &mon_host,
                                const std::string &key,
                                const std::string &description,
                                RadosRef *rados_ref,
                                bool strip_cluster_overrides) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
  if (r < 0 && r != -ENOENT) {
    // do not treat this as fatal, it might still be able to connect
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
  }

  // preserve cluster-specific config settings before applying environment/cli
  // overrides
  std::map<std::string, std::string> config_values;
  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
      config_values[key] = cct->_conf.get_val<std::string>(key);
    }
  }

  cct->_conf.parse_env(cct->get_module_type());

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  r = cct->_conf.parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf.parse_env(cct->get_module_type());

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf.parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& pair : config_values) {
      auto value = cct->_conf.get_val<std::string>(pair.first);
      if (pair.second != value) {
        dout(0) << "reverting global config option override: "
                << pair.first << ": " << value << " -> " << pair.second
                << dendl;
        cct->_conf.set_val_or_die(pair.first, pair.second);
      }
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf.set_val_or_die("admin_socket",
                              "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  if (!mon_host.empty()) {
    r = cct->_conf.set_val("mon_host", mon_host);
    if (r < 0) {
      derr << "failed to set mon_host config for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!key.empty()) {
    r = cct->_conf.set_val("key", key);
    if (r < 0) {
      derr << "failed to set key config for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  // disable unnecessary librbd cache
  cct->_conf.set_val_or_die("rbd_cache", "false");
  cct->_conf.apply_changes(nullptr);
  cct->_conf.complain_about_parse_error(cct);

  r = (*rados_ref)->init_with_context(cct);
  ceph_assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}

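// pool replayer thread: periodically refresh the admin socket hook (the
// hook name embeds the pool name, which may have changed) and reconcile
// the per-namespace replayers, until stop is requested or this instance
// is blacklisted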
template <typename I>
void PoolReplayer<I>::run() {
  dout(20) << dendl;

  while (true) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook<I>(g_ceph_context,
                                                       m_asok_hook_name, this);
    }

    with_namespace_replayers([this]() { update_namespace_replayers(); });

    std::unique_lock locker{m_lock};

    if (m_default_namespace_replayer->is_blacklisted()) {
      m_blacklisted = true;
      m_stopping = true;
    }

    for (auto &it : m_namespace_replayers) {
      if (it.second->is_blacklisted()) {
        m_blacklisted = true;
        m_stopping = true;
        break;
      }
    }

    if (m_stopping) {
      break;
    }

    auto seconds = g_ceph_context->_conf.get_val<uint64_t>(
        "rbd_mirror_pool_replayers_refresh_interval");
    m_cond.wait_for(locker, ceph::make_timespan(seconds));
  }

  // shut down namespace replayers
  with_namespace_replayers([this]() { update_namespace_replayers(); });

  delete m_asok_hook;
  m_asok_hook = nullptr;
}

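// reconcile the running namespace replayers against the set of namespaces
// that currently have mirroring enabled: shut down replayers whose
// namespaces dropped out, create and initialize replayers for new ones,
// and propagate the current leader state to the newcomers. note m_lock is
// dropped while waiting for the async init/shut-down gather to complete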
template <typename I>
void PoolReplayer<I>::update_namespace_replayers() {
  dout(20) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  std::set<std::string> mirroring_namespaces;
  if (!m_stopping) {
    int r = list_mirroring_namespaces(&mirroring_namespaces);
    if (r < 0) {
      return;
    }
  }

  auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
  C_SaferCond cond;
  auto gather_ctx = new C_Gather(cct, &cond);
  for (auto it = m_namespace_replayers.begin();
       it != m_namespace_replayers.end(); ) {
    auto iter = mirroring_namespaces.find(it->first);
    if (iter == mirroring_namespaces.end()) {
      auto namespace_replayer = it->second;
      auto on_shut_down = new LambdaContext(
        [this, namespace_replayer, ctx=gather_ctx->new_sub()](int r) {
          delete namespace_replayer;
          ctx->complete(r);
        });
      namespace_replayer->shut_down(on_shut_down);
      it = m_namespace_replayers.erase(it);
    } else {
      mirroring_namespaces.erase(iter);
      it++;
    }
  }

  for (auto &name : mirroring_namespaces) {
    auto namespace_replayer = NamespaceReplayer<I>::create(
        name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
        m_threads, m_image_sync_throttler.get(),
        m_image_deletion_throttler.get(), m_service_daemon,
        m_cache_manager_handler);
    auto on_init = new LambdaContext(
        [this, namespace_replayer, name, &mirroring_namespaces,
         ctx=gather_ctx->new_sub()](int r) {
          if (r < 0) {
            derr << "failed to initialize namespace replayer for namespace "
                 << name << ": " << cpp_strerror(r) << dendl;
            delete namespace_replayer;
            mirroring_namespaces.erase(name);
          } else {
            std::lock_guard locker{m_lock};
            m_namespace_replayers[name] = namespace_replayer;
          }
          ctx->complete(r);
        });
    namespace_replayer->init(on_init);
  }

  gather_ctx->activate();

  m_lock.unlock();
  cond.wait();
  m_lock.lock();

  if (m_leader) {
    C_SaferCond acquire_cond;
    auto acquire_gather_ctx = new C_Gather(cct, &acquire_cond);

    for (auto &name : mirroring_namespaces) {
      namespace_replayer_acquire_leader(name, acquire_gather_ctx->new_sub());
    }
    acquire_gather_ctx->activate();

    m_lock.unlock();
    acquire_cond.wait();
    m_lock.lock();

    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);

    for (auto &name : mirroring_namespaces) {
      auto it = m_namespace_replayers.find(name);
      if (it == m_namespace_replayers.end()) {
        // acquiring leader for this namespace replayer failed
        continue;
      }
      it->second->handle_instances_added(instance_ids);
    }
  } else {
    std::string leader_instance_id;
    if (m_leader_watcher->get_leader_instance_id(&leader_instance_id)) {
      for (auto &name : mirroring_namespaces) {
        m_namespace_replayers[name]->handle_update_leader(leader_instance_id);
      }
    }
  }
}

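// collect the namespaces of the local pool that have mirroring enabled.
// if the mirror mode query fails, a namespace is kept only when a
// replayer for it already exists, so a transient error does not tear an
// active replayer down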
template <typename I>
int PoolReplayer<I>::list_mirroring_namespaces(
    std::set<std::string> *namespaces) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  std::vector<std::string> names;

  int r = librbd::api::Namespace<I>::list(m_local_io_ctx, &names);
  if (r < 0) {
    derr << "failed to list namespaces: " << cpp_strerror(r) << dendl;
    return r;
  }

  for (auto &name : names) {
    // query the namespace's own mirror mode: a namespace-scoped view of
    // the pool is required, since the pool-level io context would always
    // report the default namespace's mode
    librados::IoCtx ns_io_ctx;
    ns_io_ctx.dup(m_local_io_ctx);
    ns_io_ctx.set_namespace(name);

    cls::rbd::MirrorMode mirror_mode = cls::rbd::MIRROR_MODE_DISABLED;
    int r = librbd::cls_client::mirror_mode_get(&ns_io_ctx, &mirror_mode);
    if (r < 0 && r != -ENOENT) {
      derr << "failed to get mirror mode for namespace " << name << ": "
           << cpp_strerror(r) << dendl;
      if (m_namespace_replayers.count(name) == 0) {
        continue;
      }
    } else if (mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
      dout(10) << "mirroring is disabled for namespace " << name << dendl;
      continue;
    }

    namespaces->insert(name);
  }

  return 0;
}

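// wrap the caller's on_finish so that a failed leader acquisition removes
// and destroys the namespace replayer; the next update_namespace_replayers
// pass will recreate it and retry the acquisition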
template <typename I>
void PoolReplayer<I>::namespace_replayer_acquire_leader(const std::string &name,
                                                        Context *on_finish) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  auto it = m_namespace_replayers.find(name);
  ceph_assert(it != m_namespace_replayers.end());

  on_finish = new LambdaContext(
      [this, name, on_finish](int r) {
        if (r < 0) {
          derr << "failed to handle acquire leader for namespace: "
               << name << ": " << cpp_strerror(r) << dendl;

          // remove the namespace replayer -- update_namespace_replayers will
          // retry to create it and acquire leader.

          std::lock_guard locker{m_lock};

          auto namespace_replayer = m_namespace_replayers[name];
          m_namespace_replayers.erase(name);
          auto on_shut_down = new LambdaContext(
              [this, namespace_replayer, on_finish](int r) {
                delete namespace_replayer;
                on_finish->complete(r);
              });
          namespace_replayer->shut_down(on_shut_down);
          return;
        }
        on_finish->complete(0);
      });

  it->second->handle_acquire_leader(on_finish);
}

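// backs the "rbd mirror status" admin socket command registered above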
template <typename I>
void PoolReplayer<I>::print_status(Formatter *f) {
  dout(20) << dendl;

  ceph_assert(f);

  std::lock_guard l{m_lock};

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_stream("instance_id") << m_local_io_ctx.get_instance_id();

  std::string state("running");
  if (m_manual_stop) {
    state = "stopped (manual)";
  } else if (m_stopping) {
    state = "stopped";
  }
  f->dump_string("state", state);

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (const auto& instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section(); // instances
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf.
                     get_val<std::string>("admin_socket"));
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf.
                     get_val<std::string>("admin_socket"));

  if (m_image_sync_throttler) {
    f->open_object_section("sync_throttler");
    m_image_sync_throttler->print_status(f);
    f->close_section(); // sync_throttler
  }

  if (m_image_deletion_throttler) {
    f->open_object_section("deletion_throttler");
    m_image_deletion_throttler->print_status(f);
    f->close_section(); // deletion_throttler
  }

  m_default_namespace_replayer->print_status(f);

  f->open_array_section("namespaces");
  for (auto &it : m_namespace_replayers) {
    f->open_object_section("namespace");
    f->dump_string("name", it.first);
    it.second->print_status(f);
    f->close_section(); // namespace
  }
  f->close_section(); // namespaces

  f->close_section(); // pool_replayer_status
}

template <typename I>
void PoolReplayer<I>::start() {
  dout(20) << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  m_default_namespace_replayer->start();
  for (auto &it : m_namespace_replayers) {
    it.second->start();
  }
}

template <typename I>
void PoolReplayer<I>::stop(bool manual) {
  dout(20) << "enter: manual=" << manual << dendl;

  std::lock_guard l{m_lock};
  if (!manual) {
    m_stopping = true;
    m_cond.notify_all();
    return;
  } else if (m_stopping) {
    return;
  }

  m_manual_stop = true;

  m_default_namespace_replayer->stop();
  for (auto &it : m_namespace_replayers) {
    it.second->stop();
  }
}

template <typename I>
void PoolReplayer<I>::restart() {
  dout(20) << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_default_namespace_replayer->restart();
  for (auto &it : m_namespace_replayers) {
    it.second->restart();
  }
}

template <typename I>
void PoolReplayer<I>::flush() {
  dout(20) << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_default_namespace_replayer->flush();
  for (auto &it : m_namespace_replayers) {
    it.second->flush();
  }
}

template <typename I>
void PoolReplayer<I>::release_leader() {
  dout(20) << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

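// leader watcher callbacks. with_namespace_replayers (defined in the
// header) presumably invokes the passed callback with m_lock held,
// serialized against the reconciliation in run(), and completes the
// outer on_finish once the callback's own completion context fires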
template <typename I>
void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;

  with_namespace_replayers(
      // note: named on_complete to avoid hiding the outer on_finish param
      [this](Context *on_complete) {
        dout(10) << "handle_post_acquire_leader" << dendl;

        ceph_assert(ceph_mutex_is_locked(m_lock));

        m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                                  SERVICE_DAEMON_LEADER_KEY,
                                                  true);
        auto ctx = new LambdaContext(
            [this, on_complete](int r) {
              if (r == 0) {
                std::lock_guard locker{m_lock};
                m_leader = true;
              }
              on_complete->complete(r);
            });

        auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
        auto gather_ctx = new C_Gather(cct, ctx);

        m_default_namespace_replayer->handle_acquire_leader(
            gather_ctx->new_sub());

        for (auto &it : m_namespace_replayers) {
          namespace_replayer_acquire_leader(it.first, gather_ctx->new_sub());
        }

        gather_ctx->activate();
      }, on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;

  with_namespace_replayers(
      [this](Context *on_complete) {
        dout(10) << "handle_pre_release_leader" << dendl;

        ceph_assert(ceph_mutex_is_locked(m_lock));

        m_leader = false;
        m_service_daemon->remove_attribute(m_local_pool_id,
                                           SERVICE_DAEMON_LEADER_KEY);

        auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
        auto gather_ctx = new C_Gather(cct, on_complete);

        m_default_namespace_replayer->handle_release_leader(
            gather_ctx->new_sub());

        for (auto &it : m_namespace_replayers) {
          it.second->handle_release_leader(gather_ctx->new_sub());
        }

        gather_ctx->activate();
      }, on_finish);
}

template <typename I>
void PoolReplayer<I>::handle_update_leader(
    const std::string &leader_instance_id) {
  dout(10) << "leader_instance_id=" << leader_instance_id << dendl;

  std::lock_guard locker{m_lock};

  m_default_namespace_replayer->handle_update_leader(leader_instance_id);

  for (auto &it : m_namespace_replayers) {
    it.second->handle_update_leader(leader_instance_id);
  }
}

template <typename I>
void PoolReplayer<I>::handle_instances_added(
    const std::vector<std::string> &instance_ids) {
  dout(5) << "instance_ids=" << instance_ids << dendl;

  std::lock_guard locker{m_lock};
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_default_namespace_replayer->handle_instances_added(instance_ids);

  for (auto &it : m_namespace_replayers) {
    it.second->handle_instances_added(instance_ids);
  }
}

template <typename I>
void PoolReplayer<I>::handle_instances_removed(
    const std::vector<std::string> &instance_ids) {
  dout(5) << "instance_ids=" << instance_ids << dendl;

  std::lock_guard locker{m_lock};
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_default_namespace_replayer->handle_instances_removed(instance_ids);

  for (auto &it : m_namespace_replayers) {
    it.second->handle_instances_removed(instance_ids);
  }
}

} // namespace mirror
} // namespace rbd

template class rbd::mirror::PoolReplayer<librbd::ImageCtx>;