1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include <boost/range/adaptor/map.hpp>
5    	
6    	#include "common/Formatter.h"
7    	#include "common/PriorityCache.h"
8    	#include "common/admin_socket.h"
9    	#include "common/debug.h"
10   	#include "common/errno.h"
11   	#include "journal/Types.h"
12   	#include "librbd/ImageCtx.h"
13   	#include "perfglue/heap_profiler.h"
14   	#include "Mirror.h"
15   	#include "ServiceDaemon.h"
16   	#include "Threads.h"
17   	
18   	#define dout_context g_ceph_context
19   	#define dout_subsys ceph_subsys_rbd_mirror
20   	
21   	using std::list;
22   	using std::map;
23   	using std::set;
24   	using std::string;
25   	using std::unique_ptr;
26   	using std::vector;
27   	
28   	using librados::Rados;
29   	using librados::IoCtx;
30   	using librbd::mirror_peer_t;
31   	
32   	namespace rbd {
33   	namespace mirror {
34   	
35   	namespace {
36   	
37   	class MirrorAdminSocketCommand {
38   	public:
39   	  virtual ~MirrorAdminSocketCommand() {}
40   	  virtual int call(Formatter *f) = 0;
41   	};
42   	
43   	class StatusCommand : public MirrorAdminSocketCommand {
44   	public:
45   	  explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
46   	
47   	  int call(Formatter *f) override {
48   	    mirror->print_status(f);
49   	    return 0;
50   	  }
51   	
52   	private:
53   	  Mirror *mirror;
54   	};
55   	
56   	class StartCommand : public MirrorAdminSocketCommand {
57   	public:
58   	  explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
59   	
60   	  int call(Formatter *f) override {
61   	    mirror->start();
62   	    return 0;
63   	  }
64   	
65   	private:
66   	  Mirror *mirror;
67   	};
68   	
69   	class StopCommand : public MirrorAdminSocketCommand {
70   	public:
71   	  explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
72   	
73   	  int call(Formatter *f) override {
74   	    mirror->stop();
75   	    return 0;
76   	  }
77   	
78   	private:
79   	  Mirror *mirror;
80   	};
81   	
82   	class RestartCommand : public MirrorAdminSocketCommand {
83   	public:
84   	  explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
85   	
86   	  int call(Formatter *f) override {
87   	    mirror->restart();
88   	    return 0;
89   	  }
90   	
91   	private:
92   	  Mirror *mirror;
93   	};
94   	
95   	class FlushCommand : public MirrorAdminSocketCommand {
96   	public:
97   	  explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
98   	
99   	  int call(Formatter *f) override {
100  	    mirror->flush();
101  	    return 0;
102  	  }
103  	
104  	private:
105  	  Mirror *mirror;
106  	};
107  	
108  	class LeaderReleaseCommand : public MirrorAdminSocketCommand {
109  	public:
110  	  explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
111  	
112  	  int call(Formatter *f) override {
113  	    mirror->release_leader();
114  	    return 0;
115  	  }
116  	
117  	private:
118  	  Mirror *mirror;
119  	};
120  	
121  	#undef dout_prefix
122  	#define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
123  	                           << m_name << " " << __func__ << ": "
124  	
125  	struct PriCache : public PriorityCache::PriCache {
126  	  std::string m_name;
127  	  int64_t m_base_cache_max_size;
128  	  int64_t m_extra_cache_max_size;
129  	
130  	  PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10;
131  	  PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10;
132  	  int64_t m_base_cache_bytes = 0;
133  	  int64_t m_extra_cache_bytes = 0;
134  	  int64_t m_committed_bytes = 0;
135  	  double m_cache_ratio = 0;
136  	
137  	  PriCache(const std::string &name, uint64_t min_size, uint64_t max_size)
138  	    : m_name(name), m_base_cache_max_size(min_size),
139  	      m_extra_cache_max_size(max_size - min_size) {
140  	    ceph_assert(max_size >= min_size);
141  	  }
142  	
143  	  void prioritize() {
144  	    if (m_base_cache_pri == PriorityCache::Priority::PRI0) {
145  	      return;
146  	    }
147  	    auto pri = static_cast<uint8_t>(m_base_cache_pri);
148  	    m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri);
149  	
150  	    dout(30) << m_base_cache_pri << dendl;
151  	  }
152  	
153  	  int64_t request_cache_bytes(PriorityCache::Priority pri,
154  	                              uint64_t total_cache) const override {
155  	    int64_t cache_bytes = 0;
156  	
157  	    if (pri == m_base_cache_pri) {
158  	      cache_bytes += m_base_cache_max_size;
159  	    }
160  	    if (pri == m_extra_cache_pri) {
161  	      cache_bytes += m_extra_cache_max_size;
162  	    }
163  	
164  	    dout(30) << cache_bytes << dendl;
165  	
166  	    return cache_bytes;
167  	  }
168  	
169  	  int64_t get_cache_bytes(PriorityCache::Priority pri) const override {
170  	    int64_t cache_bytes = 0;
171  	
172  	    if (pri == m_base_cache_pri) {
173  	      cache_bytes += m_base_cache_bytes;
174  	    }
175  	    if (pri == m_extra_cache_pri) {
176  	      cache_bytes += m_extra_cache_bytes;
177  	    }
178  	
179  	    dout(30) << "pri=" << pri << " " << cache_bytes << dendl;
180  	
181  	    return cache_bytes;
182  	  }
183  	
184  	  int64_t get_cache_bytes() const override {
185  	    auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes;
186  	
187  	    dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "="
188  	             << cache_bytes << dendl;
189  	
190  	    return cache_bytes;
191  	  }
192  	
193  	  void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
194  	    ceph_assert(bytes >= 0);
195  	    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri ||
196  	                bytes == 0);
197  	
198  	    dout(30) << "pri=" << pri << " " << bytes << dendl;
199  	
200  	    if (pri == m_base_cache_pri) {
201  	      m_base_cache_bytes = std::min(m_base_cache_max_size, bytes);
202  	      bytes -= std::min(m_base_cache_bytes, bytes);
203  	    }
204  	
205  	    if (pri == m_extra_cache_pri) {
206  	      m_extra_cache_bytes = bytes;
207  	    }
208  	  }
209  	
210  	  void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
211  	    ceph_assert(bytes >= 0);
212  	    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri);
213  	
214  	    dout(30) << "pri=" << pri << " " << bytes << dendl;
215  	
216  	    if (pri == m_base_cache_pri) {
217  	      ceph_assert(m_base_cache_bytes <= m_base_cache_max_size);
218  	
219  	      auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes);
220  	      m_base_cache_bytes += chunk;
221  	      bytes -= chunk;
222  	    }
223  	
224  	    if (pri == m_extra_cache_pri) {
225  	      m_extra_cache_bytes += bytes;
226  	    }
227  	  }
228  	
229  	  int64_t commit_cache_size(uint64_t total_cache) override {
230  	    m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096);
231  	
232  	    dout(30) << m_committed_bytes << dendl;
233  	
234  	    return m_committed_bytes;
235  	  }
236  	
237  	  int64_t get_committed_size() const override {
238  	    dout(30) << m_committed_bytes << dendl;
239  	
240  	    return m_committed_bytes;
241  	  }
242  	
243  	  double get_cache_ratio() const override {
244  	    dout(30) << m_cache_ratio << dendl;
245  	
246  	    return m_cache_ratio;
247  	  }
248  	
249  	  void set_cache_ratio(double ratio) override {
250  	    dout(30) << m_cache_ratio << dendl;
251  	
252  	    m_cache_ratio = ratio;
253  	  }
254  	
255  	  std::string get_cache_name() const override {
256  	    return m_name;
257  	  }
258  	};
259  	
260  	} // anonymous namespace
261  	
262  	#undef dout_prefix
263  	#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
264  	                           << __func__ << ": "
265  	
266  	class MirrorAdminSocketHook : public AdminSocketHook {
267  	public:
268  	  MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
269  	    admin_socket(cct->get_admin_socket()) {
270  	    std::string command;
271  	    int r;
272  	
273  	    command = "rbd mirror status";
274  	    r = admin_socket->register_command(command, this,
275  					       "get status for rbd mirror");
276  	    if (r == 0) {
277  	      commands[command] = new StatusCommand(mirror);
278  	    }
279  	
280  	    command = "rbd mirror start";
281  	    r = admin_socket->register_command(command, this,
282  					       "start rbd mirror");
283  	    if (r == 0) {
284  	      commands[command] = new StartCommand(mirror);
285  	    }
286  	
287  	    command = "rbd mirror stop";
288  	    r = admin_socket->register_command(command, this,
289  					       "stop rbd mirror");
290  	    if (r == 0) {
291  	      commands[command] = new StopCommand(mirror);
292  	    }
293  	
294  	    command = "rbd mirror restart";
295  	    r = admin_socket->register_command(command, this,
296  					       "restart rbd mirror");
297  	    if (r == 0) {
298  	      commands[command] = new RestartCommand(mirror);
299  	    }
300  	
301  	    command = "rbd mirror flush";
302  	    r = admin_socket->register_command(command, this,
303  					       "flush rbd mirror");
304  	    if (r == 0) {
305  	      commands[command] = new FlushCommand(mirror);
306  	    }
307  	
308  	    command = "rbd mirror leader release";
309  	    r = admin_socket->register_command(command, this,
310  					       "release rbd mirror leader");
311  	    if (r == 0) {
312  	      commands[command] = new LeaderReleaseCommand(mirror);
313  	    }
314  	  }
315  	
316  	  ~MirrorAdminSocketHook() override {
317  	    (void)admin_socket->unregister_commands(this);
318  	    for (Commands::const_iterator i = commands.begin(); i != commands.end();
319  		 ++i) {
320  	      delete i->second;
321  	    }
322  	  }
323  	
324  	  int call(std::string_view command, const cmdmap_t& cmdmap,
325  		   Formatter *f,
326  		   std::ostream& errss,
327  		   bufferlist& out) override {
328  	    Commands::const_iterator i = commands.find(command);
329  	    ceph_assert(i != commands.end());
330  	    return i->second->call(f);
331  	  }
332  	
333  	private:
334  	  typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;
335  	
336  	  AdminSocket *admin_socket;
337  	  Commands commands;
338  	};
339  	
340  	class CacheManagerHandler : public journal::CacheManagerHandler {
341  	public:
342  	  CacheManagerHandler(CephContext *cct)
343  	    : m_cct(cct) {
344  	
345  	    if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) {
346  	      return;
347  	    }
348  	
349  	    uint64_t base = m_cct->_conf.get_val<Option::size_t>(
350  	        "rbd_mirror_memory_base");
351  	    double fragmentation = m_cct->_conf.get_val<double>(
352  	        "rbd_mirror_memory_expected_fragmentation");
353  	    uint64_t target = m_cct->_conf.get_val<Option::size_t>(
354  	        "rbd_mirror_memory_target");
355  	    uint64_t min = m_cct->_conf.get_val<Option::size_t>(
356  	        "rbd_mirror_memory_cache_min");
357  	    uint64_t max = min;
358  	
359  	    // When setting the maximum amount of memory to use for cache, first
360  	    // assume some base amount of memory for the daemon and then fudge in
361  	    // some overhead for fragmentation that scales with cache usage.
362  	    uint64_t ltarget = (1.0 - fragmentation) * target;
363  	    if (ltarget > base + min) {
364  	      max = ltarget - base;
365  	    }
366  	
367  	    m_next_balance = ceph_clock_now();
368  	    m_next_resize = ceph_clock_now();
369  	
370  	    m_cache_manager = std::make_unique<PriorityCache::Manager>(
371  	      m_cct, min, max, target, false);
372  	  }
373  	
374  	  ~CacheManagerHandler() {
375  	    std::lock_guard locker{m_lock};
376  	
377  	    ceph_assert(m_caches.empty());
378  	  }
379  	
380  	  void register_cache(const std::string &cache_name,
381  	                      uint64_t min_size, uint64_t max_size,
382  	                      journal::CacheRebalanceHandler* handler) override {
383  	    if (!m_cache_manager) {
384  	      handler->handle_cache_rebalanced(max_size);
385  	      return;
386  	    }
387  	
388  	    dout(20) << cache_name << " min_size=" << min_size << " max_size="
389  	             << max_size << " handler=" << handler << dendl;
390  	
391  	    std::lock_guard locker{m_lock};
392  	
393  	    auto p = m_caches.insert(
394  	        {cache_name, {cache_name, min_size, max_size, handler}});
395  	    ceph_assert(p.second == true);
396  	
397  	    m_cache_manager->insert(cache_name, p.first->second.pri_cache, false);
398  	    m_next_balance = ceph_clock_now();
399  	  }
400  	
401  	  void unregister_cache(const std::string &cache_name) override {
402  	    if (!m_cache_manager) {
403  	      return;
404  	    }
405  	
406  	    dout(20) << cache_name << dendl;
407  	
408  	    std::lock_guard locker{m_lock};
409  	
410  	    auto it = m_caches.find(cache_name);
411  	    ceph_assert(it != m_caches.end());
412  	
413  	    m_cache_manager->erase(cache_name);
414  	    m_caches.erase(it);
415  	    m_next_balance = ceph_clock_now();
416  	  }
417  	
418  	  void run_cache_manager() {
419  	    if (!m_cache_manager) {
420  	      return;
421  	    }
422  	
423  	    std::lock_guard locker{m_lock};
424  	
425  	    // Before we trim, check and see if it's time to rebalance/resize.
426  	    auto autotune_interval = m_cct->_conf.get_val<double>(
427  	        "rbd_mirror_memory_cache_autotune_interval");
428  	    auto resize_interval = m_cct->_conf.get_val<double>(
429  	        "rbd_mirror_memory_cache_resize_interval");
430  	
431  	    utime_t now = ceph_clock_now();
432  	
433  	    if (autotune_interval > 0 && m_next_balance <= now) {
434  	      dout(20) << "balance" << dendl;
435  	      m_cache_manager->balance();
436  	
437  	      for (auto &it : m_caches) {
438  	        auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get());
439  	        auto new_cache_bytes = pri_cache->get_cache_bytes();
440  	        it.second.handler->handle_cache_rebalanced(new_cache_bytes);
441  	        pri_cache->prioritize();
442  	      }
443  	
444  	      m_next_balance = ceph_clock_now();
445  	      m_next_balance += autotune_interval;
446  	    }
447  	
448  	    if (resize_interval > 0 && m_next_resize < now) {
449  	      if (ceph_using_tcmalloc()) {
450  	        dout(20) << "tune memory" << dendl;
451  	        m_cache_manager->tune_memory();
452  	      }
453  	
454  	      m_next_resize = ceph_clock_now();
455  	      m_next_resize += resize_interval;
456  	    }
457  	  }
458  	
459  	private:
460  	  struct Cache {
461  	    std::shared_ptr<PriorityCache::PriCache> pri_cache;
462  	    journal::CacheRebalanceHandler *handler;
463  	
464  	    Cache(const std::string name, uint64_t min_size, uint64_t max_size,
465  	          journal::CacheRebalanceHandler *handler)
466  	      : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) {
467  	    }
468  	  };
469  	
470  	  CephContext *m_cct;
471  	
472  	  mutable ceph::mutex m_lock =
473  	    ceph::make_mutex("rbd::mirror::CacheManagerHandler");
474  	  std::unique_ptr<PriorityCache::Manager> m_cache_manager;
475  	  std::map<std::string, Cache> m_caches;
476  	
477  	  utime_t m_next_balance;
478  	  utime_t m_next_resize;
479  	};
480  	
481  	Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
482  	  m_cct(cct),
483  	  m_args(args),
484  	  m_local(new librados::Rados()),
485  	  m_cache_manager_handler(new CacheManagerHandler(cct)),
486  	  m_asok_hook(new MirrorAdminSocketHook(cct, this))
487  	{
488  	  m_threads =
(14) Event template_instantiation_context: instantiation of "T &CephContext::lookup_or_create_singleton_object<T,Args...>(std::string_view, bool, Args &&...) [with T=rbd::mirror::Threads<librbd::ImageCtx>, Args=<CephContext *&>]" at line 489 of "../../../../src/tools/rbd_mirror/Mirror.cc"
Also see events: [no_matching_constructor][caretline][argument_list_types_add_on][compiler_generated_function_context][template_instantiation_context][template_instantiation_context][template_instantiation_context][template_instantiation_context][template_instantiation_context][template_instantiation_context][template_instantiation_context][template_instantiation_context][template_instantiation_context]
489  	    &(cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx>>(
490  		"rbd_mirror::threads", false, cct));
491  	  m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
492  	}
493  	
494  	Mirror::~Mirror()
495  	{
496  	  delete m_asok_hook;
497  	}
498  	
499  	void Mirror::handle_signal(int signum)
500  	{
501  	  m_stopping = true;
502  	  {
503  	    std::lock_guard l{m_lock};
504  	    m_cond.notify_all();
505  	  }
506  	}
507  	
508  	int Mirror::init()
509  	{
510  	  int r = m_local->init_with_context(m_cct);
511  	  if (r < 0) {
512  	    derr << "could not initialize rados handle" << dendl;
513  	    return r;
514  	  }
515  	
516  	  r = m_local->connect();
517  	  if (r < 0) {
518  	    derr << "error connecting to local cluster" << dendl;
519  	    return r;
520  	  }
521  	
522  	  r = m_service_daemon->init();
523  	  if (r < 0) {
524  	    derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
525  	    return r;
526  	  }
527  	
528  	  m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
529  	                                                   m_service_daemon.get()));
530  	  return r;
531  	}
532  	
533  	void Mirror::run()
534  	{
535  	  dout(20) << "enter" << dendl;
536  	
537  	  utime_t next_refresh_pools = ceph_clock_now();
538  	
539  	  while (!m_stopping) {
540  	    utime_t now = ceph_clock_now();
541  	    bool refresh_pools = next_refresh_pools <= now;
542  	    if (refresh_pools) {
543  	      m_local_cluster_watcher->refresh_pools();
544  	      next_refresh_pools = ceph_clock_now();
545  	      next_refresh_pools += m_cct->_conf.get_val<uint64_t>(
546  	          "rbd_mirror_pool_replayers_refresh_interval");
547  	    }
548  	    std::unique_lock l{m_lock};
549  	    if (!m_manual_stop) {
550  	      if (refresh_pools) {
551  	        update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
552  	      }
553  	      m_cache_manager_handler->run_cache_manager();
554  	    }
555  	    m_cond.wait_for(l, 1s);
556  	  }
557  	
558  	  // stop all pool replayers in parallel
559  	  std::lock_guard locker{m_lock};
560  	  for (auto &pool_replayer : m_pool_replayers) {
561  	    pool_replayer.second->stop(false);
562  	  }
563  	  dout(20) << "return" << dendl;
564  	}
565  	
566  	void Mirror::print_status(Formatter *f)
567  	{
568  	  dout(20) << "enter" << dendl;
569  	
570  	  std::lock_guard l{m_lock};
571  	
572  	  if (m_stopping) {
573  	    return;
574  	  }
575  	
576  	  f->open_object_section("mirror_status");
577  	  f->open_array_section("pool_replayers");
578  	  for (auto &pool_replayer : m_pool_replayers) {
579  	    pool_replayer.second->print_status(f);
580  	  }
581  	  f->close_section();
582  	  f->close_section();
583  	}
584  	
585  	void Mirror::start()
586  	{
587  	  dout(20) << "enter" << dendl;
588  	  std::lock_guard l{m_lock};
589  	
590  	  if (m_stopping) {
591  	    return;
592  	  }
593  	
594  	  m_manual_stop = false;
595  	
596  	  for (auto &pool_replayer : m_pool_replayers) {
597  	    pool_replayer.second->start();
598  	  }
599  	}
600  	
601  	void Mirror::stop()
602  	{
603  	  dout(20) << "enter" << dendl;
604  	  std::lock_guard l{m_lock};
605  	
606  	  if (m_stopping) {
607  	    return;
608  	  }
609  	
610  	  m_manual_stop = true;
611  	
612  	  for (auto &pool_replayer : m_pool_replayers) {
613  	    pool_replayer.second->stop(true);
614  	  }
615  	}
616  	
617  	void Mirror::restart()
618  	{
619  	  dout(20) << "enter" << dendl;
620  	  std::lock_guard l{m_lock};
621  	
622  	  if (m_stopping) {
623  	    return;
624  	  }
625  	
626  	  m_manual_stop = false;
627  	
628  	  for (auto &pool_replayer : m_pool_replayers) {
629  	    pool_replayer.second->restart();
630  	  }
631  	}
632  	
633  	void Mirror::flush()
634  	{
635  	  dout(20) << "enter" << dendl;
636  	  std::lock_guard l{m_lock};
637  	
638  	  if (m_stopping || m_manual_stop) {
639  	    return;
640  	  }
641  	
642  	  for (auto &pool_replayer : m_pool_replayers) {
643  	    pool_replayer.second->flush();
644  	  }
645  	}
646  	
647  	void Mirror::release_leader()
648  	{
649  	  dout(20) << "enter" << dendl;
650  	  std::lock_guard l{m_lock};
651  	
652  	  if (m_stopping) {
653  	    return;
654  	  }
655  	
656  	  for (auto &pool_replayer : m_pool_replayers) {
657  	    pool_replayer.second->release_leader();
658  	  }
659  	}
660  	
661  	void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
662  	{
663  	  dout(20) << "enter" << dendl;
664  	  ceph_assert(ceph_mutex_is_locked(m_lock));
665  	
666  	  // remove stale pool replayers before creating new pool replayers
667  	  for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
668  	    auto &peer = it->first.second;
669  	    auto pool_peer_it = pool_peers.find(it->first.first);
670  	    if (pool_peer_it == pool_peers.end() ||
671  	        pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
672  	      dout(20) << "removing pool replayer for " << peer << dendl;
673  	      // TODO: make async
674  	      it->second->shut_down();
675  	      it = m_pool_replayers.erase(it);
676  	    } else {
677  	      ++it;
678  	    }
679  	  }
680  	
681  	  for (auto &kv : pool_peers) {
682  	    for (auto &peer : kv.second) {
683  	      PoolPeer pool_peer(kv.first, peer);
684  	
685  	      auto pool_replayers_it = m_pool_replayers.find(pool_peer);
686  	      if (pool_replayers_it != m_pool_replayers.end()) {
687  	        auto& pool_replayer = pool_replayers_it->second;
688  	        if (pool_replayer->is_blacklisted()) {
689  	          derr << "restarting blacklisted pool replayer for " << peer << dendl;
690  	          // TODO: make async
691  	          pool_replayer->shut_down();
692  	          pool_replayer->init();
693  	        } else if (!pool_replayer->is_running()) {
694  	          derr << "restarting failed pool replayer for " << peer << dendl;
695  	          // TODO: make async
696  	          pool_replayer->shut_down();
697  	          pool_replayer->init();
698  	        }
699  	      } else {
700  	        dout(20) << "starting pool replayer for " << peer << dendl;
701  	        unique_ptr<PoolReplayer<>> pool_replayer(
702  	            new PoolReplayer<>(m_threads, m_service_daemon.get(),
703  	                               m_cache_manager_handler.get(), kv.first, peer,
704  	                               m_args));
705  	
706  	        // TODO: make async
707  	        pool_replayer->init();
708  	        m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
709  	      }
710  	    }
711  	
712  	    // TODO currently only support a single peer
713  	  }
714  	}
715  	
716  	} // namespace mirror
717  	} // namespace rbd
718