1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
17
18 #include <condition_variable>
19 #include <list>
20 #include <map>
21 #include <mutex>
22 #include <memory>
23 #include <sstream>
24 #include <type_traits>
25
26 #include <boost/thread/shared_mutex.hpp>
27
28 #include "include/ceph_assert.h"
29 #include "include/buffer.h"
30 #include "include/types.h"
31 #include "include/rados/rados_types.hpp"
32
33 #include "common/admin_socket.h"
34 #include "common/ceph_time.h"
35 #include "common/ceph_timer.h"
36 #include "common/config_obs.h"
37 #include "common/shunique_lock.h"
38 #include "common/zipkin_trace.h"
39 #include "common/Finisher.h"
40 #include "common/Throttle.h"
41
42 #include "messages/MOSDOp.h"
43 #include "msg/Dispatcher.h"
44
45 #include "osd/OSDMap.h"
46
47 class Context;
48 class Messenger;
49 class MonClient;
50 class Message;
51 class Finisher;
52
53 class MPoolOpReply;
54
55 class MGetPoolStatsReply;
56 class MStatfsReply;
57 class MCommandReply;
58 class MWatchNotify;
59
60 class PerfCounters;
61
62 // -----------------------------------------
63
/*
 * A compound OSD operation: an ordered vector of sub-ops plus, for each
 * sub-op, optional output destinations (reply data buffer, completion
 * context, return-code slot) filled in when the reply is handled.
 */
struct ObjectOperation {
  std::vector<OSDOp> ops;        // the sub-ops, in the order they were added
  int flags;                     // CEPH_OSD_FLAG_* applying to the whole op
  int priority;

  // Per-sub-op output bookkeeping, kept index-aligned with `ops` by
  // add_op(); entries may be null when the caller wants no output.
  std::vector<ceph::buffer::list*> out_bl;   // where reply payload lands
  std::vector<Context*> out_handler;         // completion to run on reply
  std::vector<int*> out_rval;                // where the per-op rc lands

  ObjectOperation() : flags(0), priority(0) {}
  // Release any completion contexts still owned by this operation.
  ~ObjectOperation() {
    while (!out_handler.empty()) {
      delete out_handler.back();
      out_handler.pop_back();
    }
  }

  // Number of sub-ops accumulated so far.
  size_t size() {
    return ops.size();
  }

  // Set per-op flags (CEPH_OSD_OP_FLAG_*) on the most recently added op.
  void set_last_op_flags(int flags) {
    ceph_assert(!ops.empty());
    ops.rbegin()->op.flags = flags;
  }

  class C_TwoContexts;
  /**
   * Add a callback to run when this operation completes,
   * after any other callbacks for it.
   */
  void add_handler(Context *extra);
96
97 OSDOp& add_op(int op) {
98 int s = ops.size();
99 ops.resize(s+1);
100 ops[s].op.op = op;
101 out_bl.resize(s+1);
102 out_bl[s] = NULL;
103 out_handler.resize(s+1);
104 out_handler[s] = NULL;
105 out_rval.resize(s+1);
106 out_rval[s] = NULL;
107 return ops[s];
108 }
109 void add_data(int op, uint64_t off, uint64_t len, ceph::buffer::list& bl) {
110 OSDOp& osd_op = add_op(op);
111 osd_op.op.extent.offset = off;
112 osd_op.op.extent.length = len;
113 osd_op.indata.claim_append(bl);
114 }
115 void add_writesame(int op, uint64_t off, uint64_t write_len,
116 ceph::buffer::list& bl) {
117 OSDOp& osd_op = add_op(op);
118 osd_op.op.writesame.offset = off;
119 osd_op.op.writesame.length = write_len;
120 osd_op.op.writesame.data_length = bl.length();
121 osd_op.indata.claim_append(bl);
122 }
123 void add_xattr(int op, const char *name, const ceph::buffer::list& data) {
124 OSDOp& osd_op = add_op(op);
125 osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
126 osd_op.op.xattr.value_len = data.length();
127 if (name)
128 osd_op.indata.append(name, osd_op.op.xattr.name_len);
129 osd_op.indata.append(data);
130 }
131 void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
132 uint8_t cmp_mode, const ceph::buffer::list& data) {
133 OSDOp& osd_op = add_op(op);
134 osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
135 osd_op.op.xattr.value_len = data.length();
136 osd_op.op.xattr.cmp_op = cmp_op;
137 osd_op.op.xattr.cmp_mode = cmp_mode;
138 if (name)
139 osd_op.indata.append(name, osd_op.op.xattr.name_len);
140 osd_op.indata.append(data);
141 }
142 void add_call(int op, const char *cname, const char *method,
143 ceph::buffer::list &indata,
144 ceph::buffer::list *outbl, Context *ctx, int *prval) {
145 OSDOp& osd_op = add_op(op);
146
147 unsigned p = ops.size() - 1;
148 out_handler[p] = ctx;
149 out_bl[p] = outbl;
150 out_rval[p] = prval;
151
152 osd_op.op.cls.class_len = strlen(cname);
153 osd_op.op.cls.method_len = strlen(method);
154 osd_op.op.cls.indata_len = indata.length();
155 osd_op.indata.append(cname, osd_op.op.cls.class_len);
156 osd_op.indata.append(method, osd_op.op.cls.method_len);
157 osd_op.indata.append(indata);
158 }
159 void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
160 epoch_t start_epoch) {
161 using ceph::encode;
162 OSDOp& osd_op = add_op(op);
163 osd_op.op.pgls.count = count;
164 osd_op.op.pgls.start_epoch = start_epoch;
165 encode(cookie, osd_op.indata);
166 }
167 void add_pgls_filter(int op, uint64_t count, const ceph::buffer::list& filter,
168 collection_list_handle_t cookie, epoch_t start_epoch) {
169 using ceph::encode;
170 OSDOp& osd_op = add_op(op);
171 osd_op.op.pgls.count = count;
172 osd_op.op.pgls.start_epoch = start_epoch;
173 std::string cname = "pg";
174 std::string mname = "filter";
175 encode(cname, osd_op.indata);
176 encode(mname, osd_op.indata);
177 osd_op.indata.append(filter);
178 encode(cookie, osd_op.indata);
179 }
180 void add_alloc_hint(int op, uint64_t expected_object_size,
181 uint64_t expected_write_size,
182 uint32_t flags) {
183 OSDOp& osd_op = add_op(op);
184 osd_op.op.alloc_hint.expected_object_size = expected_object_size;
185 osd_op.op.alloc_hint.expected_write_size = expected_write_size;
186 osd_op.op.alloc_hint.flags = flags;
187 }
188
189 // ------
190
191 // pg
192 void pg_ls(uint64_t count, ceph::buffer::list& filter,
193 collection_list_handle_t cookie, epoch_t start_epoch) {
194 if (filter.length() == 0)
195 add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
196 else
197 add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
198 start_epoch);
199 flags |= CEPH_OSD_FLAG_PGOP;
200 }
201
202 void pg_nls(uint64_t count, const ceph::buffer::list& filter,
203 collection_list_handle_t cookie, epoch_t start_epoch) {
204 if (filter.length() == 0)
205 add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
206 else
207 add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
208 start_epoch);
209 flags |= CEPH_OSD_FLAG_PGOP;
210 }
211
  // List scrub-detected inconsistent objects (first overload) or snapsets
  // (second) starting after `start_after`, up to `max_to_get` entries.
  // `interval` receives the scrub interval the listing belongs to; `rval`
  // receives the decode result.  Defined out of line.
  void scrub_ls(const librados::object_id_t& start_after,
		uint64_t max_to_get,
		std::vector<librados::inconsistent_obj_t> *objects,
		uint32_t *interval,
		int *rval);
  void scrub_ls(const librados::object_id_t& start_after,
		uint64_t max_to_get,
		std::vector<librados::inconsistent_snapset_t> *objects,
		uint32_t *interval,
		int *rval);
222
223 void create(bool excl) {
224 OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
225 o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
226 }
227
  // Completion for STAT: decodes (size, mtime) from the reply and fans the
  // mtime out in whichever representations the caller asked for.
  struct C_ObjectOperation_stat : public Context {
    ceph::buffer::list bl;     // raw reply payload (out_bl points here)
    uint64_t *psize;           // optional: object size
    ceph::real_time *pmtime;   // optional: mtime as real_time
    time_t *ptime;             // optional: mtime as time_t
    struct timespec *pts;      // optional: mtime as timespec
    int *prval;                // optional: decode-error reporting
    C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
			   int *prval)
      : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
    void finish(int r) override {
      using ceph::decode;
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  uint64_t size;
	  ceph::real_time mtime;
	  decode(size, p);
	  decode(mtime, p);
	  if (psize)
	    *psize = size;
	  if (pmtime)
	    *pmtime = mtime;
	  if (ptime)
	    *ptime = ceph::real_clock::to_time_t(mtime);
	  if (pts)
	    *pts = ceph::real_clock::to_timespec(mtime);
	} catch (ceph::buffer::error& e) {
	  // Malformed reply: report -EIO rather than throwing.
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
262 void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
263 add_op(CEPH_OSD_OP_STAT);
264 unsigned p = ops.size() - 1;
265 C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL, NULL,
266 prval);
267 out_bl[p] = &h->bl;
268 out_handler[p] = h;
269 out_rval[p] = prval;
270 }
271 void stat(uint64_t *psize, time_t *ptime, int *prval) {
272 add_op(CEPH_OSD_OP_STAT);
273 unsigned p = ops.size() - 1;
274 C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime, NULL,
275 prval);
276 out_bl[p] = &h->bl;
277 out_handler[p] = h;
278 out_rval[p] = prval;
279 }
280 void stat(uint64_t *psize, struct timespec *pts, int *prval) {
281 add_op(CEPH_OSD_OP_STAT);
282 unsigned p = ops.size() - 1;
283 C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL, pts,
284 prval);
285 out_bl[p] = &h->bl;
286 out_handler[p] = h;
287 out_rval[p] = prval;
288 }
289 // object cmpext
290 struct C_ObjectOperation_cmpext : public Context {
291 int *prval;
292 explicit C_ObjectOperation_cmpext(int *prval)
293 : prval(prval) {}
294
295 void finish(int r) {
296 if (prval)
297 *prval = r;
298 }
299 };
300
301 void cmpext(uint64_t off, ceph::buffer::list& cmp_bl, int *prval) {
302 add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
303 unsigned p = ops.size() - 1;
304 C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
305 out_handler[p] = h;
306 out_rval[p] = prval;
307 }
308
309 // Used by C API
310 void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
311 ceph::buffer::list cmp_bl;
312 cmp_bl.append(cmp_buf, cmp_len);
313 add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
314 unsigned p = ops.size() - 1;
315 C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
316 out_handler[p] = h;
317 out_rval[p] = prval;
318 }
319
320 void read(uint64_t off, uint64_t len, ceph::buffer::list *pbl, int *prval,
321 Context* ctx) {
322 ceph::buffer::list bl;
323 add_data(CEPH_OSD_OP_READ, off, len, bl);
324 unsigned p = ops.size() - 1;
325 out_bl[p] = pbl;
326 out_rval[p] = prval;
327 out_handler[p] = ctx;
328 }
329
  // Completion for SPARSE_READ: decodes the extent map and the packed data
  // from the reply into caller-supplied destinations.
  struct C_ObjectOperation_sparse_read : public Context {
    ceph::buffer::list bl;                  // raw reply payload
    ceph::buffer::list *data_bl;            // decoded data bytes
    std::map<uint64_t, uint64_t> *extents;  // offset -> length of each extent
    int *prval;                             // optional: error reporting
    C_ObjectOperation_sparse_read(ceph::buffer::list *data_bl,
				  std::map<uint64_t, uint64_t> *extents,
				  int *prval)
      : data_bl(data_bl), extents(extents), prval(prval) {}
    void finish(int r) override {
      using ceph::decode;
      auto iter = bl.cbegin();
      if (r >= 0) {
	// NOTE: it's possible the sub-op has not been executed but the result
	// code remains zeroed. Avoid the costly exception handling on a
	// potential IO path.
	if (bl.length() > 0) {
	  try {
	    decode(*extents, iter);
	    decode(*data_bl, iter);
	  } catch (ceph::buffer::error& e) {
	    if (prval)
	      *prval = -EIO;
	  }
	} else if (prval) {
	  // Empty payload with a success rc: treat as an I/O error.
	  *prval = -EIO;
	}
      }
    }
  };
