1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /** \file
5 *
6 * This is an OSD class that implements methods for
7 * use with rbd.
8 *
9 * Most of these deal with the rbd header object. Methods prefixed
10 * with old_ deal with the original rbd design, in which clients read
11 * and interpreted the header object directly.
12 *
13 * The new format is meant to be opaque to clients - all their
14 * interactions with non-data objects should go through this
15 * class. The OSD class interface leaves the class to implement its
16 * own argument and payload serialization/deserialization, so for ease
17 * of implementation we use the existing ceph encoding/decoding
 * methods. Something like json might be preferable, but the rbd
 * kernel module has to be able to understand the format as well. The
20 * datatypes exposed to the clients are strings, unsigned integers,
21 * and vectors of those types. The on-wire format can be found in
22 * src/include/encoding.h.
23 *
24 * The methods for interacting with the new format document their
25 * parameters as the client sees them - it would be silly to mention
26 * in each one that they take an input and an output bufferlist.
27 */
28 #include "include/types.h"
29
30 #include <algorithm>
31 #include <errno.h>
32 #include <sstream>
33
34 #include "include/uuid.h"
35 #include "common/bit_vector.hpp"
36 #include "common/errno.h"
37 #include "objclass/objclass.h"
38 #include "osd/osd_types.h"
39 #include "include/rbd_types.h"
40 #include "include/rbd/object_map_types.h"
41
42 #include "cls/rbd/cls_rbd.h"
43 #include "cls/rbd/cls_rbd_types.h"
44
45
46 /*
47 * Object keys:
48 *
49 * <partial list>
50 *
51 * stripe_unit: size in bytes of the stripe unit. if not present,
52 * the stripe unit is assumed to match the object size (1 << order).
53 *
54 * stripe_count: number of objects to stripe over before looping back.
55 * if not present or 1, striping is disabled. this is the default.
56 *
57 */
58
59 CLS_VER(2,0)
60 CLS_NAME(rbd)
61
62 #define RBD_MAX_KEYS_READ 64
63 #define RBD_SNAP_KEY_PREFIX "snapshot_"
64 #define RBD_SNAP_CHILDREN_KEY_PREFIX "snap_children_"
65 #define RBD_DIR_ID_KEY_PREFIX "id_"
66 #define RBD_DIR_NAME_KEY_PREFIX "name_"
67 #define RBD_METADATA_KEY_PREFIX "metadata_"
68
69 namespace {
70
71 uint64_t get_encode_features(cls_method_context_t hctx) {
72 uint64_t features = 0;
73 ceph_release_t require_osd_release = cls_get_required_osd_release(hctx);
74 if (require_osd_release >= ceph_release_t::nautilus) {
75 features |= CEPH_FEATURE_SERVER_NAUTILUS;
76 }
77 return features;
78 }
79
80 bool calc_sparse_extent(const bufferptr &bp, size_t sparse_size,
81 uint64_t length, size_t *write_offset,
82 size_t *write_length, size_t *offset) {
83 size_t extent_size;
84 if (*offset + sparse_size > length) {
85 extent_size = length - *offset;
86 } else {
87 extent_size = sparse_size;
88 }
89
90 bufferptr extent(bp, *offset, extent_size);
91 *offset += extent_size;
92
93 bool extent_is_zero = extent.is_zero();
94 if (!extent_is_zero) {
95 *write_length += extent_size;
96 }
97 if (extent_is_zero && *write_length == 0) {
98 *write_offset += extent_size;
99 }
100
101 if ((extent_is_zero || *offset == length) && *write_length != 0) {
102 return true;
103 }
104 return false;
105 }
106
107 } // anonymous namespace
108
109 static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
110 {
111 unsigned snap_count = 0;
112 uint64_t snap_names_len = 0;
113 struct rbd_obj_header_ondisk *header;
114
115 CLS_LOG(20, "snapshots_list");
116
117 while (1) {
118 int len = sizeof(*header) +
119 snap_count * sizeof(struct rbd_obj_snap_ondisk) +
120 snap_names_len;
121
122 int rc = cls_cxx_read(hctx, 0, len, &bl);
123 if (rc < 0)
124 return rc;
125
126 if (bl.length() < sizeof(*header))
127 return -EINVAL;
128
129 header = (struct rbd_obj_header_ondisk *)bl.c_str();
130 ceph_assert(header);
131
132 if ((snap_count != header->snap_count) ||
133 (snap_names_len != header->snap_names_len)) {
134 snap_count = header->snap_count;
135 snap_names_len = header->snap_names_len;
136 bl.clear();
137 continue;
138 }
139 break;
140 }
141
142 return 0;
143 }
144
145 static void key_from_snap_id(snapid_t snap_id, string *out)
146 {
147 ostringstream oss;
148 oss << RBD_SNAP_KEY_PREFIX
149 << std::setw(16) << std::setfill('0') << std::hex << snap_id;
150 *out = oss.str();
151 }
152
153 static snapid_t snap_id_from_key(const string &key) {
154 istringstream iss(key);
155 uint64_t id;
156 iss.ignore(strlen(RBD_SNAP_KEY_PREFIX)) >> std::hex >> id;
157 return id;
158 }
159
160 template<typename T>
161 static int read_key(cls_method_context_t hctx, const string &key, T *out)
162 {
163 bufferlist bl;
164 int r = cls_cxx_map_get_val(hctx, key, &bl);
165 if (r < 0) {
166 if (r != -ENOENT) {
167 CLS_ERR("error reading omap key %s: %s", key.c_str(), cpp_strerror(r).c_str());
168 }
169 return r;
170 }
171
172 try {
173 auto it = bl.cbegin();
174 decode(*out, it);
175 } catch (const buffer::error &err) {
176 CLS_ERR("error decoding %s", key.c_str());
177 return -EIO;
178 }
179
180 return 0;
181 }
182
183 template <typename T>
184 static int write_key(cls_method_context_t hctx, const string &key, const T &t) {
185 bufferlist bl;
186 encode(t, bl);
187
188 int r = cls_cxx_map_set_val(hctx, key, &bl);
189 if (r < 0) {
190 CLS_ERR("failed to set omap key: %s", key.c_str());
191 return r;
192 }
193 return 0;
194 }
195
196 template <typename T>
197 static int write_key(cls_method_context_t hctx, const string &key, const T &t,
198 uint64_t features) {
199 bufferlist bl;
200 encode(t, bl, features);
201
202 int r = cls_cxx_map_set_val(hctx, key, &bl);
203 if (r < 0) {
204 CLS_ERR("failed to set omap key: %s", key.c_str());
205 return r;
206 }
207 return 0;
208 }
209
210 static int remove_key(cls_method_context_t hctx, const string &key) {
211 int r = cls_cxx_map_remove_key(hctx, key);
212 if (r < 0 && r != -ENOENT) {
213 CLS_ERR("failed to remove key: %s", key.c_str());
214 return r;
215 }
216 return 0;
217 }
218
/**
 * Check whether an id is non-empty and consists solely of characters
 * accepted by isalnum().
 *
 * @param id identifier to validate
 * @return true if the id is valid, false otherwise
 */
