1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "include/compat.h"
5    	#include "common/Formatter.h"
6    	#include "common/admin_socket.h"
7    	#include "common/debug.h"
8    	#include "common/errno.h"
9    	#include "include/stringify.h"
10   	#include "cls/rbd/cls_rbd_client.h"
11   	#include "common/Timer.h"
12   	#include "common/WorkQueue.h"
13   	#include "global/global_context.h"
14   	#include "journal/Journaler.h"
15   	#include "journal/ReplayHandler.h"
16   	#include "journal/Settings.h"
17   	#include "librbd/ExclusiveLock.h"
18   	#include "librbd/ImageCtx.h"
19   	#include "librbd/ImageState.h"
20   	#include "librbd/Journal.h"
21   	#include "librbd/Operations.h"
22   	#include "librbd/Utils.h"
23   	#include "librbd/journal/Replay.h"
24   	#include "ImageDeleter.h"
25   	#include "ImageReplayer.h"
26   	#include "Threads.h"
27   	#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
28   	#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
29   	#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
30   	#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
31   	#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
32   	#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
33   	
34   	#define dout_context g_ceph_context
35   	#define dout_subsys ceph_subsys_rbd_mirror
36   	#undef dout_prefix
37   	#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
38   	                           << __func__ << ": "
39   	
40   	using std::map;
41   	using std::string;
42   	using std::unique_ptr;
43   	using std::shared_ptr;
44   	using std::vector;
45   	
46   	extern PerfCounters *g_perf_counters;
47   	
48   	namespace rbd {
49   	namespace mirror {
50   	
51   	using librbd::util::create_context_callback;
52   	using librbd::util::create_rados_callback;
53   	using namespace rbd::mirror::image_replayer;
54   	
55   	template <typename I>
56   	std::ostream &operator<<(std::ostream &os,
57   	                         const typename ImageReplayer<I>::State &state);
58   	
59   	namespace {
60   	
61   	template <typename I>
62   	struct ReplayHandler : public ::journal::ReplayHandler {
63   	  ImageReplayer<I> *replayer;
64   	  ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
65   	
66   	  void handle_entries_available() override {
67   	    replayer->handle_replay_ready();
68   	  }
69   	  void handle_complete(int r) override {
70   	    std::stringstream ss;
71   	    if (r == -ENOMEM) {
72   	      ss << "not enough memory in autotune cache";
73   	    } else if (r < 0) {
74   	      ss << "replay completed with error: " << cpp_strerror(r);
75   	    }
76   	    replayer->handle_replay_complete(r, ss.str());
77   	  }
78   	};
79   	
80   	template <typename I>
81   	class ImageReplayerAdminSocketCommand {
82   	public:
83   	  ImageReplayerAdminSocketCommand(const std::string &desc,
84   	                                  ImageReplayer<I> *replayer)
85   	    : desc(desc), replayer(replayer) {
86   	  }
87   	  virtual ~ImageReplayerAdminSocketCommand() {}
88   	  virtual int call(Formatter *f) = 0;
89   	
90   	  std::string desc;
91   	  ImageReplayer<I> *replayer;
92   	  bool registered = false;
93   	};
94   	
95   	template <typename I>
96   	class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
97   	public:
98   	  explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
99   	    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
100  	  }
101  	
102  	  int call(Formatter *f) override {
103  	    this->replayer->print_status(f);
104  	    return 0;
105  	  }
106  	};
107  	
108  	template <typename I>
109  	class StartCommand : public ImageReplayerAdminSocketCommand<I> {
110  	public:
111  	  explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
112  	    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
113  	  }
114  	
115  	  int call(Formatter *f) override {
116  	    this->replayer->start(nullptr, true);
117  	    return 0;
118  	  }
119  	};
120  	
121  	template <typename I>
122  	class StopCommand : public ImageReplayerAdminSocketCommand<I> {
123  	public:
124  	  explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
125  	    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
126  	  }
127  	
128  	  int call(Formatter *f) override {
129  	    this->replayer->stop(nullptr, true);
130  	    return 0;
131  	  }
132  	};
133  	
134  	template <typename I>
135  	class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
136  	public:
137  	  explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
138  	    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
139  	  }
140  	
141  	  int call(Formatter *f) override {
142  	    this->replayer->restart();
143  	    return 0;
144  	  }
145  	};
146  	
147  	template <typename I>
148  	class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
149  	public:
150  	  explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
151  	    : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
152  	  }
153  	
154  	  int call(Formatter *f) override {
155  	    this->replayer->flush();
156  	    return 0;
157  	  }
158  	};
159  	
160  	template <typename I>
161  	class ImageReplayerAdminSocketHook : public AdminSocketHook {
162  	public:
163  	  ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
164  				       ImageReplayer<I> *replayer)
165  	    : admin_socket(cct->get_admin_socket()),
166  	      commands{{"rbd mirror flush " + name,
167  	                new FlushCommand<I>("flush rbd mirror " + name, replayer)},
168  	               {"rbd mirror restart " + name,
169  	                new RestartCommand<I>("restart rbd mirror " + name, replayer)},
170  	               {"rbd mirror start " + name,
171  	                new StartCommand<I>("start rbd mirror " + name, replayer)},
172  	               {"rbd mirror status " + name,
173  	                new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
174  	               {"rbd mirror stop " + name,
175  	                new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
176  	  }
177  	
178  	  int register_commands() {
179  	    for (auto &it : commands) {
180  	      int r = admin_socket->register_command(it.first, this,
181  	                                             it.second->desc);
182  	      if (r < 0) {
183  	        return r;
184  	      }
185  	      it.second->registered = true;
186  	    }
187  	    return 0;
188  	  }
189  	
190  	  ~ImageReplayerAdminSocketHook() override {
191  	    admin_socket->unregister_commands(this);
192  	    for (auto &it : commands) {
193  	      delete it.second;
194  	    }
195  	    commands.clear();
196  	  }
197  	
198  	  int call(std::string_view command, const cmdmap_t& cmdmap,
199  		   Formatter *f,
200  		   std::ostream& errss,
201  		   bufferlist& out) override {
202  	    auto i = commands.find(command);
203  	    ceph_assert(i != commands.end());
204  	    return i->second->call(f);
205  	  }
206  	
207  	private:
208  	  typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
209  			   std::less<>> Commands;
210  	
211  	  AdminSocket *admin_socket;
212  	  Commands commands;
213  	};
214  	
215  	uint32_t calculate_replay_delay(const utime_t &event_time,
216  	                                int mirroring_replay_delay) {
217  	    if (mirroring_replay_delay <= 0) {
218  	      return 0;
219  	    }
220  	
221  	    utime_t now = ceph_clock_now();
222  	    if (event_time + mirroring_replay_delay <= now) {
223  	      return 0;
224  	    }
225  	
226  	    // ensure it is rounded up when converting to integer
227  	    return (event_time + mirroring_replay_delay - now) + 1;
228  	}
229  	
230  	} // anonymous namespace
231  	
232  	template <typename I>
233  	void ImageReplayer<I>::BootstrapProgressContext::update_progress(
234  	  const std::string &description, bool flush)
235  	{
236  	  const std::string desc = "bootstrapping, " + description;
237  	  replayer->set_state_description(0, desc);
238  	  if (flush) {
239  	    replayer->update_mirror_image_status(false, boost::none);
240  	  }
241  	}
242  	
243  	template <typename I>
244  	void ImageReplayer<I>::RemoteJournalerListener::handle_update(
245  	  ::journal::JournalMetadata *) {
246  	  auto ctx = new LambdaContext([this](int r) {
247  	      replayer->handle_remote_journal_metadata_updated();
248  	    });
249  	  replayer->m_threads->work_queue->queue(ctx, 0);
250  	}
251  	
252  	template <typename I>
253  	ImageReplayer<I>::ImageReplayer(
254  	    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
255  	    const std::string &global_image_id, Threads<I> *threads,
256  	    InstanceWatcher<I> *instance_watcher,
257  	    journal::CacheManagerHandler *cache_manager_handler) :
258  	  m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
259  	  m_global_image_id(global_image_id), m_threads(threads),
260  	  m_instance_watcher(instance_watcher),
261  	  m_cache_manager_handler(cache_manager_handler),
262  	  m_local_image_name(global_image_id),
263  	  m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
264  	      stringify(local_io_ctx.get_id()) + " " + global_image_id)),
265  	  m_progress_cxt(this),
266  	  m_journal_listener(new JournalListener(this)),
267  	  m_remote_listener(this)
268  	{
269  	  // Register asok commands using a temporary "remote_pool_name/global_image_id"
270  	  // name.  When the image name becomes known on start the asok commands will be
271  	  // re-registered using "remote_pool_name/remote_image_name" name.
272  	
273  	  m_name = admin_socket_hook_name(global_image_id);
274  	  register_admin_socket_hook();
275  	}
276  	
277  	template <typename I>
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
278  	ImageReplayer<I>::~ImageReplayer()
279  	{
280  	  unregister_admin_socket_hook();
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
281  	  ceph_assert(m_event_preprocessor == nullptr);
282  	  ceph_assert(m_replay_status_formatter == nullptr);
283  	  ceph_assert(m_local_image_ctx == nullptr);
284  	  ceph_assert(m_local_replay == nullptr);
285  	  ceph_assert(m_remote_journaler == nullptr);
286  	  ceph_assert(m_replay_handler == nullptr);
287  	  ceph_assert(m_on_start_finish == nullptr);
288  	  ceph_assert(m_on_stop_finish == nullptr);
289  	  ceph_assert(m_bootstrap_request == nullptr);
290  	  ceph_assert(m_in_flight_status_updates == 0);
291  	
292  	  delete m_journal_listener;
293  	}
294  	
295  	template <typename I>
296  	image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
297  	  std::lock_guard locker{m_lock};
298  	
299  	  if (!m_mirror_image_status_state) {
300  	    return image_replayer::HEALTH_STATE_OK;
301  	  } else if (*m_mirror_image_status_state ==
302  	               cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
303  	             *m_mirror_image_status_state ==
304  	               cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
305  	    return image_replayer::HEALTH_STATE_WARNING;
306  	  }
307  	  return image_replayer::HEALTH_STATE_ERROR;
308  	}
309  	
310  	template <typename I>
311  	void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
312  	                                librados::IoCtx &io_ctx) {
313  	  std::lock_guard locker{m_lock};
314  	  auto it = m_peers.find({peer_uuid});
315  	  if (it == m_peers.end()) {
316  	    m_peers.insert({peer_uuid, io_ctx});
317  	  }
318  	}
319  	
320  	template <typename I>
321  	void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
322  	  dout(10) << r << " " << desc << dendl;
323  	
324  	  std::lock_guard l{m_lock};
325  	  m_last_r = r;
326  	  m_state_desc = desc;
327  	}
328  	
329  	template <typename I>
330  	void ImageReplayer<I>::start(Context *on_finish, bool manual)
331  	{
332  	  dout(10) << "on_finish=" << on_finish << dendl;
333  	
334  	  int r = 0;
335  	  {
336  	    std::lock_guard locker{m_lock};
337  	    if (!is_stopped_()) {
338  	      derr << "already running" << dendl;
339  	      r = -EINVAL;
340  	    } else if (m_manual_stop && !manual) {
341  	      dout(5) << "stopped manually, ignoring start without manual flag"
342  		      << dendl;
343  	      r = -EPERM;
344  	    } else {
345  	      m_state = STATE_STARTING;
346  	      m_last_r = 0;
347  	      m_state_desc.clear();
348  	      m_manual_stop = false;
349  	      m_delete_requested = false;
350  	
351  	      if (on_finish != nullptr) {
352  	        ceph_assert(m_on_start_finish == nullptr);
353  	        m_on_start_finish = on_finish;
354  	      }
355  	      ceph_assert(m_on_stop_finish == nullptr);
356  	    }
357  	  }
358  	
359  	  if (r < 0) {
360  	    if (on_finish) {
361  	      on_finish->complete(r);
362  	    }
363  	    return;
364  	  }
365  	
366  	  prepare_local_image();
367  	}
368  	
369  	template <typename I>
370  	void ImageReplayer<I>::prepare_local_image() {
371  	  dout(10) << dendl;
372  	
373  	  m_local_image_id = "";
374  	  Context *ctx = create_context_callback<
375  	    ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
376  	  auto req = PrepareLocalImageRequest<I>::create(
377  	    m_local_io_ctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
378  	    &m_local_image_tag_owner, m_threads->work_queue, ctx);
379  	  req->send();
380  	}
381  	
382  	template <typename I>
383  	void ImageReplayer<I>::handle_prepare_local_image(int r) {
384  	  dout(10) << "r=" << r << dendl;
385  	
386  	  if (r == -ENOENT) {
387  	    dout(10) << "local image does not exist" << dendl;
388  	  } else if (r < 0) {
389  	    on_start_fail(r, "error preparing local image for replay");
390  	    return;
391  	  } else {
392  	    reregister_admin_socket_hook();
393  	  }
394  	
395  	  // local image doesn't exist or is non-primary
396  	  prepare_remote_image();
397  	}
398  	
399  	template <typename I>
400  	void ImageReplayer<I>::prepare_remote_image() {
401  	  dout(10) << dendl;
402  	  if (m_peers.empty()) {
403  	    // technically nothing to bootstrap, but it handles the status update
404  	    bootstrap();
405  	    return;
406  	  }
407  	
408  	  // TODO need to support multiple remote images
409  	  ceph_assert(!m_peers.empty());
410  	  m_remote_image = {*m_peers.begin()};
411  	
412  	  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
413  	  journal::Settings journal_settings;
414  	  journal_settings.commit_interval = cct->_conf.get_val<double>(
415  	    "rbd_mirror_journal_commit_age");
416  	
417  	  Context *ctx = create_context_callback<
418  	    ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
419  	  auto req = PrepareRemoteImageRequest<I>::create(
420  	    m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
421  	    m_local_image_id, journal_settings, m_cache_manager_handler,
422  	    &m_remote_image.mirror_uuid, &m_remote_image.image_id, &m_remote_journaler,
423  	    &m_client_state, &m_client_meta, ctx);
424  	  req->send();
425  	}
426  	
427  	template <typename I>
428  	void ImageReplayer<I>::handle_prepare_remote_image(int r) {
429  	  dout(10) << "r=" << r << dendl;
430  	
431  	  ceph_assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
432  	  if (r < 0 && !m_local_image_id.empty() &&
433  	      m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
434  	    // local image is primary -- fall-through
435  	  } else if (r == -ENOENT) {
436  	    dout(10) << "remote image does not exist" << dendl;
437  	
438  	    // TODO need to support multiple remote images
439  	    if (m_remote_image.image_id.empty() && !m_local_image_id.empty() &&
440  	        m_local_image_tag_owner == m_remote_image.mirror_uuid) {
441  	      // local image exists and is non-primary and linked to the missing
442  	      // remote image
443  	
444  	      m_delete_requested = true;
445  	      on_start_fail(0, "remote image no longer exists");
446  	    } else {
447  	      on_start_fail(-ENOENT, "remote image does not exist");
448  	    }
449  	    return;
450  	  } else if (r < 0) {
451  	    on_start_fail(r, "error retrieving remote image id");
452  	    return;
453  	  }
454  	
455  	  bootstrap();
456  	}
457  	
458  	template <typename I>
459  	void ImageReplayer<I>::bootstrap() {
460  	  dout(10) << dendl;
461  	
462  	  if (!m_local_image_id.empty() &&
463  	      m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
464  	    dout(5) << "local image is primary" << dendl;
465  	    on_start_fail(0, "local image is primary");
466  	    return;
467  	  } else if (m_peers.empty()) {
468  	    dout(5) << "no peer clusters" << dendl;
469  	    on_start_fail(-ENOENT, "no peer clusters");
470  	    return;
471  	  }
472  	
473  	  BootstrapRequest<I> *request = nullptr;
474  	  {
475  	    std::lock_guard locker{m_lock};
476  	    if (on_start_interrupted(m_lock)) {
477  	      return;
478  	    }
479  	
480  	    auto ctx = create_context_callback<
481  	      ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
482  	    request = BootstrapRequest<I>::create(
483  	      m_threads, m_local_io_ctx, m_remote_image.io_ctx, m_instance_watcher,
484  	      &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
485  	      m_global_image_id, m_local_mirror_uuid, m_remote_image.mirror_uuid,
486  	      m_remote_journaler, &m_client_state, &m_client_meta, ctx,
487  	      &m_resync_requested, &m_progress_cxt);
488  	    request->get();
489  	    m_bootstrap_request = request;
490  	  }
491  	
492  	  update_mirror_image_status(false, boost::none);
493  	  reschedule_update_status_task(10);
494  	
495  	  request->send();
496  	}
497  	
498  	template <typename I>
499  	void ImageReplayer<I>::handle_bootstrap(int r) {
500  	  dout(10) << "r=" << r << dendl;
501  	  {
502  	    std::lock_guard locker{m_lock};
503  	    m_bootstrap_request->put();
504  	    m_bootstrap_request = nullptr;
505  	    if (m_local_image_ctx) {
506  	      m_local_image_id = m_local_image_ctx->id;
507  	    }
508  	  }
509  	
510  	  if (on_start_interrupted()) {
511  	    return;
512  	  } else if (r == -EREMOTEIO) {
513  	    m_local_image_tag_owner = "";
514  	    dout(5) << "remote image is non-primary" << dendl;
515  	    on_start_fail(-EREMOTEIO, "remote image is non-primary");
516  	    return;
517  	  } else if (r == -EEXIST) {
518  	    m_local_image_tag_owner = "";
519  	    on_start_fail(r, "split-brain detected");
520  	    return;
521  	  } else if (r < 0) {
522  	    on_start_fail(r, "error bootstrapping replay");
523  	    return;
524  	  } else if (m_resync_requested) {
525  	    on_start_fail(0, "resync requested");
526  	    return;
527  	  }
528  	
529  	  ceph_assert(m_local_journal == nullptr);
530  	  {
531  	    std::shared_lock image_locker{m_local_image_ctx->image_lock};
532  	    if (m_local_image_ctx->journal != nullptr) {
533  	      m_local_journal = m_local_image_ctx->journal;
534  	      m_local_journal->add_listener(m_journal_listener);
535  	    }
536  	  }
537  	
538  	  if (m_local_journal == nullptr) {
539  	    on_start_fail(-EINVAL, "error accessing local journal");
540  	    return;
541  	  }
542  	
543  	  update_mirror_image_status(false, boost::none);
544  	  init_remote_journaler();
545  	}
546  	
547  	template <typename I>
548  	void ImageReplayer<I>::init_remote_journaler() {
549  	  dout(10) << dendl;
550  	
551  	  Context *ctx = create_context_callback<
552  	    ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
553  	  m_remote_journaler->init(ctx);
554  	}
555  	
556  	template <typename I>
557  	void ImageReplayer<I>::handle_init_remote_journaler(int r) {
558  	  dout(10) << "r=" << r << dendl;
559  	
560  	  if (on_start_interrupted()) {
561  	    return;
562  	  } else if (r < 0) {
563  	    derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
564  	    on_start_fail(r, "error initializing remote journal");
565  	    return;
566  	  }
567  	
568  	  m_remote_journaler->add_listener(&m_remote_listener);
569  	
570  	  cls::journal::Client client;
571  	  r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
572  	  if (r < 0) {
573  	    derr << "error retrieving remote journal client: " << cpp_strerror(r)
574  		 << dendl;
575  	    on_start_fail(r, "error retrieving remote journal client");
576  	    return;
577  	  }
578  	
579  	  dout(5) << "image_id=" << m_local_image_id << ", "
580  	          << "client_meta.image_id=" << m_client_meta.image_id << ", "
581  	          << "client.state=" << client.state << dendl;
582  	  if (m_client_meta.image_id == m_local_image_id &&
583  	      client.state != cls::journal::CLIENT_STATE_CONNECTED) {
584  	    dout(5) << "client flagged disconnected, stopping image replay" << dendl;
585  	    if (m_local_image_ctx->config.template get_val<bool>("rbd_mirroring_resync_after_disconnect")) {
586  	      m_resync_requested = true;
587  	      on_start_fail(-ENOTCONN, "disconnected: automatic resync");
588  	    } else {
589  	      on_start_fail(-ENOTCONN, "disconnected");
590  	    }
591  	    return;
592  	  }
593  	
594  	  start_replay();
595  	}
596  	
597  	template <typename I>
598  	void ImageReplayer<I>::start_replay() {
599  	  dout(10) << dendl;
600  	
601  	  Context *start_ctx = create_context_callback<
602  	    ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
603  	  m_local_journal->start_external_replay(&m_local_replay, start_ctx);
604  	}
605  	
606  	template <typename I>
607  	void ImageReplayer<I>::handle_start_replay(int r) {
608  	  dout(10) << "r=" << r << dendl;
609  	
610  	  if (r < 0) {
611  	    ceph_assert(m_local_replay == nullptr);
612  	    derr << "error starting external replay on local image "
613  		 <<  m_local_image_id << ": " << cpp_strerror(r) << dendl;
614  	    on_start_fail(r, "error starting replay on local image");
615  	    return;
616  	  }
617  	
618  	  m_replay_status_formatter =
619  	    ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
620  	
621  	  Context *on_finish(nullptr);
622  	  {
623  	    std::lock_guard locker{m_lock};
624  	    ceph_assert(m_state == STATE_STARTING);
625  	    m_state = STATE_REPLAYING;
626  	    std::swap(m_on_start_finish, on_finish);
627  	  }
628  	
629  	  m_event_preprocessor = EventPreprocessor<I>::create(
630  	    *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
631  	    &m_client_meta, m_threads->work_queue);
632  	
633  	  update_mirror_image_status(true, boost::none);
634  	  reschedule_update_status_task(30);
635  	
636  	  if (on_replay_interrupted()) {
637  	    if (on_finish != nullptr) {
638  	      on_finish->complete(r);
639  	    }
640  	    return;
641  	  }
642  	
643  	  {
644  	    CephContext *cct = static_cast<CephContext *>(m_local_io_ctx.cct());
645  	    double poll_seconds = cct->_conf.get_val<double>(
646  	      "rbd_mirror_journal_poll_age");
647  	
648  	    std::lock_guard locker{m_lock};
649  	    m_replay_handler = new ReplayHandler<I>(this);
650  	    m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
651  	
652  	    dout(10) << "m_remote_journaler=" << *m_remote_journaler << dendl;
653  	  }
654  	
655  	  dout(10) << "start succeeded" << dendl;
656  	  if (on_finish != nullptr) {
657  	    dout(10) << "on finish complete, r=" << r << dendl;
658  	    on_finish->complete(r);
659  	  }
660  	}
661  	
662  	template <typename I>
663  	void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
664  	{
665  	  dout(10) << "r=" << r << dendl;
666  	  Context *ctx = new LambdaContext([this, r, desc](int _r) {
667  	      {
668  		std::lock_guard locker{m_lock};
669  	        ceph_assert(m_state == STATE_STARTING);
670  	        m_state = STATE_STOPPING;
671  	        if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
672  	          derr << "start failed: " << cpp_strerror(r) << dendl;
673  	        } else {
674  	          dout(10) << "start canceled" << dendl;
675  	        }
676  	      }
677  	
678  	      set_state_description(r, desc);
679  	      update_mirror_image_status(false, boost::none);
680  	      reschedule_update_status_task(-1);
681  	      shut_down(r);
682  	    });
683  	  m_threads->work_queue->queue(ctx, 0);
684  	}
685  	
686  	template <typename I>
687  	bool ImageReplayer<I>::on_start_interrupted() {
688  	  std::lock_guard locker{m_lock};
689  	  return on_start_interrupted(m_lock);
690  	}
691  	
692  	template <typename I>
693  	bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
694  	  ceph_assert(ceph_mutex_is_locked(m_lock));
695  	  ceph_assert(m_state == STATE_STARTING);
696  	  if (!m_stop_requested) {
697  	    return false;
698  	  }
699  	
700  	  on_start_fail(-ECANCELED, "");
701  	  return true;
702  	}
703  	
704  	template <typename I>
705  	void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
706  				    const std::string& desc)
707  	{
708  	  dout(10) << "on_finish=" << on_finish << ", manual=" << manual
709  		   << ", desc=" << desc << dendl;
710  	
711  	  image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
712  	  bool shut_down_replay = false;
713  	  bool running = true;
714  	  {
715  	    std::lock_guard locker{m_lock};
716  	
717  	    if (!is_running_()) {
718  	      running = false;
719  	    } else {
720  	      if (!is_stopped_()) {
721  		if (m_state == STATE_STARTING) {
722  		  dout(10) << "canceling start" << dendl;
723  		  if (m_bootstrap_request != nullptr) {
724  	            bootstrap_request = m_bootstrap_request;
725  	            bootstrap_request->get();
726  		  }
727  		} else {
728  		  dout(10) << "interrupting replay" << dendl;
729  		  shut_down_replay = true;
730  		}
731  	
732  	        ceph_assert(m_on_stop_finish == nullptr);
733  	        std::swap(m_on_stop_finish, on_finish);
734  	        m_stop_requested = true;
735  	        m_manual_stop = manual;
736  	      }
737  	    }
738  	  }
739  	
740  	  // avoid holding lock since bootstrap request will update status
741  	  if (bootstrap_request != nullptr) {
742  	    dout(10) << "canceling bootstrap" << dendl;
743  	    bootstrap_request->cancel();
744  	    bootstrap_request->put();
745  	  }
746  	
747  	  if (!running) {
748  	    dout(20) << "not running" << dendl;
749  	    if (on_finish) {
750  	      on_finish->complete(-EINVAL);
751  	    }
752  	    return;
753  	  }
754  	
755  	  if (shut_down_replay) {
756  	    on_stop_journal_replay(r, desc);
757  	  } else if (on_finish != nullptr) {
758  	    on_finish->complete(0);
759  	  }
760  	}
761  	
762  	template <typename I>
763  	void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
764  	{
765  	  dout(10) << dendl;
766  	
767  	  {
768  	    std::lock_guard locker{m_lock};
769  	    if (m_state != STATE_REPLAYING) {
770  	      // might be invoked multiple times while stopping
771  	      return;
772  	    }
773  	    m_stop_requested = true;
774  	    m_state = STATE_STOPPING;
775  	  }
776  	
777  	  set_state_description(r, desc);
778  	  update_mirror_image_status(true, boost::none);
779  	  reschedule_update_status_task(-1);
780  	  shut_down(0);
781  	}
782  	
783  	template <typename I>
784  	void ImageReplayer<I>::handle_replay_ready()
785  	{
786  	  dout(20) << dendl;
787  	  if (on_replay_interrupted()) {
788  	    return;
789  	  }
790  	
791  	  if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
792  	    return;
793  	  }
794  	
795  	  m_event_replay_tracker.start_op();
796  	
797  	  m_lock.lock();
798  	  bool stopping = (m_state == STATE_STOPPING);
799  	  m_lock.unlock();
800  	
801  	  if (stopping) {
802  	    dout(10) << "stopping event replay" << dendl;
803  	    m_event_replay_tracker.finish_op();
804  	    return;
805  	  }
806  	
807  	  if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
808  	    preprocess_entry();
809  	    return;
810  	  }
811  	
812  	  replay_flush();
813  	}
814  	
815  	template <typename I>
816  	void ImageReplayer<I>::restart(Context *on_finish)
817  	{
818  	  auto ctx = new LambdaContext(
819  	    [this, on_finish](int r) {
820  	      if (r < 0) {
821  		// Try start anyway.
822  	      }
823  	      start(on_finish, true);
824  	    });
825  	  stop(ctx);
826  	}
827  	
828  	template <typename I>
829  	void ImageReplayer<I>::flush()
830  	{
831  	  dout(10) << dendl;
832  	  C_SaferCond ctx;
833  	  flush_local_replay(&ctx);
834  	  ctx.wait();
835  	
836  	  update_mirror_image_status(false, boost::none);
837  	}
838  	
839  	template <typename I>
840  	void ImageReplayer<I>::flush_local_replay(Context* on_flush)
841  	{
842  	  m_lock.lock();
843  	  if (m_state != STATE_REPLAYING) {
844  	    m_lock.unlock();
845  	    on_flush->complete(0);
846  	    return;
847  	  }
848  	
849  	  dout(15) << dendl;
850  	  auto ctx = new LambdaContext(
851  	    [this, on_flush](int r) {
852  	      handle_flush_local_replay(on_flush, r);
853  	    });
854  	  m_local_replay->flush(ctx);
855  	  m_lock.unlock();
856  	}
857  	
858  	template <typename I>
859  	void ImageReplayer<I>::handle_flush_local_replay(Context* on_flush, int r)
860  	{
861  	  dout(15) << "r=" << r << dendl;
862  	  if (r < 0) {
863  	    derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
864  	    on_flush->complete(r);
865  	    return;
866  	  }
867  	
868  	  flush_commit_position(on_flush);
869  	}
870  	
871  	template <typename I>
872  	void ImageReplayer<I>::flush_commit_position(Context* on_flush)
873  	{
874  	  m_lock.lock();
875  	  if (m_state != STATE_REPLAYING) {
876  	    m_lock.unlock();
877  	    on_flush->complete(0);
878  	    return;
879  	  }
880  	
881  	  dout(15) << dendl;
882  	  auto ctx = new LambdaContext(
883  	    [this, on_flush](int r) {
884  	      handle_flush_commit_position(on_flush, r);
885  	    });
886  	  m_remote_journaler->flush_commit_position(ctx);
887  	  m_lock.unlock();
888  	}
889  	
890  	template <typename I>
891  	void ImageReplayer<I>::handle_flush_commit_position(Context* on_flush, int r)
892  	{
893  	  dout(15) << "r=" << r << dendl;
894  	  if (r < 0) {
895  	    derr << "error flushing remote journal commit position: "
896  		 << cpp_strerror(r) << dendl;
897  	  }
898  	
899  	  on_flush->complete(r);
900  	}
901  	
902  	template <typename I>
903  	bool ImageReplayer<I>::on_replay_interrupted()
904  	{
905  	  bool shut_down;
906  	  {
907  	    std::lock_guard locker{m_lock};
908  	    shut_down = m_stop_requested;
909  	  }
910  	
911  	  if (shut_down) {
912  	    on_stop_journal_replay();
913  	  }
914  	  return shut_down;
915  	}
916  	
917  	template <typename I>
918  	void ImageReplayer<I>::print_status(Formatter *f)
919  	{
920  	  dout(10) << dendl;
921  	
922  	  std::lock_guard l{m_lock};
923  	
924  	  f->open_object_section("image_replayer");
925  	  f->dump_string("name", m_name);
926  	  f->dump_string("state", to_string(m_state));
927  	  f->close_section();
928  	}
929  	
930  	template <typename I>
931  	void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
932  	{
933  	  dout(10) << "r=" << r << dendl;
934  	  if (r < 0) {
935  	    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
936  	  }
937  	
938  	  {
939  	    std::lock_guard locker{m_lock};
940  	    m_stop_requested = true;
941  	  }
942  	  on_stop_journal_replay(r, error_desc);
943  	}
944  	
945  	template <typename I>
946  	void ImageReplayer<I>::replay_flush() {
947  	  dout(10) << dendl;
948  	
949  	  bool interrupted = false;
950  	  {
951  	    std::lock_guard locker{m_lock};
952  	    if (m_state != STATE_REPLAYING) {
953  	      dout(10) << "replay interrupted" << dendl;
954  	      interrupted = true;
955  	    } else {
956  	      m_state = STATE_REPLAY_FLUSHING;
957  	    }
958  	  }
959  	
960  	  if (interrupted) {
961  	    m_event_replay_tracker.finish_op();
962  	    return;
963  	  }
964  	
965  	  // shut down the replay to flush all IO and ops and create a new
966  	  // replayer to handle the new tag epoch
967  	  Context *ctx = create_context_callback<
968  	    ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
969  	  ctx = new LambdaContext([this, ctx](int r) {
970  	      m_local_image_ctx->journal->stop_external_replay();
971  	      m_local_replay = nullptr;
972  	
973  	      if (r < 0) {
974  	        ctx->complete(r);
975  	        return;
976  	      }
977  	
978  	      m_local_journal->start_external_replay(&m_local_replay, ctx);
979  	    });
980  	  m_local_replay->shut_down(false, ctx);
981  	}
982  	
983  	template <typename I>
984  	void ImageReplayer<I>::handle_replay_flush(int r) {
985  	  dout(10) << "r=" << r << dendl;
986  	
987  	  {
988  	    std::lock_guard locker{m_lock};
989  	    ceph_assert(m_state == STATE_REPLAY_FLUSHING);
990  	    m_state = STATE_REPLAYING;
991  	  }
992  	
993  	  if (r < 0) {
994  	    derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
995  	    m_event_replay_tracker.finish_op();
996  	    handle_replay_complete(r, "replay flush encountered an error");
997  	    return;
998  	  } else if (on_replay_interrupted()) {
999  	    m_event_replay_tracker.finish_op();
1000 	    return;
1001 	  }
1002 	
1003 	  get_remote_tag();
1004 	}
1005 	
1006 	template <typename I>
1007 	void ImageReplayer<I>::get_remote_tag() {
1008 	  dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
1009 	
1010 	  Context *ctx = create_context_callback<
1011 	    ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1012 	  m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1013 	}
1014 	
1015 	template <typename I>
1016 	void ImageReplayer<I>::handle_get_remote_tag(int r) {
1017 	  dout(15) << "r=" << r << dendl;
1018 	
1019 	  if (r == 0) {
1020 	    try {
1021 	      auto it = m_replay_tag.data.cbegin();
1022 	      decode(m_replay_tag_data, it);
1023 	    } catch (const buffer::error &err) {
1024 	      r = -EBADMSG;
1025 	    }
1026 	  }
1027 	
1028 	  if (r < 0) {
1029 	    derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1030 	         << cpp_strerror(r) << dendl;
1031 	    m_event_replay_tracker.finish_op();
1032 	    handle_replay_complete(r, "failed to retrieve remote tag");
1033 	    return;
1034 	  }
1035 	
1036 	  m_replay_tag_valid = true;
1037 	  dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
1038 	           << m_replay_tag_data << dendl;
1039 	
1040 	  allocate_local_tag();
1041 	}
1042 	
1043 	template <typename I>
1044 	void ImageReplayer<I>::allocate_local_tag() {
1045 	  dout(15) << dendl;
1046 	
1047 	  std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1048 	  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1049 	    mirror_uuid = m_remote_image.mirror_uuid;
1050 	  } else if (mirror_uuid == m_local_mirror_uuid) {
1051 	    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1052 	  } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1053 	    // handle possible edge condition where daemon can failover and
1054 	    // the local image has already been promoted/demoted
1055 	    auto local_tag_data = m_local_journal->get_tag_data();
1056 	    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
1057 	        (local_tag_data.predecessor.commit_valid &&
1058 	         local_tag_data.predecessor.mirror_uuid ==
1059 	           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
1060 	      dout(15) << "skipping stale demotion event" << dendl;
1061 	      handle_process_entry_safe(m_replay_entry, m_replay_start_time, 0);
1062 	      handle_replay_ready();
1063 	      return;
1064 	    } else {
1065 	      dout(5) << "encountered image demotion: stopping" << dendl;
1066 	      std::lock_guard locker{m_lock};
1067 	      m_stop_requested = true;
1068 	    }
1069 	  }
1070 	
1071 	  librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1072 	  if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1073 	    predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1074 	  } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1075 	    predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1076 	  }
1077 	
1078 	  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
1079 	           << "predecessor=" << predecessor << ", "
1080 	           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
1081 	  Context *ctx = create_context_callback<
1082 	    ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1083 	  m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1084 	}
1085 	
1086 	template <typename I>
1087 	void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1088 	  dout(15) << "r=" << r << ", "
1089 	           << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;
1090 	
1091 	  if (r < 0) {
1092 	    derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1093 	    m_event_replay_tracker.finish_op();
1094 	    handle_replay_complete(r, "failed to allocate journal tag");
1095 	    return;
1096 	  }
1097 	
1098 	  preprocess_entry();
1099 	}
1100 	
1101 	template <typename I>
1102 	void ImageReplayer<I>::preprocess_entry() {
1103 	  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1104 	           << dendl;
1105 	
1106 	  bufferlist data = m_replay_entry.get_data();
1107 	  auto it = data.cbegin();
1108 	  int r = m_local_replay->decode(&it, &m_event_entry);
1109 	  if (r < 0) {
1110 	    derr << "failed to decode journal event" << dendl;
1111 	    m_event_replay_tracker.finish_op();
1112 	    handle_replay_complete(r, "failed to decode journal event");
1113 	    return;
1114 	  }
1115 	
1116 	  uint32_t delay = calculate_replay_delay(
1117 	    m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1118 	  if (delay == 0) {
1119 	    handle_preprocess_entry_ready(0);
1120 	    return;
1121 	  }
1122 	
1123 	  dout(20) << "delaying replay by " << delay << " sec" << dendl;
1124 	
1125 	  std::lock_guard timer_locker{m_threads->timer_lock};
1126 	  ceph_assert(m_delayed_preprocess_task == nullptr);
1127 	  m_delayed_preprocess_task = new LambdaContext(
1128 	    [this](int r) {
1129 	      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
1130 	      m_delayed_preprocess_task = nullptr;
1131 	      m_threads->work_queue->queue(
1132 	        create_context_callback<ImageReplayer,
1133 	        &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1134 	    });
1135 	  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1136 	}
1137 	
1138 	template <typename I>
1139 	void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1140 	  dout(20) << "r=" << r << dendl;
1141 	  ceph_assert(r == 0);
1142 	
1143 	  m_replay_start_time = ceph_clock_now();
1144 	  if (!m_event_preprocessor->is_required(m_event_entry)) {
1145 	    process_entry();
1146 	    return;
1147 	  }
1148 	
1149 	  Context *ctx = create_context_callback<
1150 	    ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1151 	  m_event_preprocessor->preprocess(&m_event_entry, ctx);
1152 	}
1153 	
1154 	template <typename I>
1155 	void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1156 	  dout(20) << "r=" << r << dendl;
1157 	
1158 	  if (r < 0) {
1159 	    m_event_replay_tracker.finish_op();
1160 	
1161 	    if (r == -ECANCELED) {
1162 	      handle_replay_complete(0, "lost exclusive lock");
1163 	    } else {
1164 	      derr << "failed to preprocess journal event" << dendl;
1165 	      handle_replay_complete(r, "failed to preprocess journal event");
1166 	    }
1167 	    return;
1168 	  }
1169 	
1170 	  process_entry();
1171 	}
1172 	
1173 	template <typename I>
1174 	void ImageReplayer<I>::process_entry() {
1175 	  dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1176 	           << dendl;
1177 	
1178 	  // stop replaying events if stop has been requested
1179 	  if (on_replay_interrupted()) {
1180 	    m_event_replay_tracker.finish_op();
1181 	    return;
1182 	  }
1183 	
1184 	  Context *on_ready = create_context_callback<
1185 	    ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1186 	  Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
1187 	                                             m_replay_start_time);
1188 	
1189 	  m_local_replay->process(m_event_entry, on_ready, on_commit);
1190 	}
1191 	
1192 	template <typename I>
1193 	void ImageReplayer<I>::handle_process_entry_ready(int r) {
1194 	  dout(20) << dendl;
1195 	  ceph_assert(r == 0);
1196 	
1197 	  bool update_status = false;
1198 	  {
1199 	    std::shared_lock image_locker{m_local_image_ctx->image_lock};
1200 	    if (m_local_image_name != m_local_image_ctx->name) {
1201 	      m_local_image_name = m_local_image_ctx->name;
1202 	      update_status = true;
1203 	    }
1204 	  }
1205 	
1206 	  if (update_status) {
1207 	    reschedule_update_status_task(0);
1208 	  }
1209 	
1210 	  // attempt to process the next event
1211 	  handle_replay_ready();
1212 	}
1213 	
1214 	template <typename I>
1215 	void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry &replay_entry,
1216 	                                                 const utime_t &replay_start_time,
1217 	                                                 int r) {
1218 	  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1219 		   << dendl;
1220 	
1221 	  if (r < 0) {
1222 	    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1223 	    handle_replay_complete(r, "failed to commit journal event");
1224 	  } else {
1225 	    ceph_assert(m_remote_journaler != nullptr);
1226 	    m_remote_journaler->committed(replay_entry);
1227 	  }
1228 	
1229 	  auto bytes = replay_entry.get_data().length();
1230 	  auto latency = ceph_clock_now() - replay_start_time;
1231 	
1232 	  if (g_perf_counters) {
1233 	    g_perf_counters->inc(l_rbd_mirror_replay);
1234 	    g_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
1235 	    g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
1236 	  }
1237 	
1238 	  auto ctx = new LambdaContext(
1239 	    [this, bytes, latency](int r) {
1240 	      std::lock_guard locker{m_lock};
1241 	      if (m_perf_counters) {
1242 	        m_perf_counters->inc(l_rbd_mirror_replay);
1243 	        m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
1244 	        m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
1245 	      }
1246 	      m_event_replay_tracker.finish_op();
1247 	    });
1248 	  m_threads->work_queue->queue(ctx, 0);
1249 	}
1250 	
1251 	template <typename I>
1252 	bool ImageReplayer<I>::update_mirror_image_status(bool force,
1253 	                                                  const OptionalState &state) {
1254 	  dout(15) << dendl;
1255 	  {
1256 	    std::lock_guard locker{m_lock};
1257 	    if (!start_mirror_image_status_update(force, false)) {
1258 	      return false;
1259 	    }
1260 	  }
1261 	
1262 	  queue_mirror_image_status_update(state);
1263 	  return true;
1264 	}
1265 	
1266 	template <typename I>
1267 	bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1268 	                                                        bool restarting) {
1269 	  ceph_assert(ceph_mutex_is_locked(m_lock));
1270 	
1271 	  if (!force && !is_stopped_()) {
1272 	    if (!is_running_()) {
1273 	      dout(15) << "shut down in-progress: ignoring update" << dendl;
1274 	      return false;
1275 	    } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1276 	      dout(15) << "already sending update" << dendl;
1277 	      m_update_status_requested = true;
1278 	      return false;
1279 	    }
1280 	  }
1281 	
1282 	  ++m_in_flight_status_updates;
1283 	  dout(15) << "in-flight updates=" << m_in_flight_status_updates << dendl;
1284 	  return true;
1285 	}
1286 	
1287 	template <typename I>
1288 	void ImageReplayer<I>::finish_mirror_image_status_update() {
1289 	  reregister_admin_socket_hook();
1290 	
1291 	  Context *on_finish = nullptr;
1292 	  {
1293 	    std::lock_guard locker{m_lock};
1294 	    ceph_assert(m_in_flight_status_updates > 0);
1295 	    if (--m_in_flight_status_updates > 0) {
1296 	      dout(15) << "waiting on " << m_in_flight_status_updates << " in-flight "
1297 	               << "updates" << dendl;
1298 	      return;
1299 	    }
1300 	
1301 	    std::swap(on_finish, m_on_update_status_finish);
1302 	  }
1303 	
1304 	  dout(15) << dendl;
1305 	  if (on_finish != nullptr) {
1306 	    on_finish->complete(0);
1307 	  }
1308 	}
1309 	
1310 	template <typename I>
1311 	void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1312 	  dout(15) << dendl;
1313 	
1314 	  auto ctx = new LambdaContext(
1315 	    [this, state](int r) {
1316 	      send_mirror_status_update(state);
1317 	    });
1318 	
1319 	  // ensure pending IO is flushed and the commit position is updated
1320 	  // prior to updating the mirror status
1321 	  auto ctx2 = new LambdaContext(
1322 	    [this, ctx=std::move(ctx)](int r) {
1323 	      flush_local_replay(ctx);
1324 	    });
1325 	  m_threads->work_queue->queue(ctx2, 0);
1326 	}
1327 	
1328 	template <typename I>
1329 	void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1330 	  State state;
1331 	  std::string state_desc;
1332 	  int last_r;
1333 	  bool stopping_replay;
1334 	
1335 	  OptionalMirrorImageStatusState mirror_image_status_state =
1336 	    boost::make_optional(false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
1337 	  image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
1338 	  {
1339 	    std::lock_guard locker{m_lock};
1340 	    state = m_state;
1341 	    state_desc = m_state_desc;
1342 	    mirror_image_status_state = m_mirror_image_status_state;
1343 	    last_r = m_last_r;
1344 	    stopping_replay = (m_local_image_ctx != nullptr);
1345 	
1346 	    if (m_bootstrap_request != nullptr) {
1347 	      bootstrap_request = m_bootstrap_request;
1348 	      bootstrap_request->get();
1349 	    }
1350 	  }
1351 	
1352 	  bool syncing = false;
1353 	  if (bootstrap_request != nullptr) {
1354 	    syncing = bootstrap_request->is_syncing();
1355 	    bootstrap_request->put();
1356 	    bootstrap_request = nullptr;
1357 	  }
1358 	
1359 	  if (opt_state) {
1360 	    state = *opt_state;
1361 	  }
1362 	
1363 	  cls::rbd::MirrorImageStatus status;
1364 	  status.up = true;
1365 	  switch (state) {
1366 	  case STATE_STARTING:
1367 	    if (syncing) {
1368 	      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1369 	      status.description = state_desc.empty() ? "syncing" : state_desc;
1370 	      mirror_image_status_state = status.state;
1371 	    } else {
1372 	      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1373 	      status.description = "starting replay";
1374 	    }
1375 	    break;
1376 	  case STATE_REPLAYING:
1377 	  case STATE_REPLAY_FLUSHING:
1378 	    status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1379 	    {
1380 	      Context *on_req_finish = new LambdaContext(
1381 	        [this](int r) {
1382 	          dout(15) << "replay status ready: r=" << r << dendl;
1383 	          if (r >= 0) {
1384 	            send_mirror_status_update(boost::none);
1385 	          } else if (r == -EAGAIN) {
1386 	            // decrement in-flight status update counter
1387 	            handle_mirror_status_update(r);
1388 	          }
1389 	        });
1390 	
1391 	      std::string desc;
1392 	      ceph_assert(m_replay_status_formatter != nullptr);
1393 	      if (!m_replay_status_formatter->get_or_send_update(&desc,
1394 	                                                         on_req_finish)) {
1395 	        dout(15) << "waiting for replay status" << dendl;
1396 	        return;
1397 	      }
1398 	      status.description = "replaying, " + desc;
1399 	      mirror_image_status_state = boost::make_optional(
1400 	        false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
1401 	    }
1402 	    break;
1403 	  case STATE_STOPPING:
1404 	    if (stopping_replay) {
1405 	      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1406 	      status.description = state_desc.empty() ? "stopping replay" : state_desc;
1407 	      break;
1408 	    }
1409 	    // FALLTHROUGH
1410 	  case STATE_STOPPED:
1411 	    if (last_r == -EREMOTEIO) {
1412 	      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1413 	      status.description = state_desc;
1414 	      mirror_image_status_state = status.state;
1415 	    } else if (last_r < 0) {
1416 	      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1417 	      status.description = state_desc;
1418 	      mirror_image_status_state = status.state;
1419 	    } else {
1420 	      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1421 	      status.description = state_desc.empty() ? "stopped" : state_desc;
1422 	      mirror_image_status_state = boost::none;
1423 	    }
1424 	    break;
1425 	  default:
1426 	    ceph_assert(!"invalid state");
1427 	  }
1428 	
1429 	  {
1430 	    std::lock_guard locker{m_lock};
1431 	    m_mirror_image_status_state = mirror_image_status_state;
1432 	  }
1433 	
1434 	  // prevent the status from ping-ponging when failed replays are restarted
1435 	  if (mirror_image_status_state &&
1436 	      *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1437 	    status.state = *mirror_image_status_state;
1438 	  }
1439 	
1440 	  dout(15) << "status=" << status << dendl;
1441 	  librados::ObjectWriteOperation op;
1442 	  librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1443 	
1444 	  librados::AioCompletion *aio_comp = create_rados_callback<
1445 	    ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1446 	  int r = m_local_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1447 	  ceph_assert(r == 0);
1448 	  aio_comp->release();
1449 	}
1450 	
1451 	template <typename I>
1452 	void ImageReplayer<I>::handle_mirror_status_update(int r) {
1453 	  dout(15) << "r=" << r << dendl;
1454 	
1455 	  bool running = false;
1456 	  bool started = false;
1457 	  {
1458 	    std::lock_guard locker{m_lock};
1459 	    bool update_status_requested = false;
1460 	    std::swap(update_status_requested, m_update_status_requested);
1461 	
1462 	    running = is_running_();
1463 	    if (running && update_status_requested) {
1464 	      started = start_mirror_image_status_update(false, true);
1465 	    }
1466 	  }
1467 	
1468 	  // if a deferred update is available, send it -- otherwise reschedule
1469 	  // the timer task
1470 	  if (started) {
1471 	    queue_mirror_image_status_update(boost::none);
1472 	  } else if (running) {
1473 	    reschedule_update_status_task(0);
1474 	  }
1475 	
1476 	  // mark committed status update as no longer in-flight
1477 	  finish_mirror_image_status_update();
1478 	}
1479 	
1480 	template <typename I>
1481 	void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1482 	  bool canceled_task = false;
1483 	  {
1484 	    std::lock_guard locker{m_lock};
1485 	    std::lock_guard timer_locker{m_threads->timer_lock};
1486 	
1487 	    if (m_update_status_task) {
1488 	      dout(15) << "canceling existing status update task" << dendl;
1489 	
1490 	      canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1491 	      m_update_status_task = nullptr;
1492 	    }
1493 	
1494 	    if (new_interval > 0) {
1495 	      m_update_status_interval = new_interval;
1496 	    }
1497 	
1498 	    if (new_interval >= 0 && is_running_() &&
1499 	        start_mirror_image_status_update(true, false)) {
1500 	      m_update_status_task = new LambdaContext(
1501 	        [this](int r) {
1502 	          ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
1503 	          m_update_status_task = nullptr;
1504 	
1505 	          queue_mirror_image_status_update(boost::none);
1506 	        });
1507 	      dout(15) << "scheduling status update task after "
1508 	               << m_update_status_interval << " seconds" << dendl;
1509 	      m_threads->timer->add_event_after(m_update_status_interval,
1510 	                                        m_update_status_task);
1511 	    }
1512 	  }
1513 	
1514 	  if (canceled_task) {
1515 	    // decrement in-flight status update counter for canceled task
1516 	    finish_mirror_image_status_update();
1517 	  }
1518 	}
1519 	
1520 	template <typename I>
1521 	void ImageReplayer<I>::shut_down(int r) {
1522 	  dout(10) << "r=" << r << dendl;
1523 	
1524 	  bool canceled_delayed_preprocess_task = false;
1525 	  {
1526 	    std::lock_guard timer_locker{m_threads->timer_lock};
1527 	    if (m_delayed_preprocess_task != nullptr) {
1528 	      canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1529 	        m_delayed_preprocess_task);
1530 	      ceph_assert(canceled_delayed_preprocess_task);
1531 	      m_delayed_preprocess_task = nullptr;
1532 	    }
1533 	  }
1534 	  if (canceled_delayed_preprocess_task) {
1535 	    // wake up sleeping replay
1536 	    m_event_replay_tracker.finish_op();
1537 	  }
1538 	
1539 	  reschedule_update_status_task(-1);
1540 	
1541 	  {
1542 	    std::lock_guard locker{m_lock};
1543 	    ceph_assert(m_state == STATE_STOPPING);
1544 	
1545 	    // if status updates are in-flight, wait for them to complete
1546 	    // before proceeding
1547 	    if (m_in_flight_status_updates > 0) {
1548 	      if (m_on_update_status_finish == nullptr) {
1549 	        dout(15) << "waiting for in-flight status update" << dendl;
1550 	        m_on_update_status_finish = new LambdaContext(
1551 	          [this, r](int _r) {
1552 	            shut_down(r);
1553 	          });
1554 	      }
1555 	      return;
1556 	    }
1557 	  }
1558 	
1559 	  // NOTE: it's important to ensure that the local image is fully
1560 	  // closed before attempting to close the remote journal in
1561 	  // case the remote cluster is unreachable
1562 	
1563 	  // chain the shut down sequence (reverse order)
1564 	  Context *ctx = new LambdaContext(
1565 	    [this, r](int _r) {
1566 	      update_mirror_image_status(true, STATE_STOPPED);
1567 	      handle_shut_down(r);
1568 	    });
1569 	
1570 	  // close the remote journal
1571 	  if (m_remote_journaler != nullptr) {
1572 	    ctx = new LambdaContext([this, ctx](int r) {
1573 	        delete m_remote_journaler;
1574 	        m_remote_journaler = nullptr;
1575 	        ctx->complete(0);
1576 	      });
1577 	    ctx = new LambdaContext([this, ctx](int r) {
1578 		m_remote_journaler->remove_listener(&m_remote_listener);
1579 	        m_remote_journaler->shut_down(ctx);
1580 	      });
1581 	  }
1582 	
1583 	  // stop the replay of remote journal events
1584 	  if (m_replay_handler != nullptr) {
1585 	    ctx = new LambdaContext([this, ctx](int r) {
1586 	        delete m_replay_handler;
1587 	        m_replay_handler = nullptr;
1588 	
1589 	        m_event_replay_tracker.wait_for_ops(ctx);
1590 	      });
1591 	    ctx = new LambdaContext([this, ctx](int r) {
1592 	        m_remote_journaler->stop_replay(ctx);
1593 	      });
1594 	  }
1595 	
1596 	  // close the local image (release exclusive lock)
1597 	  if (m_local_image_ctx) {
1598 	    ctx = new LambdaContext([this, ctx](int r) {
1599 	      CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1600 	        &m_local_image_ctx, ctx);
1601 	      request->send();
1602 	    });
1603 	  }
1604 	
1605 	  // shut down event replay into the local image
1606 	  if (m_local_journal != nullptr) {
1607 	    ctx = new LambdaContext([this, ctx](int r) {
1608 	        m_local_journal = nullptr;
1609 	        ctx->complete(0);
1610 	      });
1611 	    if (m_local_replay != nullptr) {
1612 	      ctx = new LambdaContext([this, ctx](int r) {
1613 	          m_local_journal->stop_external_replay();
1614 	          m_local_replay = nullptr;
1615 	
1616 	          EventPreprocessor<I>::destroy(m_event_preprocessor);
1617 	          m_event_preprocessor = nullptr;
1618 	          ctx->complete(0);
1619 	        });
1620 	    }
1621 	    ctx = new LambdaContext([this, ctx](int r) {
1622 	        // blocks if listener notification is in-progress
1623 	        m_local_journal->remove_listener(m_journal_listener);
1624 	        ctx->complete(0);
1625 	      });
1626 	  }
1627 	
1628 	  // wait for all local in-flight replay events to complete
1629 	  ctx = new LambdaContext([this, ctx](int r) {
1630 	      if (r < 0) {
1631 	        derr << "error shutting down journal replay: " << cpp_strerror(r)
1632 	             << dendl;
1633 	      }
1634 	
1635 	      m_event_replay_tracker.wait_for_ops(ctx);
1636 	    });
1637 	
1638 	  // flush any local in-flight replay events
1639 	  if (m_local_replay != nullptr) {
1640 	    ctx = new LambdaContext([this, ctx](int r) {
1641 	        m_local_replay->shut_down(true, ctx);
1642 	      });
1643 	  }
1644 	
1645 	  m_threads->work_queue->queue(ctx, 0);
1646 	}
1647 	
1648 	template <typename I>
1649 	void ImageReplayer<I>::handle_shut_down(int r) {
1650 	  reschedule_update_status_task(-1);
1651 	
1652 	  bool resync_requested = false;
1653 	  bool delete_requested = false;
1654 	  bool unregister_asok_hook = false;
1655 	  {
1656 	    std::lock_guard locker{m_lock};
1657 	
1658 	    // if status updates are in-flight, wait for them to complete
1659 	    // before proceeding
1660 	    if (m_in_flight_status_updates > 0) {
1661 	      if (m_on_update_status_finish == nullptr) {
1662 	        dout(15) << "waiting for in-flight status update" << dendl;
1663 	        m_on_update_status_finish = new LambdaContext(
1664 	          [this, r](int _r) {
1665 	            handle_shut_down(r);
1666 	          });
1667 	      }
1668 	      return;
1669 	    }
1670 	
1671 	    if (m_delete_requested && !m_local_image_id.empty()) {
1672 	      ceph_assert(m_remote_image.image_id.empty());
1673 	      dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1674 	      unregister_asok_hook = true;
1675 	      std::swap(delete_requested, m_delete_requested);
1676 	    }
1677 	
1678 	    std::swap(resync_requested, m_resync_requested);
1679 	    if (delete_requested || resync_requested) {
1680 	      m_local_image_id = "";
1681 	    } else if (m_last_r == -ENOENT &&
1682 	               m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1683 	      dout(0) << "mirror image no longer exists" << dendl;
1684 	      unregister_asok_hook = true;
1685 	      m_finished = true;
1686 	    }
1687 	  }
1688 	
1689 	  if (unregister_asok_hook) {
1690 	    unregister_admin_socket_hook();
1691 	  }
1692 	
1693 	  if (delete_requested || resync_requested) {
1694 	    dout(5) << "moving image to trash" << dendl;
1695 	    auto ctx = new LambdaContext([this, r](int) {
1696 	      handle_shut_down(r);
1697 	    });
1698 	    ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
1699 	                                resync_requested, m_threads->work_queue, ctx);
1700 	    return;
1701 	  }
1702 	
1703 	  dout(10) << "stop complete" << dendl;
1704 	  ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1705 	  m_replay_status_formatter = nullptr;
1706 	
1707 	  Context *on_start = nullptr;
1708 	  Context *on_stop = nullptr;
1709 	  {
1710 	    std::lock_guard locker{m_lock};
1711 	    std::swap(on_start, m_on_start_finish);
1712 	    std::swap(on_stop, m_on_stop_finish);
1713 	    m_stop_requested = false;
1714 	    ceph_assert(m_delayed_preprocess_task == nullptr);
1715 	    ceph_assert(m_state == STATE_STOPPING);
1716 	    m_state = STATE_STOPPED;
1717 	  }
1718 	
1719 	  if (on_start != nullptr) {
1720 	    dout(10) << "on start finish complete, r=" << r << dendl;
1721 	    on_start->complete(r);
1722 	    r = 0;
1723 	  }
1724 	  if (on_stop != nullptr) {
1725 	    dout(10) << "on stop finish complete, r=" << r << dendl;
1726 	    on_stop->complete(r);
1727 	  }
1728 	}
1729 	
1730 	template <typename I>
1731 	void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1732 	  dout(20) << dendl;
1733 	
1734 	  cls::journal::Client client;
1735 	  {
1736 	    std::lock_guard locker{m_lock};
1737 	    if (!is_running_()) {
1738 	      return;
1739 	    }
1740 	
1741 	    int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1742 	    if (r < 0) {
1743 	      derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1744 	      return;
1745 	    }
1746 	  }
1747 	
1748 	  if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1749 	    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1750 	    stop(nullptr, false, -ENOTCONN, "disconnected");
1751 	  }
1752 	}
1753 	
1754 	template <typename I>
1755 	std::string ImageReplayer<I>::to_string(const State state) {
1756 	  switch (state) {
1757 	  case ImageReplayer<I>::STATE_STARTING:
1758 	    return "Starting";
1759 	  case ImageReplayer<I>::STATE_REPLAYING:
1760 	    return "Replaying";
1761 	  case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1762 	    return "ReplayFlushing";
1763 	  case ImageReplayer<I>::STATE_STOPPING:
1764 	    return "Stopping";
1765 	  case ImageReplayer<I>::STATE_STOPPED:
1766 	    return "Stopped";
1767 	  default:
1768 	    break;
1769 	  }
1770 	  return "Unknown(" + stringify(state) + ")";
1771 	}
1772 	
1773 	template <typename I>
1774 	void ImageReplayer<I>::resync_image(Context *on_finish) {
1775 	  dout(10) << dendl;
1776 	
1777 	  m_resync_requested = true;
1778 	  stop(on_finish);
1779 	}
1780 	
1781 	template <typename I>
1782 	void ImageReplayer<I>::register_admin_socket_hook() {
1783 	  ImageReplayerAdminSocketHook<I> *asok_hook;
1784 	  {
1785 	    std::lock_guard locker{m_lock};
1786 	    if (m_asok_hook != nullptr) {
1787 	      return;
1788 	    }
1789 	
1790 	    ceph_assert(m_perf_counters == nullptr);
1791 	
1792 	    dout(15) << "registered asok hook: " << m_name << dendl;
1793 	    asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1794 	                                                    this);
1795 	    int r = asok_hook->register_commands();
1796 	    if (r == 0) {
1797 	      m_asok_hook = asok_hook;
1798 	
1799 	      CephContext *cct = static_cast<CephContext *>(m_local_io_ctx.cct());
1800 	      auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_perf_stats_prio");
1801 	      PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_" + m_name,
1802 	                              l_rbd_mirror_first, l_rbd_mirror_last);
1803 	      plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
1804 	      plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
1805 	                          "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
1806 	      plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
1807 	                       "Replay latency", "rl", prio);
1808 	      m_perf_counters = plb.create_perf_counters();
1809 	      g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);
1810 	
1811 	      return;
1812 	    }
1813 	    derr << "error registering admin socket commands" << dendl;
1814 	  }
1815 	  delete asok_hook;
1816 	}
1817 	
1818 	template <typename I>
1819 	void ImageReplayer<I>::unregister_admin_socket_hook() {
1820 	  dout(15) << dendl;
1821 	
1822 	  AdminSocketHook *asok_hook = nullptr;
1823 	  PerfCounters *perf_counters = nullptr;
1824 	  {
1825 	    std::lock_guard locker{m_lock};
1826 	    std::swap(asok_hook, m_asok_hook);
1827 	    std::swap(perf_counters, m_perf_counters);
1828 	  }
1829 	  delete asok_hook;
1830 	  if (perf_counters != nullptr) {
1831 	    g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
1832 	    delete perf_counters;
1833 	  }
1834 	}
1835 	
1836 	template <typename I>
1837 	void ImageReplayer<I>::reregister_admin_socket_hook() {
1838 	  {
1839 	    std::lock_guard locker{m_lock};
1840 	
1841 	    auto name = admin_socket_hook_name(m_local_image_name);
1842 	    if (m_asok_hook != nullptr && m_name == name) {
1843 	      return;
1844 	    }
1845 	    m_name = name;
1846 	  }
1847 	  unregister_admin_socket_hook();
1848 	  register_admin_socket_hook();
1849 	}
1850 	
1851 	template <typename I>
1852 	std::string ImageReplayer<I>::admin_socket_hook_name(
1853 	    const std::string &image_name) const {
1854 	  std::string name = m_local_io_ctx.get_namespace();
1855 	  if (!name.empty()) {
1856 	    name += "/";
1857 	  }
1858 	
1859 	  return m_local_io_ctx.get_pool_name() + "/" + name + image_name;
1860 	}
1861 	
1862 	template <typename I>
1863 	std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1864 	{
1865 	  os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1866 	     << "/" << replayer.get_global_image_id() << "]";
1867 	  return os;
1868 	}
1869 	
1870 	} // namespace mirror
1871 	} // namespace rbd
1872 	
// Explicit instantiation of ImageReplayer for librbd::ImageCtx; the member
// templates are defined in this source file.
template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;
1874