360 void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m,
361 ceph::buffer::list *data_bl, int *prval) {
362 ceph::buffer::list bl;
363 add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
364 unsigned p = ops.size() - 1;
365 C_ObjectOperation_sparse_read *h =
366 new C_ObjectOperation_sparse_read(data_bl, m, prval);
367 out_bl[p] = &h->bl;
368 out_handler[p] = h;
369 out_rval[p] = prval;
370 }
371 void write(uint64_t off, ceph::buffer::list& bl,
372 uint64_t truncate_size,
373 uint32_t truncate_seq) {
374 add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
375 OSDOp& o = *ops.rbegin();
376 o.op.extent.truncate_size = truncate_size;
377 o.op.extent.truncate_seq = truncate_seq;
378 }
  // WRITE with no truncate ordering metadata.
  void write(uint64_t off, ceph::buffer::list& bl) {
    write(off, bl, 0, 0);
  }
  // Replace the entire object's contents with `bl`.
  void write_full(ceph::buffer::list& bl) {
    add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
  }
  // Replicate `bl` across `write_len` bytes starting at `off`.
  void writesame(uint64_t off, uint64_t write_len, ceph::buffer::list& bl) {
    add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
  }
  // Append `bl` to the end of the object.
  void append(ceph::buffer::list& bl) {
    add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
  }
