// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/range/adaptor/map.hpp>

#include "common/Formatter.h"
#include "common/PriorityCache.h"
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "journal/Types.h"
#include "librbd/ImageCtx.h"
#include "perfglue/heap_profiler.h"
#include "Mirror.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror

using std::list;
using std::map;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;

using librados::Rados;
using librados::IoCtx;
using librbd::mirror_peer_t;

namespace rbd {
namespace mirror {

namespace {

class MirrorAdminSocketCommand {
public:
  virtual ~MirrorAdminSocketCommand() {}
  virtual int call(Formatter *f) = 0;
};

class StatusCommand : public MirrorAdminSocketCommand {
public:
  explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->print_status(f);
    return 0;
  }

private:
  Mirror *mirror;
};

class StartCommand : public MirrorAdminSocketCommand {
public:
  explicit StartCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->start();
    return 0;
  }

private:
  Mirror *mirror;
};

class StopCommand : public MirrorAdminSocketCommand {
public:
  explicit StopCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->stop();
    return 0;
  }

private:
  Mirror *mirror;
};

class RestartCommand : public MirrorAdminSocketCommand {
public:
  explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->restart();
    return 0;
  }

private:
  Mirror *mirror;
};

class FlushCommand : public MirrorAdminSocketCommand {
public:
  explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->flush();
    return 0;
  }

private:
  Mirror *mirror;
};

class LeaderReleaseCommand : public MirrorAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->release_leader();
    return 0;
  }

private:
  Mirror *mirror;
};

#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
                           << m_name << " " << __func__ << ": "

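// A registered cache's allowance is split into two slices requested at
// separate priorities: a "base" slice of up to min_size bytes and an
// "extra" slice of up to (max_size - min_size) bytes. Illustrative numbers
// only: min_size=64MiB, max_size=256MiB yields a 64MiB base slice and a
// 192MiB extra slice.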
struct PriCache : public PriorityCache::PriCache {
  std::string m_name;
  int64_t m_base_cache_max_size;
  int64_t m_extra_cache_max_size;

  PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10;
  PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10;
  int64_t m_base_cache_bytes = 0;
  int64_t m_extra_cache_bytes = 0;
  int64_t m_committed_bytes = 0;
  double m_cache_ratio = 0;

  PriCache(const std::string &name, uint64_t min_size, uint64_t max_size)
    : m_name(name), m_base_cache_max_size(min_size),
      m_extra_cache_max_size(max_size - min_size) {
    ceph_assert(max_size >= min_size);
  }

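  // Each balance pass promotes the base slice one priority level toward
  // PRI0 (the level the manager satisfies first), so caches that have been
  // registered longer gradually have their base allocation honored earlier.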
  void prioritize() {
    if (m_base_cache_pri == PriorityCache::Priority::PRI0) {
      return;
    }
    auto pri = static_cast<uint8_t>(m_base_cache_pri);
    m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri);

    dout(30) << m_base_cache_pri << dendl;
  }

  int64_t request_cache_bytes(PriorityCache::Priority pri,
                              uint64_t total_cache) const override {
    int64_t cache_bytes = 0;

    if (pri == m_base_cache_pri) {
      cache_bytes += m_base_cache_max_size;
    }
    if (pri == m_extra_cache_pri) {
      cache_bytes += m_extra_cache_max_size;
    }

    dout(30) << cache_bytes << dendl;

    return cache_bytes;
  }

  int64_t get_cache_bytes(PriorityCache::Priority pri) const override {
    int64_t cache_bytes = 0;

    if (pri == m_base_cache_pri) {
      cache_bytes += m_base_cache_bytes;
    }
    if (pri == m_extra_cache_pri) {
      cache_bytes += m_extra_cache_bytes;
    }

    dout(30) << "pri=" << pri << " " << cache_bytes << dendl;

    return cache_bytes;
  }

  int64_t get_cache_bytes() const override {
    auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes;

    dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "="
             << cache_bytes << dendl;

    return cache_bytes;
  }

  void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
    ceph_assert(bytes >= 0);
    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri ||
                bytes == 0);

    dout(30) << "pri=" << pri << " " << bytes << dendl;

    if (pri == m_base_cache_pri) {
      m_base_cache_bytes = std::min(m_base_cache_max_size, bytes);
      bytes -= std::min(m_base_cache_bytes, bytes);
    }

    if (pri == m_extra_cache_pri) {
      m_extra_cache_bytes = bytes;
    }
  }

  void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
    ceph_assert(bytes >= 0);
    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri);

    dout(30) << "pri=" << pri << " " << bytes << dendl;

    if (pri == m_base_cache_pri) {
      ceph_assert(m_base_cache_bytes <= m_base_cache_max_size);

      auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes);
      m_base_cache_bytes += chunk;
      bytes -= chunk;
    }

    if (pri == m_extra_cache_pri) {
      m_extra_cache_bytes += bytes;
    }
  }

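  // Commit the currently assigned bytes, rounded up to a 4 KiB boundary
  // (page-size granularity).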
  int64_t commit_cache_size(uint64_t total_cache) override {
    m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096);

    dout(30) << m_committed_bytes << dendl;

    return m_committed_bytes;
  }

  int64_t get_committed_size() const override {
    dout(30) << m_committed_bytes << dendl;

    return m_committed_bytes;
  }

  double get_cache_ratio() const override {
    dout(30) << m_cache_ratio << dendl;

    return m_cache_ratio;
  }

  void set_cache_ratio(double ratio) override {
    dout(30) << m_cache_ratio << dendl;

    m_cache_ratio = ratio;
  }

  std::string get_cache_name() const override {
    return m_name;
  }
};

} // anonymous namespace

#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
                           << __func__ << ": "

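// Wires the "rbd mirror ..." admin socket commands to the command objects
// above. Example invocation (socket path is an assumption based on typical
// deployments):
//   ceph daemon /var/run/ceph/ceph-client.rbd-mirror.<id>.asok rbd mirror status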
class MirrorAdminSocketHook : public AdminSocketHook {
public:
  MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
    admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status";
    r = admin_socket->register_command(command, this,
                                       "get status for rbd mirror");
    if (r == 0) {
      commands[command] = new StatusCommand(mirror);
    }

    command = "rbd mirror start";
    r = admin_socket->register_command(command, this,
                                       "start rbd mirror");
    if (r == 0) {
      commands[command] = new StartCommand(mirror);
    }

    command = "rbd mirror stop";
    r = admin_socket->register_command(command, this,
                                       "stop rbd mirror");
    if (r == 0) {
      commands[command] = new StopCommand(mirror);
    }

    command = "rbd mirror restart";
    r = admin_socket->register_command(command, this,
                                       "restart rbd mirror");
    if (r == 0) {
      commands[command] = new RestartCommand(mirror);
    }

    command = "rbd mirror flush";
    r = admin_socket->register_command(command, this,
                                       "flush rbd mirror");
    if (r == 0) {
      commands[command] = new FlushCommand(mirror);
    }

    command = "rbd mirror leader release";
    r = admin_socket->register_command(command, this,
                                       "release rbd mirror leader");
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(mirror);
    }
  }

  ~MirrorAdminSocketHook() override {
    (void)admin_socket->unregister_commands(this);
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      delete i->second;
    }
  }

  int call(std::string_view command, const cmdmap_t& cmdmap,
           Formatter *f,
           std::ostream& errss,
           bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    ceph_assert(i != commands.end());
    return i->second->call(f);
  }

private:
  typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

class CacheManagerHandler : public journal::CacheManagerHandler {
public:
  CacheManagerHandler(CephContext *cct)
    : m_cct(cct) {
    if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) {
      return;
    }

    uint64_t base = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_base");
    double fragmentation = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_expected_fragmentation");
    uint64_t target = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_target");
    uint64_t min = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_cache_min");
    uint64_t max = min;

    // When setting the maximum amount of memory to use for cache, first
    // assume some base amount of memory for the daemon and then fudge in
    // some overhead for fragmentation that scales with cache usage.
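    // Illustrative arithmetic with assumed values (not necessarily the
    // configured defaults): target=4GiB, fragmentation=0.15, base=768MiB,
    // min=128MiB gives ltarget = 0.85 * 4GiB ~= 3.4GiB, which exceeds
    // base + min, so max = ltarget - base ~= 2.65GiB.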
    uint64_t ltarget = (1.0 - fragmentation) * target;
    if (ltarget > base + min) {
      max = ltarget - base;
    }

    m_next_balance = ceph_clock_now();
    m_next_resize = ceph_clock_now();

    m_cache_manager = std::make_unique<PriorityCache::Manager>(
        m_cct, min, max, target, false);
  }

  ~CacheManagerHandler() {
    // Static analysis (Coverity) note: a destructor has an implicit
    // "throw()" specification, but acquiring this lock can throw a
    // boost-wrapped std::ios_base::failure; if it ever does, unexpected()
    // and usually std::terminate() will be called.
    std::lock_guard locker{m_lock};

    ceph_assert(m_caches.empty());
  }

  void register_cache(const std::string &cache_name,
                      uint64_t min_size, uint64_t max_size,
                      journal::CacheRebalanceHandler* handler) override {
    if (!m_cache_manager) {
      handler->handle_cache_rebalanced(max_size);
      return;
    }

    dout(20) << cache_name << " min_size=" << min_size << " max_size="
             << max_size << " handler=" << handler << dendl;

    std::lock_guard locker{m_lock};

    auto p = m_caches.insert(
        {cache_name, {cache_name, min_size, max_size, handler}});
    ceph_assert(p.second == true);

    m_cache_manager->insert(cache_name, p.first->second.pri_cache, false);
    m_next_balance = ceph_clock_now();
  }

  void unregister_cache(const std::string &cache_name) override {
    if (!m_cache_manager) {
      return;
    }

    dout(20) << cache_name << dendl;

    std::lock_guard locker{m_lock};

    auto it = m_caches.find(cache_name);
    ceph_assert(it != m_caches.end());

    m_cache_manager->erase(cache_name);
    m_caches.erase(it);
    m_next_balance = ceph_clock_now();
  }

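  // Called periodically from the mirror's main loop: rebalances cache
  // allocations every "rbd_mirror_memory_cache_autotune_interval" seconds
  // and, when running with tcmalloc, tunes overall memory usage every
  // "rbd_mirror_memory_cache_resize_interval" seconds.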
  void run_cache_manager() {
    if (!m_cache_manager) {
      return;
    }

    std::lock_guard locker{m_lock};

    // Before we trim, check and see if it's time to rebalance/resize.
    auto autotune_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_autotune_interval");
    auto resize_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_resize_interval");

    utime_t now = ceph_clock_now();

    if (autotune_interval > 0 && m_next_balance <= now) {
      dout(20) << "balance" << dendl;
      m_cache_manager->balance();

      for (auto &it : m_caches) {
        auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get());
        auto new_cache_bytes = pri_cache->get_cache_bytes();
        it.second.handler->handle_cache_rebalanced(new_cache_bytes);
        pri_cache->prioritize();
      }

      m_next_balance = ceph_clock_now();
      m_next_balance += autotune_interval;
    }

    if (resize_interval > 0 && m_next_resize < now) {
      if (ceph_using_tcmalloc()) {
        dout(20) << "tune memory" << dendl;
        m_cache_manager->tune_memory();
      }

      m_next_resize = ceph_clock_now();
      m_next_resize += resize_interval;
    }
  }

private:
  struct Cache {
    std::shared_ptr<PriorityCache::PriCache> pri_cache;
    journal::CacheRebalanceHandler *handler;

    Cache(const std::string &name, uint64_t min_size, uint64_t max_size,
          journal::CacheRebalanceHandler *handler)
      : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) {
    }
  };

  CephContext *m_cct;

  mutable ceph::mutex m_lock =
    ceph::make_mutex("rbd::mirror::CacheManagerHandler");
  std::unique_ptr<PriorityCache::Manager> m_cache_manager;
  std::map<std::string, Cache> m_caches;

  utime_t m_next_balance;
  utime_t m_next_resize;
};

Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
  m_cct(cct),
  m_args(args),
  m_local(new librados::Rados()),
  m_cache_manager_handler(new CacheManagerHandler(cct)),
  m_asok_hook(new MirrorAdminSocketHook(cct, this))
{
  m_threads =
    &(cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx>>(
        "rbd_mirror::threads", false, cct));
  m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
}

Mirror::~Mirror()
{
  delete m_asok_hook;
}

void Mirror::handle_signal(int signum)
{
  m_stopping = true;
  {
    std::lock_guard l{m_lock};
    m_cond.notify_all();
  }
}

int Mirror::init()
{
  int r = m_local->init_with_context(m_cct);
  if (r < 0) {
    derr << "could not initialize rados handle" << dendl;
    return r;
  }

  r = m_local->connect();
  if (r < 0) {
    derr << "error connecting to local cluster" << dendl;
    return r;
  }

  r = m_service_daemon->init();
  if (r < 0) {
    derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
    return r;
  }

  m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
                                                   m_service_daemon.get()));
  return r;
}

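// Main loop: refresh pools and reconcile pool replayers every
// "rbd_mirror_pool_replayers_refresh_interval" seconds, drive the cache
// manager on each pass, and wake at least once per second until stopped.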
void Mirror::run()
{
  dout(20) << "enter" << dendl;

  utime_t next_refresh_pools = ceph_clock_now();

  while (!m_stopping) {
    utime_t now = ceph_clock_now();
    bool refresh_pools = next_refresh_pools <= now;
    if (refresh_pools) {
      m_local_cluster_watcher->refresh_pools();
      next_refresh_pools = ceph_clock_now();
      next_refresh_pools += m_cct->_conf.get_val<uint64_t>(
          "rbd_mirror_pool_replayers_refresh_interval");
    }
    std::unique_lock l{m_lock};
    if (!m_manual_stop) {
      if (refresh_pools) {
        update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
      }
      m_cache_manager_handler->run_cache_manager();
    }
    m_cond.wait_for(l, 1s);
  }

  // stop all pool replayers in parallel
  std::lock_guard locker{m_lock};
  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->stop(false);
  }
  dout(20) << "return" << dendl;
}

void Mirror::print_status(Formatter *f)
{
  dout(20) << "enter" << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  f->open_object_section("mirror_status");
  f->open_array_section("pool_replayers");
  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->print_status(f);
  }
  f->close_section();
  f->close_section();
}

void Mirror::start()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->start();
  }
}

void Mirror::stop()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = true;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->stop(true);
  }
}

void Mirror::restart()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->restart();
  }
}

void Mirror::flush()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping || m_manual_stop) {
    return;
  }

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->flush();
  }
}

void Mirror::release_leader()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->release_leader();
  }
}

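// Reconcile running pool replayers against the desired (pool, peer) set:
// shut down replayers whose peer went away, restart blacklisted or failed
// ones, and spawn replayers for newly discovered peers.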
void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
{
  dout(20) << "enter" << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  // remove stale pool replayers before creating new pool replayers
  for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
    auto &peer = it->first.second;
    auto pool_peer_it = pool_peers.find(it->first.first);
    if (pool_peer_it == pool_peers.end() ||
        pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
      dout(20) << "removing pool replayer for " << peer << dendl;
      // TODO: make async
      it->second->shut_down();
      it = m_pool_replayers.erase(it);
    } else {
      ++it;
    }
  }

  for (auto &kv : pool_peers) {
    for (auto &peer : kv.second) {
      PoolPeer pool_peer(kv.first, peer);

      auto pool_replayers_it = m_pool_replayers.find(pool_peer);
      if (pool_replayers_it != m_pool_replayers.end()) {
        auto& pool_replayer = pool_replayers_it->second;
        if (pool_replayer->is_blacklisted()) {
          derr << "restarting blacklisted pool replayer for " << peer << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init();
        } else if (!pool_replayer->is_running()) {
          derr << "restarting failed pool replayer for " << peer << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init();
        }
      } else {
        dout(20) << "starting pool replayer for " << peer << dendl;
        unique_ptr<PoolReplayer<>> pool_replayer(
            new PoolReplayer<>(m_threads, m_service_daemon.get(),
                               m_cache_manager_handler.get(), kv.first, peer,
                               m_args));

        // TODO: make async
        pool_replayer->init();
        m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
      }
    }

    // TODO currently only support a single peer
  }
}

} // namespace mirror
} // namespace rbd