static bool is_valid_id(const std::string &id) {
  if (id.empty()) {
    return false;
  }
  for (const char c : id) {
    // isalnum() is only defined for values representable as unsigned char
    // (or EOF); a plain char may be negative for bytes >= 0x80
    if (!isalnum(static_cast<unsigned char>(c))) {
      return false;
    }
  }
  return true;
}
229
230 /**
231 * verify that the header object exists
232 *
233 * @return 0 if the object exists, -ENOENT if it does not, or other error
234 */
235 static int check_exists(cls_method_context_t hctx)
236 {
237 uint64_t size;
238 time_t mtime;
239 return cls_cxx_stat(hctx, &size, &mtime);
240 }
241
242 namespace image {
243
244 /**
245 * check that given feature(s) are set
246 *
247 * @param hctx context
248 * @param need features needed
249 * @return 0 if features are set, negative error (like ENOEXEC) otherwise
250 */
251 int require_feature(cls_method_context_t hctx, uint64_t need)
252 {
253 uint64_t features;
254 int r = read_key(hctx, "features", &features);
255 if (r == -ENOENT) // this implies it's an old-style image with no features
256 return -ENOEXEC;
257 if (r < 0)
258 return r;
259 if ((features & need) != need) {
260 CLS_LOG(10, "require_feature missing feature %llx, have %llx",
261 (unsigned long long)need, (unsigned long long)features);
262 return -ENOEXEC;
263 }
264 return 0;
265 }
266
267 std::string snap_children_key_from_snap_id(snapid_t snap_id)
268 {
269 ostringstream oss;
270 oss << RBD_SNAP_CHILDREN_KEY_PREFIX
271 << std::setw(16) << std::setfill('0') << std::hex << snap_id;
272 return oss.str();
273 }
274
275 int set_op_features(cls_method_context_t hctx, uint64_t op_features,
276 uint64_t mask) {
277 uint64_t orig_features;
278 int r = read_key(hctx, "features", &orig_features);
279 if (r < 0) {
280 CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
281 return r;
282 }
283
284 uint64_t orig_op_features = 0;
285 r = read_key(hctx, "op_features", &orig_op_features);
286 if (r < 0 && r != -ENOENT) {
287 CLS_ERR("Could not read op features off disk: %s", cpp_strerror(r).c_str());
288 return r;
289 }
290
291 op_features = (orig_op_features & ~mask) | (op_features & mask);
292 CLS_LOG(10, "op_features=%" PRIu64 " orig_op_features=%" PRIu64,
293 op_features, orig_op_features);
294 if (op_features == orig_op_features) {
295 return 0;
296 }
297
298 uint64_t features = orig_features;
299 if (op_features == 0ULL) {
300 features &= ~RBD_FEATURE_OPERATIONS;
301
302 r = cls_cxx_map_remove_key(hctx, "op_features");
303 if (r == -ENOENT) {
304 r = 0;
305 }
306 } else {
307 features |= RBD_FEATURE_OPERATIONS;
308
309 bufferlist bl;
310 encode(op_features, bl);
311 r = cls_cxx_map_set_val(hctx, "op_features", &bl);
312 }
313
314 if (r < 0) {
315 CLS_ERR("error updating op features: %s", cpp_strerror(r).c_str());
316 return r;
317 }
318
319 if (features != orig_features) {
320 bufferlist bl;
321 encode(features, bl);
322 r = cls_cxx_map_set_val(hctx, "features", &bl);
323 if (r < 0) {
324 CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
325 return r;
326 }
327 }
328
329 return 0;
330 }
331
332 int set_migration(cls_method_context_t hctx,
333 const cls::rbd::MigrationSpec &migration_spec, bool init) {
334 if (init) {
335 bufferlist bl;
336 int r = cls_cxx_map_get_val(hctx, "migration", &bl);
337 if (r != -ENOENT) {
338 if (r == 0) {
339 CLS_LOG(10, "migration already set");
340 return -EEXIST;
341 }
342 CLS_ERR("failed to read migration off disk: %s", cpp_strerror(r).c_str());
343 return r;
344 }
345
346 uint64_t features = 0;
347 r = read_key(hctx, "features", &features);
348 if (r == -ENOENT) {
349 CLS_LOG(20, "no features, assuming v1 format");
350 bufferlist header;
351 r = cls_cxx_read(hctx, 0, sizeof(RBD_HEADER_TEXT), &header);
352 if (r < 0) {
353 CLS_ERR("failed to read v1 header: %s", cpp_strerror(r).c_str());
354 return r;
355 }
356 if (header.length() != sizeof(RBD_HEADER_TEXT)) {
357 CLS_ERR("unrecognized v1 header format");
358 return -ENXIO;
359 }
360 if (memcmp(RBD_HEADER_TEXT, header.c_str(), header.length()) != 0) {
361 if (memcmp(RBD_MIGRATE_HEADER_TEXT, header.c_str(),
362 header.length()) == 0) {
363 CLS_LOG(10, "migration already set");
364 return -EEXIST;
365 } else {
366 CLS_ERR("unrecognized v1 header format");
367 return -ENXIO;
368 }
369 }
370 if (migration_spec.header_type != cls::rbd::MIGRATION_HEADER_TYPE_SRC) {
371 CLS_LOG(10, "v1 format image can only be migration source");
372 return -EINVAL;
373 }
374
375 header.clear();
376 header.append(RBD_MIGRATE_HEADER_TEXT);
377 r = cls_cxx_write(hctx, 0, header.length(), &header);
378 if (r < 0) {
379 CLS_ERR("error updating v1 header: %s", cpp_strerror(r).c_str());
380 return r;
381 }
382 } else if (r < 0) {
383 CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
384 return r;
385 } else if ((features & RBD_FEATURE_MIGRATING) != 0ULL) {
386 if (migration_spec.header_type != cls::rbd::MIGRATION_HEADER_TYPE_DST) {
387 CLS_LOG(10, "migrating feature already set");
388 return -EEXIST;
389 }
390 } else {
391 features |= RBD_FEATURE_MIGRATING;
392 bl.clear();
393 encode(features, bl);
394 r = cls_cxx_map_set_val(hctx, "features", &bl);
395 if (r < 0) {
396 CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
397 return r;
398 }
399 }
400 }
401
402 bufferlist bl;
403 encode(migration_spec, bl);
404 int r = cls_cxx_map_set_val(hctx, "migration", &bl);
405 if (r < 0) {
406 CLS_ERR("error setting migration: %s", cpp_strerror(r).c_str());
407 return r;
408 }
409
410 return 0;
411 }
412
413 int read_migration(cls_method_context_t hctx,
414 cls::rbd::MigrationSpec *migration_spec) {
415 uint64_t features = 0;
416 int r = read_key(hctx, "features", &features);
417 if (r == -ENOENT) {
418 CLS_LOG(20, "no features, assuming v1 format");
419 bufferlist header;
420 r = cls_cxx_read(hctx, 0, sizeof(RBD_HEADER_TEXT), &header);
421 if (r < 0) {
422 CLS_ERR("failed to read v1 header: %s", cpp_strerror(r).c_str());
423 return r;
424 }
425 if (header.length() != sizeof(RBD_HEADER_TEXT)) {
426 CLS_ERR("unrecognized v1 header format");
427 return -ENXIO;
428 }
429 if (memcmp(RBD_MIGRATE_HEADER_TEXT, header.c_str(), header.length()) != 0) {
430 if (memcmp(RBD_HEADER_TEXT, header.c_str(), header.length()) == 0) {
431 CLS_LOG(10, "migration feature not set");
432 return -EINVAL;
433 } else {
434 CLS_ERR("unrecognized v1 header format");
435 return -ENXIO;
436 }
437 }
438 if (migration_spec->header_type != cls::rbd::MIGRATION_HEADER_TYPE_SRC) {
439 CLS_LOG(10, "v1 format image can only be migration source");
440 return -EINVAL;
441 }
442 } else if (r < 0) {
443 CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
444 return r;
445 } else if ((features & RBD_FEATURE_MIGRATING) == 0ULL) {
446 CLS_LOG(10, "migration feature not set");
447 return -EINVAL;
448 }
449
450 r = read_key(hctx, "migration", migration_spec);
451 if (r < 0) {
452 CLS_ERR("failed to read migration off disk: %s", cpp_strerror(r).c_str());
453 return r;
454 }
455
456 return 0;
457 }
458
459 int remove_migration(cls_method_context_t hctx) {
460 int r = remove_key(hctx, "migration");
461 if (r < 0) {
462 return r;
463 }
464
465 uint64_t features = 0;
466 r = read_key(hctx, "features", &features);
467 if (r == -ENOENT) {
468 CLS_LOG(20, "no features, assuming v1 format");
469 bufferlist header;
470 r = cls_cxx_read(hctx, 0, sizeof(RBD_MIGRATE_HEADER_TEXT), &header);
471 if (header.length() != sizeof(RBD_MIGRATE_HEADER_TEXT)) {
472 CLS_ERR("unrecognized v1 header format");
473 return -ENXIO;
474 }
475 if (memcmp(RBD_MIGRATE_HEADER_TEXT, header.c_str(), header.length()) != 0) {
476 if (memcmp(RBD_HEADER_TEXT, header.c_str(), header.length()) == 0) {
477 CLS_LOG(10, "migration feature not set");
478 return -EINVAL;
479 } else {
480 CLS_ERR("unrecognized v1 header format");
481 return -ENXIO;
482 }
483 }
484 header.clear();
485 header.append(RBD_HEADER_TEXT);
486 r = cls_cxx_write(hctx, 0, header.length(), &header);
487 if (r < 0) {
488 CLS_ERR("error updating v1 header: %s", cpp_strerror(r).c_str());
489 return r;
490 }
491 } else if (r < 0) {
492 CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
493 return r;
494 } else if ((features & RBD_FEATURE_MIGRATING) == 0ULL) {
495 CLS_LOG(10, "migrating feature not set");
496 } else {
497 features &= ~RBD_FEATURE_MIGRATING;
498 bufferlist bl;
499 encode(features, bl);
500 r = cls_cxx_map_set_val(hctx, "features", &bl);
501 if (r < 0) {
502 CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
503 return r;
504 }
505 }
506
507 return 0;
508 }
509
510 namespace snapshot {
511
512 template<typename L>
513 int iterate(cls_method_context_t hctx, L& lambda) {
514 int max_read = RBD_MAX_KEYS_READ;
515 string last_read = RBD_SNAP_KEY_PREFIX;
516 bool more = false;
517 do {
518 map<string, bufferlist> vals;
519 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
520 max_read, &vals, &more);
521 if (r < 0) {
522 return r;
523 }
524
525 cls_rbd_snap snap_meta;
526 for (auto& val : vals) {
527 auto iter = val.second.cbegin();
528 try {
529 decode(snap_meta, iter);
530 } catch (const buffer::error &err) {
531 CLS_ERR("error decoding snapshot metadata for snap : %s",
532 val.first.c_str());
533 return -EIO;
534 }
535
536 r = lambda(snap_meta);
537 if (r < 0) {
538 return r;
539 }
540 }
541
542 if (!vals.empty()) {
543 last_read = vals.rbegin()->first;
544 }
545 } while (more);
546
547 return 0;
548 }
549
550 int write(cls_method_context_t hctx, const std::string& snap_key,
551 cls_rbd_snap&& snap) {
552 int r;
553 uint64_t encode_features = get_encode_features(hctx);
554 if (snap.migrate_parent_format(encode_features)) {
555 // ensure the normalized parent link exists before removing it from the
556 // snapshot record
557 cls_rbd_parent on_disk_parent;
558 r = read_key(hctx, "parent", &on_disk_parent);
559 if (r < 0 && r != -ENOENT) {
560 return r;
561 }
562
563 if (!on_disk_parent.exists()) {
564 on_disk_parent = snap.parent;
565 on_disk_parent.head_overlap = std::nullopt;
566
567 r = write_key(hctx, "parent", on_disk_parent, encode_features);
568 if (r < 0) {
569 return r;
570 }
571 }
572
573 // only store the parent overlap in the snapshot
574 snap.parent_overlap = snap.parent.head_overlap;
575 snap.parent = {};
576 }
577
578 r = write_key(hctx, snap_key, snap, encode_features);
579 if (r < 0) {
580 return r;
581 }
582 return 0;
583 }
584
585 } // namespace snapshot
586
587 namespace parent {
588
589 int attach(cls_method_context_t hctx, cls_rbd_parent parent,
590 bool reattach) {
591 int r = check_exists(hctx);
592 if (r < 0) {
593 CLS_LOG(20, "cls_rbd::image::parent::attach: child doesn't exist");
594 return r;
595 }
596
597 r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
598 if (r < 0) {
599 CLS_LOG(20, "cls_rbd::image::parent::attach: child does not support "
600 "layering");
601 return r;
602 }
603
604 CLS_LOG(20, "cls_rbd::image::parent::attach: pool=%" PRIi64 ", ns=%s, id=%s, "
605 "snapid=%" PRIu64 ", size=%" PRIu64,
606 parent.pool_id, parent.pool_namespace.c_str(),
607 parent.image_id.c_str(), parent.snap_id.val,
608 parent.head_overlap.value_or(0ULL));
609 if (!parent.exists() || parent.head_overlap.value_or(0ULL) == 0ULL) {
610 return -EINVAL;
611 }
612
613 // make sure there isn't already a parent
614 cls_rbd_parent on_disk_parent;
615 r = read_key(hctx, "parent", &on_disk_parent);
616 if (r < 0 && r != -ENOENT) {
617 return r;
618 }
619
620 auto on_disk_parent_without_overlap{on_disk_parent};
621 on_disk_parent_without_overlap.head_overlap = parent.head_overlap;
622
623 if (r == 0 &&
624 (on_disk_parent.head_overlap ||
625 on_disk_parent_without_overlap != parent) &&
626 !reattach) {
627 CLS_LOG(20, "cls_rbd::parent::attach: existing legacy parent "
628 "pool=%" PRIi64 ", ns=%s, id=%s, snapid=%" PRIu64 ", "
629 "overlap=%" PRIu64,
630 on_disk_parent.pool_id, on_disk_parent.pool_namespace.c_str(),
631 on_disk_parent.image_id.c_str(), on_disk_parent.snap_id.val,
632 on_disk_parent.head_overlap.value_or(0ULL));
633 return -EEXIST;
634 }
635
636 // our overlap is the min of our size and the parent's size.
637 uint64_t our_size;
638 r = read_key(hctx, "size", &our_size);
639 if (r < 0) {
640 return r;
641 }
642
643 parent.head_overlap = std::min(*parent.head_overlap, our_size);
644
645 r = write_key(hctx, "parent", parent, get_encode_features(hctx));
646 if (r < 0) {
647 return r;
648 }
649
650 return 0;
651 }
652
653 int detach(cls_method_context_t hctx, bool legacy_api) {
654 int r = check_exists(hctx);
655 if (r < 0) {
656 CLS_LOG(20, "cls_rbd::parent::detach: child doesn't exist");
657 return r;
658 }
659
660 uint64_t features;
661 r = read_key(hctx, "features", &features);
662 if (r == -ENOENT || ((features & RBD_FEATURE_LAYERING) == 0)) {
663 CLS_LOG(20, "cls_rbd::image::parent::detach: child does not support "
664 "layering");
665 return -ENOEXEC;
666 } else if (r < 0) {
667 return r;
668 }
669
670 cls_rbd_parent on_disk_parent;
671 r = read_key(hctx, "parent", &on_disk_parent);
672 if (r < 0) {
673 return r;
674 } else if (legacy_api && !on_disk_parent.pool_namespace.empty()) {
675 return -EXDEV;
676 } else if (!on_disk_parent.head_overlap) {
677 return -ENOENT;
678 }
679
680 auto detach_lambda = [hctx, features](const cls_rbd_snap& snap_meta) {
681 if (snap_meta.parent.pool_id != -1 || snap_meta.parent_overlap) {
682 if ((features & RBD_FEATURE_DEEP_FLATTEN) != 0ULL) {
683 // remove parent reference from snapshot
684 cls_rbd_snap snap_meta_copy = snap_meta;
685 snap_meta_copy.parent = {};
686 snap_meta_copy.parent_overlap = std::nullopt;
687
688 std::string snap_key;
689 key_from_snap_id(snap_meta_copy.id, &snap_key);
690 int r = snapshot::write(hctx, snap_key, std::move(snap_meta_copy));
691 if (r < 0) {
692 return r;
693 }
694 } else {
695 return -EEXIST;
696 }
697 }
698 return 0;
699 };
700
701 r = snapshot::iterate(hctx, detach_lambda);
702 bool has_child_snaps = (r == -EEXIST);
703 if (r < 0 && r != -EEXIST) {
704 return r;
705 }
706
707 ceph_release_t require_osd_release = cls_get_required_osd_release(hctx);
708 if (has_child_snaps && require_osd_release >= ceph_release_t::nautilus) {
709 // remove overlap from HEAD revision but keep spec for snapshots
710 on_disk_parent.head_overlap = std::nullopt;
711 r = write_key(hctx, "parent", on_disk_parent, get_encode_features(hctx));
712 if (r < 0) {
713 return r;
714 }
715 } else {
716 r = remove_key(hctx, "parent");
717 if (r < 0 && r != -ENOENT) {
718 return r;
719 }
720 }
721
722 if (!has_child_snaps) {
723 // disable clone child op feature if no longer associated
724 r = set_op_features(hctx, 0, RBD_OPERATION_FEATURE_CLONE_CHILD);
725 if (r < 0) {
726 return r;
727 }
728 }
729 return 0;
730 }
731
732 } // namespace parent
733 } // namespace image
734
735 /**
736 * Initialize the header with basic metadata.
737 * Extra features may initialize more fields in the future.
738 * Everything is stored as key/value pairs as omaps in the header object.
739 *
740 * If features the OSD does not understand are requested, -ENOSYS is
741 * returned.
742 *
743 * Input:
744 * @param size number of bytes in the image (uint64_t)
745 * @param order bits to shift to determine the size of data objects (uint8_t)
746 * @param features what optional things this image will use (uint64_t)
747 * @param object_prefix a prefix for all the data objects
748 * @param data_pool_id pool id where data objects is stored (int64_t)
749 *
750 * Output:
751 * @return 0 on success, negative error code on failure
752 */
753 int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
754 {
755 string object_prefix;
756 uint64_t features, size;
757 uint8_t order;
758 int64_t data_pool_id = -1;
759
760 try {
761 auto iter = in->cbegin();
762 decode(size, iter);
763 decode(order, iter);
764 decode(features, iter);
765 decode(object_prefix, iter);
766 if (!iter.end()) {
767 decode(data_pool_id, iter);
768 }
769 } catch (const buffer::error &err) {
770 return -EINVAL;
771 }
772
773 CLS_LOG(20, "create object_prefix=%s size=%llu order=%u features=%llu",
774 object_prefix.c_str(), (unsigned long long)size, order,
775 (unsigned long long)features);
776
777 if (features & ~RBD_FEATURES_ALL) {
778 return -ENOSYS;
779 }
780
781 if (!object_prefix.size()) {
782 return -EINVAL;
783 }
784
785 bufferlist stored_prefixbl;
786 int r = cls_cxx_map_get_val(hctx, "object_prefix", &stored_prefixbl);
787 if (r != -ENOENT) {
788 CLS_ERR("reading object_prefix returned %d", r);
789 return -EEXIST;
790 }
791
792 bufferlist sizebl;
793 bufferlist orderbl;
794 bufferlist featuresbl;
795 bufferlist object_prefixbl;
796 bufferlist snap_seqbl;
797 bufferlist timestampbl;
798 uint64_t snap_seq = 0;
799 utime_t timestamp = ceph_clock_now();
800 encode(size, sizebl);
801 encode(order, orderbl);
802 encode(features, featuresbl);
803 encode(object_prefix, object_prefixbl);
804 encode(snap_seq, snap_seqbl);
805 encode(timestamp, timestampbl);
806
807 map<string, bufferlist> omap_vals;
808 omap_vals["size"] = sizebl;
809 omap_vals["order"] = orderbl;
810 omap_vals["features"] = featuresbl;
811 omap_vals["object_prefix"] = object_prefixbl;
812 omap_vals["snap_seq"] = snap_seqbl;
813 omap_vals["create_timestamp"] = timestampbl;
814 omap_vals["access_timestamp"] = timestampbl;
815 omap_vals["modify_timestamp"] = timestampbl;
816
817 if ((features & RBD_FEATURE_OPERATIONS) != 0ULL) {
818 CLS_ERR("Attempting to set internal feature: operations");
819 return -EINVAL;
820 }
821
822 if (features & RBD_FEATURE_DATA_POOL) {
823 if (data_pool_id == -1) {
824 CLS_ERR("data pool not provided with feature enabled");
825 return -EINVAL;
826 }
827
828 bufferlist data_pool_id_bl;
829 encode(data_pool_id, data_pool_id_bl);
830 omap_vals["data_pool_id"] = data_pool_id_bl;
831 } else if (data_pool_id != -1) {
832 CLS_ERR("data pool provided with feature disabled");
833 return -EINVAL;
834 }
835
836 r = cls_cxx_map_set_vals(hctx, &omap_vals);
837 if (r < 0)
838 return r;
839
840 return 0;
841 }
842
843 /**
844 * Input:
845 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t) (deprecated)
846 * @param read_only true if the image will be used read-only (bool)
847 *
848 * Output:
849 * @param features list of enabled features for the given snapshot (uint64_t)
850 * @param incompatible incompatible feature bits
851 * @returns 0 on success, negative error code on failure
852 */
853 int get_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
854 {
855 bool read_only = false;
856
857 auto iter = in->cbegin();
858 try {
859 uint64_t snap_id;
860 decode(snap_id, iter);
861 if (!iter.end()) {
862 decode(read_only, iter);
863 }
864 } catch (const buffer::error &err) {
865 return -EINVAL;
866 }
867
868 CLS_LOG(20, "get_features read_only=%d", read_only);
869
870 uint64_t features;
871 int r = read_key(hctx, "features", &features);
872 if (r < 0) {
873 CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
874 return r;
875 }
876
877 uint64_t incompatible = (read_only ? features & RBD_FEATURES_INCOMPATIBLE :
878 features & RBD_FEATURES_RW_INCOMPATIBLE);
879 encode(features, *out);
880 encode(incompatible, *out);
881 return 0;
882 }
883
884 /**
885 * set the image features
886 *
887 * Input:
888 * @param features image features
889 * @param mask image feature mask
890 *
891 * Output:
892 * none
893 *
894 * @returns 0 on success, negative error code upon failure
895 */
896 int set_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
897 {
898 uint64_t features;
899 uint64_t mask;
900 auto iter = in->cbegin();
901 try {
902 decode(features, iter);
903 decode(mask, iter);
904 } catch (const buffer::error &err) {
905 return -EINVAL;
906 }
907
908 // check that features exists to make sure this is a header object
909 // that was created correctly
910 uint64_t orig_features = 0;
911 int r = read_key(hctx, "features", &orig_features);
912 if (r < 0 && r != -ENOENT) {
913 CLS_ERR("Could not read image's features off disk: %s",
914 cpp_strerror(r).c_str());
915 return r;
916 }
917
918 if ((mask & RBD_FEATURES_INTERNAL) != 0ULL) {
919 CLS_ERR("Attempting to set internal feature: %" PRIu64,
920 static_cast<uint64_t>(mask & RBD_FEATURES_INTERNAL));
921 return -EINVAL;
922 }
923
924 // newer clients might attempt to mask off features we don't support
925 mask &= RBD_FEATURES_ALL;
926
927 uint64_t enabled_features = features & mask;
928 if ((enabled_features & RBD_FEATURES_MUTABLE) != enabled_features) {
929 CLS_ERR("Attempting to enable immutable feature: %" PRIu64,
930 static_cast<uint64_t>(enabled_features & ~RBD_FEATURES_MUTABLE));
931 return -EINVAL;
932 }
933
934 uint64_t disabled_features = ~features & mask;
935 uint64_t disable_mask = (RBD_FEATURES_MUTABLE | RBD_FEATURES_DISABLE_ONLY);
936 if ((disabled_features & disable_mask) != disabled_features) {
937 CLS_ERR("Attempting to disable immutable feature: %" PRIu64,
938 enabled_features & ~disable_mask);
939 return -EINVAL;
940 }
941
942 features = (orig_features & ~mask) | (features & mask);
943 CLS_LOG(10, "set_features features=%" PRIu64 " orig_features=%" PRIu64,
944 features, orig_features);
945
946 bufferlist bl;
947 encode(features, bl);
948 r = cls_cxx_map_set_val(hctx, "features", &bl);
949 if (r < 0) {
950 CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
951 return r;
952 }
953 return 0;
954 }
955
956 /**
957 * Input:
958 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
959 *
960 * Output:
961 * @param order bits to shift to get the size of data objects (uint8_t)
962 * @param size size of the image in bytes for the given snapshot (uint64_t)
963 * @returns 0 on success, negative error code on failure
964 */
965 int get_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
966 {
967 uint64_t snap_id, size;
968 uint8_t order;
969
970 auto iter = in->cbegin();
971 try {
972 decode(snap_id, iter);
973 } catch (const buffer::error &err) {
974 return -EINVAL;
975 }
976
977 CLS_LOG(20, "get_size snap_id=%llu", (unsigned long long)snap_id);
978
979 int r = read_key(hctx, "order", &order);
980 if (r < 0) {
981 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
982 return r;
983 }
984
985 if (snap_id == CEPH_NOSNAP) {
986 r = read_key(hctx, "size", &size);
987 if (r < 0) {
988 CLS_ERR("failed to read the image's size off of disk: %s", cpp_strerror(r).c_str());
989 return r;
990 }
991 } else {
992 cls_rbd_snap snap;
993 string snapshot_key;
994 key_from_snap_id(snap_id, &snapshot_key);
995 int r = read_key(hctx, snapshot_key, &snap);
996 if (r < 0)
997 return r;
998
999 size = snap.image_size;
1000 }
1001
1002 encode(order, *out);
1003 encode(size, *out);
1004
1005 return 0;
1006 }
1007
1008 /**
1009 * Input:
1010 * @param size new capacity of the image in bytes (uint64_t)
1011 *
1012 * Output:
1013 * @returns 0 on success, negative error code on failure
1014 */
1015 int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1016 {
1017 uint64_t size;
1018
1019 auto iter = in->cbegin();
1020 try {
1021 decode(size, iter);
1022 } catch (const buffer::error &err) {
1023 return -EINVAL;
1024 }
1025
1026 // check that size exists to make sure this is a header object
1027 // that was created correctly
1028 uint64_t orig_size;
1029 int r = read_key(hctx, "size", &orig_size);
1030 if (r < 0) {
1031 CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
1032 return r;
1033 }
1034
1035 CLS_LOG(20, "set_size size=%llu orig_size=%llu", (unsigned long long)size,
1036 (unsigned long long)orig_size);
1037
1038 bufferlist sizebl;
1039 encode(size, sizebl);
1040 r = cls_cxx_map_set_val(hctx, "size", &sizebl);
1041 if (r < 0) {
1042 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1043 return r;
1044 }
1045
1046 // if we are shrinking, and have a parent, shrink our overlap with
1047 // the parent, too.
1048 if (size < orig_size) {
1049 cls_rbd_parent parent;
1050 r = read_key(hctx, "parent", &parent);
1051 if (r == -ENOENT)
1052 r = 0;
1053 if (r < 0)
1054 return r;
1055 if (parent.exists() && parent.head_overlap.value_or(0ULL) > size) {
1056 parent.head_overlap = size;
1057 r = write_key(hctx, "parent", parent, get_encode_features(hctx));
1058 if (r < 0) {
1059 return r;
1060 }
1061 }
1062 }
1063
1064 return 0;
1065 }
1066
1067 /**
1068 * get the current protection status of the specified snapshot
1069 *
1070 * Input:
1071 * @param snap_id (uint64_t) which snapshot to get the status of
1072 *
1073 * Output:
1074 * @param status (uint8_t) one of:
1075 * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
1076 *
1077 * @returns 0 on success, negative error code on failure
1078 * @returns -EINVAL if snapid is CEPH_NOSNAP
1079 */
1080 int get_protection_status(cls_method_context_t hctx, bufferlist *in,
1081 bufferlist *out)
1082 {
1083 snapid_t snap_id;
1084
1085 auto iter = in->cbegin();
1086 try {
1087 decode(snap_id, iter);
1088 } catch (const buffer::error &err) {
1089 CLS_LOG(20, "get_protection_status: invalid decode");
1090 return -EINVAL;
1091 }
1092
1093 int r = check_exists(hctx);
1094 if (r < 0)
1095 return r;
1096
1097 CLS_LOG(20, "get_protection_status snap_id=%llu",
1098 (unsigned long long)snap_id.val);
1099
1100 if (snap_id == CEPH_NOSNAP)
1101 return -EINVAL;
1102
1103 cls_rbd_snap snap;
1104 string snapshot_key;
1105 key_from_snap_id(snap_id.val, &snapshot_key);
1106 r = read_key(hctx, snapshot_key, &snap);
1107 if (r < 0) {
1108 CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
1109 return r;
1110 }
1111
1112 if (snap.protection_status >= RBD_PROTECTION_STATUS_LAST) {
1113 CLS_ERR("invalid protection status for snap id %llu: %u",
1114 (unsigned long long)snap_id.val, snap.protection_status);
1115 return -EIO;
1116 }
1117
1118 encode(snap.protection_status, *out);
1119 return 0;
1120 }
1121
1122 /**
1123 * set the proctection status of a snapshot
1124 *
1125 * Input:
1126 * @param snapid (uint64_t) which snapshot to set the status of
1127 * @param status (uint8_t) one of:
1128 * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
1129 *
1130 * @returns 0 on success, negative error code on failure
1131 * @returns -EINVAL if snapid is CEPH_NOSNAP
1132 */
1133 int set_protection_status(cls_method_context_t hctx, bufferlist *in,
1134 bufferlist *out)
1135 {
1136 snapid_t snap_id;
1137 uint8_t status;
1138
1139 auto iter = in->cbegin();
1140 try {
1141 decode(snap_id, iter);
1142 decode(status, iter);
1143 } catch (const buffer::error &err) {
1144 CLS_LOG(20, "set_protection_status: invalid decode");
1145 return -EINVAL;
1146 }
1147
1148 int r = check_exists(hctx);
1149 if (r < 0)
1150 return r;
1151
1152 r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
1153 if (r < 0) {
1154 CLS_LOG(20, "image does not support layering");
1155 return r;
1156 }
1157
1158 CLS_LOG(20, "set_protection_status snapid=%llu status=%u",
1159 (unsigned long long)snap_id.val, status);
1160
1161 if (snap_id == CEPH_NOSNAP)
1162 return -EINVAL;
1163
1164 if (status >= RBD_PROTECTION_STATUS_LAST) {
1165 CLS_LOG(10, "invalid protection status for snap id %llu: %u",
1166 (unsigned long long)snap_id.val, status);
1167 return -EINVAL;
1168 }
1169
1170 cls_rbd_snap snap;
1171 string snapshot_key;
1172 key_from_snap_id(snap_id.val, &snapshot_key);
1173 r = read_key(hctx, snapshot_key, &snap);
1174 if (r < 0) {
1175 CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
1176 return r;
1177 }
1178
1179 snap.protection_status = status;
1180 r = image::snapshot::write(hctx, snapshot_key, std::move(snap));
1181 if (r < 0) {
1182 return r;
1183 }
1184
1185 return 0;
1186 }
1187
1188 /**
1189 * get striping parameters
1190 *
1191 * Input:
1192 * none
1193 *
1194 * Output:
1195 * @param stripe unit (bytes)
1196 * @param stripe count (num objects)
1197 *
1198 * @returns 0 on success
1199 */
1200 int get_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1201 {
1202 int r = check_exists(hctx);
1203 if (r < 0)
1204 return r;
1205
1206 CLS_LOG(20, "get_stripe_unit_count");
1207
1208 r = image::require_feature(hctx, RBD_FEATURE_STRIPINGV2);
1209 if (r < 0)
1210 return r;
1211
1212 uint64_t stripe_unit = 0, stripe_count = 0;
1213 r = read_key(hctx, "stripe_unit", &stripe_unit);
1214 if (r == -ENOENT) {
1215 // default to object size
1216 uint8_t order;
1217 r = read_key(hctx, "order", &order);
1218 if (r < 0) {
1219 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
1220 return -EIO;
1221 }
1222 stripe_unit = 1ull << order;
1223 }
1224 if (r < 0)
1225 return r;
1226 r = read_key(hctx, "stripe_count", &stripe_count);
1227 if (r == -ENOENT) {
1228 // default to 1
1229 stripe_count = 1;
1230 r = 0;
1231 }
1232 if (r < 0)
1233 return r;
1234
1235 encode(stripe_unit, *out);
1236 encode(stripe_count, *out);
1237 return 0;
1238 }
1239
1240 /**
1241 * set striping parameters
1242 *
1243 * Input:
1244 * @param stripe unit (bytes)
1245 * @param stripe count (num objects)
1246 *
1247 * @returns 0 on success
1248 */
1249 int set_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1250 {
1251 uint64_t stripe_unit, stripe_count;
1252
1253 auto iter = in->cbegin();
1254 try {
1255 decode(stripe_unit, iter);
1256 decode(stripe_count, iter);
1257 } catch (const buffer::error &err) {
1258 CLS_LOG(20, "set_stripe_unit_count: invalid decode");
1259 return -EINVAL;
1260 }
1261
1262 if (!stripe_count || !stripe_unit)
1263 return -EINVAL;
1264
1265 int r = check_exists(hctx);
1266 if (r < 0)
1267 return r;
1268
1269 CLS_LOG(20, "set_stripe_unit_count");
1270
1271 r = image::require_feature(hctx, RBD_FEATURE_STRIPINGV2);
1272 if (r < 0)
1273 return r;
1274
1275 uint8_t order;
1276 r = read_key(hctx, "order", &order);
1277 if (r < 0) {
1278 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
1279 return r;
1280 }
1281 if ((1ull << order) % stripe_unit || stripe_unit > (1ull << order)) {
1282 CLS_ERR("stripe unit %llu is not a factor of the object size %llu",
1283 (unsigned long long)stripe_unit, 1ull << order);
1284 return -EINVAL;
1285 }
1286
1287 bufferlist bl, bl2;
1288 encode(stripe_unit, bl);
1289 r = cls_cxx_map_set_val(hctx, "stripe_unit", &bl);
1290 if (r < 0) {
1291 CLS_ERR("error writing stripe_unit metadata: %s", cpp_strerror(r).c_str());
1292 return r;
1293 }
1294
1295 encode(stripe_count, bl2);
1296 r = cls_cxx_map_set_val(hctx, "stripe_count", &bl2);
1297 if (r < 0) {
1298 CLS_ERR("error writing stripe_count metadata: %s", cpp_strerror(r).c_str());
1299 return r;
1300 }
1301
1302 return 0;
1303 }
1304
1305 int get_create_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1306 {
1307 CLS_LOG(20, "get_create_timestamp");
1308
1309 utime_t timestamp;
1310 bufferlist bl;
1311 int r = cls_cxx_map_get_val(hctx, "create_timestamp", &bl);
1312 if (r < 0) {
1313 if (r != -ENOENT) {
1314 CLS_ERR("error reading create_timestamp: %s", cpp_strerror(r).c_str());
1315 return r;
1316 }
1317 } else {
1318 try {
1319 auto it = bl.cbegin();
1320 decode(timestamp, it);
1321 } catch (const buffer::error &err) {
1322 CLS_ERR("could not decode create_timestamp");
1323 return -EIO;
1324 }
1325 }
1326
1327 encode(timestamp, *out);
1328 return 0;
1329 }
1330
1331 /**
1332 * get the image access timestamp
1333 *
1334 * Input:
1335 * @param none
1336 *
1337 * Output:
1338 * @param timestamp the image access timestamp
1339 *
1340 * @returns 0 on success, negative error code upon failure
1341 */
1342 int get_access_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1343 {
1344 CLS_LOG(20, "get_access_timestamp");
1345
1346 utime_t timestamp;
1347 bufferlist bl;
1348 int r = cls_cxx_map_get_val(hctx, "access_timestamp", &bl);
1349 if (r < 0) {
1350 if (r != -ENOENT) {
1351 CLS_ERR("error reading access_timestamp: %s", cpp_strerror(r).c_str());
1352 return r;
1353 }
1354 } else {
1355 try {
1356 auto it = bl.cbegin();
1357 decode(timestamp, it);
1358 } catch (const buffer::error &err) {
1359 CLS_ERR("could not decode access_timestamp");
1360 return -EIO;
1361 }
1362 }
1363
1364 encode(timestamp, *out);
1365 return 0;
1366 }
1367
1368 /**
1369 * get the image modify timestamp
1370 *
1371 * Input:
1372 * @param none
1373 *
1374 * Output:
1375 * @param timestamp the image modify timestamp
1376 *
1377 * @returns 0 on success, negative error code upon failure
1378 */
1379 int get_modify_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1380 {
1381 CLS_LOG(20, "get_modify_timestamp");
1382
1383 utime_t timestamp;
1384 bufferlist bl;
1385 int r = cls_cxx_map_get_val(hctx, "modify_timestamp", &bl);
1386 if (r < 0) {
1387 if (r != -ENOENT) {
1388 CLS_ERR("error reading modify_timestamp: %s", cpp_strerror(r).c_str());
1389 return r;
1390 }
1391 } else {
1392 try {
1393 auto it = bl.cbegin();
1394 decode(timestamp, it);
1395 } catch (const buffer::error &err) {
1396 CLS_ERR("could not decode modify_timestamp");
1397 return -EIO;
1398 }
1399 }
1400
1401 encode(timestamp, *out);
1402 return 0;
1403 }
1404
1405
1406 /**
1407 * get the image flags
1408 *
1409 * Input:
1410 * @param snap_id which snapshot to query, to CEPH_NOSNAP (uint64_t)
1411 *
1412 * Output:
1413 * @param flags image flags
1414 *
1415 * @returns 0 on success, negative error code upon failure
1416 */
1417 int get_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1418 {
1419 uint64_t snap_id;
1420 auto iter = in->cbegin();
1421 try {
1422 decode(snap_id, iter);
1423 } catch (const buffer::error &err) {
1424 return -EINVAL;
1425 }
1426
1427 CLS_LOG(20, "get_flags snap_id=%llu", (unsigned long long)snap_id);
1428
1429 uint64_t flags = 0;
1430 if (snap_id == CEPH_NOSNAP) {
1431 int r = read_key(hctx, "flags", &flags);
1432 if (r < 0 && r != -ENOENT) {
1433 CLS_ERR("failed to read flags off disk: %s", cpp_strerror(r).c_str());
1434 return r;
1435 }
1436 } else {
1437 cls_rbd_snap snap;
1438 string snapshot_key;
1439 key_from_snap_id(snap_id, &snapshot_key);
1440 int r = read_key(hctx, snapshot_key, &snap);
1441 if (r < 0) {
1442 return r;
1443 }
1444 flags = snap.flags;
1445 }
1446
1447 encode(flags, *out);
1448 return 0;
1449 }
1450
1451 /**
1452 * set the image flags
1453 *
1454 * Input:
1455 * @param flags image flags
1456 * @param mask image flag mask
1457 * @param snap_id which snapshot to update, or CEPH_NOSNAP (uint64_t)
1458 *
1459 * Output:
1460 * none
1461 *
1462 * @returns 0 on success, negative error code upon failure
1463 */
1464 int set_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1465 {
1466 uint64_t flags;
1467 uint64_t mask;
1468 uint64_t snap_id = CEPH_NOSNAP;
1469 auto iter = in->cbegin();
1470 try {
1471 decode(flags, iter);
1472 decode(mask, iter);
1473 if (!iter.end()) {
1474 decode(snap_id, iter);
1475 }
1476 } catch (const buffer::error &err) {
1477 return -EINVAL;
1478 }
1479
1480 // check that size exists to make sure this is a header object
1481 // that was created correctly
1482 int r;
1483 uint64_t orig_flags = 0;
1484 cls_rbd_snap snap_meta;
1485 string snap_meta_key;
1486 if (snap_id == CEPH_NOSNAP) {
1487 r = read_key(hctx, "flags", &orig_flags);
1488 if (r < 0 && r != -ENOENT) {
1489 CLS_ERR("Could not read image's flags off disk: %s",
1490 cpp_strerror(r).c_str());
1491 return r;
1492 }
1493 } else {
1494 key_from_snap_id(snap_id, &snap_meta_key);
1495 r = read_key(hctx, snap_meta_key, &snap_meta);
1496 if (r < 0) {
1497 CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
1498 snap_id, cpp_strerror(r).c_str());
1499 return r;
1500 }
1501 orig_flags = snap_meta.flags;
1502 }
1503
1504 flags = (orig_flags & ~mask) | (flags & mask);
1505 CLS_LOG(20, "set_flags snap_id=%" PRIu64 ", orig_flags=%" PRIu64 ", "
1506 "new_flags=%" PRIu64 ", mask=%" PRIu64, snap_id, orig_flags,
1507 flags, mask);
1508
1509 if (snap_id == CEPH_NOSNAP) {
1510 r = write_key(hctx, "flags", flags);
1511 } else {
1512 snap_meta.flags = flags;
1513 r = image::snapshot::write(hctx, snap_meta_key, std::move(snap_meta));
1514 }
1515
1516 if (r < 0) {
1517 return r;
1518 }
1519 return 0;
1520 }
1521
1522 /**
1523 * Get the operation-based image features
1524 *
1525 * Input:
1526 *
1527 * Output:
1528 * @param bitmask of enabled op features (uint64_t)
1529 * @returns 0 on success, negative error code on failure
1530 */
1531 int op_features_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1532 {
1533 CLS_LOG(20, "op_features_get");
1534
1535 uint64_t op_features = 0;
1536 int r = read_key(hctx, "op_features", &op_features);
1537 if (r < 0 && r != -ENOENT) {
1538 CLS_ERR("failed to read op features off disk: %s", cpp_strerror(r).c_str());
1539 return r;
1540 }
1541
1542 encode(op_features, *out);
1543 return 0;
1544 }
1545
1546 /**
1547 * Set the operation-based image features
1548 *
1549 * Input:
1550 * @param op_features image op features
1551 * @param mask image op feature mask
1552 *
1553 * Output:
1554 * none
1555 *
1556 * @returns 0 on success, negative error code upon failure
1557 */
1558 int op_features_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1559 {
1560 uint64_t op_features;
1561 uint64_t mask;
1562 auto iter = in->cbegin();
1563 try {
1564 decode(op_features, iter);
1565 decode(mask, iter);
1566 } catch (const buffer::error &err) {
1567 return -EINVAL;
1568 }
1569
1570 uint64_t unsupported_op_features = (mask & ~RBD_OPERATION_FEATURES_ALL);
1571 if (unsupported_op_features != 0ULL) {
1572 CLS_ERR("unsupported op features: %" PRIu64, unsupported_op_features);
1573 return -EINVAL;
1574 }
1575
1576 return image::set_op_features(hctx, op_features, mask);
1577 }
1578
1579 /**
1580 * get the current parent, if any
1581 *
1582 * Input:
1583 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
1584 *
1585 * Output:
1586 * @param pool parent pool id (-1 if parent does not exist)
1587 * @param image parent image id
1588 * @param snapid parent snapid
1589 * @param size portion of parent mapped under the child
1590 *
1591 * @returns 0 on success or parent does not exist, negative error code on failure
1592 */
1593 int get_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1594 {
1595 uint64_t snap_id;
1596
1597 auto iter = in->cbegin();
1598 try {
1599 decode(snap_id, iter);
1600 } catch (const buffer::error &err) {
1601 return -EINVAL;
1602 }
1603
1604 int r = check_exists(hctx);
1605 if (r < 0) {
1606 return r;
1607 }
1608
1609 CLS_LOG(20, "get_parent snap_id=%" PRIu64, snap_id);
1610
1611 cls_rbd_parent parent;
1612 r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
1613 if (r == 0) {
1614 r = read_key(hctx, "parent", &parent);
1615 if (r < 0 && r != -ENOENT) {
1616 return r;
1617 } else if (!parent.pool_namespace.empty()) {
1618 return -EXDEV;
1619 }
1620
1621 if (snap_id != CEPH_NOSNAP) {
1622 cls_rbd_snap snap;
1623 std::string snapshot_key;
1624 key_from_snap_id(snap_id, &snapshot_key);
1625 r = read_key(hctx, snapshot_key, &snap);
1626 if (r < 0 && r != -ENOENT) {
1627 return r;
1628 }
1629
1630 if (snap.parent.exists()) {
1631 // legacy format where full parent spec is written within
1632 // each snapshot record
1633 parent = snap.parent;
1634 } else if (snap.parent_overlap) {
1635 // normalized parent reference
1636 if (!parent.exists()) {
1637 CLS_ERR("get_parent: snap_id=%" PRIu64 ": invalid parent spec",
1638 snap_id);
1639 return -EINVAL;
1640 }
1641 parent.head_overlap = *snap.parent_overlap;
1642 } else {
1643 // snapshot doesn't have associated parent
1644 parent = {};
1645 }
1646 }
1647 }
1648
1649 encode(parent.pool_id, *out);
1650 encode(parent.image_id, *out);
1651 encode(parent.snap_id, *out);
1652 encode(parent.head_overlap.value_or(0ULL), *out);
1653 return 0;
1654 }
1655
1656 /**
1657 * set the image parent
1658 *
1659 * Input:
1660 * @param pool parent pool
1661 * @param id parent image id
1662 * @param snapid parent snapid
1663 * @param size parent size
1664 *
1665 * @returns 0 on success, or negative error code
1666 */
1667 int set_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1668 {
1669 cls_rbd_parent parent;
1670 auto iter = in->cbegin();
1671 try {
1672 decode(parent.pool_id, iter);
1673 decode(parent.image_id, iter);
1674 decode(parent.snap_id, iter);
1675
1676 uint64_t overlap;
1677 decode(overlap, iter);
1678 parent.head_overlap = overlap;
1679 } catch (const buffer::error &err) {
1680 CLS_LOG(20, "cls_rbd::set_parent: invalid decode");
1681 return -EINVAL;
1682 }
1683
1684 int r = image::parent::attach(hctx, parent, false);
1685 if (r < 0) {
1686 return r;
1687 }
1688
1689 return 0;
1690 }
1691
1692
1693 /**
1694 * remove the parent pointer
1695 *
1696 * This can only happen on the head, not on a snapshot. No arguments.
1697 *
1698 * @returns 0 on success, negative error code on failure.
1699 */
1700 int remove_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1701 {
1702 int r = image::parent::detach(hctx, true);
1703 if (r < 0) {
1704 return r;
1705 }
1706
1707 return 0;
1708 }
1709
1710 /**
1711 * Input:
1712 * none
1713 *
1714 * Output:
1715 * @param parent spec (cls::rbd::ParentImageSpec)
1716 * @returns 0 on success, negative error code on failure
1717 */
1718 int parent_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
1719 int r = check_exists(hctx);
1720 if (r < 0) {
1721 return r;
1722 }
1723
1724 CLS_LOG(20, "parent_get");
1725
1726 cls_rbd_parent parent;
1727 r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
1728 if (r == 0) {
1729 r = read_key(hctx, "parent", &parent);
1730 if (r < 0 && r != -ENOENT) {
1731 return r;
1732 } else if (r == -ENOENT) {
1733 // examine oldest snapshot to see if it has a denormalized parent
1734 auto parent_lambda = [&parent](const cls_rbd_snap& snap_meta) {
1735 if (snap_meta.parent.exists()) {
1736 parent = snap_meta.parent;
1737 }
1738 return 0;
1739 };
1740
1741 r = image::snapshot::iterate(hctx, parent_lambda);
1742 if (r < 0) {
1743 return r;
1744 }
1745 }
1746 }
1747
1748 cls::rbd::ParentImageSpec parent_image_spec{
1749 parent.pool_id, parent.pool_namespace, parent.image_id,
1750 parent.snap_id};
1751 encode(parent_image_spec, *out);
1752 return 0;
1753 }
1754
1755 /**
1756 * Input:
1757 * @param snap id (uint64_t) parent snapshot id
1758 *
1759 * Output:
1760 * @param byte overlap of parent image (std::optional<uint64_t>)
1761 * @returns 0 on success, negative error code on failure
1762 */
1763 int parent_overlap_get(cls_method_context_t hctx, bufferlist *in,
1764 bufferlist *out) {
1765 uint64_t snap_id;
1766 auto iter = in->cbegin();
1767 try {
1768 decode(snap_id, iter);
1769 } catch (const buffer::error &err) {
1770 return -EINVAL;
1771 }
1772
1773 int r = check_exists(hctx);
1774 CLS_LOG(20, "parent_overlap_get");
1775
1776 std::optional<uint64_t> parent_overlap = std::nullopt;
1777 r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
1778 if (r == 0) {
1779 if (snap_id == CEPH_NOSNAP) {
1780 cls_rbd_parent parent;
1781 r = read_key(hctx, "parent", &parent);
1782 if (r < 0 && r != -ENOENT) {
1783 return r;
1784 } else if (r == 0) {
1785 parent_overlap = parent.head_overlap;
1786 }
1787 } else {
1788 cls_rbd_snap snap;
1789 std::string snapshot_key;
1790 key_from_snap_id(snap_id, &snapshot_key);
1791 r = read_key(hctx, snapshot_key, &snap);
1792 if (r < 0) {
1793 return r;
1794 }
1795
1796 if (snap.parent_overlap) {
1797 parent_overlap = snap.parent_overlap;
1798 } else if (snap.parent.exists()) {
1799 // legacy format where full parent spec is written within
1800 // each snapshot record
1801 parent_overlap = snap.parent.head_overlap;
1802 }
1803 }
1804 };
1805
1806 encode(parent_overlap, *out);
1807 return 0;
1808 }
1809
1810 /**
1811 * Input:
1812 * @param parent spec (cls::rbd::ParentImageSpec)
1813 * @param size parent size (uint64_t)
1814 *
1815 * Output:
1816 * @returns 0 on success, negative error code on failure
1817 */
1818 int parent_attach(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
1819 cls::rbd::ParentImageSpec parent_image_spec;
1820 uint64_t parent_overlap;
1821 bool reattach = false;
1822
1823 auto iter = in->cbegin();
1824 try {
1825 decode(parent_image_spec, iter);
1826 decode(parent_overlap, iter);
1827 if (!iter.end()) {
1828 decode(reattach, iter);
1829 }
1830 } catch (const buffer::error &err) {
1831 CLS_LOG(20, "cls_rbd::parent_attach: invalid decode");
1832 return -EINVAL;
1833 }
1834
1835 int r = image::parent::attach(hctx, {parent_image_spec, parent_overlap},
1836 reattach);
1837 if (r < 0) {
1838 return r;
1839 }
1840
1841 return 0;
1842 }
1843
1844 /**
1845 * Input:
1846 * none
1847 *
1848 * Output:
1849 * @returns 0 on success, negative error code on failure
1850 */
1851 int parent_detach(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
1852 int r = image::parent::detach(hctx, false);
1853 if (r < 0) {
1854 return r;
1855 }
1856
1857 return 0;
1858 }
1859
1860
1861 /**
1862 * methods for dealing with rbd_children object
1863 */
1864
1865 static int decode_parent_common(bufferlist::const_iterator& it, uint64_t *pool_id,
1866 string *image_id, snapid_t *snap_id)
1867 {
1868 try {
1869 decode(*pool_id, it);
1870 decode(*image_id, it);
1871 decode(*snap_id, it);
1872 } catch (const buffer::error &err) {
1873 CLS_ERR("error decoding parent spec");
1874 return -EINVAL;
1875 }
1876 return 0;
1877 }
1878
1879 static int decode_parent(bufferlist *in, uint64_t *pool_id,
1880 string *image_id, snapid_t *snap_id)
1881 {
1882 auto it = in->cbegin();
1883 return decode_parent_common(it, pool_id, image_id, snap_id);
1884 }
1885
1886 static int decode_parent_and_child(bufferlist *in, uint64_t *pool_id,
1887 string *image_id, snapid_t *snap_id,
1888 string *c_image_id)
1889 {
1890 auto it = in->cbegin();
1891 int r = decode_parent_common(it, pool_id, image_id, snap_id);
1892 if (r < 0)
1893 return r;
1894 try {
1895 decode(*c_image_id, it);
1896 } catch (const buffer::error &err) {
1897 CLS_ERR("error decoding child image id");
1898 return -EINVAL;
1899 }
1900 return 0;
1901 }
1902
1903 static string parent_key(uint64_t pool_id, string image_id, snapid_t snap_id)
1904 {
1905 bufferlist key_bl;
1906 encode(pool_id, key_bl);
1907 encode(image_id, key_bl);
1908 encode(snap_id, key_bl);
1909 return string(key_bl.c_str(), key_bl.length());
1910 }
1911
1912 /**
1913 * add child to rbd_children directory object
1914 *
1915 * rbd_children is a map of (p_pool_id, p_image_id, p_snap_id) to
1916 * [c_image_id, [c_image_id ... ]]
1917 *
1918 * Input:
1919 * @param p_pool_id parent pool id
1920 * @param p_image_id parent image oid
1921 * @param p_snap_id parent snapshot id
1922 * @param c_image_id new child image oid to add
1923 *
1924 * @returns 0 on success, negative error on failure
1925 */
1926
1927 int add_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1928 {
1929 int r;
1930
1931 uint64_t p_pool_id;
1932 snapid_t p_snap_id;
1933 string p_image_id, c_image_id;
1934 // Use set for ease of erase() for remove_child()
1935 std::set<string> children;
1936
1937 r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1938 &c_image_id);
1939 if (r < 0)
1940 return r;
1941
1942 CLS_LOG(20, "add_child %s to (%" PRIu64 ", %s, %" PRIu64 ")", c_image_id.c_str(),
1943 p_pool_id, p_image_id.c_str(), p_snap_id.val);
1944
1945 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1946
1947 // get current child list for parent, if any
1948 r = read_key(hctx, key, &children);
1949 if ((r < 0) && (r != -ENOENT)) {
1950 CLS_LOG(20, "add_child: omap read failed: %s", cpp_strerror(r).c_str());
1951 return r;
1952 }
1953
1954 if (children.find(c_image_id) != children.end()) {
1955 CLS_LOG(20, "add_child: child already exists: %s", c_image_id.c_str());
1956 return -EEXIST;
1957 }
1958 // add new child
1959 children.insert(c_image_id);
1960
1961 // write back
1962 bufferlist childbl;
1963 encode(children, childbl);
1964 r = cls_cxx_map_set_val(hctx, key, &childbl);
1965 if (r < 0)
1966 CLS_LOG(20, "add_child: omap write failed: %s", cpp_strerror(r).c_str());
1967 return r;
1968 }
1969
1970 /**
1971 * remove child from rbd_children directory object
1972 *
1973 * Input:
1974 * @param p_pool_id parent pool id
1975 * @param p_image_id parent image oid
1976 * @param p_snap_id parent snapshot id
1977 * @param c_image_id new child image oid to add
1978 *
1979 * @returns 0 on success, negative error on failure
1980 */
1981
1982 int remove_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1983 {
1984 int r;
1985
1986 uint64_t p_pool_id;
1987 snapid_t p_snap_id;
1988 string p_image_id, c_image_id;
1989 std::set<string> children;
1990
1991 r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1992 &c_image_id);
1993 if (r < 0)
1994 return r;
1995
1996 CLS_LOG(20, "remove_child %s from (%" PRIu64 ", %s, %" PRIu64 ")",
1997 c_image_id.c_str(), p_pool_id, p_image_id.c_str(),
1998 p_snap_id.val);
1999
2000 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
2001
2002 // get current child list for parent. Unlike add_child(), an empty list
2003 // is an error (how can we remove something that doesn't exist?)
2004 r = read_key(hctx, key, &children);
2005 if (r < 0) {
2006 CLS_LOG(20, "remove_child: read omap failed: %s", cpp_strerror(r).c_str());
2007 return r;
2008 }
2009
2010 if (children.find(c_image_id) == children.end()) {
2011 CLS_LOG(20, "remove_child: child not found: %s", c_image_id.c_str());
2012 return -ENOENT;
2013 }
2014 // find and remove child
2015 children.erase(c_image_id);
2016
2017 // now empty? remove key altogether
2018 if (children.empty()) {
2019 r = cls_cxx_map_remove_key(hctx, key);
2020 if (r < 0)
2021 CLS_LOG(20, "remove_child: remove key failed: %s", cpp_strerror(r).c_str());
2022 } else {
2023 // write back shortened children list
2024 bufferlist childbl;
2025 encode(children, childbl);
2026 r = cls_cxx_map_set_val(hctx, key, &childbl);
2027 if (r < 0)
2028 CLS_LOG(20, "remove_child: write omap failed: %s", cpp_strerror(r).c_str());
2029 }
2030 return r;
2031 }
2032
2033 /**
2034 * Input:
2035 * @param p_pool_id parent pool id
2036 * @param p_image_id parent image oid
2037 * @param p_snap_id parent snapshot id
2038 * @param c_image_id new child image oid to add
2039 *
2040 * Output:
2041 * @param children set<string> of children
2042 *
2043 * @returns 0 on success, negative error on failure
2044 */
2045 int get_children(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2046 {
2047 int r;
2048 uint64_t p_pool_id;
2049 snapid_t p_snap_id;
2050 string p_image_id;
2051 std::set<string> children;
2052
2053 r = decode_parent(in, &p_pool_id, &p_image_id, &p_snap_id);
2054 if (r < 0)
2055 return r;
2056
2057 CLS_LOG(20, "get_children of (%" PRIu64 ", %s, %" PRIu64 ")",
2058 p_pool_id, p_image_id.c_str(), p_snap_id.val);
2059
2060 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
2061
2062 r = read_key(hctx, key, &children);
2063 if (r < 0) {
2064 if (r != -ENOENT)
2065 CLS_LOG(20, "get_children: read omap failed: %s", cpp_strerror(r).c_str());
2066 return r;
2067 }
2068 encode(children, *out);
2069 return 0;
2070 }
2071
2072
2073 /**
2074 * Get the information needed to create a rados snap context for doing
2075 * I/O to the data objects. This must include all snapshots.
2076 *
2077 * Output:
2078 * @param snap_seq the highest snapshot id ever associated with the image (uint64_t)
2079 * @param snap_ids existing snapshot ids in descending order (vector<uint64_t>)
2080 * @returns 0 on success, negative error code on failure
2081 */
2082 int get_snapcontext(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2083 {
2084 CLS_LOG(20, "get_snapcontext");
2085
2086 int r;
2087 int max_read = RBD_MAX_KEYS_READ;
2088 vector<snapid_t> snap_ids;
2089 string last_read = RBD_SNAP_KEY_PREFIX;
2090 bool more;
2091
2092 do {
2093 set<string> keys;
2094 r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
2095 if (r < 0)
2096 return r;
2097
2098 for (set<string>::const_iterator it = keys.begin();
2099 it != keys.end(); ++it) {
2100 if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0)
2101 break;
2102 snapid_t snap_id = snap_id_from_key(*it);
2103 snap_ids.push_back(snap_id);
2104 }
2105 if (!keys.empty())
2106 last_read = *(keys.rbegin());
2107 } while (more);
2108
2109 uint64_t snap_seq;
2110 r = read_key(hctx, "snap_seq", &snap_seq);
2111 if (r < 0) {
2112 CLS_ERR("could not read the image's snap_seq off disk: %s", cpp_strerror(r).c_str());
2113 return r;
2114 }
2115
2116 // snap_ids must be descending in a snap context
2117 std::reverse(snap_ids.begin(), snap_ids.end());
2118
2119 encode(snap_seq, *out);
2120 encode(snap_ids, *out);
2121
2122 return 0;
2123 }
2124
2125 /**
2126 * Output:
2127 * @param object_prefix prefix for data object names (string)
2128 * @returns 0 on success, negative error code on failure
2129 */
2130 int get_object_prefix(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2131 {
2132 CLS_LOG(20, "get_object_prefix");
2133
2134 string object_prefix;
2135 int r = read_key(hctx, "object_prefix", &object_prefix);
2136 if (r < 0) {
2137 CLS_ERR("failed to read the image's object prefix off of disk: %s",
2138 cpp_strerror(r).c_str());
2139 return r;
2140 }
2141
2142 encode(object_prefix, *out);
2143
2144 return 0;
2145 }
2146
2147 /**
2148 * Input:
2149 * none
2150 *
2151 * Output:
2152 * @param pool_id (int64_t) of data pool or -1 if none
2153 * @returns 0 on success, negative error code on failure
2154 */
2155 int get_data_pool(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2156 {
2157 CLS_LOG(20, "get_data_pool");
2158
2159 int64_t data_pool_id = -1;
2160 int r = read_key(hctx, "data_pool_id", &data_pool_id);
2161 if (r == -ENOENT) {
2162 data_pool_id = -1;
2163 } else if (r < 0) {
2164 CLS_ERR("error reading image data pool id: %s", cpp_strerror(r).c_str());
2165 return r;
2166 }
2167
2168 encode(data_pool_id, *out);
2169 return 0;
2170 }
2171
2172 /**
2173 * Input:
2174 * @param snap_id which snapshot to query
2175 *
2176 * Output:
2177 * @param name (string) of the snapshot
2178 * @returns 0 on success, negative error code on failure
2179 */
2180 int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2181 {
2182 uint64_t snap_id;
2183
2184 auto iter = in->cbegin();
2185 try {
2186 decode(snap_id, iter);
2187 } catch (const buffer::error &err) {
2188 return -EINVAL;
2189 }
2190
2191 CLS_LOG(20, "get_snapshot_name snap_id=%llu", (unsigned long long)snap_id);
2192
2193 if (snap_id == CEPH_NOSNAP)
2194 return -EINVAL;
2195
2196 cls_rbd_snap snap;
2197 string snapshot_key;
2198 key_from_snap_id(snap_id, &snapshot_key);
2199 int r = read_key(hctx, snapshot_key, &snap);
2200 if (r < 0)
2201 return r;
2202
2203 encode(snap.name, *out);
2204
2205 return 0;
2206 }
2207
2208 /**
2209 * Input:
2210 * @param snap_id which snapshot to query
2211 *
2212 * Output:
2213 * @param timestamp (utime_t) of the snapshot
2214 * @returns 0 on success, negative error code on failure
2215 *
2216 * NOTE: deprecated - remove this method after Luminous is unsupported
2217 */
2218 int get_snapshot_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2219 {
2220 uint64_t snap_id;
2221
2222 auto iter = in->cbegin();
2223 try {
2224 decode(snap_id, iter);
2225 } catch (const buffer::error &err) {
2226 return -EINVAL;
2227 }
2228
2229 CLS_LOG(20, "get_snapshot_timestamp snap_id=%llu", (unsigned long long)snap_id);
2230
2231 if (snap_id == CEPH_NOSNAP) {
2232 return -EINVAL;
2233 }
2234
2235 cls_rbd_snap snap;
2236 string snapshot_key;
2237 key_from_snap_id(snap_id, &snapshot_key);
2238 int r = read_key(hctx, snapshot_key, &snap);
2239 if (r < 0) {
2240 return r;
2241 }
2242
2243 encode(snap.timestamp, *out);
2244 return 0;
2245 }
2246
2247 /**
2248 * Input:
2249 * @param snap_id which snapshot to query
2250 *
2251 * Output:
2252 * @param snapshot (cls::rbd::SnapshotInfo)
2253 * @returns 0 on success, negative error code on failure
2254 */
2255 int snapshot_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2256 {
2257 uint64_t snap_id;
2258
2259 auto iter = in->cbegin();
2260 try {
2261 decode(snap_id, iter);
2262 } catch (const buffer::error &err) {
2263 return -EINVAL;
2264 }
2265
2266 CLS_LOG(20, "snapshot_get snap_id=%llu", (unsigned long long)snap_id);
2267 if (snap_id == CEPH_NOSNAP) {
2268 return -EINVAL;
2269 }
2270
2271 cls_rbd_snap snap;
2272 string snapshot_key;
2273 key_from_snap_id(snap_id, &snapshot_key);
2274 int r = read_key(hctx, snapshot_key, &snap);
2275 if (r < 0) {
2276 return r;
2277 }
2278
2279 cls::rbd::SnapshotInfo snapshot_info{snap.id, snap.snapshot_namespace,
2280 snap.name, snap.image_size,
2281 snap.timestamp, snap.child_count};
2282 encode(snapshot_info, *out);
2283 return 0;
2284 }
2285
2286 /**
2287 * Adds a snapshot to an rbd header. Ensures the id and name are unique.
2288 *
2289 * Input:
2290 * @param snap_name name of the snapshot (string)
2291 * @param snap_id id of the snapshot (uint64_t)
2292 * @param snap_namespace namespace of the snapshot (cls::rbd::SnapshotNamespace)
2293 *
2294 * Output:
2295 * @returns 0 on success, negative error code on failure.
2296 * @returns -ESTALE if the input snap_id is less than the image's snap_seq
2297 * @returns -EEXIST if the id or name are already used by another snapshot
2298 */
2299 int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2300 {
2301 bufferlist snap_namebl, snap_idbl;
2302 cls_rbd_snap snap_meta;
2303 uint64_t snap_limit;
2304
2305 try {
2306 auto iter = in->cbegin();
2307 decode(snap_meta.name, iter);
2308 decode(snap_meta.id, iter);
2309 if (!iter.end()) {
2310 decode(snap_meta.snapshot_namespace, iter);
2311 }
2312 } catch (const buffer::error &err) {
2313 return -EINVAL;
2314 }
2315
2316 if (boost::get<cls::rbd::UnknownSnapshotNamespace>(
2317 &snap_meta.snapshot_namespace) != nullptr) {
2318 CLS_ERR("Unknown snapshot namespace provided");
2319 return -EINVAL;
2320 }
2321
2322 CLS_LOG(20, "snapshot_add name=%s id=%llu", snap_meta.name.c_str(),
2323 (unsigned long long)snap_meta.id.val);
2324
2325 if (snap_meta.id > CEPH_MAXSNAP)
2326 return -EINVAL;
2327
2328 uint64_t cur_snap_seq;
2329 int r = read_key(hctx, "snap_seq", &cur_snap_seq);
2330 if (r < 0) {
2331 CLS_ERR("Could not read image's snap_seq off disk: %s", cpp_strerror(r).c_str());
2332 return r;
2333 }
2334
2335 // client lost a race with another snapshot creation.
2336 // snap_seq must be monotonically increasing.
2337 if (snap_meta.id < cur_snap_seq)
2338 return -ESTALE;
2339
2340 r = read_key(hctx, "size", &snap_meta.image_size);
2341 if (r < 0) {
2342 CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
2343 return r;
2344 }
2345 r = read_key(hctx, "flags", &snap_meta.flags);
2346 if (r < 0 && r != -ENOENT) {
2347 CLS_ERR("Could not read image's flags off disk: %s", cpp_strerror(r).c_str());
2348 return r;
2349 }
2350
2351 r = read_key(hctx, "snap_limit", &snap_limit);
2352 if (r == -ENOENT) {
2353 snap_limit = UINT64_MAX;
2354 } else if (r < 0) {
2355 CLS_ERR("Could not read snapshot limit off disk: %s", cpp_strerror(r).c_str());
2356 return r;
2357 }
2358
2359 snap_meta.timestamp = ceph_clock_now();
2360
2361 uint64_t total_read = 0;
2362 auto pre_check_lambda =
2363 [&snap_meta, &total_read, snap_limit](const cls_rbd_snap& old_meta) {
2364 ++total_read;
2365 if (total_read >= snap_limit) {
2366 CLS_ERR("Attempt to create snapshot over limit of %" PRIu64,
2367 snap_limit);
2368 return -EDQUOT;
2369 }
2370
2371 if ((snap_meta.name == old_meta.name &&
2372 snap_meta.snapshot_namespace == old_meta.snapshot_namespace) ||
2373 snap_meta.id == old_meta.id) {
2374 CLS_LOG(20, "snap_name %s or snap_id %" PRIu64 " matches existing snap "
2375 "%s %" PRIu64, snap_meta.name.c_str(), snap_meta.id.val,
2376 old_meta.name.c_str(), old_meta.id.val);
2377 return -EEXIST;
2378 }
2379 return 0;
2380 };
2381
2382 r = image::snapshot::iterate(hctx, pre_check_lambda);
2383 if (r < 0) {
2384 return r;
2385 }
2386
2387 // snapshot inherits parent, if any
2388 cls_rbd_parent parent;
2389 r = read_key(hctx, "parent", &parent);
2390 if (r < 0 && r != -ENOENT) {
2391 return r;
2392 }
2393 if (r == 0) {
2394 // write helper method will convert to normalized format if required
2395 snap_meta.parent = parent;
2396 }
2397
2398 if (cls::rbd::get_snap_namespace_type(snap_meta.snapshot_namespace) ==
2399 cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
2400 // add snap_trash feature bit if not already enabled
2401 r = image::set_op_features(hctx, RBD_OPERATION_FEATURE_SNAP_TRASH,
2402 RBD_OPERATION_FEATURE_SNAP_TRASH);
2403 if (r < 0) {
2404 return r;
2405 }
2406 }
2407
2408 r = write_key(hctx, "snap_seq", snap_meta.id);
2409 if (r < 0) {
2410 return r;
2411 }
2412
2413 std::string snapshot_key;
2414 key_from_snap_id(snap_meta.id, &snapshot_key);
2415 r = image::snapshot::write(hctx, snapshot_key, std::move(snap_meta));
2416 if (r < 0) {
2417 return r;
2418 }
2419
2420 return 0;
2421 }
2422
2423 /**
2424 * rename snapshot .
2425 *
2426 * Input:
2427 * @param src_snap_id old snap id of the snapshot (snapid_t)
2428 * @param dst_snap_name new name of the snapshot (string)
2429 *
2430 * Output:
2431 * @returns 0 on success, negative error code on failure.
2432 */
2433 int snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2434 {
2435 bufferlist snap_namebl, snap_idbl;
2436 snapid_t src_snap_id;
2437 string dst_snap_name;
2438 cls_rbd_snap snap_meta;
2439 int r;
2440
2441 try {
2442 auto iter = in->cbegin();
2443 decode(src_snap_id, iter);
2444 decode(dst_snap_name, iter);
2445 } catch (const buffer::error &err) {
2446 return -EINVAL;
2447 }
2448
2449 CLS_LOG(20, "snapshot_rename id=%" PRIu64 ", dst_name=%s",
2450 src_snap_id.val, dst_snap_name.c_str());
2451
2452 auto duplicate_name_lambda = [&dst_snap_name](const cls_rbd_snap& snap_meta) {
2453 if (cls::rbd::get_snap_namespace_type(snap_meta.snapshot_namespace) ==
2454 cls::rbd::SNAPSHOT_NAMESPACE_TYPE_USER &&
2455 snap_meta.name == dst_snap_name) {
2456 CLS_LOG(20, "snap_name %s matches existing snap with snap id %" PRIu64,
2457 dst_snap_name.c_str(), snap_meta.id.val);
2458 return -EEXIST;
2459 }
2460 return 0;
2461 };
2462 r = image::snapshot::iterate(hctx, duplicate_name_lambda);
2463 if (r < 0) {
2464 return r;
2465 }
2466
2467 std::string src_snap_key;
2468 key_from_snap_id(src_snap_id, &src_snap_key);
2469 r = read_key(hctx, src_snap_key, &snap_meta);
2470 if (r == -ENOENT) {
2471 CLS_LOG(20, "cannot find existing snap with snap id = %" PRIu64,
2472 src_snap_id.val);
2473 return r;
2474 }
2475
2476 if (cls::rbd::get_snap_namespace_type(snap_meta.snapshot_namespace) !=
2477 cls::rbd::SNAPSHOT_NAMESPACE_TYPE_USER) {
2478 // can only rename user snapshots
2479 return -EINVAL;
2480 }
2481
2482 snap_meta.name = dst_snap_name;
2483 r = image::snapshot::write(hctx, src_snap_key, std::move(snap_meta));
2484 if (r < 0) {
2485 return r;
2486 }
2487
2488 return 0;
2489 }
2490
2491 /**
2492 * Removes a snapshot from an rbd header.
2493 *
2494 * Input:
2495 * @param snap_id the id of the snapshot to remove (uint64_t)
2496 *
2497 * Output:
2498 * @returns 0 on success, negative error code on failure
2499 */
2500 int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2501 {
2502 snapid_t snap_id;
2503
2504 try {
2505 auto iter = in->cbegin();
2506 decode(snap_id, iter);
2507 } catch (const buffer::error &err) {
2508 return -EINVAL;
2509 }
2510
2511 CLS_LOG(20, "snapshot_remove id=%llu", (unsigned long long)snap_id.val);
2512
2513 // check if the key exists. we can't rely on remove_key doing this for
2514 // us, since OMAPRMKEYS returns success if the key is not there.
2515 // bug or feature? sounds like a bug, since tmap did not have this
2516 // behavior, but cls_rgw may rely on it...
2517 cls_rbd_snap snap;
2518 string snapshot_key;
2519 key_from_snap_id(snap_id, &snapshot_key);
2520 int r = read_key(hctx, snapshot_key, &snap);
2521 if (r == -ENOENT) {
2522 return -ENOENT;
2523 }
2524
2525 if (snap.protection_status != RBD_PROTECTION_STATUS_UNPROTECTED) {
2526 return -EBUSY;
2527 }
2528
2529 // snapshot is in-use by clone v2 child
2530 if (snap.child_count > 0) {
2531 return -EBUSY;
2532 }
2533
2534 r = remove_key(hctx, snapshot_key);
2535 if (r < 0) {
2536 return r;
2537 }
2538
2539 bool has_child_snaps = false;
2540 bool has_trash_snaps = false;
2541 auto remove_lambda = [snap_id, &has_child_snaps, &has_trash_snaps](
2542 const cls_rbd_snap& snap_meta) {
2543 if (snap_meta.id != snap_id) {
2544 if (snap_meta.parent.pool_id != -1 || snap_meta.parent_overlap) {
2545 has_child_snaps = true;
2546 }
2547
2548 if (cls::rbd::get_snap_namespace_type(snap_meta.snapshot_namespace) ==
2549 cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
2550 has_trash_snaps = true;
2551 }
2552 }
2553 return 0;
2554 };
2555
2556 r = image::snapshot::iterate(hctx, remove_lambda);
2557 if (r < 0) {
2558 return r;
2559 }
2560
2561 cls_rbd_parent parent;
2562 r = read_key(hctx, "parent", &parent);
2563 if (r < 0 && r != -ENOENT) {
2564 return r;
2565 }
2566
2567 bool has_parent = (r >= 0 && parent.exists());
2568 bool is_head_child = (has_parent && parent.head_overlap);
2569 ceph_release_t require_osd_release = cls_get_required_osd_release(hctx);
2570 if (has_parent && !is_head_child && !has_child_snaps &&
2571 require_osd_release >= ceph_release_t::nautilus) {
2572 // remove the unused parent image spec
2573 r = remove_key(hctx, "parent");
2574 if (r < 0 && r != -ENOENT) {
2575 return r;
2576 }
2577 }
2578
2579 uint64_t op_features_mask = 0ULL;
2580 if (!has_child_snaps && !is_head_child) {
2581 // disable clone child op feature if no longer associated
2582 op_features_mask |= RBD_OPERATION_FEATURE_CLONE_CHILD;
2583 }
2584 if (!has_trash_snaps) {
2585 // remove the snap_trash op feature if not in-use by any other snapshots
2586 op_features_mask |= RBD_OPERATION_FEATURE_SNAP_TRASH;
2587 }
2588
2589 if (op_features_mask != 0ULL) {
2590 r = image::set_op_features(hctx, 0, op_features_mask);
2591 if (r < 0) {
2592 return r;
2593 }
2594 }
2595
2596 return 0;
2597 }
2598
2599 /**
2600 * Moves a snapshot to the trash namespace.
2601 *
2602 * Input:
2603 * @param snap_id the id of the snapshot to move to the trash (uint64_t)
2604 *
2605 * Output:
2606 * @returns 0 on success, negative error code on failure
2607 */
2608 int snapshot_trash_add(cls_method_context_t hctx, bufferlist *in,
2609 bufferlist *out)
2610 {
2611 snapid_t snap_id;
2612
2613 try {
2614 auto iter = in->cbegin();
2615 decode(snap_id, iter);
2616 } catch (const buffer::error &err) {
2617 return -EINVAL;
2618 }
2619
2620 CLS_LOG(20, "snapshot_trash_add id=%" PRIu64, snap_id.val);
2621
2622 cls_rbd_snap snap;
2623 std::string snapshot_key;
2624 key_from_snap_id(snap_id, &snapshot_key);
2625 int r = read_key(hctx, snapshot_key, &snap);
2626 if (r == -ENOENT) {
2627 return r;
2628 }
2629
2630 if (snap.protection_status != RBD_PROTECTION_STATUS_UNPROTECTED) {
2631 return -EBUSY;
2632 }
2633
2634 auto snap_type = cls::rbd::get_snap_namespace_type(snap.snapshot_namespace);
2635 if (snap_type == cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
2636 return -EEXIST;
2637 }
2638
2639 // add snap_trash feature bit if not already enabled
2640 r = image::set_op_features(hctx, RBD_OPERATION_FEATURE_SNAP_TRASH,
2641 RBD_OPERATION_FEATURE_SNAP_TRASH);
2642 if (r < 0) {
2643 return r;
2644 }
2645
2646 snap.snapshot_namespace = cls::rbd::TrashSnapshotNamespace{snap_type,
2647 snap.name};
2648 uuid_d uuid_gen;
2649 uuid_gen.generate_random();
2650 snap.name = uuid_gen.to_string();
2651
2652 r = image::snapshot::write(hctx, snapshot_key, std::move(snap));
2653 if (r < 0) {
2654 return r;
2655 }
2656
2657 return 0;
2658 }
2659
2660 /**
2661 * Returns a uint64_t of all the features supported by this class.
2662 */
2663 int get_all_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2664 {
2665 uint64_t all_features = RBD_FEATURES_ALL;
2666 encode(all_features, *out);
2667 return 0;
2668 }
2669
2670 /**
2671 * "Copy up" data from the parent of a clone to the clone's object(s).
2672 * Used for implementing copy-on-write for a clone image. Client
2673 * will pass down a chunk of data that fits completely within one
2674 * clone block (one object), and is aligned (starts at beginning of block),
2675 * but may be shorter (for non-full parent blocks). The class method
2676 * can't know the object size to validate the requested length,
2677 * so it just writes the data as given if the child object doesn't
2678 * already exist, and returns success if it does.
2679 *
2680 * Input:
2681 * @param in bufferlist of data to write
2682 *
2683 * Output:
2684 * @returns 0 on success, or if block already exists in child
2685 * negative error code on other error
2686 */
2687
2688 int copyup(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2689 {
2690 // check for existence; if child object exists, just return success
2691 if (cls_cxx_stat(hctx, NULL, NULL) == 0)
2692 return 0;
2693 CLS_LOG(20, "copyup: writing length %d\n", in->length());
2694 return cls_cxx_write(hctx, 0, in->length(), in);
2695 }
2696
2697 /**
2698 * Input:
2699 * @param extent_map map of extents to write
2700 * @param data bufferlist of data to write
2701 *
2702 * Output:
2703 * @returns 0 on success, or if block already exists in child
2704 * negative error code on other error
2705 */
2706
2707 int sparse_copyup(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2708 {
2709 std::map<uint64_t, uint64_t> extent_map;
2710 bufferlist data;
2711
2712 try {
2713 auto iter = in->cbegin();
2714 decode(extent_map, iter);
2715 decode(data, iter);
2716 } catch (const buffer::error &err) {
2717 CLS_LOG(20, "sparse_copyup: invalid decode");
2718 return -EINVAL;
2719 }
2720
2721 int r = check_exists(hctx);
2722 if (r == 0) {
2723 return 0;
2724 }
2725
2726 if (extent_map.empty()) {
2727 CLS_LOG(20, "sparse_copyup: create empty object");
2728 r = cls_cxx_create(hctx, true);
2729 return r;
2730 }
2731
2732 uint64_t data_offset = 0;
2733 for (auto &it: extent_map) {
2734 auto off = it.first;
2735 auto len = it.second;
2736
2737 bufferlist tmpbl;
2738 try {
2739 tmpbl.substr_of(data, data_offset, len);
2740 } catch (const buffer::error &err) {
2741 CLS_LOG(20, "sparse_copyup: invalid data");
2742 return -EINVAL;
2743 }
2744 data_offset += len;
2745
2746 CLS_LOG(20, "sparse_copyup: writing extent %" PRIu64 "~%" PRIu64 "\n", off,
2747 len);
2748 int r = cls_cxx_write(hctx, off, len, &tmpbl);
2749 if (r < 0) {
2750 CLS_ERR("sparse_copyup: error writing extent %" PRIu64 "~%" PRIu64 ": %s",
2751 off, len, cpp_strerror(r).c_str());
2752 return r;
2753 }
2754 }
2755
2756 return 0;
2757 }
2758
2759 /************************ rbd_id object methods **************************/
2760
2761 /**
2762 * Input:
2763 * @param in ignored
2764 *
2765 * Output:
2766 * @param id the id stored in the object
2767 * @returns 0 on success, negative error code on failure
2768 */
2769 int get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2770 {
2771 uint64_t size;
2772 int r = cls_cxx_stat(hctx, &size, NULL);
2773 if (r < 0)
2774 return r;
2775
2776 if (size == 0)
2777 return -ENOENT;
2778
2779 bufferlist read_bl;
2780 r = cls_cxx_read(hctx, 0, size, &read_bl);
2781 if (r < 0) {
2782 CLS_ERR("get_id: could not read id: %s", cpp_strerror(r).c_str());
2783 return r;
2784 }
2785
2786 string id;
2787 try {
2788 auto iter = read_bl.cbegin();
2789 decode(id, iter);
2790 } catch (const buffer::error &err) {
2791 return -EIO;
2792 }
2793
2794 encode(id, *out);
2795 return 0;
2796 }
2797
2798 /**
2799 * Set the id of an image. The object must already exist.
2800 *
2801 * Input:
2802 * @param id the id of the image, as an alpha-numeric string
2803 *
2804 * Output:
2805 * @returns 0 on success, -EEXIST if the atomic create fails,
2806 * negative error code on other error
2807 */
2808 int set_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2809 {
2810 int r = check_exists(hctx);
2811 if (r < 0)
2812 return r;
2813
2814 string id;
2815 try {
2816 auto iter = in->cbegin();
2817 decode(id, iter);
2818 } catch (const buffer::error &err) {
2819 return -EINVAL;
2820 }
2821
2822 if (!is_valid_id(id)) {
2823 CLS_ERR("set_id: invalid id '%s'", id.c_str());
2824 return -EINVAL;
2825 }
2826
2827 uint64_t size;
2828 r = cls_cxx_stat(hctx, &size, NULL);
2829 if (r < 0)
2830 return r;
2831 if (size != 0)
2832 return -EEXIST;
2833
2834 CLS_LOG(20, "set_id: id=%s", id.c_str());
2835
2836 bufferlist write_bl;
2837 encode(id, write_bl);
2838 return cls_cxx_write(hctx, 0, write_bl.length(), &write_bl);
2839 }
2840
2841 /**
2842 * Update the access timestamp of an image
2843 *
2844 * Input:
2845 * @param none
2846 *
2847 * Output:
2848 * @returns 0 on success, negative error code on other error
2849 */
2850 int set_access_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2851 {
2852 int r = check_exists(hctx);
2853 if(r < 0)
2854 return r;
2855
2856 utime_t timestamp = ceph_clock_now();
2857 r = write_key(hctx, "access_timestamp", timestamp);
2858 if(r < 0) {
2859 CLS_ERR("error setting access_timestamp");
2860 return r;
2861 }
2862
2863 return 0;
2864 }
2865
2866 /**
2867 * Update the modify timestamp of an image
2868 *
2869 * Input:
2870 * @param none
2871 *
2872 * Output:
2873 * @returns 0 on success, negative error code on other error
2874 */
2875
2876 int set_modify_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2877 {
2878 int r = check_exists(hctx);
2879 if(r < 0)
2880 return r;
2881
2882 utime_t timestamp = ceph_clock_now();
2883 r = write_key(hctx, "modify_timestamp", timestamp);
2884 if(r < 0) {
2885 CLS_ERR("error setting modify_timestamp");
2886 return r;
2887 }
2888
2889 return 0;
2890 }
2891
2892
2893
2894 /*********************** methods for rbd_directory ***********************/
2895
2896 static const string dir_key_for_id(const string &id)
2897 {
2898 return RBD_DIR_ID_KEY_PREFIX + id;
2899 }
2900
2901 static const string dir_key_for_name(const string &name)
2902 {
2903 return RBD_DIR_NAME_KEY_PREFIX + name;
2904 }
2905
2906 static const string dir_name_from_key(const string &key)
2907 {
2908 return key.substr(strlen(RBD_DIR_NAME_KEY_PREFIX));
2909 }
2910
2911 static int dir_add_image_helper(cls_method_context_t hctx,
2912 const string &name, const string &id,
2913 bool check_for_unique_id)
2914 {
2915 if (!name.size() || !is_valid_id(id)) {
2916 CLS_ERR("dir_add_image_helper: invalid name '%s' or id '%s'",
2917 name.c_str(), id.c_str());
2918 return -EINVAL;
2919 }
2920
2921 CLS_LOG(20, "dir_add_image_helper name=%s id=%s", name.c_str(), id.c_str());
2922
2923 string tmp;
2924 string name_key = dir_key_for_name(name);
2925 string id_key = dir_key_for_id(id);
2926 int r = read_key(hctx, name_key, &tmp);
2927 if (r != -ENOENT) {
2928 CLS_LOG(10, "name already exists");
2929 return -EEXIST;
2930 }
2931 r = read_key(hctx, id_key, &tmp);
2932 if (r != -ENOENT && check_for_unique_id) {
2933 CLS_LOG(10, "id already exists");
2934 return -EBADF;
2935 }
2936 bufferlist id_bl, name_bl;
2937 encode(id, id_bl);
2938 encode(name, name_bl);
2939 map<string, bufferlist> omap_vals;
2940 omap_vals[name_key] = id_bl;
2941 omap_vals[id_key] = name_bl;
2942 return cls_cxx_map_set_vals(hctx, &omap_vals);
2943 }
2944
2945 static int dir_remove_image_helper(cls_method_context_t hctx,
2946 const string &name, const string &id)
2947 {
2948 CLS_LOG(20, "dir_remove_image_helper name=%s id=%s",
2949 name.c_str(), id.c_str());
2950
2951 string stored_name, stored_id;
2952 string name_key = dir_key_for_name(name);
2953 string id_key = dir_key_for_id(id);
2954 int r = read_key(hctx, name_key, &stored_id);
2955 if (r < 0) {
2956 if (r != -ENOENT)
2957 CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
2958 return r;
2959 }
2960 r = read_key(hctx, id_key, &stored_name);
2961 if (r < 0) {
2962 CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
2963 return r;
2964 }
2965
2966 // check if this op raced with a rename
2967 if (stored_name != name || stored_id != id) {
2968 CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
2969 stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
2970 return -ESTALE;
2971 }
2972
2973 r = cls_cxx_map_remove_key(hctx, name_key);
2974 if (r < 0) {
2975 CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
2976 return r;
2977 }
2978
2979 r = cls_cxx_map_remove_key(hctx, id_key);
2980 if (r < 0) {
2981 CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
2982 return r;
2983 }
2984
2985 return 0;
2986 }
2987
2988 /**
2989 * Rename an image in the directory, updating both indexes
2990 * atomically. This can't be done from the client calling
2991 * dir_add_image and dir_remove_image in one transaction because the
2992 * results of the first method are not visibale to later steps.
2993 *
2994 * Input:
2995 * @param src original name of the image
2996 * @param dest new name of the image
2997 * @param id the id of the image
2998 *
2999 * Output:
3000 * @returns -ESTALE if src and id do not map to each other
3001 * @returns -ENOENT if src or id are not in the directory
3002 * @returns -EEXIST if dest already exists
3003 * @returns 0 on success, negative error code on failure
3004 */
3005 int dir_rename_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3006 {
3007 string src, dest, id;
3008 try {
3009 auto iter = in->cbegin();
3010 decode(src, iter);
3011 decode(dest, iter);
3012 decode(id, iter);
3013 } catch (const buffer::error &err) {
3014 return -EINVAL;
3015 }
3016
3017 int r = dir_remove_image_helper(hctx, src, id);
3018 if (r < 0)
3019 return r;
3020 // ignore duplicate id because the result of
3021 // remove_image_helper is not visible yet
3022 return dir_add_image_helper(hctx, dest, id, false);
3023 }
3024
3025 /**
3026 * Get the id of an image given its name.
3027 *
3028 * Input:
3029 * @param name the name of the image
3030 *
3031 * Output:
3032 * @param id the id of the image
3033 * @returns 0 on success, negative error code on failure
3034 */
3035 int dir_get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3036 {
3037 string name;
3038
3039 try {
3040 auto iter = in->cbegin();
3041 decode(name, iter);
3042 } catch (const buffer::error &err) {
3043 return -EINVAL;
3044 }
3045
3046 CLS_LOG(20, "dir_get_id: name=%s", name.c_str());
3047
3048 string id;
3049 int r = read_key(hctx, dir_key_for_name(name), &id);
3050 if (r < 0) {
3051 if (r != -ENOENT)
3052 CLS_ERR("error reading id for name '%s': %s", name.c_str(), cpp_strerror(r).c_str());
3053 return r;
3054 }
3055 encode(id, *out);
3056 return 0;
3057 }
3058
3059 /**
3060 * Get the name of an image given its id.
3061 *
3062 * Input:
3063 * @param id the id of the image
3064 *
3065 * Output:
3066 * @param name the name of the image
3067 * @returns 0 on success, negative error code on failure
3068 */
3069 int dir_get_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3070 {
3071 string id;
3072
3073 try {
3074 auto iter = in->cbegin();
3075 decode(id, iter);
3076 } catch (const buffer::error &err) {
3077 return -EINVAL;
3078 }
3079
3080 CLS_LOG(20, "dir_get_name: id=%s", id.c_str());
3081
3082 string name;
3083 int r = read_key(hctx, dir_key_for_id(id), &name);
3084 if (r < 0) {
3085 if (r != -ENOENT) {
3086 CLS_ERR("error reading name for id '%s': %s", id.c_str(),
3087 cpp_strerror(r).c_str());
3088 }
3089 return r;
3090 }
3091 encode(name, *out);
3092 return 0;
3093 }
3094
3095 /**
3096 * List the names and ids of the images in the directory, sorted by
3097 * name.
3098 *
3099 * Input:
3100 * @param start_after which name to begin listing after
3101 * (use the empty string to start at the beginning)
3102 * @param max_return the maximum number of names to list
3103 *
3104 * Output:
3105 * @param images map from name to id of up to max_return images
3106 * @returns 0 on success, negative error code on failure
3107 */
3108 int dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3109 {
3110 string start_after;
3111 uint64_t max_return;
3112
3113 try {
3114 auto iter = in->cbegin();
3115 decode(start_after, iter);
3116 decode(max_return, iter);
3117 } catch (const buffer::error &err) {
3118 return -EINVAL;
3119 }
3120
3121 int max_read = RBD_MAX_KEYS_READ;
3122 map<string, string> images;
3123 string last_read = dir_key_for_name(start_after);
3124 bool more = true;
3125
3126 while (more && images.size() < max_return) {
3127 map<string, bufferlist> vals;
3128 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
3129 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
3130 max_read, &vals, &more);
3131 if (r < 0) {
3132 if (r != -ENOENT) {
3133 CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
3134 }
3135 return r;
3136 }
3137
3138 for (map<string, bufferlist>::iterator it = vals.begin();
3139 it != vals.end(); ++it) {
3140 string id;
3141 auto iter = it->second.cbegin();
3142 try {
3143 decode(id, iter);
3144 } catch (const buffer::error &err) {
3145 CLS_ERR("could not decode id of image '%s'", it->first.c_str());
3146 return -EIO;
3147 }
3148 CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(it->first).c_str(), id.c_str());
3149 images[dir_name_from_key(it->first)] = id;
3150 if (images.size() >= max_return)
3151 break;
3152 }
3153 if (!vals.empty()) {
3154 last_read = dir_key_for_name(images.rbegin()->first);
3155 }
3156 }
3157
3158 encode(images, *out);
3159
3160 return 0;
3161 }
3162
3163 /**
3164 * Add an image to the rbd directory. Creates the directory object if
3165 * needed, and updates the index from id to name and name to id.
3166 *
3167 * Input:
3168 * @param name the name of the image
3169 * @param id the id of the image
3170 *
3171 * Output:
3172 * @returns -EEXIST if the image name is already in the directory
3173 * @returns -EBADF if the image id is already in the directory
3174 * @returns 0 on success, negative error code on failure
3175 */
3176 int dir_add_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3177 {
3178 int r = cls_cxx_create(hctx, false);
3179 if (r < 0) {
3180 CLS_ERR("could not create directory: %s", cpp_strerror(r).c_str());
3181 return r;
3182 }
3183
3184 string name, id;
3185 try {
3186 auto iter = in->cbegin();
3187 decode(name, iter);
3188 decode(id, iter);
3189 } catch (const buffer::error &err) {
3190 return -EINVAL;
3191 }
3192
3193 return dir_add_image_helper(hctx, name, id, true);
3194 }
3195
3196 /**
3197 * Remove an image from the rbd directory.
3198 *
3199 * Input:
3200 * @param name the name of the image
3201 * @param id the id of the image
3202 *
3203 * Output:
3204 * @returns -ESTALE if the name and id do not map to each other
3205 * @returns 0 on success, negative error code on failure
3206 */
3207 int dir_remove_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3208 {
3209 string name, id;
3210 try {
3211 auto iter = in->cbegin();
3212 decode(name, iter);
3213 decode(id, iter);
3214 } catch (const buffer::error &err) {
3215 return -EINVAL;
3216 }
3217
3218 return dir_remove_image_helper(hctx, name, id);
3219 }
3220
3221 /**
3222 * Verify the current state of the directory
3223 *
3224 * Input:
3225 * @param state the DirectoryState of the directory
3226 *
3227 * Output:
3228 * @returns -ENOENT if the state does not match
3229 * @returns 0 on success, negative error code on failure
3230 */
3231 int dir_state_assert(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3232 {
3233 cls::rbd::DirectoryState directory_state = cls::rbd::DIRECTORY_STATE_READY;
3234 try {
3235 auto iter = in->cbegin();
3236 decode(directory_state, iter);
3237 } catch (const buffer::error &err) {
3238 return -EINVAL;
3239 }
3240
3241 cls::rbd::DirectoryState on_disk_directory_state = directory_state;
3242 int r = read_key(hctx, "state", &on_disk_directory_state);
3243 if (r < 0) {
3244 return r;
3245 }
3246
3247 if (directory_state != on_disk_directory_state) {
3248 return -ENOENT;
3249 }
3250 return 0;
3251 }
3252
3253 /**
3254 * Set the current state of the directory
3255 *
3256 * Input:
3257 * @param state the DirectoryState of the directory
3258 *
3259 * Output:
3260 * @returns -ENOENT if the state does not match
3261 * @returns 0 on success, negative error code on failure
3262 */
3263 int dir_state_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3264 {
3265 cls::rbd::DirectoryState directory_state;
3266 try {
3267 auto iter = in->cbegin();
3268 decode(directory_state, iter);
3269 } catch (const buffer::error &err) {
3270 return -EINVAL;
3271 }
3272
3273 int r = check_exists(hctx);
3274 if (r < 0 && r != -ENOENT) {
3275 return r;
3276 }
3277
3278 switch (directory_state) {
3279 case cls::rbd::DIRECTORY_STATE_READY:
3280 break;
3281 case cls::rbd::DIRECTORY_STATE_ADD_DISABLED:
3282 {
3283 if (r == -ENOENT) {
3284 return r;
3285 }
3286
3287 // verify that the directory is empty
3288 std::map<std::string, bufferlist> vals;
3289 bool more;
3290 r = cls_cxx_map_get_vals(hctx, RBD_DIR_NAME_KEY_PREFIX,
3291 RBD_DIR_NAME_KEY_PREFIX, 1, &vals, &more);
3292 if (r < 0) {
3293 return r;
3294 } else if (!vals.empty()) {
3295 return -EBUSY;
3296 }
3297 }
3298 break;
3299 default:
3300 return -EINVAL;
3301 }
3302
3303 r = write_key(hctx, "state", directory_state);
3304 if (r < 0) {
3305 return r;
3306 }
3307
3308 return 0;
3309 }
3310
3311 int object_map_read(cls_method_context_t hctx, BitVector<2> &object_map)
3312 {
3313 uint64_t size;
3314 int r = cls_cxx_stat(hctx, &size, NULL);
3315 if (r < 0) {
3316 return r;
3317 }
3318 if (size == 0) {
3319 return -ENOENT;
3320 }
3321
3322 bufferlist bl;
3323 r = cls_cxx_read(hctx, 0, size, &bl);
3324 if (r < 0) {
3325 return r;
3326 }
3327
3328 try {
3329 auto iter = bl.cbegin();
3330 decode(object_map, iter);
3331 } catch (const buffer::error &err) {
3332 CLS_ERR("failed to decode object map: %s", err.what());
3333 return -EINVAL;
3334 }
3335 return 0;
3336 }
3337
3338 /**
3339 * Load an rbd image's object map
3340 *
3341 * Input:
3342 * none
3343 *
3344 * Output:
3345 * @param object map bit vector
3346 * @returns 0 on success, negative error code on failure
3347 */
3348 int object_map_load(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3349 {
3350 BitVector<2> object_map;
3351 int r = object_map_read(hctx, object_map);
3352 if (r < 0) {
3353 return r;
3354 }
3355
3356 object_map.set_crc_enabled(false);
3357 encode(object_map, *out);
3358 return 0;
3359 }
3360
3361 /**
3362 * Save an rbd image's object map
3363 *
3364 * Input:
3365 * @param object map bit vector
3366 *
3367 * Output:
3368 * @returns 0 on success, negative error code on failure
3369 */
3370 int object_map_save(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3371 {
3372 BitVector<2> object_map;
3373 try {
3374 auto iter = in->cbegin();
3375 decode(object_map, iter);
3376 } catch (const buffer::error &err) {
3377 return -EINVAL;
3378 }
3379
3380 object_map.set_crc_enabled(true);
3381
3382 bufferlist bl;
3383 encode(object_map, bl);
3384 CLS_LOG(20, "object_map_save: object size=%" PRIu64 ", byte size=%u",
3385 object_map.size(), bl.length());
3386 return cls_cxx_write_full(hctx, &bl);
3387 }
3388
3389 /**
3390 * Resize an rbd image's object map
3391 *
3392 * Input:
3393 * @param object_count the max number of objects in the image
3394 * @param default_state the default state of newly created objects
3395 *
3396 * Output:
3397 * @returns 0 on success, negative error code on failure
3398 */
3399 int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3400 {
3401 uint64_t object_count;
3402 uint8_t default_state;
3403 try {
3404 auto iter = in->cbegin();
3405 decode(object_count, iter);
3406 decode(default_state, iter);
3407 } catch (const buffer::error &err) {
3408 return -EINVAL;
3409 }
3410
3411 // protect against excessive memory requirements
3412 if (object_count > cls::rbd::MAX_OBJECT_MAP_OBJECT_COUNT) {
3413 CLS_ERR("object map too large: %" PRIu64, object_count);
3414 return -EINVAL;
3415 }
3416
3417 BitVector<2> object_map;
3418 int r = object_map_read(hctx, object_map);
3419 if ((r < 0) && (r != -ENOENT)) {
3420 return r;
3421 }
3422
3423 size_t orig_object_map_size = object_map.size();
3424 if (object_count < orig_object_map_size) {
3425 auto it = object_map.begin() + object_count;
3426 auto end_it = object_map.end() ;
3427 uint64_t i = object_count;
3428 for (; it != end_it; ++it, ++i) {
3429 if (*it != default_state) {
3430 CLS_ERR("object map indicates object still exists: %" PRIu64, i);
3431 return -ESTALE;
3432 }
3433 }
3434 object_map.resize(object_count);
3435 } else if (object_count > orig_object_map_size) {
3436 object_map.resize(object_count);
3437 auto it = object_map.begin() + orig_object_map_size;
3438 auto end_it = object_map.end();
3439 for (; it != end_it; ++it) {
3440 *it = default_state;
3441 }
3442 }
3443
3444 bufferlist map;
3445 encode(object_map, map);
3446 CLS_LOG(20, "object_map_resize: object size=%" PRIu64 ", byte size=%u",
3447 object_count, map.length());
3448 return cls_cxx_write_full(hctx, &map);
3449 }
3450
3451 /**
3452 * Update an rbd image's object map
3453 *
3454 * Input:
3455 * @param start_object_no the start object iterator
3456 * @param end_object_no the end object iterator
3457 * @param new_object_state the new object state
3458 * @param current_object_state optional current object state filter
3459 *
3460 * Output:
3461 * @returns 0 on success, negative error code on failure
3462 */
3463 int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3464 {
3465 uint64_t start_object_no;
3466 uint64_t end_object_no;
3467 uint8_t new_object_state;
3468 boost::optional<uint8_t> current_object_state;
3469 try {
3470 auto iter = in->cbegin();
3471 decode(start_object_no, iter);
3472 decode(end_object_no, iter);
3473 decode(new_object_state, iter);
3474 decode(current_object_state, iter);
3475 } catch (const buffer::error &err) {
3476 CLS_ERR("failed to decode message");
3477 return -EINVAL;
3478 }
3479
3480 uint64_t size;
3481 int r = cls_cxx_stat(hctx, &size, NULL);
3482 if (r < 0) {
3483 return r;
3484 }
3485
3486 BitVector<2> object_map;
3487 bufferlist header_bl;
3488 r = cls_cxx_read2(hctx, 0, object_map.get_header_length(), &header_bl,
3489 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
3490 if (r < 0) {
3491 CLS_ERR("object map header read failed");
3492 return r;
3493 }
3494
3495 try {
3496 auto it = header_bl.cbegin();
3497 object_map.decode_header(it);
3498 } catch (const buffer::error &err) {
3499 CLS_ERR("failed to decode object map header: %s", err.what());
3500 return -EINVAL;
3501 }
3502
3503 uint64_t object_byte_offset;
3504 uint64_t byte_length;
3505 object_map.get_header_crc_extents(&object_byte_offset, &byte_length);
3506
3507 bufferlist footer_bl;
3508 r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &footer_bl,
3509 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
3510 if (r < 0) {
3511 CLS_ERR("object map footer read header CRC failed");
3512 return r;
3513 }
3514
3515 try {
3516 auto it = footer_bl.cbegin();
3517 object_map.decode_header_crc(it);
3518 } catch (const buffer::error &err) {
3519 CLS_ERR("failed to decode object map header CRC: %s", err.what());
3520 }
3521
3522 if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
3523 return -ERANGE;
3524 }
3525
3526 uint64_t object_count = end_object_no - start_object_no;
3527 object_map.get_data_crcs_extents(start_object_no, object_count,
3528 &object_byte_offset, &byte_length);
3529 const auto footer_object_offset = object_byte_offset;
3530
3531 footer_bl.clear();
3532 r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &footer_bl,
3533 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
3534 if (r < 0) {
3535 CLS_ERR("object map footer read data CRCs failed");
3536 return r;
3537 }
3538
3539 try {
3540 auto it = footer_bl.cbegin();
3541 object_map.decode_data_crcs(it, start_object_no);
3542 } catch (const buffer::error &err) {
3543 CLS_ERR("failed to decode object map data CRCs: %s", err.what());
3544 }
3545
3546 uint64_t data_byte_offset;
3547 object_map.get_data_extents(start_object_no, object_count,
3548 &data_byte_offset, &object_byte_offset,
3549 &byte_length);
3550
3551 bufferlist data_bl;
3552 r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &data_bl,
3553 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
3554 if (r < 0) {
3555 CLS_ERR("object map data read failed");
3556 return r;
3557 }
3558
3559 try {
3560 auto it = data_bl.cbegin();
3561 object_map.decode_data(it, data_byte_offset);
3562 } catch (const buffer::error &err) {
3563 CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
3564 data_byte_offset, err.what());
3565 return -EINVAL;
3566 }
3567
3568 bool updated = false;
3569 auto it = object_map.begin() + start_object_no;
3570 auto end_it = object_map.begin() + end_object_no;
3571 for (; it != end_it; ++it) {
3572 uint8_t state = *it;
3573 if ((!current_object_state || state == *current_object_state ||
3574 (*current_object_state == OBJECT_EXISTS &&
3575 state == OBJECT_EXISTS_CLEAN)) && state != new_object_state) {
3576 *it = new_object_state;
3577 updated = true;
3578 }
3579 }
3580
3581 if (updated) {
3582 CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
3583 data_byte_offset, byte_length, object_byte_offset);
3584
3585 bufferlist data_bl;
3586 object_map.encode_data(data_bl, data_byte_offset, byte_length);
3587 r = cls_cxx_write2(hctx, object_byte_offset, data_bl.length(), &data_bl,
3588 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
3589 if (r < 0) {
3590 CLS_ERR("failed to write object map header: %s", cpp_strerror(r).c_str());
3591 return r;
3592 }
3593
3594 footer_bl.clear();
3595 object_map.encode_data_crcs(footer_bl, start_object_no, object_count);
3596 r = cls_cxx_write2(hctx, footer_object_offset, footer_bl.length(),
3597 &footer_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
3598 if (r < 0) {
3599 CLS_ERR("failed to write object map footer: %s", cpp_strerror(r).c_str());
3600 return r;
3601 }
3602 } else {
3603 CLS_LOG(20, "object_map_update: no update necessary");
3604 }
3605
3606 return 0;
3607 }
3608
3609 /**
3610 * Mark all _EXISTS objects as _EXISTS_CLEAN so future writes to the
3611 * image HEAD can be tracked.
3612 *
3613 * Input:
3614 * none
3615 *
3616 * Output:
3617 * @returns 0 on success, negative error code on failure
3618 */
3619 int object_map_snap_add(cls_method_context_t hctx, bufferlist *in,
3620 bufferlist *out)
3621 {
3622 BitVector<2> object_map;
3623 int r = object_map_read(hctx, object_map);
3624 if (r < 0) {
3625 return r;
3626 }
3627
3628 bool updated = false;
3629 auto it = object_map.begin();
3630 auto end_it = object_map.end();
3631 for (; it != end_it; ++it) {
3632 if (*it == OBJECT_EXISTS) {
3633 *it = OBJECT_EXISTS_CLEAN;
3634 updated = true;
3635 }
3636 }
3637
3638 if (updated) {
3639 bufferlist bl;
3640 encode(object_map, bl);
3641 r = cls_cxx_write_full(hctx, &bl);
3642 }
3643 return r;
3644 }
3645
3646 /**
3647 * Mark all _EXISTS_CLEAN objects as _EXISTS in the current object map
3648 * if the provided snapshot object map object is marked as _EXISTS.
3649 *
3650 * Input:
3651 * @param snapshot object map bit vector
3652 *
3653 * Output:
3654 * @returns 0 on success, negative error code on failure
3655 */
3656 int object_map_snap_remove(cls_method_context_t hctx, bufferlist *in,
3657 bufferlist *out)
3658 {
3659 BitVector<2> src_object_map;
3660 try {
3661 auto iter = in->cbegin();
3662 decode(src_object_map, iter);
3663 } catch (const buffer::error &err) {
3664 return -EINVAL;
3665 }
3666
3667 BitVector<2> dst_object_map;
3668 int r = object_map_read(hctx, dst_object_map);
3669 if (r < 0) {
3670 return r;
3671 }
3672
3673 bool updated = false;
3674 auto src_it = src_object_map.begin();
3675 auto dst_it = dst_object_map.begin();
3676 auto dst_it_end = dst_object_map.end();
3677 uint64_t i = 0;
3678 for (; dst_it != dst_it_end; ++dst_it) {
3679 if (*dst_it == OBJECT_EXISTS_CLEAN &&
3680 (i >= src_object_map.size() || *src_it == OBJECT_EXISTS)) {
3681 *dst_it = OBJECT_EXISTS;
3682 updated = true;
3683 }
3684 if (i < src_object_map.size())
3685 ++src_it;
3686 ++i;
3687 }
3688
3689 if (updated) {
3690 bufferlist bl;
3691 encode(dst_object_map, bl);
3692 r = cls_cxx_write_full(hctx, &bl);
3693 }
3694 return r;
3695 }
3696
3697 static const string metadata_key_for_name(const string &name)
3698 {
3699 return RBD_METADATA_KEY_PREFIX + name;
3700 }
3701
3702 static const string metadata_name_from_key(const string &key)
3703 {
3704 return key.substr(strlen(RBD_METADATA_KEY_PREFIX));
3705 }
3706
3707 /**
3708 * Input:
3709 * @param start_after which name to begin listing after
3710 * (use the empty string to start at the beginning)
3711 * @param max_return the maximum number of names to list
3712
3713 * Output:
3714 * @param value
3715 * @returns 0 on success, negative error code on failure
3716 */
3717 int metadata_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3718 {
3719 string start_after;
3720 uint64_t max_return;
3721
3722 try {
3723 auto iter = in->cbegin();
3724 decode(start_after, iter);
3725 decode(max_return, iter);
3726 } catch (const buffer::error &err) {
3727 return -EINVAL;
3728 }
3729
3730 // TODO remove implicit support for zero during the N-release
3731 if (max_return == 0) {
3732 max_return = RBD_MAX_KEYS_READ;
3733 }
3734
3735 map<string, bufferlist> data;
3736 string last_read = metadata_key_for_name(start_after);
3737 bool more = true;
3738
3739 while (more && data.size() < max_return) {
3740 map<string, bufferlist> raw_data;
3741 int max_read = std::min<uint64_t>(RBD_MAX_KEYS_READ, max_return - data.size());
3742 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_METADATA_KEY_PREFIX,
3743 max_read, &raw_data, &more);
3744 if (r < 0) {
3745 if (r != -ENOENT) {
3746 CLS_ERR("failed to read the vals off of disk: %s",
3747 cpp_strerror(r).c_str());
3748 }
3749 return r;
3750 }
3751
3752 for (auto& kv : raw_data) {
3753 data[metadata_name_from_key(kv.first)].swap(kv.second);
3754 }
3755
3756 if (!raw_data.empty()) {
3757 last_read = raw_data.rbegin()->first;
3758 }
3759 }
3760
3761 encode(data, *out);
3762 return 0;
3763 }
3764
3765 /**
3766 * Input:
3767 * @param data <map(key, value)>
3768 *
3769 * Output:
3770 * @returns 0 on success, negative error code on failure
3771 */
3772 int metadata_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3773 {
3774 map<string, bufferlist> data, raw_data;
3775
3776 auto iter = in->cbegin();
3777 try {
3778 decode(data, iter);
3779 } catch (const buffer::error &err) {
3780 return -EINVAL;
3781 }
3782
3783 for (map<string, bufferlist>::iterator it = data.begin();
3784 it != data.end(); ++it) {
3785 CLS_LOG(20, "metadata_set key=%s value=%.*s", it->first.c_str(),
3786 it->second.length(), it->second.c_str());
3787 raw_data[metadata_key_for_name(it->first)].swap(it->second);
3788 }
3789 int r = cls_cxx_map_set_vals(hctx, &raw_data);
3790 if (r < 0) {
3791 CLS_ERR("error writing metadata: %s", cpp_strerror(r).c_str());
3792 return r;
3793 }
3794
3795 return 0;
3796 }
3797
3798 /**
3799 * Input:
3800 * @param key
3801 *
3802 * Output:
3803 * @returns 0 on success, negative error code on failure
3804 */
3805 int metadata_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3806 {
3807 string key;
3808
3809 auto iter = in->cbegin();
3810 try {
3811 decode(key, iter);
3812 } catch (const buffer::error &err) {
3813 return -EINVAL;
3814 }
3815
3816 CLS_LOG(20, "metadata_remove key=%s", key.c_str());
3817
3818 int r = cls_cxx_map_remove_key(hctx, metadata_key_for_name(key));
3819 if (r < 0) {
3820 CLS_ERR("error removing metadata: %s", cpp_strerror(r).c_str());
3821 return r;
3822 }
3823
3824 return 0;
3825 }
3826
3827 /**
3828 * Input:
3829 * @param key
3830 *
3831 * Output:
3832 * @param metadata value associated with the key
3833 * @returns 0 on success, negative error code on failure
3834 */
3835 int metadata_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3836 {
3837 string key;
3838 bufferlist value;
3839
3840 auto iter = in->cbegin();
3841 try {
3842 decode(key, iter);
3843 } catch (const buffer::error &err) {
3844 return -EINVAL;
3845 }
3846
3847 CLS_LOG(20, "metadata_get key=%s", key.c_str());
3848
3849 int r = cls_cxx_map_get_val(hctx, metadata_key_for_name(key), &value);
3850 if (r < 0) {
3851 if (r != -ENOENT)
3852 CLS_ERR("error getting metadata: %s", cpp_strerror(r).c_str());
3853 return r;
3854 }
3855
3856 encode(value, *out);
3857 return 0;
3858 }
3859
3860 int snapshot_get_limit(cls_method_context_t hctx, bufferlist *in,
3861 bufferlist *out)
3862 {
3863 uint64_t snap_limit;
3864 int r = read_key(hctx, "snap_limit", &snap_limit);
3865 if (r == -ENOENT) {
3866 snap_limit = UINT64_MAX;
3867 } else if (r < 0) {
3868 CLS_ERR("error retrieving snapshot limit: %s", cpp_strerror(r).c_str());
3869 return r;
3870 }
3871
3872 CLS_LOG(20, "read snapshot limit %" PRIu64, snap_limit);
3873 encode(snap_limit, *out);
3874
3875 return 0;
3876 }
3877
3878 int snapshot_set_limit(cls_method_context_t hctx, bufferlist *in,
3879 bufferlist *out)
3880 {
3881 int rc;
3882 uint64_t new_limit;
3883 bufferlist bl;
3884 size_t snap_count = 0;
3885
3886 try {
3887 auto iter = in->cbegin();
3888 decode(new_limit, iter);
3889 } catch (const buffer::error &err) {
3890 return -EINVAL;
3891 }
3892
3893 if (new_limit == UINT64_MAX) {
3894 CLS_LOG(20, "remove snapshot limit\n");
3895 rc = cls_cxx_map_remove_key(hctx, "snap_limit");
3896 return rc;
3897 }
3898
3899 //try to read header as v1 format
3900 rc = snap_read_header(hctx, bl);
3901
3902 // error when reading header
3903 if (rc < 0 && rc != -EINVAL) {
3904 return rc;
3905 } else if (rc >= 0) {
3906 // success, the image is v1 format
3907 struct rbd_obj_header_ondisk *header;
3908 header = (struct rbd_obj_header_ondisk *)bl.c_str();
3909 snap_count = header->snap_count;
3910 } else {
3911 // else, the image is v2 format
3912 int max_read = RBD_MAX_KEYS_READ;
3913 string last_read = RBD_SNAP_KEY_PREFIX;
3914 bool more;
3915
3916 do {
3917 set<string> keys;
3918 rc = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
3919 if (rc < 0) {
3920 CLS_ERR("error retrieving snapshots: %s", cpp_strerror(rc).c_str());
3921 return rc;
3922 }
3923 for (auto& key : keys) {
3924 if (key.find(RBD_SNAP_KEY_PREFIX) != 0)
3925 break;
3926 snap_count++;
3927 }
3928 if (!keys.empty())
3929 last_read = *(keys.rbegin());
3930 } while (more);
3931 }
3932
3933 if (new_limit < snap_count) {
3934 rc = -ERANGE;
3935 CLS_LOG(10, "snapshot limit is less than the number of snapshots.\n");
3936 } else {
3937 CLS_LOG(20, "set snapshot limit to %" PRIu64 "\n", new_limit);
3938 bl.clear();
3939 encode(new_limit, bl);
3940 rc = cls_cxx_map_set_val(hctx, "snap_limit", &bl);
3941 }
3942
3943 return rc;
3944 }
3945
3946
3947 /**
3948 * Input:
3949 * @param snap id (uint64_t) parent snapshot id
3950 * @param child spec (cls::rbd::ChildImageSpec) child image
3951 *
3952 * Output:
3953 * @returns 0 on success, negative error code on failure
3954 */
3955 int child_attach(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3956 {
3957 uint64_t snap_id;
3958 cls::rbd::ChildImageSpec child_image;
3959 try {
3960 auto it = in->cbegin();
3961 decode(snap_id, it);
3962 decode(child_image, it);
3963 } catch (const buffer::error &err) {
3964 return -EINVAL;
3965 }
3966
3967 CLS_LOG(20, "child_attach snap_id=%" PRIu64 ", child_pool_id=%" PRIi64 ", "
3968 "child_image_id=%s", snap_id, child_image.pool_id,
3969 child_image.image_id.c_str());
3970
3971 cls_rbd_snap snap;
3972 std::string snapshot_key;
3973 key_from_snap_id(snap_id, &snapshot_key);
3974 int r = read_key(hctx, snapshot_key, &snap);
3975 if (r < 0) {
3976 return r;
3977 }
3978
3979 if (cls::rbd::get_snap_namespace_type(snap.snapshot_namespace) ==
3980 cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
3981 // cannot attach to a deleted snapshot
3982 return -ENOENT;
3983 }
3984
3985 auto children_key = image::snap_children_key_from_snap_id(snap_id);
3986 cls::rbd::ChildImageSpecs child_images;
3987 r = read_key(hctx, children_key, &child_images);
3988 if (r < 0 && r != -ENOENT) {
3989 CLS_ERR("error reading snapshot children: %s", cpp_strerror(r).c_str());
3990 return r;
3991 }
3992
3993 auto it = child_images.insert(child_image);
3994 if (!it.second) {
3995 // child already attached to the snapshot
3996 return -EEXIST;
3997 }
3998
3999 r = write_key(hctx, children_key, child_images);
4000 if (r < 0) {
4001 CLS_ERR("error writing snapshot children: %s", cpp_strerror(r).c_str());
4002 return r;
4003 }
4004
4005 ++snap.child_count;
4006 r = image::snapshot::write(hctx, snapshot_key, std::move(snap));
4007 if (r < 0) {
4008 return r;
4009 }
4010
4011 r = image::set_op_features(hctx, RBD_OPERATION_FEATURE_CLONE_PARENT,
4012 RBD_OPERATION_FEATURE_CLONE_PARENT);
4013 if (r < 0) {
4014 return r;
4015 }
4016
4017 return 0;
4018 }
4019
4020 /**
4021 * Input:
4022 * @param snap id (uint64_t) parent snapshot id
4023 * @param child spec (cls::rbd::ChildImageSpec) child image
4024 *
4025 * Output:
4026 * @returns 0 on success, negative error code on failure
4027 */
4028 int child_detach(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4029 {
4030 uint64_t snap_id;
4031 cls::rbd::ChildImageSpec child_image;
4032 try {
4033 auto it = in->cbegin();
4034 decode(snap_id, it);
4035 decode(child_image, it);
4036 } catch (const buffer::error &err) {
4037 return -EINVAL;
4038 }
4039
4040 CLS_LOG(20, "child_detach snap_id=%" PRIu64 ", child_pool_id=%" PRIi64 ", "
4041 "child_image_id=%s", snap_id, child_image.pool_id,
4042 child_image.image_id.c_str());
4043
4044 cls_rbd_snap snap;
4045 std::string snapshot_key;
4046 key_from_snap_id(snap_id, &snapshot_key);
4047 int r = read_key(hctx, snapshot_key, &snap);
4048 if (r < 0) {
4049 return r;
4050 }
4051
4052 auto children_key = image::snap_children_key_from_snap_id(snap_id);
4053 cls::rbd::ChildImageSpecs child_images;
4054 r = read_key(hctx, children_key, &child_images);
4055 if (r < 0 && r != -ENOENT) {
4056 CLS_ERR("error reading snapshot children: %s", cpp_strerror(r).c_str());
4057 return r;
4058 }
4059
4060 if (snap.child_count != child_images.size()) {
4061 // children and reference count don't match
4062 CLS_ERR("children reference count mismatch: %" PRIu64, snap_id);
4063 return -EINVAL;
4064 }
4065
4066 if (child_images.erase(child_image) == 0) {
4067 // child not attached to the snapshot
4068 return -ENOENT;
4069 }
4070
4071 if (child_images.empty()) {
(2) Event returned_value: |
Assigning value from "remove_key(hctx, children_key)" to "r" here, but that stored value is overwritten before it can be used. |
Also see events: |
[value_overwrite] |
4072 r = remove_key(hctx, children_key);
4073 } else {
4074 r = write_key(hctx, children_key, child_images);
4075 if (r < 0) {
4076 CLS_ERR("error writing snapshot children: %s", cpp_strerror(r).c_str());
4077 return r;
4078 }
4079 }
4080
4081 --snap.child_count;
(1) Event value_overwrite: |
Overwriting previous write to "r" with value from "image::snapshot::write(hctx, snapshot_key, std::move(snap))". |
Also see events: |
[returned_value] |
4082 r = image::snapshot::write(hctx, snapshot_key, std::move(snap));
4083 if (r < 0) {
4084 return r;
4085 }
4086
4087 if (snap.child_count == 0) {
4088 auto clone_in_use_lambda = [snap_id](const cls_rbd_snap& snap_meta) {
4089 if (snap_meta.id != snap_id && snap_meta.child_count > 0) {
4090 return -EEXIST;
4091 }
4092 return 0;
4093 };
4094
4095 r = image::snapshot::iterate(hctx, clone_in_use_lambda);
4096 if (r < 0 && r != -EEXIST) {
4097 return r;
4098 }
4099
4100 if (r != -EEXIST) {
4101 // remove the clone_v2 op feature if not in-use by any other snapshots
4102 r = image::set_op_features(hctx, 0, RBD_OPERATION_FEATURE_CLONE_PARENT);
4103 if (r < 0) {
4104 return r;
4105 }
4106 }
4107 }
4108
4109 return 0;
4110 }
4111
4112 /**
4113 * Input:
4114 * @param snap id (uint64_t) parent snapshot id
4115 *
4116 * Output:
4117 * @param (cls::rbd::ChildImageSpecs) child images
4118 * @returns 0 on success, negative error code on failure
4119 */
4120 int children_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4121 {
4122 uint64_t snap_id;
4123 try {
4124 auto it = in->cbegin();
4125 decode(snap_id, it);
4126 } catch (const buffer::error &err) {
4127 return -EINVAL;
4128 }
4129
4130 CLS_LOG(20, "child_detach snap_id=%" PRIu64, snap_id);
4131
4132 cls_rbd_snap snap;
4133 std::string snapshot_key;
4134 key_from_snap_id(snap_id, &snapshot_key);
4135 int r = read_key(hctx, snapshot_key, &snap);
4136 if (r < 0) {
4137 return r;
4138 }
4139
4140 auto children_key = image::snap_children_key_from_snap_id(snap_id);
4141 cls::rbd::ChildImageSpecs child_images;
4142 r = read_key(hctx, children_key, &child_images);
4143 if (r == -ENOENT) {
4144 return r;
4145 } else if (r < 0) {
4146 CLS_ERR("error reading snapshot children: %s", cpp_strerror(r).c_str());
4147 return r;
4148 }
4149
4150 encode(child_images, *out);
4151 return 0;
4152 }
4153
4154 /**
4155 * Set image migration.
4156 *
4157 * Input:
4158 * @param migration_spec (cls::rbd::MigrationSpec) image migration spec
4159 *
4160 * Output:
4161 *
4162 * @returns 0 on success, negative error code on failure
4163 */
4164 int migration_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
4165 cls::rbd::MigrationSpec migration_spec;
4166 try {
4167 auto it = in->cbegin();
4168 decode(migration_spec, it);
4169 } catch (const buffer::error &err) {
4170 return -EINVAL;
4171 }
4172
4173 int r = image::set_migration(hctx, migration_spec, true);
4174 if (r < 0) {
4175 return r;
4176 }
4177
4178 return 0;
4179 }
4180
4181 /**
4182 * Set image migration state.
4183 *
4184 * Input:
4185 * @param state (cls::rbd::MigrationState) migration state
4186 * @param description (std::string) migration state description
4187 *
4188 * Output:
4189 *
4190 * @returns 0 on success, negative error code on failure
4191 */
4192 int migration_set_state(cls_method_context_t hctx, bufferlist *in,
4193 bufferlist *out) {
4194 cls::rbd::MigrationState state;
4195 std::string description;
4196 try {
4197 auto it = in->cbegin();
4198 decode(state, it);
4199 decode(description, it);
4200 } catch (const buffer::error &err) {
4201 return -EINVAL;
4202 }
4203
4204 cls::rbd::MigrationSpec migration_spec;
4205 int r = image::read_migration(hctx, &migration_spec);
4206 if (r < 0) {
4207 return r;
4208 }
4209
4210 migration_spec.state = state;
4211 migration_spec.state_description = description;
4212
4213 r = image::set_migration(hctx, migration_spec, false);
4214 if (r < 0) {
4215 return r;
4216 }
4217
4218 return 0;
4219 }
4220
4221 /**
4222 * Get image migration spec.
4223 *
4224 * Input:
4225 *
4226 * Output:
4227 * @param migration_spec (cls::rbd::MigrationSpec) image migration spec
4228 *
4229 * @returns 0 on success, negative error code on failure
4230 */
4231 int migration_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
4232 cls::rbd::MigrationSpec migration_spec;
4233 int r = image::read_migration(hctx, &migration_spec);
4234 if (r < 0) {
4235 return r;
4236 }
4237
4238 encode(migration_spec, *out);
4239
4240 return 0;
4241 }
4242
4243 /**
4244 * Remove image migration spec.
4245 *
4246 * Input:
4247 *
4248 * Output:
4249 *
4250 * @returns 0 on success, negative error code on failure
4251 */
4252 int migration_remove(cls_method_context_t hctx, bufferlist *in,
4253 bufferlist *out) {
4254 int r = image::remove_migration(hctx);
4255 if (r < 0) {
4256 return r;
4257 }
4258
4259 return 0;
4260 }
4261
4262 /**
4263 * Ensure writer snapc state
4264 *
4265 * Input:
4266 * @param snap id (uint64_t) snap context sequence id
4267 * @param state (cls::rbd::AssertSnapcSeqState) snap context state
4268 *
4269 * Output:
4270 * @returns -ERANGE if assertion fails
4271 * @returns 0 on success, negative error code on failure
4272 */
4273 int assert_snapc_seq(cls_method_context_t hctx, bufferlist *in,
4274 bufferlist *out)
4275 {
4276 uint64_t snapc_seq;
4277 cls::rbd::AssertSnapcSeqState state;
4278 try {
4279 auto it = in->cbegin();
4280 decode(snapc_seq, it);
4281 decode(state, it);
4282 } catch (const buffer::error &err) {
4283 return -EINVAL;
4284 }
4285
4286 uint64_t snapset_seq;
4287 int r = cls_get_snapset_seq(hctx, &snapset_seq);
4288 if (r < 0 && r != -ENOENT) {
4289 return r;
4290 }
4291
4292 switch (state) {
4293 case cls::rbd::ASSERT_SNAPC_SEQ_GT_SNAPSET_SEQ:
4294 return (r == -ENOENT || snapc_seq > snapset_seq) ? 0 : -ERANGE;
4295 case cls::rbd::ASSERT_SNAPC_SEQ_LE_SNAPSET_SEQ:
4296 return (r == -ENOENT || snapc_seq > snapset_seq) ? -ERANGE : 0;
4297 default:
4298 return -EOPNOTSUPP;
4299 }
4300 }
4301
4302 /****************************** Old format *******************************/
4303
4304 int old_snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4305 {
4306 bufferlist bl;
4307 struct rbd_obj_header_ondisk *header;
4308 int rc = snap_read_header(hctx, bl);
4309 if (rc < 0)
4310 return rc;
4311
4312 header = (struct rbd_obj_header_ondisk *)bl.c_str();
4313 bufferptr p(header->snap_names_len);
4314 char *buf = (char *)header;
4315 char *name = buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk);
4316 char *end = name + header->snap_names_len;
4317 memcpy(p.c_str(),
4318 buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk),
4319 header->snap_names_len);
4320
4321 encode(header->snap_seq, *out);
4322 encode(header->snap_count, *out);
4323
4324 for (unsigned i = 0; i < header->snap_count; i++) {
4325 string s = name;
4326 encode(header->snaps[i].id, *out);
4327 encode(header->snaps[i].image_size, *out);
4328 encode(s, *out);
4329
4330 name += strlen(name) + 1;
4331 if (name > end)
4332 return -EIO;
4333 }
4334
4335 return 0;
4336 }
4337
/**
 * Add a snapshot to an old-format image header and rewrite the header
 * object (header, snapshot array, snapshot names).
 *
 * Input:
 * @param snap_name name of the new snapshot (string)
 * @param snap_id id of the new snapshot (uint64_t)
 *
 * Output:
 * @returns -ESTALE if the header's snap_seq is greater than snap_id
 * @returns -EDQUOT if the snapshot count has reached the snapshot limit
 * @returns -EEXIST if a snapshot with that name already exists
 * @returns -EIO if walking the existing names runs past the names area
 * @returns 0 on success, negative error code on failure
 */
int old_snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  bufferlist bl;
  struct rbd_obj_header_ondisk *header;
  bufferlist newbl;
  bufferptr header_bp(sizeof(*header));
  struct rbd_obj_snap_ondisk *new_snaps;

  int rc = snap_read_header(hctx, bl);
  if (rc < 0)
    return rc;

  header = (struct rbd_obj_header_ondisk *)bl.c_str();

  // layout of the buffer: header, snapshot array, NUL-separated names
  int snaps_id_ofs = sizeof(*header);
  int names_ofs = snaps_id_ofs + sizeof(*new_snaps) * header->snap_count;
  const char *snap_name;
  const char *snap_names = ((char *)header) + names_ofs;
  const char *end = snap_names + header->snap_names_len;
  auto iter = in->cbegin();
  string s;
  uint64_t snap_id;

  try {
    decode(s, iter);
    decode(snap_id, iter);
  } catch (const buffer::error &err) {
    return -EINVAL;
  }
  snap_name = s.c_str();

  if (header->snap_seq > snap_id)
    return -ESTALE;

  // no stored limit means unlimited
  uint64_t snap_limit;
  rc = read_key(hctx, "snap_limit", &snap_limit);
  if (rc == -ENOENT) {
    snap_limit = UINT64_MAX;
  } else if (rc < 0) {
    return rc;
  }

  if (header->snap_count >= snap_limit)
    return -EDQUOT;

  // reject duplicates; the comparison is bounded by the bytes left in the
  // names area
  const char *cur_snap_name;
  for (cur_snap_name = snap_names; cur_snap_name < end; cur_snap_name += strlen(cur_snap_name) + 1) {
    if (strncmp(cur_snap_name, snap_name, end - cur_snap_name) == 0)
      return -EEXIST;
  }
  // the walk must land exactly on the end of the names area
  if (cur_snap_name > end)
    return -EIO;

  int snap_name_len = strlen(snap_name);

  // sizes are computed from the counts before the header is updated below
  bufferptr new_names_bp(header->snap_names_len + snap_name_len + 1);
  bufferptr new_snaps_bp(sizeof(*new_snaps) * (header->snap_count + 1));

  /* the new snap name goes first, followed by the existing snap names */
  char *new_snap_names = new_names_bp.c_str();
  strcpy(new_snap_names, snap_name);
  memcpy(new_snap_names + snap_name_len + 1, snap_names, header->snap_names_len);

  /* existing snaps are copied behind slot 0, which is filled in below */
  new_snaps = (struct rbd_obj_snap_ondisk *)new_snaps_bp.c_str();
  memcpy(new_snaps + 1, header->snaps, sizeof(*new_snaps) * header->snap_count);

  // update the header in place only after the old counts were used above
  header->snap_count = header->snap_count + 1;
  header->snap_names_len = header->snap_names_len + snap_name_len + 1;
  header->snap_seq = snap_id;

  new_snaps[0].id = snap_id;
  new_snaps[0].image_size = header->image_size;

  memcpy(header_bp.c_str(), header, sizeof(*header));

  // assemble the new object contents: header, snaps, names
  newbl.push_back(header_bp);
  newbl.push_back(new_snaps_bp);
  newbl.push_back(new_names_bp);

  rc = cls_cxx_write_full(hctx, &newbl);
  if (rc < 0)
    return rc;

  return 0;
}
4424
/**
 * Remove a snapshot, looked up by name, from an old-format image header
 * and rewrite the header object.
 *
 * Input:
 * @param snap_name name of the snapshot to remove (string)
 *
 * Output:
 * @returns -ENOENT if no snapshot with that name is found
 * @returns 0 on success, negative error code on failure
 */
int old_snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  bufferlist bl;
  struct rbd_obj_header_ondisk *header;
  bufferlist newbl;
  bufferptr header_bp(sizeof(*header));

  int rc = snap_read_header(hctx, bl);
  if (rc < 0)
    return rc;

  header = (struct rbd_obj_header_ondisk *)bl.c_str();

  // layout of the buffer: header, snapshot array, NUL-separated names
  int snaps_id_ofs = sizeof(*header);
  int names_ofs = snaps_id_ofs + sizeof(struct rbd_obj_snap_ondisk) * header->snap_count;
  const char *snap_name;
  const char *snap_names = ((char *)header) + names_ofs;
  const char *orig_names = snap_names;
  const char *end = snap_names + header->snap_names_len;
  auto iter = in->cbegin();
  string s;
  unsigned i;
  bool found = false;
  struct rbd_obj_snap_ondisk snap;

  try {
    decode(s, iter);
  } catch (const buffer::error &err) {
    return -EINVAL;
  }
  snap_name = s.c_str();

  // walk the names; on a match i is the snapshot's index and snap_names
  // points at its name
  for (i = 0; snap_names < end; i++) {
    if (strcmp(snap_names, snap_name) == 0) {
      // NOTE(review): snap is assigned here but not read again in this
      // function
      snap = header->snaps[i];
      found = true;
      break;
    }
    snap_names += strlen(snap_names) + 1;
  }
  if (!found) {
    CLS_ERR("couldn't find snap %s\n", snap_name);
    return -ENOENT;
  }

  // from here on the header carries the counts *after* the removal
  header->snap_names_len = header->snap_names_len - (s.length() + 1);
  header->snap_count = header->snap_count - 1;

  bufferptr new_names_bp(header->snap_names_len);
  bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);

  memcpy(header_bp.c_str(), header, sizeof(*header));
  newbl.push_back(header_bp);

  // when the last snapshot was removed only the header is written
  if (header->snap_count) {
    int snaps_len = 0;
    int names_len = 0;
    CLS_LOG(20, "i=%u\n", i);
    if (i > 0) {
      // entries in front of the removed one
      snaps_len = sizeof(header->snaps[0]) * i;
      names_len = snap_names - orig_names;
      memcpy(new_snaps_bp.c_str(), header->snaps, snaps_len);
      memcpy(new_names_bp.c_str(), orig_names, names_len);
    }
    // skip over the removed name
    snap_names += s.length() + 1;

    if (i < header->snap_count) {
      // entries behind the removed one; snap_count is already decremented,
      // so (snap_count - i) is the number of entries that follow index i
      memcpy(new_snaps_bp.c_str() + snaps_len,
             header->snaps + i + 1,
             sizeof(header->snaps[0]) * (header->snap_count - i));
      memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
    }
    newbl.push_back(new_snaps_bp);
    newbl.push_back(new_names_bp);
  }

  rc = cls_cxx_write_full(hctx, &newbl);
  if (rc < 0)
    return rc;

  return 0;
}
4507
/**
 * rename snapshot of old format.
 *
 * The snapshot is looked up by id; the snapshot array is kept as is and
 * only the names area (and its length in the header) is rebuilt.
 *
 * Input:
 * @param src_snap_id old snap id of the snapshot (snapid_t)
 * @param dst_snap_name new name of the snapshot (string)
 *
 * Output:
 * @returns -EEXIST if a snapshot named dst_snap_name already exists
 * @returns -EIO if walking the existing names runs past the names area
 * @returns -ENOENT if no snapshot with src_snap_id is found
 * @returns 0 on success, negative error code on failure.
 */