391 void zero(uint64_t off, uint64_t len) {
392 ceph::buffer::list bl;
393 add_data(CEPH_OSD_OP_ZERO, off, len, bl);
394 }
395 void truncate(uint64_t off) {
396 ceph::buffer::list bl;
397 add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
398 }
399 void remove() {
400 ceph::buffer::list bl;
401 add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
402 }
403 void mapext(uint64_t off, uint64_t len) {
404 ceph::buffer::list bl;
405 add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
406 }
407 void sparse_read(uint64_t off, uint64_t len) {
408 ceph::buffer::list bl;
409 add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
410 }
411
412 void checksum(uint8_t type, const ceph::buffer::list &init_value_bl,
413 uint64_t off, uint64_t len, size_t chunk_size,
414 ceph::buffer::list *pbl, int *prval, Context *ctx) {
415 OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
416 osd_op.op.checksum.offset = off;
417 osd_op.op.checksum.length = len;
418 osd_op.op.checksum.type = type;
419 osd_op.op.checksum.chunk_size = chunk_size;
420 osd_op.indata.append(init_value_bl);
421
422 unsigned p = ops.size() - 1;
423 out_bl[p] = pbl;
424 out_rval[p] = prval;
425 out_handler[p] = ctx;
426 }
427
428 // object attrs
429 void getxattr(const char *name, ceph::buffer::list *pbl, int *prval) {
430 ceph::buffer::list bl;
431 add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
432 unsigned p = ops.size() - 1;
433 out_bl[p] = pbl;
434 out_rval[p] = prval;
435 }
  // Completion that decodes a map<string,bufferlist> (xattrs or omap
  // values) from the reply, plus an optional "truncated" flag.
  struct C_ObjectOperation_decodevals : public Context {
    uint64_t max_entries;  // requested cap; used to infer truncation below
    ceph::buffer::list bl; // raw reply payload (out_bl points here)
    std::map<std::string,ceph::buffer::list> *pattrs; // optional: decoded map
    bool *ptruncated;      // optional: whether more entries remain
    int *prval;            // optional: decode-error reporting
    C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,ceph::buffer::list> *pa,
				 bool *pt, int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void finish(int r) override {
      using ceph::decode;
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  if (pattrs)
	    decode(*pattrs, p);
	  if (ptruncated) {
	    // Even if the caller did not want the values we must decode
	    // them to reach the truncated flag (and to count entries).
	    std::map<std::string,ceph::buffer::list> ignore;
	    if (!pattrs) {
	      decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this. since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	}
	catch (ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // Completion that decodes a set<string> of keys from the reply, plus an
  // optional "truncated" flag (same shape as decodevals, for key listings).
  struct C_ObjectOperation_decodekeys : public Context {
    uint64_t max_entries;       // requested cap; used to infer truncation
    ceph::buffer::list bl;      // raw reply payload (out_bl points here)
    std::set<std::string> *pattrs; // optional: decoded key set
    bool *ptruncated;           // optional: whether more keys remain
    int *prval;                 // optional: decode-error reporting
    C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
				 int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
	using ceph::decode;
	auto p = bl.cbegin();
	try {
	  if (pattrs)
	    decode(*pattrs, p);
	  if (ptruncated) {
	    // Decode into a scratch set if the caller did not want the
	    // keys; we still need the count for the inference below.
	    std::set<std::string> ignore;
	    if (!pattrs) {
	      decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this. since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	}
	catch (ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // Completion that decodes an obj_list_watch_response_t and converts each
  // entry into the public obj_watch_t representation.
  struct C_ObjectOperation_decodewatchers : public Context {
    ceph::buffer::list bl;             // raw reply payload (out_bl points here)
    std::list<obj_watch_t> *pwatchers; // optional: decoded watcher list
    int *prval;                        // optional: decode-error reporting
    C_ObjectOperation_decodewatchers(std::list<obj_watch_t> *pw, int *pr)
      : pwatchers(pw), prval(pr) {}
    void finish(int r) override {
      using ceph::decode;
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  obj_list_watch_response_t resp;
	  decode(resp, p);
	  if (pwatchers) {
	    for (const auto& watch_item : resp.entries) {
	      obj_watch_t ow;
	      // ow.addr is a fixed-size char array: copy with explicit
	      // NUL-termination.
	      std::string sa = watch_item.addr.get_legacy_str();
	      strncpy(ow.addr, sa.c_str(), sizeof(ow.addr) - 1);
	      ow.addr[sizeof(ow.addr) - 1] = '\0';
	      ow.watcher_id = watch_item.name.num();
	      ow.cookie = watch_item.cookie;
	      ow.timeout_seconds = watch_item.timeout_seconds;
	      pwatchers->push_back(std::move(ow));
	    }
	  }
	}
	catch (ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // Completion that decodes an obj_list_snap_response_t and converts it
  // into the public librados::snap_set_t representation.
  struct C_ObjectOperation_decodesnaps : public Context {
    ceph::buffer::list bl;        // raw reply payload (out_bl points here)
    librados::snap_set_t *psnaps; // optional: decoded snap set
    int *prval;                   // optional: decode-error reporting
    C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr)
      : psnaps(ps), prval(pr) {}
    void finish(int r) override {
      if (r >= 0) {
	using ceph::decode;
	auto p = bl.cbegin();
	try {
	  obj_list_snap_response_t resp;
	  decode(resp, p);
	  if (psnaps) {
	    psnaps->clones.clear();
	    for (auto ci = resp.clones.begin(); ci != resp.clones.end(); ++ci) {
	      librados::clone_info_t clone;

	      clone.cloneid = ci->cloneid;
	      clone.snaps.reserve(ci->snaps.size());
	      clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
				 ci->snaps.end());
	      clone.overlap = ci->overlap;
	      clone.size = ci->size;

	      psnaps->clones.push_back(clone);
	    }
	    psnaps->seq = resp.seq;
	  }
	} catch (ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
591 void getxattrs(std::map<std::string,ceph::buffer::list> *pattrs, int *prval) {
592 add_op(CEPH_OSD_OP_GETXATTRS);
593 if (pattrs || prval) {
594 unsigned p = ops.size() - 1;
595 C_ObjectOperation_decodevals *h
596 = new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval);
597 out_handler[p] = h;
598 out_bl[p] = &h->bl;
599 out_rval[p] = prval;
600 }
601 }
  // Set one xattr from a bufferlist value.
  void setxattr(const char *name, const ceph::buffer::list& bl) {
    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
  }
605 void setxattr(const char *name, const std::string& s) {
606 ceph::buffer::list bl;
607 bl.append(s);
608 add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
609 }
  // Compare an xattr against `bl` using cmp_op/cmp_mode
  // (CEPH_OSD_CMPXATTR_* values).
  void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
		const ceph::buffer::list& bl) {
    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
  }
614 void rmxattr(const char *name) {
615 ceph::buffer::list bl;
616 add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
617 }
618 void setxattrs(std::map<std::string, ceph::buffer::list>& attrs) {
619 using ceph::encode;
620 ceph::buffer::list bl;
621 encode(attrs, bl);
622 add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
623 }
624 void resetxattrs(const char *prefix, std::map<std::string, ceph::buffer::list>& attrs) {
625 using ceph::encode;
626 ceph::buffer::list bl;
627 encode(attrs, bl);
628 add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
629 }
630
631 // trivialmap
  // Apply a trivialmap (TMAP) update; `bl` carries the pre-encoded
  // update stream.
  void tmap_update(ceph::buffer::list& bl) {
    add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
  }
635
636 // objectmap
637 void omap_get_keys(const std::string &start_after,
638 uint64_t max_to_get,
639 std::set<std::string> *out_set,
640 bool *ptruncated,
641 int *prval) {
642 using ceph::encode;
643 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
644 ceph::buffer::list bl;
645 encode(start_after, bl);
646 encode(max_to_get, bl);
647 op.op.extent.offset = 0;
648 op.op.extent.length = bl.length();
649 op.indata.claim_append(bl);
650 if (prval || ptruncated || out_set) {
651 unsigned p = ops.size() - 1;
652 C_ObjectOperation_decodekeys *h =
653 new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval);
654 out_handler[p] = h;
655 out_bl[p] = &h->bl;
656 out_rval[p] = prval;
657 }
658 }
659
660 void omap_get_vals(const std::string &start_after,
661 const std::string &filter_prefix,
662 uint64_t max_to_get,
663 std::map<std::string, ceph::buffer::list> *out_set,
664 bool *ptruncated,
665 int *prval) {
666 using ceph::encode;
667 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
668 ceph::buffer::list bl;
669 encode(start_after, bl);
670 encode(max_to_get, bl);
671 encode(filter_prefix, bl);
672 op.op.extent.offset = 0;
673 op.op.extent.length = bl.length();
674 op.indata.claim_append(bl);
675 if (prval || out_set || ptruncated) {
676 unsigned p = ops.size() - 1;
677 C_ObjectOperation_decodevals *h =
678 new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval);
679 out_handler[p] = h;
680 out_bl[p] = &h->bl;
681 out_rval[p] = prval;
682 }
683 }
684
685 void omap_get_vals_by_keys(const std::set<std::string> &to_get,
686 std::map<std::string, ceph::buffer::list> *out_set,
687 int *prval) {
688 using ceph::encode;
689 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
690 ceph::buffer::list bl;
691 encode(to_get, bl);
692 op.op.extent.offset = 0;
693 op.op.extent.length = bl.length();
694 op.indata.claim_append(bl);
695 if (prval || out_set) {
696 unsigned p = ops.size() - 1;
697 C_ObjectOperation_decodevals *h =
698 new C_ObjectOperation_decodevals(0, out_set, nullptr, prval);
699 out_handler[p] = h;
700 out_bl[p] = &h->bl;
701 out_rval[p] = prval;
702 }
703 }
704
705 void omap_cmp(const std::map<std::string, std::pair<ceph::buffer::list,int> > &assertions,
706 int *prval) {
707 using ceph::encode;
708 OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
709 ceph::buffer::list bl;
710 encode(assertions, bl);
711 op.op.extent.offset = 0;
712 op.op.extent.length = bl.length();
713 op.indata.claim_append(bl);
714 if (prval) {
715 unsigned p = ops.size() - 1;
716 out_rval[p] = prval;
717 }
718 }
719
  // Completion for COPY_GET: decodes an object_copy_data_t reply and fans
  // its many fields out into the caller-supplied destinations (all
  // optional).  On -ENOENT only the reqids are copied.
  struct C_ObjectOperation_copyget : public Context {
    ceph::buffer::list bl;            // raw reply payload (out_bl points here)
    object_copy_cursor_t *cursor;     // updated to the reply's cursor
    uint64_t *out_size;
    ceph::real_time *out_mtime;
    std::map<std::string,ceph::buffer::list> *out_attrs;
    ceph::buffer::list *out_data, *out_omap_header, *out_omap_data;
    std::vector<snapid_t> *out_snaps;
    snapid_t *out_snap_seq;
    uint32_t *out_flags;
    uint32_t *out_data_digest;
    uint32_t *out_omap_digest;
    mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *out_reqids;
    mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes;
    uint64_t *out_truncate_seq;
    uint64_t *out_truncate_size;
    int *prval;                       // optional: decode-error reporting
    C_ObjectOperation_copyget(object_copy_cursor_t *c,
			      uint64_t *s,
			      ceph::real_time *m,
			      std::map<std::string,ceph::buffer::list> *a,
			      ceph::buffer::list *d, ceph::buffer::list *oh,
			      ceph::buffer::list *o,
			      std::vector<snapid_t> *osnaps,
			      snapid_t *osnap_seq,
			      uint32_t *flags,
			      uint32_t *dd,
			      uint32_t *od,
			      mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *oreqids,
			      mempool::osd_pglog::map<uint32_t, int> *oreqid_return_codes,
			      uint64_t *otseq,
			      uint64_t *otsize,
			      int *r)
      : cursor(c),
	out_size(s), out_mtime(m),
	out_attrs(a), out_data(d), out_omap_header(oh),
	out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
	out_flags(flags), out_data_digest(dd), out_omap_digest(od),
	out_reqids(oreqids),
	out_reqid_return_codes(oreqid_return_codes),
	out_truncate_seq(otseq),
	out_truncate_size(otsize),
	prval(r) {}
    void finish(int r) override {
      using ceph::decode;
      // reqids are copied on ENOENT
      if (r < 0 && r != -ENOENT)
	return;
      try {
	auto p = bl.cbegin();
	object_copy_data_t copy_reply;
	decode(copy_reply, p);
	if (r == -ENOENT) {
	  if (out_reqids)
	    *out_reqids = copy_reply.reqids;
	  return;
	}
	if (out_size)
	  *out_size = copy_reply.size;
	if (out_mtime)
	  *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
	if (out_attrs)
	  *out_attrs = copy_reply.attrs;
	if (out_data)
	  out_data->claim_append(copy_reply.data);
	if (out_omap_header)
	  out_omap_header->claim_append(copy_reply.omap_header);
	if (out_omap_data)
	  *out_omap_data = copy_reply.omap_data;
	if (out_snaps)
	  *out_snaps = copy_reply.snaps;
	if (out_snap_seq)
	  *out_snap_seq = copy_reply.snap_seq;
	if (out_flags)
	  *out_flags = copy_reply.flags;
	if (out_data_digest)
	  *out_data_digest = copy_reply.data_digest;
	if (out_omap_digest)
	  *out_omap_digest = copy_reply.omap_digest;
	if (out_reqids)
	  *out_reqids = copy_reply.reqids;
	if (out_reqid_return_codes)
	  *out_reqid_return_codes = copy_reply.reqid_return_codes;
	if (out_truncate_seq)
	  *out_truncate_seq = copy_reply.truncate_seq;
	if (out_truncate_size)
	  *out_truncate_size = copy_reply.truncate_size;
	// Always advance the caller's cursor for the next COPY_GET chunk.
	*cursor = copy_reply.cursor;
      } catch (ceph::buffer::error& e) {
	if (prval)
	  *prval = -EIO;
      }
    }
  };
814
815 void copy_get(object_copy_cursor_t *cursor,
816 uint64_t max,
817 uint64_t *out_size,
818 ceph::real_time *out_mtime,
819 std::map<std::string,ceph::buffer::list> *out_attrs,
820 ceph::buffer::list *out_data,
821 ceph::buffer::list *out_omap_header,
822 ceph::buffer::list *out_omap_data,
823 std::vector<snapid_t> *out_snaps,
824 snapid_t *out_snap_seq,
825 uint32_t *out_flags,
826 uint32_t *out_data_digest,
827 uint32_t *out_omap_digest,
828 mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *out_reqids,
829 mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes,
830 uint64_t *truncate_seq,
831 uint64_t *truncate_size,
832 int *prval) {
833 using ceph::encode;
834 OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
835 osd_op.op.copy_get.max = max;
836 encode(*cursor, osd_op.indata);
837 encode(max, osd_op.indata);
838 unsigned p = ops.size() - 1;
839 out_rval[p] = prval;
840 C_ObjectOperation_copyget *h =
841 new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
842 out_attrs, out_data, out_omap_header,
843 out_omap_data, out_snaps, out_snap_seq,
844 out_flags, out_data_digest,
845 out_omap_digest, out_reqids,
846 out_reqid_return_codes, truncate_seq,
847 truncate_size, prval);
848 out_bl[p] = &h->bl;
849 out_handler[p] = h;
850 }
851
  // Clear the object's dirty flag in the cache tier.
  void undirty() {
    add_op(CEPH_OSD_OP_UNDIRTY);
  }
855
  // Completion for ISDIRTY: decodes the boolean dirty flag from the reply.
  struct C_ObjectOperation_isdirty : public Context {
    ceph::buffer::list bl;  // raw reply payload (out_bl points here)
    bool *pisdirty;         // optional: decoded dirty flag
    int *prval;             // optional: decode-error reporting
    C_ObjectOperation_isdirty(bool *p, int *r)
      : pisdirty(p), prval(r) {}
    void finish(int r) override {
      using ceph::decode;
      if (r < 0)
	return;
      try {
	auto p = bl.cbegin();
	bool isdirty;
	decode(isdirty, p);
	if (pisdirty)
	  *pisdirty = isdirty;
      } catch (ceph::buffer::error& e) {
	if (prval)
	  *prval = -EIO;
      }
    }
  };
878
879 void is_dirty(bool *pisdirty, int *prval) {
880 add_op(CEPH_OSD_OP_ISDIRTY);
881 unsigned p = ops.size() - 1;
882 out_rval[p] = prval;
883 C_ObjectOperation_isdirty *h =
884 new C_ObjectOperation_isdirty(pisdirty, prval);
885 out_bl[p] = &h->bl;
886 out_handler[p] = h;
887 }
888
  // Completion for PG_HITSET_LS: decodes the list of hit-set intervals and
  // delivers it either as real_time pairs or, converted, as time_t pairs.
  struct C_ObjectOperation_hit_set_ls : public Context {
    ceph::buffer::list bl;  // raw reply payload (out_bl points here)
    std::list< std::pair<time_t, time_t> > *ptls;  // optional: time_t form
    std::list< std::pair<ceph::real_time, ceph::real_time> > *putls; // optional
    int *prval;             // optional: result/error reporting
    C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
				 std::list< std::pair<ceph::real_time,
						      ceph::real_time> > *ut,
				 int *r)
      : ptls(t), putls(ut), prval(r) {}
    void finish(int r) override {
      using ceph::decode;
      if (r < 0)
	return;
      try {
	auto p = bl.cbegin();
	std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
	decode(ls, p);
	if (ptls) {
	  ptls->clear();
	  for (auto p = ls.begin(); p != ls.end(); ++p)
	    // round initial timestamp up to the next full second to
	    // keep this a valid interval.
	    ptls->push_back(
	      std::make_pair(ceph::real_clock::to_time_t(
			       ceph::ceil(p->first,
					  // Sadly, no time literals until C++14.
					  std::chrono::seconds(1))),
			     ceph::real_clock::to_time_t(p->second)));
	}
	if (putls)
	  putls->swap(ls);
      } catch (ceph::buffer::error& e) {
	r = -EIO;
      }
      if (prval)
	*prval = r;
    }
  };
928
929 /**
930 * std::list available HitSets.
931 *
932 * We will get back a std::list of time intervals. Note that the most
933 * recent range may have an empty end timestamp if it is still
934 * accumulating.
935 *
936 * @param pls [out] std::list of time intervals
937 * @param prval [out] return value
938 */
939 void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
940 add_op(CEPH_OSD_OP_PG_HITSET_LS);
941 unsigned p = ops.size() - 1;
942 out_rval[p] = prval;
943 C_ObjectOperation_hit_set_ls *h =
944 new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
945 out_bl[p] = &h->bl;
946 out_handler[p] = h;
947 }
948 void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
949 int *prval) {
950 add_op(CEPH_OSD_OP_PG_HITSET_LS);
951 unsigned p = ops.size() - 1;
952 out_rval[p] = prval;
953 C_ObjectOperation_hit_set_ls *h =
954 new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
955 out_bl[p] = &h->bl;
956 out_handler[p] = h;
957 }
958
959 /**
960 * get HitSet
961 *
962 * Return an encoded HitSet that includes the provided time
963 * interval.
964 *
965 * @param stamp [in] timestamp
966 * @param pbl [out] target buffer for encoded HitSet
967 * @param prval [out] return value
968 */
969 void hit_set_get(ceph::real_time stamp, ceph::buffer::list *pbl, int *prval) {
970 OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
971 op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
972 unsigned p = ops.size() - 1;
973 out_rval[p] = prval;
974 out_bl[p] = pbl;
975 }
976
977 void omap_get_header(ceph::buffer::list *bl, int *prval) {
978 add_op(CEPH_OSD_OP_OMAPGETHEADER);
979 unsigned p = ops.size() - 1;
980 out_bl[p] = bl;
981 out_rval[p] = prval;
982 }
983
984 void omap_set(const std::map<std::string, ceph::buffer::list>& map) {
985 using ceph::encode;
986 ceph::buffer::list bl;
987 encode(map, bl);
988 add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
989 }
990
  // Replace the omap header with `bl`.
  void omap_set_header(ceph::buffer::list &bl) {
    add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
  }
994
  // Remove all omap keys and values from the object.
  void omap_clear() {
    add_op(CEPH_OSD_OP_OMAPCLEAR);
  }
998
999 void omap_rm_keys(const std::set<std::string> &to_remove) {
1000 using ceph::encode;
1001 ceph::buffer::list bl;
1002 encode(to_remove, bl);
1003 add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
1004 }
1005
1006 void omap_rm_range(std::string_view key_begin, std::string_view key_end) {
1007 bufferlist bl;
1008 encode(key_begin, bl);
1009 encode(key_end, bl);
1010 add_data(CEPH_OSD_OP_OMAPRMKEYRANGE, 0, bl.length(), bl);
1011 }
1012
1013 // object classes
  // Invoke an object-class method, discarding any output.
  void call(const char *cname, const char *method, ceph::buffer::list &indata) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
  }
1017
  // Invoke an object-class method, capturing output data, a completion
  // context and the return code.
  void call(const char *cname, const char *method, ceph::buffer::list &indata,
	    ceph::buffer::list *outdata, Context *ctx, int *prval) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
  }
1022
1023 // watch/notify
1024 void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
1025 OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
1026 osd_op.op.watch.cookie = cookie;
1027 osd_op.op.watch.op = op;
1028 osd_op.op.watch.timeout = timeout;
1029 }
1030
1031 void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
1032 ceph::buffer::list &bl, ceph::buffer::list *inbl) {
1033 using ceph::encode;
1034 OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
1035 osd_op.op.notify.cookie = cookie;
1036 encode(prot_ver, *inbl);
1037 encode(timeout, *inbl);
1038 encode(bl, *inbl);
1039 osd_op.indata.append(*inbl);
1040 }
1041
1042 void notify_ack(uint64_t notify_id, uint64_t cookie,
1043 ceph::buffer::list& reply_bl) {
1044 using ceph::encode;
1045 OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
1046 ceph::buffer::list bl;
1047 encode(notify_id, bl);
1048 encode(cookie, bl);
1049 encode(reply_bl, bl);
1050 osd_op.indata.append(bl);
1051 }
1052
  // List the current watchers of this object into *out; the reply is
  // decoded asynchronously by C_ObjectOperation_decodewatchers, whose
  // buffer/handler slots are wired into this op's out_* vectors.
  void list_watchers(std::list<obj_watch_t> *out,
		     int *prval) {
    (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
    if (prval || out) {
      unsigned p = ops.size() - 1;  // slot of the op we just added
      C_ObjectOperation_decodewatchers *h =
	new C_ObjectOperation_decodewatchers(out, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }

  // List the object's snapshots; reply decoded by
  // C_ObjectOperation_decodesnaps into *out.
  void list_snaps(librados::snap_set_t *out, int *prval) {
    (void)add_op(CEPH_OSD_OP_LIST_SNAPS);
    if (prval || out) {
      unsigned p = ops.size() - 1;  // slot of the op we just added
      C_ObjectOperation_decodesnaps *h =
	new C_ObjectOperation_decodesnaps(out, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }

  // Assert (server-side) that the object's version equals ver.
  void assert_version(uint64_t ver) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
    osd_op.op.assert_ver.ver = ver;
  }
1082
1083 void cmpxattr(const char *name, const ceph::buffer::list& val,
1084 int op, int mode) {
1085 add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
1086 OSDOp& o = *ops.rbegin();
1087 o.op.xattr.cmp_op = op;
1088 o.op.xattr.cmp_mode = mode;
1089 }
1090
  // Roll the object back to the given snapshot.
  void rollback(uint64_t snapid) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
    osd_op.op.snap.snapid = snapid;
  }

  // Copy the object's content from (src, snapid) in src_oloc into this
  // object.  src_version pins the expected source version; flags/
  // src_fadvise_flags tune the copy.  Source id and locator travel in indata.
  void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
		 version_t src_version, unsigned flags,
		 unsigned src_fadvise_flags) {
    using ceph::encode;
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = src_version;
    osd_op.op.copy_from.flags = flags;
    osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
    encode(src, osd_op.indata);
    encode(src_oloc, osd_op.indata);
  }
1108
1109 /**
1110 * writeback content to backing tier
1111 *
1112 * If object is marked dirty in the cache tier, write back content
1113 * to backing tier. If the object is clean this is a no-op.
1114 *
1115 * If writeback races with an update, the update will block.
1116 *
1117 * use with IGNORE_CACHE to avoid triggering promote.
1118 */
1119 void cache_flush() {
1120 add_op(CEPH_OSD_OP_CACHE_FLUSH);
1121 }
1122
1123 /**
1124 * writeback content to backing tier
1125 *
1126 * If object is marked dirty in the cache tier, write back content
1127 * to backing tier. If the object is clean this is a no-op.
1128 *
1129 * If writeback races with an update, return EAGAIN. Requires that
1130 * the SKIPRWLOCKS flag be set.
1131 *
1132 * use with IGNORE_CACHE to avoid triggering promote.
1133 */
1134 void cache_try_flush() {
1135 add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
1136 }
1137
1138 /**
1139 * evict object from cache tier
1140 *
1141 * If object is marked clean, remove the object from the cache tier.
1142 * Otherwise, return EBUSY.
1143 *
1144 * use with IGNORE_CACHE to avoid triggering promote.
1145 */
1146 void cache_evict() {
1147 add_op(CEPH_OSD_OP_CACHE_EVICT);
1148 }
1149
1150 /*
1151 * Extensible tier
1152 */
1153 void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
1154 version_t tgt_version, int flag) {
1155 using ceph::encode;
1156 OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
1157 osd_op.op.copy_from.snapid = snapid;
1158 osd_op.op.copy_from.src_version = tgt_version;
1159 encode(tgt, osd_op.indata);
1160 encode(tgt_oloc, osd_op.indata);
1161 set_last_op_flags(flag);
1162 }
1163
1164 void set_chunk(uint64_t src_offset, uint64_t src_length, object_locator_t tgt_oloc,
1165 object_t tgt_oid, uint64_t tgt_offset, int flag) {
1166 using ceph::encode;
1167 OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_CHUNK);
1168 encode(src_offset, osd_op.indata);
1169 encode(src_length, osd_op.indata);
1170 encode(tgt_oloc, osd_op.indata);
1171 encode(tgt_oid, osd_op.indata);
1172 encode(tgt_offset, osd_op.indata);
1173 set_last_op_flags(flag);
1174 }
1175
  // Promote the object into the fast tier.
  void tier_promote() {
    add_op(CEPH_OSD_OP_TIER_PROMOTE);
  }

  // Clear the object's manifest (redirect/chunk) state.
  void unset_manifest() {
    add_op(CEPH_OSD_OP_UNSET_MANIFEST);
  }

  // Flush tiered/manifest data to the backing pool.
  void tier_flush() {
    add_op(CEPH_OSD_OP_TIER_FLUSH);
  }

  // Advise the OSD of the expected object/write sizes for allocation.
  void set_alloc_hint(uint64_t expected_object_size,
		      uint64_t expected_write_size,
		      uint32_t flags) {
    add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
		   expected_write_size, flags);

    // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
    // not worth a feature bit. Set FAILOK per-op flag to make
    // sure older osds don't trip over an unsupported opcode.
    set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
  }
