// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <algorithm>

#include "common/debug.h"
#include "common/errno.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"

#include "librbd/Utils.h"
#include "tools/rbd_mirror/Threads.h"

#include "ImageMap.h"
#include "image_map/LoadRequest.h"
#include "image_map/SimplePolicy.h"
#include "image_map/UpdateRequest.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::ImageMap: " << this << " " \
                           << __func__ << ": "

namespace rbd {
namespace mirror {

using ::operator<<;
using image_map::Policy;

using librbd::util::unique_lock_name;
using librbd::util::create_async_context_callback;

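// Completion context handed to the listener for acquire/release/remove
// notifications. It pins an async op for the lifetime of the notification
// and routes the instance's acknowledgement back into the ImageMap.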
template <typename I>
struct ImageMap<I>::C_NotifyInstance : public Context {
  ImageMap* image_map;
  std::string global_image_id;
  bool acquire_release;

  C_NotifyInstance(ImageMap* image_map, const std::string& global_image_id,
                   bool acquire_release)
    : image_map(image_map), global_image_id(global_image_id),
      acquire_release(acquire_release) {
    image_map->start_async_op();
  }

  void finish(int r) override {
    if (acquire_release) {
      image_map->handle_peer_ack(global_image_id, r);
    } else {
      image_map->handle_peer_ack_remove(global_image_id, r);
    }
    image_map->finish_async_op();
  }
};

template <typename I>
ImageMap<I>::ImageMap(librados::IoCtx &ioctx, Threads<I> *threads,
                      const std::string& instance_id,
                      image_map::Listener &listener)
  : m_ioctx(ioctx), m_threads(threads), m_instance_id(instance_id),
    m_listener(listener),
    m_lock(ceph::make_mutex(
      unique_lock_name("rbd::mirror::ImageMap::m_lock", this))) {
}

template <typename I>
ImageMap<I>::~ImageMap() {
  ceph_assert(m_async_op_tracker.empty());
  ceph_assert(m_timer_task == nullptr);
  ceph_assert(m_rebalance_task == nullptr);
}

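// Record the result of finished actions with the policy, queue any
// follow-up actions it reports and (re)arm the throttled update timer.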
template <typename I>
void ImageMap<I>::continue_action(const std::set<std::string> &global_image_ids,
                                  int r) {
  dout(20) << dendl;

  {
    std::lock_guard locker{m_lock};
    if (m_shutting_down) {
      return;
    }

    for (auto const &global_image_id : global_image_ids) {
      bool schedule = m_policy->finish_action(global_image_id, r);
      if (schedule) {
        schedule_action(global_image_id);
      }
    }
  }

  schedule_update_task();
}

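// Completion for the on-disk map update: fold the updates and removals into
// a single set of global image ids and hand the result to continue_action().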
template <typename I>
void ImageMap<I>::handle_update_request(
    const Updates &updates,
    const std::set<std::string> &remove_global_image_ids, int r) {
  dout(20) << "r=" << r << dendl;

  std::set<std::string> global_image_ids;

  global_image_ids.insert(remove_global_image_ids.begin(),
                          remove_global_image_ids.end());
  for (auto const &update : updates) {
    global_image_ids.insert(update.global_image_id);
  }

  continue_action(global_image_ids, r);
}

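// Persist the prepared mapping updates and removals to the on-disk image map
// via image_map::UpdateRequest<I>; the completion feeds the result back
// through handle_update_request().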
template <typename I>
void ImageMap<I>::update_image_mapping(Updates&& map_updates,
                                       std::set<std::string>&& map_removals) {
  if (map_updates.empty() && map_removals.empty()) {
    return;
  }

  dout(5) << "updates=[" << map_updates << "], "
          << "removes=[" << map_removals << "]" << dendl;

  Context *on_finish = new LambdaContext(
    [this, map_updates, map_removals](int r) {
      handle_update_request(map_updates, map_removals, r);
      finish_async_op();
    });
  on_finish = create_async_context_callback(m_threads->work_queue, on_finish);

  // empty meta policy for now..
  image_map::PolicyMetaNone policy_meta;

  bufferlist bl;
  encode(image_map::PolicyData(policy_meta), bl);

  // prepare update map
  std::map<std::string, cls::rbd::MirrorImageMap> update_mapping;
  for (auto const &update : map_updates) {
    update_mapping.emplace(
      update.global_image_id, cls::rbd::MirrorImageMap(update.instance_id,
                                                       update.mapped_time, bl));
  }

  start_async_op();
  image_map::UpdateRequest<I> *req = image_map::UpdateRequest<I>::create(
    m_ioctx, std::move(update_mapping), std::move(map_removals), on_finish);
  req->send();
}

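// Timer callback: drain the set of pending global image ids, advance the
// policy state machine for each one and dispatch the resulting listener
// notifications (acquire/release) and on-disk map updates/removals.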
template <typename I>
void ImageMap<I>::process_updates() {
  dout(20) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
  ceph_assert(m_timer_task == nullptr);

  Updates map_updates;
  std::set<std::string> map_removals;
  Updates acquire_updates;
  Updates release_updates;

  // gather updates by advancing the state machine
  m_lock.lock();
  for (auto const &global_image_id : m_global_image_ids) {
    image_map::ActionType action_type =
      m_policy->start_action(global_image_id);
    image_map::LookupInfo info = m_policy->lookup(global_image_id);

    dout(15) << "global_image_id=" << global_image_id << ", "
             << "action=" << action_type << ", "
             << "instance=" << info.instance_id << dendl;
    switch (action_type) {
    case image_map::ACTION_TYPE_NONE:
      continue;
    case image_map::ACTION_TYPE_MAP_UPDATE:
      ceph_assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
      map_updates.emplace_back(global_image_id, info.instance_id,
                               info.mapped_time);
      break;
    case image_map::ACTION_TYPE_MAP_REMOVE:
      map_removals.emplace(global_image_id);
      break;
    case image_map::ACTION_TYPE_ACQUIRE:
      ceph_assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
      acquire_updates.emplace_back(global_image_id, info.instance_id);
      break;
    case image_map::ACTION_TYPE_RELEASE:
      ceph_assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
      release_updates.emplace_back(global_image_id, info.instance_id);
      break;
    }
  }
  m_global_image_ids.clear();
  m_lock.unlock();

  // notify listener (acquire, release) and update the on-disk map. note
  // that it's safe to process this outside m_lock as we still hold the
  // timer lock.
  notify_listener_acquire_release_images(acquire_updates, release_updates);
  update_image_mapping(std::move(map_updates), std::move(map_removals));
}

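// Arm the throttled update timer. The parameterless overload grabs the timer
// lock; the overload taking a ceph::mutex reference expects the timer lock to
// be held already.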
template <typename I>
void ImageMap<I>::schedule_update_task() {
  std::lock_guard timer_lock{m_threads->timer_lock};
  schedule_update_task(m_threads->timer_lock);
}

template <typename I>
void ImageMap<I>::schedule_update_task(const ceph::mutex &timer_lock) {
  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));

  schedule_rebalance_task();

  if (m_timer_task != nullptr) {
    return;
  }

  {
    std::lock_guard locker{m_lock};
    if (m_global_image_ids.empty()) {
      return;
    }
  }

  m_timer_task = new LambdaContext([this](int r) {
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      m_timer_task = nullptr;

      process_updates();
    });

  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
  double after = cct->_conf.get_val<double>(
    "rbd_mirror_image_policy_update_throttle_interval");

  dout(20) << "scheduling image check update (" << m_timer_task << ")"
           << " after " << after << " second(s)" << dendl;
  m_threads->timer->add_event_after(after, m_timer_task);
}

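// Rebalance timer callback: when there is no in-flight work and no pending
// action, ask the policy (via add_instances() with an empty instance list)
// for the set of images to remap and queue an action for each of them.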
template <typename I>
void ImageMap<I>::rebalance() {
  ceph_assert(m_rebalance_task == nullptr);

  {
    std::lock_guard locker{m_lock};
    if (m_async_op_tracker.empty() && m_global_image_ids.empty()) {
      dout(20) << "starting rebalance" << dendl;

      std::set<std::string> remap_global_image_ids;
      m_policy->add_instances({}, &remap_global_image_ids);

      for (auto const &global_image_id : remap_global_image_ids) {
        schedule_action(global_image_id);
      }
    }
  }

  schedule_update_task(m_threads->timer_lock);
}

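// (Re)schedule the periodic rebalance task. The interval is re-read from
// rbd_mirror_image_policy_rebalance_timeout on every call; a value of 0
// disables rebalancing. Requires the timer lock to be held.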
template <typename I>
void ImageMap<I>::schedule_rebalance_task() {
  ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));

  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());

  // fetch the updated value of idle timeout for (re)scheduling
  double resched_after = cct->_conf.get_val<double>(
    "rbd_mirror_image_policy_rebalance_timeout");
  if (!resched_after) {
    return;
  }

  // cancel existing rebalance task if any before scheduling
  if (m_rebalance_task != nullptr) {
    m_threads->timer->cancel_event(m_rebalance_task);
  }

  m_rebalance_task = new LambdaContext([this](int _) {
      ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
      m_rebalance_task = nullptr;

      rebalance();
    });

  dout(20) << "scheduling rebalance (" << m_rebalance_task << ")"
           << " after " << resched_after << " second(s)" << dendl;
  m_threads->timer->add_event_after(resched_after, m_rebalance_task);
}

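// Queue an image for processing by the next run of the update timer.
// Expects m_lock to be held by the caller.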
template <typename I>
void ImageMap<I>::schedule_action(const std::string &global_image_id) {
  dout(20) << "global_image_id=" << global_image_id << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  m_global_image_ids.emplace(global_image_id);
}

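// Forward acquire and release requests to the listener. Each notification
// carries a C_NotifyInstance completion so the acknowledgement flows back
// through handle_peer_ack().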
template <typename I>
void ImageMap<I>::notify_listener_acquire_release_images(
    const Updates &acquire, const Updates &release) {
  if (acquire.empty() && release.empty()) {
    return;
  }

  dout(5) << "acquire=[" << acquire << "], "
          << "release=[" << release << "]" << dendl;

  for (auto const &update : acquire) {
    m_listener.acquire_image(
      update.global_image_id, update.instance_id,
      create_async_context_callback(
        m_threads->work_queue,
        new C_NotifyInstance(this, update.global_image_id, true)));
  }

  for (auto const &update : release) {
    m_listener.release_image(
      update.global_image_id, update.instance_id,
      create_async_context_callback(
        m_threads->work_queue,
        new C_NotifyInstance(this, update.global_image_id, true)));
  }
}

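// Notify the listener about removed peer images; acknowledgements flow back
// through handle_peer_ack_remove() which prunes the peer map.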
template <typename I>
void ImageMap<I>::notify_listener_remove_images(const std::string &peer_uuid,
                                                const Updates &remove) {
  dout(5) << "peer_uuid=" << peer_uuid << ", "
          << "remove=[" << remove << "]" << dendl;

  for (auto const &update : remove) {
    m_listener.remove_image(
      peer_uuid, update.global_image_id, update.instance_id,
      create_async_context_callback(
        m_threads->work_queue,
        new C_NotifyInstance(this, update.global_image_id, false)));
  }
}

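// Seed the policy with the image map loaded from disk and queue an action
// for every known image so that its state machine gets started.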
template <typename I>
void ImageMap<I>::handle_load(const std::map<std::string,
                              cls::rbd::MirrorImageMap> &image_mapping) {
  dout(20) << dendl;

  {
    std::lock_guard locker{m_lock};
    m_policy->init(image_mapping);

    for (auto& pair : image_mapping) {
      schedule_action(pair.first);
    }
  }
  schedule_update_task();
}

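// Acknowledgement for a remove notification: drop the image's entry from the
// peer map (a failure is only logged).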
template <typename I>
void ImageMap<I>::handle_peer_ack_remove(const std::string &global_image_id,
                                         int r) {
  std::lock_guard locker{m_lock};
  dout(5) << "global_image_id=" << global_image_id << dendl;

  if (r < 0) {
    derr << "failed to remove global_image_id=" << global_image_id << dendl;
  }

  auto peer_it = m_peer_map.find(global_image_id);
  if (peer_it == m_peer_map.end()) {
    return;
  }

  m_peer_map.erase(peer_it);
}

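// Track a newly reported peer image. The image is added to the policy only
// when its first peer shows up.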
template <typename I>
void ImageMap<I>::update_images_added(
    const std::string &peer_uuid,
    const std::set<std::string> &global_image_ids) {
  dout(5) << "peer_uuid=" << peer_uuid << ", "
          << "global_image_ids=[" << global_image_ids << "]" << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  for (auto const &global_image_id : global_image_ids) {
    auto result = m_peer_map[global_image_id].insert(peer_uuid);
    if (result.second && m_peer_map[global_image_id].size() == 1) {
      if (m_policy->add_image(global_image_id)) {
        schedule_action(global_image_id);
      }
    }
  }
}

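// Handle peer image removals: notify the listener for peer images that are
// still mapped, and drop an image from the policy once both the local image
// and all peer images are gone.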
template <typename I>
void ImageMap<I>::update_images_removed(
    const std::string &peer_uuid,
    const std::set<std::string> &global_image_ids) {
  dout(5) << "peer_uuid=" << peer_uuid << ", "
          << "global_image_ids=[" << global_image_ids << "]" << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  Updates to_remove;
  for (auto const &global_image_id : global_image_ids) {
    image_map::LookupInfo info = m_policy->lookup(global_image_id);
    bool image_mapped = (info.instance_id != image_map::UNMAPPED_INSTANCE_ID);

    bool image_removed = image_mapped;
    bool peer_removed = false;
    auto peer_it = m_peer_map.find(global_image_id);
    if (peer_it != m_peer_map.end()) {
      auto& peer_set = peer_it->second;
      peer_removed = peer_set.erase(peer_uuid);
      image_removed = peer_removed && peer_set.empty();
    }

    if (image_mapped && peer_removed && !peer_uuid.empty()) {
      // peer image has been deleted
      to_remove.emplace_back(global_image_id, info.instance_id);
    }

    if (image_mapped && image_removed) {
      // local and peer images have been deleted
      if (m_policy->remove_image(global_image_id)) {
        schedule_action(global_image_id);
      }
    }
  }

  if (!to_remove.empty()) {
    // removal notifications are sent out immediately. this is safe even
    // after scheduling actions for images as we still hold m_lock
    notify_listener_remove_images(peer_uuid, to_remove);
  }
}

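// Register newly added rbd-mirror instances with the policy and queue remap
// actions for any images the policy decides to move.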
template <typename I>
void ImageMap<I>::update_instances_added(
    const std::vector<std::string> &instance_ids) {
  {
    std::lock_guard locker{m_lock};
    if (m_shutting_down) {
      return;
    }

    std::vector<std::string> filtered_instance_ids;
    filter_instance_ids(instance_ids, &filtered_instance_ids, false);
    if (filtered_instance_ids.empty()) {
      return;
    }

    dout(20) << "instance_ids=" << filtered_instance_ids << dendl;

    std::set<std::string> remap_global_image_ids;
    m_policy->add_instances(filtered_instance_ids, &remap_global_image_ids);

    for (auto const &global_image_id : remap_global_image_ids) {
      schedule_action(global_image_id);
    }
  }

  schedule_update_task();
}

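// Remove departed rbd-mirror instances from the policy and queue remap
// actions for the images that were mapped to them.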
template <typename I>
void ImageMap<I>::update_instances_removed(
    const std::vector<std::string> &instance_ids) {
  {
    std::lock_guard locker{m_lock};
    if (m_shutting_down) {
      return;
    }

    std::vector<std::string> filtered_instance_ids;
    filter_instance_ids(instance_ids, &filtered_instance_ids, true);
    if (filtered_instance_ids.empty()) {
      return;
    }

    dout(20) << "instance_ids=" << filtered_instance_ids << dendl;

    std::set<std::string> remap_global_image_ids;
    m_policy->remove_instances(filtered_instance_ids, &remap_global_image_ids);

    for (auto const &global_image_id : remap_global_image_ids) {
      schedule_action(global_image_id);
    }
  }

  schedule_update_task();
}

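// Entry point for per-peer image set changes: apply removals first, then
// additions, and finally arm the update timer.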
template <typename I>
void ImageMap<I>::update_images(const std::string &peer_uuid,
                                std::set<std::string> &&added_global_image_ids,
                                std::set<std::string> &&removed_global_image_ids) {
  dout(5) << "peer_uuid=" << peer_uuid << ", "
          << "added_count=" << added_global_image_ids.size() << ", "
          << "removed_count=" << removed_global_image_ids.size() << dendl;

  {
    std::lock_guard locker{m_lock};
    if (m_shutting_down) {
      return;
    }

    if (!removed_global_image_ids.empty()) {
      update_images_removed(peer_uuid, removed_global_image_ids);
    }
    if (!added_global_image_ids.empty()) {
      update_images_added(peer_uuid, added_global_image_ids);
    }
  }

  schedule_update_task();
}

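// Acknowledgement for an acquire/release notification: report the result
// back to the policy via continue_action().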
template <typename I>
void ImageMap<I>::handle_peer_ack(const std::string &global_image_id, int r) {
  dout(20) << "global_image_id=" << global_image_id << ", r=" << r
           << dendl;

  continue_action({global_image_id}, r);
}

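// Instantiate the mapping policy selected by rbd_mirror_image_policy_type and
// kick off image_map::LoadRequest<I> to load the persisted image map.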
template <typename I>
void ImageMap<I>::init(Context *on_finish) {
  dout(20) << dendl;

  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
  std::string policy_type = cct->_conf.get_val<std::string>(
    "rbd_mirror_image_policy_type");

  if (policy_type == "none" || policy_type == "simple") {
    m_policy.reset(image_map::SimplePolicy::create(m_ioctx));
  } else {
    ceph_abort(); // not really needed as such, but catch it.
  }

  dout(20) << "mapping policy=" << policy_type << dendl;

  start_async_op();
  C_LoadMap *ctx = new C_LoadMap(this, on_finish);
  image_map::LoadRequest<I> *req = image_map::LoadRequest<I>::create(
    m_ioctx, &ctx->image_mapping, ctx);
  req->send();
}

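// Stop accepting new work, drop the policy, cancel any pending timer events
// and complete on_finish once all in-flight async operations have drained.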
template <typename I>
void ImageMap<I>::shut_down(Context *on_finish) {
  dout(20) << dendl;

  {
    std::lock_guard timer_lock{m_threads->timer_lock};

    {
      std::lock_guard locker{m_lock};
      ceph_assert(!m_shutting_down);

      m_shutting_down = true;
      m_policy.reset();
    }

    if (m_timer_task != nullptr) {
      m_threads->timer->cancel_event(m_timer_task);
      m_timer_task = nullptr;
    }
    if (m_rebalance_task != nullptr) {
      m_threads->timer->cancel_event(m_rebalance_task);
      m_rebalance_task = nullptr;
    }
  }

  wait_for_async_ops(on_finish);
}

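// With the "none" policy only the local instance participates in mapping:
// additions are limited to the local instance id, while removals of other
// (external) instances are still propagated. Other policies pass the list
// through unchanged.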
template <typename I>
void ImageMap<I>::filter_instance_ids(
    const std::vector<std::string> &instance_ids,
    std::vector<std::string> *filtered_instance_ids, bool removal) const {
  CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
  std::string policy_type = cct->_conf.get_val<std::string>(
    "rbd_mirror_image_policy_type");

  if (policy_type != "none") {
    *filtered_instance_ids = instance_ids;
    return;
  }

  if (removal) {
    // propagate removals for external instances
    for (auto& instance_id : instance_ids) {
      if (instance_id != m_instance_id) {
        filtered_instance_ids->push_back(instance_id);
      }
    }
  } else if (std::find(instance_ids.begin(), instance_ids.end(),
                       m_instance_id) != instance_ids.end()) {
    // propagate addition only for local instance
    filtered_instance_ids->push_back(m_instance_id);
  }
}

} // namespace mirror
} // namespace rbd

template class rbd::mirror::ImageMap<librbd::ImageCtx>;