int old_snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  bufferlist bl;
  struct rbd_obj_header_ondisk *header;
  bufferlist newbl;
  bufferptr header_bp(sizeof(*header));
  snapid_t src_snap_id;
  const char *dst_snap_name;
  string dst;

  int rc = snap_read_header(hctx, bl);
  if (rc < 0)
    return rc;

  header = (struct rbd_obj_header_ondisk *)bl.c_str();

  // layout of the buffer: header, snapshot array, NUL-separated names
  int snaps_id_ofs = sizeof(*header);
  int names_ofs = snaps_id_ofs + sizeof(rbd_obj_snap_ondisk) * header->snap_count;
  const char *snap_names = ((char *)header) + names_ofs;
  const char *orig_names = snap_names;
  const char *end = snap_names + header->snap_names_len;
  auto iter = in->cbegin();
  unsigned i;
  bool found = false;

  try {
    decode(src_snap_id, iter);
    decode(dst, iter);
  } catch (const buffer::error &err) {
    return -EINVAL;
  }
  dst_snap_name = dst.c_str();

  // the destination name must not be in use already
  const char *cur_snap_name;
  for (cur_snap_name = snap_names; cur_snap_name < end;
       cur_snap_name += strlen(cur_snap_name) + 1) {
    if (strcmp(cur_snap_name, dst_snap_name) == 0)
      return -EEXIST;
  }
  // the walk must land exactly on the end of the names area
  if (cur_snap_name > end)
    return -EIO;
  // find the snapshot by id, advancing snap_names in lock-step so that it
  // points at the matching snapshot's current name
  for (i = 0; i < header->snap_count; i++) {
    if (src_snap_id == header->snaps[i].id) {
      found = true;
      break;
    }
    snap_names += strlen(snap_names) + 1;
  }
  if (!found) {
    CLS_ERR("couldn't find snap %llu\n", (unsigned long long)src_snap_id.val);
    return -ENOENT;
  }

  CLS_LOG(20, "rename snap with snap id %llu to dest name %s", (unsigned long long)src_snap_id.val, dst_snap_name);
  // the old name is replaced by the new one; the terminator is counted in
  // both and therefore cancels out
  header->snap_names_len = header->snap_names_len - strlen(snap_names) + dst.length();

  bufferptr new_names_bp(header->snap_names_len);
  bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);

  if (header->snap_count) {
    int names_len = 0;
    CLS_LOG(20, "i=%u\n", i);
    if (i > 0) {
      // names in front of the renamed one
      names_len = snap_names - orig_names;
      memcpy(new_names_bp.c_str(), orig_names, names_len);
    }
    // the new name, then skip over the old one
    strcpy(new_names_bp.c_str() + names_len, dst_snap_name);
    names_len += strlen(dst_snap_name) + 1;
    snap_names += strlen(snap_names) + 1;
    if (i < header->snap_count) {
      // names behind the renamed one (end still refers to the old area)
      memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
    }
    memcpy(new_snaps_bp.c_str(), header->snaps, sizeof(header->snaps[0]) * header->snap_count);
  }

  // assemble the new object contents: header, snaps, names
  memcpy(header_bp.c_str(), header, sizeof(*header));
  newbl.push_back(header_bp);
  newbl.push_back(new_snaps_bp);
  newbl.push_back(new_names_bp);

  rc = cls_cxx_write_full(hctx, &newbl);
  if (rc < 0)
    return rc;
  return 0;
}
4603
4604
4605 namespace mirror {
4606
// Map key names and key prefixes used by the pool-level mirroring
// methods. The *_PREFIX values are concatenated with an identifier by
// the helper functions below to form the final key.
static const std::string UUID("mirror_uuid");
static const std::string MODE("mirror_mode");
static const std::string PEER_KEY_PREFIX("mirror_peer_");
static const std::string IMAGE_KEY_PREFIX("image_");
static const std::string GLOBAL_KEY_PREFIX("global_");
static const std::string STATUS_GLOBAL_KEY_PREFIX("status_global_");
static const std::string INSTANCE_KEY_PREFIX("instance_");
static const std::string MIRROR_IMAGE_MAP_KEY_PREFIX("image_map_");

/// @returns the key for the peer identified by @p uuid
std::string peer_key(const std::string &uuid) {
  return PEER_KEY_PREFIX + uuid;
}

/// @returns the key for the mirrored image identified by @p image_id
std::string image_key(const std::string &image_id) {
  return IMAGE_KEY_PREFIX + image_id;
}

/// @returns the key for the global image id @p global_id
std::string global_key(const std::string &global_id) {
  return GLOBAL_KEY_PREFIX + global_id;
}

/// @returns the key of the status entry for global image id @p global_id
std::string status_global_key(const std::string &global_id) {
  return STATUS_GLOBAL_KEY_PREFIX + global_id;
}

/// @returns the key for the mirror instance @p instance_id
std::string instance_key(const std::string &instance_id) {
  return INSTANCE_KEY_PREFIX + instance_id;
}

/// @returns the key of the image map entry for @p global_image_id
std::string mirror_image_map_key(const std::string &global_image_id) {
  return MIRROR_IMAGE_MAP_KEY_PREFIX + global_image_id;
}
4639
4640 int uuid_get(cls_method_context_t hctx, std::string *mirror_uuid) {
4641 bufferlist mirror_uuid_bl;
4642 int r = cls_cxx_map_get_val(hctx, mirror::UUID, &mirror_uuid_bl);
4643 if (r < 0) {
4644 if (r != -ENOENT) {
4645 CLS_ERR("error reading mirror uuid: %s", cpp_strerror(r).c_str());
4646 }
4647 return r;
4648 }
4649
4650 *mirror_uuid = std::string(mirror_uuid_bl.c_str(), mirror_uuid_bl.length());
4651 return 0;
4652 }
4653
4654 void sanitize_entity_inst(entity_inst_t* entity_inst) {
4655 // make all addrs of type ANY because the type isn't what uniquely
4656 // identifies them and clients and on-disk formats can be encoded
4657 // with different backwards compatibility settings.
4658 entity_inst->addr.set_type(entity_addr_t::TYPE_ANY);
4659 }
4660
4661 int list_watchers(cls_method_context_t hctx,
4662 std::set<entity_inst_t> *entities) {
4663 obj_list_watch_response_t watchers;
4664 int r = cls_cxx_list_watchers(hctx, &watchers);
4665 if (r < 0 && r != -ENOENT) {
4666 CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
4667 return r;
4668 }
4669
4670 entities->clear();
4671 for (auto &w : watchers.entries) {
4672 entity_inst_t entity_inst{w.name, w.addr};
4673 sanitize_entity_inst(&entity_inst);
4674
4675 entities->insert(entity_inst);
4676 }
4677 return 0;
4678 }
4679
4680 int read_peers(cls_method_context_t hctx,
4681 std::vector<cls::rbd::MirrorPeer> *peers) {
4682 std::string last_read = PEER_KEY_PREFIX;
4683 int max_read = RBD_MAX_KEYS_READ;
4684 bool more = true;
4685 while (more) {
4686 std::map<std::string, bufferlist> vals;
4687 int r = cls_cxx_map_get_vals(hctx, last_read, PEER_KEY_PREFIX.c_str(),
4688 max_read, &vals, &more);
4689 if (r < 0) {
4690 if (r != -ENOENT) {
4691 CLS_ERR("error reading peers: %s", cpp_strerror(r).c_str());
4692 }
4693 return r;
4694 }
4695
4696 for (auto &it : vals) {
4697 try {
4698 auto bl_it = it.second.cbegin();
4699 cls::rbd::MirrorPeer peer;
4700 decode(peer, bl_it);
4701 peers->push_back(peer);
4702 } catch (const buffer::error &err) {
4703 CLS_ERR("could not decode peer '%s'", it.first.c_str());
4704 return -EIO;
4705 }
4706 }
4707
4708 if (!vals.empty()) {
4709 last_read = vals.rbegin()->first;
4710 }
4711 }
4712 return 0;
4713 }
4714
4715 int read_peer(cls_method_context_t hctx, const std::string &id,
4716 cls::rbd::MirrorPeer *peer) {
4717 bufferlist bl;
4718 int r = cls_cxx_map_get_val(hctx, peer_key(id), &bl);
4719 if (r < 0) {
4720 CLS_ERR("error reading peer '%s': %s", id.c_str(),
4721 cpp_strerror(r).c_str());
4722 return r;
4723 }
4724
4725 try {
4726 auto bl_it = bl.cbegin();
4727 decode(*peer, bl_it);
4728 } catch (const buffer::error &err) {
4729 CLS_ERR("could not decode peer '%s'", id.c_str());
4730 return -EIO;
4731 }
4732 return 0;
4733 }
4734
4735 int write_peer(cls_method_context_t hctx, const std::string &id,
4736 const cls::rbd::MirrorPeer &peer) {
4737 bufferlist bl;
4738 encode(peer, bl);
4739
4740 int r = cls_cxx_map_set_val(hctx, peer_key(id), &bl);
4741 if (r < 0) {
4742 CLS_ERR("error writing peer '%s': %s", id.c_str(),
4743 cpp_strerror(r).c_str());
4744 return r;
4745 }
4746 return 0;
4747 }
4748
4749 int image_get(cls_method_context_t hctx, const string &image_id,
4750 cls::rbd::MirrorImage *mirror_image) {
4751 bufferlist bl;
4752 int r = cls_cxx_map_get_val(hctx, image_key(image_id), &bl);
4753 if (r < 0) {
4754 if (r != -ENOENT) {
4755 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
4756 cpp_strerror(r).c_str());
4757 }
4758 return r;
4759 }
4760
4761 try {
4762 auto it = bl.cbegin();
4763 decode(*mirror_image, it);
4764 } catch (const buffer::error &err) {
4765 CLS_ERR("could not decode mirrored image '%s'", image_id.c_str());
4766 return -EIO;
4767 }
4768
4769 return 0;
4770 }
4771
4772 int image_set(cls_method_context_t hctx, const string &image_id,
4773 const cls::rbd::MirrorImage &mirror_image) {
4774 bufferlist bl;
4775 encode(mirror_image, bl);
4776
4777 cls::rbd::MirrorImage existing_mirror_image;
4778 int r = image_get(hctx, image_id, &existing_mirror_image);
4779 if (r == -ENOENT) {
4780 // make sure global id doesn't already exist
4781 std::string global_id_key = global_key(mirror_image.global_image_id);
4782 std::string image_id;
4783 r = read_key(hctx, global_id_key, &image_id);
4784 if (r >= 0) {
4785 return -EEXIST;
4786 } else if (r != -ENOENT) {
4787 CLS_ERR("error reading global image id: '%s': '%s'", image_id.c_str(),
4788 cpp_strerror(r).c_str());
4789 return r;
4790 }
4791
4792 // make sure this was not a race for disabling
4793 if (mirror_image.state == cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
4794 CLS_ERR("image '%s' is already disabled", image_id.c_str());
4795 return r;
4796 }
4797 } else if (r < 0) {
4798 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
4799 cpp_strerror(r).c_str());
4800 return r;
4801 } else if (existing_mirror_image.global_image_id !=
4802 mirror_image.global_image_id) {
4803 // cannot change the global id
4804 return -EINVAL;
4805 }
4806
4807 r = cls_cxx_map_set_val(hctx, image_key(image_id), &bl);
4808 if (r < 0) {
4809 CLS_ERR("error adding mirrored image '%s': %s", image_id.c_str(),
4810 cpp_strerror(r).c_str());
4811 return r;
4812 }
4813
4814 bufferlist image_id_bl;
4815 encode(image_id, image_id_bl);
4816 r = cls_cxx_map_set_val(hctx, global_key(mirror_image.global_image_id),
4817 &image_id_bl);
4818 if (r < 0) {
4819 CLS_ERR("error adding global id for image '%s': %s", image_id.c_str(),
4820 cpp_strerror(r).c_str());
4821 return r;
4822 }
4823 return 0;
4824 }
4825
4826 int image_remove(cls_method_context_t hctx, const string &image_id) {
4827 bufferlist bl;
4828 cls::rbd::MirrorImage mirror_image;
4829 int r = image_get(hctx, image_id, &mirror_image);
4830 if (r < 0) {
4831 if (r != -ENOENT) {
4832 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
4833 cpp_strerror(r).c_str());
4834 }
4835 return r;
4836 }
4837
4838 if (mirror_image.state != cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
4839 return -EBUSY;
4840 }
4841
4842 r = cls_cxx_map_remove_key(hctx, image_key(image_id));
4843 if (r < 0) {
4844 CLS_ERR("error removing mirrored image '%s': %s", image_id.c_str(),
4845 cpp_strerror(r).c_str());
4846 return r;
4847 }
4848
4849 r = cls_cxx_map_remove_key(hctx, global_key(mirror_image.global_image_id));
4850 if (r < 0 && r != -ENOENT) {
4851 CLS_ERR("error removing global id for image '%s': %s", image_id.c_str(),
4852 cpp_strerror(r).c_str());
4853 return r;
4854 }
4855
4856 r = cls_cxx_map_remove_key(hctx,
4857 status_global_key(mirror_image.global_image_id));
4858 if (r < 0 && r != -ENOENT) {
4859 CLS_ERR("error removing global status for image '%s': %s", image_id.c_str(),
4860 cpp_strerror(r).c_str());
4861 return r;
4862 }
4863
4864 return 0;
4865 }
4866
4867 struct MirrorImageStatusOnDisk : cls::rbd::MirrorImageStatus {
4868 entity_inst_t origin;
4869
4870 MirrorImageStatusOnDisk() {
4871 }
4872 MirrorImageStatusOnDisk(const cls::rbd::MirrorImageStatus &status) :
4873 cls::rbd::MirrorImageStatus(status) {
4874 }
4875
4876 void encode_meta(bufferlist &bl, uint64_t features) const {
4877 ENCODE_START(1, 1, bl);
4878 encode(origin, bl, features);
4879 ENCODE_FINISH(bl);
4880 }
4881
4882 void encode(bufferlist &bl, uint64_t features) const {
4883 encode_meta(bl, features);
4884 cls::rbd::MirrorImageStatus::encode(bl);
4885 }
4886
4887 void decode_meta(bufferlist::const_iterator &it) {
4888 DECODE_START(1, it);
4889 decode(origin, it);
4890 DECODE_FINISH(it);
4891 }
4892
4893 void decode(bufferlist::const_iterator &it) {
4894 decode_meta(it);
4895 cls::rbd::MirrorImageStatus::decode(it);
4896 }
4897 };
4898 WRITE_CLASS_ENCODER_FEATURES(MirrorImageStatusOnDisk)
4899
4900 int image_status_set(cls_method_context_t hctx, const string &global_image_id,
4901 const cls::rbd::MirrorImageStatus &status) {
4902 MirrorImageStatusOnDisk ondisk_status(status);
4903 ondisk_status.up = false;
4904 ondisk_status.last_update = ceph_clock_now();
4905
4906 int r = cls_get_request_origin(hctx, &ondisk_status.origin);
4907 sanitize_entity_inst(&ondisk_status.origin);
4908 ceph_assert(r == 0);
4909
4910 bufferlist bl;
4911 encode(ondisk_status, bl, cls_get_features(hctx));
4912
4913 r = cls_cxx_map_set_val(hctx, status_global_key(global_image_id), &bl);
4914 if (r < 0) {
4915 CLS_ERR("error setting status for mirrored image, global id '%s': %s",
4916 global_image_id.c_str(), cpp_strerror(r).c_str());
4917 return r;
4918 }
4919 return 0;
4920 }
4921
4922 int image_status_remove(cls_method_context_t hctx,
4923 const string &global_image_id) {
4924
4925 int r = cls_cxx_map_remove_key(hctx, status_global_key(global_image_id));
4926 if (r < 0) {
4927 CLS_ERR("error removing status for mirrored image, global id '%s': %s",
4928 global_image_id.c_str(), cpp_strerror(r).c_str());
4929 return r;
4930 }
4931 return 0;
4932 }
4933
4934 int image_status_get(cls_method_context_t hctx, const string &global_image_id,
4935 const std::set<entity_inst_t> &watchers,
4936 cls::rbd::MirrorImageStatus *status) {
4937
4938 bufferlist bl;
4939 int r = cls_cxx_map_get_val(hctx, status_global_key(global_image_id), &bl);
4940 if (r < 0) {
4941 if (r != -ENOENT) {
4942 CLS_ERR("error reading status for mirrored image, global id '%s': '%s'",
4943 global_image_id.c_str(), cpp_strerror(r).c_str());
4944 }
4945 return r;
4946 }
4947
4948 MirrorImageStatusOnDisk ondisk_status;
4949 try {
4950 auto it = bl.cbegin();
4951 decode(ondisk_status, it);
4952 } catch (const buffer::error &err) {
4953 CLS_ERR("could not decode status for mirrored image, global id '%s'",
4954 global_image_id.c_str());
4955 return -EIO;
4956 }
4957
4958
4959 *status = static_cast<cls::rbd::MirrorImageStatus>(ondisk_status);
4960 status->up = (watchers.find(ondisk_status.origin) != watchers.end());
4961 return 0;
4962 }
4963
4964 int image_status_list(cls_method_context_t hctx,
4965 const std::string &start_after, uint64_t max_return,
4966 map<std::string, cls::rbd::MirrorImage> *mirror_images,
4967 map<std::string, cls::rbd::MirrorImageStatus> *mirror_statuses) {
4968 std::string last_read = image_key(start_after);
4969 int max_read = RBD_MAX_KEYS_READ;
4970 bool more = true;
4971
4972 std::set<entity_inst_t> watchers;
4973 int r = list_watchers(hctx, &watchers);
4974 if (r < 0) {
4975 return r;
4976 }
4977
4978 while (more && mirror_images->size() < max_return) {
4979 std::map<std::string, bufferlist> vals;
4980 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
4981 r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX, max_read, &vals,
4982 &more);
4983 if (r < 0) {
4984 if (r != -ENOENT) {
4985 CLS_ERR("error reading mirror image directory by name: %s",
4986 cpp_strerror(r).c_str());
4987 }
4988 return r;
4989 }
4990
4991 for (auto it = vals.begin(); it != vals.end() &&
4992 mirror_images->size() < max_return; ++it) {
4993 const std::string &image_id = it->first.substr(IMAGE_KEY_PREFIX.size());
4994 cls::rbd::MirrorImage mirror_image;
4995 auto iter = it->second.cbegin();
4996 try {
4997 decode(mirror_image, iter);
4998 } catch (const buffer::error &err) {
4999 CLS_ERR("could not decode mirror image payload of image '%s'",
5000 image_id.c_str());
5001 return -EIO;
5002 }
5003
5004 (*mirror_images)[image_id] = mirror_image;
5005
5006 cls::rbd::MirrorImageStatus status;
5007 int r1 = image_status_get(hctx, mirror_image.global_image_id, watchers,
5008 &status);
5009 if (r1 < 0) {
5010 continue;
5011 }
5012
5013 (*mirror_statuses)[image_id] = status;
5014 }
5015 if (!vals.empty()) {
5016 last_read = image_key(mirror_images->rbegin()->first);
5017 }
5018 }
5019
5020 return 0;
5021 }
5022
5023 int image_status_get_summary(
5024 cls_method_context_t hctx,
5025 std::map<cls::rbd::MirrorImageStatusState, int> *states) {
5026 std::set<entity_inst_t> watchers;
5027 int r = list_watchers(hctx, &watchers);
5028 if (r < 0) {
5029 return r;
5030 }
5031
5032 states->clear();
5033
5034 string last_read = IMAGE_KEY_PREFIX;
5035 int max_read = RBD_MAX_KEYS_READ;
5036 bool more = true;
5037 while (more) {
5038 map<string, bufferlist> vals;
5039 r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX,
5040 max_read, &vals, &more);
5041 if (r < 0) {
5042 if (r != -ENOENT) {
5043 CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
5044 }
5045 return r;
5046 }
5047
5048 for (auto &list_it : vals) {
5049 const string &key = list_it.first;
5050
5051 if (0 != key.compare(0, IMAGE_KEY_PREFIX.size(), IMAGE_KEY_PREFIX)) {
5052 break;
5053 }
5054
5055 cls::rbd::MirrorImage mirror_image;
5056 auto iter = list_it.second.cbegin();
5057 try {
5058 decode(mirror_image, iter);
5059 } catch (const buffer::error &err) {
5060 CLS_ERR("could not decode mirror image payload for key '%s'",
5061 key.c_str());
5062 return -EIO;
5063 }
5064
5065 cls::rbd::MirrorImageStatus status;
5066 image_status_get(hctx, mirror_image.global_image_id, watchers, &status);
5067
5068 cls::rbd::MirrorImageStatusState state = status.up ? status.state :
5069 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
5070 (*states)[state]++;
5071 }
5072
5073 if (!vals.empty()) {
5074 last_read = vals.rbegin()->first;
5075 }
5076 }
5077
5078 return 0;
5079 }
5080
5081 int image_status_remove_down(cls_method_context_t hctx) {
5082 std::set<entity_inst_t> watchers;
5083 int r = list_watchers(hctx, &watchers);
5084 if (r < 0) {
5085 return r;
5086 }
5087
5088 string last_read = STATUS_GLOBAL_KEY_PREFIX;
5089 int max_read = RBD_MAX_KEYS_READ;
5090 bool more = true;
5091 while (more) {
5092 map<string, bufferlist> vals;
5093 r = cls_cxx_map_get_vals(hctx, last_read, STATUS_GLOBAL_KEY_PREFIX,
5094 max_read, &vals, &more);
5095 if (r < 0) {
5096 if (r != -ENOENT) {
5097 CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
5098 }
5099 return r;
5100 }
5101
5102 for (auto &list_it : vals) {
5103 const string &key = list_it.first;
5104
5105 if (0 != key.compare(0, STATUS_GLOBAL_KEY_PREFIX.size(),
5106 STATUS_GLOBAL_KEY_PREFIX)) {
5107 break;
5108 }
5109
5110 MirrorImageStatusOnDisk status;
5111 try {
5112 auto it = list_it.second.cbegin();
5113 status.decode_meta(it);
5114 } catch (const buffer::error &err) {
5115 CLS_ERR("could not decode status metadata for mirrored image '%s'",
5116 key.c_str());
5117 return -EIO;
5118 }
5119
5120 if (watchers.find(status.origin) == watchers.end()) {
5121 CLS_LOG(20, "removing stale status object for key %s",
5122 key.c_str());
5123 int r1 = cls_cxx_map_remove_key(hctx, key);
5124 if (r1 < 0) {
5125 CLS_ERR("error removing stale status for key '%s': %s",
5126 key.c_str(), cpp_strerror(r1).c_str());
5127 return r1;
5128 }
5129 }
5130 }
5131
5132 if (!vals.empty()) {
5133 last_read = vals.rbegin()->first;
5134 }
5135 }
5136
5137 return 0;
5138 }
5139
5140 int image_instance_get(cls_method_context_t hctx,
5141 const string &global_image_id,
5142 const std::set<entity_inst_t> &watchers,
5143 entity_inst_t *instance) {
5144 bufferlist bl;
5145 int r = cls_cxx_map_get_val(hctx, status_global_key(global_image_id), &bl);
5146 if (r < 0) {
5147 if (r != -ENOENT) {
5148 CLS_ERR("error reading status for mirrored image, global id '%s': '%s'",
5149 global_image_id.c_str(), cpp_strerror(r).c_str());
5150 }
5151 return r;
5152 }
5153
5154 MirrorImageStatusOnDisk ondisk_status;
5155 try {
5156 auto it = bl.cbegin();
5157 decode(ondisk_status, it);
5158 } catch (const buffer::error &err) {
5159 CLS_ERR("could not decode status for mirrored image, global id '%s'",
5160 global_image_id.c_str());
5161 return -EIO;
5162 }
5163
5164 if (watchers.find(ondisk_status.origin) == watchers.end()) {
5165 return -ESTALE;
5166 }
5167
5168 *instance = ondisk_status.origin;
5169 return 0;
5170 }
5171
5172 int image_instance_list(cls_method_context_t hctx,
5173 const std::string &start_after,
5174 uint64_t max_return,
5175 map<std::string, entity_inst_t> *instances) {
5176 std::string last_read = image_key(start_after);
5177 int max_read = RBD_MAX_KEYS_READ;
5178 bool more = true;
5179
5180 std::set<entity_inst_t> watchers;
5181 int r = list_watchers(hctx, &watchers);
5182 if (r < 0) {
5183 return r;
5184 }
5185
5186 while (more && instances->size() < max_return) {
5187 std::map<std::string, bufferlist> vals;
5188 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
5189 r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX, max_read, &vals,
5190 &more);
5191 if (r < 0) {
5192 if (r != -ENOENT) {
5193 CLS_ERR("error reading mirror image directory by name: %s",
5194 cpp_strerror(r).c_str());
5195 }
5196 return r;
5197 }
5198
5199 for (auto it = vals.begin(); it != vals.end() &&
5200 instances->size() < max_return; ++it) {
5201 const std::string &image_id = it->first.substr(IMAGE_KEY_PREFIX.size());
5202 cls::rbd::MirrorImage mirror_image;
5203 auto iter = it->second.cbegin();
5204 try {
5205 decode(mirror_image, iter);
5206 } catch (const buffer::error &err) {
5207 CLS_ERR("could not decode mirror image payload of image '%s'",
5208 image_id.c_str());
5209 return -EIO;
5210 }
5211
5212 entity_inst_t instance;
5213 r = image_instance_get(hctx, mirror_image.global_image_id, watchers,
5214 &instance);
5215 if (r < 0) {
5216 continue;
5217 }
5218
5219 (*instances)[image_id] = instance;
5220 }
5221 if (!vals.empty()) {
5222 last_read = vals.rbegin()->first;
5223 }
5224 }
5225
5226 return 0;
5227 }
5228
5229 int instances_list(cls_method_context_t hctx,
5230 std::vector<std::string> *instance_ids) {
5231 std::string last_read = INSTANCE_KEY_PREFIX;
5232 int max_read = RBD_MAX_KEYS_READ;
5233 bool more = true;
5234 while (more) {
5235 std::map<std::string, bufferlist> vals;
5236 int r = cls_cxx_map_get_vals(hctx, last_read, INSTANCE_KEY_PREFIX.c_str(),
5237 max_read, &vals, &more);
5238 if (r < 0) {
5239 if (r != -ENOENT) {
5240 CLS_ERR("error reading mirror instances: %s", cpp_strerror(r).c_str());
5241 }
5242 return r;
5243 }
5244
5245 for (auto &it : vals) {
5246 instance_ids->push_back(it.first.substr(INSTANCE_KEY_PREFIX.size()));
5247 }
5248
5249 if (!vals.empty()) {
5250 last_read = vals.rbegin()->first;
5251 }
5252 }
5253 return 0;
5254 }
5255
5256 int instances_add(cls_method_context_t hctx, const string &instance_id) {
5257 bufferlist bl;
5258
5259 int r = cls_cxx_map_set_val(hctx, instance_key(instance_id), &bl);
5260 if (r < 0) {
5261 CLS_ERR("error setting mirror instance %s: %s", instance_id.c_str(),
5262 cpp_strerror(r).c_str());
5263 return r;
5264 }
5265 return 0;
5266 }
5267
5268 int instances_remove(cls_method_context_t hctx, const string &instance_id) {
5269
5270 int r = cls_cxx_map_remove_key(hctx, instance_key(instance_id));
5271 if (r < 0) {
5272 CLS_ERR("error removing mirror instance %s: %s", instance_id.c_str(),
5273 cpp_strerror(r).c_str());
5274 return r;
5275 }
5276 return 0;
5277 }
5278
5279 int mirror_image_map_list(cls_method_context_t hctx,
5280 const std::string &start_after,
5281 uint64_t max_return,
5282 std::map<std::string, cls::rbd::MirrorImageMap> *image_mapping) {
5283 bool more = true;
5284 std::string last_read = mirror_image_map_key(start_after);
5285
5286 while (more && image_mapping->size() < max_return) {
5287 std::map<std::string, bufferlist> vals;
5288 CLS_LOG(20, "last read: '%s'", last_read.c_str());
5289
5290 int max_read = std::min<uint64_t>(RBD_MAX_KEYS_READ, max_return - image_mapping->size());
5291 int r = cls_cxx_map_get_vals(hctx, last_read, MIRROR_IMAGE_MAP_KEY_PREFIX,
5292 max_read, &vals, &more);
5293 if (r < 0) {
5294 CLS_ERR("error reading image map: %s", cpp_strerror(r).c_str());
5295 return r;
5296 }
5297
5298 if (vals.empty()) {
5299 return 0;
5300 }
5301
5302 for (auto it = vals.begin(); it != vals.end(); ++it) {
5303 const std::string &global_image_id =
5304 it->first.substr(MIRROR_IMAGE_MAP_KEY_PREFIX.size());
5305
5306 cls::rbd::MirrorImageMap mirror_image_map;
5307 auto iter = it->second.cbegin();
5308 try {
5309 decode(mirror_image_map, iter);
5310 } catch (const buffer::error &err) {
5311 CLS_ERR("could not decode image map payload: %s",
5312 cpp_strerror(r).c_str());
5313 return -EINVAL;
5314 }
5315
5316 image_mapping->insert(std::make_pair(global_image_id, mirror_image_map));
5317 }
5318
5319 if (!vals.empty()) {
5320 last_read = vals.rbegin()->first;
5321 }
5322 }
5323
5324 return 0;
5325 }
5326
5327 } // namespace mirror
5328
5329 /**
5330 * Input:
5331 * none
5332 *
5333 * Output:
5334 * @param uuid (std::string)
5335 * @returns 0 on success, negative error code on failure
5336 */
5337 int mirror_uuid_get(cls_method_context_t hctx, bufferlist *in,
5338 bufferlist *out) {
5339 std::string mirror_uuid;
5340 int r = mirror::uuid_get(hctx, &mirror_uuid);
5341 if (r < 0) {
5342 return r;
5343 }
5344
5345 encode(mirror_uuid, *out);
5346 return 0;
5347 }
5348
5349 /**
5350 * Input:
5351 * @param mirror_uuid (std::string)
5352 *
5353 * Output:
5354 * @returns 0 on success, negative error code on failure
5355 */
5356 int mirror_uuid_set(cls_method_context_t hctx, bufferlist *in,
5357 bufferlist *out) {
5358 std::string mirror_uuid;
5359 try {
5360 auto bl_it = in->cbegin();
5361 decode(mirror_uuid, bl_it);
5362 } catch (const buffer::error &err) {
5363 return -EINVAL;
5364 }
5365
5366 if (mirror_uuid.empty()) {
5367 CLS_ERR("cannot set empty mirror uuid");
5368 return -EINVAL;
5369 }
5370
5371 uint32_t mirror_mode;
5372 int r = read_key(hctx, mirror::MODE, &mirror_mode);
5373 if (r < 0 && r != -ENOENT) {
5374 return r;
5375 } else if (r == 0 && mirror_mode != cls::rbd::MIRROR_MODE_DISABLED) {
5376 CLS_ERR("cannot set mirror uuid while mirroring enabled");
5377 return -EINVAL;
5378 }
5379
5380 bufferlist mirror_uuid_bl;
5381 mirror_uuid_bl.append(mirror_uuid);
5382 r = cls_cxx_map_set_val(hctx, mirror::UUID, &mirror_uuid_bl);
5383 if (r < 0) {
5384 CLS_ERR("failed to set mirror uuid");
5385 return r;
5386 }
5387 return 0;
5388 }
5389
5390 /**
5391 * Input:
5392 * none
5393 *
5394 * Output:
5395 * @param cls::rbd::MirrorMode (uint32_t)
5396 * @returns 0 on success, negative error code on failure
5397 */
5398 int mirror_mode_get(cls_method_context_t hctx, bufferlist *in,
5399 bufferlist *out) {
5400 uint32_t mirror_mode_decode;
5401 int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
5402 if (r < 0) {
5403 return r;
5404 }
5405
5406 encode(mirror_mode_decode, *out);
5407 return 0;
5408 }
5409
5410 /**
5411 * Input:
5412 * @param mirror_mode (cls::rbd::MirrorMode) (uint32_t)
5413 *
5414 * Output:
5415 * @returns 0 on success, negative error code on failure
5416 */
5417 int mirror_mode_set(cls_method_context_t hctx, bufferlist *in,
5418 bufferlist *out) {
5419 uint32_t mirror_mode_decode;
5420 try {
5421 auto bl_it = in->cbegin();
5422 decode(mirror_mode_decode, bl_it);
5423 } catch (const buffer::error &err) {
5424 return -EINVAL;
5425 }
5426
5427 bool enabled;
5428 switch (static_cast<cls::rbd::MirrorMode>(mirror_mode_decode)) {
5429 case cls::rbd::MIRROR_MODE_DISABLED:
5430 enabled = false;
5431 break;
5432 case cls::rbd::MIRROR_MODE_IMAGE:
5433 case cls::rbd::MIRROR_MODE_POOL:
5434 enabled = true;
5435 break;
5436 default:
5437 CLS_ERR("invalid mirror mode: %d", mirror_mode_decode);
5438 return -EINVAL;
5439 }
5440
5441 int r;
5442 if (enabled) {
5443 std::string mirror_uuid;
5444 r = mirror::uuid_get(hctx, &mirror_uuid);
5445 if (r == -ENOENT) {
5446 return -EINVAL;
5447 } else if (r < 0) {
5448 return r;
5449 }
5450
5451 bufferlist bl;
5452 encode(mirror_mode_decode, bl);
5453
5454 r = cls_cxx_map_set_val(hctx, mirror::MODE, &bl);
5455 if (r < 0) {
5456 CLS_ERR("error enabling mirroring: %s", cpp_strerror(r).c_str());
5457 return r;
5458 }
5459 } else {
5460 std::vector<cls::rbd::MirrorPeer> peers;
5461 r = mirror::read_peers(hctx, &peers);
5462 if (r < 0 && r != -ENOENT) {
5463 return r;
5464 }
5465
5466 if (!peers.empty()) {
5467 CLS_ERR("mirroring peers still registered");
5468 return -EBUSY;
5469 }
5470
5471 r = remove_key(hctx, mirror::MODE);
5472 if (r < 0) {
5473 return r;
5474 }
5475
5476 r = remove_key(hctx, mirror::UUID);
5477 if (r < 0) {
5478 return r;
5479 }
5480 }
5481 return 0;
5482 }
5483
5484 /**
5485 * Input:
5486 * none
5487 *
5488 * Output:
5489 * @param std::vector<cls::rbd::MirrorPeer>: collection of peers
5490 * @returns 0 on success, negative error code on failure
5491 */
5492 int mirror_peer_list(cls_method_context_t hctx, bufferlist *in,
5493 bufferlist *out) {
5494 std::vector<cls::rbd::MirrorPeer> peers;
5495 int r = mirror::read_peers(hctx, &peers);
5496 if (r < 0 && r != -ENOENT) {
5497 return r;
5498 }
5499
5500 encode(peers, *out);
5501 return 0;
5502 }
5503
5504 /**
5505 * Input:
5506 * @param mirror_peer (cls::rbd::MirrorPeer)
5507 *
5508 * Output:
5509 * @returns 0 on success, negative error code on failure
5510 */
5511 int mirror_peer_add(cls_method_context_t hctx, bufferlist *in,
5512 bufferlist *out) {
5513 cls::rbd::MirrorPeer mirror_peer;
5514 try {
5515 auto it = in->cbegin();
5516 decode(mirror_peer, it);
5517 } catch (const buffer::error &err) {
5518 return -EINVAL;
5519 }
5520
5521 uint32_t mirror_mode_decode;
5522 int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
5523 if (r < 0 && r != -ENOENT) {
5524 return r;
5525 } else if (r == -ENOENT ||
5526 mirror_mode_decode == cls::rbd::MIRROR_MODE_DISABLED) {
5527 CLS_ERR("mirroring must be enabled on the pool");
5528 return -EINVAL;
5529 } else if (!mirror_peer.is_valid()) {
5530 CLS_ERR("mirror peer is not valid");
5531 return -EINVAL;
5532 }
5533
5534 std::string mirror_uuid;
5535 r = mirror::uuid_get(hctx, &mirror_uuid);
5536 if (r < 0) {
5537 CLS_ERR("error retrieving mirroring uuid: %s", cpp_strerror(r).c_str());
5538 return r;
5539 } else if (mirror_peer.uuid == mirror_uuid) {
5540 CLS_ERR("peer uuid '%s' matches pool mirroring uuid",
5541 mirror_uuid.c_str());
5542 return -EINVAL;
5543 }
5544
5545 std::vector<cls::rbd::MirrorPeer> peers;
5546 r = mirror::read_peers(hctx, &peers);
5547 if (r < 0 && r != -ENOENT) {
5548 return r;
5549 }
5550
5551 for (auto const &peer : peers) {
5552 if (peer.uuid == mirror_peer.uuid) {
5553 CLS_ERR("peer uuid '%s' already exists",
5554 peer.uuid.c_str());
5555 return -ESTALE;
5556 } else if (peer.cluster_name == mirror_peer.cluster_name &&
5557 (peer.pool_id == -1 || mirror_peer.pool_id == -1 ||
5558 peer.pool_id == mirror_peer.pool_id)) {
5559 CLS_ERR("peer cluster name '%s' already exists",
5560 peer.cluster_name.c_str());
5561 return -EEXIST;
5562 }
5563 }
5564
5565 bufferlist bl;
5566 encode(mirror_peer, bl);
5567 r = cls_cxx_map_set_val(hctx, mirror::peer_key(mirror_peer.uuid),
5568 &bl);
5569 if (r < 0) {
5570 CLS_ERR("error adding peer: %s", cpp_strerror(r).c_str());
5571 return r;
5572 }
5573 return 0;
5574 }
5575
5576 /**
5577 * Input:
5578 * @param uuid (std::string)
5579 *
5580 * Output:
5581 * @returns 0 on success, negative error code on failure
5582 */
5583 int mirror_peer_remove(cls_method_context_t hctx, bufferlist *in,
5584 bufferlist *out) {
5585 std::string uuid;
5586 try {
5587 auto it = in->cbegin();
5588 decode(uuid, it);
5589 } catch (const buffer::error &err) {
5590 return -EINVAL;
5591 }
5592
5593 int r = cls_cxx_map_remove_key(hctx, mirror::peer_key(uuid));
5594 if (r < 0 && r != -ENOENT) {
5595 CLS_ERR("error removing peer: %s", cpp_strerror(r).c_str());
5596 return r;
5597 }
5598 return 0;
5599 }
5600
5601 /**
5602 * Input:
5603 * @param uuid (std::string)
5604 * @param client_name (std::string)
5605 *
5606 * Output:
5607 * @returns 0 on success, negative error code on failure
5608 */
5609 int mirror_peer_set_client(cls_method_context_t hctx, bufferlist *in,
5610 bufferlist *out) {
5611 std::string uuid;
5612 std::string client_name;
5613 try {
5614 auto it = in->cbegin();
5615 decode(uuid, it);
5616 decode(client_name, it);
5617 } catch (const buffer::error &err) {
5618 return -EINVAL;
5619 }
5620
5621 cls::rbd::MirrorPeer peer;
5622 int r = mirror::read_peer(hctx, uuid, &peer);
5623 if (r < 0) {
5624 return r;
5625 }
5626
5627 peer.client_name = client_name;
5628 r = mirror::write_peer(hctx, uuid, peer);
5629 if (r < 0) {
5630 return r;
5631 }
5632 return 0;
5633 }
5634
5635 /**
5636 * Input:
5637 * @param uuid (std::string)
5638 * @param cluster_name (std::string)
5639 *
5640 * Output:
5641 * @returns 0 on success, negative error code on failure
5642 */
5643 int mirror_peer_set_cluster(cls_method_context_t hctx, bufferlist *in,
5644 bufferlist *out) {
5645 std::string uuid;
5646 std::string cluster_name;
5647 try {
5648 auto it = in->cbegin();
5649 decode(uuid, it);
5650 decode(cluster_name, it);
5651 } catch (const buffer::error &err) {
5652 return -EINVAL;
5653 }
5654
5655 cls::rbd::MirrorPeer peer;
5656 int r = mirror::read_peer(hctx, uuid, &peer);
5657 if (r < 0) {
5658 return r;
5659 }
5660
5661 peer.cluster_name = cluster_name;
5662 r = mirror::write_peer(hctx, uuid, peer);
5663 if (r < 0) {
5664 return r;
5665 }
5666 return 0;
5667 }
5668
5669 /**
5670 * Input:
5671 * @param start_after which name to begin listing after
5672 * (use the empty string to start at the beginning)
5673 * @param max_return the maximum number of names to list
5674 *
5675 * Output:
5676 * @param std::map<std::string, std::string>: local id to global id map
5677 * @returns 0 on success, negative error code on failure
5678 */
5679 int mirror_image_list(cls_method_context_t hctx, bufferlist *in,
5680 bufferlist *out) {
5681 std::string start_after;
5682 uint64_t max_return;
5683 try {
5684 auto iter = in->cbegin();
5685 decode(start_after, iter);
5686 decode(max_return, iter);
5687 } catch (const buffer::error &err) {
5688 return -EINVAL;
5689 }
5690
5691 int max_read = RBD_MAX_KEYS_READ;
5692 bool more = true;
5693 std::map<std::string, std::string> mirror_images;
5694 std::string last_read = mirror::image_key(start_after);
5695
5696 while (more && mirror_images.size() < max_return) {
5697 std::map<std::string, bufferlist> vals;
5698 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
5699 int r = cls_cxx_map_get_vals(hctx, last_read, mirror::IMAGE_KEY_PREFIX,
5700 max_read, &vals, &more);
5701 if (r < 0) {
5702 if (r != -ENOENT) {
5703 CLS_ERR("error reading mirror image directory by name: %s",
5704 cpp_strerror(r).c_str());
5705 }
5706 return r;
5707 }
5708
5709 for (auto it = vals.begin(); it != vals.end(); ++it) {
5710 const std::string &image_id =
5711 it->first.substr(mirror::IMAGE_KEY_PREFIX.size());
5712 cls::rbd::MirrorImage mirror_image;
5713 auto iter = it->second.cbegin();
5714 try {
5715 decode(mirror_image, iter);
5716 } catch (const buffer::error &err) {
5717 CLS_ERR("could not decode mirror image payload of image '%s'",
5718 image_id.c_str());
5719 return -EIO;
5720 }
5721
5722 mirror_images[image_id] = mirror_image.global_image_id;
5723 if (mirror_images.size() >= max_return) {
5724 break;
5725 }
5726 }
5727 if (!vals.empty()) {
5728 last_read = mirror::image_key(mirror_images.rbegin()->first);
5729 }
5730 }
5731
5732 encode(mirror_images, *out);
5733 return 0;
5734 }
5735
5736 /**
5737 * Input:
5738 * @param global_id (std::string)
5739 *
5740 * Output:
5741 * @param std::string - image id
5742 * @returns 0 on success, negative error code on failure
5743 */
5744 int mirror_image_get_image_id(cls_method_context_t hctx, bufferlist *in,
5745 bufferlist *out) {
5746 std::string global_id;
5747 try {
5748 auto it = in->cbegin();
5749 decode(global_id, it);
5750 } catch (const buffer::error &err) {
5751 return -EINVAL;
5752 }
5753
5754 std::string image_id;
5755 int r = read_key(hctx, mirror::global_key(global_id), &image_id);
5756 if (r < 0) {
5757 if (r != -ENOENT) {
5758 CLS_ERR("error retrieving image id for global id '%s': %s",
5759 global_id.c_str(), cpp_strerror(r).c_str());
5760 }
5761 return r;
5762 }
5763
5764 encode(image_id, *out);
5765 return 0;
5766 }
5767
5768 /**
5769 * Input:
5770 * @param image_id (std::string)
5771 *
5772 * Output:
5773 * @param cls::rbd::MirrorImage - metadata associated with the image_id
5774 * @returns 0 on success, negative error code on failure
5775 */
5776 int mirror_image_get(cls_method_context_t hctx, bufferlist *in,
5777 bufferlist *out) {
5778 string image_id;
5779 try {
5780 auto it = in->cbegin();
5781 decode(image_id, it);
5782 } catch (const buffer::error &err) {
5783 return -EINVAL;
5784 }
5785
5786 cls::rbd::MirrorImage mirror_image;
5787 int r = mirror::image_get(hctx, image_id, &mirror_image);
5788 if (r < 0) {
5789 return r;
5790 }
5791
5792 encode(mirror_image, *out);
5793 return 0;
5794 }
5795
5796 /**
5797 * Input:
5798 * @param image_id (std::string)
5799 * @param mirror_image (cls::rbd::MirrorImage)
5800 *
5801 * Output:
5802 * @returns 0 on success, negative error code on failure
5803 * @returns -EEXIST if there's an existing image_id with a different global_image_id
5804 */
5805 int mirror_image_set(cls_method_context_t hctx, bufferlist *in,
5806 bufferlist *out) {
5807 string image_id;
5808 cls::rbd::MirrorImage mirror_image;
5809 try {
5810 auto it = in->cbegin();
5811 decode(image_id, it);
5812 decode(mirror_image, it);
5813 } catch (const buffer::error &err) {
5814 return -EINVAL;
5815 }
5816
5817 int r = mirror::image_set(hctx, image_id, mirror_image);
5818 if (r < 0) {
5819 return r;
5820 }
5821 return 0;
5822 }
5823
5824 /**
5825 * Input:
5826 * @param image_id (std::string)
5827 *
5828 * Output:
5829 * @returns 0 on success, negative error code on failure
5830 */
5831 int mirror_image_remove(cls_method_context_t hctx, bufferlist *in,
5832 bufferlist *out) {
5833 string image_id;
5834 try {
5835 auto it = in->cbegin();
5836 decode(image_id, it);
5837 } catch (const buffer::error &err) {
5838 return -EINVAL;
5839 }
5840
5841 int r = mirror::image_remove(hctx, image_id);
5842 if (r < 0) {
5843 return r;
5844 }
5845 return 0;
5846 }
5847
5848 /**
5849 * Input:
5850 * @param global_image_id (std::string)
5851 * @param status (cls::rbd::MirrorImageStatus)
5852 *
5853 * Output:
5854 * @returns 0 on success, negative error code on failure
5855 */
5856 int mirror_image_status_set(cls_method_context_t hctx, bufferlist *in,
5857 bufferlist *out) {
5858 string global_image_id;
5859 cls::rbd::MirrorImageStatus status;
5860 try {
5861 auto it = in->cbegin();
5862 decode(global_image_id, it);
5863 decode(status, it);
5864 } catch (const buffer::error &err) {
5865 return -EINVAL;
5866 }
5867
5868 int r = mirror::image_status_set(hctx, global_image_id, status);
5869 if (r < 0) {
5870 return r;
5871 }
5872 return 0;
5873 }
5874
5875 /**
5876 * Input:
5877 * @param global_image_id (std::string)
5878 *
5879 * Output:
5880 * @returns 0 on success, negative error code on failure
5881 */
5882 int mirror_image_status_remove(cls_method_context_t hctx, bufferlist *in,
5883 bufferlist *out) {
5884 string global_image_id;
5885 try {
5886 auto it = in->cbegin();
5887 decode(global_image_id, it);
5888 } catch (const buffer::error &err) {
5889 return -EINVAL;
5890 }
5891
5892 int r = mirror::image_status_remove(hctx, global_image_id);
5893 if (r < 0) {
5894 return r;
5895 }
5896 return 0;
5897 }
5898
5899 /**
5900 * Input:
5901 * @param global_image_id (std::string)
5902 *
5903 * Output:
5904 * @param cls::rbd::MirrorImageStatus - metadata associated with the global_image_id
5905 * @returns 0 on success, negative error code on failure
5906 */
5907 int mirror_image_status_get(cls_method_context_t hctx, bufferlist *in,
5908 bufferlist *out) {
5909 string global_image_id;
5910 try {
5911 auto it = in->cbegin();
5912 decode(global_image_id, it);
5913 } catch (const buffer::error &err) {
5914 return -EINVAL;
5915 }
5916
5917 std::set<entity_inst_t> watchers;
5918 int r = mirror::list_watchers(hctx, &watchers);
5919 if (r < 0) {
5920 return r;
5921 }
5922
5923 cls::rbd::MirrorImageStatus status;
5924 r = mirror::image_status_get(hctx, global_image_id, watchers, &status);
5925 if (r < 0) {
5926 return r;
5927 }
5928
5929 encode(status, *out);
5930 return 0;
5931 }
5932
5933 /**
5934 * Input:
5935 * @param start_after which name to begin listing after
5936 * (use the empty string to start at the beginning)
5937 * @param max_return the maximum number of names to list
5938 *
5939 * Output:
5940 * @param std::map<std::string, cls::rbd::MirrorImage>: image id to image map
 * @param std::map<std::string, cls::rbd::MirrorImageStatus>: image id to status map
5942 * @returns 0 on success, negative error code on failure
5943 */
5944 int mirror_image_status_list(cls_method_context_t hctx, bufferlist *in,
5945 bufferlist *out) {
5946 std::string start_after;
5947 uint64_t max_return;
5948 try {
5949 auto iter = in->cbegin();
5950 decode(start_after, iter);
5951 decode(max_return, iter);
5952 } catch (const buffer::error &err) {
5953 return -EINVAL;
5954 }
5955
5956 map<std::string, cls::rbd::MirrorImage> images;
5957 map<std::string, cls::rbd::MirrorImageStatus> statuses;
5958 int r = mirror::image_status_list(hctx, start_after, max_return, &images,
5959 &statuses);
5960 if (r < 0) {
5961 return r;
5962 }
5963
5964 encode(images, *out);
5965 encode(statuses, *out);
5966 return 0;
5967 }
5968
5969 /**
5970 * Input:
5971 * none
5972 *
5973 * Output:
5974 * @param std::map<cls::rbd::MirrorImageStatusState, int>: states counts
5975 * @returns 0 on success, negative error code on failure
5976 */
5977 int mirror_image_status_get_summary(cls_method_context_t hctx, bufferlist *in,
5978 bufferlist *out) {
5979 std::map<cls::rbd::MirrorImageStatusState, int> states;
5980
5981 int r = mirror::image_status_get_summary(hctx, &states);
5982 if (r < 0) {
5983 return r;
5984 }
5985
5986 encode(states, *out);
5987 return 0;
5988 }
5989
5990 /**
5991 * Input:
5992 * none
5993 *
5994 * Output:
5995 * @returns 0 on success, negative error code on failure
5996 */
5997 int mirror_image_status_remove_down(cls_method_context_t hctx, bufferlist *in,
5998 bufferlist *out) {
5999 int r = mirror::image_status_remove_down(hctx);
6000 if (r < 0) {
6001 return r;
6002 }
6003 return 0;
6004 }
6005
6006 /**
6007 * Input:
6008 * @param global_image_id (std::string)
6009 *
6010 * Output:
6011 * @param entity_inst_t - instance
6012 * @returns 0 on success, negative error code on failure
6013 */
6014 int mirror_image_instance_get(cls_method_context_t hctx, bufferlist *in,
6015 bufferlist *out) {
6016 string global_image_id;
6017 try {
6018 auto it = in->cbegin();
6019 decode(global_image_id, it);
6020 } catch (const buffer::error &err) {
6021 return -EINVAL;
6022 }
6023
6024 std::set<entity_inst_t> watchers;
6025 int r = mirror::list_watchers(hctx, &watchers);
6026 if (r < 0) {
6027 return r;
6028 }
6029
6030 entity_inst_t instance;
6031 r = mirror::image_instance_get(hctx, global_image_id, watchers, &instance);
6032 if (r < 0) {
6033 return r;
6034 }
6035
6036 encode(instance, *out, cls_get_features(hctx));
6037 return 0;
6038 }
6039
6040 /**
6041 * Input:
6042 * @param start_after which name to begin listing after
6043 * (use the empty string to start at the beginning)
6044 * @param max_return the maximum number of names to list
6045 *
6046 * Output:
6047 * @param std::map<std::string, entity_inst_t>: image id to instance map
6048 * @returns 0 on success, negative error code on failure
6049 */
6050 int mirror_image_instance_list(cls_method_context_t hctx, bufferlist *in,
6051 bufferlist *out) {
6052 std::string start_after;
6053 uint64_t max_return;
6054 try {
6055 auto iter = in->cbegin();
6056 decode(start_after, iter);
6057 decode(max_return, iter);
6058 } catch (const buffer::error &err) {
6059 return -EINVAL;
6060 }
6061
6062 map<std::string, entity_inst_t> instances;
6063 int r = mirror::image_instance_list(hctx, start_after, max_return,
6064 &instances);
6065 if (r < 0) {
6066 return r;
6067 }
6068
6069 encode(instances, *out, cls_get_features(hctx));
6070 return 0;
6071 }
6072
6073 /**
6074 * Input:
6075 * none
6076 *
6077 * Output:
6078 * @param std::vector<std::string>: instance ids
6079 * @returns 0 on success, negative error code on failure
6080 */
6081 int mirror_instances_list(cls_method_context_t hctx, bufferlist *in,
6082 bufferlist *out) {
6083 std::vector<std::string> instance_ids;
6084
6085 int r = mirror::instances_list(hctx, &instance_ids);
6086 if (r < 0) {
6087 return r;
6088 }
6089
6090 encode(instance_ids, *out);
6091 return 0;
6092 }
6093
6094 /**
6095 * Input:
6096 * @param instance_id (std::string)
6097 *
6098 * Output:
6099 * @returns 0 on success, negative error code on failure
6100 */
6101 int mirror_instances_add(cls_method_context_t hctx, bufferlist *in,
6102 bufferlist *out) {
6103 std::string instance_id;
6104 try {
6105 auto iter = in->cbegin();
6106 decode(instance_id, iter);
6107 } catch (const buffer::error &err) {
6108 return -EINVAL;
6109 }
6110
6111 int r = mirror::instances_add(hctx, instance_id);
6112 if (r < 0) {
6113 return r;
6114 }
6115 return 0;
6116 }
6117
6118 /**
6119 * Input:
6120 * @param instance_id (std::string)
6121 *
6122 * Output:
6123 * @returns 0 on success, negative error code on failure
6124 */
6125 int mirror_instances_remove(cls_method_context_t hctx, bufferlist *in,
6126 bufferlist *out) {
6127 std::string instance_id;
6128 try {
6129 auto iter = in->cbegin();
6130 decode(instance_id, iter);
6131 } catch (const buffer::error &err) {
6132 return -EINVAL;
6133 }
6134
6135 int r = mirror::instances_remove(hctx, instance_id);
6136 if (r < 0) {
6137 return r;
6138 }
6139 return 0;
6140 }
6141
6142 /**
6143 * Input:
6144 * @param start_after: key to start after
6145 * @param max_return: max return items
6146 *
6147 * Output:
6148 * @param std::map<std::string, cls::rbd::MirrorImageMap>: image mapping
6149 * @returns 0 on success, negative error code on failure
6150 */
6151 int mirror_image_map_list(cls_method_context_t hctx, bufferlist *in,
6152 bufferlist *out) {
6153 std::string start_after;
6154 uint64_t max_return;
6155 try {
6156 auto it = in->cbegin();
6157 decode(start_after, it);
6158 decode(max_return, it);
6159 } catch (const buffer::error &err) {
6160 return -EINVAL;
6161 }
6162
6163 std::map<std::string, cls::rbd::MirrorImageMap> image_mapping;
6164 int r = mirror::mirror_image_map_list(hctx, start_after, max_return, &image_mapping);
6165 if (r < 0) {
6166 return r;
6167 }
6168
6169 encode(image_mapping, *out);
6170 return 0;
6171 }
6172
6173 /**
6174 * Input:
6175 * @param global_image_id: global image id
6176 * @param image_map: image map
6177 *
6178 * Output:
6179 * @returns 0 on success, negative error code on failure
6180 */
6181 int mirror_image_map_update(cls_method_context_t hctx, bufferlist *in,
6182 bufferlist *out) {
6183 std::string global_image_id;
6184 cls::rbd::MirrorImageMap image_map;
6185
6186 try {
6187 auto it = in->cbegin();
6188 decode(global_image_id, it);
6189 decode(image_map, it);
6190 } catch (const buffer::error &err) {
6191 return -EINVAL;
6192 }
6193
6194 bufferlist bl;
6195 encode(image_map, bl);
6196
6197 const std::string key = mirror::mirror_image_map_key(global_image_id);
6198 int r = cls_cxx_map_set_val(hctx, key, &bl);
6199 if (r < 0) {
6200 CLS_ERR("error updating image map %s: %s", key.c_str(),
6201 cpp_strerror(r).c_str());
6202 return r;
6203 }
6204
6205 return 0;
6206 }
6207
6208 /**
6209 * Input:
6210 * @param global_image_id: global image id
6211 *
6212 * Output:
6213 * @returns 0 on success, negative error code on failure
6214 */
6215 int mirror_image_map_remove(cls_method_context_t hctx, bufferlist *in,
6216 bufferlist *out) {
6217 std::string global_image_id;
6218
6219 try {
6220 auto it = in->cbegin();
6221 decode(global_image_id, it);
6222 } catch (const buffer::error &err) {
6223 return -EINVAL;
6224 }
6225
6226 const std::string key = mirror::mirror_image_map_key(global_image_id);
6227 int r = cls_cxx_map_remove_key(hctx, key);
6228 if (r < 0 && r != -ENOENT) {
6229 CLS_ERR("error removing image map %s: %s", key.c_str(),
6230 cpp_strerror(r).c_str());
6231 return r;
6232 }
6233
6234 return 0;
6235 }
6236
6237 namespace group {
6238
6239 /********************** methods for rbd_group_directory ***********************/
6240
6241 int dir_add(cls_method_context_t hctx,
6242 const string &name, const string &id,
6243 bool check_for_unique_id)
6244 {
6245 if (!name.size() || !is_valid_id(id)) {
6246 CLS_ERR("invalid group name '%s' or id '%s'",
6247 name.c_str(), id.c_str());
6248 return -EINVAL;
6249 }
6250
6251 CLS_LOG(20, "dir_add name=%s id=%s", name.c_str(), id.c_str());
6252
6253 string name_key = dir_key_for_name(name);
6254 string id_key = dir_key_for_id(id);
6255 string tmp;
6256 int r = read_key(hctx, name_key, &tmp);
6257 if (r != -ENOENT) {
6258 CLS_LOG(10, "name already exists");
6259 return -EEXIST;
6260 }
6261 r = read_key(hctx, id_key, &tmp);
6262 if (r != -ENOENT && check_for_unique_id) {
6263 CLS_LOG(10, "id already exists");
6264 return -EBADF;
6265 }
6266 bufferlist id_bl, name_bl;
6267 encode(id, id_bl);
6268 encode(name, name_bl);
6269 map<string, bufferlist> omap_vals;
6270 omap_vals[name_key] = id_bl;
6271 omap_vals[id_key] = name_bl;
6272 return cls_cxx_map_set_vals(hctx, &omap_vals);
6273 }
6274
6275 int dir_remove(cls_method_context_t hctx,
6276 const string &name, const string &id)
6277 {
6278 CLS_LOG(20, "dir_remove name=%s id=%s", name.c_str(), id.c_str());
6279
6280 string name_key = dir_key_for_name(name);
6281 string id_key = dir_key_for_id(id);
6282 string stored_name, stored_id;
6283
6284 int r = read_key(hctx, name_key, &stored_id);
6285 if (r < 0) {
6286 if (r != -ENOENT)
6287 CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
6288 return r;
6289 }
6290 r = read_key(hctx, id_key, &stored_name);
6291 if (r < 0) {
6292 if (r != -ENOENT)
6293 CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
6294 return r;
6295 }
6296
6297 // check if this op raced with a rename
6298 if (stored_name != name || stored_id != id) {
6299 CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
6300 stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
6301 return -ESTALE;
6302 }
6303
6304 r = cls_cxx_map_remove_key(hctx, name_key);
6305 if (r < 0) {
6306 CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
6307 return r;
6308 }
6309
6310 r = cls_cxx_map_remove_key(hctx, id_key);
6311 if (r < 0) {
6312 CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
6313 return r;
6314 }
6315
6316 return 0;
6317 }
6318
6319 static const string RBD_GROUP_SNAP_KEY_PREFIX = "snapshot_";
6320
6321 std::string snap_key(const std::string &snap_id) {
6322 ostringstream oss;
6323 oss << RBD_GROUP_SNAP_KEY_PREFIX << snap_id;
6324 return oss.str();
6325 }
6326
6327 int snap_list(cls_method_context_t hctx, cls::rbd::GroupSnapshot start_after,
6328 uint64_t max_return,
6329 std::vector<cls::rbd::GroupSnapshot> *group_snaps)
6330 {
6331 int max_read = RBD_MAX_KEYS_READ;
6332 std::map<string, bufferlist> vals;
6333 string last_read = snap_key(start_after.id);
6334
6335 group_snaps->clear();
6336
6337 bool more;
6338 do {
6339 int r = cls_cxx_map_get_vals(hctx, last_read,
6340 RBD_GROUP_SNAP_KEY_PREFIX,
6341 max_read, &vals, &more);
6342 if (r < 0)
6343 return r;
6344
6345 for (map<string, bufferlist>::iterator it = vals.begin();
6346 it != vals.end() && group_snaps->size() < max_return; ++it) {
6347
6348 auto iter = it->second.cbegin();
6349 cls::rbd::GroupSnapshot snap;
6350 try {
6351 decode(snap, iter);
6352 } catch (const buffer::error &err) {
6353 CLS_ERR("error decoding snapshot: %s", it->first.c_str());
6354 return -EIO;
6355 }
6356 CLS_LOG(20, "Discovered snapshot %s %s",
6357 snap.name.c_str(),
6358 snap.id.c_str());
6359 group_snaps->push_back(snap);
6360 }
6361
6362 } while (more && (group_snaps->size() < max_return));
6363
6364 return 0;
6365 }
6366
6367 static int check_duplicate_snap_name(cls_method_context_t hctx,
6368 const std::string &snap_name,
6369 const std::string &snap_id)
6370 {
6371 const int max_read = 1024;
6372 cls::rbd::GroupSnapshot snap_last;
6373 std::vector<cls::rbd::GroupSnapshot> page;
6374
6375 for (;;) {
6376 int r = snap_list(hctx, snap_last, max_read, &page);
6377 if (r < 0) {
6378 return r;
6379 }
6380 for (auto& snap: page) {
6381 if (snap.name == snap_name && snap.id != snap_id) {
6382 return -EEXIST;
6383 }
6384 }
6385
6386 if (page.size() < max_read) {
6387 break;
6388 }
6389
6390 snap_last = *page.rbegin();
6391 }
6392
6393 return 0;
6394 }
6395
6396 } // namespace group
6397
6398 /**
6399 * List groups from the directory.
6400 *
6401 * Input:
6402 * @param start_after (std::string)
 * @param max_return (uint64_t)
6404 *
6405 * Output:
6406 * @param map of groups (name, id)
6407 * @return 0 on success, negative error code on failure
6408 */
6409 int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
6410 {
6411 string start_after;
6412 uint64_t max_return;
6413
6414 try {
6415 auto iter = in->cbegin();
6416 decode(start_after, iter);
6417 decode(max_return, iter);
6418 } catch (const buffer::error &err) {
6419 return -EINVAL;
6420 }
6421
6422 int max_read = RBD_MAX_KEYS_READ;
6423 bool more = true;
6424 map<string, string> groups;
6425 string last_read = dir_key_for_name(start_after);
6426
6427 while (more && groups.size() < max_return) {
6428 map<string, bufferlist> vals;
6429 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
6430 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
6431 max_read, &vals, &more);
6432 if (r < 0) {
6433 if (r != -ENOENT) {
6434 CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
6435 }
6436 return r;
6437 }
6438
6439 for (pair<string, bufferlist> val: vals) {
6440 string id;
6441 auto iter = val.second.cbegin();
6442 try {
6443 decode(id, iter);
6444 } catch (const buffer::error &err) {
6445 CLS_ERR("could not decode id of group '%s'", val.first.c_str());
6446 return -EIO;
6447 }
6448 CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(val.first).c_str(), id.c_str());
6449 groups[dir_name_from_key(val.first)] = id;
6450 if (groups.size() >= max_return)
6451 break;
6452 }
6453 if (!vals.empty()) {
6454 last_read = dir_key_for_name(groups.rbegin()->first);
6455 }
6456 }
6457
6458 encode(groups, *out);
6459
6460 return 0;
6461 }
6462
6463 /**
6464 * Add a group to the directory.
6465 *
6466 * Input:
6467 * @param name (std::string)
6468 * @param id (std::string)
6469 *
6470 * Output:
6471 * @return 0 on success, negative error code on failure
6472 */
6473 int group_dir_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
6474 {
6475 int r = cls_cxx_create(hctx, false);
6476
6477 if (r < 0) {
6478 CLS_ERR("could not create group directory: %s",
6479 cpp_strerror(r).c_str());
6480 return r;
6481 }
6482
6483 string name, id;
6484 try {
6485 auto iter = in->cbegin();
6486 decode(name, iter);
6487 decode(id, iter);
6488 } catch (const buffer::error &err) {
6489 return -EINVAL;
6490 }
6491
6492 return group::dir_add(hctx, name, id, true);
6493 }
6494
6495 /**
 * Rename a group in the directory.
6497 *
6498 * Input:
6499 * @param src original name of the group (std::string)
6500 * @param dest new name of the group (std::string)
6501 * @param id the id of the group (std::string)
6502 *
6503 * Output:
6504 * @return 0 on success, negative error code on failure
6505 */
6506 int group_dir_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
6507 {
6508 string src, dest, id;
6509 try {
6510 auto iter = in->cbegin();
6511 decode(src, iter);
6512 decode(dest, iter);
6513 decode(id, iter);
6514 } catch (const buffer::error &err) {
6515 return -EINVAL;
6516 }
6517
6518 int r = group::dir_remove(hctx, src, id);
6519 if (r < 0)
6520 return r;
6521
6522 return group::dir_add(hctx, dest, id, false);
6523 }
6524
6525 /**
6526 * Remove a group from the directory.
6527 *
6528 * Input:
6529 * @param name (std::string)
6530 * @param id (std::string)
6531 *
6532 * Output:
6533 * @return 0 on success, negative error code on failure
6534 */
6535 int group_dir_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
6536 {
6537 string name, id;
6538 try {
6539 auto iter = in->cbegin();
6540 decode(name, iter);
6541 decode(id, iter);
6542 } catch (const buffer::error &err) {
6543 return -EINVAL;
6544 }
6545
6546 return group::dir_remove(hctx, name, id);
6547 }
6548
6549 /**
6550 * Set state of an image in the group.
6551 *
6552 * Input:
6553 * @param image_status (cls::rbd::GroupImageStatus)
6554 *
6555 * Output:
6556 * @return 0 on success, negative error code on failure
6557 */
6558 int group_image_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
6559 {
6560 CLS_LOG(20, "group_image_set");
6561
6562 cls::rbd::GroupImageStatus st;
6563 try {
6564 auto iter = in->cbegin();
6565 decode(st, iter);
6566 } catch (const buffer::error &err) {
6567 return -EINVAL;
6568 }
6569
6570 string image_key = st.spec.image_key();
6571
6572 bufferlist image_val_bl;
6573 encode(st.state, image_val_bl);
6574 int r = cls_cxx_map_set_val(hctx, image_key, &image_val_bl);
6575 if (r < 0) {
6576 return r;
6577 }
6578
6579 return 0;
6580 }
6581
6582 /**
6583 * Remove reference to an image from the group.
6584 *
6585 * Input:
6586 * @param spec (cls::rbd::GroupImageSpec)
6587 *
6588 * Output:
6589 * @return 0 on success, negative error code on failure
6590 */
6591 int group_image_remove(cls_method_context_t hctx,
6592 bufferlist *in, bufferlist *out)
6593 {
6594 CLS_LOG(20, "group_image_remove");
6595 cls::rbd::GroupImageSpec spec;
6596 try {
6597 auto iter = in->cbegin();
6598 decode(spec, iter);
6599 } catch (const buffer::error &err) {
6600 return -EINVAL;
6601 }
6602
6603 string image_key = spec.image_key();
6604
6605 int r = cls_cxx_map_remove_key(hctx, image_key);
6606 if (r < 0) {
6607 CLS_ERR("error removing image from group: %s", cpp_strerror(r).c_str());
6608 return r;
6609 }
6610
6611 return 0;
6612 }
6613
6614 /*
6615 * List images in the group.
6616 *
6617 * Input:
6618 * @param start_after which name to begin listing after
6619 * (use the empty string to start at the beginning)
6620 * @param max_return the maximum number of names to list
6621 *
6622 * Output:
6623 * @param tuples of descriptions of the images: image_id, pool_id, image reference state.
6624 * @return 0 on success, negative error code on failure
6625 */
6626 int group_image_list(cls_method_context_t hctx,
6627 bufferlist *in, bufferlist *out)
6628 {
6629 CLS_LOG(20, "group_image_list");
6630 cls::rbd::GroupImageSpec start_after;
6631 uint64_t max_return;
6632 try {
6633 auto iter = in->cbegin();
6634 decode(start_after, iter);
6635 decode(max_return, iter);
6636 } catch (const buffer::error &err) {
6637 return -EINVAL;
6638 }
6639
6640 int max_read = RBD_MAX_KEYS_READ;
6641 std::map<string, bufferlist> vals;
6642 string last_read = start_after.image_key();
6643 std::vector<cls::rbd::GroupImageStatus> res;
6644 bool more;
6645 do {
6646 int r = cls_cxx_map_get_vals(hctx, last_read,
6647 cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX,
6648 max_read, &vals, &more);
6649 if (r < 0)
6650 return r;
6651
6652 for (map<string, bufferlist>::iterator it = vals.begin();
6653 it != vals.end() && res.size() < max_return; ++it) {
6654
6655 auto iter = it->second.cbegin();
6656 cls::rbd::GroupImageLinkState state;
6657 try {
6658 decode(state, iter);
6659 } catch (const buffer::error &err) {
6660 CLS_ERR("error decoding state for image: %s", it->first.c_str());
6661 return -EIO;
6662 }
6663 cls::rbd::GroupImageSpec spec;
6664 int r = cls::rbd::GroupImageSpec::from_key(it->first, &spec);
6665 if (r < 0)
6666 return r;
6667
6668 CLS_LOG(20, "Discovered image %s %" PRId64 " %d", spec.image_id.c_str(),
6669 spec.pool_id,
6670 (int)state);
6671 res.push_back(cls::rbd::GroupImageStatus(spec, state));
6672 }
6673 if (res.size() > 0) {
6674 last_read = res.rbegin()->spec.image_key();
6675 }
6676
6677 } while (more && (res.size() < max_return));
6678 encode(res, *out);
6679
6680 return 0;
6681 }
6682
6683 /**
6684 * Reference the group this image belongs to.
6685 *
6686 * Input:
6687 * @param group_id (std::string)
6688 * @param pool_id (int64_t)
6689 *
6690 * Output:
6691 * @return 0 on success, negative error code on failure
6692 */
6693 int image_group_add(cls_method_context_t hctx,
6694 bufferlist *in, bufferlist *out)
6695 {
6696 CLS_LOG(20, "image_group_add");
6697 cls::rbd::GroupSpec new_group;
6698 try {
6699 auto iter = in->cbegin();
6700 decode(new_group, iter);
6701 } catch (const buffer::error &err) {
6702 return -EINVAL;
6703 }
6704
6705 bufferlist existing_refbl;
6706
6707 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &existing_refbl);
6708 if (r == 0) {
6709 // If we are trying to link this image to the same group then return
6710 // success. If this image already belongs to another group then abort.
6711 cls::rbd::GroupSpec old_group;
6712 try {
6713 auto iter = existing_refbl.cbegin();
6714 decode(old_group, iter);
6715 } catch (const buffer::error &err) {
6716 return -EINVAL;
6717 }
6718
6719 if ((old_group.group_id != new_group.group_id) ||
6720 (old_group.pool_id != new_group.pool_id)) {
6721 return -EEXIST;
6722 } else {
6723 return 0; // In this case the values are already correct
6724 }
6725 } else if (r < 0 && r != -ENOENT) {
6726 // No entry means this image is not a member of any group.
6727 return r;
6728 }
6729
6730 r = image::set_op_features(hctx, RBD_OPERATION_FEATURE_GROUP,
6731 RBD_OPERATION_FEATURE_GROUP);
6732 if (r < 0) {
6733 return r;
6734 }
6735
6736 bufferlist refbl;
6737 encode(new_group, refbl);
6738 r = cls_cxx_map_set_val(hctx, RBD_GROUP_REF, &refbl);
6739 if (r < 0) {
6740 return r;
6741 }
6742
6743 return 0;
6744 }
6745
6746 /**
6747 * Remove image's pointer to the group.
6748 *
6749 * Input:
 * @param group_id (std::string)
6751 * @param pool_id (int64_t)
6752 *
6753 * Output:
6754 * @return 0 on success, negative error code on failure
6755 */
6756 int image_group_remove(cls_method_context_t hctx,
6757 bufferlist *in,
6758 bufferlist *out)
6759 {
6760 CLS_LOG(20, "image_group_remove");
6761 cls::rbd::GroupSpec spec;
6762 try {
6763 auto iter = in->cbegin();
6764 decode(spec, iter);
6765 } catch (const buffer::error &err) {
6766 return -EINVAL;
6767 }
6768
6769 bufferlist refbl;
6770 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
6771 if (r < 0) {
6772 return r;
6773 }
6774
6775 cls::rbd::GroupSpec ref_spec;
6776 auto iter = refbl.cbegin();
6777 try {
6778 decode(ref_spec, iter);
6779 } catch (const buffer::error &err) {
6780 return -EINVAL;
6781 }
6782
6783 if (ref_spec.pool_id != spec.pool_id || ref_spec.group_id != spec.group_id) {
6784 return -EBADF;
6785 }
6786
6787 r = cls_cxx_map_remove_key(hctx, RBD_GROUP_REF);
6788 if (r < 0) {
6789 return r;
6790 }
6791
6792 r = image::set_op_features(hctx, 0, RBD_OPERATION_FEATURE_GROUP);
6793 if (r < 0) {
6794 return r;
6795 }
6796
6797 return 0;
6798 }
6799
6800 /**
6801 * Retrieve the id and pool of the group this image belongs to.
6802 *
6803 * Input:
6804 * none
6805 *
6806 * Output:
6807 * @param GroupSpec
6808 * @return 0 on success, negative error code on failure
6809 */
6810 int image_group_get(cls_method_context_t hctx,
6811 bufferlist *in, bufferlist *out)
6812 {
6813 CLS_LOG(20, "image_group_get");
6814 bufferlist refbl;
6815 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
6816 if (r < 0 && r != -ENOENT) {
6817 return r;
6818 }
6819
6820 cls::rbd::GroupSpec spec;
6821
6822 if (r != -ENOENT) {
6823 auto iter = refbl.cbegin();
6824 try {
6825 decode(spec, iter);
6826 } catch (const buffer::error &err) {
6827 return -EINVAL;
6828 }
6829 }
6830
6831 encode(spec, *out);
6832 return 0;
6833 }
6834
6835 /**
6836 * Save initial snapshot record.
6837 *
6838 * Input:
6839 * @param GroupSnapshot
6840 *
6841 * Output:
6842 * @return 0 on success, negative error code on failure
6843 */
6844 int group_snap_set(cls_method_context_t hctx,
6845 bufferlist *in, bufferlist *out)
6846 {
6847 CLS_LOG(20, "group_snap_set");
6848 cls::rbd::GroupSnapshot group_snap;
6849 try {
6850 auto iter = in->cbegin();
6851 decode(group_snap, iter);
6852 } catch (const buffer::error &err) {
6853 return -EINVAL;
6854 }
6855
6856 if (group_snap.name.empty()) {
6857 CLS_ERR("group snapshot name is empty");
6858 return -EINVAL;
6859 }
6860 if (group_snap.id.empty()) {
6861 CLS_ERR("group snapshot id is empty");
6862 return -EINVAL;
6863 }
6864
6865 int r = group::check_duplicate_snap_name(hctx, group_snap.name,
6866 group_snap.id);
6867 if (r < 0) {
6868 return r;
6869 }
6870
6871 std::string key = group::snap_key(group_snap.id);
6872 if (group_snap.state == cls::rbd::GROUP_SNAPSHOT_STATE_INCOMPLETE) {
6873 bufferlist snap_bl;
6874 r = cls_cxx_map_get_val(hctx, key, &snap_bl);
6875 if (r < 0 && r != -ENOENT) {
6876 return r;
6877 } else if (r >= 0) {
6878 return -EEXIST;
6879 }
6880 }
6881
6882 bufferlist obl;
6883 encode(group_snap, obl);
6884 r = cls_cxx_map_set_val(hctx, key, &obl);
6885 return r;
6886 }
6887
6888 /**
6889 * Remove snapshot record.
6890 *
6891 * Input:
6892 * @param id Snapshot id
6893 *
6894 * Output:
6895 * @return 0 on success, negative error code on failure
6896 */
6897 int group_snap_remove(cls_method_context_t hctx,
6898 bufferlist *in, bufferlist *out)
6899 {
6900 CLS_LOG(20, "group_snap_remove");
6901 std::string snap_id;
6902 try {
6903 auto iter = in->cbegin();
6904 decode(snap_id, iter);
6905 } catch (const buffer::error &err) {
6906 return -EINVAL;
6907 }
6908
6909 std::string snap_key = group::snap_key(snap_id);
6910
6911 CLS_LOG(20, "removing snapshot with key %s", snap_key.c_str());
6912 int r = cls_cxx_map_remove_key(hctx, snap_key);
6913 return r;
6914 }
6915
6916 /**
6917 * Get group's snapshot by id.
6918 *
6919 * Input:
6920 * @param snapshot_id the id of the snapshot to look for.
6921 *
6922 * Output:
6923 * @param GroupSnapshot the requested snapshot
6924 * @return 0 on success, negative error code on failure
6925 */
6926 int group_snap_get_by_id(cls_method_context_t hctx,
6927 bufferlist *in, bufferlist *out)
6928 {
6929 CLS_LOG(20, "group_snap_get_by_id");
6930
6931 std::string snap_id;
6932 try {
6933 auto iter = in->cbegin();
6934 decode(snap_id, iter);
6935 } catch (const buffer::error &err) {
6936 return -EINVAL;
6937 }
6938
6939 bufferlist snapbl;
6940
6941 int r = cls_cxx_map_get_val(hctx, group::snap_key(snap_id), &snapbl);
6942 if (r < 0) {
6943 return r;
6944 }
6945
6946 cls::rbd::GroupSnapshot group_snap;
6947 auto iter = snapbl.cbegin();
6948 try {
6949 decode(group_snap, iter);
6950 } catch (const buffer::error &err) {
6951 CLS_ERR("error decoding snapshot: %s", snap_id.c_str());
6952 return -EIO;
6953 }
6954
6955 encode(group_snap, *out);
6956
6957 return 0;
6958 }
6959
6960 /**
6961 * List group's snapshots.
6962 *
6963 * Input:
6964 * @param start_after which name to begin listing after
6965 * (use the empty string to start at the beginning)
6966 * @param max_return the maximum number of snapshots to list
6967 *
6968 * Output:
6969 * @param list of snapshots
6970 * @return 0 on success, negative error code on failure
6971 */
6972 int group_snap_list(cls_method_context_t hctx,
6973 bufferlist *in, bufferlist *out)
6974 {
6975 CLS_LOG(20, "group_snap_list");
6976
6977 cls::rbd::GroupSnapshot start_after;
6978 uint64_t max_return;
6979 try {
6980 auto iter = in->cbegin();
6981 decode(start_after, iter);
6982 decode(max_return, iter);
6983 } catch (const buffer::error &err) {
6984 return -EINVAL;
6985 }
6986 std::vector<cls::rbd::GroupSnapshot> group_snaps;
6987 group::snap_list(hctx, start_after, max_return, &group_snaps);
6988
6989 encode(group_snaps, *out);
6990
6991 return 0;
6992 }
6993
namespace trash {

// Prefix shared by every trash entry key.
static const std::string IMAGE_KEY_PREFIX = "id_";

// Build the trash entry key for an image id (prefix + id).
std::string image_key(const std::string &image_id) {
  std::string key(IMAGE_KEY_PREFIX);
  key.append(image_id);
  return key;
}

// Recover the image id from a trash entry key by dropping the prefix.
// Throws std::out_of_range if the key is shorter than the prefix.
std::string image_id_from_key(const std::string &key) {
  const std::string::size_type prefix_len = IMAGE_KEY_PREFIX.size();
  return std::string(key, prefix_len);
}

} // namespace trash
7007
7008 /**
7009 * Add an image entry to the rbd trash. Creates the trash object if
7010 * needed, and stores the trash spec information of the deleted image.
7011 *
7012 * Input:
7013 * @param id the id of the image
7014 * @param trash_spec the spec info of the deleted image
7015 *
7016 * Output:
7017 * @returns -EEXIST if the image id is already in the trash
7018 * @returns 0 on success, negative error code on failure
7019 */
7020 int trash_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
7021 {
7022 int r = cls_cxx_create(hctx, false);
7023 if (r < 0) {
7024 CLS_ERR("could not create trash: %s", cpp_strerror(r).c_str());
7025 return r;
7026 }
7027
7028 string id;
7029 cls::rbd::TrashImageSpec trash_spec;
7030 try {
7031 auto iter = in->cbegin();
7032 decode(id, iter);
7033 decode(trash_spec, iter);
7034 } catch (const buffer::error &err) {
7035 return -EINVAL;
7036 }
7037
7038 if (!is_valid_id(id)) {
7039 CLS_ERR("trash_add: invalid id '%s'", id.c_str());
7040 return -EINVAL;
7041 }
7042
7043 CLS_LOG(20, "trash_add id=%s", id.c_str());
7044
7045 string key = trash::image_key(id);
7046 cls::rbd::TrashImageSpec tmp;
7047 r = read_key(hctx, key, &tmp);
7048 if (r < 0 && r != -ENOENT) {
7049 CLS_ERR("could not read key %s entry from trash: %s", key.c_str(),
7050 cpp_strerror(r).c_str());
7051 return r;
7052 } else if (r == 0) {
7053 CLS_LOG(10, "id already exists");
7054 return -EEXIST;
7055 }
7056
7057 map<string, bufferlist> omap_vals;
7058 encode(trash_spec, omap_vals[key]);
7059 return cls_cxx_map_set_vals(hctx, &omap_vals);
7060 }
7061
7062 /**
7063 * Removes an image entry from the rbd trash object.
7064 * image.
7065 *
7066 * Input:
7067 * @param id the id of the image
7068 *
7069 * Output:
7070 * @returns -ENOENT if the image id does not exist in the trash
7071 * @returns 0 on success, negative error code on failure
7072 */
7073 int trash_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
7074 {
7075 string id;
7076 try {
7077 auto iter = in->cbegin();
7078 decode(id, iter);
7079 } catch (const buffer::error &err) {
7080 return -EINVAL;
7081 }
7082
7083 CLS_LOG(20, "trash_remove id=%s", id.c_str());
7084
7085 string key = trash::image_key(id);
7086 bufferlist tmp;
7087 int r = cls_cxx_map_get_val(hctx, key, &tmp);
7088 if (r < 0) {
7089 if (r != -ENOENT) {
7090 CLS_ERR("error reading entry key %s: %s", key.c_str(), cpp_strerror(r).c_str());
7091 }
7092 return r;
7093 }
7094
7095 r = cls_cxx_map_remove_key(hctx, key);
7096 if (r < 0) {
7097 CLS_ERR("error removing entry: %s", cpp_strerror(r).c_str());
7098 return r;
7099 }
7100
7101 return 0;
7102 }
7103
7104 /**
7105 * Returns the list of trash spec entries registered in the rbd_trash
7106 * object.
7107 *
7108 * Input:
7109 * @param start_after which name to begin listing after
7110 * (use the empty string to start at the beginning)
7111 * @param max_return the maximum number of names to list
7112 *
7113 * Output:
7114 * @param data the map between image id and trash spec info
7115 *
7116 * @returns 0 on success, negative error code on failure
7117 */
7118 int trash_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
7119 {
7120 string start_after;
7121 uint64_t max_return;
7122
7123 try {
7124 auto iter = in->cbegin();
7125 decode(start_after, iter);
7126 decode(max_return, iter);
7127 } catch (const buffer::error &err) {
7128 return -EINVAL;
7129 }
7130
7131 map<string, cls::rbd::TrashImageSpec> data;
7132 string last_read = trash::image_key(start_after);
7133 bool more = true;
7134
7135 CLS_LOG(20, "trash_get_images");
7136 while (data.size() < max_return) {
7137 map<string, bufferlist> raw_data;
7138 int max_read = std::min<int32_t>(RBD_MAX_KEYS_READ,
7139 max_return - data.size());
7140 int r = cls_cxx_map_get_vals(hctx, last_read, trash::IMAGE_KEY_PREFIX,
7141 max_read, &raw_data, &more);
7142 if (r < 0) {
7143 if (r != -ENOENT) {
7144 CLS_ERR("failed to read the vals off of disk: %s",
7145 cpp_strerror(r).c_str());
7146 }
7147 return r;
7148 }
7149 if (raw_data.empty()) {
7150 break;
7151 }
7152
7153 map<string, bufferlist>::iterator it = raw_data.begin();
7154 for (; it != raw_data.end(); ++it) {
7155 decode(data[trash::image_id_from_key(it->first)], it->second);
7156 }
7157
7158 if (!more) {
7159 break;
7160 }
7161
7162 last_read = raw_data.rbegin()->first;
7163 }
7164
7165 encode(data, *out);
7166 return 0;
7167 }
7168
7169 /**
7170 * Returns the trash spec entry of an image registered in the rbd_trash
7171 * object.
7172 *
7173 * Input:
7174 * @param id the id of the image
7175 *
7176 * Output:
7177 * @param out the trash spec entry
7178 *
7179 * @returns 0 on success, negative error code on failure
7180 */
7181 int trash_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
7182 {
7183 string id;
7184 try {
7185 auto iter = in->cbegin();
7186 decode(id, iter);
7187 } catch (const buffer::error &err) {
7188 return -EINVAL;
7189 }
7190
7191 CLS_LOG(20, "trash_get_image id=%s", id.c_str());
7192
7193
7194 string key = trash::image_key(id);
7195 bufferlist bl;
7196 int r = cls_cxx_map_get_val(hctx, key, out);
7197 if (r < 0 && r != -ENOENT) {
7198 CLS_ERR("error reading image from trash '%s': '%s'", id.c_str(),
7199 cpp_strerror(r).c_str());
7200 }
7201 return r;
7202 }
7203
7204 /**
7205 * Set state of an image in the rbd_trash object.
7206 *
7207 * Input:
7208 * @param id the id of the image
7209 * @param trash_state the state of the image to be set
7210 * @param expect_state the expected state of the image
7211 *
7212 * Output:
7213 * @returns 0 on success, negative error code on failure
7214 */
7215 int trash_state_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
7216 {
7217 string id;
7218 cls::rbd::TrashImageState trash_state;
7219 cls::rbd::TrashImageState expect_state;
7220 try {
7221 bufferlist::const_iterator iter = in->begin();
7222 decode(id, iter);
7223 decode(trash_state, iter);
7224 decode(expect_state, iter);
7225 } catch (const buffer::error &err) {
7226 return -EINVAL;
7227 }
7228
7229 CLS_LOG(20, "trash_state_set id=%s", id.c_str());
7230
7231 string key = trash::image_key(id);
7232 cls::rbd::TrashImageSpec trash_spec;
7233 int r = read_key(hctx, key, &trash_spec);
7234 if (r < 0) {
7235 if (r != -ENOENT) {
7236 CLS_ERR("Could not read trash image spec off disk: %s",
7237 cpp_strerror(r).c_str());
7238 }
7239 return r;
7240 }
7241
7242 if (trash_spec.state == expect_state) {
7243 trash_spec.state = trash_state;
7244 r = write_key(hctx, key, trash_spec);
7245 if (r < 0) {
7246 CLS_ERR("error setting trash image state: %s", cpp_strerror(r).c_str());
7247 return r;
7248 }
7249
7250 return 0;
7251 } else if (trash_spec.state == trash_state) {
7252 return 0;
7253 } else {
7254 CLS_ERR("Current trash state: %d do not match expected: %d or set: %d",
7255 trash_spec.state, expect_state, trash_state);
7256 return -ESTALE;
7257 }
7258 }
7259
namespace nspace {

// Prefix shared by every namespace directory key.
const std::string NAME_KEY_PREFIX = "name_";

// Build the directory key for a namespace name (prefix + name).
std::string key_for_name(const std::string& name) {
  std::string key(NAME_KEY_PREFIX);
  key.append(name);
  return key;
}

// Recover the namespace name from a directory key by dropping the prefix.
// Throws std::out_of_range if the key is shorter than the prefix.
std::string name_from_key(const std::string &key) {
  const std::string::size_type prefix_len = NAME_KEY_PREFIX.size();
  return std::string(key, prefix_len);
}

} // namespace nspace
7273
7274 /**
7275 * Add a namespace to the namespace directory.
7276 *
7277 * Input:
7278 * @param name the name of the namespace
7279 *
7280 * Output:
7281 * @returns -EEXIST if the namespace is already exists
7282 * @returns 0 on success, negative error code on failure
7283 */
7284 int namespace_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
7285 {
7286 std::string name;
7287 try {
7288 auto iter = in->cbegin();
7289 decode(name, iter);
7290 } catch (const buffer::error &err) {
7291 return -EINVAL;
7292 }
7293
7294 std::string key(nspace::key_for_name(name));
7295 bufferlist value;
7296 int r = cls_cxx_map_get_val(hctx, key, &value);
7297 if (r < 0 && r != -ENOENT) {
7298 return r;
7299 } else if (r == 0) {
7300 return -EEXIST;
7301 }
7302
7303 r = cls_cxx_map_set_val(hctx, key, &value);
7304 if (r < 0) {
7305 CLS_ERR("failed to set omap key: %s", key.c_str());
7306 return r;
7307 }
7308
7309 return 0;
7310 }
7311
7312 /**
7313 * Remove a namespace from the namespace directory.
7314 *
7315 * Input:
7316 * @param name the name of the namespace
7317 *
7318 * Output:
7319 * @returns 0 on success, negative error code on failure
7320 */
7321 int namespace_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
7322 {
7323 std::string name;
7324 try {
7325 auto iter = in->cbegin();
7326 decode(name, iter);
7327 } catch (const buffer::error &err) {
7328 return -EINVAL;
7329 }
7330
7331 std::string key(nspace::key_for_name(name));
7332 bufferlist bl;
7333 int r = cls_cxx_map_get_val(hctx, key, &bl);
7334 if (r < 0) {
7335 return r;
7336 }
7337
7338 r = cls_cxx_map_remove_key(hctx, key);
7339 if (r < 0) {
7340 return r;
7341 }
7342
7343 return 0;
7344 }
7345
7346 /**
7347 * Returns the list of namespaces in the rbd_namespace object
7348 *
7349 * Input:
7350 * @param start_after which name to begin listing after
7351 * (use the empty string to start at the beginning)
7352 * @param max_return the maximum number of names to list
7353 *
7354 * Output:
7355 * @param data list of namespace names
7356 * @returns 0 on success, negative error code on failure
7357 */
7358 int namespace_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
7359 {
7360 string start_after;
7361 uint64_t max_return;
7362 try {
7363 auto iter = in->cbegin();
7364 decode(start_after, iter);
7365 decode(max_return, iter);
7366 } catch (const buffer::error &err) {
7367 return -EINVAL;
7368 }
7369
7370 std::list<std::string> data;
7371 std::string last_read = nspace::key_for_name(start_after);
7372 bool more = true;
7373
7374 CLS_LOG(20, "namespace_list");
7375 while (data.size() < max_return) {
7376 std::map<std::string, bufferlist> raw_data;
7377 int max_read = std::min<int32_t>(RBD_MAX_KEYS_READ,
7378 max_return - data.size());
7379 int r = cls_cxx_map_get_vals(hctx, last_read, nspace::NAME_KEY_PREFIX,
7380 max_read, &raw_data, &more);
7381 if (r < 0) {
7382 if (r != -ENOENT) {
7383 CLS_ERR("failed to read the vals off of disk: %s",
7384 cpp_strerror(r).c_str());
7385 }
7386 return r;
7387 }
7388
7389 for (auto& it : raw_data) {
7390 data.push_back(nspace::name_from_key(it.first));
7391 }
7392
7393 if (raw_data.empty() || !more) {
7394 break;
7395 }
7396
7397 last_read = raw_data.rbegin()->first;
7398 }
7399
7400 encode(data, *out);
7401 return 0;
7402 }
7403
7404 /**
7405 * Reclaim space for zeroed extents
7406 *
7407 * Input:
7408 * @param sparse_size minimal zeroed block to sparse
7409 * @param remove_empty boolean, true if the object should be removed if empty
7410 *
7411 * Output:
7412 * @returns -ENOENT if the object does not exist or has been removed
7413 * @returns 0 on success, negative error code on failure
7414 */
7415 int sparsify(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
7416 {
7417 size_t sparse_size;
7418 bool remove_empty;
7419 try {
7420 auto iter = in->cbegin();
7421 decode(sparse_size, iter);
7422 decode(remove_empty, iter);
7423 } catch (const buffer::error &err) {
7424 return -EINVAL;
7425 }
7426
7427 int r = check_exists(hctx);
7428 if (r < 0) {
7429 return r;
7430 }
7431
7432 bufferlist bl;
7433 r = cls_cxx_read(hctx, 0, 0, &bl);
7434 if (r < 0) {
7435 CLS_ERR("failed to read data off of disk: %s", cpp_strerror(r).c_str());
7436 return r;
7437 }
7438
7439 if (bl.is_zero()) {
7440 if (remove_empty) {
7441 CLS_LOG(20, "remove");
7442 r = cls_cxx_remove(hctx);
7443 if (r < 0) {
7444 CLS_ERR("remove failed: %s", cpp_strerror(r).c_str());
7445 return r;
7446 }
7447 } else if (bl.length() > 0) {
7448 CLS_LOG(20, "truncate");
7449 bufferlist write_bl;
7450 r = cls_cxx_replace(hctx, 0, 0, &write_bl);
7451 if (r < 0) {
7452 CLS_ERR("truncate failed: %s", cpp_strerror(r).c_str());
7453 return r;
7454 }
7455 } else {
7456 CLS_LOG(20, "skip empty");
7457 }
7458 return 0;
7459 }
7460
7461 bl.rebuild(buffer::ptr_node::create(bl.length()));
7462 size_t write_offset = 0;
7463 size_t write_length = 0;
7464 size_t offset = 0;
7465 size_t length = bl.length();
7466 const auto& ptr = bl.front();
7467 bool replace = true;
7468 while (offset < length) {
7469 if (calc_sparse_extent(ptr, sparse_size, length, &write_offset,
7470 &write_length, &offset)) {
7471 if (write_offset == 0 && write_length == length) {
7472 CLS_LOG(20, "nothing to do");
7473 return 0;
7474 }
7475 CLS_LOG(20, "write%s %" PRIu64 "~%" PRIu64, (replace ? "(replace)" : ""),
7476 write_offset, write_length);
7477 bufferlist write_bl;
7478 write_bl.push_back(buffer::ptr_node::create(ptr, write_offset,
7479 write_length));
7480 if (replace) {
7481 r = cls_cxx_replace(hctx, write_offset, write_length, &write_bl);
7482 replace = false;
7483 } else {
7484 r = cls_cxx_write(hctx, write_offset, write_length, &write_bl);
7485 }
7486 if (r < 0) {
7487 CLS_ERR("write failed: %s", cpp_strerror(r).c_str());
7488 return r;
7489 }
7490 write_offset = offset;
7491 write_length = 0;
7492 }
7493 }
7494
7495 return 0;
7496 }
7497
7498 CLS_INIT(rbd)
7499 {
7500 CLS_LOG(20, "Loaded rbd class!");
7501
7502 cls_handle_t h_class;
7503 cls_method_handle_t h_create;
7504 cls_method_handle_t h_get_features;
7505 cls_method_handle_t h_set_features;
7506 cls_method_handle_t h_get_size;
7507 cls_method_handle_t h_set_size;
7508 cls_method_handle_t h_get_parent;
7509 cls_method_handle_t h_set_parent;
7510 cls_method_handle_t h_remove_parent;
7511 cls_method_handle_t h_parent_get;
7512 cls_method_handle_t h_parent_overlap_get;
7513 cls_method_handle_t h_parent_attach;
7514 cls_method_handle_t h_parent_detach;
7515 cls_method_handle_t h_get_protection_status;
7516 cls_method_handle_t h_set_protection_status;
7517 cls_method_handle_t h_get_stripe_unit_count;
7518 cls_method_handle_t h_set_stripe_unit_count;
7519 cls_method_handle_t h_get_create_timestamp;
7520 cls_method_handle_t h_get_access_timestamp;
7521 cls_method_handle_t h_get_modify_timestamp;
7522 cls_method_handle_t h_get_flags;
7523 cls_method_handle_t h_set_flags;
7524 cls_method_handle_t h_op_features_get;
7525 cls_method_handle_t h_op_features_set;
7526 cls_method_handle_t h_add_child;
7527 cls_method_handle_t h_remove_child;
7528 cls_method_handle_t h_get_children;
7529 cls_method_handle_t h_get_snapcontext;
7530 cls_method_handle_t h_get_object_prefix;
7531 cls_method_handle_t h_get_data_pool;
7532 cls_method_handle_t h_get_snapshot_name;
7533 cls_method_handle_t h_get_snapshot_timestamp;
7534 cls_method_handle_t h_snapshot_get;
7535 cls_method_handle_t h_snapshot_add;
7536 cls_method_handle_t h_snapshot_remove;
7537 cls_method_handle_t h_snapshot_rename;
7538 cls_method_handle_t h_snapshot_trash_add;
7539 cls_method_handle_t h_get_all_features;
7540 cls_method_handle_t h_get_id;
7541 cls_method_handle_t h_set_id;
7542 cls_method_handle_t h_set_modify_timestamp;
7543 cls_method_handle_t h_set_access_timestamp;
7544 cls_method_handle_t h_dir_get_id;
7545 cls_method_handle_t h_dir_get_name;
7546 cls_method_handle_t h_dir_list;
7547 cls_method_handle_t h_dir_add_image;
7548 cls_method_handle_t h_dir_remove_image;
7549 cls_method_handle_t h_dir_rename_image;
7550 cls_method_handle_t h_dir_state_assert;
7551 cls_method_handle_t h_dir_state_set;
7552 cls_method_handle_t h_object_map_load;
7553 cls_method_handle_t h_object_map_save;
7554 cls_method_handle_t h_object_map_resize;
7555 cls_method_handle_t h_object_map_update;
7556 cls_method_handle_t h_object_map_snap_add;
7557 cls_method_handle_t h_object_map_snap_remove;
7558 cls_method_handle_t h_metadata_set;
7559 cls_method_handle_t h_metadata_remove;
7560 cls_method_handle_t h_metadata_list;
7561 cls_method_handle_t h_metadata_get;
7562 cls_method_handle_t h_snapshot_get_limit;
7563 cls_method_handle_t h_snapshot_set_limit;
7564 cls_method_handle_t h_child_attach;
7565 cls_method_handle_t h_child_detach;
7566 cls_method_handle_t h_children_list;
7567 cls_method_handle_t h_migration_set;
7568 cls_method_handle_t h_migration_set_state;
7569 cls_method_handle_t h_migration_get;
7570 cls_method_handle_t h_migration_remove;
7571 cls_method_handle_t h_old_snapshots_list;
7572 cls_method_handle_t h_old_snapshot_add;
7573 cls_method_handle_t h_old_snapshot_remove;
7574 cls_method_handle_t h_old_snapshot_rename;
7575 cls_method_handle_t h_mirror_uuid_get;
7576 cls_method_handle_t h_mirror_uuid_set;
7577 cls_method_handle_t h_mirror_mode_get;
7578 cls_method_handle_t h_mirror_mode_set;
7579 cls_method_handle_t h_mirror_peer_list;
7580 cls_method_handle_t h_mirror_peer_add;
7581 cls_method_handle_t h_mirror_peer_remove;
7582 cls_method_handle_t h_mirror_peer_set_client;
7583 cls_method_handle_t h_mirror_peer_set_cluster;
7584 cls_method_handle_t h_mirror_image_list;
7585 cls_method_handle_t h_mirror_image_get_image_id;
7586 cls_method_handle_t h_mirror_image_get;
7587 cls_method_handle_t h_mirror_image_set;
7588 cls_method_handle_t h_mirror_image_remove;
7589 cls_method_handle_t h_mirror_image_status_set;
7590 cls_method_handle_t h_mirror_image_status_remove;
7591 cls_method_handle_t h_mirror_image_status_get;
7592 cls_method_handle_t h_mirror_image_status_list;
7593 cls_method_handle_t h_mirror_image_status_get_summary;
7594 cls_method_handle_t h_mirror_image_status_remove_down;
7595 cls_method_handle_t h_mirror_image_instance_get;
7596 cls_method_handle_t h_mirror_image_instance_list;
7597 cls_method_handle_t h_mirror_instances_list;
7598 cls_method_handle_t h_mirror_instances_add;
7599 cls_method_handle_t h_mirror_instances_remove;
7600 cls_method_handle_t h_mirror_image_map_list;
7601 cls_method_handle_t h_mirror_image_map_update;
7602 cls_method_handle_t h_mirror_image_map_remove;
7603 cls_method_handle_t h_group_dir_list;
7604 cls_method_handle_t h_group_dir_add;
7605 cls_method_handle_t h_group_dir_remove;
7606 cls_method_handle_t h_group_dir_rename;
7607 cls_method_handle_t h_group_image_remove;
7608 cls_method_handle_t h_group_image_list;
7609 cls_method_handle_t h_group_image_set;
7610 cls_method_handle_t h_image_group_add;
7611 cls_method_handle_t h_image_group_remove;
7612 cls_method_handle_t h_image_group_get;
7613 cls_method_handle_t h_group_snap_set;
7614 cls_method_handle_t h_group_snap_remove;
7615 cls_method_handle_t h_group_snap_get_by_id;
7616 cls_method_handle_t h_group_snap_list;
7617 cls_method_handle_t h_trash_add;
7618 cls_method_handle_t h_trash_remove;
7619 cls_method_handle_t h_trash_list;
7620 cls_method_handle_t h_trash_get;
7621 cls_method_handle_t h_trash_state_set;
7622 cls_method_handle_t h_namespace_add;
7623 cls_method_handle_t h_namespace_remove;
7624 cls_method_handle_t h_namespace_list;
7625 cls_method_handle_t h_copyup;
7626 cls_method_handle_t h_sparse_copyup;
7627 cls_method_handle_t h_assert_snapc_seq;
7628 cls_method_handle_t h_sparsify;
7629
7630 cls_register("rbd", &h_class);
7631 cls_register_cxx_method(h_class, "create",
7632 CLS_METHOD_RD | CLS_METHOD_WR,
7633 create, &h_create);
7634 cls_register_cxx_method(h_class, "get_features",
7635 CLS_METHOD_RD,
7636 get_features, &h_get_features);
7637 cls_register_cxx_method(h_class, "set_features",
7638 CLS_METHOD_RD | CLS_METHOD_WR,
7639 set_features, &h_set_features);
7640 cls_register_cxx_method(h_class, "get_size",
7641 CLS_METHOD_RD,
7642 get_size, &h_get_size);
7643 cls_register_cxx_method(h_class, "set_size",
7644 CLS_METHOD_RD | CLS_METHOD_WR,
7645 set_size, &h_set_size);
7646 cls_register_cxx_method(h_class, "get_snapcontext",
7647 CLS_METHOD_RD,
7648 get_snapcontext, &h_get_snapcontext);
7649 cls_register_cxx_method(h_class, "get_object_prefix",
7650 CLS_METHOD_RD,
7651 get_object_prefix, &h_get_object_prefix);
7652 cls_register_cxx_method(h_class, "get_data_pool", CLS_METHOD_RD,
7653 get_data_pool, &h_get_data_pool);
7654 cls_register_cxx_method(h_class, "get_snapshot_name",
7655 CLS_METHOD_RD,
7656 get_snapshot_name, &h_get_snapshot_name);
7657 cls_register_cxx_method(h_class, "get_snapshot_timestamp",
7658 CLS_METHOD_RD,
7659 get_snapshot_timestamp, &h_get_snapshot_timestamp);
7660 cls_register_cxx_method(h_class, "snapshot_get",
7661 CLS_METHOD_RD,
7662 snapshot_get, &h_snapshot_get);
7663 cls_register_cxx_method(h_class, "snapshot_add",
7664 CLS_METHOD_RD | CLS_METHOD_WR,
7665 snapshot_add, &h_snapshot_add);
7666 cls_register_cxx_method(h_class, "snapshot_remove",
7667 CLS_METHOD_RD | CLS_METHOD_WR,
7668 snapshot_remove, &h_snapshot_remove);
7669 cls_register_cxx_method(h_class, "snapshot_rename",
7670 CLS_METHOD_RD | CLS_METHOD_WR,
7671 snapshot_rename, &h_snapshot_rename);
7672 cls_register_cxx_method(h_class, "snapshot_trash_add",
7673 CLS_METHOD_RD | CLS_METHOD_WR,
7674 snapshot_trash_add, &h_snapshot_trash_add);
7675 cls_register_cxx_method(h_class, "get_all_features",
7676 CLS_METHOD_RD,
7677 get_all_features, &h_get_all_features);
7678
7679 // NOTE: deprecate v1 parent APIs after mimic EOLed
7680 cls_register_cxx_method(h_class, "get_parent",
7681 CLS_METHOD_RD,
7682 get_parent, &h_get_parent);
7683 cls_register_cxx_method(h_class, "set_parent",
7684 CLS_METHOD_RD | CLS_METHOD_WR,
7685 set_parent, &h_set_parent);
7686 cls_register_cxx_method(h_class, "remove_parent",
7687 CLS_METHOD_RD | CLS_METHOD_WR,
7688 remove_parent, &h_remove_parent);
7689
7690 cls_register_cxx_method(h_class, "parent_get",
7691 CLS_METHOD_RD, parent_get, &h_parent_get);
7692 cls_register_cxx_method(h_class, "parent_overlap_get",
7693 CLS_METHOD_RD, parent_overlap_get,
7694 &h_parent_overlap_get);
7695 cls_register_cxx_method(h_class, "parent_attach",
7696 CLS_METHOD_RD | CLS_METHOD_WR,
7697 parent_attach, &h_parent_attach);
7698 cls_register_cxx_method(h_class, "parent_detach",
7699 CLS_METHOD_RD | CLS_METHOD_WR,
7700 parent_detach, &h_parent_detach);
7701
7702 cls_register_cxx_method(h_class, "set_protection_status",
7703 CLS_METHOD_RD | CLS_METHOD_WR,
7704 set_protection_status, &h_set_protection_status);
7705 cls_register_cxx_method(h_class, "get_protection_status",
7706 CLS_METHOD_RD,
7707 get_protection_status, &h_get_protection_status);
7708 cls_register_cxx_method(h_class, "get_stripe_unit_count",
7709 CLS_METHOD_RD,
7710 get_stripe_unit_count, &h_get_stripe_unit_count);
7711 cls_register_cxx_method(h_class, "set_stripe_unit_count",
7712 CLS_METHOD_RD | CLS_METHOD_WR,
7713 set_stripe_unit_count, &h_set_stripe_unit_count);
7714 cls_register_cxx_method(h_class, "get_create_timestamp",
7715 CLS_METHOD_RD,
7716 get_create_timestamp, &h_get_create_timestamp);
7717 cls_register_cxx_method(h_class, "get_access_timestamp",
7718 CLS_METHOD_RD,
7719 get_access_timestamp, &h_get_access_timestamp);
7720 cls_register_cxx_method(h_class, "get_modify_timestamp",
7721 CLS_METHOD_RD,
7722 get_modify_timestamp, &h_get_modify_timestamp);
7723 cls_register_cxx_method(h_class, "get_flags",
7724 CLS_METHOD_RD,
7725 get_flags, &h_get_flags);
7726 cls_register_cxx_method(h_class, "set_flags",
7727 CLS_METHOD_RD | CLS_METHOD_WR,
7728 set_flags, &h_set_flags);
7729 cls_register_cxx_method(h_class, "op_features_get", CLS_METHOD_RD,
7730 op_features_get, &h_op_features_get);
7731 cls_register_cxx_method(h_class, "op_features_set",
7732 CLS_METHOD_RD | CLS_METHOD_WR,
7733 op_features_set, &h_op_features_set);
7734 cls_register_cxx_method(h_class, "metadata_list",
7735 CLS_METHOD_RD,
7736 metadata_list, &h_metadata_list);
7737 cls_register_cxx_method(h_class, "metadata_set",
7738 CLS_METHOD_RD | CLS_METHOD_WR,
7739 metadata_set, &h_metadata_set);
7740 cls_register_cxx_method(h_class, "metadata_remove",
7741 CLS_METHOD_RD | CLS_METHOD_WR,
7742 metadata_remove, &h_metadata_remove);
7743 cls_register_cxx_method(h_class, "metadata_get",
7744 CLS_METHOD_RD,
7745 metadata_get, &h_metadata_get);
7746 cls_register_cxx_method(h_class, "snapshot_get_limit",
7747 CLS_METHOD_RD,
7748 snapshot_get_limit, &h_snapshot_get_limit);
7749 cls_register_cxx_method(h_class, "snapshot_set_limit",
7750 CLS_METHOD_RD | CLS_METHOD_WR,
7751 snapshot_set_limit, &h_snapshot_set_limit);
7752 cls_register_cxx_method(h_class, "child_attach",
7753 CLS_METHOD_RD | CLS_METHOD_WR,
7754 child_attach, &h_child_attach);
7755 cls_register_cxx_method(h_class, "child_detach",
7756 CLS_METHOD_RD | CLS_METHOD_WR,
7757 child_detach, &h_child_detach);
7758 cls_register_cxx_method(h_class, "children_list",
7759 CLS_METHOD_RD,
7760 children_list, &h_children_list);
7761 cls_register_cxx_method(h_class, "migration_set",
7762 CLS_METHOD_RD | CLS_METHOD_WR,
7763 migration_set, &h_migration_set);
7764 cls_register_cxx_method(h_class, "migration_set_state",
7765 CLS_METHOD_RD | CLS_METHOD_WR,
7766 migration_set_state, &h_migration_set_state);
7767 cls_register_cxx_method(h_class, "migration_get",
7768 CLS_METHOD_RD,
7769 migration_get, &h_migration_get);
7770 cls_register_cxx_method(h_class, "migration_remove",
7771 CLS_METHOD_RD | CLS_METHOD_WR,
7772 migration_remove, &h_migration_remove);
7773
7774 cls_register_cxx_method(h_class, "set_modify_timestamp",
7775 CLS_METHOD_RD | CLS_METHOD_WR,
7776 set_modify_timestamp, &h_set_modify_timestamp);
7777
7778 cls_register_cxx_method(h_class, "set_access_timestamp",
7779 CLS_METHOD_RD | CLS_METHOD_WR,
7780 set_access_timestamp, &h_set_access_timestamp);
7781
7782 /* methods for the rbd_children object */
7783 cls_register_cxx_method(h_class, "add_child",
7784 CLS_METHOD_RD | CLS_METHOD_WR,
7785 add_child, &h_add_child);
7786 cls_register_cxx_method(h_class, "remove_child",
7787 CLS_METHOD_RD | CLS_METHOD_WR,
7788 remove_child, &h_remove_child);
7789 cls_register_cxx_method(h_class, "get_children",
7790 CLS_METHOD_RD,
7791 get_children, &h_get_children);
7792
7793 /* methods for the rbd_id.$image_name objects */
7794 cls_register_cxx_method(h_class, "get_id",
7795 CLS_METHOD_RD,
7796 get_id, &h_get_id);
7797 cls_register_cxx_method(h_class, "set_id",
7798 CLS_METHOD_RD | CLS_METHOD_WR,
7799 set_id, &h_set_id);
7800
7801 /* methods for the rbd_directory object */
7802 cls_register_cxx_method(h_class, "dir_get_id",
7803 CLS_METHOD_RD,
7804 dir_get_id, &h_dir_get_id);
7805 cls_register_cxx_method(h_class, "dir_get_name",
7806 CLS_METHOD_RD,
7807 dir_get_name, &h_dir_get_name);
7808 cls_register_cxx_method(h_class, "dir_list",
7809 CLS_METHOD_RD,
7810 dir_list, &h_dir_list);
7811 cls_register_cxx_method(h_class, "dir_add_image",
7812 CLS_METHOD_RD | CLS_METHOD_WR,
7813 dir_add_image, &h_dir_add_image);
7814 cls_register_cxx_method(h_class, "dir_remove_image",
7815 CLS_METHOD_RD | CLS_METHOD_WR,
7816 dir_remove_image, &h_dir_remove_image);
7817 cls_register_cxx_method(h_class, "dir_rename_image",
7818 CLS_METHOD_RD | CLS_METHOD_WR,
7819 dir_rename_image, &h_dir_rename_image);
7820 cls_register_cxx_method(h_class, "dir_state_assert", CLS_METHOD_RD,
7821 dir_state_assert, &h_dir_state_assert);
7822 cls_register_cxx_method(h_class, "dir_state_set",
7823 CLS_METHOD_RD | CLS_METHOD_WR,
7824 dir_state_set, &h_dir_state_set);
7825
7826 /* methods for the rbd_object_map.$image_id object */
7827 cls_register_cxx_method(h_class, "object_map_load",
7828 CLS_METHOD_RD,
7829 object_map_load, &h_object_map_load);
7830 cls_register_cxx_method(h_class, "object_map_save",
7831 CLS_METHOD_RD | CLS_METHOD_WR,
7832 object_map_save, &h_object_map_save);
7833 cls_register_cxx_method(h_class, "object_map_resize",
7834 CLS_METHOD_RD | CLS_METHOD_WR,
7835 object_map_resize, &h_object_map_resize);
7836 cls_register_cxx_method(h_class, "object_map_update",
7837 CLS_METHOD_RD | CLS_METHOD_WR,
7838 object_map_update, &h_object_map_update);
7839 cls_register_cxx_method(h_class, "object_map_snap_add",
7840 CLS_METHOD_RD | CLS_METHOD_WR,
7841 object_map_snap_add, &h_object_map_snap_add);
7842 cls_register_cxx_method(h_class, "object_map_snap_remove",
7843 CLS_METHOD_RD | CLS_METHOD_WR,
7844 object_map_snap_remove, &h_object_map_snap_remove);
7845
7846 /* methods for the old format */
7847 cls_register_cxx_method(h_class, "snap_list",
7848 CLS_METHOD_RD,
7849 old_snapshots_list, &h_old_snapshots_list);
7850 cls_register_cxx_method(h_class, "snap_add",
7851 CLS_METHOD_RD | CLS_METHOD_WR,
7852 old_snapshot_add, &h_old_snapshot_add);
7853 cls_register_cxx_method(h_class, "snap_remove",
7854 CLS_METHOD_RD | CLS_METHOD_WR,
7855 old_snapshot_remove, &h_old_snapshot_remove);
7856 cls_register_cxx_method(h_class, "snap_rename",
7857 CLS_METHOD_RD | CLS_METHOD_WR,
7858 old_snapshot_rename, &h_old_snapshot_rename);
7859
7860 /* methods for the rbd_mirroring object */
7861 cls_register_cxx_method(h_class, "mirror_uuid_get", CLS_METHOD_RD,
7862 mirror_uuid_get, &h_mirror_uuid_get);
7863 cls_register_cxx_method(h_class, "mirror_uuid_set",
7864 CLS_METHOD_RD | CLS_METHOD_WR,
7865 mirror_uuid_set, &h_mirror_uuid_set);
7866 cls_register_cxx_method(h_class, "mirror_mode_get", CLS_METHOD_RD,
7867 mirror_mode_get, &h_mirror_mode_get);
7868 cls_register_cxx_method(h_class, "mirror_mode_set",
7869 CLS_METHOD_RD | CLS_METHOD_WR,
7870 mirror_mode_set, &h_mirror_mode_set);
7871 cls_register_cxx_method(h_class, "mirror_peer_list", CLS_METHOD_RD,
7872 mirror_peer_list, &h_mirror_peer_list);
7873 cls_register_cxx_method(h_class, "mirror_peer_add",
7874 CLS_METHOD_RD | CLS_METHOD_WR,
7875 mirror_peer_add, &h_mirror_peer_add);
7876 cls_register_cxx_method(h_class, "mirror_peer_remove",
7877 CLS_METHOD_RD | CLS_METHOD_WR,
7878 mirror_peer_remove, &h_mirror_peer_remove);
7879 cls_register_cxx_method(h_class, "mirror_peer_set_client",
7880 CLS_METHOD_RD | CLS_METHOD_WR,
7881 mirror_peer_set_client, &h_mirror_peer_set_client);
7882 cls_register_cxx_method(h_class, "mirror_peer_set_cluster",
7883 CLS_METHOD_RD | CLS_METHOD_WR,
7884 mirror_peer_set_cluster, &h_mirror_peer_set_cluster);
7885 cls_register_cxx_method(h_class, "mirror_image_list", CLS_METHOD_RD,
7886 mirror_image_list, &h_mirror_image_list);
7887 cls_register_cxx_method(h_class, "mirror_image_get_image_id", CLS_METHOD_RD,
7888 mirror_image_get_image_id,
7889 &h_mirror_image_get_image_id);
7890 cls_register_cxx_method(h_class, "mirror_image_get", CLS_METHOD_RD,
7891 mirror_image_get, &h_mirror_image_get);
7892 cls_register_cxx_method(h_class, "mirror_image_set",
7893 CLS_METHOD_RD | CLS_METHOD_WR,
7894 mirror_image_set, &h_mirror_image_set);
7895 cls_register_cxx_method(h_class, "mirror_image_remove",
7896 CLS_METHOD_RD | CLS_METHOD_WR,
7897 mirror_image_remove, &h_mirror_image_remove);
7898 cls_register_cxx_method(h_class, "mirror_image_status_set",
7899 CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
7900 mirror_image_status_set, &h_mirror_image_status_set);
7901 cls_register_cxx_method(h_class, "mirror_image_status_remove",
7902 CLS_METHOD_RD | CLS_METHOD_WR,
7903 mirror_image_status_remove,
7904 &h_mirror_image_status_remove);
7905 cls_register_cxx_method(h_class, "mirror_image_status_get", CLS_METHOD_RD,
7906 mirror_image_status_get, &h_mirror_image_status_get);
7907 cls_register_cxx_method(h_class, "mirror_image_status_list", CLS_METHOD_RD,
7908 mirror_image_status_list,
7909 &h_mirror_image_status_list);
7910 cls_register_cxx_method(h_class, "mirror_image_status_get_summary",
7911 CLS_METHOD_RD, mirror_image_status_get_summary,
7912 &h_mirror_image_status_get_summary);
7913 cls_register_cxx_method(h_class, "mirror_image_status_remove_down",
7914 CLS_METHOD_RD | CLS_METHOD_WR,
7915 mirror_image_status_remove_down,
7916 &h_mirror_image_status_remove_down);
7917 cls_register_cxx_method(h_class, "mirror_image_instance_get", CLS_METHOD_RD,
7918 mirror_image_instance_get,
7919 &h_mirror_image_instance_get);
7920 cls_register_cxx_method(h_class, "mirror_image_instance_list", CLS_METHOD_RD,
7921 mirror_image_instance_list,
7922 &h_mirror_image_instance_list);
7923 cls_register_cxx_method(h_class, "mirror_instances_list", CLS_METHOD_RD,
7924 mirror_instances_list, &h_mirror_instances_list);
7925 cls_register_cxx_method(h_class, "mirror_instances_add",
7926 CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
7927 mirror_instances_add, &h_mirror_instances_add);
7928 cls_register_cxx_method(h_class, "mirror_instances_remove",
7929 CLS_METHOD_RD | CLS_METHOD_WR,
7930 mirror_instances_remove,
7931 &h_mirror_instances_remove);
7932 cls_register_cxx_method(h_class, "mirror_image_map_list",
7933 CLS_METHOD_RD, mirror_image_map_list,
7934 &h_mirror_image_map_list);
7935 cls_register_cxx_method(h_class, "mirror_image_map_update",
7936 CLS_METHOD_WR, mirror_image_map_update,
7937 &h_mirror_image_map_update);
7938 cls_register_cxx_method(h_class, "mirror_image_map_remove",
7939 CLS_METHOD_WR, mirror_image_map_remove,
7940 &h_mirror_image_map_remove);
7941
7942 /* methods for the groups feature */
7943 cls_register_cxx_method(h_class, "group_dir_list",
7944 CLS_METHOD_RD,
7945 group_dir_list, &h_group_dir_list);
7946 cls_register_cxx_method(h_class, "group_dir_add",
7947 CLS_METHOD_RD | CLS_METHOD_WR,
7948 group_dir_add, &h_group_dir_add);
7949 cls_register_cxx_method(h_class, "group_dir_remove",
7950 CLS_METHOD_RD | CLS_METHOD_WR,
7951 group_dir_remove, &h_group_dir_remove);
7952 cls_register_cxx_method(h_class, "group_dir_rename",
7953 CLS_METHOD_RD | CLS_METHOD_WR,
7954 group_dir_rename, &h_group_dir_rename);
7955 cls_register_cxx_method(h_class, "group_image_remove",
7956 CLS_METHOD_RD | CLS_METHOD_WR,
7957 group_image_remove, &h_group_image_remove);
7958 cls_register_cxx_method(h_class, "group_image_list",
7959 CLS_METHOD_RD,
7960 group_image_list, &h_group_image_list);
7961 cls_register_cxx_method(h_class, "group_image_set",
7962 CLS_METHOD_RD | CLS_METHOD_WR,
7963 group_image_set, &h_group_image_set);
7964 cls_register_cxx_method(h_class, "image_group_add",
7965 CLS_METHOD_RD | CLS_METHOD_WR,
7966 image_group_add, &h_image_group_add);
7967 cls_register_cxx_method(h_class, "image_group_remove",
7968 CLS_METHOD_RD | CLS_METHOD_WR,
7969 image_group_remove, &h_image_group_remove);
7970 cls_register_cxx_method(h_class, "image_group_get",
7971 CLS_METHOD_RD,
7972 image_group_get, &h_image_group_get);
7973 cls_register_cxx_method(h_class, "group_snap_set",
7974 CLS_METHOD_RD | CLS_METHOD_WR,
7975 group_snap_set, &h_group_snap_set);
7976 cls_register_cxx_method(h_class, "group_snap_remove",
7977 CLS_METHOD_RD | CLS_METHOD_WR,
7978 group_snap_remove, &h_group_snap_remove);
7979 cls_register_cxx_method(h_class, "group_snap_get_by_id",
7980 CLS_METHOD_RD,
7981 group_snap_get_by_id, &h_group_snap_get_by_id);
7982 cls_register_cxx_method(h_class, "group_snap_list",
7983 CLS_METHOD_RD,
7984 group_snap_list, &h_group_snap_list);
7985
7986 /* rbd_trash object methods */
7987 cls_register_cxx_method(h_class, "trash_add",
7988 CLS_METHOD_RD | CLS_METHOD_WR,
7989 trash_add, &h_trash_add);
7990 cls_register_cxx_method(h_class, "trash_remove",
7991 CLS_METHOD_RD | CLS_METHOD_WR,
7992 trash_remove, &h_trash_remove);
7993 cls_register_cxx_method(h_class, "trash_list",
7994 CLS_METHOD_RD,
7995 trash_list, &h_trash_list);
7996 cls_register_cxx_method(h_class, "trash_get",
7997 CLS_METHOD_RD,
7998 trash_get, &h_trash_get);
7999 cls_register_cxx_method(h_class, "trash_state_set",
8000 CLS_METHOD_RD | CLS_METHOD_WR,
8001 trash_state_set, &h_trash_state_set);
8002
8003 /* rbd_namespace object methods */
8004 cls_register_cxx_method(h_class, "namespace_add",
8005 CLS_METHOD_RD | CLS_METHOD_WR,
8006 namespace_add, &h_namespace_add);
8007 cls_register_cxx_method(h_class, "namespace_remove",
8008 CLS_METHOD_RD | CLS_METHOD_WR,
8009 namespace_remove, &h_namespace_remove);
8010 cls_register_cxx_method(h_class, "namespace_list", CLS_METHOD_RD,
8011 namespace_list, &h_namespace_list);
8012
8013 /* data object methods */
8014 cls_register_cxx_method(h_class, "copyup",
8015 CLS_METHOD_RD | CLS_METHOD_WR,
8016 copyup, &h_copyup);
8017 cls_register_cxx_method(h_class, "sparse_copyup",
8018 CLS_METHOD_RD | CLS_METHOD_WR,
8019 sparse_copyup, &h_sparse_copyup);
8020 cls_register_cxx_method(h_class, "assert_snapc_seq",
8021 CLS_METHOD_RD | CLS_METHOD_WR,
8022 assert_snapc_seq,
8023 &h_assert_snapc_seq);
8024 cls_register_cxx_method(h_class, "sparsify",
8025 CLS_METHOD_RD | CLS_METHOD_WR,
8026 sparsify, &h_sparsify);
8027 }
8028