1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "common/debug.h"
5    	#include "common/errno.h"
6    	#include "common/Timer.h"
7    	#include "common/WorkQueue.h"
8    	
9    	#include "librbd/Utils.h"
10   	#include "tools/rbd_mirror/Threads.h"
11   	
12   	#include "ImageMap.h"
13   	#include "image_map/LoadRequest.h"
14   	#include "image_map/SimplePolicy.h"
15   	#include "image_map/UpdateRequest.h"
16   	
17   	#define dout_context g_ceph_context
18   	#define dout_subsys ceph_subsys_rbd_mirror
19   	#undef dout_prefix
20   	#define dout_prefix *_dout << "rbd::mirror::ImageMap: " << this << " " \
21   	                           << __func__ << ": "
22   	
23   	namespace rbd {
24   	namespace mirror {
25   	
26   	using ::operator<<;
27   	using image_map::Policy;
28   	
29   	using librbd::util::unique_lock_name;
30   	using librbd::util::create_async_context_callback;
31   	
32   	template <typename I>
33   	struct ImageMap<I>::C_NotifyInstance : public Context {
34   	  ImageMap* image_map;
35   	  std::string global_image_id;
36   	  bool acquire_release;
37   	
38   	  C_NotifyInstance(ImageMap* image_map, const std::string& global_image_id,
39   	                   bool acquire_release)
40   	    : image_map(image_map), global_image_id(global_image_id),
41   	      acquire_release(acquire_release) {
42   	    image_map->start_async_op();
43   	  }
44   	
45   	  void finish(int r) override {
46   	    if (acquire_release) {
47   	      image_map->handle_peer_ack(global_image_id, r);
48   	    } else {
49   	      image_map->handle_peer_ack_remove(global_image_id, r);
50   	    }
51   	    image_map->finish_async_op();
52   	  }
53   	};
54   	
55   	template <typename I>
56   	ImageMap<I>::ImageMap(librados::IoCtx &ioctx, Threads<I> *threads,
57   	                      const std::string& instance_id,
58   	                      image_map::Listener &listener)
59   	  : m_ioctx(ioctx), m_threads(threads), m_instance_id(instance_id),
60   	    m_listener(listener),
61   	    m_lock(ceph::make_mutex(
62   	      unique_lock_name("rbd::mirror::ImageMap::m_lock", this))) {
63   	}
64   	
65   	template <typename I>
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
66   	ImageMap<I>::~ImageMap() {
67   	  ceph_assert(m_async_op_tracker.empty());
68   	  ceph_assert(m_timer_task == nullptr);
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
69   	  ceph_assert(m_rebalance_task == nullptr);
70   	}
71   	
72   	template <typename I>
73   	void ImageMap<I>::continue_action(const std::set<std::string> &global_image_ids,
74   	                                  int r) {
75   	  dout(20) << dendl;
76   	
77   	  {
78   	    std::lock_guard locker{m_lock};
79   	    if (m_shutting_down) {
80   	      return;
81   	    }
82   	
83   	    for (auto const &global_image_id : global_image_ids) {
84   	      bool schedule = m_policy->finish_action(global_image_id, r);
85   	      if (schedule) {
86   	        schedule_action(global_image_id);
87   	      }
88   	    }
89   	  }
90   	
91   	  schedule_update_task();
92   	}
93   	
94   	template <typename I>
95   	void ImageMap<I>::handle_update_request(
96   	    const Updates &updates,
97   	    const std::set<std::string> &remove_global_image_ids, int r) {
98   	  dout(20) << "r=" << r << dendl;
99   	
100  	  std::set<std::string> global_image_ids;
101  	
102  	  global_image_ids.insert(remove_global_image_ids.begin(),
103  	                          remove_global_image_ids.end());
104  	  for (auto const &update : updates) {
105  	    global_image_ids.insert(update.global_image_id);
106  	  }
107  	
108  	  continue_action(global_image_ids, r);
109  	}
110  	
111  	template <typename I>
112  	void ImageMap<I>::update_image_mapping(Updates&& map_updates,
113  	                                       std::set<std::string>&& map_removals) {
114  	  if (map_updates.empty() && map_removals.empty()) {
115  	    return;
116  	  }
117  	
118  	  dout(5) << "updates=[" << map_updates << "], "
119  	          << "removes=[" << map_removals << "]" << dendl;
120  	
121  	  Context *on_finish = new LambdaContext(
122  	    [this, map_updates, map_removals](int r) {
123  	      handle_update_request(map_updates, map_removals, r);
124  	      finish_async_op();
125  	    });
126  	  on_finish = create_async_context_callback(m_threads->work_queue, on_finish);
127  	
128  	  // empty meta policy for now..
129  	  image_map::PolicyMetaNone policy_meta;
130  	
131  	  bufferlist bl;
132  	  encode(image_map::PolicyData(policy_meta), bl);
133  	
134  	  // prepare update map
135  	  std::map<std::string, cls::rbd::MirrorImageMap> update_mapping;
136  	  for (auto const &update : map_updates) {
137  	    update_mapping.emplace(
138  	      update.global_image_id, cls::rbd::MirrorImageMap(update.instance_id,
139  	      update.mapped_time, bl));
140  	  }
141  	
142  	  start_async_op();
143  	  image_map::UpdateRequest<I> *req = image_map::UpdateRequest<I>::create(
144  	    m_ioctx, std::move(update_mapping), std::move(map_removals), on_finish);
145  	  req->send();
146  	}
147  	
148  	template <typename I>
149  	void ImageMap<I>::process_updates() {
150  	  dout(20) << dendl;
151  	
152  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
153  	  ceph_assert(m_timer_task == nullptr);
154  	
155  	  Updates map_updates;
156  	  std::set<std::string> map_removals;
157  	  Updates acquire_updates;
158  	  Updates release_updates;
159  	
160  	  // gather updates by advancing the state machine
161  	  m_lock.lock();
162  	  for (auto const &global_image_id : m_global_image_ids) {
163  	    image_map::ActionType action_type =
164  	      m_policy->start_action(global_image_id);
165  	    image_map::LookupInfo info = m_policy->lookup(global_image_id);
166  	
167  	    dout(15) << "global_image_id=" << global_image_id << ", "
168  	             << "action=" << action_type << ", "
169  	             << "instance=" << info.instance_id << dendl;
170  	    switch (action_type) {
171  	    case image_map::ACTION_TYPE_NONE:
172  	      continue;
173  	    case image_map::ACTION_TYPE_MAP_UPDATE:
174  	      ceph_assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
175  	      map_updates.emplace_back(global_image_id, info.instance_id,
176  	                               info.mapped_time);
177  	      break;
178  	    case image_map::ACTION_TYPE_MAP_REMOVE:
179  	      map_removals.emplace(global_image_id);
180  	      break;
181  	    case image_map::ACTION_TYPE_ACQUIRE:
182  	      ceph_assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
183  	      acquire_updates.emplace_back(global_image_id, info.instance_id);
184  	      break;
185  	    case image_map::ACTION_TYPE_RELEASE:
186  	      ceph_assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
187  	      release_updates.emplace_back(global_image_id, info.instance_id);
188  	      break;
189  	    }
190  	  }
191  	  m_global_image_ids.clear();
192  	  m_lock.unlock();
193  	
194  	  // notify listener (acquire, release) and update on-disk map. note
195  	  // that its safe to process this outside m_lock as we still hold
196  	  // timer lock.
197  	  notify_listener_acquire_release_images(acquire_updates, release_updates);
198  	  update_image_mapping(std::move(map_updates), std::move(map_removals));
199  	}
200  	
template <typename I>
void ImageMap<I>::schedule_update_task() {
  // convenience wrapper: acquire the timer lock and delegate to the
  // locked variant below
  std::lock_guard timer_lock{m_threads->timer_lock};
  schedule_update_task(m_threads->timer_lock);
}
206  	
207  	template <typename I>
208  	void ImageMap<I>::schedule_update_task(const ceph::mutex &timer_lock) {
209  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
210  	
211  	  schedule_rebalance_task();
212  	
213  	  if (m_timer_task != nullptr) {
214  	    return;
215  	  }
216  	
217  	  {
218  	    std::lock_guard locker{m_lock};
219  	    if (m_global_image_ids.empty()) {
220  	      return;
221  	    }
222  	  }
223  	
224  	  m_timer_task = new LambdaContext([this](int r) {
225  	      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
226  	      m_timer_task = nullptr;
227  	
228  	      process_updates();
229  	    });
230  	
231  	  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
232  	  double after = cct->_conf.get_val<double>("rbd_mirror_image_policy_update_throttle_interval");
233  	
234  	  dout(20) << "scheduling image check update (" << m_timer_task << ")"
235  	           << " after " << after << " second(s)" << dendl;
236  	  m_threads->timer->add_event_after(after, m_timer_task);
237  	}
238  	
239  	template <typename I>
240  	void ImageMap<I>::rebalance() {
241  	  ceph_assert(m_rebalance_task == nullptr);
242  	
243  	  {
244  	    std::lock_guard locker{m_lock};
245  	    if (m_async_op_tracker.empty() && m_global_image_ids.empty()){
246  	      dout(20) << "starting rebalance" << dendl;
247  	
248  	      std::set<std::string> remap_global_image_ids;
249  	      m_policy->add_instances({}, &remap_global_image_ids);
250  	
251  	      for (auto const &global_image_id : remap_global_image_ids) {
252  	        schedule_action(global_image_id);
253  	      }
254  	    }
255  	  }
256  	
257  	  schedule_update_task(m_threads->timer_lock);
258  	}
259  	
260  	template <typename I>
261  	void ImageMap<I>::schedule_rebalance_task() {
262  	  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
263  	
264  	  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
265  	
266  	  // fetch the updated value of idle timeout for (re)scheduling
267  	  double resched_after = cct->_conf.get_val<double>(
268  	    "rbd_mirror_image_policy_rebalance_timeout");
269  	  if (!resched_after) {
270  	    return;
271  	  }
272  	
273  	  // cancel existing rebalance task if any before scheduling
274  	  if (m_rebalance_task != nullptr) {
275  	    m_threads->timer->cancel_event(m_rebalance_task);
276  	  }
277  	
278  	  m_rebalance_task = new LambdaContext([this](int _) {
279  	      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
280  	      m_rebalance_task = nullptr;
281  	
282  	      rebalance();
283  	    });
284  	
285  	  dout(20) << "scheduling rebalance (" << m_rebalance_task << ")"
286  	           << " after " << resched_after << " second(s)" << dendl;
287  	  m_threads->timer->add_event_after(resched_after, m_rebalance_task);
288  	}
289  	
template <typename I>
void ImageMap<I>::schedule_action(const std::string &global_image_id) {
  dout(20) << "global_image_id=" << global_image_id << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  // queue the image for the next process_updates() pass; duplicates are
  // collapsed by the set
  m_global_image_ids.emplace(global_image_id);
}
297  	
298  	template <typename I>
299  	void ImageMap<I>::notify_listener_acquire_release_images(
300  	    const Updates &acquire, const Updates &release) {
301  	  if (acquire.empty() && release.empty()) {
302  	    return;
303  	  }
304  	
305  	  dout(5) << "acquire=[" << acquire << "], "
306  	          << "release=[" << release << "]" << dendl;
307  	
308  	  for (auto const &update : acquire) {
309  	    m_listener.acquire_image(
310  	      update.global_image_id, update.instance_id,
311  	      create_async_context_callback(
312  	        m_threads->work_queue,
313  	        new C_NotifyInstance(this, update.global_image_id, true)));
314  	  }
315  	
316  	  for (auto const &update : release) {
317  	    m_listener.release_image(
318  	      update.global_image_id, update.instance_id,
319  	      create_async_context_callback(
320  	        m_threads->work_queue,
321  	        new C_NotifyInstance(this, update.global_image_id, true)));
322  	  }
323  	}
324  	
325  	template <typename I>
326  	void ImageMap<I>::notify_listener_remove_images(const std::string &peer_uuid,
327  	                                                const Updates &remove) {
328  	  dout(5) << "peer_uuid=" << peer_uuid << ", "
329  	          << "remove=[" << remove << "]" << dendl;
330  	
331  	  for (auto const &update : remove) {
332  	    m_listener.remove_image(
333  	      peer_uuid, update.global_image_id, update.instance_id,
334  	      create_async_context_callback(
335  	        m_threads->work_queue,
336  	        new C_NotifyInstance(this, update.global_image_id, false)));
337  	  }
338  	}
339  	
340  	template <typename I>
341  	void ImageMap<I>::handle_load(const std::map<std::string,
342  	                              cls::rbd::MirrorImageMap> &image_mapping) {
343  	  dout(20) << dendl;
344  	
345  	  {
346  	    std::lock_guard locker{m_lock};
347  	    m_policy->init(image_mapping);
348  	
349  	    for (auto& pair : image_mapping) {
350  	      schedule_action(pair.first);
351  	    }
352  	  }
353  	  schedule_update_task();
354  	}
355  	
356  	template <typename I>
357  	void ImageMap<I>::handle_peer_ack_remove(const std::string &global_image_id,
358  	                                         int r) {
359  	  std::lock_guard locker{m_lock};
360  	  dout(5) << "global_image_id=" << global_image_id << dendl;
361  	
362  	  if (r < 0) {
363  	    derr << "failed to remove global_image_id=" << global_image_id << dendl;
364  	  }
365  	
366  	  auto peer_it = m_peer_map.find(global_image_id);
367  	  if (peer_it == m_peer_map.end()) {
368  	    return;
369  	  }
370  	
371  	  m_peer_map.erase(peer_it);
372  	}
373  	
374  	template <typename I>
375  	void ImageMap<I>::update_images_added(
376  	    const std::string &peer_uuid,
377  	    const std::set<std::string> &global_image_ids) {
378  	  dout(5) << "peer_uuid=" << peer_uuid << ", "
379  	          << "global_image_ids=[" << global_image_ids << "]" << dendl;
380  	  ceph_assert(ceph_mutex_is_locked(m_lock));
381  	
382  	  for (auto const &global_image_id : global_image_ids) {
383  	    auto result = m_peer_map[global_image_id].insert(peer_uuid);
384  	    if (result.second && m_peer_map[global_image_id].size() == 1) {
385  	      if (m_policy->add_image(global_image_id)) {
386  	        schedule_action(global_image_id);
387  	      }
388  	    }
389  	  }
390  	}
391  	
template <typename I>
void ImageMap<I>::update_images_removed(
    const std::string &peer_uuid,
    const std::set<std::string> &global_image_ids) {
  dout(5) << "peer_uuid=" << peer_uuid << ", "
          << "global_image_ids=[" << global_image_ids << "]" << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  Updates to_remove;
  for (auto const &global_image_id : global_image_ids) {
    image_map::LookupInfo info = m_policy->lookup(global_image_id);
    bool image_mapped = (info.instance_id != image_map::UNMAPPED_INSTANCE_ID);

    // image_removed: true once the last peer tracking this image is gone
    // (defaults to the mapped state when the image is untracked)
    bool image_removed = image_mapped;
    bool peer_removed = false;
    auto peer_it = m_peer_map.find(global_image_id);
    if (peer_it != m_peer_map.end()) {
      auto& peer_set = peer_it->second;
      peer_removed = peer_set.erase(peer_uuid);
      image_removed = peer_removed && peer_set.empty();
    }

    if (image_mapped && peer_removed && !peer_uuid.empty()) {
      // peer image has been deleted
      to_remove.emplace_back(global_image_id, info.instance_id);
    }

    if (image_mapped && image_removed) {
      // local and peer images have been deleted
      if (m_policy->remove_image(global_image_id)) {
        schedule_action(global_image_id);
      }
    }
  }

  if (!to_remove.empty()) {
    // removal notification will be notified instantly. this is safe
    // even after scheduling action for images as we still hold m_lock
    notify_listener_remove_images(peer_uuid, to_remove);
  }
}
433  	
434  	template <typename I>
435  	void ImageMap<I>::update_instances_added(
436  	    const std::vector<std::string> &instance_ids) {
437  	  {
438  	    std::lock_guard locker{m_lock};
439  	    if (m_shutting_down) {
440  	      return;
441  	    }
442  	
443  	    std::vector<std::string> filtered_instance_ids;
444  	    filter_instance_ids(instance_ids, &filtered_instance_ids, false);
445  	    if (filtered_instance_ids.empty()) {
446  	      return;
447  	    }
448  	
449  	    dout(20) << "instance_ids=" << filtered_instance_ids << dendl;
450  	
451  	    std::set<std::string> remap_global_image_ids;
452  	    m_policy->add_instances(filtered_instance_ids, &remap_global_image_ids);
453  	
454  	    for (auto const &global_image_id : remap_global_image_ids) {
455  	      schedule_action(global_image_id);
456  	    }
457  	  }
458  	
459  	  schedule_update_task();
460  	}
461  	
462  	template <typename I>
463  	void ImageMap<I>::update_instances_removed(
464  	    const std::vector<std::string> &instance_ids) {
465  	  {
466  	    std::lock_guard locker{m_lock};
467  	    if (m_shutting_down) {
468  	      return;
469  	    }
470  	
471  	    std::vector<std::string> filtered_instance_ids;
472  	    filter_instance_ids(instance_ids, &filtered_instance_ids, true);
473  	    if (filtered_instance_ids.empty()) {
474  	      return;
475  	    }
476  	
477  	    dout(20) << "instance_ids=" << filtered_instance_ids << dendl;
478  	
479  	    std::set<std::string> remap_global_image_ids;
480  	    m_policy->remove_instances(filtered_instance_ids, &remap_global_image_ids);
481  	
482  	    for (auto const &global_image_id : remap_global_image_ids) {
483  	      schedule_action(global_image_id);
484  	    }
485  	  }
486  	
487  	  schedule_update_task();
488  	}
489  	
490  	template <typename I>
491  	void ImageMap<I>::update_images(const std::string &peer_uuid,
492  	                                std::set<std::string> &&added_global_image_ids,
493  	                                std::set<std::string> &&removed_global_image_ids) {
494  	  dout(5) << "peer_uuid=" << peer_uuid << ", " << "added_count="
495  	          << added_global_image_ids.size() << ", " << "removed_count="
496  	          << removed_global_image_ids.size() << dendl;
497  	
498  	  {
499  	    std::lock_guard locker{m_lock};
500  	    if (m_shutting_down) {
501  	      return;
502  	    }
503  	
504  	    if (!removed_global_image_ids.empty()) {
505  	      update_images_removed(peer_uuid, removed_global_image_ids);
506  	    }
507  	    if (!added_global_image_ids.empty()) {
508  	      update_images_added(peer_uuid, added_global_image_ids);
509  	    }
510  	  }
511  	
512  	  schedule_update_task();
513  	}
514  	
515  	template <typename I>
516  	void ImageMap<I>::handle_peer_ack(const std::string &global_image_id, int r) {
517  	  dout (20) << "global_image_id=" << global_image_id << ", r=" << r
518  	            << dendl;
519  	
520  	  continue_action({global_image_id}, r);
521  	}
522  	
523  	template <typename I>
524  	void ImageMap<I>::init(Context *on_finish) {
525  	  dout(20) << dendl;
526  	
527  	  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
528  	  std::string policy_type = cct->_conf.get_val<string>("rbd_mirror_image_policy_type");
529  	
530  	  if (policy_type == "none" || policy_type == "simple") {
531  	    m_policy.reset(image_map::SimplePolicy::create(m_ioctx));
532  	  } else {
533  	    ceph_abort(); // not really needed as such, but catch it.
534  	  }
535  	
536  	  dout(20) << "mapping policy=" << policy_type << dendl;
537  	
538  	  start_async_op();
539  	  C_LoadMap *ctx = new C_LoadMap(this, on_finish);
540  	  image_map::LoadRequest<I> *req = image_map::LoadRequest<I>::create(
541  	    m_ioctx, &ctx->image_mapping, ctx);
542  	  req->send();
543  	}
544  	
545  	template <typename I>
546  	void ImageMap<I>::shut_down(Context *on_finish) {
547  	  dout(20) << dendl;
548  	
549  	  {
550  	    std::lock_guard timer_lock{m_threads->timer_lock};
551  	
552  	    {
553  	      std::lock_guard locker{m_lock};
554  	      ceph_assert(!m_shutting_down);
555  	
556  	      m_shutting_down = true;
557  	      m_policy.reset();
558  	    }
559  	
560  	    if (m_timer_task != nullptr) {
561  	      m_threads->timer->cancel_event(m_timer_task);
562  	      m_timer_task = nullptr;
563  	    }
564  	    if (m_rebalance_task != nullptr) {
565  	      m_threads->timer->cancel_event(m_rebalance_task);
566  	      m_rebalance_task = nullptr;
567  	    }
568  	  }
569  	
570  	  wait_for_async_ops(on_finish);
571  	}
572  	
573  	template <typename I>
574  	void ImageMap<I>::filter_instance_ids(
575  	    const std::vector<std::string> &instance_ids,
576  	    std::vector<std::string> *filtered_instance_ids, bool removal) const {
577  	  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
578  	  std::string policy_type = cct->_conf.get_val<string>("rbd_mirror_image_policy_type");
579  	
580  	  if (policy_type != "none") {
581  	    *filtered_instance_ids = instance_ids;
582  	    return;
583  	  }
584  	
585  	  if (removal) {
586  	    // propagate removals for external instances
587  	    for (auto& instance_id : instance_ids) {
588  	      if (instance_id != m_instance_id) {
589  	        filtered_instance_ids->push_back(instance_id);
590  	      }
591  	    }
592  	  } else if (std::find(instance_ids.begin(), instance_ids.end(),
593  	                       m_instance_id) != instance_ids.end()) {
594  	    // propagate addition only for local instance
595  	    filtered_instance_ids->push_back(m_instance_id);
596  	  }
597  	}
598  	
599  	} // namespace mirror
600  	} // namespace rbd
601  	
602  	template class rbd::mirror::ImageMap<librbd::ImageCtx>;
603