1199
1200 void dup(std::vector<OSDOp>& sops) {
1201 ops = sops;
1202 out_bl.resize(sops.size());
1203 out_handler.resize(sops.size());
1204 out_rval.resize(sops.size());
1205 for (uint32_t i = 0; i < sops.size(); i++) {
1206 out_bl[i] = &sops[i].outdata;
1207 out_handler[i] = NULL;
1208 out_rval[i] = &sops[i].rval;
1209 }
1210 }
1211
1212 /**
1213 * Pin/unpin an object in cache tier
1214 */
1215 void cache_pin() {
1216 add_op(CEPH_OSD_OP_CACHE_PIN);
1217 }
1218
1219 void cache_unpin() {
1220 add_op(CEPH_OSD_OP_CACHE_UNPIN);
1221 }
1222 };
1223
1224
1225 // ----------------
1226
1227
1228 class Objecter : public md_config_obs_t, public Dispatcher {
1229 public:
1230 // config observer bits
1231 const char** get_tracked_conf_keys() const override;
1232 void handle_conf_change(const ConfigProxy& conf,
1233 const std::set <std::string> &changed) override;
1234
1235 public:
1236 Messenger *messenger;
1237 MonClient *monc;
1238 Finisher *finisher;
1239 ZTracer::Endpoint trace_endpoint;
1240 private:
1241 std::unique_ptr<OSDMap> osdmap;
1242 public:
1243 using Dispatcher::cct;
1244 std::multimap<std::string,std::string> crush_location;
1245
1246 std::atomic<bool> initialized{false};
1247
1248 private:
1249 std::atomic<uint64_t> last_tid{0};
1250 std::atomic<unsigned> inflight_ops{0};
1251 std::atomic<int> client_inc{-1};
1252 uint64_t max_linger_id{0};
1253 std::atomic<unsigned> num_in_flight{0};
1254 std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
1255 bool keep_balanced_budget = false;
1256 bool honor_pool_full = true;
1257 bool pool_full_try = false;
1258
1259 // If this is true, accumulate a set of blacklisted entities
1260 // to be drained by consume_blacklist_events.
1261 bool blacklist_events_enabled = false;
1262 std::set<entity_addr_t> blacklist_events;
1263 struct pg_mapping_t {
1264 epoch_t epoch = 0;
1265 std::vector<int> up;
1266 int up_primary = -1;
1267 std::vector<int> acting;
1268 int acting_primary = -1;
1269
1270 pg_mapping_t() {}
1271 pg_mapping_t(epoch_t epoch, std::vector<int> up, int up_primary,
1272 std::vector<int> acting, int acting_primary)
1273 : epoch(epoch), up(up), up_primary(up_primary),
1274 acting(acting), acting_primary(acting_primary) {}
1275 };
1276 std::shared_mutex pg_mapping_lock;
1277 // pool -> pg mapping
1278 std::map<int64_t, std::vector<pg_mapping_t>> pg_mappings;
1279
  // convenient accessors
  // Look up the cached mapping for pg.  The caller must pre-set
  // pg_mapping->epoch to the epoch it expects; a cached entry from a
  // different epoch is treated as stale and not returned.
  bool lookup_pg_mapping(const pg_t& pg, pg_mapping_t* pg_mapping) {
    std::shared_lock l{pg_mapping_lock};
    auto it = pg_mappings.find(pg.pool());
    if (it == pg_mappings.end())
      return false;             // no cache for this pool
    auto& mapping_array = it->second;
    if (pg.ps() >= mapping_array.size())
      return false;             // ps out of range (pg_num changed?)
    if (mapping_array[pg.ps()].epoch != pg_mapping->epoch) // stale
      return false;
    *pg_mapping = mapping_array[pg.ps()];
    return true;
  }
  // Store a freshly computed mapping for pg.  The per-pool array must
  // already be sized for pg.ps() (see prune_pg_mapping).
  void update_pg_mapping(const pg_t& pg, pg_mapping_t&& pg_mapping) {
    std::lock_guard l{pg_mapping_lock};
    auto& mapping_array = pg_mappings[pg.pool()];
    ceph_assert(pg.ps() < mapping_array.size());
    mapping_array[pg.ps()] = std::move(pg_mapping);
  }
1300 void prune_pg_mapping(const mempool::osdmap::map<int64_t,pg_pool_t>& pools) {
1301 std::lock_guard l{pg_mapping_lock};
1302 for (auto& pool : pools) {
1303 auto& mapping_array = pg_mappings[pool.first];
1304 size_t pg_num = pool.second.get_pg_num();
1305 if (mapping_array.size() != pg_num) {
1306 // catch both pg_num increasing & decreasing
1307 mapping_array.resize(pg_num);
1308 }
1309 }
1310 for (auto it = pg_mappings.begin(); it != pg_mappings.end(); ) {
1311 if (!pools.count(it->first)) {
1312 // pool is gone
1313 pg_mappings.erase(it++);
1314 continue;
1315 }
1316 it++;
1317 }
1318 }
1319
1320 public:
1321 void maybe_request_map();
1322
1323 void enable_blacklist_events();
1324 private:
1325
1326 void _maybe_request_map();
1327
1328 version_t last_seen_osdmap_version = 0;
1329 version_t last_seen_pgmap_version = 0;
1330
1331 mutable std::shared_mutex rwlock;
1332 using lock_guard = std::lock_guard<decltype(rwlock)>;
1333 using unique_lock = std::unique_lock<decltype(rwlock)>;
1334 using shared_lock = boost::shared_lock<decltype(rwlock)>;
1335 using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
1336 ceph::timer<ceph::coarse_mono_clock> timer;
1337
1338 PerfCounters *logger = nullptr;
1339
1340 uint64_t tick_event = 0;
1341
1342 void start_tick();
1343 void tick();
1344 void update_crush_location();
1345
1346 class RequestStateHook;
1347
1348 RequestStateHook *m_request_state_hook = nullptr;
1349
1350 public:
1351 /*** track pending operations ***/
1352 // read
1353
1354 struct OSDSession;
1355
  // Where an op is aimed: the requested object/pg plus the most recent
  // osdmap-derived placement (pg, up/acting sets, primary, etc.).
  struct op_target_t {
    int flags = 0;

    epoch_t epoch = 0;  ///< latest epoch we calculated the mapping

    object_t base_oid;
    object_locator_t base_oloc;
    object_t target_oid;
    object_locator_t target_oloc;

    ///< true if we are directed at base_pgid, not base_oid
    bool precalc_pgid = false;

    ///< true if we have ever mapped to a valid pool
    bool pool_ever_existed = false;

    ///< explicit pg target, if any
    pg_t base_pgid;

    pg_t pgid; ///< last (raw) pg we mapped to
    spg_t actual_pgid; ///< last (actual) spg_t we mapped to
    unsigned pg_num = 0; ///< last pg_num we mapped to
    unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
    unsigned pg_num_pending = 0; ///< last pg_num we mapped to
    std::vector<int> up; ///< set of up osds for last pg we mapped to
    std::vector<int> acting; ///< set of acting osds for last pg we mapped to
    int up_primary = -1; ///< last up_primary we mapped to
    int acting_primary = -1;  ///< last acting_primary we mapped to
    int size = -1; ///< the size of the pool when we were last mapped
    int min_size = -1; ///< the min size of the pool when we were last mapped
    bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
    bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering

    bool used_replica = false;
    bool paused = false;

    int osd = -1;      ///< the final target osd, or -1

    epoch_t last_force_resend = 0;

    // target a specific object
    op_target_t(object_t oid, object_locator_t oloc, int flags)
      : flags(flags),
	base_oid(oid),
	base_oloc(oloc)
      {}

    // target a specific pg rather than an object
    explicit op_target_t(pg_t pgid)
      : base_oloc(pgid.pool(), pgid.ps()),
	precalc_pgid(true),
	base_pgid(pgid)
      {}

    op_target_t() = default;

    // Build the hobject_t for the *resolved* target; falls back to the
    // mapped pg's ps() when no explicit locator hash is set.
    hobject_t get_hobj() {
      return hobject_t(target_oid,
		       target_oloc.key,
		       CEPH_NOSNAP,
		       target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
		       target_oloc.pool,
		       target_oloc.nspace);
    }

    // True if the target lies in [begin, end) in hobject order.
    bool contained_by(const hobject_t& begin, const hobject_t& end) {
      hobject_t h = get_hobj();
      int r = cmp(h, begin);
      return r == 0 || (r > 0 && h < end);
    }

    void dump(ceph::Formatter *f) const;
  };
1427
  // One in-flight OSD operation: the target, the op vector, completion
  // plumbing, and resend/budget bookkeeping.  Refcounted; destroyed when
  // the last reference is dropped.
  struct Op : public RefCountedObject {
    OSDSession *session;       // session we are (or were) assigned to
    int incarnation;           // session incarnation at send time

    op_target_t target;

    ConnectionRef con;  // for rx buffer only
    uint64_t features;  // explicitly specified op features

    std::vector<OSDOp> ops;

    snapid_t snapid;
    SnapContext snapc;
    ceph::real_time mtime;

    ceph::buffer::list *outbl;                // whole-reply output buffer
    // per-op output plumbing; parallel to 'ops' (see ctor)
    std::vector<ceph::buffer::list*> out_bl;
    std::vector<Context*> out_handler;        // owned; deleted in ~Op
    std::vector<int*> out_rval;

    int priority;
    Context *onfinish;
    uint64_t ontimeout;

    ceph_tid_t tid;
    int attempts;           // number of sends so far

    version_t *objver;      // out: object version from the reply
    epoch_t *reply_epoch;   // out: osdmap epoch from the reply

    ceph::coarse_mono_time stamp;

    epoch_t map_dne_bound;

    int budget;             // throttle budget taken; -1 if none yet

    /// true if we should resend this message on failure
    bool should_resend;

    /// true if the throttle budget is get/put on a series of OPs,
    /// instead of per OP basis, when this flag is set, the budget is
    /// acquired before sending the very first OP of the series and
    /// released upon receiving the last OP reply.
    bool ctx_budgeted;

    int *data_offset;

    osd_reqid_t reqid; // explicitly setting reqid
    ZTracer::Trace trace;

    // NOTE: takes ownership of the contents of 'op' (swapped, not copied).
    Op(const object_t& o, const object_locator_t& ol, std::vector<OSDOp>& op,
       int f, Context *fin, version_t *ov, int *offset = NULL,
       ZTracer::Trace *parent_trace = nullptr) :
      session(NULL), incarnation(0),
      target(o, ol, f),
      con(NULL),
      features(CEPH_FEATURES_SUPPORTED_DEFAULT),
      snapid(CEPH_NOSNAP),
      outbl(NULL),
      priority(0),
      onfinish(fin),
      ontimeout(0),
      tid(0),
      attempts(0),
      objver(ov),
      reply_epoch(NULL),
      map_dne_bound(0),
      budget(-1),
      should_resend(true),
      ctx_budgeted(false),
      data_offset(offset) {
      ops.swap(op);

      /* initialize out_* to match op std::vector */
      out_bl.resize(ops.size());
      out_rval.resize(ops.size());
      out_handler.resize(ops.size());
      for (unsigned i = 0; i < ops.size(); i++) {
	out_bl[i] = NULL;
	out_handler[i] = NULL;
	out_rval[i] = NULL;
      }

      // a locator key equal to the oid is redundant; drop it
      if (target.base_oloc.key == o)
	target.base_oloc.key.clear();

      if (parent_trace && parent_trace->valid()) {
	trace.init("op", nullptr, parent_trace);
	trace.event("start");
      }
    }

    // order ops by transaction id
    bool operator<(const Op& other) const {
      return tid < other.tid;
    }

    // True if this op should be held back by pool-full / cluster-full
    // conditions (a write that did not opt out via FULL_TRY/FULL_FORCE).
    bool respects_full() const {
      return
	(target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
	!(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
    }

  private:
    ~Op() override {
      // out_handler Contexts are owned by the Op
      while (!out_handler.empty()) {
	delete out_handler.back();
	out_handler.pop_back();
      }
      trace.event("finish");
    }
  };
1539
  // Completion for a "get latest osdmap version" query made on behalf of
  // an op whose pool may not exist yet.
  struct C_Op_Map_Latest : public Context {
    Objecter *objecter;
    ceph_tid_t tid;       // op this check belongs to
    version_t latest;     // filled in with the latest map version
    C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
						 latest(0) {}
    void finish(int r) override;
  };

  // Same as C_Op_Map_Latest, but for a CommandOp.
  struct C_Command_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t tid;
    version_t latest;
    C_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
						      latest(0) {}
    void finish(int r) override;
  };

  // Decode a STAT reply (size + mtime) from bl into *psize/*pmtime, then
  // chain to fin.
  struct C_Stat : public Context {
    ceph::buffer::list bl;     // raw reply payload
    uint64_t *psize;
    ceph::real_time *pmtime;
    Context *fin;
    C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
      psize(ps), pmtime(pm), fin(c) {}
    void finish(int r) override {
      using ceph::decode;
      if (r >= 0) {
	auto p = bl.cbegin();
	uint64_t s;
	ceph::real_time m;
	decode(s, p);
	decode(m, p);
	if (psize)
	  *psize = s;
	if (pmtime)
	  *pmtime = m;
      }
      fin->complete(r);
    }
  };

  // Decode a GETXATTRS reply into the caller's attrset, then chain to fin.
  struct C_GetAttrs : public Context {
    ceph::buffer::list bl;     // raw reply payload
    std::map<std::string,ceph::buffer::list>& attrset;
    Context *fin;
    C_GetAttrs(std::map<std::string, ceph::buffer::list>& set, Context *c) : attrset(set),
									     fin(c) {}
    void finish(int r) override {
      using ceph::decode;
      if (r >= 0) {
	auto p = bl.cbegin();
	decode(attrset, p);
      }
      fin->complete(r);
    }
  };
1597
1598
1599 // Pools and statistics
  // Cursor state for an object listing (nlist) over a pool; carried across
  // successive list ops until at_end() is true.
  struct NListContext {
    collection_list_handle_t pos;   // current listing position

    // these are for !sortbitwise compat only
    int current_pg = 0;
    int starting_pg_num = 0;
    bool sort_bitwise = false;

    bool at_end_of_pool = false; ///< publicly visible end flag

    int64_t pool_id = -1;
    int pool_snap_seq = 0;
    uint64_t max_entries = 0;       // max objects per listing op
    std::string nspace;

    ceph::buffer::list bl;   // raw data read to here
    std::list<librados::ListObjectImpl> list;   // decoded entries

    ceph::buffer::list filter;      // optional server-side filter

    // The budget associated with this context, once it is set (>= 0),
    // the budget is not get/released on OP basis, instead the budget
    // is acquired before sending the first OP and released upon receiving
    // the last op reply.
    int ctx_budget = -1;

    bool at_end() const {
      return at_end_of_pool;
    }

    uint32_t get_pg_hash_position() const {
      return pos.get_hash();
    }
  };

  // Completion for one listing op: on success, feed the reply back into
  // the Objecter to advance the NListContext; on error, finish directly.
  struct C_NList : public Context {
    NListContext *list_context;
    Context *final_finish;
    Objecter *objecter;
    epoch_t epoch;
    C_NList(NListContext *lc, Context * finish, Objecter *ob) :
      list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
    void finish(int r) override {
      if (r >= 0) {
	objecter->_nlist_reply(list_context, r, final_finish, epoch);
      } else {
	final_finish->complete(r);
      }
    }
  };
1650
  // Pending "get pool stats" request to the monitor.
  struct PoolStatOp {
    ceph_tid_t tid;
    std::list<std::string> pools;       // pools being queried

    std::map<std::string,pool_stat_t> *pool_stats;  // out: results
    bool *per_pool;                     // out: reply granularity flag
    Context *onfinish;
    uint64_t ontimeout;

    ceph::coarse_mono_time last_submit; // for resend bookkeeping
  };

  // Pending statfs (cluster/pool usage) request to the monitor.
  struct StatfsOp {
    ceph_tid_t tid;
    struct ceph_statfs *stats;          // out: usage numbers
    boost::optional<int64_t> data_pool; // restrict to one pool, if set
    Context *onfinish;
    uint64_t ontimeout;

    ceph::coarse_mono_time last_submit;
  };

  // Pending pool operation (create/delete/snap etc.) to the monitor.
  struct PoolOp {
    ceph_tid_t tid;
    int64_t pool;
    std::string name;
    Context *onfinish;
    uint64_t ontimeout;
    int pool_op;             // which pool operation
    int16_t crush_rule;
    snapid_t snapid;
    ceph::buffer::list *blp; // out: reply payload, if any

    ceph::coarse_mono_time last_submit;
    PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
	       crush_rule(0), snapid(0), blp(NULL) {}
  };
1688
1689 // -- osd commands --
  // One in-flight OSD command (targeted either at a specific osd or at the
  // primary of a pg).  Refcounted.
  struct CommandOp : public RefCountedObject {
    OSDSession *session = nullptr;
    ceph_tid_t tid = 0;
    std::vector<std::string> cmd;       // command words
    ceph::buffer::list inbl;            // command input payload
    ceph::buffer::list *poutbl = nullptr;  // out: reply payload
    std::string *prs = nullptr;            // out: reply status string

    // target_osd == -1 means target_pg is valid
    const int target_osd = -1;
    const pg_t target_pg;

    op_target_t target;

    epoch_t map_dne_bound = 0;
    int map_check_error = 0; // error to return if std::map check fails
    const char *map_check_error_str = nullptr;

    Context *onfinish = nullptr;
    uint64_t ontimeout = 0;
    ceph::coarse_mono_time last_submit;

    // target a specific osd
    CommandOp(
      int target_osd,
      const std::vector<std::string> &cmd,
      ceph::buffer::list inbl,
      ceph::buffer::list *poutbl,
      std::string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_osd(target_osd),
	onfinish(onfinish) {}

    // target the primary of a pg (target_osd stays -1)
    CommandOp(
      pg_t pgid,
      const std::vector<std::string> &cmd,
      ceph::buffer::list inbl,
      ceph::buffer::list *poutbl,
      std::string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_pg(pgid),
	target(pgid),
	onfinish(onfinish) {}

  };
1742
1743 void submit_command(CommandOp *c, ceph_tid_t *ptid);
1744 int _calc_command_target(CommandOp *c, shunique_lock &sul);
1745 void _assign_command_session(CommandOp *c, shunique_lock &sul);
1746 void _send_command(CommandOp *c);
1747 int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
1748 void _finish_command(CommandOp *c, int r, std::string rs);
1749 void handle_command_reply(MCommandReply *m);
1750
1751
1752 // -- lingering ops --
1753
  // Callback interface for watch events delivered to a LingerOp.
  struct WatchContext {
    // this simply mirrors librados WatchCtx2
    virtual void handle_notify(uint64_t notify_id,
			       uint64_t cookie,
			       uint64_t notifier_id,
			       ceph::buffer::list& bl) = 0;
    virtual void handle_error(uint64_t cookie, int err) = 0;
    virtual ~WatchContext() {}
  };
1763
  // A long-lived (re-registering) operation: a watch or a notify.
  // Refcounted; owns its WatchContext.
  struct LingerOp : public RefCountedObject {
    uint64_t linger_id;

    op_target_t target;

    snapid_t snap;
    SnapContext snapc;
    ceph::real_time mtime;

    std::vector<OSDOp> ops;
    ceph::buffer::list inbl;
    ceph::buffer::list *poutbl;
    version_t *pobjver;

    bool is_watch;   // watch (true) vs notify (false)
    ceph::coarse_mono_time watch_valid_thru; ///< send time for last acked ping
    int last_error;  ///< error from last failed ping|reconnect, if any
    std::shared_mutex watch_lock;
    // NOTE(review): lock_guard is aliased to std::unique_lock, not
    // std::lock_guard — confirm this is intentional before changing.
    using lock_guard = std::unique_lock<decltype(watch_lock)>;
    using unique_lock = std::unique_lock<decltype(watch_lock)>;
    using shared_lock = boost::shared_lock<decltype(watch_lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;

    // queue of pending async operations, with the timestamp of
    // when they were queued.
    std::list<ceph::coarse_mono_time> watch_pending_async;

    uint32_t register_gen;   // bumped on each (re)registration
    bool registered;
    bool canceled;
    Context *on_reg_commit;

    // we trigger these from an async finisher
    Context *on_notify_finish;
    ceph::buffer::list *notify_result_bl;
    uint64_t notify_id;

    WatchContext *watch_context;  // owned; deleted in ~LingerOp

    OSDSession *session;

    Objecter *objecter;
    int ctx_budget;
    ceph_tid_t register_tid;
    ceph_tid_t ping_tid;
    epoch_t map_dne_bound;

    // record that an async callback was queued
    void _queued_async() {
      // watch_lock must be locked unique
      watch_pending_async.push_back(ceph::coarse_mono_clock::now());
    }
    // record that the oldest queued async callback completed
    void finished_async() {
      unique_lock l(watch_lock);
      ceph_assert(!watch_pending_async.empty());
      watch_pending_async.pop_front();
    }

    explicit LingerOp(Objecter *o) : linger_id(0),
				     target(object_t(), object_locator_t(), 0),
				     snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
				     is_watch(false), last_error(0),
				     register_gen(0),
				     registered(false),
				     canceled(false),
				     on_reg_commit(NULL),
				     on_notify_finish(NULL),
				     notify_result_bl(NULL),
				     notify_id(0),
				     watch_context(NULL),
				     session(NULL),
				     objecter(o),
				     ctx_budget(-1),
				     register_tid(0),
				     ping_tid(0),
				     map_dne_bound(0) {}

    // not copyable/assignable
    const LingerOp &operator=(const LingerOp& r) = delete;
    LingerOp(const LingerOp& o) = delete;

    // the object's address doubles as the watch cookie
    uint64_t get_cookie() {
      return reinterpret_cast<uint64_t>(this);
    }

  private:
    ~LingerOp() override {
      delete watch_context;
    }
  };
1852
  // Completion for linger registration commit; holds a ref on the LingerOp
  // for its own lifetime.
  struct C_Linger_Commit : public Context {
    Objecter *objecter;
    LingerOp *info;
    ceph::buffer::list outbl;  // used for notify only
    C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();
    }
    ~C_Linger_Commit() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_commit(info, r, outbl);
    }
  };

  // Completion for a linger re-registration after reconnect; same
  // ref-holding pattern as C_Linger_Commit.
  struct C_Linger_Reconnect : public Context {
    Objecter *objecter;
    LingerOp *info;
    C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();
    }
    ~C_Linger_Reconnect() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_reconnect(info, r);
    }
  };
1881
1882 struct C_Linger_Ping : public Context {
1883 Objecter *objecter;
1884 LingerOp *info;
1885 ceph::coarse_mono_time sent;
1886 uint32_t register_gen;
1887 C_Linger_Ping(Objecter *o, LingerOp *l)
1888 : objecter(o), info(l), register_gen(info->register_gen) {
1889 info->get();
1890 }
(1) Event exn_spec_violation: |
An exception of type "std::length_error" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate(). |
Also see events: |
[fun_call_w_exception] |
1891 ~C_Linger_Ping() override {
(2) Event fun_call_w_exception: |
Called function throws an exception of type "std::length_error". [details] |
Also see events: |
[exn_spec_violation] |
1892 info->put();
1893 }
1894 void finish(int r) override {
1895 objecter->_linger_ping(info, r, sent, register_gen);
1896 }
1897 };
1898
  // Completion for a "get latest osdmap version" query on behalf of a
  // linger op whose pool may not exist.
  struct C_Linger_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t linger_id;
    version_t latest;    // filled in with the latest map version
    C_Linger_Map_Latest(Objecter *o, uint64_t id) :
      objecter(o), linger_id(id), latest(0) {}
    void finish(int r) override;
  };

  // -- osd sessions --
  // A backoff interval [begin, end) within a pg, as instructed by the OSD.
  struct OSDBackoff {
    spg_t pgid;
    uint64_t id;
    hobject_t begin, end;
  };
