1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "cls/journal/cls_journal_client.h"
5 #include "include/rados/librados.hpp"
6 #include "include/buffer.h"
7 #include "include/Context.h"
8 #include "common/Cond.h"
9 #include <errno.h>
10
11 namespace cls {
12 namespace journal {
13 namespace client {
14 using ceph::encode;
15 using ceph::decode;
16
17 namespace {
18
19 struct C_AioExec : public Context {
20 librados::IoCtx &ioctx;
21 std::string oid;
22
23 C_AioExec(librados::IoCtx &_ioctx, const std::string &_oid)
24 : ioctx(_ioctx), oid(_oid) {
25 }
26
27 static void rados_callback(rados_completion_t c, void *arg) {
28 Context *ctx = reinterpret_cast<Context *>(arg);
29 ctx->complete(rados_aio_get_return_value(c));
30 }
31 };
32
33 struct C_ClientList : public C_AioExec {
34 std::set<cls::journal::Client> *clients;
35 Context *on_finish;
36 bufferlist outbl;
37
38 C_ClientList(librados::IoCtx &_ioctx, const std::string &_oid,
39 std::set<cls::journal::Client> *_clients,
40 Context *_on_finish)
41 : C_AioExec(_ioctx, _oid), clients(_clients), on_finish(_on_finish) {}
42
43 void send(const std::string &start_after) {
44 bufferlist inbl;
45 encode(start_after, inbl);
46 encode(JOURNAL_MAX_RETURN, inbl);
47
48 librados::ObjectReadOperation op;
49 op.exec("journal", "client_list", inbl);
50
51 outbl.clear();
52 librados::AioCompletion *rados_completion =
53 librados::Rados::aio_create_completion(this, rados_callback, NULL);
54 int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
55 ceph_assert(r == 0);
56 rados_completion->release();
57 }
58
59 void complete(int r) override {
60 if (r < 0) {
61 finish(r);
62 return;
63 }
64
65 try {
66 auto iter = outbl.cbegin();
67 std::set<cls::journal::Client> partial_clients;
68 decode(partial_clients, iter);
69
70 std::string start_after;
71 if (!partial_clients.empty()) {
72 start_after = partial_clients.rbegin()->id;
73 clients->insert(partial_clients.begin(), partial_clients.end());
74 }
75
76 if (partial_clients.size() < JOURNAL_MAX_RETURN) {
77 finish(0);
78 } else {
79 send(start_after);
80 }
81 } catch (const buffer::error &err) {
82 finish(-EBADMSG);
83 }
84 }
85
86 void finish(int r) override {
87 on_finish->complete(r);
88 delete this;
89 }
90 };
91
92 struct C_ImmutableMetadata : public C_AioExec {
93 uint8_t *order;
94 uint8_t *splay_width;
95 int64_t *pool_id;
96 Context *on_finish;
97 bufferlist outbl;
98
99 C_ImmutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
100 uint8_t *_order, uint8_t *_splay_width,
101 int64_t *_pool_id, Context *_on_finish)
102 : C_AioExec(_ioctx, _oid), order(_order), splay_width(_splay_width),
103 pool_id(_pool_id), on_finish(_on_finish) {
104 }
105
106 void send() {
107 librados::ObjectReadOperation op;
108 bufferlist inbl;
109 op.exec("journal", "get_order", inbl);
110 op.exec("journal", "get_splay_width", inbl);
111 op.exec("journal", "get_pool_id", inbl);
112
113 librados::AioCompletion *rados_completion =
114 librados::Rados::aio_create_completion(this, rados_callback, NULL);
115 int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
116 ceph_assert(r == 0);
117 rados_completion->release();
118 }
119
120 void finish(int r) override {
121 if (r == 0) {
122 try {
123 auto iter = outbl.cbegin();
124 decode(*order, iter);
125 decode(*splay_width, iter);
126 decode(*pool_id, iter);
127 } catch (const buffer::error &err) {
128 r = -EBADMSG;
129 }
130 }
131 on_finish->complete(r);
132 }
133 };
134
135 struct C_MutableMetadata : public C_AioExec {
136 uint64_t *minimum_set;
137 uint64_t *active_set;
138 C_ClientList *client_list;
139 bufferlist outbl;
140
141 C_MutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
142 uint64_t *_minimum_set, uint64_t *_active_set,
143 C_ClientList *_client_list)
144 : C_AioExec(_ioctx, _oid), minimum_set(_minimum_set),
145 active_set(_active_set), client_list(_client_list) {}
146
147 void send() {
148 librados::ObjectReadOperation op;
149 bufferlist inbl;
150 op.exec("journal", "get_minimum_set", inbl);
151 op.exec("journal", "get_active_set", inbl);
152
153 librados::AioCompletion *rados_completion =
154 librados::Rados::aio_create_completion(this, rados_callback, NULL);
155 int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
156 ceph_assert(r == 0);
157 rados_completion->release();
158 }
159
160 void finish(int r) override {
161 if (r == 0) {
162 try {
163 auto iter = outbl.cbegin();
164 decode(*minimum_set, iter);
165 decode(*active_set, iter);
166 client_list->send("");
167 } catch (const buffer::error &err) {
168 r = -EBADMSG;
169 }
170 }
171 if (r < 0) {
172 client_list->complete(r);
173 }
174 }
175 };
176
177
178 } // anonymous namespace
179
180 void create(librados::ObjectWriteOperation *op,
181 uint8_t order, uint8_t splay, int64_t pool_id) {
182 bufferlist bl;
183 encode(order, bl);
184 encode(splay, bl);
185 encode(pool_id, bl);
186
187 op->exec("journal", "create", bl);
188 }
189
190 int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order,
191 uint8_t splay, int64_t pool_id) {
192 librados::ObjectWriteOperation op;
193 create(&op, order, splay, pool_id);
194
195 int r = ioctx.operate(oid, &op);
196 if (r < 0) {
197 return r;
198 }
199 return 0;
200 }
201
202 void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
203 uint8_t *order, uint8_t *splay_width,
204 int64_t *pool_id, Context *on_finish) {
205 C_ImmutableMetadata *metadata = new C_ImmutableMetadata(ioctx, oid, order,
206 splay_width, pool_id,
207 on_finish);
208 metadata->send();
209 }
210
211 void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
212 uint64_t *minimum_set, uint64_t *active_set,
213 std::set<cls::journal::Client> *clients,
214 Context *on_finish) {
215 C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
216 C_MutableMetadata *metadata = new C_MutableMetadata(
217 ioctx, oid, minimum_set, active_set, client_list);
218 metadata->send();
219 }
220
221 void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
222 bufferlist bl;
223 encode(object_set, bl);
224 op->exec("journal", "set_minimum_set", bl);
225 }
226
227 void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
228 bufferlist bl;
229 encode(object_set, bl);
230 op->exec("journal", "set_active_set", bl);
231 }
232
233 int get_client(librados::IoCtx &ioctx, const std::string &oid,
234 const std::string &id, cls::journal::Client *client) {
235 librados::ObjectReadOperation op;
236 get_client_start(&op, id);
237
238 bufferlist out_bl;
239 int r = ioctx.operate(oid, &op, &out_bl);
240 if (r < 0) {
241 return r;
242 }
243
244 auto iter = out_bl.cbegin();
245 r = get_client_finish(&iter, client);
246 if (r < 0) {
247 return r;
248 }
249 return 0;
250 }
251
252 void get_client_start(librados::ObjectReadOperation *op,
253 const std::string &id) {
254 bufferlist bl;
255 encode(id, bl);
256 op->exec("journal", "get_client", bl);
257 }
258
259 int get_client_finish(bufferlist::const_iterator *iter,
260 cls::journal::Client *client) {
261 try {
262 decode(*client, *iter);
263 } catch (const buffer::error &err) {
264 return -EBADMSG;
265 }
266 return 0;
267 }
268
269 int client_register(librados::IoCtx &ioctx, const std::string &oid,
270 const std::string &id, const bufferlist &data) {
271 librados::ObjectWriteOperation op;
272 client_register(&op, id, data);
273 return ioctx.operate(oid, &op);
274 }
275
276 void client_register(librados::ObjectWriteOperation *op,
277 const std::string &id, const bufferlist &data) {
278 bufferlist bl;
279 encode(id, bl);
280 encode(data, bl);
281 op->exec("journal", "client_register", bl);
282 }
283
284 int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
285 const std::string &id, const bufferlist &data) {
286 librados::ObjectWriteOperation op;
287 client_update_data(&op, id, data);
288 return ioctx.operate(oid, &op);
289 }
290
291 void client_update_data(librados::ObjectWriteOperation *op,
292 const std::string &id, const bufferlist &data) {
293 bufferlist bl;
294 encode(id, bl);
295 encode(data, bl);
296 op->exec("journal", "client_update_data", bl);
297 }
298
299 int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
300 const std::string &id, cls::journal::ClientState state) {
301 librados::ObjectWriteOperation op;
302 client_update_state(&op, id, state);
303 return ioctx.operate(oid, &op);
304 }
305
306 void client_update_state(librados::ObjectWriteOperation *op,
307 const std::string &id,
308 cls::journal::ClientState state) {
309 bufferlist bl;
310 encode(id, bl);
(1) Event overrun-buffer-val: |
Overrunning buffer pointed to by "__u8 const(static_cast<uint8_t>(state))" of 1 bytes by passing it to a function which accesses it at byte offset 7. [details] |
311 encode(static_cast<uint8_t>(state), bl);
312 op->exec("journal", "client_update_state", bl);
313 }
314
315 int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
316 const std::string &id) {
317 librados::ObjectWriteOperation op;
318 client_unregister(&op, id);
319 return ioctx.operate(oid, &op);
320 }
321
322 void client_unregister(librados::ObjectWriteOperation *op,
323 const std::string &id) {
324
325 bufferlist bl;
326 encode(id, bl);
327 op->exec("journal", "client_unregister", bl);
328 }
329
330 void client_commit(librados::ObjectWriteOperation *op, const std::string &id,
331 const cls::journal::ObjectSetPosition &commit_position) {
332 bufferlist bl;
333 encode(id, bl);
334 encode(commit_position, bl);
335 op->exec("journal", "client_commit", bl);
336 }
337
338 int client_list(librados::IoCtx &ioctx, const std::string &oid,
339 std::set<cls::journal::Client> *clients) {
340 C_SaferCond cond;
341 client_list(ioctx, oid, clients, &cond);
342 return cond.wait();
343 }
344
345 void client_list(librados::IoCtx &ioctx, const std::string &oid,
346 std::set<cls::journal::Client> *clients, Context *on_finish) {
347 C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
348 client_list->send("");
349 }
350
351 int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
352 uint64_t *tag_tid) {
353 librados::ObjectReadOperation op;
354 get_next_tag_tid_start(&op);
355
356 bufferlist out_bl;
357 int r = ioctx.operate(oid, &op, &out_bl);
358 if (r < 0) {
359 return r;
360 }
361
362 auto iter = out_bl.cbegin();
363 r = get_next_tag_tid_finish(&iter, tag_tid);
364 if (r < 0) {
365 return r;
366 }
367 return 0;
368 }
369
370 void get_next_tag_tid_start(librados::ObjectReadOperation *op) {
371 bufferlist bl;
372 op->exec("journal", "get_next_tag_tid", bl);
373 }
374
375 int get_next_tag_tid_finish(bufferlist::const_iterator *iter,
376 uint64_t *tag_tid) {
377 try {
378 decode(*tag_tid, *iter);
379 } catch (const buffer::error &err) {
380 return -EBADMSG;
381 }
382 return 0;
383 }
384
385 int get_tag(librados::IoCtx &ioctx, const std::string &oid,
386 uint64_t tag_tid, cls::journal::Tag *tag) {
387 librados::ObjectReadOperation op;
388 get_tag_start(&op, tag_tid);
389
390 bufferlist out_bl;
391 int r = ioctx.operate(oid, &op, &out_bl);
392 if (r < 0) {
393 return r;
394 }
395
396 auto iter = out_bl.cbegin();
397 r = get_tag_finish(&iter, tag);
398 if (r < 0) {
399 return r;
400 }
401 return 0;
402 }
403
404 void get_tag_start(librados::ObjectReadOperation *op,
405 uint64_t tag_tid) {
406 bufferlist bl;
407 encode(tag_tid, bl);
408 op->exec("journal", "get_tag", bl);
409 }
410
411 int get_tag_finish(bufferlist::const_iterator *iter, cls::journal::Tag *tag) {
412 try {
413 decode(*tag, *iter);
414 } catch (const buffer::error &err) {
415 return -EBADMSG;
416 }
417 return 0;
418 }
419
420 int tag_create(librados::IoCtx &ioctx, const std::string &oid,
421 uint64_t tag_tid, uint64_t tag_class,
422 const bufferlist &data) {
423 librados::ObjectWriteOperation op;
424 tag_create(&op, tag_tid, tag_class, data);
425 return ioctx.operate(oid, &op);
426 }
427
428 void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid,
429 uint64_t tag_class, const bufferlist &data) {
430 bufferlist bl;
431 encode(tag_tid, bl);
432 encode(tag_class, bl);
433 encode(data, bl);
434 op->exec("journal", "tag_create", bl);
435 }
436
437 int tag_list(librados::IoCtx &ioctx, const std::string &oid,
438 const std::string &client_id, boost::optional<uint64_t> tag_class,
439 std::set<cls::journal::Tag> *tags) {
440 tags->clear();
441 uint64_t start_after_tag_tid = 0;
442 while (true) {
443 librados::ObjectReadOperation op;
444 tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id,
445 tag_class);
446
447 bufferlist out_bl;
448 int r = ioctx.operate(oid, &op, &out_bl);
449 if (r < 0) {
450 return r;
451 }
452
453 auto iter = out_bl.cbegin();
454 std::set<cls::journal::Tag> decode_tags;
455 r = tag_list_finish(&iter, &decode_tags);
456 if (r < 0) {
457 return r;
458 }
459
460 tags->insert(decode_tags.begin(), decode_tags.end());
461 if (decode_tags.size() < JOURNAL_MAX_RETURN) {
462 break;
463 }
464 }
465 return 0;
466 }
467
468 void tag_list_start(librados::ObjectReadOperation *op,
469 uint64_t start_after_tag_tid, uint64_t max_return,
470 const std::string &client_id,
471 boost::optional<uint64_t> tag_class) {
472 bufferlist bl;
473 encode(start_after_tag_tid, bl);
474 encode(max_return, bl);
475 encode(client_id, bl);
476 encode(tag_class, bl);
477 op->exec("journal", "tag_list", bl);
478 }
479
480 int tag_list_finish(bufferlist::const_iterator *iter,
481 std::set<cls::journal::Tag> *tags) {
482 try {
483 decode(*tags, *iter);
484 } catch (const buffer::error &err) {
485 return -EBADMSG;
486 }
487 return 0;
488 }
489
490 void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) {
491 bufferlist bl;
492 encode(soft_max_size, bl);
493 op->exec("journal", "guard_append", bl);
494 }
495
496 void append(librados::ObjectWriteOperation *op, uint64_t soft_max_size,
497 bufferlist &data) {
498 bufferlist bl;
499 encode(soft_max_size, bl);
500 encode(data, bl);
501
502 op->exec("journal", "append", bl);
503 }
504
505 } // namespace client
506 } // namespace journal
507 } // namespace cls
508