1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "PoolReplayer.h"
5 #include <boost/bind.hpp>
6 #include "common/Cond.h"
7 #include "common/Formatter.h"
8 #include "common/admin_socket.h"
9 #include "common/ceph_argparse.h"
10 #include "common/code_environment.h"
11 #include "common/common_init.h"
12 #include "common/debug.h"
13 #include "common/errno.h"
14 #include "cls/rbd/cls_rbd_client.h"
15 #include "global/global_context.h"
16 #include "librbd/api/Config.h"
17 #include "librbd/api/Namespace.h"
18 #include "ServiceDaemon.h"
19 #include "Threads.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rbd_mirror
23 #undef dout_prefix
24 #define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
25 << this << " " << __func__ << ": "
26
27 namespace rbd {
28 namespace mirror {
29
30 using ::operator<<;
31
32 namespace {
33
34 const std::string SERVICE_DAEMON_LEADER_KEY("leader");
35
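// connection-identifying config keys that must not be clobbered by the local
// cluster's environment/CLI overrides when connecting to a remote peer
// (see the strip_cluster_overrides handling in init_rados())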
36 const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
37 {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};
38
39 template <typename I>
40 class PoolReplayerAdminSocketCommand {
41 public:
42 PoolReplayerAdminSocketCommand(PoolReplayer<I> *pool_replayer)
43 : pool_replayer(pool_replayer) {
44 }
45 virtual ~PoolReplayerAdminSocketCommand() {}
46 virtual int call(Formatter *f) = 0;
47 protected:
48 PoolReplayer<I> *pool_replayer;
49 };
50
51 template <typename I>
52 class StatusCommand : public PoolReplayerAdminSocketCommand<I> {
53 public:
54 explicit StatusCommand(PoolReplayer<I> *pool_replayer)
55 : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
56 }
57
58 int call(Formatter *f) override {
59 this->pool_replayer->print_status(f);
60 return 0;
61 }
62 };
63
64 template <typename I>
65 class StartCommand : public PoolReplayerAdminSocketCommand<I> {
66 public:
67 explicit StartCommand(PoolReplayer<I> *pool_replayer)
68 : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
69 }
70
71 int call(Formatter *f) override {
72 this->pool_replayer->start();
73 return 0;
74 }
75 };
76
77 template <typename I>
78 class StopCommand : public PoolReplayerAdminSocketCommand<I> {
79 public:
80 explicit StopCommand(PoolReplayer<I> *pool_replayer)
81 : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
82 }
83
84 int call(Formatter *f) override {
85 this->pool_replayer->stop(true);
86 return 0;
87 }
88 };
89
90 template <typename I>
91 class RestartCommand : public PoolReplayerAdminSocketCommand<I> {
92 public:
93 explicit RestartCommand(PoolReplayer<I> *pool_replayer)
94 : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
95 }
96
97 int call(Formatter *f) override {
98 this->pool_replayer->restart();
99 return 0;
100 }
101 };
102
103 template <typename I>
104 class FlushCommand : public PoolReplayerAdminSocketCommand<I> {
105 public:
106 explicit FlushCommand(PoolReplayer<I> *pool_replayer)
107 : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
108 }
109
110 int call(Formatter *f) override {
111 this->pool_replayer->flush();
112 return 0;
113 }
114 };
115
116 template <typename I>
117 class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand<I> {
118 public:
119 explicit LeaderReleaseCommand(PoolReplayer<I> *pool_replayer)
120 : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
121 }
122
123 int call(Formatter *f) override {
124 this->pool_replayer->release_leader();
125 return 0;
126 }
127 };
128
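// registers the per-pool-replayer admin socket commands ("rbd mirror
// status|start|stop|restart|flush|leader release <name>") and dispatches them
// to the owning PoolReplayer; invoked through the daemon's admin socket, e.g.
// (hypothetical socket path, pool and peer cluster names):
//   ceph daemon /var/run/ceph/ceph-client.rbd-mirror.a.asok \
//       rbd mirror status mypool site-b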
129 template <typename I>
130 class PoolReplayerAdminSocketHook : public AdminSocketHook {
131 public:
132 PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
133 PoolReplayer<I> *pool_replayer)
134 : admin_socket(cct->get_admin_socket()) {
135 std::string command;
136 int r;
137
138 command = "rbd mirror status " + name;
139 r = admin_socket->register_command(command, this,
140 "get status for rbd mirror " + name);
141 if (r == 0) {
142 commands[command] = new StatusCommand<I>(pool_replayer);
143 }
144
145 command = "rbd mirror start " + name;
146 r = admin_socket->register_command(command, this,
147 "start rbd mirror " + name);
148 if (r == 0) {
149 commands[command] = new StartCommand<I>(pool_replayer);
150 }
151
152 command = "rbd mirror stop " + name;
153 r = admin_socket->register_command(command, this,
154 "stop rbd mirror " + name);
155 if (r == 0) {
156 commands[command] = new StopCommand<I>(pool_replayer);
157 }
158
159 command = "rbd mirror restart " + name;
160 r = admin_socket->register_command(command, this,
161 "restart rbd mirror " + name);
162 if (r == 0) {
163 commands[command] = new RestartCommand<I>(pool_replayer);
164 }
165
166 command = "rbd mirror flush " + name;
167 r = admin_socket->register_command(command, this,
168 "flush rbd mirror " + name);
169 if (r == 0) {
170 commands[command] = new FlushCommand<I>(pool_replayer);
171 }
172
173 command = "rbd mirror leader release " + name;
174 r = admin_socket->register_command(command, this,
175 "release rbd mirror leader " + name);
176 if (r == 0) {
177 commands[command] = new LeaderReleaseCommand<I>(pool_replayer);
178 }
179 }
180
181 ~PoolReplayerAdminSocketHook() override {
182 (void)admin_socket->unregister_commands(this);
183 for (auto i = commands.begin(); i != commands.end(); ++i) {
184 delete i->second;
185 }
186 }
187
188 int call(std::string_view command, const cmdmap_t& cmdmap,
189 Formatter *f,
190 std::ostream& ss,
191 bufferlist& out) override {
192 auto i = commands.find(command);
193 ceph_assert(i != commands.end());
194 return i->second->call(f);
195 }
196
197 private:
198 typedef std::map<std::string, PoolReplayerAdminSocketCommand<I>*,
199 std::less<>> Commands;
200
201 AdminSocket *admin_socket;
202 Commands commands;
203 };
204
205 } // anonymous namespace
206
207 template <typename I>
208 PoolReplayer<I>::PoolReplayer(
209 Threads<I> *threads, ServiceDaemon<I> *service_daemon,
210 journal::CacheManagerHandler *cache_manager_handler, int64_t local_pool_id,
211 const PeerSpec &peer, const std::vector<const char*> &args) :
212 m_threads(threads),
213 m_service_daemon(service_daemon),
214 m_cache_manager_handler(cache_manager_handler),
215 m_local_pool_id(local_pool_id),
216 m_peer(peer),
217 m_args(args),
218 m_lock(ceph::make_mutex("rbd::mirror::PoolReplayer " + stringify(peer))),
219 m_pool_replayer_thread(this),
220 m_leader_listener(this) {
221 }
222
223 template <typename I>
224 PoolReplayer<I>::~PoolReplayer()
225 {
226 shut_down();
227
228 ceph_assert(m_asok_hook == nullptr);
229 }
230
231 template <typename I>
232 bool PoolReplayer<I>::is_blacklisted() const {
233 std::lock_guard locker{m_lock};
234 return m_blacklisted;
235 }
236
237 template <typename I>
238 bool PoolReplayer<I>::is_leader() const {
239 std::lock_guard locker{m_lock};
240 return m_leader_watcher && m_leader_watcher->is_leader();
241 }
242
243 template <typename I>
244 bool PoolReplayer<I>::is_running() const {
245 return m_pool_replayer_thread.is_started();
246 }
247
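// bootstrap the replayer: connect to the local and remote clusters, open the
// pool I/O contexts, look up the local mirror uuid, create the image sync and
// deletion throttlers, the default namespace replayer and the leader watcher,
// and finally start the pool replayer thread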
248 template <typename I>
249 void PoolReplayer<I>::init() {
250 ceph_assert(!m_pool_replayer_thread.is_started());
251
252 // reset state
253 m_stopping = false;
254 m_blacklisted = false;
255
256 dout(10) << "replaying for " << m_peer << dendl;
257 int r = init_rados(g_ceph_context->_conf->cluster,
258 g_ceph_context->_conf->name.to_str(),
259 "", "", "local cluster", &m_local_rados, false);
260 if (r < 0) {
261 m_callout_id = m_service_daemon->add_or_update_callout(
262 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
263 "unable to connect to local cluster");
264 return;
265 }
266
267 r = init_rados(m_peer.cluster_name, m_peer.client_name,
268 m_peer.mon_host, m_peer.key,
269 std::string("remote peer ") + stringify(m_peer),
270 &m_remote_rados, true);
271 if (r < 0) {
272 m_callout_id = m_service_daemon->add_or_update_callout(
273 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
274 "unable to connect to remote cluster");
275 return;
276 }
277
278 r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
279 if (r < 0) {
280 derr << "error accessing local pool " << m_local_pool_id << ": "
281 << cpp_strerror(r) << dendl;
282 return;
283 }
284
285 auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
286 librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf);
287
288 r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
289 &m_local_mirror_uuid);
290 if (r < 0) {
291 derr << "failed to retrieve local mirror uuid from pool "
292 << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
293 m_callout_id = m_service_daemon->add_or_update_callout(
294 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
295 "unable to query local mirror uuid");
296 return;
297 }
298
299 r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
300 m_remote_io_ctx);
301 if (r < 0) {
302 derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
303 << ": " << cpp_strerror(r) << dendl;
304 m_callout_id = m_service_daemon->add_or_update_callout(
305 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
306 "unable to access remote pool");
307 return;
308 }
309
310 dout(10) << "connected to " << m_peer << dendl;
311
312 m_image_sync_throttler.reset(
313 Throttler<I>::create(cct, "rbd_mirror_concurrent_image_syncs"));
314
315 m_image_deletion_throttler.reset(
316 Throttler<I>::create(cct, "rbd_mirror_concurrent_image_deletions"));
317
318 m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
319 "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
320 m_threads, m_image_sync_throttler.get(), m_image_deletion_throttler.get(),
321 m_service_daemon, m_cache_manager_handler));
322
323 C_SaferCond on_init;
324 m_default_namespace_replayer->init(&on_init);
325 r = on_init.wait();
326 if (r < 0) {
327 derr << "error initializing default namespace replayer: " << cpp_strerror(r)
328 << dendl;
329 m_callout_id = m_service_daemon->add_or_update_callout(
330 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
331 "unable to initialize default namespace replayer");
332 m_default_namespace_replayer.reset();
333 return;
334 }
335
336 m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx,
337 &m_leader_listener));
338 r = m_leader_watcher->init();
339 if (r < 0) {
340 derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
341 m_callout_id = m_service_daemon->add_or_update_callout(
342 m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
343 "unable to initialize leader messenger object");
344 m_leader_watcher.reset();
345 return;
346 }
347
348 if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
349 m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
350 m_callout_id = service_daemon::CALLOUT_ID_NONE;
351 }
352
353 m_pool_replayer_thread.create("pool replayer");
354 }
355
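// stop the pool replayer thread and tear everything down in reverse order of
// init(): leader watcher, default namespace replayer, throttlers and the
// cluster connections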
356 template <typename I>
357 void PoolReplayer<I>::shut_down() {
358 {
359 std::lock_guard l{m_lock};
360 m_stopping = true;
361 m_cond.notify_all();
362 }
363 if (m_pool_replayer_thread.is_started()) {
364 m_pool_replayer_thread.join();
365 }
366
367 if (m_leader_watcher) {
368 m_leader_watcher->shut_down();
369 }
370 m_leader_watcher.reset();
371
372 if (m_default_namespace_replayer) {
373 C_SaferCond on_shut_down;
374 m_default_namespace_replayer->shut_down(&on_shut_down);
375 on_shut_down.wait();
376 }
377 m_default_namespace_replayer.reset();
378
379 m_image_sync_throttler.reset();
380 m_image_deletion_throttler.reset();
381
382 m_local_rados.reset();
383 m_remote_rados.reset();
384 }
385
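// build a standalone cluster connection: bootstrap a private CephContext,
// parse config files, environment and command line overrides (reverting
// local-cluster overrides of connection-unique keys for remote peers),
// apply any mon_host/key overrides and connect the librados handle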
386 template <typename I>
387 int PoolReplayer<I>::init_rados(const std::string &cluster_name,
388 const std::string &client_name,
389 const std::string &mon_host,
390 const std::string &key,
391 const std::string &description,
392 RadosRef *rados_ref,
393 bool strip_cluster_overrides) {
394 rados_ref->reset(new librados::Rados());
395
396 // NOTE: manually bootstrap a CephContext here instead of via
397 // the librados API to avoid mixing global singletons between
398 // the librados shared library and the daemon
399 // TODO: eliminate intermingling of global singletons within Ceph APIs
400 CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
401 if (client_name.empty() || !iparams.name.from_str(client_name)) {
402 derr << "error initializing cluster handle for " << description << dendl;
403 return -EINVAL;
404 }
405
406 CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
407 CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
408 cct->_conf->cluster = cluster_name;
409
410 // librados::Rados::conf_read_file
411 int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
412 if (r < 0 && r != -ENOENT) {
413 // do not treat this as fatal; we might still be able to connect
414 derr << "could not read ceph conf for " << description << ": "
415 << cpp_strerror(r) << dendl;
416 }
417
418 // preserve cluster-specific config settings before applying environment/cli
419 // overrides
420 std::map<std::string, std::string> config_values;
421 if (strip_cluster_overrides) {
422 // remote peer connections shouldn't apply cluster-specific
423 // configuration settings
424 for (auto& config_key : UNIQUE_PEER_CONFIG_KEYS) {
425 config_values[config_key] = cct->_conf.get_val<std::string>(config_key);
426 }
427 }
428
429 cct->_conf.parse_env(cct->get_module_type());
430
431 // librados::Rados::conf_parse_env
432 std::vector<const char*> args;
433 r = cct->_conf.parse_argv(args);
434 if (r < 0) {
435 derr << "could not parse environment for " << description << ": "
436 << cpp_strerror(r) << dendl;
437 cct->put();
438 return r;
439 }
440 cct->_conf.parse_env(cct->get_module_type());
441
442 if (!m_args.empty()) {
443 // librados::Rados::conf_parse_argv
444 args = m_args;
445 r = cct->_conf.parse_argv(args);
446 if (r < 0) {
447 derr << "could not parse command line args for " << description << ": "
448 << cpp_strerror(r) << dendl;
449 cct->put();
450 return r;
451 }
452 }
453
454 if (strip_cluster_overrides) {
455 // remote peer connections shouldn't apply cluster-specific
456 // configuration settings
457 for (auto& pair : config_values) {
458 auto value = cct->_conf.get_val<std::string>(pair.first);
459 if (pair.second != value) {
460 dout(0) << "reverting global config option override: "
461 << pair.first << ": " << value << " -> " << pair.second
462 << dendl;
463 cct->_conf.set_val_or_die(pair.first, pair.second);
464 }
465 }
466 }
467
468 if (!g_ceph_context->_conf->admin_socket.empty()) {
469 cct->_conf.set_val_or_die("admin_socket",
470 "$run_dir/$name.$pid.$cluster.$cctid.asok");
471 }
472
473 if (!mon_host.empty()) {
474 r = cct->_conf.set_val("mon_host", mon_host);
475 if (r < 0) {
476 derr << "failed to set mon_host config for " << description << ": "
477 << cpp_strerror(r) << dendl;
478 cct->put();
479 return r;
480 }
481 }
482
483 if (!key.empty()) {
484 r = cct->_conf.set_val("key", key);
485 if (r < 0) {
486 derr << "failed to set key config for " << description << ": "
487 << cpp_strerror(r) << dendl;
488 cct->put();
489 return r;
490 }
491 }
492
493 // disable unnecessary librbd cache
494 cct->_conf.set_val_or_die("rbd_cache", "false");
495 cct->_conf.apply_changes(nullptr);
496 cct->_conf.complain_about_parse_error(cct);
497
498 r = (*rados_ref)->init_with_context(cct);
499 ceph_assert(r == 0);
500 cct->put();
501
502 r = (*rados_ref)->connect();
503 if (r < 0) {
504 derr << "error connecting to " << description << ": "
505 << cpp_strerror(r) << dendl;
506 return r;
507 }
508
509 return 0;
510 }
511
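// main loop of the pool replayer thread: (re)register the admin socket hook,
// reconcile the per-namespace replayers, watch for blacklisting, and sleep
// for rbd_mirror_pool_replayers_refresh_interval seconds between iterations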
512 template <typename I>
513 void PoolReplayer<I>::run() {
514 dout(20) << dendl;
515
516 while (true) {
517 std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
518 m_peer.cluster_name;
519 if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
520 m_asok_hook_name = asok_hook_name;
521 delete m_asok_hook;
522
523 m_asok_hook = new PoolReplayerAdminSocketHook<I>(g_ceph_context,
524 m_asok_hook_name, this);
525 }
526
527 with_namespace_replayers([this]() { update_namespace_replayers(); });
528
529 std::unique_lock locker{m_lock};
530
531 if (m_default_namespace_replayer->is_blacklisted()) {
532 m_blacklisted = true;
533 m_stopping = true;
534 }
535
536 for (auto &it : m_namespace_replayers) {
537 if (it.second->is_blacklisted()) {
538 m_blacklisted = true;
539 m_stopping = true;
540 break;
541 }
542 }
543
544 if (m_stopping) {
545 break;
546 }
547
548 auto seconds = g_ceph_context->_conf.get_val<uint64_t>(
549 "rbd_mirror_pool_replayers_refresh_interval");
550 m_cond.wait_for(locker, ceph::make_timespan(seconds));
551 }
552
553 // shut down namespace replayers
554 with_namespace_replayers([this]() { update_namespace_replayers(); });
555
556 delete m_asok_hook;
557 m_asok_hook = nullptr;
558 }
559
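// reconcile the running NamespaceReplayers against the namespaces that
// currently have mirroring enabled: shut down replayers for namespaces that
// disappeared, create and init replayers for new ones, and propagate the
// current leader state to the newcomers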
560 template <typename I>
561 void PoolReplayer<I>::update_namespace_replayers() {
562 dout(20) << dendl;
563
564 ceph_assert(ceph_mutex_is_locked(m_lock));
565
566 std::set<std::string> mirroring_namespaces;
567 if (!m_stopping) {
568 int r = list_mirroring_namespaces(&mirroring_namespaces);
569 if (r < 0) {
570 return;
571 }
572 }
573
574 auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
575 C_SaferCond cond;
576 auto gather_ctx = new C_Gather(cct, &cond);
577 for (auto it = m_namespace_replayers.begin();
578 it != m_namespace_replayers.end(); ) {
579 auto iter = mirroring_namespaces.find(it->first);
580 if (iter == mirroring_namespaces.end()) {
581 auto namespace_replayer = it->second;
582 auto on_shut_down = new LambdaContext(
583 [this, namespace_replayer, ctx=gather_ctx->new_sub()](int r) {
584 delete namespace_replayer;
585 ctx->complete(r);
586 });
587 namespace_replayer->shut_down(on_shut_down);
588 it = m_namespace_replayers.erase(it);
589 } else {
590 mirroring_namespaces.erase(iter);
591 it++;
592 }
593 }
594
595 for (auto &name : mirroring_namespaces) {
596 auto namespace_replayer = NamespaceReplayer<I>::create(
597 name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
598 m_threads, m_image_sync_throttler.get(),
599 m_image_deletion_throttler.get(), m_service_daemon,
600 m_cache_manager_handler);
601 auto on_init = new LambdaContext(
602 [this, namespace_replayer, name, &mirroring_namespaces,
603 ctx=gather_ctx->new_sub()](int r) {
604 if (r < 0) {
605 derr << "failed to initialize namespace replayer for namespace "
606 << name << ": " << cpp_strerror(r) << dendl;
607 delete namespace_replayer;
608 mirroring_namespaces.erase(name);
609 } else {
610 std::lock_guard locker{m_lock};
611 m_namespace_replayers[name] = namespace_replayer;
612 }
613 ctx->complete(r);
614 });
615 namespace_replayer->init(on_init);
616 }
617
618 gather_ctx->activate();
619
620 m_lock.unlock();
621 cond.wait();
622 m_lock.lock();
623
624 if (m_leader) {
625 C_SaferCond acquire_cond;
626 auto acquire_gather_ctx = new C_Gather(cct, &acquire_cond);
627
628 for (auto &name : mirroring_namespaces) {
629 namespace_replayer_acquire_leader(name, acquire_gather_ctx->new_sub());
630 }
631 acquire_gather_ctx->activate();
632
633 m_lock.unlock();
634 acquire_cond.wait();
635 m_lock.lock();
636
637 std::vector<std::string> instance_ids;
638 m_leader_watcher->list_instances(&instance_ids);
639
640 for (auto &name : mirroring_namespaces) {
641 auto it = m_namespace_replayers.find(name);
642 if (it == m_namespace_replayers.end()) {
643 // acquiring the leader role for this namespace replayer failed
644 continue;
645 }
646 it->second->handle_instances_added(instance_ids);
647 }
648 } else {
649 std::string leader_instance_id;
650 if (m_leader_watcher->get_leader_instance_id(&leader_instance_id)) {
651 for (auto &name : mirroring_namespaces) {
652 m_namespace_replayers[name]->handle_update_leader(leader_instance_id);
653 }
654 }
655 }
656 }
657
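// collect the names of all local pool namespaces that have mirroring enabled
// (keeping namespaces that already have a replayer if the mode query fails)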
658 template <typename I>
659 int PoolReplayer<I>::list_mirroring_namespaces(
660 std::set<std::string> *namespaces) {
661 ceph_assert(ceph_mutex_is_locked(m_lock));
662
663 std::vector<std::string> names;
664
665 int r = librbd::api::Namespace<I>::list(m_local_io_ctx, &names);
666 if (r < 0) {
667 derr << "failed to list namespaces: " << cpp_strerror(r) << dendl;
668 return r;
669 }
670
671 for (auto &name : names) {
// query a namespace-scoped context so the per-namespace mirror mode is
// returned rather than the pool-level (default namespace) mode
librados::IoCtx ns_io_ctx;
ns_io_ctx.dup(m_local_io_ctx);
ns_io_ctx.set_namespace(name);
672 cls::rbd::MirrorMode mirror_mode = cls::rbd::MIRROR_MODE_DISABLED;
673 int r = librbd::cls_client::mirror_mode_get(&ns_io_ctx, &mirror_mode);
674 if (r < 0 && r != -ENOENT) {
675 derr << "failed to get namespace mirror mode: " << cpp_strerror(r)
676 << dendl;
677 if (m_namespace_replayers.count(name) == 0) {
678 continue;
679 }
680 } else if (mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
681 dout(10) << "mirroring is disabled for namespace " << name << dendl;
682 continue;
683 }
684
685 namespaces->insert(name);
686 }
687
688 return 0;
689 }
690
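// instruct a single namespace replayer to take over the leader role; on
// failure the replayer is shut down and removed so that a later
// update_namespace_replayers() pass can recreate it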
691 template <typename I>
692 void PoolReplayer<I>::namespace_replayer_acquire_leader(const std::string &name,
693 Context *on_finish) {
694 ceph_assert(ceph_mutex_is_locked(m_lock));
695
696 auto it = m_namespace_replayers.find(name);
697 ceph_assert(it != m_namespace_replayers.end());
698
699 on_finish = new LambdaContext(
700 [this, name, on_finish](int r) {
701 if (r < 0) {
702 derr << "failed to handle acquire leader for namespace: "
703 << name << ": " << cpp_strerror(r) << dendl;
704
705 // remove the namespace replayer -- update_namespace_replayers will
706 // retry creating it and acquiring the leader role.
707
708 std::lock_guard locker{m_lock};
709
710 auto namespace_replayer = m_namespace_replayers[name];
711 m_namespace_replayers.erase(name);
712 auto on_shut_down = new LambdaContext(
713 [this, namespace_replayer, on_finish](int r) {
714 delete namespace_replayer;
715 on_finish->complete(r);
716 });
717 namespace_replayer->shut_down(on_shut_down);
718 return;
719 }
720 on_finish->complete(0);
721 });
722
723 it->second->handle_acquire_leader(on_finish);
724 }
725
726 template <typename I>
727 void PoolReplayer<I>::print_status(Formatter *f) {
728 dout(20) << dendl;
729
730 ceph_assert(f);
731
732 std::lock_guard l{m_lock};
733
734 f->open_object_section("pool_replayer_status");
735 f->dump_string("pool", m_local_io_ctx.get_pool_name());
736 f->dump_stream("peer") << m_peer;
737 f->dump_stream("instance_id") << m_local_io_ctx.get_instance_id();
738
739 std::string state("running");
740 if (m_manual_stop) {
741 state = "stopped (manual)";
742 } else if (m_stopping) {
743 state = "stopped";
744 }
745 f->dump_string("state", state);
746
747 std::string leader_instance_id;
748 m_leader_watcher->get_leader_instance_id(&leader_instance_id);
749 f->dump_string("leader_instance_id", leader_instance_id);
750
751 bool leader = m_leader_watcher->is_leader();
752 f->dump_bool("leader", leader);
753 if (leader) {
754 std::vector<std::string> instance_ids;
755 m_leader_watcher->list_instances(&instance_ids);
756 f->open_array_section("instances");
757 for (auto instance_id : instance_ids) {
758 f->dump_string("instance_id", instance_id);
759 }
760 f->close_section(); // instances
761 }
762
763 f->dump_string("local_cluster_admin_socket",
764 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf.
765 get_val<std::string>("admin_socket"));
766 f->dump_string("remote_cluster_admin_socket",
767 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf.
768 get_val<std::string>("admin_socket"));
769
770 if (m_image_sync_throttler) {
771 f->open_object_section("sync_throttler");
772 m_image_sync_throttler->print_status(f);
773 f->close_section(); // sync_throttler
774 }
775
776 if (m_image_deletion_throttler) {
777 f->open_object_section("deletion_throttler");
778 m_image_deletion_throttler->print_status(f);
779 f->close_section(); // deletion_throttler
780 }
781
782 m_default_namespace_replayer->print_status(f);
783
784 f->open_array_section("namespaces");
785 for (auto &it : m_namespace_replayers) {
786 f->open_object_section("namespace");
787 f->dump_string("name", it.first);
788 it.second->print_status(f);
789 f->close_section(); // namespace
790 }
791 f->close_section(); // namespaces
792
793 f->close_section(); // pool_replayer_status
794 }
795
796 template <typename I>
797 void PoolReplayer<I>::start() {
798 dout(20) << dendl;
799
800 std::lock_guard l{m_lock};
801
802 if (m_stopping) {
803 return;
804 }
805
806 m_manual_stop = false;
807
808 m_default_namespace_replayer->start();
809 for (auto &it : m_namespace_replayers) {
810 it.second->start();
811 }
812 }
813
814 template <typename I>
815 void PoolReplayer<I>::stop(bool manual) {
816 dout(20) << "enter: manual=" << manual << dendl;
817
818 std::lock_guard l{m_lock};
819 if (!manual) {
820 m_stopping = true;
821 m_cond.notify_all();
822 return;
823 } else if (m_stopping) {
824 return;
825 }
826
827 m_manual_stop = true;
828
829 m_default_namespace_replayer->stop();
830 for (auto &it : m_namespace_replayers) {
831 it.second->stop();
832 }
833 }
834
835 template <typename I>
836 void PoolReplayer<I>::restart() {
837 dout(20) << dendl;
838
839 std::lock_guard l{m_lock};
840
841 if (m_stopping) {
842 return;
843 }
844
845 m_default_namespace_replayer->restart();
846 for (auto &it : m_namespace_replayers) {
847 it.second->restart();
848 }
849 }
850
851 template <typename I>
852 void PoolReplayer<I>::flush() {
853 dout(20) << dendl;
854
855 std::lock_guard l{m_lock};
856
857 if (m_stopping || m_manual_stop) {
858 return;
859 }
860
861 m_default_namespace_replayer->flush();
862 for (auto &it : m_namespace_replayers) {
863 it.second->flush();
864 }
865 }
866
867 template <typename I>
868 void PoolReplayer<I>::release_leader() {
869 dout(20) << dendl;
870
871 std::lock_guard l{m_lock};
872
873 if (m_stopping || !m_leader_watcher) {
874 return;
875 }
876
877 m_leader_watcher->release_leader();
878 }
879
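// leader watcher callback: this instance has just become the leader, so
// advertise it via the service daemon and fan the acquire-leader notification
// out to the default and per-namespace replayers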
880 template <typename I>
881 void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) {
882 dout(20) << dendl;
883
884 with_namespace_replayers(
885 [this](Context *on_finish) {
886 dout(10) << "handle_post_acquire_leader" << dendl;
887
888 ceph_assert(ceph_mutex_is_locked(m_lock));
889
890 m_service_daemon->add_or_update_attribute(m_local_pool_id,
891 SERVICE_DAEMON_LEADER_KEY,
892 true);
893 auto ctx = new LambdaContext(
894 [this, on_finish](int r) {
895 if (r == 0) {
896 std::lock_guard locker{m_lock};
897 m_leader = true;
898 }
899 on_finish->complete(r);
900 });
901
902 auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
903 auto gather_ctx = new C_Gather(cct, ctx);
904
905 m_default_namespace_replayer->handle_acquire_leader(
906 gather_ctx->new_sub());
907
908 for (auto &it : m_namespace_replayers) {
909 namespace_replayer_acquire_leader(it.first, gather_ctx->new_sub());
910 }
911
912 gather_ctx->activate();
913 }, on_finish);
914 }
915
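// leader watcher callback: this instance is about to give up leadership, so
// clear the service daemon attribute and notify all namespace replayers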
916 template <typename I>
917 void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) {
918 dout(20) << dendl;
919
920 with_namespace_replayers(
921 [this](Context *on_finish) {
922 dout(10) << "handle_pre_release_leader" << dendl;
923
924 ceph_assert(ceph_mutex_is_locked(m_lock));
925
926 m_leader = false;
927 m_service_daemon->remove_attribute(m_local_pool_id,
928 SERVICE_DAEMON_LEADER_KEY);
929
930 auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
931 auto gather_ctx = new C_Gather(cct, on_finish);
932
933 m_default_namespace_replayer->handle_release_leader(
934 gather_ctx->new_sub());
935
936 for (auto &it : m_namespace_replayers) {
937 it.second->handle_release_leader(gather_ctx->new_sub());
938 }
939
940 gather_ctx->activate();
941 }, on_finish);
942 }
943
944 template <typename I>
945 void PoolReplayer<I>::handle_update_leader(
946 const std::string &leader_instance_id) {
947 dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
948
949 std::lock_guard locker{m_lock};
950
951 m_default_namespace_replayer->handle_update_leader(leader_instance_id);
952
953 for (auto &it : m_namespace_replayers) {
954 it.second->handle_update_leader(leader_instance_id);
955 }
956 }
957
958 template <typename I>
959 void PoolReplayer<I>::handle_instances_added(
960 const std::vector<std::string> &instance_ids) {
961 dout(5) << "instance_ids=" << instance_ids << dendl;
962
963 std::lock_guard locker{m_lock};
964 if (!m_leader_watcher->is_leader()) {
965 return;
966 }
967
968 m_default_namespace_replayer->handle_instances_added(instance_ids);
969
970 for (auto &it : m_namespace_replayers) {
971 it.second->handle_instances_added(instance_ids);
972 }
973 }
974
975 template <typename I>
976 void PoolReplayer<I>::handle_instances_removed(
977 const std::vector<std::string> &instance_ids) {
978 dout(5) << "instance_ids=" << instance_ids << dendl;
979
980 std::lock_guard locker{m_lock};
981 if (!m_leader_watcher->is_leader()) {
982 return;
983 }
984
985 m_default_namespace_replayer->handle_instances_removed(instance_ids);
986
987 for (auto &it : m_namespace_replayers) {
988 it.second->handle_instances_removed(instance_ids);
989 }
990 }
991
992 } // namespace mirror
993 } // namespace rbd
994
995 template class rbd::mirror::PoolReplayer<librbd::ImageCtx>;
996