1914
  // Per-OSD connection state: the pending ops routed to that OSD, active
  // backoffs, and a small pool of completion locks.  Refcounted.
  struct OSDSession : public RefCountedObject {
    std::shared_mutex lock;
    using lock_guard = std::lock_guard<decltype(lock)>;
    using unique_lock = std::unique_lock<decltype(lock)>;
    using shared_lock = boost::shared_lock<decltype(lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(lock)>;

    // pending ops
    std::map<ceph_tid_t,Op*> ops;
    std::map<uint64_t, LingerOp*> linger_ops;
    std::map<ceph_tid_t,CommandOp*> command_ops;

    // backoffs
    std::map<spg_t,std::map<hobject_t,OSDBackoff>> backoffs;
    std::map<uint64_t,OSDBackoff*> backoffs_by_id;

    int osd;             // osd id, or -1 for the homeless session
    int incarnation;
    ConnectionRef con;
    // completion locks are sharded (num_locks of them); get_lock() picks
    // one for a given object
    int num_locks;
    std::unique_ptr<std::mutex[]> completion_locks;
    using unique_completion_lock = std::unique_lock<
      decltype(completion_locks)::element_type>;


    OSDSession(CephContext *cct, int o) :
      osd(o), incarnation(0), con(NULL),
      num_locks(cct->_conf->objecter_completion_locks_per_session),
      completion_locks(new std::mutex[num_locks]) {}

    ~OSDSession() override;

    bool is_homeless() { return (osd == -1); }

    unique_completion_lock get_lock(object_t& oid);
  };
1951 std::map<int,OSDSession*> osd_sessions;
1952
1953 bool osdmap_full_flag() const;
1954 bool osdmap_pool_full(const int64_t pool_id) const;
1955
1956 private:
1957
1958 /**
1959 * Test pg_pool_t::FLAG_FULL on a pool
1960 *
1961 * @return true if the pool exists and has the flag set, or
1962 * the global full flag is set, else false
1963 */
1964 bool _osdmap_pool_full(const int64_t pool_id) const;
1965 bool _osdmap_pool_full(const pg_pool_t &p) const;
1966 void update_pool_full_map(std::map<int64_t, bool>& pool_full_map);
1967
1968 std::map<uint64_t, LingerOp*> linger_ops;
1969 // we use this just to confirm a cookie is valid before dereferencing the ptr
1970 std::set<LingerOp*> linger_ops_set;
1971
1972 std::map<ceph_tid_t,PoolStatOp*> poolstat_ops;
1973 std::map<ceph_tid_t,StatfsOp*> statfs_ops;
1974 std::map<ceph_tid_t,PoolOp*> pool_ops;
1975 std::atomic<unsigned> num_homeless_ops{0};
1976
1977 OSDSession *homeless_session;
1978
1979 // ops waiting for an osdmap with a new pool or confirmation that
1980 // the pool does not exist (may be expanded to other uses later)
1981 std::map<uint64_t, LingerOp*> check_latest_map_lingers;
1982 std::map<ceph_tid_t, Op*> check_latest_map_ops;
1983 std::map<ceph_tid_t, CommandOp*> check_latest_map_commands;
1984
1985 std::map<epoch_t,std::list< std::pair<Context*, int> > > waiting_for_map;
1986
1987 ceph::timespan mon_timeout;
1988 ceph::timespan osd_timeout;
1989
1990 MOSDOp *_prepare_osd_op(Op *op);
1991 void _send_op(Op *op);
1992 void _send_op_account(Op *op);
1993 void _cancel_linger_op(Op *op);
1994 void _finish_op(Op *op, int r);
1995 static bool is_pg_changed(
1996 int oldprimary,
1997 const std::vector<int>& oldacting,
1998 int newprimary,
1999 const std::vector<int>& newacting,
2000 bool any_change=false);
2001 enum recalc_op_target_result {
2002 RECALC_OP_TARGET_NO_ACTION = 0,
2003 RECALC_OP_TARGET_NEED_RESEND,
2004 RECALC_OP_TARGET_POOL_DNE,
2005 RECALC_OP_TARGET_OSD_DNE,
2006 RECALC_OP_TARGET_OSD_DOWN,
2007 };
2008 bool _osdmap_full_flag() const;
2009 bool _osdmap_has_pool_full() const;
2010 void _prune_snapc(
2011 const mempool::osdmap::map<int64_t, snap_interval_set_t>& new_removed_snaps,
2012 Op *op);
2013
2014 bool target_should_be_paused(op_target_t *op);
2015 int _calc_target(op_target_t *t, Connection *con,
2016 bool any_change = false);
2017 int _map_session(op_target_t *op, OSDSession **s,
2018 shunique_lock& lc);
2019
2020 void _session_op_assign(OSDSession *s, Op *op);
2021 void _session_op_remove(OSDSession *s, Op *op);
2022 void _session_linger_op_assign(OSDSession *to, LingerOp *op);
2023 void _session_linger_op_remove(OSDSession *from, LingerOp *op);
2024 void _session_command_op_assign(OSDSession *to, CommandOp *op);
2025 void _session_command_op_remove(OSDSession *from, CommandOp *op);
2026
2027 int _assign_op_target_session(Op *op, shunique_lock& lc,
2028 bool src_session_locked,
2029 bool dst_session_locked);
2030 int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc);
2031
2032 void _linger_submit(LingerOp *info, shunique_lock& sul);
2033 void _send_linger(LingerOp *info, shunique_lock& sul);
2034 void _linger_commit(LingerOp *info, int r, ceph::buffer::list& outbl);
2035 void _linger_reconnect(LingerOp *info, int r);
2036 void _send_linger_ping(LingerOp *info);
2037 void _linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
2038 uint32_t register_gen);
2039 int _normalize_watch_error(int r);
2040
2041 friend class C_DoWatchError;
2042 public:
  // Queue ctx on the async finisher (the same finisher that runs linger
  // callbacks).
  void linger_callback_flush(Context *ctx) {
    finisher->queue(ctx);
  }
