1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph distributed storage system
5    	 *
6    	 * Copyright (C) 2016 Mirantis Inc
7    	 *
8    	 * Author: Mykola Golub <mgolub@mirantis.com>
9    	 *
10   	 *  This library is free software; you can redistribute it and/or
11   	 *  modify it under the terms of the GNU Lesser General Public
12   	 *  License as published by the Free Software Foundation; either
13   	 *  version 2.1 of the License, or (at your option) any later version.
14   	 *
15   	 */
16   	
17   	#include "include/rados/librados.hpp"
18   	#include "include/rbd/librbd.hpp"
19   	#include "include/stringify.h"
20   	#include "test/rbd_mirror/test_fixture.h"
21   	#include "cls/journal/cls_journal_types.h"
22   	#include "cls/journal/cls_journal_client.h"
23   	#include "cls/rbd/cls_rbd_types.h"
24   	#include "cls/rbd/cls_rbd_client.h"
25   	#include "journal/Journaler.h"
26   	#include "librbd/ExclusiveLock.h"
27   	#include "librbd/ImageCtx.h"
28   	#include "librbd/ImageState.h"
29   	#include "librbd/Journal.h"
30   	#include "librbd/Operations.h"
31   	#include "librbd/Utils.h"
32   	#include "librbd/internal.h"
33   	#include "librbd/api/Mirror.h"
34   	#include "librbd/io/AioCompletion.h"
35   	#include "librbd/io/ImageRequestWQ.h"
36   	#include "librbd/io/ReadResult.h"
37   	#include "tools/rbd_mirror/ImageReplayer.h"
38   	#include "tools/rbd_mirror/InstanceWatcher.h"
39   	#include "tools/rbd_mirror/Threads.h"
40   	#include "tools/rbd_mirror/Throttler.h"
41   	#include "tools/rbd_mirror/Types.h"
42   	
43   	#include "test/librados/test_cxx.h"
44   	#include "gtest/gtest.h"
45   	
46   	using rbd::mirror::RadosRef;
47   	
// Intentionally empty hook; nothing in this file calls it.
// NOTE(review): presumably referenced from the test runner so that this
// translation unit gets linked in -- confirm against the caller.
void register_test_rbd_mirror() {}
50   	
// Size in bytes of the generated test pattern (m_test_data holds this many
// characters plus a NUL) and of every individual test write/read.
#define TEST_IO_SIZE 512
// Number of consecutive TEST_IO_SIZE-sized I/Os the tests issue per batch.
#define TEST_IO_COUNT 11
53   	
54   	class TestImageReplayer : public ::rbd::mirror::TestFixture {
55   	public:
56   	  struct C_WatchCtx : public librados::WatchCtx2 {
57   	    TestImageReplayer *test;
58   	    std::string oid;
59   	    ceph::mutex lock = ceph::make_mutex("C_WatchCtx::lock");
60   	    ceph::condition_variable cond;
61   	    bool notified;
62   	
63   	    C_WatchCtx(TestImageReplayer *test, const std::string &oid)
64   	      : test(test), oid(oid), notified(false) {
65   	    }
66   	
67   	    void handle_notify(uint64_t notify_id, uint64_t cookie,
68   	                               uint64_t notifier_id, bufferlist& bl_) override {
69   	      bufferlist bl;
70   	      test->m_remote_ioctx.notify_ack(oid, notify_id, cookie, bl);
71   	
72   	      std::lock_guard locker{lock};
73   	      notified = true;
74   	      cond.notify_all();
75   	    }
76   	
77   	    void handle_error(uint64_t cookie, int err) override {
78   	      ASSERT_EQ(0, err);
79   	    }
80   	  };
81   	
82   	  TestImageReplayer()
83   	    : m_local_cluster(new librados::Rados()), m_watch_handle(0)
84   	  {
85   	    EXPECT_EQ("", connect_cluster_pp(*m_local_cluster.get()));
86   	    EXPECT_EQ(0, m_local_cluster->conf_set("rbd_cache", "false"));
87   	    EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirror_journal_poll_age", "1"));
88   	    EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirror_journal_commit_age",
89   	                                           "0.1"));
90   	    m_local_pool_name = get_temp_pool_name();
91   	    EXPECT_EQ(0, m_local_cluster->pool_create(m_local_pool_name.c_str()));
92   	    EXPECT_EQ(0, m_local_cluster->ioctx_create(m_local_pool_name.c_str(),
93   						      m_local_ioctx));
94   	    m_local_ioctx.application_enable("rbd", true);
95   	
96   	    EXPECT_EQ("", connect_cluster_pp(m_remote_cluster));
97   	    EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_cache", "false"));
98   	
99   	    m_remote_pool_name = get_temp_pool_name();
100  	    EXPECT_EQ(0, m_remote_cluster.pool_create(m_remote_pool_name.c_str()));
101  	    m_remote_pool_id = m_remote_cluster.pool_lookup(m_remote_pool_name.c_str());
102  	    EXPECT_GE(m_remote_pool_id, 0);
103  	
104  	    EXPECT_EQ(0, m_remote_cluster.ioctx_create(m_remote_pool_name.c_str(),
105  						       m_remote_ioctx));
106  	    m_remote_ioctx.application_enable("rbd", true);
107  	
108  	    EXPECT_EQ(0, librbd::api::Mirror<>::mode_set(m_remote_ioctx,
109  	                                                 RBD_MIRROR_MODE_POOL));
110  	
111  	    m_image_name = get_temp_image_name();
112  	    uint64_t features = librbd::util::get_rbd_default_features(g_ceph_context);
113  	    features |= RBD_FEATURE_EXCLUSIVE_LOCK | RBD_FEATURE_JOURNALING;
114  	    int order = 0;
115  	    EXPECT_EQ(0, librbd::create(m_remote_ioctx, m_image_name.c_str(), 1 << 22,
116  					false, features, &order, 0, 0));
117  	    m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name);
118  	    m_global_image_id = get_global_image_id(m_remote_ioctx, m_remote_image_id);
119  	
120  	    auto cct = reinterpret_cast<CephContext*>(m_local_ioctx.cct());
121  	    m_threads.reset(new rbd::mirror::Threads<>(cct));
122  	
123  	    m_image_sync_throttler.reset(new rbd::mirror::Throttler<>(
124  	        cct, "rbd_mirror_concurrent_image_syncs"));
125  	
126  	    m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
127  	        m_local_ioctx, m_threads->work_queue, nullptr,
128  	        m_image_sync_throttler.get());
129  	    m_instance_watcher->handle_acquire_leader();
130  	  }
131  	
132  	  ~TestImageReplayer() override
133  	  {
134  	    unwatch();
135  	
136  	    m_instance_watcher->handle_release_leader();
137  	
138  	    delete m_replayer;
139  	    delete m_instance_watcher;
140  	
141  	    EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
142  	    EXPECT_EQ(0, m_local_cluster->pool_delete(m_local_pool_name.c_str()));
143  	  }
144  	
145  	  template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
146  	  void create_replayer() {
147  	    m_replayer = new ImageReplayerT(m_local_ioctx, m_local_mirror_uuid,
148  	                                    m_global_image_id, m_threads.get(),
149  	                                    m_instance_watcher, nullptr);
150  	    m_replayer->add_peer("peer uuid", m_remote_ioctx);
151  	  }
152  	
153  	  void start()
154  	  {
155  	    C_SaferCond cond;
156  	    m_replayer->start(&cond);
157  	    ASSERT_EQ(0, cond.wait());
158  	
159  	    ASSERT_EQ(0U, m_watch_handle);
160  	    std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
161  	    m_watch_ctx = new C_WatchCtx(this, oid);
162  	    ASSERT_EQ(0, m_remote_ioctx.watch2(oid, &m_watch_handle, m_watch_ctx));
163  	  }
164  	
165  	  void unwatch() {
166  	    if (m_watch_handle != 0) {
167  	      m_remote_ioctx.unwatch2(m_watch_handle);
168  	      delete m_watch_ctx;
169  	      m_watch_ctx = nullptr;
170  	      m_watch_handle = 0;
171  	    }
172  	  }
173  	
174  	  void stop()
175  	  {
176  	    unwatch();
177  	
178  	    C_SaferCond cond;
179  	    m_replayer->stop(&cond);
180  	    ASSERT_EQ(0, cond.wait());
181  	  }
182  	
183  	  void bootstrap()
184  	  {
185  	    create_replayer<>();
186  	
187  	    start();
188  	    wait_for_replay_complete();
189  	    stop();
190  	  }
191  	
192  	  std::string  get_temp_image_name()
193  	  {
194  	    return "image" + stringify(++_image_number);
195  	  }
196  	
197  	  std::string get_image_id(librados::IoCtx &ioctx, const string &image_name)
198  	  {
199  	    std::string obj = librbd::util::id_obj_name(image_name);
200  	    std::string id;
201  	    EXPECT_EQ(0, librbd::cls_client::get_id(&ioctx, obj, &id));
202  	    return id;
203  	  }
204  	
205  	  std::string get_global_image_id(librados::IoCtx& io_ctx,
206  	                                  const std::string& image_id) {
207  	    cls::rbd::MirrorImage mirror_image;
208  	    EXPECT_EQ(0, librbd::cls_client::mirror_image_get(&io_ctx, image_id,
209  	                                                      &mirror_image));
210  	    return mirror_image.global_image_id;
211  	  }
212  	
213  	  void open_image(librados::IoCtx &ioctx, const std::string &image_name,
214  			  bool readonly, librbd::ImageCtx **ictxp)
215  	  {
216  	    librbd::ImageCtx *ictx = new librbd::ImageCtx(image_name.c_str(),
217  							  "", "", ioctx, readonly);
218  	    EXPECT_EQ(0, ictx->state->open(0));
219  	    *ictxp = ictx;
220  	  }
221  	
222  	  void open_local_image(librbd::ImageCtx **ictxp)
223  	  {
224  	    open_image(m_local_ioctx, m_image_name, true, ictxp);
225  	  }
226  	
227  	  void open_remote_image(librbd::ImageCtx **ictxp)
228  	  {
229  	    open_image(m_remote_ioctx, m_image_name, false, ictxp);
230  	  }
231  	
232  	  void close_image(librbd::ImageCtx *ictx)
233  	  {
234  	    ictx->state->close();
235  	  }
236  	
237  	  void get_commit_positions(cls::journal::ObjectPosition *master_position,
238  				    cls::journal::ObjectPosition *mirror_position)
239  	  {
240  	    std::string master_client_id = "";
241  	    std::string mirror_client_id = m_local_mirror_uuid;
242  	
243  	    m_replayer->flush();
244  	
245  	    C_SaferCond cond;
246  	    uint64_t minimum_set;
247  	    uint64_t active_set;
248  	    std::set<cls::journal::Client> registered_clients;
249  	    std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
250  	    cls::journal::client::get_mutable_metadata(m_remote_ioctx, oid,
251  						       &minimum_set, &active_set,
252  						       &registered_clients, &cond);
253  	    ASSERT_EQ(0, cond.wait());
254  	
255  	    *master_position = cls::journal::ObjectPosition();
256  	    *mirror_position = cls::journal::ObjectPosition();
257  	
258  	    std::set<cls::journal::Client>::const_iterator c;
259  	    for (c = registered_clients.begin(); c != registered_clients.end(); ++c) {
260  	      std::cout << __func__ << ": client: " << *c << std::endl;
261  	      if (c->state != cls::journal::CLIENT_STATE_CONNECTED) {
262  		continue;
263  	      }
264  	      cls::journal::ObjectPositions object_positions =
265  		c->commit_position.object_positions;
266  	      cls::journal::ObjectPositions::const_iterator p =
267  		object_positions.begin();
268  	      if (p != object_positions.end()) {
269  		if (c->id == master_client_id) {
270  		  ASSERT_EQ(cls::journal::ObjectPosition(), *master_position);
271  		  *master_position = *p;
272  		} else if (c->id == mirror_client_id) {
273  		  ASSERT_EQ(cls::journal::ObjectPosition(), *mirror_position);
274  		  *mirror_position = *p;
275  		}
276  	      }
277  	    }
278  	  }
279  	
280  	  bool wait_for_watcher_notify(int seconds)
281  	  {
282  	    if (m_watch_handle == 0) {
283  	      return false;
284  	    }
285  	
286  	    std::unique_lock locker{m_watch_ctx->lock};
287  	    while (!m_watch_ctx->notified) {
288  	      if (m_watch_ctx->cond.wait_for(locker,
289  					     std::chrono::seconds(seconds)) ==
290  		  std::cv_status::timeout) {
291  	        return false;
292  	      }
293  	    }
294  	    m_watch_ctx->notified = false;
295  	    return true;
296  	  }
297  	
298  	  void wait_for_replay_complete()
299  	  {
300  	    cls::journal::ObjectPosition master_position;
301  	    cls::journal::ObjectPosition mirror_position;
302  	
303  	    for (int i = 0; i < 100; i++) {
304  	      get_commit_positions(&master_position, &mirror_position);
305  	      if (master_position == mirror_position) {
306  		break;
307  	      }
308  	      wait_for_watcher_notify(1);
309  	    }
310  	
311  	    ASSERT_EQ(master_position, mirror_position);
312  	  }
313  	
314  	  void wait_for_stopped() {
315  	    for (int i = 0; i < 100; i++) {
316  	      if (m_replayer->is_stopped()) {
317  	        break;
318  	      }
319  	      wait_for_watcher_notify(1);
320  	    }
321  	    ASSERT_TRUE(m_replayer->is_stopped());
322  	  }
323  	
324  	  void write_test_data(librbd::ImageCtx *ictx, const char *test_data, off_t off,
325  	                       size_t len)
326  	  {
327  	    size_t written;
328  	    bufferlist bl;
329  	    bl.append(std::string(test_data, len));
330  	    written = ictx->io_work_queue->write(off, len, std::move(bl), 0);
331  	    printf("wrote: %d\n", (int)written);
332  	    ASSERT_EQ(len, written);
333  	  }
334  	
335  	  void read_test_data(librbd::ImageCtx *ictx, const char *expected, off_t off,
336  	                      size_t len)
337  	  {
338  	    ssize_t read;
339  	    char *result = (char *)malloc(len + 1);
340  	
341  	    ASSERT_NE(static_cast<char *>(NULL), result);
342  	    read = ictx->io_work_queue->read(
343  	      off, len, librbd::io::ReadResult{result, len}, 0);
344  	    printf("read: %d\n", (int)read);
345  	    ASSERT_EQ(len, static_cast<size_t>(read));
346  	    result[len] = '\0';
347  	    if (memcmp(result, expected, len)) {
348  	      printf("read: %s\nexpected: %s\n", result, expected);
349  	      ASSERT_EQ(0, memcmp(result, expected, len));
350  	    }
351  	    free(result);
352  	  }
353  	
354  	  void generate_test_data() {
355  	    for (int i = 0; i < TEST_IO_SIZE; ++i) {
(1) Event dont_call: "rand" should not be used for security-related applications, because linear congruential algorithms are too easy to break.
(2) Event remediation: Use a compliant random number generator, such as "/dev/random" or "/dev/urandom" on Unix-like systems, and CNG (Cryptography API: Next Generation) on Windows.
356  	      m_test_data[i] = (char) (rand() % (126 - 33) + 33);
357  	    }
358  	    m_test_data[TEST_IO_SIZE] = '\0';
359  	  }
360  	
361  	  void flush(librbd::ImageCtx *ictx)
362  	  {
363  	    C_SaferCond aio_flush_ctx;
364  	    auto c = librbd::io::AioCompletion::create(&aio_flush_ctx);
365  	    c->get();
366  	    ictx->io_work_queue->aio_flush(c);
367  	    ASSERT_EQ(0, c->wait_for_complete());
368  	    c->put();
369  	
370  	    C_SaferCond journal_flush_ctx;
371  	    ictx->journal->flush_commit_position(&journal_flush_ctx);
372  	    ASSERT_EQ(0, journal_flush_ctx.wait());
373  	
374  	    printf("flushed\n");
375  	  }
376  	
377  	  static int _image_number;
378  	
379  	  std::shared_ptr<librados::Rados> m_local_cluster;
380  	  std::unique_ptr<rbd::mirror::Threads<>> m_threads;
381  	  std::unique_ptr<rbd::mirror::Throttler<>> m_image_sync_throttler;
382  	  librados::Rados m_remote_cluster;
383  	  rbd::mirror::InstanceWatcher<> *m_instance_watcher;
384  	  std::string m_local_mirror_uuid = "local mirror uuid";
385  	  std::string m_remote_mirror_uuid = "remote mirror uuid";
386  	  std::string m_local_pool_name, m_remote_pool_name;
387  	  librados::IoCtx m_local_ioctx, m_remote_ioctx;
388  	  std::string m_image_name;
389  	  int64_t m_remote_pool_id;
390  	  std::string m_remote_image_id;
391  	  std::string m_global_image_id;
392  	  rbd::mirror::ImageReplayer<> *m_replayer = nullptr;
393  	  C_WatchCtx *m_watch_ctx;
394  	  uint64_t m_watch_handle;
395  	  char m_test_data[TEST_IO_SIZE + 1];
396  	  std::string m_journal_commit_age;
397  	};
398  	
// Out-of-class definition of the counter used by get_temp_image_name().  As a
// static data member it is zero-initialized, so the first name is "image1".
int TestImageReplayer::_image_number;
400  	
401  	TEST_F(TestImageReplayer, Bootstrap)
402  	{
403  	  bootstrap();
404  	}
405  	
406  	TEST_F(TestImageReplayer, BootstrapErrorLocalImageExists)
407  	{
408  	  int order = 0;
409  	  EXPECT_EQ(0, librbd::create(m_local_ioctx, m_image_name.c_str(), 1 << 22,
410  				      false, 0, &order, 0, 0));
411  	
412  	  create_replayer<>();
413  	  C_SaferCond cond;
414  	  m_replayer->start(&cond);
415  	  ASSERT_EQ(-EEXIST, cond.wait());
416  	}
417  	
418  	TEST_F(TestImageReplayer, BootstrapErrorNoJournal)
419  	{
420  	  ASSERT_EQ(0, librbd::Journal<>::remove(m_remote_ioctx, m_remote_image_id));
421  	
422  	  create_replayer<>();
423  	  C_SaferCond cond;
424  	  m_replayer->start(&cond);
425  	  ASSERT_EQ(-ENOENT, cond.wait());
426  	}
427  	
428  	TEST_F(TestImageReplayer, BootstrapErrorMirrorDisabled)
429  	{
430  	  // disable remote image mirroring
431  	  ASSERT_EQ(0, librbd::api::Mirror<>::mode_set(m_remote_ioctx,
432  	                                               RBD_MIRROR_MODE_IMAGE));
433  	  librbd::ImageCtx *ictx;
434  	  open_remote_image(&ictx);
435  	  ASSERT_EQ(0, librbd::api::Mirror<>::image_disable(ictx, true));
436  	  close_image(ictx);
437  	
438  	  create_replayer<>();
439  	  C_SaferCond cond;
440  	  m_replayer->start(&cond);
441  	  ASSERT_EQ(-ENOENT, cond.wait());
442  	}
443  	
444  	TEST_F(TestImageReplayer, BootstrapMirrorDisabling)
445  	{
446  	  // set remote image mirroring state to DISABLING
447  	  ASSERT_EQ(0, librbd::api::Mirror<>::mode_set(m_remote_ioctx,
448  	                                               RBD_MIRROR_MODE_IMAGE));
449  	  librbd::ImageCtx *ictx;
450  	  open_remote_image(&ictx);
451  	  ASSERT_EQ(0, librbd::api::Mirror<>::image_enable(ictx, false));
452  	  cls::rbd::MirrorImage mirror_image;
453  	  ASSERT_EQ(0, librbd::cls_client::mirror_image_get(&m_remote_ioctx, ictx->id,
454  	                                                    &mirror_image));
455  	  mirror_image.state = cls::rbd::MirrorImageState::MIRROR_IMAGE_STATE_DISABLING;
456  	  ASSERT_EQ(0, librbd::cls_client::mirror_image_set(&m_remote_ioctx, ictx->id,
457  	                                                    mirror_image));
458  	  close_image(ictx);
459  	
460  	  create_replayer<>();
461  	  C_SaferCond cond;
462  	  m_replayer->start(&cond);
463  	  ASSERT_EQ(-EREMOTEIO, cond.wait());
464  	  ASSERT_TRUE(m_replayer->is_stopped());
465  	}
466  	
467  	TEST_F(TestImageReplayer, BootstrapDemoted)
468  	{
469  	  // demote remote image
470  	  librbd::ImageCtx *ictx;
471  	  open_remote_image(&ictx);
472  	  ASSERT_EQ(0, librbd::api::Mirror<>::image_demote(ictx));
473  	  close_image(ictx);
474  	
475  	  create_replayer<>();
476  	  C_SaferCond cond;
477  	  m_replayer->start(&cond);
478  	  ASSERT_EQ(-EREMOTEIO, cond.wait());
479  	  ASSERT_TRUE(m_replayer->is_stopped());
480  	}
481  	
482  	TEST_F(TestImageReplayer, StartInterrupted)
483  	{
484  	  create_replayer<>();
485  	  C_SaferCond start_cond, stop_cond;
486  	  m_replayer->start(&start_cond);
487  	  m_replayer->stop(&stop_cond);
488  	  int r = start_cond.wait();
489  	  printf("start returned %d\n", r);
490  	  // TODO: improve the test to avoid this race
491  	  ASSERT_TRUE(r == -ECANCELED || r == 0);
492  	  ASSERT_EQ(0, stop_cond.wait());
493  	}
494  	
495  	TEST_F(TestImageReplayer, JournalReset)
496  	{
497  	  bootstrap();
498  	  delete m_replayer;
499  	
500  	  ASSERT_EQ(0, librbd::Journal<>::reset(m_remote_ioctx, m_remote_image_id));
501  	
502  	  // try to recover
503  	  bootstrap();
504  	}
505  	
506  	TEST_F(TestImageReplayer, ErrorNoJournal)
507  	{
508  	  bootstrap();
509  	
510  	  // disable remote journal journaling
511  	  // (reset before disabling, so it does not fail with EBUSY)
512  	  ASSERT_EQ(0, librbd::Journal<>::reset(m_remote_ioctx, m_remote_image_id));
513  	  librbd::ImageCtx *ictx;
514  	  open_remote_image(&ictx);
515  	  uint64_t features;
516  	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
517  	  ASSERT_EQ(0, ictx->operations->update_features(RBD_FEATURE_JOURNALING,
518  	                                                 false));
519  	  close_image(ictx);
520  	
521  	  C_SaferCond cond;
522  	  m_replayer->start(&cond);
523  	  ASSERT_EQ(0, cond.wait());
524  	}
525  	
526  	TEST_F(TestImageReplayer, StartStop)
527  	{
528  	  bootstrap();
529  	
530  	  start();
531  	  wait_for_replay_complete();
532  	  stop();
533  	}
534  	
535  	TEST_F(TestImageReplayer, WriteAndStartReplay)
536  	{
537  	  bootstrap();
538  	
539  	  // Write to remote image and start replay
540  	
541  	  librbd::ImageCtx *ictx;
542  	
543  	  generate_test_data();
544  	  open_remote_image(&ictx);
545  	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
546  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
547  	  }
548  	  flush(ictx);
549  	  close_image(ictx);
550  	
551  	  start();
552  	  wait_for_replay_complete();
553  	  stop();
554  	
555  	  open_local_image(&ictx);
556  	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
557  	    read_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
558  	  }
559  	  close_image(ictx);
560  	}
561  	
562  	TEST_F(TestImageReplayer, StartReplayAndWrite)
563  	{
564  	  bootstrap();
565  	
566  	  // Start replay and write to remote image
567  	
568  	  librbd::ImageCtx *ictx;
569  	
570  	  start();
571  	
572  	  generate_test_data();
573  	  open_remote_image(&ictx);
574  	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
575  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
576  	  }
577  	  flush(ictx);
578  	
579  	  wait_for_replay_complete();
580  	
581  	  for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) {
582  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
583  	  }
584  	  flush(ictx);
585  	  close_image(ictx);
586  	
587  	  wait_for_replay_complete();
588  	
589  	  open_local_image(&ictx);
590  	  for (int i = 0; i < 2 * TEST_IO_COUNT; ++i) {
591  	    read_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
592  	  }
593  	  close_image(ictx);
594  	
595  	  stop();
596  	}
597  	
598  	TEST_F(TestImageReplayer, NextTag)
599  	{
600  	  bootstrap();
601  	
602  	  // write, reopen, and write again to test switch to the next tag
603  	
604  	  librbd::ImageCtx *ictx;
605  	
606  	  start();
607  	
608  	  generate_test_data();
609  	
610  	  const int N = 10;
611  	
612  	  for (int j = 0; j < N; j++) {
613  	    open_remote_image(&ictx);
614  	    for (int i = j * TEST_IO_COUNT; i < (j + 1) * TEST_IO_COUNT; ++i) {
615  	      write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
616  	    }
617  	    close_image(ictx);
618  	  }
619  	
620  	  wait_for_replay_complete();
621  	
622  	  open_local_image(&ictx);
623  	  for (int i = 0; i < N * TEST_IO_COUNT; ++i) {
624  	    read_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
625  	  }
626  	  close_image(ictx);
627  	
628  	  stop();
629  	}
630  	
631  	TEST_F(TestImageReplayer, Resync)
632  	{
633  	  bootstrap();
634  	
635  	  librbd::ImageCtx *ictx;
636  	
637  	  start();
638  	
639  	  generate_test_data();
640  	
641  	  open_remote_image(&ictx);
642  	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
643  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
644  	  }
645  	  flush(ictx);
646  	
647  	  wait_for_replay_complete();
648  	
649  	  for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) {
650  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
651  	  }
652  	  flush(ictx);
653  	  close_image(ictx);
654  	
655  	  C_SaferCond ctx;
656  	  m_replayer->resync_image(&ctx);
657  	  ASSERT_EQ(0, ctx.wait());
658  	
659  	  wait_for_stopped();
660  	
661  	  C_SaferCond cond;
662  	  m_replayer->start(&cond);
663  	  ASSERT_EQ(0, cond.wait());
664  	
665  	  ASSERT_TRUE(m_replayer->is_replaying());
666  	  wait_for_replay_complete();
667  	
668  	  open_local_image(&ictx);
669  	  for (int i = 0; i < 2 * TEST_IO_COUNT; ++i) {
670  	    read_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
671  	  }
672  	  close_image(ictx);
673  	
674  	  stop();
675  	}
676  	
677  	TEST_F(TestImageReplayer, Resync_While_Stop)
678  	{
679  	
680  	  bootstrap();
681  	
682  	  start();
683  	
684  	  generate_test_data();
685  	
686  	  librbd::ImageCtx *ictx;
687  	  open_remote_image(&ictx);
688  	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
689  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
690  	  }
691  	  flush(ictx);
692  	
693  	  wait_for_replay_complete();
694  	
695  	  for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) {
696  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
697  	  }
698  	  flush(ictx);
699  	  close_image(ictx);
700  	
701  	  wait_for_replay_complete();
702  	
703  	  C_SaferCond cond;
704  	  m_replayer->stop(&cond);
705  	  ASSERT_EQ(0, cond.wait());
706  	
707  	  open_local_image(&ictx);
708  	  librbd::Journal<>::request_resync(ictx);
709  	  close_image(ictx);
710  	
711  	  C_SaferCond cond2;
712  	  m_replayer->start(&cond2);
713  	  ASSERT_EQ(0, cond2.wait());
714  	
715  	  ASSERT_TRUE(m_replayer->is_stopped());
716  	
717  	  C_SaferCond cond3;
718  	  m_replayer->start(&cond3);
719  	  ASSERT_EQ(0, cond3.wait());
720  	
721  	  ASSERT_TRUE(m_replayer->is_replaying());
722  	
723  	  wait_for_replay_complete();
724  	
725  	  open_local_image(&ictx);
726  	  for (int i = 0; i < 2 * TEST_IO_COUNT; ++i) {
727  	    read_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
728  	  }
729  	  close_image(ictx);
730  	
731  	  stop();
732  	}
733  	
734  	TEST_F(TestImageReplayer, Resync_StartInterrupted)
735  	{
736  	
737  	  bootstrap();
738  	
739  	  librbd::ImageCtx *ictx;
740  	  open_local_image(&ictx);
741  	  librbd::Journal<>::request_resync(ictx);
742  	  close_image(ictx);
743  	
744  	  C_SaferCond cond;
745  	  m_replayer->start(&cond);
746  	  ASSERT_EQ(0, cond.wait());
747  	
748  	  ASSERT_TRUE(m_replayer->is_stopped());
749  	
750  	  C_SaferCond cond2;
751  	  m_replayer->start(&cond2);
752  	  ASSERT_EQ(0, cond2.wait());
753  	
754  	  ASSERT_EQ(0U, m_watch_handle);
755  	  std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
756  	  m_watch_ctx = new C_WatchCtx(this, oid);
757  	  ASSERT_EQ(0, m_remote_ioctx.watch2(oid, &m_watch_handle, m_watch_ctx));
758  	
759  	  ASSERT_TRUE(m_replayer->is_replaying());
760  	
761  	  generate_test_data();
762  	  open_remote_image(&ictx);
763  	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
764  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
765  	  }
766  	  flush(ictx);
767  	
768  	  wait_for_replay_complete();
769  	
770  	  for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) {
771  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
772  	  }
773  	  flush(ictx);
774  	  close_image(ictx);
775  	
776  	  wait_for_replay_complete();
777  	
778  	  open_local_image(&ictx);
779  	  for (int i = 0; i < 2 * TEST_IO_COUNT; ++i) {
780  	    read_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
781  	  }
782  	  close_image(ictx);
783  	
784  	  stop();
785  	}
786  	
787  	TEST_F(TestImageReplayer, MultipleReplayFailures_SingleEpoch) {
788  	  bootstrap();
789  	
790  	  // inject a snapshot that cannot be unprotected
791  	  librbd::ImageCtx *ictx;
792  	  open_image(m_local_ioctx, m_image_name, false, &ictx);
793  	  ictx->features &= ~RBD_FEATURE_JOURNALING;
794  	  ASSERT_EQ(0, ictx->operations->snap_create(cls::rbd::UserSnapshotNamespace(),
795  						     "foo"));
796  	  ASSERT_EQ(0, ictx->operations->snap_protect(cls::rbd::UserSnapshotNamespace(),
797  						      "foo"));
798  	  ASSERT_EQ(0, librbd::cls_client::add_child(&ictx->md_ctx, RBD_CHILDREN,
799  	                                             {ictx->md_ctx.get_id(), "",
800  	                                              ictx->id,
801  						      ictx->snap_ids[{cls::rbd::UserSnapshotNamespace(), "foo"}]},
802  	                                             "dummy child id"));
803  	  close_image(ictx);
804  	
805  	  // race failed op shut down with new ops
806  	  open_remote_image(&ictx);
807  	  for (uint64_t i = 0; i < 10; ++i) {
808  	    std::shared_lock owner_locker{ictx->owner_lock};
809  	    C_SaferCond request_lock;
810  	    ictx->exclusive_lock->acquire_lock(&request_lock);
811  	    ASSERT_EQ(0, request_lock.wait());
812  	
813  	    C_SaferCond append_ctx;
814  	    ictx->journal->append_op_event(
815  	      i,
816  	      librbd::journal::EventEntry{
817  	        librbd::journal::SnapUnprotectEvent{i,
818  						    cls::rbd::UserSnapshotNamespace(),
819  						    "foo"}},
820  	      &append_ctx);
821  	    ASSERT_EQ(0, append_ctx.wait());
822  	
823  	    C_SaferCond commit_ctx;
824  	    ictx->journal->commit_op_event(i, 0, &commit_ctx);
825  	    ASSERT_EQ(0, commit_ctx.wait());
826  	
827  	    C_SaferCond release_ctx;
828  	    ictx->exclusive_lock->release_lock(&release_ctx);
829  	    ASSERT_EQ(0, release_ctx.wait());
830  	  }
831  	
832  	  for (uint64_t i = 0; i < 5; ++i) {
833  	    start();
834  	    wait_for_stopped();
835  	    unwatch();
836  	  }
837  	  close_image(ictx);
838  	}
839  	
840  	TEST_F(TestImageReplayer, MultipleReplayFailures_MultiEpoch) {
841  	  bootstrap();
842  	
843  	  // inject a snapshot that cannot be unprotected
844  	  librbd::ImageCtx *ictx;
845  	  open_image(m_local_ioctx, m_image_name, false, &ictx);
846  	  ictx->features &= ~RBD_FEATURE_JOURNALING;
847  	  ASSERT_EQ(0, ictx->operations->snap_create(cls::rbd::UserSnapshotNamespace(),
848  						     "foo"));
849  	  ASSERT_EQ(0, ictx->operations->snap_protect(cls::rbd::UserSnapshotNamespace(),
850  						      "foo"));
851  	  ASSERT_EQ(0, librbd::cls_client::add_child(&ictx->md_ctx, RBD_CHILDREN,
852  	                                             {ictx->md_ctx.get_id(), "",
853  	                                              ictx->id,
854  						      ictx->snap_ids[{cls::rbd::UserSnapshotNamespace(),
855  								      "foo"}]},
856  	                                             "dummy child id"));
857  	  close_image(ictx);
858  	
859  	  // race failed op shut down with new tag flush
860  	  open_remote_image(&ictx);
861  	  {
862  	    std::shared_lock owner_locker{ictx->owner_lock};
863  	    C_SaferCond request_lock;
864  	    ictx->exclusive_lock->acquire_lock(&request_lock);
865  	    ASSERT_EQ(0, request_lock.wait());
866  	
867  	    C_SaferCond append_ctx;
868  	    ictx->journal->append_op_event(
869  	      1U,
870  	      librbd::journal::EventEntry{
871  	        librbd::journal::SnapUnprotectEvent{1U,
872  						    cls::rbd::UserSnapshotNamespace(),
873  						    "foo"}},
874  	      &append_ctx);
875  	    ASSERT_EQ(0, append_ctx.wait());
876  	
877  	    C_SaferCond commit_ctx;
878  	    ictx->journal->commit_op_event(1U, 0, &commit_ctx);
879  	    ASSERT_EQ(0, commit_ctx.wait());
880  	
881  	    C_SaferCond release_ctx;
882  	    ictx->exclusive_lock->release_lock(&release_ctx);
883  	    ASSERT_EQ(0, release_ctx.wait());
884  	  }
885  	
886  	  generate_test_data();
887  	  write_test_data(ictx, m_test_data, 0, TEST_IO_SIZE);
888  	
889  	  for (uint64_t i = 0; i < 5; ++i) {
890  	    start();
891  	    wait_for_stopped();
892  	    unwatch();
893  	  }
894  	  close_image(ictx);
895  	}
896  	
897  	TEST_F(TestImageReplayer, Disconnect)
898  	{
899  	  bootstrap();
900  	
901  	  // Make sure rbd_mirroring_resync_after_disconnect is not set
902  	  EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirroring_resync_after_disconnect", "false"));
903  	
904  	  // Test start fails if disconnected
905  	
906  	  librbd::ImageCtx *ictx;
907  	
908  	  generate_test_data();
909  	  open_remote_image(&ictx);
910  	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
911  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
912  	  }
913  	  flush(ictx);
914  	  close_image(ictx);
915  	
916  	  std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
917  	  ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
918  		m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));
919  	
920  	  C_SaferCond cond1;
921  	  m_replayer->start(&cond1);
922  	  ASSERT_EQ(-ENOTCONN, cond1.wait());
923  	
924  	  // Test start succeeds after resync
925  	
926  	  open_local_image(&ictx);
927  	  librbd::Journal<>::request_resync(ictx);
928  	  close_image(ictx);
929  	  C_SaferCond cond2;
930  	  m_replayer->start(&cond2);
931  	  ASSERT_EQ(0, cond2.wait());
932  	
933  	  start();
934  	  wait_for_replay_complete();
935  	
936  	  // Test replay stopped after disconnect
937  	
938  	  open_remote_image(&ictx);
939  	  for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) {
940  	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
941  	  }
942  	  flush(ictx);
943  	  close_image(ictx);
944  	
945  	  ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
946  		m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));
947  	  bufferlist bl;
948  	  ASSERT_EQ(0, m_remote_ioctx.notify2(oid, bl, 5000, NULL));
949  	
950  	  wait_for_stopped();
951  	
952  	  // Test start fails after disconnect
953  	
954  	  C_SaferCond cond3;
955  	  m_replayer->start(&cond3);
956  	  ASSERT_EQ(-ENOTCONN, cond3.wait());
957  	  C_SaferCond cond4;
958  	  m_replayer->start(&cond4);
959  	  ASSERT_EQ(-ENOTCONN, cond4.wait());
960  	
961  	  // Test automatic resync if rbd_mirroring_resync_after_disconnect is set
962  	
963  	  EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirroring_resync_after_disconnect", "true"));
964  	
965  	  // Resync is flagged on first start attempt
966  	  C_SaferCond cond5;
967  	  m_replayer->start(&cond5);
968  	  ASSERT_EQ(-ENOTCONN, cond5.wait());
969  	
970  	  C_SaferCond cond6;
971  	  m_replayer->start(&cond6);
972  	  ASSERT_EQ(0, cond6.wait());
973  	  wait_for_replay_complete();
974  	
975  	  stop();
976  	}
977  	
978  	TEST_F(TestImageReplayer, UpdateFeatures)
979  	{
980  	  const uint64_t FEATURES_TO_UPDATE =
981  	    RBD_FEATURE_OBJECT_MAP | RBD_FEATURE_FAST_DIFF;
982  	
983  	  uint64_t features;
984  	  librbd::ImageCtx *ictx;
985  	
986  	  // Make sure the features we will update are disabled initially
987  	
988  	  open_remote_image(&ictx);
989  	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
990  	  features &= FEATURES_TO_UPDATE;
991  	  if (features) {
992  	    ASSERT_EQ(0, ictx->operations->update_features(FEATURES_TO_UPDATE,
993  	                                                   false));
994  	  }
995  	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
996  	  ASSERT_EQ(0U, features & FEATURES_TO_UPDATE);
997  	  close_image(ictx);
998  	
999  	  bootstrap();
1000 	
1001 	  open_remote_image(&ictx);
1002 	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
1003 	  ASSERT_EQ(0U, features & FEATURES_TO_UPDATE);
1004 	  close_image(ictx);
1005 	
1006 	  open_local_image(&ictx);
1007 	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
1008 	  ASSERT_EQ(0U, features & FEATURES_TO_UPDATE);
1009 	  close_image(ictx);
1010 	
1011 	  // Start replay and update features
1012 	
1013 	  start();
1014 	
1015 	  open_remote_image(&ictx);
1016 	  ASSERT_EQ(0, ictx->operations->update_features(FEATURES_TO_UPDATE,
1017 	                                                 true));
1018 	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
1019 	  ASSERT_EQ(FEATURES_TO_UPDATE, features & FEATURES_TO_UPDATE);
1020 	  close_image(ictx);
1021 	
1022 	  wait_for_replay_complete();
1023 	
1024 	  open_local_image(&ictx);
1025 	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
1026 	  ASSERT_EQ(FEATURES_TO_UPDATE, features & FEATURES_TO_UPDATE);
1027 	  close_image(ictx);
1028 	
1029 	  open_remote_image(&ictx);
1030 	  ASSERT_EQ(0, ictx->operations->update_features(FEATURES_TO_UPDATE,
1031 	                                                 false));
1032 	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
1033 	  ASSERT_EQ(0U, features & FEATURES_TO_UPDATE);
1034 	  close_image(ictx);
1035 	
1036 	  wait_for_replay_complete();
1037 	
1038 	  open_local_image(&ictx);
1039 	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
1040 	  ASSERT_EQ(0U, features & FEATURES_TO_UPDATE);
1041 	  close_image(ictx);
1042 	
1043 	  // Test update_features error does not stop replication
1044 	
1045 	  open_remote_image(&ictx);
1046 	  ASSERT_EQ(0, librbd::get_features(ictx, &features));
1047 	  ASSERT_NE(0U, features & RBD_FEATURE_EXCLUSIVE_LOCK);
1048 	  ASSERT_EQ(-EINVAL, ictx->operations->update_features(RBD_FEATURE_EXCLUSIVE_LOCK,
1049 	                                                       false));
1050 	  generate_test_data();
1051 	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
1052 	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
1053 	  }
1054 	  flush(ictx);
1055 	  close_image(ictx);
1056 	
1057 	  wait_for_replay_complete();
1058 	
1059 	  open_local_image(&ictx);
1060 	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
1061 	    read_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
1062 	  }
1063 	  close_image(ictx);
1064 	
1065 	  stop();
1066 	}
1067 	
1068 	TEST_F(TestImageReplayer, MetadataSetRemove)
1069 	{
1070 	  const std::string KEY = "test_key";
1071 	  const std::string VALUE = "test_value";
1072 	
1073 	  librbd::ImageCtx *ictx;
1074 	  std::string value;
1075 	
1076 	  bootstrap();
1077 	
1078 	  start();
1079 	
1080 	  // Test metadata_set replication
1081 	
1082 	  open_remote_image(&ictx);
1083 	  ASSERT_EQ(0, ictx->operations->metadata_set(KEY, VALUE));
1084 	  value.clear();
1085 	  ASSERT_EQ(0, librbd::metadata_get(ictx, KEY, &value));
1086 	  ASSERT_EQ(VALUE, value);
1087 	  close_image(ictx);
1088 	
1089 	  wait_for_replay_complete();
1090 	
1091 	  open_local_image(&ictx);
1092 	  value.clear();
1093 	  ASSERT_EQ(0, librbd::metadata_get(ictx, KEY, &value));
1094 	  ASSERT_EQ(VALUE, value);
1095 	  close_image(ictx);
1096 	
1097 	  // Test metadata_remove replication
1098 	
1099 	  open_remote_image(&ictx);
1100 	  ASSERT_EQ(0, ictx->operations->metadata_remove(KEY));
1101 	  ASSERT_EQ(-ENOENT, librbd::metadata_get(ictx, KEY, &value));
1102 	  close_image(ictx);
1103 	
1104 	  wait_for_replay_complete();
1105 	
1106 	  open_local_image(&ictx);
1107 	  ASSERT_EQ(-ENOENT, librbd::metadata_get(ictx, KEY, &value));
1108 	  close_image(ictx);
1109 	
1110 	  stop();
1111 	}
1112 	
1113 	TEST_F(TestImageReplayer, MirroringDelay)
1114 	{
1115 	  const double DELAY = 10; // set less than wait_for_replay_complete timeout
1116 	
1117 	  librbd::ImageCtx *ictx;
1118 	  utime_t start_time;
1119 	  double delay;
1120 	
1121 	  bootstrap();
1122 	
1123 	  ASSERT_EQ(0, m_local_cluster->conf_set("rbd_mirroring_replay_delay",
1124 	                                         stringify(DELAY).c_str()));
1125 	  open_local_image(&ictx);
1126 	  ASSERT_EQ(DELAY, ictx->mirroring_replay_delay);
1127 	  close_image(ictx);
1128 	
1129 	  start();
1130 	
1131 	  // Test delay
1132 	
1133 	  generate_test_data();
1134 	  open_remote_image(&ictx);
1135 	  start_time = ceph_clock_now();
1136 	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
1137 	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
1138 	  }
1139 	  flush(ictx);
1140 	  close_image(ictx);
1141 	
1142 	  wait_for_replay_complete();
1143 	  delay = ceph_clock_now() - start_time;
1144 	  ASSERT_GE(delay, DELAY);
1145 	
1146 	  // Test stop when delaying replay
1147 	
1148 	  open_remote_image(&ictx);
1149 	  start_time = ceph_clock_now();
1150 	  for (int i = 0; i < TEST_IO_COUNT; ++i) {
1151 	    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
1152 	  }
1153 	  close_image(ictx);
1154 	
1155 	  sleep(DELAY / 2);
1156 	  stop();
1157 	  start();
1158 	
1159 	  wait_for_replay_complete();
1160 	  delay = ceph_clock_now() - start_time;
1161 	  ASSERT_GE(delay, DELAY);
1162 	
1163 	  stop();
1164 	}
1165