1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "include/stringify.h"
5    	#include "common/Cond.h"
6    	#include "common/Timer.h"
7    	#include "common/debug.h"
8    	#include "common/errno.h"
9    	#include "librbd/Utils.h"
10   	#include "ImageReplayer.h"
11   	#include "InstanceReplayer.h"
12   	#include "ServiceDaemon.h"
13   	#include "Threads.h"
14   	
15   	#define dout_context g_ceph_context
16   	#define dout_subsys ceph_subsys_rbd_mirror
17   	#undef dout_prefix
18   	#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
19   	                           << this << " " << __func__ << ": "
20   	
21   	namespace rbd {
22   	namespace mirror {
23   	
namespace {

// Attribute keys handed to ServiceDaemon::add_or_update_attribute() when the
// per-pool image counters are published by start_image_replayers().
const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY = "image_assigned_count";
const std::string SERVICE_DAEMON_WARNING_COUNT_KEY = "image_warning_count";
const std::string SERVICE_DAEMON_ERROR_COUNT_KEY = "image_error_count";

} // anonymous namespace
31   	
32   	using librbd::util::create_async_context_callback;
33   	using librbd::util::create_context_callback;
34   	
35   	template <typename I>
36   	InstanceReplayer<I>::InstanceReplayer(
37   	    librados::IoCtx &local_io_ctx, const std::string &local_mirror_uuid,
38   	    Threads<I> *threads, ServiceDaemon<I>* service_daemon,
39   	    journal::CacheManagerHandler *cache_manager_handler)
40   	  : m_local_io_ctx(local_io_ctx), m_local_mirror_uuid(local_mirror_uuid),
41   	    m_threads(threads), m_service_daemon(service_daemon),
42   	    m_cache_manager_handler(cache_manager_handler),
43   	    m_lock(ceph::make_mutex("rbd::mirror::InstanceReplayer " +
44   	        stringify(local_io_ctx.get_id()))) {
45   	}
46   	
47   	template <typename I>
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
48   	InstanceReplayer<I>::~InstanceReplayer() {
49   	  ceph_assert(m_image_state_check_task == nullptr);
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
50   	  ceph_assert(m_async_op_tracker.empty());
51   	  ceph_assert(m_image_replayers.empty());
52   	}
53   	
54   	template <typename I>
55   	int InstanceReplayer<I>::init() {
56   	  C_SaferCond init_ctx;
57   	  init(&init_ctx);
58   	  return init_ctx.wait();
59   	}
60   	
61   	template <typename I>
62   	void InstanceReplayer<I>::init(Context *on_finish) {
63   	  dout(10) << dendl;
64   	
65   	  Context *ctx = new LambdaContext(
66   	    [this, on_finish] (int r) {
67   	      {
68   	        std::lock_guard timer_locker{m_threads->timer_lock};
69   	        schedule_image_state_check_task();
70   	      }
71   	      on_finish->complete(0);
72   	    });
73   	
74   	  m_threads->work_queue->queue(ctx, 0);
75   	}
76   	
77   	template <typename I>
78   	void InstanceReplayer<I>::shut_down() {
79   	  C_SaferCond shut_down_ctx;
80   	  shut_down(&shut_down_ctx);
81   	  int r = shut_down_ctx.wait();
82   	  ceph_assert(r == 0);
83   	}
84   	
85   	template <typename I>
86   	void InstanceReplayer<I>::shut_down(Context *on_finish) {
87   	  dout(10) << dendl;
88   	
89   	  std::lock_guard locker{m_lock};
90   	
91   	  ceph_assert(m_on_shut_down == nullptr);
92   	  m_on_shut_down = on_finish;
93   	
94   	  Context *ctx = new LambdaContext(
95   	    [this] (int r) {
96   	      cancel_image_state_check_task();
97   	      wait_for_ops();
98   	    });
99   	
100  	  m_threads->work_queue->queue(ctx, 0);
101  	}
102  	
103  	template <typename I>
104  	void InstanceReplayer<I>::add_peer(std::string peer_uuid,
105  	                                   librados::IoCtx io_ctx) {
106  	  dout(10) << peer_uuid << dendl;
107  	
108  	  std::lock_guard locker{m_lock};
109  	  auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
110  	  ceph_assert(result);
111  	}
112  	
113  	template <typename I>
114  	void InstanceReplayer<I>::release_all(Context *on_finish) {
115  	  dout(10) << dendl;
116  	
117  	  std::lock_guard locker{m_lock};
118  	
119  	  C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
120  	  for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
121  	       it = m_image_replayers.erase(it)) {
122  	    auto image_replayer = it->second;
123  	    auto ctx = gather_ctx->new_sub();
124  	    ctx = new LambdaContext(
125  	      [image_replayer, ctx] (int r) {
126  	        image_replayer->destroy();
127  	        ctx->complete(0);
128  	      });
129  	    stop_image_replayer(image_replayer, ctx);
130  	  }
131  	  gather_ctx->activate();
132  	}
133  	
134  	template <typename I>
135  	void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
136  	                                        const std::string &global_image_id,
137  	                                        Context *on_finish) {
138  	  dout(10) << "global_image_id=" << global_image_id << dendl;
139  	
140  	  std::lock_guard locker{m_lock};
141  	
142  	  ceph_assert(m_on_shut_down == nullptr);
143  	
144  	  auto it = m_image_replayers.find(global_image_id);
145  	  if (it == m_image_replayers.end()) {
146  	    auto image_replayer = ImageReplayer<I>::create(
147  	        m_local_io_ctx, m_local_mirror_uuid, global_image_id,
148  	        m_threads, instance_watcher, m_cache_manager_handler);
149  	
150  	    dout(10) << global_image_id << ": creating replayer " << image_replayer
151  	             << dendl;
152  	
153  	    it = m_image_replayers.insert(std::make_pair(global_image_id,
154  	                                                 image_replayer)).first;
155  	
156  	    // TODO only a single peer is currently supported
157  	    ceph_assert(m_peers.size() == 1);
158  	    auto peer = *m_peers.begin();
159  	    image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
160  	    start_image_replayer(image_replayer);
161  	  } else {
162  	    // A duplicate acquire notification implies (1) connection hiccup or
163  	    // (2) new leader election. For the second case, restart the replayer to
164  	    // detect if the image has been deleted while the leader was offline
165  	    auto& image_replayer = it->second;
166  	    image_replayer->set_finished(false);
167  	    image_replayer->restart();
168  	  }
169  	
170  	  m_threads->work_queue->queue(on_finish, 0);
171  	}
172  	
173  	template <typename I>
174  	void InstanceReplayer<I>::release_image(const std::string &global_image_id,
175  	                                        Context *on_finish) {
176  	  dout(10) << "global_image_id=" << global_image_id << dendl;
177  	
178  	  std::lock_guard locker{m_lock};
179  	  ceph_assert(m_on_shut_down == nullptr);
180  	
181  	  auto it = m_image_replayers.find(global_image_id);
182  	  if (it == m_image_replayers.end()) {
183  	    dout(5) << global_image_id << ": not found" << dendl;
184  	    m_threads->work_queue->queue(on_finish, 0);
185  	    return;
186  	  }
187  	
188  	  auto image_replayer = it->second;
189  	  m_image_replayers.erase(it);
190  	
191  	  on_finish = new LambdaContext(
192  	    [image_replayer, on_finish] (int r) {
193  	      image_replayer->destroy();
194  	      on_finish->complete(0);
195  	    });
196  	  stop_image_replayer(image_replayer, on_finish);
197  	}
198  	
199  	template <typename I>
200  	void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
201  	                                            const std::string &peer_mirror_uuid,
202  	                                            Context *on_finish) {
203  	  dout(10) << "global_image_id=" << global_image_id << ", "
204  	           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
205  	
206  	  std::lock_guard locker{m_lock};
207  	  ceph_assert(m_on_shut_down == nullptr);
208  	
209  	  auto it = m_image_replayers.find(global_image_id);
210  	  if (it != m_image_replayers.end()) {
211  	    // TODO only a single peer is currently supported, therefore
212  	    // we can just interrupt the current image replayer and
213  	    // it will eventually detect that the peer image is missing and
214  	    // determine if a delete propagation is required.
215  	    auto image_replayer = it->second;
216  	    image_replayer->restart();
217  	  }
218  	  m_threads->work_queue->queue(on_finish, 0);
219  	}
220  	
221  	template <typename I>
222  	void InstanceReplayer<I>::print_status(Formatter *f) {
223  	  dout(10) << dendl;
224  	
225  	  std::lock_guard locker{m_lock};
226  	
227  	  f->open_array_section("image_replayers");
228  	  for (auto &kv : m_image_replayers) {
229  	    auto &image_replayer = kv.second;
230  	    image_replayer->print_status(f);
231  	  }
232  	  f->close_section();
233  	}
234  	
235  	template <typename I>
236  	void InstanceReplayer<I>::start()
237  	{
238  	  dout(10) << dendl;
239  	
240  	  std::lock_guard locker{m_lock};
241  	
242  	  m_manual_stop = false;
243  	
244  	  for (auto &kv : m_image_replayers) {
245  	    auto &image_replayer = kv.second;
246  	    image_replayer->start(nullptr, true);
247  	  }
248  	}
249  	
250  	template <typename I>
251  	void InstanceReplayer<I>::stop()
252  	{
253  	  dout(10) << dendl;
254  	
255  	  std::lock_guard locker{m_lock};
256  	
257  	  m_manual_stop = true;
258  	
259  	  for (auto &kv : m_image_replayers) {
260  	    auto &image_replayer = kv.second;
261  	    image_replayer->stop(nullptr, true);
262  	  }
263  	}
264  	
265  	template <typename I>
266  	void InstanceReplayer<I>::stop(Context *on_finish)
267  	{
268  	  dout(10) << dendl;
269  	
270  	  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
271  	  auto gather_ctx = new C_Gather(cct, on_finish);
272  	  {
273  	    std::lock_guard locker{m_lock};
274  	
275  	    m_manual_stop = true;
276  	
277  	    for (auto &kv : m_image_replayers) {
278  	      auto &image_replayer = kv.second;
279  	      image_replayer->stop(gather_ctx->new_sub(), true);
280  	    }
281  	  }
282  	
283  	  gather_ctx->activate();
284  	}
285  	
286  	template <typename I>
287  	void InstanceReplayer<I>::restart()
288  	{
289  	  dout(10) << dendl;
290  	
291  	  std::lock_guard locker{m_lock};
292  	
293  	  m_manual_stop = false;
294  	
295  	  for (auto &kv : m_image_replayers) {
296  	    auto &image_replayer = kv.second;
297  	    image_replayer->restart();
298  	  }
299  	}
300  	
301  	template <typename I>
302  	void InstanceReplayer<I>::flush()
303  	{
304  	  dout(10) << dendl;
305  	
306  	  std::lock_guard locker{m_lock};
307  	
308  	  for (auto &kv : m_image_replayers) {
309  	    auto &image_replayer = kv.second;
310  	    image_replayer->flush();
311  	  }
312  	}
313  	
314  	template <typename I>
315  	void InstanceReplayer<I>::start_image_replayer(
316  	    ImageReplayer<I> *image_replayer) {
317  	  ceph_assert(ceph_mutex_is_locked(m_lock));
318  	
319  	  std::string global_image_id = image_replayer->get_global_image_id();
320  	  if (!image_replayer->is_stopped()) {
321  	    return;
322  	  } else if (image_replayer->is_blacklisted()) {
323  	    derr << "global_image_id=" << global_image_id << ": blacklisted detected "
324  	         << "during image replay" << dendl;
325  	    return;
326  	  } else if (image_replayer->is_finished()) {
327  	    // TODO temporary until policy integrated
328  	    dout(5) << "removing image replayer for global_image_id="
329  	            << global_image_id << dendl;
330  	    m_image_replayers.erase(image_replayer->get_global_image_id());
331  	    image_replayer->destroy();
332  	    return;
333  	  } else if (m_manual_stop) {
334  	    return;
335  	  }
336  	
337  	  dout(10) << "global_image_id=" << global_image_id << dendl;
338  	  image_replayer->start(nullptr, false);
339  	}
340  	
341  	template <typename I>
342  	void InstanceReplayer<I>::queue_start_image_replayers() {
343  	  dout(10) << dendl;
344  	
345  	  Context *ctx = create_context_callback<
346  	    InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
347  	  m_async_op_tracker.start_op();
348  	  m_threads->work_queue->queue(ctx, 0);
349  	}
350  	
351  	template <typename I>
352  	void InstanceReplayer<I>::start_image_replayers(int r) {
353  	  dout(10) << dendl;
354  	
355  	  std::lock_guard locker{m_lock};
356  	  if (m_on_shut_down != nullptr) {
357  	    return;
358  	  }
359  	
360  	  uint64_t image_count = 0;
361  	  uint64_t warning_count = 0;
362  	  uint64_t error_count = 0;
363  	  for (auto it = m_image_replayers.begin();
364  	       it != m_image_replayers.end();) {
365  	    auto current_it(it);
366  	    ++it;
367  	
368  	    ++image_count;
369  	    auto health_state = current_it->second->get_health_state();
370  	    if (health_state == image_replayer::HEALTH_STATE_WARNING) {
371  	      ++warning_count;
372  	    } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
373  	      ++error_count;
374  	    }
375  	
376  	    start_image_replayer(current_it->second);
377  	  }
378  	
379  	  // TODO: add namespace support to service daemon
380  	  if (m_local_io_ctx.get_namespace().empty()) {
381  	    m_service_daemon->add_or_update_attribute(
382  	      m_local_io_ctx.get_id(), SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
383  	    m_service_daemon->add_or_update_attribute(
384  	      m_local_io_ctx.get_id(), SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
385  	    m_service_daemon->add_or_update_attribute(
386  	      m_local_io_ctx.get_id(), SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
387  	  }
388  	
389  	  m_async_op_tracker.finish_op();
390  	}
391  	
392  	template <typename I>
393  	void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
394  	                                              Context *on_finish) {
395  	  dout(10) << image_replayer << " global_image_id="
396  	           << image_replayer->get_global_image_id() << ", on_finish="
397  	           << on_finish << dendl;
398  	
399  	  if (image_replayer->is_stopped()) {
400  	    m_threads->work_queue->queue(on_finish, 0);
401  	    return;
402  	  }
403  	
404  	  m_async_op_tracker.start_op();
405  	  Context *ctx = create_async_context_callback(
406  	    m_threads->work_queue, new LambdaContext(
407  	      [this, image_replayer, on_finish] (int r) {
408  	        stop_image_replayer(image_replayer, on_finish);
409  	        m_async_op_tracker.finish_op();
410  	      }));
411  	
412  	  if (image_replayer->is_running()) {
413  	    image_replayer->stop(ctx, false);
414  	  } else {
415  	    int after = 1;
416  	    dout(10) << "scheduling image replayer " << image_replayer << " stop after "
417  	             << after << " sec (task " << ctx << ")" << dendl;
418  	    ctx = new LambdaContext(
419  	      [this, after, ctx] (int r) {
420  	        std::lock_guard timer_locker{m_threads->timer_lock};
421  	        m_threads->timer->add_event_after(after, ctx);
422  	      });
423  	    m_threads->work_queue->queue(ctx, 0);
424  	  }
425  	}
426  	
427  	template <typename I>
428  	void InstanceReplayer<I>::wait_for_ops() {
429  	  dout(10) << dendl;
430  	
431  	  Context *ctx = create_context_callback<
432  	    InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
433  	
434  	  m_async_op_tracker.wait_for_ops(ctx);
435  	}
436  	
437  	template <typename I>
438  	void InstanceReplayer<I>::handle_wait_for_ops(int r) {
439  	  dout(10) << "r=" << r << dendl;
440  	
441  	  ceph_assert(r == 0);
442  	
443  	  std::lock_guard locker{m_lock};
444  	  stop_image_replayers();
445  	}
446  	
447  	template <typename I>
448  	void InstanceReplayer<I>::stop_image_replayers() {
449  	  dout(10) << dendl;
450  	
451  	  ceph_assert(ceph_mutex_is_locked(m_lock));
452  	
453  	  Context *ctx = create_async_context_callback(
454  	    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
455  	    &InstanceReplayer<I>::handle_stop_image_replayers>(this));
456  	
457  	  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
458  	  for (auto &it : m_image_replayers) {
459  	    stop_image_replayer(it.second, gather_ctx->new_sub());
460  	  }
461  	  gather_ctx->activate();
462  	}
463  	
464  	template <typename I>
465  	void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
466  	  dout(10) << "r=" << r << dendl;
467  	
468  	  ceph_assert(r == 0);
469  	
470  	  Context *on_finish = nullptr;
471  	  {
472  	    std::lock_guard locker{m_lock};
473  	
474  	    for (auto &it : m_image_replayers) {
475  	      ceph_assert(it.second->is_stopped());
476  	      it.second->destroy();
477  	    }
478  	    m_image_replayers.clear();
479  	
480  	    ceph_assert(m_on_shut_down != nullptr);
481  	    std::swap(on_finish, m_on_shut_down);
482  	  }
483  	  on_finish->complete(r);
484  	}
485  	
486  	template <typename I>
487  	void InstanceReplayer<I>::cancel_image_state_check_task() {
488  	  std::lock_guard timer_locker{m_threads->timer_lock};
489  	
490  	  if (m_image_state_check_task == nullptr) {
491  	    return;
492  	  }
493  	
494  	  dout(10) << m_image_state_check_task << dendl;
495  	  bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
496  	  ceph_assert(canceled);
497  	  m_image_state_check_task = nullptr;
498  	}
499  	
500  	template <typename I>
501  	void InstanceReplayer<I>::schedule_image_state_check_task() {
502  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
503  	  ceph_assert(m_image_state_check_task == nullptr);
504  	
505  	  m_image_state_check_task = new LambdaContext(
506  	    [this](int r) {
507  	      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
508  	      m_image_state_check_task = nullptr;
509  	      schedule_image_state_check_task();
510  	      queue_start_image_replayers();
511  	    });
512  	
513  	  auto cct = static_cast<CephContext *>(m_local_io_ctx.cct());
514  	  int after = cct->_conf.get_val<uint64_t>(
515  	    "rbd_mirror_image_state_check_interval");
516  	
517  	  dout(10) << "scheduling image state check after " << after << " sec (task "
518  	           << m_image_state_check_task << ")" << dendl;
519  	  m_threads->timer->add_event_after(after, m_image_state_check_task);
520  	}
521  	
522  	} // namespace mirror
523  	} // namespace rbd
524  	
525  	template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;
526