2046
2047 private:
2048 void _check_op_pool_dne(Op *op, unique_lock *sl);
2049 void _send_op_map_check(Op *op);
2050 void _op_cancel_map_check(Op *op);
2051 void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
2052 void _send_linger_map_check(LingerOp *op);
2053 void _linger_cancel_map_check(LingerOp *op);
2054 void _check_command_map_dne(CommandOp *op);
2055 void _send_command_map_check(CommandOp *op);
2056 void _command_cancel_map_check(CommandOp *op);
2057
2058 void _kick_requests(OSDSession *session, std::map<uint64_t, LingerOp *>& lresend);
2059 void _linger_ops_resend(std::map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
2060
2061 int _get_session(int osd, OSDSession **session, shunique_lock& sul);
2062 void put_session(OSDSession *s);
2063 void get_session(OSDSession *s);
2064 void _reopen_session(OSDSession *session);
2065 void close_session(OSDSession *session);
2066
2067 void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
2068 epoch_t reply_epoch);
2069
2070 void resend_mon_ops();
2071
2072 /**
2073 * handle a budget for in-flight ops
2074 * budget is taken whenever an op goes into the ops std::map
2075 * and returned whenever an op is removed from the std::map
2076 * If throttle_op needs to throttle it will unlock client_lock.
2077 */
2078 int calc_op_budget(const std::vector<OSDOp>& ops);
2079 void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
2080 int _take_op_budget(Op *op, shunique_lock& sul) {
2081 ceph_assert(sul && sul.mutex() == &rwlock);
2082 int op_budget = calc_op_budget(op->ops);
2083 if (keep_balanced_budget) {
2084 _throttle_op(op, sul, op_budget);
2085 } else { // update take_linger_budget to match this!
2086 op_throttle_bytes.take(op_budget);
2087 op_throttle_ops.take(1);
2088 }
2089 op->budget = op_budget;
2090 return op_budget;
2091 }
2092 int take_linger_budget(LingerOp *info);
2093 friend class WatchContext; // to invoke put_up_budget_bytes
2094 void put_op_budget_bytes(int op_budget) {
2095 ceph_assert(op_budget >= 0);
2096 op_throttle_bytes.put(op_budget);
2097 op_throttle_ops.put(1);
2098 }
2099 void put_nlist_context_budget(NListContext *list_context);
2100 Throttle op_throttle_bytes, op_throttle_ops;
2101
2102 public:
2103 Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
2104 Finisher *fin,
2105 double mon_timeout,
2106 double osd_timeout);
2107 ~Objecter() override;
2108
2109 void init();
2110 void start(const OSDMap *o = nullptr);
2111 void shutdown();
2112
2113 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2114 // whatever functionality you want to use the OSDMap in a lambda like:
2115 //
2116 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2117 //
2118 // or
2119 //
2120 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2121 //
2122 // Do not call into something that will try to lock the OSDMap from
2123 // here or you will have great woe and misery.
2124
  // Invoke cb(*osdmap, args...) while holding rwlock shared; returns
  // whatever the callback returns (see the usage comment above).
  template<typename Callback, typename...Args>
  decltype(auto) with_osdmap(Callback&& cb, Args&&... args) {
    shared_lock l(rwlock);
    return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
  }
2130
2131
  /**
   * Tell the objecter to throttle outgoing ops according to its
   * budget (in _conf). If you do this, submitting an op can block:
   * the objecter will drop its internal lock and sleep until
   * incoming replies reduce the used budget low enough for
   * the op to proceed, then reacquire the lock.
   */
  void set_balanced_budget() { keep_balanced_budget = true; }
  void unset_balanced_budget() { keep_balanced_budget = false; }

  // Toggle whether pool-full / cluster-full state is honored.
  void set_honor_pool_full() { honor_pool_full = true; }
  void unset_honor_pool_full() { honor_pool_full = false; }

  // Toggle the pool_full_try behavior flag.
  void set_pool_full_try() { pool_full_try = true; }
  void unset_pool_full_try() { pool_full_try = false; }

  // Scan a session's requests after an osdmap change and collect those
  // that must be resent (ops, linger ops, commands) into the containers.
  void _scan_requests(
    OSDSession *s,
    bool skipped_map,
    bool cluster_full,
    std::map<int64_t, bool> *pool_full_map,
    std::map<ceph_tid_t, Op*>& need_resend,
    std::list<LingerOp*>& need_resend_linger,
    std::map<ceph_tid_t, CommandOp*>& need_resend_command,
    shunique_lock& sul);

  // Map an object key/namespace to its hash position within a pool.
  int64_t get_object_hash_position(int64_t pool, const std::string& key,
				   const std::string& ns);
  int64_t get_object_pg_hash_position(int64_t pool, const std::string& key,
				      const std::string& ns);
2162
  // messages
public:
  // Dispatcher interface: incoming messages funnel through here.
  bool ms_dispatch(Message *m) override;
  bool ms_can_fast_dispatch_any() const override {
    return true;
  }
2169 bool ms_can_fast_dispatch(const Message *m) const override {
2170 switch (m->get_type()) {
2171 case CEPH_MSG_OSD_OPREPLY:
2172 case CEPH_MSG_WATCH_NOTIFY:
2173 return true;
2174 default:
2175 return false;
2176 }
2177 }
  // Fast path: if ms_dispatch() did not consume the message, drop our ref.
  void ms_fast_dispatch(Message *m) override {
    if (!ms_dispatch(m)) {
      m->put();
    }
  }

  // Incoming message handlers, one per message type of interest.
  void handle_osd_op_reply(class MOSDOpReply *m);
  void handle_osd_backoff(class MOSDBackoff *m);
  void handle_watch_notify(class MWatchNotify *m);
  void handle_osd_map(class MOSDMap *m);
  void wait_for_osd_map();

  /**
   * Get the list of entities blacklisted since this was last called,
   * and reset the list.
   *
   * Uses a set because the typical use case is to compare some
   * other list of clients to see which overlap with the blacklisted
   * addrs.
   *
   */
  void consume_blacklist_events(std::set<entity_addr_t> *events);

  // Pool snapshot queries.
  int pool_snap_by_name(int64_t poolid,
			const char *snap_name,
			snapid_t *snap) const;
  int pool_snap_get_info(int64_t poolid, snapid_t snap,
			 pool_snap_info_t *info) const;
  int pool_snap_list(int64_t poolid, std::vector<uint64_t> *snaps);
