1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "cls/journal/cls_journal_client.h"
5    	#include "include/rados/librados.hpp"
6    	#include "include/buffer.h"
7    	#include "include/Context.h"
8    	#include "common/Cond.h"
9    	#include <errno.h>
10   	
11   	namespace cls {
12   	namespace journal {
13   	namespace client {
14   	using ceph::encode;
15   	using ceph::decode;
16   	
17   	namespace {
18   	
19   	struct C_AioExec : public Context {
20   	  librados::IoCtx &ioctx;
21   	  std::string oid;
22   	
23   	  C_AioExec(librados::IoCtx &_ioctx, const std::string &_oid)
24   	    : ioctx(_ioctx), oid(_oid) {
25   	  }
26   	
27   	  static void rados_callback(rados_completion_t c, void *arg) {
28   	    Context *ctx = reinterpret_cast<Context *>(arg);
29   	    ctx->complete(rados_aio_get_return_value(c));
30   	  }
31   	};
32   	
33   	struct C_ClientList : public C_AioExec {
34   	  std::set<cls::journal::Client> *clients;
35   	  Context *on_finish;
36   	  bufferlist outbl;
37   	
38   	  C_ClientList(librados::IoCtx &_ioctx, const std::string &_oid,
39   	               std::set<cls::journal::Client> *_clients,
40   	               Context *_on_finish)
41   	    : C_AioExec(_ioctx, _oid), clients(_clients), on_finish(_on_finish) {}
42   	
43   	  void send(const std::string &start_after) {
44   	    bufferlist inbl;
45   	    encode(start_after, inbl);
46   	    encode(JOURNAL_MAX_RETURN, inbl);
47   	
48   	    librados::ObjectReadOperation op;
49   	    op.exec("journal", "client_list", inbl);
50   	
51   	    outbl.clear();
52   	    librados::AioCompletion *rados_completion =
53   	       librados::Rados::aio_create_completion(this, rados_callback, NULL);
54   	    int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
55   	    ceph_assert(r == 0);
56   	    rados_completion->release();
57   	  }
58   	
59   	  void complete(int r) override {
60   	    if (r < 0) {
61   	      finish(r);
62   	      return;
63   	    }
64   	
65   	    try {
66   	      auto iter = outbl.cbegin();
67   	      std::set<cls::journal::Client> partial_clients;
68   	      decode(partial_clients, iter);
69   	
70   	      std::string start_after;
71   	      if (!partial_clients.empty()) {
72   	        start_after = partial_clients.rbegin()->id;
73   	        clients->insert(partial_clients.begin(), partial_clients.end());
74   	      }
75   	
76   	      if (partial_clients.size() < JOURNAL_MAX_RETURN) {
77   	        finish(0);
78   	      } else {
79   	        send(start_after);
80   	      }
81   	    } catch (const buffer::error &err) {
82   	      finish(-EBADMSG);
83   	    }
84   	  }
85   	
86   	  void finish(int r) override {
87   	    on_finish->complete(r);
88   	    delete this;
89   	  }
90   	};
91   	
92   	struct C_ImmutableMetadata : public C_AioExec {
93   	  uint8_t *order;
94   	  uint8_t *splay_width;
95   	  int64_t *pool_id;
96   	  Context *on_finish;
97   	  bufferlist outbl;
98   	
99   	  C_ImmutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
100  	                      uint8_t *_order, uint8_t *_splay_width,
101  			      int64_t *_pool_id, Context *_on_finish)
102  	    : C_AioExec(_ioctx, _oid), order(_order), splay_width(_splay_width),
103  	      pool_id(_pool_id), on_finish(_on_finish) {
104  	  }
105  	
106  	  void send() {
107  	    librados::ObjectReadOperation op;
108  	    bufferlist inbl;
109  	    op.exec("journal", "get_order", inbl);
110  	    op.exec("journal", "get_splay_width", inbl);
111  	    op.exec("journal", "get_pool_id", inbl);
112  	
113  	    librados::AioCompletion *rados_completion =
114  	      librados::Rados::aio_create_completion(this, rados_callback, NULL);
115  	    int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
116  	    ceph_assert(r == 0);
117  	    rados_completion->release();
118  	  }
119  	
120  	  void finish(int r) override {
121  	    if (r == 0) {
122  	      try {
123  	        auto iter = outbl.cbegin();
124  	        decode(*order, iter);
125  	        decode(*splay_width, iter);
126  	        decode(*pool_id, iter);
127  	      } catch (const buffer::error &err) {
128  	        r = -EBADMSG;
129  	      }
130  	    }
131  	    on_finish->complete(r);
132  	  }
133  	};
134  	
135  	struct C_MutableMetadata : public C_AioExec {
136  	  uint64_t *minimum_set;
137  	  uint64_t *active_set;
138  	  C_ClientList *client_list;
139  	  bufferlist outbl;
140  	
141  	  C_MutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
142  	                    uint64_t *_minimum_set, uint64_t *_active_set,
143  	                    C_ClientList *_client_list)
144  	    : C_AioExec(_ioctx, _oid), minimum_set(_minimum_set),
145  	      active_set(_active_set), client_list(_client_list) {}
146  	
147  	  void send() {
148  	    librados::ObjectReadOperation op;
149  	    bufferlist inbl;
150  	    op.exec("journal", "get_minimum_set", inbl);
151  	    op.exec("journal", "get_active_set", inbl);
152  	
153  	    librados::AioCompletion *rados_completion =
154  	      librados::Rados::aio_create_completion(this, rados_callback, NULL);
155  	    int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
156  	    ceph_assert(r == 0);
157  	    rados_completion->release();
158  	  }
159  	
160  	  void finish(int r) override {
161  	    if (r == 0) {
162  	      try {
163  	        auto iter = outbl.cbegin();
164  	        decode(*minimum_set, iter);
165  	        decode(*active_set, iter);
166  	        client_list->send("");
167  	      } catch (const buffer::error &err) {
168  	        r = -EBADMSG;
169  	      }
170  	    }
171  	    if (r < 0) {
172  	      client_list->complete(r);
173  	    }
174  	  }
175  	};
176  	
177  	
178  	} // anonymous namespace
179  	
180  	void create(librados::ObjectWriteOperation *op,
181  	            uint8_t order, uint8_t splay, int64_t pool_id) {
182  	  bufferlist bl;
183  	  encode(order, bl);
184  	  encode(splay, bl);
185  	  encode(pool_id, bl);
186  	
187  	  op->exec("journal", "create", bl);
188  	}
189  	
190  	int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order,
191  	           uint8_t splay, int64_t pool_id) {
192  	  librados::ObjectWriteOperation op;
193  	  create(&op, order, splay, pool_id);
194  	
195  	  int r = ioctx.operate(oid, &op);
196  	  if (r < 0) {
197  	    return r;
198  	  }
199  	  return 0;
200  	}
201  	
202  	void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
203  	                            uint8_t *order, uint8_t *splay_width,
204  	                            int64_t *pool_id, Context *on_finish) {
205  	  C_ImmutableMetadata *metadata = new C_ImmutableMetadata(ioctx, oid, order,
206  	                                                          splay_width, pool_id,
207  	                                                          on_finish);
208  	  metadata->send();
209  	}
210  	
211  	void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
212  	                          uint64_t *minimum_set, uint64_t *active_set,
213  	                          std::set<cls::journal::Client> *clients,
214  	                          Context *on_finish) {
215  	  C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
216  	  C_MutableMetadata *metadata = new C_MutableMetadata(
217  	    ioctx, oid, minimum_set, active_set, client_list);
218  	  metadata->send();
219  	}
220  	
221  	void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
222  	  bufferlist bl;
223  	  encode(object_set, bl);
224  	  op->exec("journal", "set_minimum_set", bl);
225  	}
226  	
227  	void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
228  	  bufferlist bl;
229  	  encode(object_set, bl);
230  	  op->exec("journal", "set_active_set", bl);
231  	}
232  	
233  	int get_client(librados::IoCtx &ioctx, const std::string &oid,
234  	               const std::string &id, cls::journal::Client *client) {
235  	  librados::ObjectReadOperation op;
236  	  get_client_start(&op, id);
237  	
238  	  bufferlist out_bl;
239  	  int r = ioctx.operate(oid, &op, &out_bl);
240  	  if (r < 0) {
241  	    return r;
242  	  }
243  	
244  	  auto iter = out_bl.cbegin();
245  	  r = get_client_finish(&iter, client);
246  	  if (r < 0) {
247  	    return r;
248  	  }
249  	  return 0;
250  	}
251  	
252  	void get_client_start(librados::ObjectReadOperation *op,
253  	                      const std::string &id) {
254  	  bufferlist bl;
255  	  encode(id, bl);
256  	  op->exec("journal", "get_client", bl);
257  	}
258  	
259  	int get_client_finish(bufferlist::const_iterator *iter,
260  	                      cls::journal::Client *client) {
261  	  try {
262  	    decode(*client, *iter);
263  	  } catch (const buffer::error &err) {
264  	    return -EBADMSG;
265  	  }
266  	  return 0;
267  	}
268  	
269  	int client_register(librados::IoCtx &ioctx, const std::string &oid,
270  	                    const std::string &id, const bufferlist &data) {
271  	  librados::ObjectWriteOperation op;
272  	  client_register(&op, id, data);
273  	  return ioctx.operate(oid, &op);
274  	}
275  	
276  	void client_register(librados::ObjectWriteOperation *op,
277  	                     const std::string &id, const bufferlist &data) {
278  	  bufferlist bl;
279  	  encode(id, bl);
280  	  encode(data, bl);
281  	  op->exec("journal", "client_register", bl);
282  	}
283  	
284  	int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
285  	                       const std::string &id, const bufferlist &data) {
286  	  librados::ObjectWriteOperation op;
287  	  client_update_data(&op, id, data);
288  	  return ioctx.operate(oid, &op);
289  	}
290  	
291  	void client_update_data(librados::ObjectWriteOperation *op,
292  	                        const std::string &id, const bufferlist &data) {
293  	  bufferlist bl;
294  	  encode(id, bl);
295  	  encode(data, bl);
296  	  op->exec("journal", "client_update_data", bl);
297  	}
298  	
299  	int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
300  	                        const std::string &id, cls::journal::ClientState state) {
301  	  librados::ObjectWriteOperation op;
302  	  client_update_state(&op, id, state);
303  	  return ioctx.operate(oid, &op);
304  	}
305  	
306  	void client_update_state(librados::ObjectWriteOperation *op,
307  	                         const std::string &id,
308  	                         cls::journal::ClientState state) {
309  	  bufferlist bl;
310  	  encode(id, bl);
(1) Event overrun-buffer-val: Overrunning buffer pointed to by "__u8 const(static_cast<uint8_t>(state))" of 1 bytes by passing it to a function which accesses it at byte offset 7. [details]
311  	  encode(static_cast<uint8_t>(state), bl);
312  	  op->exec("journal", "client_update_state", bl);
313  	}
314  	
315  	int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
316  	                       const std::string &id) {
317  	  librados::ObjectWriteOperation op;
318  	  client_unregister(&op, id);
319  	  return ioctx.operate(oid, &op);
320  	}
321  	
322  	void client_unregister(librados::ObjectWriteOperation *op,
323  			       const std::string &id) {
324  	
325  	  bufferlist bl;
326  	  encode(id, bl);
327  	  op->exec("journal", "client_unregister", bl);
328  	}
329  	
330  	void client_commit(librados::ObjectWriteOperation *op, const std::string &id,
331  	                   const cls::journal::ObjectSetPosition &commit_position) {
332  	  bufferlist bl;
333  	  encode(id, bl);
334  	  encode(commit_position, bl);
335  	  op->exec("journal", "client_commit", bl);
336  	}
337  	
338  	int client_list(librados::IoCtx &ioctx, const std::string &oid,
339  	                std::set<cls::journal::Client> *clients) {
340  	  C_SaferCond cond;
341  	  client_list(ioctx, oid, clients, &cond);
342  	  return cond.wait();
343  	}
344  	
345  	void client_list(librados::IoCtx &ioctx, const std::string &oid,
346  	                 std::set<cls::journal::Client> *clients, Context *on_finish) {
347  	  C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
348  	  client_list->send("");
349  	}
350  	
351  	int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
352  	                     uint64_t *tag_tid) {
353  	  librados::ObjectReadOperation op;
354  	  get_next_tag_tid_start(&op);
355  	
356  	  bufferlist out_bl;
357  	  int r = ioctx.operate(oid, &op, &out_bl);
358  	  if (r < 0) {
359  	    return r;
360  	  }
361  	
362  	  auto iter = out_bl.cbegin();
363  	  r = get_next_tag_tid_finish(&iter, tag_tid);
364  	  if (r < 0) {
365  	    return r;
366  	  }
367  	  return 0;
368  	}
369  	
370  	void get_next_tag_tid_start(librados::ObjectReadOperation *op) {
371  	  bufferlist bl;
372  	  op->exec("journal", "get_next_tag_tid", bl);
373  	}
374  	
375  	int get_next_tag_tid_finish(bufferlist::const_iterator *iter,
376  	                            uint64_t *tag_tid) {
377  	  try {
378  	    decode(*tag_tid, *iter);
379  	  } catch (const buffer::error &err) {
380  	    return -EBADMSG;
381  	  }
382  	  return 0;
383  	}
384  	
385  	int get_tag(librados::IoCtx &ioctx, const std::string &oid,
386  	            uint64_t tag_tid, cls::journal::Tag *tag) {
387  	  librados::ObjectReadOperation op;
388  	  get_tag_start(&op, tag_tid);
389  	
390  	  bufferlist out_bl;
391  	  int r = ioctx.operate(oid, &op, &out_bl);
392  	  if (r < 0) {
393  	    return r;
394  	  }
395  	
396  	  auto iter = out_bl.cbegin();
397  	  r = get_tag_finish(&iter, tag);
398  	  if (r < 0) {
399  	    return r;
400  	  }
401  	  return 0;
402  	}
403  	
404  	void get_tag_start(librados::ObjectReadOperation *op,
405  	                   uint64_t tag_tid) {
406  	  bufferlist bl;
407  	  encode(tag_tid, bl);
408  	  op->exec("journal", "get_tag", bl);
409  	}
410  	
411  	int get_tag_finish(bufferlist::const_iterator *iter, cls::journal::Tag *tag) {
412  	  try {
413  	    decode(*tag, *iter);
414  	  } catch (const buffer::error &err) {
415  	    return -EBADMSG;
416  	  }
417  	  return 0;
418  	}
419  	
420  	int tag_create(librados::IoCtx &ioctx, const std::string &oid,
421  	               uint64_t tag_tid, uint64_t tag_class,
422  	               const bufferlist &data) {
423  	  librados::ObjectWriteOperation op;
424  	  tag_create(&op, tag_tid, tag_class, data);
425  	  return ioctx.operate(oid, &op);
426  	}
427  	
428  	void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid,
429  	                uint64_t tag_class, const bufferlist &data) {
430  	  bufferlist bl;
431  	  encode(tag_tid, bl);
432  	  encode(tag_class, bl);
433  	  encode(data, bl);
434  	  op->exec("journal", "tag_create", bl);
435  	}
436  	
437  	int tag_list(librados::IoCtx &ioctx, const std::string &oid,
438  	             const std::string &client_id, boost::optional<uint64_t> tag_class,
439  	             std::set<cls::journal::Tag> *tags) {
440  	  tags->clear();
441  	  uint64_t start_after_tag_tid = 0;
442  	  while (true) {
443  	    librados::ObjectReadOperation op;
444  	    tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id,
445  	                   tag_class);
446  	
447  	    bufferlist out_bl;
448  	    int r = ioctx.operate(oid, &op, &out_bl);
449  	    if (r < 0) {
450  	      return r;
451  	    }
452  	
453  	    auto iter = out_bl.cbegin();
454  	    std::set<cls::journal::Tag> decode_tags;
455  	    r = tag_list_finish(&iter, &decode_tags);
456  	    if (r < 0) {
457  	      return r;
458  	    }
459  	
460  	    tags->insert(decode_tags.begin(), decode_tags.end());
461  	    if (decode_tags.size() < JOURNAL_MAX_RETURN) {
462  	      break;
463  	    }
464  	  }
465  	  return 0;
466  	}
467  	
468  	void tag_list_start(librados::ObjectReadOperation *op,
469  	                    uint64_t start_after_tag_tid, uint64_t max_return,
470  	                    const std::string &client_id,
471  	                    boost::optional<uint64_t> tag_class) {
472  	  bufferlist bl;
473  	  encode(start_after_tag_tid, bl);
474  	  encode(max_return, bl);
475  	  encode(client_id, bl);
476  	  encode(tag_class, bl);
477  	  op->exec("journal", "tag_list", bl);
478  	}
479  	
480  	int tag_list_finish(bufferlist::const_iterator *iter,
481  	                    std::set<cls::journal::Tag> *tags) {
482  	  try {
483  	    decode(*tags, *iter);
484  	  } catch (const buffer::error &err) {
485  	    return -EBADMSG;
486  	  }
487  	  return 0;
488  	}
489  	
490  	void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) {
491  	  bufferlist bl;
492  	  encode(soft_max_size, bl);
493  	  op->exec("journal", "guard_append", bl);
494  	}
495  	
496  	void append(librados::ObjectWriteOperation *op, uint64_t soft_max_size,
497  	            bufferlist &data) {
498  	  bufferlist bl;
499  	  encode(soft_max_size, bl);
500  	  encode(data, bl);
501  	
502  	  op->exec("journal", "append", bl);
503  	}
504  	
505  	} // namespace client
506  	} // namespace journal
507  	} // namespace cls
508