1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/admin_socket.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "include/stringify.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "common/Timer.h"
12 #include "common/WorkQueue.h"
13 #include "global/global_context.h"
14 #include "journal/Journaler.h"
15 #include "journal/ReplayHandler.h"
16 #include "journal/Settings.h"
17 #include "librbd/ExclusiveLock.h"
18 #include "librbd/ImageCtx.h"
19 #include "librbd/ImageState.h"
20 #include "librbd/Journal.h"
21 #include "librbd/Operations.h"
22 #include "librbd/Utils.h"
23 #include "librbd/journal/Replay.h"
24 #include "ImageDeleter.h"
25 #include "ImageReplayer.h"
26 #include "Threads.h"
27 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
28 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
29 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
32 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_rbd_mirror
36 #undef dout_prefix
37 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
38 << __func__ << ": "
39
40 using std::map;
41 using std::string;
42 using std::unique_ptr;
43 using std::shared_ptr;
44 using std::vector;
45
46 extern PerfCounters *g_perf_counters;
47
48 namespace rbd {
49 namespace mirror {
50
51 using librbd::util::create_context_callback;
52 using librbd::util::create_rados_callback;
53 using namespace rbd::mirror::image_replayer;
54
55 template <typename I>
56 std::ostream &operator<<(std::ostream &os,
57 const typename ImageReplayer<I>::State &state);
58
59 namespace {
60
61 template <typename I>
62 struct ReplayHandler : public ::journal::ReplayHandler {
63 ImageReplayer<I> *replayer;
64 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
65
66 void handle_entries_available() override {
67 replayer->handle_replay_ready();
68 }
69 void handle_complete(int r) override {
70 std::stringstream ss;
71 if (r == -ENOMEM) {
72 ss << "not enough memory in autotune cache";
73 } else if (r < 0) {
74 ss << "replay completed with error: " << cpp_strerror(r);
75 }
76 replayer->handle_replay_complete(r, ss.str());
77 }
78 };
79
80 template <typename I>
81 class ImageReplayerAdminSocketCommand {
82 public:
83 ImageReplayerAdminSocketCommand(const std::string &desc,
84 ImageReplayer<I> *replayer)
85 : desc(desc), replayer(replayer) {
86 }
87 virtual ~ImageReplayerAdminSocketCommand() {}
88 virtual int call(Formatter *f) = 0;
89
90 std::string desc;
91 ImageReplayer<I> *replayer;
92 bool registered = false;
93 };
94
95 template <typename I>
96 class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
97 public:
98 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
99 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
100 }
101
102 int call(Formatter *f) override {
103 this->replayer->print_status(f);
104 return 0;
105 }
106 };
107
108 template <typename I>
109 class StartCommand : public ImageReplayerAdminSocketCommand<I> {
110 public:
111 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
112 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
113 }
114
115 int call(Formatter *f) override {
116 this->replayer->start(nullptr, true);
117 return 0;
118 }
119 };
120
121 template <typename I>
122 class StopCommand : public ImageReplayerAdminSocketCommand<I> {
123 public:
124 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
125 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
126 }
127
128 int call(Formatter *f) override {
129 this->replayer->stop(nullptr, true);
130 return 0;
131 }
132 };
133
134 template <typename I>
135 class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
136 public:
137 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
138 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
139 }
140
141 int call(Formatter *f) override {
142 this->replayer->restart();
143 return 0;
144 }
145 };
146
147 template <typename I>
148 class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
149 public:
150 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
151 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
152 }
153
154 int call(Formatter *f) override {
155 this->replayer->flush();
156 return 0;
157 }
158 };
159
// Owns the per-image "rbd mirror <op> <name>" admin socket commands and
// dispatches invocations to the matching command object.
template <typename I>
class ImageReplayerAdminSocketHook : public AdminSocketHook {
public:
  ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                               ImageReplayer<I> *replayer)
    : admin_socket(cct->get_admin_socket()),
      commands{{"rbd mirror flush " + name,
                new FlushCommand<I>("flush rbd mirror " + name, replayer)},
               {"rbd mirror restart " + name,
                new RestartCommand<I>("restart rbd mirror " + name, replayer)},
               {"rbd mirror start " + name,
                new StartCommand<I>("start rbd mirror " + name, replayer)},
               {"rbd mirror status " + name,
                new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
               {"rbd mirror stop " + name,
                new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
  }

  // Register every command with the admin socket.  Stops at the first
  // failure; commands registered so far are cleaned up by the destructor
  // via unregister_commands().
  int register_commands() {
    for (auto &it : commands) {
      int r = admin_socket->register_command(it.first, this,
                                             it.second->desc);
      if (r < 0) {
        return r;
      }
      it.second->registered = true;
    }
    return 0;
  }

  ~ImageReplayerAdminSocketHook() override {
    // unregister first so no call() can race with the deletes below
    admin_socket->unregister_commands(this);
    for (auto &it : commands) {
      delete it.second;
    }
    commands.clear();
  }

  // AdminSocketHook entry point: the command is guaranteed to exist since
  // only registered prefixes are routed here.
  int call(std::string_view command, const cmdmap_t& cmdmap,
           Formatter *f,
           std::ostream& errss,
           bufferlist& out) override {
    auto i = commands.find(command);
    ceph_assert(i != commands.end());
    return i->second->call(f);
  }

private:
  // std::less<> (transparent comparator) permits lookup by string_view
  // without constructing a temporary std::string
  typedef std::map<std::string, ImageReplayerAdminSocketCommand<I>*,
                   std::less<>> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};
214
215 uint32_t calculate_replay_delay(const utime_t &event_time,
216 int mirroring_replay_delay) {
217 if (mirroring_replay_delay <= 0) {
218 return 0;
219 }
220
221 utime_t now = ceph_clock_now();
222 if (event_time + mirroring_replay_delay <= now) {
223 return 0;
224 }
225
226 // ensure it is rounded up when converting to integer
227 return (event_time + mirroring_replay_delay - now) + 1;
228 }
229
230 } // anonymous namespace
231
232 template <typename I>
233 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
234 const std::string &description, bool flush)
235 {
236 const std::string desc = "bootstrapping, " + description;
237 replayer->set_state_description(0, desc);
238 if (flush) {
239 replayer->update_mirror_image_status(false, boost::none);
240 }
241 }
242
243 template <typename I>
244 void ImageReplayer<I>::RemoteJournalerListener::handle_update(
245 ::journal::JournalMetadata *) {
246 auto ctx = new LambdaContext([this](int r) {
247 replayer->handle_remote_journal_metadata_updated();
248 });
249 replayer->m_threads->work_queue->queue(ctx, 0);
250 }
251
// Construct a replayer for one mirrored image.  All heavyweight setup is
// deferred to start(); the constructor only wires up state and registers
// the admin socket commands.
template <typename I>
ImageReplayer<I>::ImageReplayer(
    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
    const std::string &global_image_id, Threads<I> *threads,
    InstanceWatcher<I> *instance_watcher,
    journal::CacheManagerHandler *cache_manager_handler) :
  m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
  m_global_image_id(global_image_id), m_threads(threads),
  m_instance_watcher(instance_watcher),
  m_cache_manager_handler(cache_manager_handler),
  // until the real image name is known, display the global image id
  m_local_image_name(global_image_id),
  m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " +
      stringify(local_io_ctx.get_id()) + " " + global_image_id)),
  m_progress_cxt(this),
  m_journal_listener(new JournalListener(this)),
  m_remote_listener(this)
{
  // Register asok commands using a temporary "remote_pool_name/global_image_id"
  // name. When the image name becomes known on start the asok commands will be
  // re-registered using "remote_pool_name/remote_image_name" name.

  m_name = admin_socket_hook_name(global_image_id);
  register_admin_socket_hook();
}
276
// Destructor: the replayer must already be fully shut down -- every
// state-machine resource is asserted to have been released beforehand.
template <typename I>
ImageReplayer<I>::~ImageReplayer()
{
  unregister_admin_socket_hook();
  ceph_assert(m_event_preprocessor == nullptr);
  ceph_assert(m_replay_status_formatter == nullptr);
  ceph_assert(m_local_image_ctx == nullptr);
  ceph_assert(m_local_replay == nullptr);
  ceph_assert(m_remote_journaler == nullptr);
  ceph_assert(m_replay_handler == nullptr);
  ceph_assert(m_on_start_finish == nullptr);
  ceph_assert(m_on_stop_finish == nullptr);
  ceph_assert(m_bootstrap_request == nullptr);
  ceph_assert(m_in_flight_status_updates == 0);

  // the listener is owned by this object (allocated in the constructor)
  delete m_journal_listener;
}
294
295 template <typename I>
296 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
297 std::lock_guard locker{m_lock};
298
299 if (!m_mirror_image_status_state) {
300 return image_replayer::HEALTH_STATE_OK;
301 } else if (*m_mirror_image_status_state ==
302 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
303 *m_mirror_image_status_state ==
304 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
305 return image_replayer::HEALTH_STATE_WARNING;
306 }
307 return image_replayer::HEALTH_STATE_ERROR;
308 }
309
310 template <typename I>
311 void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
312 librados::IoCtx &io_ctx) {
313 std::lock_guard locker{m_lock};
314 auto it = m_peers.find({peer_uuid});
315 if (it == m_peers.end()) {
316 m_peers.insert({peer_uuid, io_ctx});
317 }
318 }
319
320 template <typename I>
321 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
322 dout(10) << r << " " << desc << dendl;
323
324 std::lock_guard l{m_lock};
325 m_last_r = r;
326 m_state_desc = desc;
327 }
328
// Begin the start state machine: STARTING -> prepare local image ->
// prepare remote image -> bootstrap -> replay.  on_finish (optional) is
// completed when the start succeeds or fails; manual indicates an
// operator-initiated start which may override a prior manual stop.
template <typename I>
void ImageReplayer<I>::start(Context *on_finish, bool manual)
{
  dout(10) << "on_finish=" << on_finish << dendl;

  int r = 0;
  {
    std::lock_guard locker{m_lock};
    if (!is_stopped_()) {
      derr << "already running" << dendl;
      r = -EINVAL;
    } else if (m_manual_stop && !manual) {
      // only a manual start may resume after a manual stop
      dout(5) << "stopped manually, ignoring start without manual flag"
              << dendl;
      r = -EPERM;
    } else {
      // reset bookkeeping for a fresh start attempt
      m_state = STATE_STARTING;
      m_last_r = 0;
      m_state_desc.clear();
      m_manual_stop = false;
      m_delete_requested = false;

      if (on_finish != nullptr) {
        ceph_assert(m_on_start_finish == nullptr);
        m_on_start_finish = on_finish;
      }
      ceph_assert(m_on_stop_finish == nullptr);
    }
  }

  if (r < 0) {
    // complete the user callback outside the lock
    if (on_finish) {
      on_finish->complete(r);
    }
    return;
  }

  prepare_local_image();
}
368
369 template <typename I>
370 void ImageReplayer<I>::prepare_local_image() {
371 dout(10) << dendl;
372
373 m_local_image_id = "";
374 Context *ctx = create_context_callback<
375 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
376 auto req = PrepareLocalImageRequest<I>::create(
377 m_local_io_ctx, m_global_image_id, &m_local_image_id, &m_local_image_name,
378 &m_local_image_tag_owner, m_threads->work_queue, ctx);
379 req->send();
380 }
381
382 template <typename I>
383 void ImageReplayer<I>::handle_prepare_local_image(int r) {
384 dout(10) << "r=" << r << dendl;
385
386 if (r == -ENOENT) {
387 dout(10) << "local image does not exist" << dendl;
388 } else if (r < 0) {
389 on_start_fail(r, "error preparing local image for replay");
390 return;
391 } else {
392 reregister_admin_socket_hook();
393 }
394
395 // local image doesn't exist or is non-primary
396 prepare_remote_image();
397 }
398
// Locate the remote (peer) copy of the image and construct the remote
// journaler; results populate m_remote_image / m_remote_journaler /
// m_client_{state,meta} before the callback fires.
template <typename I>
void ImageReplayer<I>::prepare_remote_image() {
  dout(10) << dendl;
  if (m_peers.empty()) {
    // technically nothing to bootstrap, but it handles the status update
    bootstrap();
    return;
  }

  // TODO need to support multiple remote images
  ceph_assert(!m_peers.empty());
  m_remote_image = {*m_peers.begin()};

  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
  journal::Settings journal_settings;
  journal_settings.commit_interval = cct->_conf.get_val<double>(
    "rbd_mirror_journal_commit_age");

  Context *ctx = create_context_callback<
    ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
  auto req = PrepareRemoteImageRequest<I>::create(
    m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
    m_local_image_id, journal_settings, m_cache_manager_handler,
    &m_remote_image.mirror_uuid, &m_remote_image.image_id, &m_remote_journaler,
    &m_client_state, &m_client_meta, ctx);
  req->send();
}
426
// Completion of the remote image lookup.  Distinguishes: local image is
// primary (proceed to bootstrap, which reports it), remote image deleted
// (schedule local deletion), remote missing entirely, or hard error.
template <typename I>
void ImageReplayer<I>::handle_prepare_remote_image(int r) {
  dout(10) << "r=" << r << dendl;

  // the journaler is created exactly when the lookup succeeded
  ceph_assert(r < 0 ? m_remote_journaler == nullptr : m_remote_journaler != nullptr);
  if (r < 0 && !m_local_image_id.empty() &&
      m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    // local image is primary -- fall-through
  } else if (r == -ENOENT) {
    dout(10) << "remote image does not exist" << dendl;

    // TODO need to support multiple remote images
    if (m_remote_image.image_id.empty() && !m_local_image_id.empty() &&
        m_local_image_tag_owner == m_remote_image.mirror_uuid) {
      // local image exists and is non-primary and linked to the missing
      // remote image

      m_delete_requested = true;
      on_start_fail(0, "remote image no longer exists");
    } else {
      on_start_fail(-ENOENT, "remote image does not exist");
    }
    return;
  } else if (r < 0) {
    on_start_fail(r, "error retrieving remote image id");
    return;
  }

  bootstrap();
}
457
// Launch the bootstrap request which opens/creates the local image, syncs
// it if necessary, and validates the journal client registration.  Fails
// fast if the local image is primary or no peers are configured.
template <typename I>
void ImageReplayer<I>::bootstrap() {
  dout(10) << dendl;

  if (!m_local_image_id.empty() &&
      m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    dout(5) << "local image is primary" << dendl;
    on_start_fail(0, "local image is primary");
    return;
  } else if (m_peers.empty()) {
    dout(5) << "no peer clusters" << dendl;
    on_start_fail(-ENOENT, "no peer clusters");
    return;
  }

  BootstrapRequest<I> *request = nullptr;
  {
    std::lock_guard locker{m_lock};
    if (on_start_interrupted(m_lock)) {
      return;
    }

    auto ctx = create_context_callback<
      ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
    request = BootstrapRequest<I>::create(
      m_threads, m_local_io_ctx, m_remote_image.io_ctx, m_instance_watcher,
      &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
      m_global_image_id, m_local_mirror_uuid, m_remote_image.mirror_uuid,
      m_remote_journaler, &m_client_state, &m_client_meta, ctx,
      &m_resync_requested, &m_progress_cxt);
    // extra ref so stop() can cancel the request while it is in flight
    request->get();
    m_bootstrap_request = request;
  }

  // status updates must happen outside m_lock
  update_mirror_image_status(false, boost::none);
  reschedule_update_status_task(10);

  request->send();
}
497
// Completion of bootstrap: release the cancel ref, then translate special
// results (-EREMOTEIO non-primary remote, -EEXIST split-brain, resync
// request) into start failures, otherwise hook the local journal listener
// and continue to remote journaler initialization.
template <typename I>
void ImageReplayer<I>::handle_bootstrap(int r) {
  dout(10) << "r=" << r << dendl;
  {
    std::lock_guard locker{m_lock};
    // drop the extra ref taken in bootstrap()
    m_bootstrap_request->put();
    m_bootstrap_request = nullptr;
    if (m_local_image_ctx) {
      m_local_image_id = m_local_image_ctx->id;
    }
  }

  if (on_start_interrupted()) {
    return;
  } else if (r == -EREMOTEIO) {
    m_local_image_tag_owner = "";
    dout(5) << "remote image is non-primary" << dendl;
    on_start_fail(-EREMOTEIO, "remote image is non-primary");
    return;
  } else if (r == -EEXIST) {
    m_local_image_tag_owner = "";
    on_start_fail(r, "split-brain detected");
    return;
  } else if (r < 0) {
    on_start_fail(r, "error bootstrapping replay");
    return;
  } else if (m_resync_requested) {
    on_start_fail(0, "resync requested");
    return;
  }

  ceph_assert(m_local_journal == nullptr);
  {
    // cache the journal pointer under the image lock; it can be nulled
    // if journaling was disabled on the image
    std::shared_lock image_locker{m_local_image_ctx->image_lock};
    if (m_local_image_ctx->journal != nullptr) {
      m_local_journal = m_local_image_ctx->journal;
      m_local_journal->add_listener(m_journal_listener);
    }
  }

  if (m_local_journal == nullptr) {
    on_start_fail(-EINVAL, "error accessing local journal");
    return;
  }

  update_mirror_image_status(false, boost::none);
  init_remote_journaler();
}
546
547 template <typename I>
548 void ImageReplayer<I>::init_remote_journaler() {
549 dout(10) << dendl;
550
551 Context *ctx = create_context_callback<
552 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
553 m_remote_journaler->init(ctx);
554 }
555
// Completion of remote journaler init: subscribe for metadata updates,
// then verify our journal client registration is still connected before
// starting replay.  A disconnected client stops replay (optionally
// flagging an automatic resync).
template <typename I>
void ImageReplayer<I>::handle_init_remote_journaler(int r) {
  dout(10) << "r=" << r << dendl;

  if (on_start_interrupted()) {
    return;
  } else if (r < 0) {
    derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
    on_start_fail(r, "error initializing remote journal");
    return;
  }

  m_remote_journaler->add_listener(&m_remote_listener);

  cls::journal::Client client;
  r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
  if (r < 0) {
    derr << "error retrieving remote journal client: " << cpp_strerror(r)
         << dendl;
    on_start_fail(r, "error retrieving remote journal client");
    return;
  }

  dout(5) << "image_id=" << m_local_image_id << ", "
          << "client_meta.image_id=" << m_client_meta.image_id << ", "
          << "client.state=" << client.state << dendl;
  // only honor the disconnect flag if the client meta still refers to the
  // current local image (i.e. no image recreation happened in between)
  if (m_client_meta.image_id == m_local_image_id &&
      client.state != cls::journal::CLIENT_STATE_CONNECTED) {
    dout(5) << "client flagged disconnected, stopping image replay" << dendl;
    if (m_local_image_ctx->config.template get_val<bool>("rbd_mirroring_resync_after_disconnect")) {
      m_resync_requested = true;
      on_start_fail(-ENOTCONN, "disconnected: automatic resync");
    } else {
      on_start_fail(-ENOTCONN, "disconnected");
    }
    return;
  }

  start_replay();
}
596
597 template <typename I>
598 void ImageReplayer<I>::start_replay() {
599 dout(10) << dendl;
600
601 Context *start_ctx = create_context_callback<
602 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
603 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
604 }
605
// Completion of external replay setup: transition STARTING -> REPLAYING,
// build the replay helpers, and begin live replay of the remote journal.
// The saved on-start callback is completed exactly once on every path.
template <typename I>
void ImageReplayer<I>::handle_start_replay(int r) {
  dout(10) << "r=" << r << dendl;

  if (r < 0) {
    ceph_assert(m_local_replay == nullptr);
    derr << "error starting external replay on local image "
         <<  m_local_image_id << ": " << cpp_strerror(r) << dendl;
    on_start_fail(r, "error starting replay on local image");
    return;
  }

  m_replay_status_formatter =
    ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);

  Context *on_finish(nullptr);
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_STARTING);
    m_state = STATE_REPLAYING;
    // take ownership of the on-start callback to complete it below,
    // outside the lock
    std::swap(m_on_start_finish, on_finish);
  }

  m_event_preprocessor = EventPreprocessor<I>::create(
    *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
    &m_client_meta, m_threads->work_queue);

  update_mirror_image_status(true, boost::none);
  reschedule_update_status_task(30);

  if (on_replay_interrupted()) {
    // a stop raced with the start; still complete the start callback
    if (on_finish != nullptr) {
      on_finish->complete(r);
    }
    return;
  }

  {
    CephContext *cct = static_cast<CephContext *>(m_local_io_ctx.cct());
    double poll_seconds = cct->_conf.get_val<double>(
      "rbd_mirror_journal_poll_age");

    std::lock_guard locker{m_lock};
    m_replay_handler = new ReplayHandler<I>(this);
    m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);

    dout(10) << "m_remote_journaler=" << *m_remote_journaler << dendl;
  }

  dout(10) << "start succeeded" << dendl;
  if (on_finish != nullptr) {
    dout(10) << "on finish complete, r=" << r << dendl;
    on_finish->complete(r);
  }
}
661
// Abort an in-progress start: asynchronously transition STARTING ->
// STOPPING, record the failure description, and run the shut-down state
// machine.  Queued on the work queue to avoid re-entrancy with callers.
template <typename I>
void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
{
  dout(10) << "r=" << r << dendl;
  Context *ctx = new LambdaContext([this, r, desc](int _r) {
      {
        std::lock_guard locker{m_lock};
        ceph_assert(m_state == STATE_STARTING);
        m_state = STATE_STOPPING;
        // -ECANCELED/-EREMOTEIO/-ENOENT are expected outcomes, not errors
        if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
          derr << "start failed: " << cpp_strerror(r) << dendl;
        } else {
          dout(10) << "start canceled" << dendl;
        }
      }

      set_state_description(r, desc);
      update_mirror_image_status(false, boost::none);
      reschedule_update_status_task(-1);
      shut_down(r);
    });
  m_threads->work_queue->queue(ctx, 0);
}
685
686 template <typename I>
687 bool ImageReplayer<I>::on_start_interrupted() {
688 std::lock_guard locker{m_lock};
689 return on_start_interrupted(m_lock);
690 }
691
// If a stop was requested while STARTING, convert it into a canceled
// start (on_start_fail(-ECANCELED)) and return true.  Caller must hold
// m_lock.
// NOTE(review): the 'lock' parameter is never used -- the assertion checks
// m_lock directly; presumably the parameter exists only to document the
// locking requirement at call sites.  Confirm before removing it.
template <typename I>
bool ImageReplayer<I>::on_start_interrupted(ceph::mutex& lock) {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(m_state == STATE_STARTING);
  if (!m_stop_requested) {
    return false;
  }

  on_start_fail(-ECANCELED, "");
  return true;
}
703
// Request a stop of the replayer.  If a start is in progress the bootstrap
// request is canceled; if replay is active the journal replay is shut
// down.  on_finish (optional) is completed when the stop finishes, with
// -EINVAL if the replayer was not running.  manual marks an
// operator-initiated stop which blocks subsequent non-manual starts.
template <typename I>
void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
                            const std::string& desc)
{
  dout(10) << "on_finish=" << on_finish << ", manual=" << manual
           << ", desc=" << desc << dendl;

  image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
  bool shut_down_replay = false;
  bool running = true;
  {
    std::lock_guard locker{m_lock};

    if (!is_running_()) {
      running = false;
    } else {
      if (!is_stopped_()) {
        if (m_state == STATE_STARTING) {
          dout(10) << "canceling start" << dendl;
          if (m_bootstrap_request != nullptr) {
            // take a ref so the request cannot be destroyed before the
            // cancel below (issued outside the lock)
            bootstrap_request = m_bootstrap_request;
            bootstrap_request->get();
          }
        } else {
          dout(10) << "interrupting replay" << dendl;
          shut_down_replay = true;
        }

        // stash the stop callback; it is completed by the shut-down path
        ceph_assert(m_on_stop_finish == nullptr);
        std::swap(m_on_stop_finish, on_finish);
        m_stop_requested = true;
        m_manual_stop = manual;
      }
    }
  }

  // avoid holding lock since bootstrap request will update status
  if (bootstrap_request != nullptr) {
    dout(10) << "canceling bootstrap" << dendl;
    bootstrap_request->cancel();
    bootstrap_request->put();
  }

  if (!running) {
    dout(20) << "not running" << dendl;
    if (on_finish) {
      on_finish->complete(-EINVAL);
    }
    return;
  }

  if (shut_down_replay) {
    on_stop_journal_replay(r, desc);
  } else if (on_finish != nullptr) {
    // stop was absorbed by the in-progress start/stop; nothing more to do
    on_finish->complete(0);
  }
}
761
// Transition REPLAYING -> STOPPING and run the shut-down state machine.
// Safe to call repeatedly: only the first call while REPLAYING proceeds.
template <typename I>
void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
{
  dout(10) << dendl;

  {
    std::lock_guard locker{m_lock};
    if (m_state != STATE_REPLAYING) {
      // might be invoked multiple times while stopping
      return;
    }
    m_stop_requested = true;
    m_state = STATE_STOPPING;
  }

  set_state_description(r, desc);
  update_mirror_image_status(true, boost::none);
  reschedule_update_status_task(-1);
  shut_down(0);
}
782
// Journaler notification: entries are available.  Pop the next entry and
// route it either straight to preprocessing (same tag epoch) or through a
// replay flush to switch tag epochs.  The event-replay tracker op opened
// here is closed by the downstream completion/error paths.
template <typename I>
void ImageReplayer<I>::handle_replay_ready()
{
  dout(20) << dendl;
  if (on_replay_interrupted()) {
    return;
  }

  if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
    // no entry currently available
    return;
  }

  m_event_replay_tracker.start_op();

  m_lock.lock();
  bool stopping = (m_state == STATE_STOPPING);
  m_lock.unlock();

  if (stopping) {
    dout(10) << "stopping event replay" << dendl;
    m_event_replay_tracker.finish_op();
    return;
  }

  if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
    // still within the current tag epoch -- no flush required
    preprocess_entry();
    return;
  }

  replay_flush();
}
814
815 template <typename I>
816 void ImageReplayer<I>::restart(Context *on_finish)
817 {
818 auto ctx = new LambdaContext(
819 [this, on_finish](int r) {
820 if (r < 0) {
821 // Try start anyway.
822 }
823 start(on_finish, true);
824 });
825 stop(ctx);
826 }
827
828 template <typename I>
829 void ImageReplayer<I>::flush()
830 {
831 dout(10) << dendl;
832 C_SaferCond ctx;
833 flush_local_replay(&ctx);
834 ctx.wait();
835
836 update_mirror_image_status(false, boost::none);
837 }
838
// Flush the local journal replay, then (on success) the remote commit
// position; on_flush is completed with the final result.  A no-op (r=0)
// unless actively replaying.  Note: the flush is issued while m_lock is
// held so m_local_replay cannot be torn down underneath us.
template <typename I>
void ImageReplayer<I>::flush_local_replay(Context* on_flush)
{
  m_lock.lock();
  if (m_state != STATE_REPLAYING) {
    m_lock.unlock();
    on_flush->complete(0);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_local_replay(on_flush, r);
    });
  m_local_replay->flush(ctx);
  m_lock.unlock();
}
857
858 template <typename I>
859 void ImageReplayer<I>::handle_flush_local_replay(Context* on_flush, int r)
860 {
861 dout(15) << "r=" << r << dendl;
862 if (r < 0) {
863 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
864 on_flush->complete(r);
865 return;
866 }
867
868 flush_commit_position(on_flush);
869 }
870
// Persist the remote journal commit position; a no-op (r=0) unless
// actively replaying.  Like flush_local_replay(), the request is issued
// while m_lock is held so the journaler remains valid.
template <typename I>
void ImageReplayer<I>::flush_commit_position(Context* on_flush)
{
  m_lock.lock();
  if (m_state != STATE_REPLAYING) {
    m_lock.unlock();
    on_flush->complete(0);
    return;
  }

  dout(15) << dendl;
  auto ctx = new LambdaContext(
    [this, on_flush](int r) {
      handle_flush_commit_position(on_flush, r);
    });
  m_remote_journaler->flush_commit_position(ctx);
  m_lock.unlock();
}
889
890 template <typename I>
891 void ImageReplayer<I>::handle_flush_commit_position(Context* on_flush, int r)
892 {
893 dout(15) << "r=" << r << dendl;
894 if (r < 0) {
895 derr << "error flushing remote journal commit position: "
896 << cpp_strerror(r) << dendl;
897 }
898
899 on_flush->complete(r);
900 }
901
902 template <typename I>
903 bool ImageReplayer<I>::on_replay_interrupted()
904 {
905 bool shut_down;
906 {
907 std::lock_guard locker{m_lock};
908 shut_down = m_stop_requested;
909 }
910
911 if (shut_down) {
912 on_stop_journal_replay();
913 }
914 return shut_down;
915 }
916
917 template <typename I>
918 void ImageReplayer<I>::print_status(Formatter *f)
919 {
920 dout(10) << dendl;
921
922 std::lock_guard l{m_lock};
923
924 f->open_object_section("image_replayer");
925 f->dump_string("name", m_name);
926 f->dump_string("state", to_string(m_state));
927 f->close_section();
928 }
929
// Live replay terminated (normally or with an error): flag the stop under
// the lock first so concurrent replay callbacks bail out, then stop the
// journal replay with the supplied result/description.
template <typename I>
void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
{
  dout(10) << "r=" << r << dendl;
  if (r < 0) {
    derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
  }

  {
    std::lock_guard locker{m_lock};
    m_stop_requested = true;
  }
  on_stop_journal_replay(r, error_desc);
}
944
// Switch tag epochs: transition REPLAYING -> REPLAY_FLUSHING, shut down
// the current external replay to flush all in-flight IO/ops, then start a
// fresh external replay for the new epoch.  The in-flight tracker op is
// released if replay was interrupted before the flush could begin.
template <typename I>
void ImageReplayer<I>::replay_flush() {
  dout(10) << dendl;

  bool interrupted = false;
  {
    std::lock_guard locker{m_lock};
    if (m_state != STATE_REPLAYING) {
      dout(10) << "replay interrupted" << dendl;
      interrupted = true;
    } else {
      m_state = STATE_REPLAY_FLUSHING;
    }
  }

  if (interrupted) {
    m_event_replay_tracker.finish_op();
    return;
  }

  // shut down the replay to flush all IO and ops and create a new
  // replayer to handle the new tag epoch
  Context *ctx = create_context_callback<
    ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
  ctx = new LambdaContext([this, ctx](int r) {
      // the old replay handle is invalid after shut_down completes
      m_local_image_ctx->journal->stop_external_replay();
      m_local_replay = nullptr;

      if (r < 0) {
        ctx->complete(r);
        return;
      }

      m_local_journal->start_external_replay(&m_local_replay, ctx);
    });
  m_local_replay->shut_down(false, ctx);
}
982
// Completion of the tag-epoch flush: transition back to REPLAYING and
// fetch the new remote tag; on failure or interruption release the
// in-flight tracker op and terminate replay.
template <typename I>
void ImageReplayer<I>::handle_replay_flush(int r) {
  dout(10) << "r=" << r << dendl;

  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_REPLAY_FLUSHING);
    m_state = STATE_REPLAYING;
  }

  if (r < 0) {
    derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
    m_event_replay_tracker.finish_op();
    handle_replay_complete(r, "replay flush encountered an error");
    return;
  } else if (on_replay_interrupted()) {
    m_event_replay_tracker.finish_op();
    return;
  }

  get_remote_tag();
}
1005
1006 template <typename I>
1007 void ImageReplayer<I>::get_remote_tag() {
1008 dout(15) << "tag_tid: " << m_replay_tag_tid << dendl;
1009
1010 Context *ctx = create_context_callback<
1011 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1012 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1013 }
1014
// Completion of the remote tag fetch: decode its payload (decode failures
// map to -EBADMSG), mark the cached tag valid, and continue to local tag
// allocation.  On error the in-flight tracker op is released and replay
// terminates.
template <typename I>
void ImageReplayer<I>::handle_get_remote_tag(int r) {
  dout(15) << "r=" << r << dendl;

  if (r == 0) {
    try {
      auto it = m_replay_tag.data.cbegin();
      decode(m_replay_tag_data, it);
    } catch (const buffer::error &err) {
      r = -EBADMSG;
    }
  }

  if (r < 0) {
    derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
         << cpp_strerror(r) << dendl;
    m_event_replay_tracker.finish_op();
    handle_replay_complete(r, "failed to retrieve remote tag");
    return;
  }

  m_replay_tag_valid = true;
  dout(15) << "decoded remote tag " << m_replay_tag_tid << ": "
           << m_replay_tag_data << dendl;

  allocate_local_tag();
}
1042
// Allocate a local journal tag mirroring the remote tag, translating
// mirror uuids between the remote and local namespaces (remote "local"
// becomes our remote uuid, our own uuid becomes local, orphan indicates a
// promotion/demotion event).
template <typename I>
void ImageReplayer<I>::allocate_local_tag() {
  dout(15) << dendl;

  std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
  if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    mirror_uuid = m_remote_image.mirror_uuid;
  } else if (mirror_uuid == m_local_mirror_uuid) {
    mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
    // handle possible edge condition where daemon can failover and
    // the local image has already been promoted/demoted
    auto local_tag_data = m_local_journal->get_tag_data();
    if (local_tag_data.mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID &&
        (local_tag_data.predecessor.commit_valid &&
         local_tag_data.predecessor.mirror_uuid ==
           librbd::Journal<>::LOCAL_MIRROR_UUID)) {
      dout(15) << "skipping stale demotion event" << dendl;
      // mark the entry committed without replaying it, then look for more
      handle_process_entry_safe(m_replay_entry, m_replay_start_time, 0);
      handle_replay_ready();
      return;
    } else {
      dout(5) << "encountered image demotion: stopping" << dendl;
      std::lock_guard locker{m_lock};
      m_stop_requested = true;
    }
  }

  // apply the same uuid translation to the tag's predecessor
  librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
  if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
    predecessor.mirror_uuid = m_remote_image.mirror_uuid;
  } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
    predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
  }

  dout(15) << "mirror_uuid=" << mirror_uuid << ", "
           << "predecessor=" << predecessor << ", "
           << "replay_tag_tid=" << m_replay_tag_tid << dendl;
  Context *ctx = create_context_callback<
    ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
  m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
}
1085
// Completion of local tag allocation: on success continue to entry
// preprocessing; on failure release the in-flight tracker op and
// terminate replay.
template <typename I>
void ImageReplayer<I>::handle_allocate_local_tag(int r) {
  dout(15) << "r=" << r << ", "
           << "tag_tid=" << m_local_journal->get_tag_tid() << dendl;

  if (r < 0) {
    derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
    m_event_replay_tracker.finish_op();
    handle_replay_complete(r, "failed to allocate journal tag");
    return;
  }

  preprocess_entry();
}
1100
template <typename I>
void ImageReplayer<I>::preprocess_entry() {
  dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
           << dendl;

  // decode the raw replay entry payload into an EventEntry
  bufferlist data = m_replay_entry.get_data();
  auto it = data.cbegin();
  int r = m_local_replay->decode(&it, &m_event_entry);
  if (r < 0) {
    derr << "failed to decode journal event" << dendl;
    m_event_replay_tracker.finish_op();
    handle_replay_complete(r, "failed to decode journal event");
    return;
  }

  // honor the image's configured mirroring replay delay relative to the
  // event's timestamp; zero means replay immediately
  uint32_t delay = calculate_replay_delay(
    m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
  if (delay == 0) {
    handle_preprocess_entry_ready(0);
    return;
  }

  dout(20) << "delaying replay by " << delay << " sec" << dendl;

  // schedule the continuation via the timer; shut_down() cancels this task
  // (m_delayed_preprocess_task) if a stop races with the delay
  std::lock_guard timer_locker{m_threads->timer_lock};
  ceph_assert(m_delayed_preprocess_task == nullptr);
  m_delayed_preprocess_task = new LambdaContext(
    [this](int r) {
      // timer callbacks fire with timer_lock held; hop to the work queue
      // before continuing so replay doesn't run in timer context
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      m_delayed_preprocess_task = nullptr;
      m_threads->work_queue->queue(
        create_context_callback<ImageReplayer,
        &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
    });
  m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
}
1137
1138 template <typename I>
1139 void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1140 dout(20) << "r=" << r << dendl;
1141 ceph_assert(r == 0);
1142
1143 m_replay_start_time = ceph_clock_now();
1144 if (!m_event_preprocessor->is_required(m_event_entry)) {
1145 process_entry();
1146 return;
1147 }
1148
1149 Context *ctx = create_context_callback<
1150 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1151 m_event_preprocessor->preprocess(&m_event_entry, ctx);
1152 }
1153
1154 template <typename I>
1155 void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1156 dout(20) << "r=" << r << dendl;
1157
1158 if (r < 0) {
1159 m_event_replay_tracker.finish_op();
1160
1161 if (r == -ECANCELED) {
1162 handle_replay_complete(0, "lost exclusive lock");
1163 } else {
1164 derr << "failed to preprocess journal event" << dendl;
1165 handle_replay_complete(r, "failed to preprocess journal event");
1166 }
1167 return;
1168 }
1169
1170 process_entry();
1171 }
1172
1173 template <typename I>
1174 void ImageReplayer<I>::process_entry() {
1175 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1176 << dendl;
1177
1178 // stop replaying events if stop has been requested
1179 if (on_replay_interrupted()) {
1180 m_event_replay_tracker.finish_op();
1181 return;
1182 }
1183
1184 Context *on_ready = create_context_callback<
1185 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1186 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry),
1187 m_replay_start_time);
1188
1189 m_local_replay->process(m_event_entry, on_ready, on_commit);
1190 }
1191
1192 template <typename I>
1193 void ImageReplayer<I>::handle_process_entry_ready(int r) {
1194 dout(20) << dendl;
1195 ceph_assert(r == 0);
1196
1197 bool update_status = false;
1198 {
1199 std::shared_lock image_locker{m_local_image_ctx->image_lock};
1200 if (m_local_image_name != m_local_image_ctx->name) {
1201 m_local_image_name = m_local_image_ctx->name;
1202 update_status = true;
1203 }
1204 }
1205
1206 if (update_status) {
1207 reschedule_update_status_task(0);
1208 }
1209
1210 // attempt to process the next event
1211 handle_replay_ready();
1212 }
1213
// Invoked once a replayed event has been committed locally; acknowledges
// the commit to the remote journal and records replay metrics.
template <typename I>
void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry &replay_entry,
                                                 const utime_t &replay_start_time,
                                                 int r) {
  dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
           << dendl;

  if (r < 0) {
    derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
    handle_replay_complete(r, "failed to commit journal event");
  } else {
    // advance the remote journal's commit position for this client
    ceph_assert(m_remote_journaler != nullptr);
    m_remote_journaler->committed(replay_entry);
  }

  auto bytes = replay_entry.get_data().length();
  auto latency = ceph_clock_now() - replay_start_time;

  // daemon-wide counters can be updated immediately (no lock required)
  if (g_perf_counters) {
    g_perf_counters->inc(l_rbd_mirror_replay);
    g_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
    g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
  }

  // per-image counters require m_lock; defer to the work queue and only
  // release the tracked event afterwards so shutdown waits for this update
  auto ctx = new LambdaContext(
    [this, bytes, latency](int r) {
      std::lock_guard locker{m_lock};
      if (m_perf_counters) {
        m_perf_counters->inc(l_rbd_mirror_replay);
        m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
        m_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
      }
      m_event_replay_tracker.finish_op();
    });
  m_threads->work_queue->queue(ctx, 0);
}
1250
1251 template <typename I>
1252 bool ImageReplayer<I>::update_mirror_image_status(bool force,
1253 const OptionalState &state) {
1254 dout(15) << dendl;
1255 {
1256 std::lock_guard locker{m_lock};
1257 if (!start_mirror_image_status_update(force, false)) {
1258 return false;
1259 }
1260 }
1261
1262 queue_mirror_image_status_update(state);
1263 return true;
1264 }
1265
1266 template <typename I>
1267 bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1268 bool restarting) {
1269 ceph_assert(ceph_mutex_is_locked(m_lock));
1270
1271 if (!force && !is_stopped_()) {
1272 if (!is_running_()) {
1273 dout(15) << "shut down in-progress: ignoring update" << dendl;
1274 return false;
1275 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1276 dout(15) << "already sending update" << dendl;
1277 m_update_status_requested = true;
1278 return false;
1279 }
1280 }
1281
1282 ++m_in_flight_status_updates;
1283 dout(15) << "in-flight updates=" << m_in_flight_status_updates << dendl;
1284 return true;
1285 }
1286
1287 template <typename I>
1288 void ImageReplayer<I>::finish_mirror_image_status_update() {
1289 reregister_admin_socket_hook();
1290
1291 Context *on_finish = nullptr;
1292 {
1293 std::lock_guard locker{m_lock};
1294 ceph_assert(m_in_flight_status_updates > 0);
1295 if (--m_in_flight_status_updates > 0) {
1296 dout(15) << "waiting on " << m_in_flight_status_updates << " in-flight "
1297 << "updates" << dendl;
1298 return;
1299 }
1300
1301 std::swap(on_finish, m_on_update_status_finish);
1302 }
1303
1304 dout(15) << dendl;
1305 if (on_finish != nullptr) {
1306 on_finish->complete(0);
1307 }
1308 }
1309
1310 template <typename I>
1311 void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1312 dout(15) << dendl;
1313
1314 auto ctx = new LambdaContext(
1315 [this, state](int r) {
1316 send_mirror_status_update(state);
1317 });
1318
1319 // ensure pending IO is flushed and the commit position is updated
1320 // prior to updating the mirror status
1321 auto ctx2 = new LambdaContext(
1322 [this, ctx=std::move(ctx)](int r) {
1323 flush_local_replay(ctx);
1324 });
1325 m_threads->work_queue->queue(ctx2, 0);
1326 }
1327
// Builds a cls::rbd::MirrorImageStatus from the current replayer state and
// writes it to the RBD_MIRRORING object.  May return early (without sending)
// while waiting for the replay status formatter; the formatter callback
// re-enters this method when the description becomes available.
template <typename I>
void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
  State state;
  std::string state_desc;
  int last_r;
  bool stopping_replay;

  OptionalMirrorImageStatusState mirror_image_status_state =
    boost::make_optional(false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
  image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
  {
    // snapshot the state needed for the update under the lock
    std::lock_guard locker{m_lock};
    state = m_state;
    state_desc = m_state_desc;
    mirror_image_status_state = m_mirror_image_status_state;
    last_r = m_last_r;
    stopping_replay = (m_local_image_ctx != nullptr);

    if (m_bootstrap_request != nullptr) {
      // keep the bootstrap request alive while querying it below
      bootstrap_request = m_bootstrap_request;
      bootstrap_request->get();
    }
  }

  bool syncing = false;
  if (bootstrap_request != nullptr) {
    syncing = bootstrap_request->is_syncing();
    bootstrap_request->put();
    bootstrap_request = nullptr;
  }

  // caller-supplied state (e.g. forced STOPPED during shutdown) wins
  if (opt_state) {
    state = *opt_state;
  }

  cls::rbd::MirrorImageStatus status;
  status.up = true;
  switch (state) {
  case STATE_STARTING:
    if (syncing) {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
      status.description = state_desc.empty() ? "syncing" : state_desc;
      mirror_image_status_state = status.state;
    } else {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
      status.description = "starting replay";
    }
    break;
  case STATE_REPLAYING:
  case STATE_REPLAY_FLUSHING:
    status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
    {
      // the description (replay progress) may need an async round-trip;
      // on completion this method is re-invoked with the cached result
      Context *on_req_finish = new LambdaContext(
        [this](int r) {
          dout(15) << "replay status ready: r=" << r << dendl;
          if (r >= 0) {
            send_mirror_status_update(boost::none);
          } else if (r == -EAGAIN) {
            // decrement in-flight status update counter
            handle_mirror_status_update(r);
          }
        });

      std::string desc;
      ceph_assert(m_replay_status_formatter != nullptr);
      if (!m_replay_status_formatter->get_or_send_update(&desc,
                                                         on_req_finish)) {
        dout(15) << "waiting for replay status" << dendl;
        return;
      }
      status.description = "replaying, " + desc;
      // healthy replay clears any sticky error state
      mirror_image_status_state = boost::make_optional(
        false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
    }
    break;
  case STATE_STOPPING:
    if (stopping_replay) {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
      status.description = state_desc.empty() ? "stopping replay" : state_desc;
      break;
    }
    // FALLTHROUGH
  case STATE_STOPPED:
    if (last_r == -EREMOTEIO) {
      // split-brain: report unknown rather than error
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
      status.description = state_desc;
      mirror_image_status_state = status.state;
    } else if (last_r < 0) {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
      status.description = state_desc;
      mirror_image_status_state = status.state;
    } else {
      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
      status.description = state_desc.empty() ? "stopped" : state_desc;
      mirror_image_status_state = boost::none;
    }
    break;
  default:
    ceph_assert(!"invalid state");
  }

  {
    std::lock_guard locker{m_lock};
    m_mirror_image_status_state = mirror_image_status_state;
  }

  // prevent the status from ping-ponging when failed replays are restarted
  if (mirror_image_status_state &&
      *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
    status.state = *mirror_image_status_state;
  }

  dout(15) << "status=" << status << dendl;
  librados::ObjectWriteOperation op;
  librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);

  librados::AioCompletion *aio_comp = create_rados_callback<
    ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
  int r = m_local_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
  ceph_assert(r == 0);
  aio_comp->release();
}
1450
1451 template <typename I>
1452 void ImageReplayer<I>::handle_mirror_status_update(int r) {
1453 dout(15) << "r=" << r << dendl;
1454
1455 bool running = false;
1456 bool started = false;
1457 {
1458 std::lock_guard locker{m_lock};
1459 bool update_status_requested = false;
1460 std::swap(update_status_requested, m_update_status_requested);
1461
1462 running = is_running_();
1463 if (running && update_status_requested) {
1464 started = start_mirror_image_status_update(false, true);
1465 }
1466 }
1467
1468 // if a deferred update is available, send it -- otherwise reschedule
1469 // the timer task
1470 if (started) {
1471 queue_mirror_image_status_update(boost::none);
1472 } else if (running) {
1473 reschedule_update_status_task(0);
1474 }
1475
1476 // mark committed status update as no longer in-flight
1477 finish_mirror_image_status_update();
1478 }
1479
// (Re)schedules the periodic mirror status update timer task.
// new_interval > 0 changes the interval, == 0 keeps it, < 0 cancels the
// task without rescheduling (used during shutdown).
template <typename I>
void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
  bool canceled_task = false;
  {
    // lock order: m_lock before timer_lock
    std::lock_guard locker{m_lock};
    std::lock_guard timer_locker{m_threads->timer_lock};

    if (m_update_status_task) {
      dout(15) << "canceling existing status update task" << dendl;

      // cancel_event returns false if the task already fired
      canceled_task = m_threads->timer->cancel_event(m_update_status_task);
      m_update_status_task = nullptr;
    }

    if (new_interval > 0) {
      m_update_status_interval = new_interval;
    }

    // force=true reserves an in-flight slot for the scheduled task
    if (new_interval >= 0 && is_running_() &&
        start_mirror_image_status_update(true, false)) {
      m_update_status_task = new LambdaContext(
        [this](int r) {
          ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
          m_update_status_task = nullptr;

          queue_mirror_image_status_update(boost::none);
        });
      dout(15) << "scheduling status update task after "
               << m_update_status_interval << " seconds" << dendl;
      m_threads->timer->add_event_after(m_update_status_interval,
                                        m_update_status_task);
    }
  }

  if (canceled_task) {
    // decrement in-flight status update counter for canceled task
    finish_mirror_image_status_update();
  }
}
1519
1520 template <typename I>
1521 void ImageReplayer<I>::shut_down(int r) {
1522 dout(10) << "r=" << r << dendl;
1523
1524 bool canceled_delayed_preprocess_task = false;
1525 {
1526 std::lock_guard timer_locker{m_threads->timer_lock};
1527 if (m_delayed_preprocess_task != nullptr) {
1528 canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1529 m_delayed_preprocess_task);
1530 ceph_assert(canceled_delayed_preprocess_task);
1531 m_delayed_preprocess_task = nullptr;
1532 }
1533 }
1534 if (canceled_delayed_preprocess_task) {
1535 // wake up sleeping replay
1536 m_event_replay_tracker.finish_op();
1537 }
1538
1539 reschedule_update_status_task(-1);
1540
1541 {
1542 std::lock_guard locker{m_lock};
1543 ceph_assert(m_state == STATE_STOPPING);
1544
1545 // if status updates are in-flight, wait for them to complete
1546 // before proceeding
1547 if (m_in_flight_status_updates > 0) {
1548 if (m_on_update_status_finish == nullptr) {
1549 dout(15) << "waiting for in-flight status update" << dendl;
1550 m_on_update_status_finish = new LambdaContext(
1551 [this, r](int _r) {
1552 shut_down(r);
1553 });
1554 }
1555 return;
1556 }
1557 }
1558
1559 // NOTE: it's important to ensure that the local image is fully
1560 // closed before attempting to close the remote journal in
1561 // case the remote cluster is unreachable
1562
1563 // chain the shut down sequence (reverse order)
1564 Context *ctx = new LambdaContext(
1565 [this, r](int _r) {
1566 update_mirror_image_status(true, STATE_STOPPED);
1567 handle_shut_down(r);
1568 });
1569
1570 // close the remote journal
1571 if (m_remote_journaler != nullptr) {
1572 ctx = new LambdaContext([this, ctx](int r) {
1573 delete m_remote_journaler;
1574 m_remote_journaler = nullptr;
1575 ctx->complete(0);
1576 });
1577 ctx = new LambdaContext([this, ctx](int r) {
1578 m_remote_journaler->remove_listener(&m_remote_listener);
1579 m_remote_journaler->shut_down(ctx);
1580 });
1581 }
1582
1583 // stop the replay of remote journal events
1584 if (m_replay_handler != nullptr) {
1585 ctx = new LambdaContext([this, ctx](int r) {
1586 delete m_replay_handler;
1587 m_replay_handler = nullptr;
1588
1589 m_event_replay_tracker.wait_for_ops(ctx);
1590 });
1591 ctx = new LambdaContext([this, ctx](int r) {
1592 m_remote_journaler->stop_replay(ctx);
1593 });
1594 }
1595
1596 // close the local image (release exclusive lock)
1597 if (m_local_image_ctx) {
1598 ctx = new LambdaContext([this, ctx](int r) {
1599 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1600 &m_local_image_ctx, ctx);
1601 request->send();
1602 });
1603 }
1604
1605 // shut down event replay into the local image
1606 if (m_local_journal != nullptr) {
1607 ctx = new LambdaContext([this, ctx](int r) {
1608 m_local_journal = nullptr;
1609 ctx->complete(0);
1610 });
1611 if (m_local_replay != nullptr) {
1612 ctx = new LambdaContext([this, ctx](int r) {
1613 m_local_journal->stop_external_replay();
1614 m_local_replay = nullptr;
1615
1616 EventPreprocessor<I>::destroy(m_event_preprocessor);
1617 m_event_preprocessor = nullptr;
1618 ctx->complete(0);
1619 });
1620 }
1621 ctx = new LambdaContext([this, ctx](int r) {
1622 // blocks if listener notification is in-progress
1623 m_local_journal->remove_listener(m_journal_listener);
1624 ctx->complete(0);
1625 });
1626 }
1627
1628 // wait for all local in-flight replay events to complete
1629 ctx = new LambdaContext([this, ctx](int r) {
1630 if (r < 0) {
1631 derr << "error shutting down journal replay: " << cpp_strerror(r)
1632 << dendl;
1633 }
1634
1635 m_event_replay_tracker.wait_for_ops(ctx);
1636 });
1637
1638 // flush any local in-flight replay events
1639 if (m_local_replay != nullptr) {
(1) Event parameter_hidden: |
declaration hides parameter "r" (declared at line 1521) |
(2) Event caretline: |
^ |
1640 ctx = new LambdaContext([this, ctx](int r) {
1641 m_local_replay->shut_down(true, ctx);
1642 });
1643 }
1644
1645 m_threads->work_queue->queue(ctx, 0);
1646 }
1647
// Final stage of shut_down(): waits out in-flight status updates, schedules
// trash-move for deleted/resyncing images, and completes the pending
// start/stop callbacks.  May re-enter itself asynchronously.
template <typename I>
void ImageReplayer<I>::handle_shut_down(int r) {
  reschedule_update_status_task(-1);

  bool resync_requested = false;
  bool delete_requested = false;
  bool unregister_asok_hook = false;
  {
    std::lock_guard locker{m_lock};

    // if status updates are in-flight, wait for them to complete
    // before proceeding
    if (m_in_flight_status_updates > 0) {
      if (m_on_update_status_finish == nullptr) {
        dout(15) << "waiting for in-flight status update" << dendl;
        m_on_update_status_finish = new LambdaContext(
          [this, r](int _r) {
            handle_shut_down(r);
          });
      }
      return;
    }

    if (m_delete_requested && !m_local_image_id.empty()) {
      // the remote image is gone; the local copy must be trashed
      ceph_assert(m_remote_image.image_id.empty());
      dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
      unregister_asok_hook = true;
      std::swap(delete_requested, m_delete_requested);
    }

    std::swap(resync_requested, m_resync_requested);
    if (delete_requested || resync_requested) {
      // forget the local image id so a restart re-bootstraps
      m_local_image_id = "";
    } else if (m_last_r == -ENOENT &&
               m_local_image_id.empty() && m_remote_image.image_id.empty()) {
      // neither side has the image anymore -- nothing left to replay
      dout(0) << "mirror image no longer exists" << dendl;
      unregister_asok_hook = true;
      m_finished = true;
    }
  }

  if (unregister_asok_hook) {
    unregister_admin_socket_hook();
  }

  if (delete_requested || resync_requested) {
    // move the image to the trash, then re-enter to finish the shutdown
    dout(5) << "moving image to trash" << dendl;
    auto ctx = new LambdaContext([this, r](int) {
      handle_shut_down(r);
    });
    ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
                                resync_requested, m_threads->work_queue, ctx);
    return;
  }

  dout(10) << "stop complete" << dendl;
  ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
  m_replay_status_formatter = nullptr;

  // detach the completion callbacks under the lock; invoke them outside it
  Context *on_start = nullptr;
  Context *on_stop = nullptr;
  {
    std::lock_guard locker{m_lock};
    std::swap(on_start, m_on_start_finish);
    std::swap(on_stop, m_on_stop_finish);
    m_stop_requested = false;
    ceph_assert(m_delayed_preprocess_task == nullptr);
    ceph_assert(m_state == STATE_STOPPING);
    m_state = STATE_STOPPED;
  }

  if (on_start != nullptr) {
    dout(10) << "on start finish complete, r=" << r << dendl;
    on_start->complete(r);
    // the start error was consumed by on_start; report success to on_stop
    r = 0;
  }
  if (on_stop != nullptr) {
    dout(10) << "on stop finish complete, r=" << r << dendl;
    on_stop->complete(r);
  }
}
1729
1730 template <typename I>
1731 void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1732 dout(20) << dendl;
1733
1734 cls::journal::Client client;
1735 {
1736 std::lock_guard locker{m_lock};
1737 if (!is_running_()) {
1738 return;
1739 }
1740
1741 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1742 if (r < 0) {
1743 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1744 return;
1745 }
1746 }
1747
1748 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1749 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1750 stop(nullptr, false, -ENOTCONN, "disconnected");
1751 }
1752 }
1753
1754 template <typename I>
1755 std::string ImageReplayer<I>::to_string(const State state) {
1756 switch (state) {
1757 case ImageReplayer<I>::STATE_STARTING:
1758 return "Starting";
1759 case ImageReplayer<I>::STATE_REPLAYING:
1760 return "Replaying";
1761 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1762 return "ReplayFlushing";
1763 case ImageReplayer<I>::STATE_STOPPING:
1764 return "Stopping";
1765 case ImageReplayer<I>::STATE_STOPPED:
1766 return "Stopped";
1767 default:
1768 break;
1769 }
1770 return "Unknown(" + stringify(state) + ")";
1771 }
1772
1773 template <typename I>
1774 void ImageReplayer<I>::resync_image(Context *on_finish) {
1775 dout(10) << dendl;
1776
1777 m_resync_requested = true;
1778 stop(on_finish);
1779 }
1780
// Registers the per-image admin socket hook and, on success, creates the
// per-image perf counters.  A no-op if a hook is already registered.
template <typename I>
void ImageReplayer<I>::register_admin_socket_hook() {
  ImageReplayerAdminSocketHook<I> *asok_hook;
  {
    std::lock_guard locker{m_lock};
    if (m_asok_hook != nullptr) {
      return;
    }

    // perf counters are created/destroyed in lockstep with the hook
    ceph_assert(m_perf_counters == nullptr);

    dout(15) << "registered asok hook: " << m_name << dendl;
    asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
                                                    this);
    int r = asok_hook->register_commands();
    if (r == 0) {
      m_asok_hook = asok_hook;

      CephContext *cct = static_cast<CephContext *>(m_local_io_ctx.cct());
      auto prio = cct->_conf.get_val<int64_t>("rbd_mirror_perf_stats_prio");
      PerfCountersBuilder plb(g_ceph_context, "rbd_mirror_" + m_name,
                              l_rbd_mirror_first, l_rbd_mirror_last);
      plb.add_u64_counter(l_rbd_mirror_replay, "replay", "Replays", "r", prio);
      plb.add_u64_counter(l_rbd_mirror_replay_bytes, "replay_bytes",
                          "Replayed data", "rb", prio, unit_t(UNIT_BYTES));
      plb.add_time_avg(l_rbd_mirror_replay_latency, "replay_latency",
                       "Replay latency", "rl", prio);
      m_perf_counters = plb.create_perf_counters();
      g_ceph_context->get_perfcounters_collection()->add(m_perf_counters);

      return;
    }
    derr << "error registering admin socket commands" << dendl;
  }
  // registration failed: discard the hook (m_asok_hook stays null)
  delete asok_hook;
}
1817
1818 template <typename I>
1819 void ImageReplayer<I>::unregister_admin_socket_hook() {
1820 dout(15) << dendl;
1821
1822 AdminSocketHook *asok_hook = nullptr;
1823 PerfCounters *perf_counters = nullptr;
1824 {
1825 std::lock_guard locker{m_lock};
1826 std::swap(asok_hook, m_asok_hook);
1827 std::swap(perf_counters, m_perf_counters);
1828 }
1829 delete asok_hook;
1830 if (perf_counters != nullptr) {
1831 g_ceph_context->get_perfcounters_collection()->remove(perf_counters);
1832 delete perf_counters;
1833 }
1834 }
1835
1836 template <typename I>
1837 void ImageReplayer<I>::reregister_admin_socket_hook() {
1838 {
1839 std::lock_guard locker{m_lock};
1840
1841 auto name = admin_socket_hook_name(m_local_image_name);
1842 if (m_asok_hook != nullptr && m_name == name) {
1843 return;
1844 }
1845 m_name = name;
1846 }
1847 unregister_admin_socket_hook();
1848 register_admin_socket_hook();
1849 }
1850
1851 template <typename I>
1852 std::string ImageReplayer<I>::admin_socket_hook_name(
1853 const std::string &image_name) const {
1854 std::string name = m_local_io_ctx.get_namespace();
1855 if (!name.empty()) {
1856 name += "/";
1857 }
1858
1859 return m_local_io_ctx.get_pool_name() + "/" + name + image_name;
1860 }
1861
1862 template <typename I>
1863 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1864 {
1865 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1866 << "/" << replayer.get_global_image_id() << "]";
1867 return os;
1868 }
1869
1870 } // namespace mirror
1871 } // namespace rbd
1872
1873 template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;
1874