private:

  // Publish blacklist additions derived from an osdmap change so
  // consume_blacklist_events() can report them.
  void emit_blacklist_events(const OSDMap::Incremental &inc);
  void emit_blacklist_events(const OSDMap &old_osd_map,
			     const OSDMap &new_osd_map);

  // low-level
  void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid);
  void _op_submit_with_budget(Op *op, shunique_lock& lc,
			      ceph_tid_t *ptid,
			      int *ctx_budget = NULL);
  // public interface
public:
  void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
2221 bool is_active() {
2222 shared_lock l(rwlock);
2223 return !((!inflight_ops) && linger_ops.empty() &&
2224 poolstat_ops.empty() && statfs_ops.empty());
2225 }
2226
  /**
   * Output in-flight requests
   */
  // NOTE(review): by the file's convention the _-prefixed variants appear
  // to expect the caller to hold rwlock — confirm against Objecter.cc.
  void _dump_active(OSDSession *s);
  void _dump_active();
  void dump_active();
  void dump_requests(ceph::Formatter *fmt);
  void _dump_ops(const OSDSession *s, ceph::Formatter *fmt);
  void dump_ops(ceph::Formatter *fmt);
  void _dump_linger_ops(const OSDSession *s, ceph::Formatter *fmt);
  void dump_linger_ops(ceph::Formatter *fmt);
  void _dump_command_ops(const OSDSession *s, ceph::Formatter *fmt);
  void dump_command_ops(ceph::Formatter *fmt);
  void dump_pool_ops(ceph::Formatter *fmt) const;
  void dump_pool_stat_ops(ceph::Formatter *fmt) const;
  void dump_statfs_ops(ceph::Formatter *fmt) const;

  // Client incarnation accessors (no locking; simple int).
  int get_client_incarnation() const { return client_inc; }
  void set_client_incarnation(int inc) { client_inc = inc; }
2246
2247 bool have_map(epoch_t epoch);
2248 /// wait for epoch; true if we already have it
2249 bool wait_for_map(epoch_t epoch, Context *c, int err=0);
2250 void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
2251 void wait_for_latest_osdmap(Context *fin);
2252 void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
2253
  /** Get the current set of global op flags */
  int get_global_op_flags() const { return global_op_flags; }
  /** Add a flag to the global op flags, not really atomic operation */
  void add_global_op_flags(int flag) {
    global_op_flags.fetch_or(flag);
  }
  /** Clear the passed flags from the global op flag set */
  void clear_global_op_flag(int flags) {
    global_op_flags.fetch_and(~flags);
  }

  /// cancel an in-progress request with the given return code
private:
  // Internal variants: per-session and lock-held forms.
  int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
  int _op_cancel(ceph_tid_t tid, int r);
public:
  int op_cancel(ceph_tid_t tid, int r);
  int op_cancel(const std::vector<ceph_tid_t>& tidls, int r);

  /**
   * Any write op which is in progress at the start of this call shall no
   * longer be in progress when this call ends. Operations started after the
   * start of this call may still be in progress when this call ends.
   *
   * @return the latest possible epoch in which a cancelled op could have
   * existed, or -1 if nothing was cancelled.
   */
  epoch_t op_cancel_writes(int r, int64_t pool=-1);
2282
2283 // commands
2284 void osd_command(int osd, const std::vector<std::string>& cmd,
2285 const ceph::buffer::list& inbl, ceph_tid_t *ptid,
2286 ceph::buffer::list *poutbl, std::string *prs, Context *onfinish) {
2287 ceph_assert(osd >= 0);
2288 CommandOp *c = new CommandOp(
2289 osd,
2290 cmd,
2291 inbl,
2292 poutbl,
2293 prs,
2294 onfinish);
2295 submit_command(c, ptid);
2296 }
2297 void pg_command(pg_t pgid, const std::vector<std::string>& cmd,
2298 const ceph::buffer::list& inbl, ceph_tid_t *ptid,
2299 ceph::buffer::list *poutbl, std::string *prs, Context *onfinish) {
2300 CommandOp *c = new CommandOp(
2301 pgid,
2302 cmd,
2303 inbl,
2304 poutbl,
2305 prs,
2306 onfinish);
2307 submit_command(c, ptid);
2308 }

  // mid-level helpers
  // Build (but do not submit) a write Op from a compound ObjectOperation.
  // Ownership of op's out_* vectors is transferred to the new Op via
  // swap (the ObjectOperation dtor deletes remaining out_handler
  // entries), so `op` should not be reused afterwards.
  Op *prepare_mutate_op(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op, const SnapContext& snapc,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    osd_reqid_t reqid = osd_reqid_t(),
    ZTracer::Trace *parent_trace = nullptr) {
    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
    o->priority = op.priority;
    o->mtime = mtime;
    o->snapc = snapc;
    o->out_rval.swap(op.out_rval);
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->reqid = reqid;
    return o;
  }
2329 ceph_tid_t mutate(
2330 const object_t& oid, const object_locator_t& oloc,
2331 ObjectOperation& op, const SnapContext& snapc,
2332 ceph::real_time mtime, int flags,
2333 Context *oncommit, version_t *objver = NULL,
2334 osd_reqid_t reqid = osd_reqid_t()) {
2335 Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
2336 oncommit, objver, reqid);
2337 ceph_tid_t tid;
2338 op_submit(o, &tid);
2339 return tid;
2340 }
2341 Op *prepare_read_op(
2342 const object_t& oid, const object_locator_t& oloc,
2343 ObjectOperation& op,
2344 snapid_t snapid, ceph::buffer::list *pbl, int flags,
2345 Context *onack, version_t *objver = NULL,
2346 int *data_offset = NULL,
2347 uint64_t features = 0,
2348 ZTracer::Trace *parent_trace = nullptr) {
2349 Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
2350 CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace);
2351 o->priority = op.priority;
2352 o->snapid = snapid;
2353 o->outbl = pbl;
2354 if (!o->outbl && op.size() == 1 && op.out_bl[0]->length())
2355 o->outbl = op.out_bl[0];
2356 o->out_bl.swap(op.out_bl);
2357 o->out_handler.swap(op.out_handler);
2358 o->out_rval.swap(op.out_rval);
2359 return o;
2360 }
2361 ceph_tid_t read(
2362 const object_t& oid, const object_locator_t& oloc,
2363 ObjectOperation& op,
2364 snapid_t snapid, ceph::buffer::list *pbl, int flags,
2365 Context *onack, version_t *objver = NULL,
2366 int *data_offset = NULL,
2367 uint64_t features = 0) {
2368 Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
2369 data_offset);
2370 if (features)
2371 o->features = features;
2372 ceph_tid_t tid;
2373 op_submit(o, &tid);
2374 return tid;
2375 }
  // Build (but do not submit) a read Op addressed to a specific PG
  // (pgid precomputed from hash+pool) rather than to an object name.
  // If ctx_budget is non-null, throttle budget is tracked by the
  // listing context instead of the op itself.
  Op *prepare_pg_read_op(
    uint32_t hash, object_locator_t oloc,
    ObjectOperation& op, ceph::buffer::list *pbl, int flags,
    Context *onack, epoch_t *reply_epoch,
    int *ctx_budget) {
    Op *o = new Op(object_t(), oloc,
		   op.ops,
		   flags | global_op_flags | CEPH_OSD_FLAG_READ |
		   CEPH_OSD_FLAG_IGNORE_OVERLAY,
		   onack, NULL);
    o->target.precalc_pgid = true;
    o->target.base_pgid = pg_t(hash, oloc.pool);
    o->priority = op.priority;
    o->snapid = CEPH_NOSNAP;
    o->outbl = pbl;
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    o->reply_epoch = reply_epoch;
    if (ctx_budget) {
      // budget is tracked by listing context
      o->ctx_budgeted = true;
    }
    return o;
  }
2401 ceph_tid_t pg_read(
2402 uint32_t hash, object_locator_t oloc,
2403 ObjectOperation& op, ceph::buffer::list *pbl, int flags,
2404 Context *onack, epoch_t *reply_epoch,
2405 int *ctx_budget) {
2406 Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
2407 onack, reply_epoch, ctx_budget);
2408 ceph_tid_t tid;
2409 op_submit(o, &tid, ctx_budget);
2410 return tid;
2411 }
2412
  // caller owns a ref
  LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
			    int flags);
  // Start a persistent watch, or a notify, on a registered LingerOp.
  ceph_tid_t linger_watch(LingerOp *info,
			  ObjectOperation& op,
			  const SnapContext& snapc, ceph::real_time mtime,
			  ceph::buffer::list& inbl,
			  Context *onfinish,
			  version_t *objver);
  ceph_tid_t linger_notify(LingerOp *info,
			   ObjectOperation& op,
			   snapid_t snap, ceph::buffer::list& inbl,
			   ceph::buffer::list *poutbl,
			   Context *onack,
			   version_t *objver);
  int linger_check(LingerOp *info);
  void linger_cancel(LingerOp *info); // releases a reference
  void _linger_cancel(LingerOp *info);

  // Deliver an incoming watch-notify message to its LingerOp.
  void _do_watch_notify(LingerOp *info, MWatchNotify *m);
2433
  /**
   * set up initial ops in the op vector, and allocate a final op slot.
   *
   * The caller is responsible for filling in the final ops_count ops.
   *
   * @param ops op vector
   * @param ops_count number of final ops the caller will fill in
   * @param extra_ops pointer to [array of] initial op[s]
   * @return index of final op (for caller to fill in)
   */
2444 int init_ops(std::vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {
2445 int i;
2446 int extra = 0;
2447
2448 if (extra_ops)
2449 extra = extra_ops->ops.size();
2450
2451 ops.resize(ops_count + extra);
2452
2453 for (i=0; i<extra; i++) {
2454 ops[i] = extra_ops->ops[i];
2455 }
2456
2457 return i;
2458 }
2459
2460
  // high-level helpers
  // Build (but do not submit) a STAT op; *psize / *pmtime are filled in
  // by the C_Stat completion when the reply arrives.
  Op *prepare_stat_op(
    const object_t& oid, const object_locator_t& oloc,
    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_STAT;
    C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    // reply payload is decoded by C_Stat, so aim outbl at its buffer
    o->outbl = &fin->bl;
    return o;
  }
  // Build and submit a STAT op; returns the assigned tid.
  ceph_tid_t stat(
    const object_t& oid, const object_locator_t& oloc,
    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
			    onfinish, objver, extra_ops);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2488
  // Build (but do not submit) an extent READ of [off, off+len).
  Op *prepare_read_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
    o->snapid = snap;
    o->outbl = pbl;
    return o;
  }
  // Build and submit an extent read; returns the assigned tid.
  ceph_tid_t read(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
			    onfinish, objver, extra_ops, op_flags);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2520
  // Build (but do not submit) a CMPEXT op: compare cmp_bl against the
  // object's data at off.
  Op *prepare_cmpext_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, ceph::buffer::list &cmp_bl,
    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CMPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = cmp_bl.length();
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = cmp_bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    return o;
  }

  // Build and submit a CMPEXT op; returns the assigned tid.
  ceph_tid_t cmpext(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, ceph::buffer::list &cmp_bl,
    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
			      flags, onfinish, objver, extra_ops, op_flags);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2552
  // Extent read honoring client truncate bookkeeping (trunc_size/seq).
  ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
			uint64_t off, uint64_t len, snapid_t snap,
			ceph::buffer::list *pbl, int flags, uint64_t trunc_size,
			__u32 trunc_seq, Context *onfinish,
			version_t *objver = NULL,
			ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // MAPEXT: query the allocated-extent map over [off, off+len).
  ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
		    uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
		    int flags, Context *onfinish, version_t *objver = NULL,
		    ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_MAPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Read a single xattr; the name travels in indata, the value comes
  // back in *pbl.
  ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
	     const char *name, snapid_t snap, ceph::buffer::list *pbl, int flags,
	     Context *onfinish,
	     version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name, ops[i].op.xattr.name_len);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }

  // Read all xattrs; C_GetAttrs decodes the reply into attrset.
  ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
		       snapid_t snap, std::map<std::string,ceph::buffer::list>& attrset,
		       int flags, Context *onfinish, version_t *objver = NULL,
		       ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
    C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2630
  // Read the entire object (offset 0, length 0 means "whole object").
  // Re-ORing global_op_flags/CEPH_OSD_FLAG_READ here is redundant but
  // harmless: read() ORs them in again.
  ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
		       snapid_t snap, ceph::buffer::list *pbl, int flags,
		       Context *onfinish, version_t *objver = NULL,
		       ObjectOperation *extra_ops = NULL) {
    return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
		CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
  }
2638
2639
  // writes
  // Submit an arbitrary pre-built op vector as a write.
  ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
		     std::vector<OSDOp>& ops, ceph::real_time mtime,
		     const SnapContext& snapc, int flags,
		     Context *oncommit,
		     version_t *objver = NULL) {
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) an extent WRITE of bl at [off, off+len).
  Op *prepare_write_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, const SnapContext& snapc,
    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver,
		   nullptr, parent_trace);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
  // Build and submit an extent write; returns the assigned tid.
  ceph_tid_t write(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, const SnapContext& snapc,
    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
			     oncommit, objver, extra_ops, op_flags);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) an APPEND of bl (offset ignored by the op).
  Op *prepare_append_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t len, const SnapContext& snapc,
    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_APPEND;
    ops[i].op.extent.offset = 0;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
  // Build and submit an APPEND; returns the assigned tid.
  ceph_tid_t append(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t len, const SnapContext& snapc,
    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
			      oncommit, objver, extra_ops);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Extent write honoring client truncate bookkeeping (trunc_size/seq).
  ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
			 uint64_t off, uint64_t len, const SnapContext& snapc,
			 const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
			 uint64_t trunc_size, __u32 trunc_seq,
			 Context *oncommit,
			 version_t *objver = NULL,
			 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) a WRITEFULL: replace the whole object
  // with bl.
  Op *prepare_write_full_op(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, const ceph::buffer::list &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
    ops[i].op.extent.offset = 0;
    ops[i].op.extent.length = bl.length();
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
  // Build and submit a WRITEFULL; returns the assigned tid.
  ceph_tid_t write_full(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, const ceph::buffer::list &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
				  oncommit, objver, extra_ops, op_flags);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) a WRITESAME: replicate bl across
  // [off, off+write_len).
  Op *prepare_writesame_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t write_len, uint64_t off,
    const SnapContext& snapc, const ceph::buffer::list &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {

    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITESAME;
    ops[i].op.writesame.offset = off;
    ops[i].op.writesame.length = write_len;
    ops[i].op.writesame.data_length = bl.length();
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
  // Build and submit a WRITESAME; returns the assigned tid.
  ceph_tid_t writesame(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t write_len, uint64_t off,
    const SnapContext& snapc, const ceph::buffer::list &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {

    Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
				 mtime, flags, oncommit, objver,
				 extra_ops, op_flags);

    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // TRUNCATE the object to trunc_size.
  // NOTE(review): only extent.offset/truncate_size/truncate_seq are set;
  // extent.length is left zero — presumably unused by TRUNCATE; confirm.
  ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
		   const SnapContext& snapc, ceph::real_time mtime, int flags,
		   uint64_t trunc_size, __u32 trunc_seq,
		   Context *oncommit, version_t *objver = NULL,
		   ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
    ops[i].op.extent.offset = trunc_size;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // ZERO the byte range [off, off+len).
  ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
		  uint64_t off, uint64_t len, const SnapContext& snapc,
		  ceph::real_time mtime, int flags, Context *oncommit,
		  version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_ZERO;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // ROLLBACK the object to the given snapshot.
  // NOTE(review): unlike the other write helpers this does not OR in
  // global_op_flags — confirm this asymmetry is intentional.
  ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
			     const SnapContext& snapc, snapid_t snapid,
			     ceph::real_time mtime, Context *oncommit,
			     version_t *objver = NULL,
			     ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
    ops[i].op.snap.snapid = snapid;
    Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // CREATE the object; create_flags (e.g. exclusive semantics) ride in
  // the per-op flags field.
  ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
		    const SnapContext& snapc, ceph::real_time mtime, int global_flags,
		    int create_flags, Context *oncommit,
		    version_t *objver = NULL,
		    ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CREATE;
    ops[i].op.flags = create_flags;
    Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) a DELETE op.
  Op *prepare_remove_op(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_DELETE;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
  // Build and submit a DELETE; returns the assigned tid.
  ceph_tid_t remove(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
			      oncommit, objver, extra_ops);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2910
  // SETXATTR: indata carries the name followed by the value bytes.
  ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
	      const char *name, const SnapContext& snapc, const ceph::buffer::list &bl,
	      ceph::real_time mtime, int flags,
	      Context *oncommit,
	      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_SETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = bl.length();
    if (name)
      ops[i].indata.append(name, ops[i].op.xattr.name_len);
    ops[i].indata.append(bl);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // RMXATTR: indata carries just the name.
  ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
	      const char *name, const SnapContext& snapc,
	      ceph::real_time mtime, int flags,
	      Context *oncommit,
	      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    std::vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_RMXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name, ops[i].op.xattr.name_len);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2952
  // Object listing (namespace-aware "nobjects" interface).
  void list_nobjects(NListContext *p, Context *onfinish);
  uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
  uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c);
  void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);

  // Cursor-based enumeration of objects in a pool.
  hobject_t enumerate_objects_begin();
  hobject_t enumerate_objects_end();
  //hobject_t enumerate_objects_begin(int n, int m);
  void enumerate_objects(
    int64_t pool_id,
    const std::string &ns,
    const hobject_t &start,
    const hobject_t &end,
    const uint32_t max,
    const ceph::buffer::list &filter_bl,
    std::list<librados::ListObjectImpl> *result,
    hobject_t *next,
    Context *on_finish);

  // Completion path for enumerate_objects().
  void _enumerate_reply(
    ceph::buffer::list &bl,
    int r,
    const hobject_t &end,
    const int64_t pool_id,
    int budget,
    epoch_t reply_epoch,
    std::list<librados::ListObjectImpl> *result,
    hobject_t *next,
    Context *on_finish);
  friend class C_EnumerateReply;
2983
  // -------------------------
  // pool ops
private:
  // Pool operations are sent to the monitor rather than to OSDs.
  void pool_op_submit(PoolOp *op);
  void _pool_op_submit(PoolOp *op);
  void _finish_pool_op(PoolOp *op, int r);
  void _do_delete_pool(int64_t pool, Context *onfinish);
public:
  int create_pool_snap(int64_t pool, std::string& snapName, Context *onfinish);
  int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
				Context *onfinish);
  int delete_pool_snap(int64_t pool, std::string& snapName, Context *onfinish);
  int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish);

  int create_pool(std::string& name, Context *onfinish,
		  int crush_rule=-1);
  int delete_pool(int64_t pool, Context *onfinish);
  int delete_pool(const std::string& name, Context *onfinish);

  void handle_pool_op_reply(MPoolOpReply *m);
  int pool_op_cancel(ceph_tid_t tid, int r);
3005
3006 // --------------------------
3007 // pool stats
3008 private:
3009 void _poolstat_submit(PoolStatOp *op);
3010 public:
3011 void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
3012 void get_pool_stats(std::list<std::string>& pools,
3013 std::map<std::string,pool_stat_t> *result,
3014 bool *per_pool,
3015 Context *onfinish);
3016 int pool_stat_op_cancel(ceph_tid_t tid, int r);
3017 void _finish_pool_stat_op(PoolStatOp *op, int r);
3018
3019 // ---------------------------
3020 // df stats
3021 private:
3022 void _fs_stats_submit(StatfsOp *op);
3023 public:
3024 void handle_fs_stats_reply(MStatfsReply *m);
3025 void get_fs_stats(struct ceph_statfs& result, boost::optional<int64_t> poolid,
3026 Context *onfinish);
3027 int statfs_op_cancel(ceph_tid_t tid, int r);
3028 void _finish_statfs_op(StatfsOp *op, int r);
3029
3030 // ---------------------------
3031 // some scatter/gather hackery
3032
3033 void _sg_read_finish(std::vector<ObjectExtent>& extents,
3034 std::vector<ceph::buffer::list>& resultbl,
3035 ceph::buffer::list *bl, Context *onfinish);
3036
  /**
   * Completion for sg_read_trunc()'s multi-extent path.
   *
   * Takes over the caller's extent map and per-extent result buffers,
   * then hands everything to Objecter::_sg_read_finish() once all
   * sub-reads have completed so the pieces can be stitched into the
   * caller's single destination buffer.
   */
  struct C_SGRead : public Context {
    Objecter *objecter;
    std::vector<ObjectExtent> extents;        // logical->object mapping
    std::vector<ceph::buffer::list> resultbl; // one buffer per extent
    ceph::buffer::list *bl;                   // caller's destination
    Context *onfinish;                        // caller's completion
    C_SGRead(Objecter *ob,
	     std::vector<ObjectExtent>& e, std::vector<ceph::buffer::list>& r, ceph::buffer::list *b,
	     Context *c) :
      objecter(ob), bl(b), onfinish(c) {
      // swap, not copy: cheap, and vector::swap preserves element
      // storage, so &resultbl[i] pointers already handed to in-flight
      // sub-reads stay valid after ownership moves into this context.
      extents.swap(e);
      resultbl.swap(r);
    }
    void finish(int r) override {
      objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
    }
  };
3054
3055 void sg_read_trunc(std::vector<ObjectExtent>& extents, snapid_t snap,
3056 ceph::buffer::list *bl, int flags, uint64_t trunc_size,
3057 __u32 trunc_seq, Context *onfinish, int op_flags = 0) {
3058 if (extents.size() == 1) {
3059 read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
3060 extents[0].length, snap, bl, flags, extents[0].truncate_size,
3061 trunc_seq, onfinish, 0, 0, op_flags);
3062 } else {
3063 C_GatherBuilder gather(cct);
3064 std::vector<ceph::buffer::list> resultbl(extents.size());
3065 int i=0;
3066 for (auto p = extents.begin(); p != extents.end(); ++p) {
3067 read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
3068 flags, p->truncate_size, trunc_seq, gather.new_sub(),
3069 0, 0, op_flags);
3070 }
3071 gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
3072 gather.activate();
3073 }
3074 }
3075
3076 void sg_read(std::vector<ObjectExtent>& extents, snapid_t snap, ceph::buffer::list *bl,
3077 int flags, Context *onfinish, int op_flags = 0) {
3078 sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
3079 }
3080
3081 void sg_write_trunc(std::vector<ObjectExtent>& extents, const SnapContext& snapc,
3082 const ceph::buffer::list& bl, ceph::real_time mtime, int flags,
3083 uint64_t trunc_size, __u32 trunc_seq,
3084 Context *oncommit, int op_flags = 0) {
3085 if (extents.size() == 1) {
3086 write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
3087 extents[0].length, snapc, bl, mtime, flags,
3088 extents[0].truncate_size, trunc_seq, oncommit,
3089 0, 0, op_flags);
3090 } else {
3091 C_GatherBuilder gcom(cct, oncommit);
3092 for (auto p = extents.begin(); p != extents.end(); ++p) {
3093 ceph::buffer::list cur;
3094 for (auto bit = p->buffer_extents.begin();
3095 bit != p->buffer_extents.end();
3096 ++bit)
3097 bl.copy(bit->first, bit->second, cur);
3098 ceph_assert(cur.length() == p->length);
3099 write_trunc(p->oid, p->oloc, p->offset, p->length,
3100 snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
3101 oncommit ? gcom.new_sub():0,
3102 0, 0, op_flags);
3103 }
3104 gcom.activate();
3105 }
3106 }
3107
3108 void sg_write(std::vector<ObjectExtent>& extents, const SnapContext& snapc,
3109 const ceph::buffer::list& bl, ceph::real_time mtime, int flags,
3110 Context *oncommit, int op_flags = 0) {
3111 sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
3112 op_flags);
3113 }
3114
  // Messenger/Dispatcher hooks (see Dispatcher interface for contracts).
  void ms_handle_connect(Connection *con) override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override;
  bool ms_handle_refused(Connection *con) override;

  /// Add (set=true) or remove (set=false) this client from the OSD
  /// blacklist -- presumably via a mon command; confirm in the .cc.
  void blacklist_self(bool set);
3121
private:
  // OSDMap epoch barrier: ops are gated on this epoch (see
  // set_epoch_barrier()); 0 means no barrier.
  epoch_t epoch_barrier = 0;
  // Whether a write may be retried after its first reply; set elsewhere
  // (not visible in this chunk) -- presumably from configuration.
  bool retry_writes_after_first_reply;
public:
  /// Raise the epoch barrier (implementation in the .cc).
  void set_epoch_barrier(epoch_t epoch);

  /// Accessor for the objecter's perf counters (may be null if perf
  /// counters were never initialized -- confirm against setup code).
  PerfCounters *get_logger() {
    return logger;
  }
3131 };
3132
3133 #endif
3134