1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #ifndef CEPH_MOSDOP_H
17 #define CEPH_MOSDOP_H
18
19 #include <atomic>
20
21 #include "MOSDFastDispatchOp.h"
22 #include "include/ceph_features.h"
23 #include "common/hobject.h"
24
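/*
 * OSD op: a client request (CEPH_MSG_OSD_OP) carrying one or more
 * operations to apply to a single object.
 *
 * hobj - target object (oid, pool, namespace, hash, snap)
 * ops  - operations to apply, e.g. CEPH_OSD_OP_READ, CEPH_OSD_OP_DELETE
 */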
32
33 class OSD;
34
35 class MOSDOp : public MOSDFastDispatchOp {
36 private:
37 static constexpr int HEAD_VERSION = 8;
38 static constexpr int COMPAT_VERSION = 3;
39
40 private:
41 uint32_t client_inc = 0;
42 __u32 osdmap_epoch = 0;
43 __u32 flags = 0;
44 utime_t mtime;
45 int32_t retry_attempt = -1; // 0 is first attempt. -1 if we don't know.
46
47 hobject_t hobj;
48 spg_t pgid;
49 ceph::buffer::list::const_iterator p;
50 // Decoding flags. Decoding is only needed for messages caught by pipe reader.
51 // Transition from true -> false without locks being held
52 // Can never see final_decode_needed == false and partial_decode_needed == true
53 std::atomic<bool> partial_decode_needed;
54 std::atomic<bool> final_decode_needed;
55 //
56 public:
57 std::vector<OSDOp> ops;
58 private:
59 snapid_t snap_seq;
60 std::vector<snapid_t> snaps;
61
(1) Event member_decl: |
Class member declaration for "features". |
Also see events: |
[uninit_member] |
62 uint64_t features;
63 bool bdata_encode;
64 osd_reqid_t reqid; // reqid explicitly set by sender
65
66 public:
67 friend class MOSDOpReply;
68
69 ceph_tid_t get_client_tid() { return header.tid; }
70 void set_snapid(const snapid_t& s) {
71 hobj.snap = s;
72 }
73 void set_snaps(const std::vector<snapid_t>& i) {
74 snaps = i;
75 }
76 void set_snap_seq(const snapid_t& s) { snap_seq = s; }
77 void set_reqid(const osd_reqid_t rid) {
78 reqid = rid;
79 }
80 void set_spg(spg_t p) {
81 pgid = p;
82 }
83
84 // Fields decoded in partial decoding
85 pg_t get_pg() const {
86 ceph_assert(!partial_decode_needed);
87 return pgid.pgid;
88 }
89 spg_t get_spg() const override {
90 ceph_assert(!partial_decode_needed);
91 return pgid;
92 }
93 pg_t get_raw_pg() const {
94 ceph_assert(!partial_decode_needed);
95 return pg_t(hobj.get_hash(), pgid.pgid.pool());
96 }
97 epoch_t get_map_epoch() const override {
98 ceph_assert(!partial_decode_needed);
99 return osdmap_epoch;
100 }
101 int get_flags() const {
102 ceph_assert(!partial_decode_needed);
103 return flags;
104 }
105 osd_reqid_t get_reqid() const {
106 ceph_assert(!partial_decode_needed);
107 if (reqid.name != entity_name_t() || reqid.tid != 0) {
108 return reqid;
109 } else {
110 if (!final_decode_needed)
111 ceph_assert(reqid.inc == (int32_t)client_inc); // decode() should have done this
112 return osd_reqid_t(get_orig_source(),
113 reqid.inc,
114 header.tid);
115 }
116 }
117
118 // Fields decoded in final decoding
119 int get_client_inc() const {
120 ceph_assert(!final_decode_needed);
121 return client_inc;
122 }
123 utime_t get_mtime() const {
124 ceph_assert(!final_decode_needed);
125 return mtime;
126 }
127 object_locator_t get_object_locator() const {
128 ceph_assert(!final_decode_needed);
129 if (hobj.oid.name.empty())
130 return object_locator_t(hobj.pool, hobj.nspace, hobj.get_hash());
131 else
132 return object_locator_t(hobj);
133 }
134 const object_t& get_oid() const {
135 ceph_assert(!final_decode_needed);
136 return hobj.oid;
137 }
138 const hobject_t &get_hobj() const {
139 return hobj;
140 }
141 snapid_t get_snapid() const {
142 ceph_assert(!final_decode_needed);
143 return hobj.snap;
144 }
145 const snapid_t& get_snap_seq() const {
146 ceph_assert(!final_decode_needed);
147 return snap_seq;
148 }
149 const std::vector<snapid_t> &get_snaps() const {
150 ceph_assert(!final_decode_needed);
151 return snaps;
152 }
153
154 /**
155 * get retry attempt
156 *
157 * 0 is the first attempt.
158 *
159 * @return retry attempt, or -1 if we don't know
160 */
161 int get_retry_attempt() const {
162 return retry_attempt;
163 }
164 uint64_t get_features() const {
165 if (features)
166 return features;
167 return get_connection()->get_features();
168 }
169
170 MOSDOp()
171 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
172 partial_decode_needed(true),
173 final_decode_needed(true),
(2) Event uninit_member: |
Non-static class member "features" is not initialized in this constructor nor in any functions that it calls. |
Also see events: |
[member_decl] |
174 bdata_encode(false) { }
175 MOSDOp(int inc, long tid, const hobject_t& ho, spg_t& _pgid,
176 epoch_t _osdmap_epoch,
177 int _flags, uint64_t feat)
178 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
179 client_inc(inc),
180 osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
181 hobj(ho),
182 pgid(_pgid),
183 partial_decode_needed(false),
184 final_decode_needed(false),
185 features(feat),
186 bdata_encode(false) {
187 set_tid(tid);
188
189 // also put the client_inc in reqid.inc, so that get_reqid() can
190 // be used before the full message is decoded.
191 reqid.inc = inc;
192 }
193 private:
194 ~MOSDOp() override {}
195
196 public:
197 void set_mtime(utime_t mt) { mtime = mt; }
198 void set_mtime(ceph::real_time mt) {
199 mtime = ceph::real_clock::to_timespec(mt);
200 }
201
202 // ops
203 void add_simple_op(int o, uint64_t off, uint64_t len) {
204 OSDOp osd_op;
205 osd_op.op.op = o;
206 osd_op.op.extent.offset = off;
207 osd_op.op.extent.length = len;
208 ops.push_back(osd_op);
209 }
210 void write(uint64_t off, uint64_t len, ceph::buffer::list& bl) {
211 add_simple_op(CEPH_OSD_OP_WRITE, off, len);
212 data.claim(bl);
213 header.data_off = off;
214 }
215 void writefull(ceph::buffer::list& bl) {
216 add_simple_op(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
217 data.claim(bl);
218 header.data_off = 0;
219 }
220 void zero(uint64_t off, uint64_t len) {
221 add_simple_op(CEPH_OSD_OP_ZERO, off, len);
222 }
223 void truncate(uint64_t off) {
224 add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
225 }
226 void remove() {
227 add_simple_op(CEPH_OSD_OP_DELETE, 0, 0);
228 }
229
230 void read(uint64_t off, uint64_t len) {
231 add_simple_op(CEPH_OSD_OP_READ, off, len);
232 }
233 void stat() {
234 add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
235 }
236
237 bool has_flag(__u32 flag) const { return flags & flag; };
238
239 bool is_retry_attempt() const { return flags & CEPH_OSD_FLAG_RETRY; }
240 void set_retry_attempt(unsigned a) {
241 if (a)
242 flags |= CEPH_OSD_FLAG_RETRY;
243 else
244 flags &= ~CEPH_OSD_FLAG_RETRY;
245 retry_attempt = a;
246 }
247
248 // marshalling
249 void encode_payload(uint64_t features) override {
250 using ceph::encode;
251 if( false == bdata_encode ) {
252 OSDOp::merge_osd_op_vector_in_data(ops, data);
253 bdata_encode = true;
254 }
255
256 if ((features & CEPH_FEATURE_OBJECTLOCATOR) == 0) {
257 // here is the old structure we are encoding to: //
258 #if 0
259 struct ceph_osd_request_head {
260 ceph_le32 client_inc; /* client incarnation */
261 struct ceph_object_layout layout; /* pgid */
262 ceph_le32 osdmap_epoch; /* client's osdmap epoch */
263
264 ceph_le32 flags;
265
266 struct ceph_timespec mtime; /* for mutations only */
267 struct ceph_eversion reassert_version; /* if we are replaying op */
268
269 ceph_le32 object_len; /* length of object name */
270
271 ceph_le64 snapid; /* snapid to read */
272 ceph_le64 snap_seq; /* writer's snap context */
273 ceph_le32 num_snaps;
274
275 ceph_le16 num_ops;
276 struct ceph_osd_op ops[]; /* followed by ops[], obj, ticket, snaps */
277 } __attribute__ ((packed));
278 #endif
279 header.version = 1;
280
281 encode(client_inc, payload);
282
283 __u32 su = 0;
284 encode(get_raw_pg(), payload);
285 encode(su, payload);
286
287 encode(osdmap_epoch, payload);
288 encode(flags, payload);
289 encode(mtime, payload);
290 encode(eversion_t(), payload); // reassert_version
291
292 __u32 oid_len = hobj.oid.name.length();
293 encode(oid_len, payload);
294 encode(hobj.snap, payload);
295 encode(snap_seq, payload);
296 __u32 num_snaps = snaps.size();
297 encode(num_snaps, payload);
298
299 //::encode(ops, payload);
300 __u16 num_ops = ops.size();
301 encode(num_ops, payload);
302 for (unsigned i = 0; i < ops.size(); i++)
303 encode(ops[i].op, payload);
304
305 ceph::encode_nohead(hobj.oid.name, payload);
306 ceph::encode_nohead(snaps, payload);
307 } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
308 header.version = 6;
309 encode(client_inc, payload);
310 encode(osdmap_epoch, payload);
311 encode(flags, payload);
312 encode(mtime, payload);
313 encode(eversion_t(), payload); // reassert_version
314 encode(get_object_locator(), payload);
315 encode(get_raw_pg(), payload);
316
317 encode(hobj.oid, payload);
318
319 __u16 num_ops = ops.size();
320 encode(num_ops, payload);
321 for (unsigned i = 0; i < ops.size(); i++)
322 encode(ops[i].op, payload);
323
324 encode(hobj.snap, payload);
325 encode(snap_seq, payload);
326 encode(snaps, payload);
327
328 encode(retry_attempt, payload);
329 encode(features, payload);
330 if (reqid.name != entity_name_t() || reqid.tid != 0) {
331 encode(reqid, payload);
332 } else {
333 // don't include client_inc in the reqid for the legacy v6
334 // encoding or else we'll confuse older peers.
335 encode(osd_reqid_t(), payload);
336 }
337 } else if (!HAVE_FEATURE(features, RESEND_ON_SPLIT)) {
338 // reordered, v7 message encoding
339 header.version = 7;
340 encode(get_raw_pg(), payload);
341 encode(osdmap_epoch, payload);
342 encode(flags, payload);
343 encode(eversion_t(), payload); // reassert_version
344 encode(reqid, payload);
345 encode(client_inc, payload);
346 encode(mtime, payload);
347 encode(get_object_locator(), payload);
348 encode(hobj.oid, payload);
349
350 __u16 num_ops = ops.size();
351 encode(num_ops, payload);
352 for (unsigned i = 0; i < ops.size(); i++)
353 encode(ops[i].op, payload);
354
355 encode(hobj.snap, payload);
356 encode(snap_seq, payload);
357 encode(snaps, payload);
358
359 encode(retry_attempt, payload);
360 encode(features, payload);
361 } else {
362 // latest v8 encoding with hobject_t hash separate from pgid, no
363 // reassert version
364 header.version = HEAD_VERSION;
365
366 encode(pgid, payload);
367 encode(hobj.get_hash(), payload);
368 encode(osdmap_epoch, payload);
369 encode(flags, payload);
370 encode(reqid, payload);
371 encode_trace(payload, features);
372
373 // -- above decoded up front; below decoded post-dispatch thread --
374
375 encode(client_inc, payload);
376 encode(mtime, payload);
377 encode(get_object_locator(), payload);
378 encode(hobj.oid, payload);
379
380 __u16 num_ops = ops.size();
381 encode(num_ops, payload);
382 for (unsigned i = 0; i < ops.size(); i++)
383 encode(ops[i].op, payload);
384
385 encode(hobj.snap, payload);
386 encode(snap_seq, payload);
387 encode(snaps, payload);
388
389 encode(retry_attempt, payload);
390 encode(features, payload);
391 }
392 }
393
394 void decode_payload() override {
395 using ceph::decode;
396 ceph_assert(partial_decode_needed && final_decode_needed);
397 p = std::cbegin(payload);
398
399 // Always keep here the newest version of decoding order/rule
400 if (header.version == HEAD_VERSION) {
401 decode(pgid, p); // actual pgid
402 uint32_t hash;
403 decode(hash, p); // raw hash value
404 hobj.set_hash(hash);
405 decode(osdmap_epoch, p);
406 decode(flags, p);
407 decode(reqid, p);
408 decode_trace(p);
409 } else if (header.version == 7) {
410 decode(pgid.pgid, p); // raw pgid
411 hobj.set_hash(pgid.pgid.ps());
412 decode(osdmap_epoch, p);
413 decode(flags, p);
414 eversion_t reassert_version;
415 decode(reassert_version, p);
416 decode(reqid, p);
417 } else if (header.version < 2) {
418 // old decode
419 decode(client_inc, p);
420
421 old_pg_t opgid;
422 ceph::decode_raw(opgid, p);
423 pgid.pgid = opgid;
424
425 __u32 su;
426 decode(su, p);
427
428 decode(osdmap_epoch, p);
429 decode(flags, p);
430 decode(mtime, p);
431 eversion_t reassert_version;
432 decode(reassert_version, p);
433
434 __u32 oid_len;
435 decode(oid_len, p);
436 decode(hobj.snap, p);
437 decode(snap_seq, p);
438 __u32 num_snaps;
439 decode(num_snaps, p);
440
441 //::decode(ops, p);
442 __u16 num_ops;
443 decode(num_ops, p);
444 ops.resize(num_ops);
445 for (unsigned i = 0; i < num_ops; i++)
446 decode(ops[i].op, p);
447
448 ceph::decode_nohead(oid_len, hobj.oid.name, p);
449 ceph::decode_nohead(num_snaps, snaps, p);
450
451 // recalculate pgid hash value
452 pgid.pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
453 hobj.oid.name.c_str(),
454 hobj.oid.name.length()));
455 hobj.pool = pgid.pgid.pool();
456 hobj.set_hash(pgid.pgid.ps());
457
458 retry_attempt = -1;
459 features = 0;
460 OSDOp::split_osd_op_vector_in_data(ops, data);
461
462 // we did the full decode
463 final_decode_needed = false;
464
465 // put client_inc in reqid.inc for get_reqid()'s benefit
466 reqid = osd_reqid_t();
467 reqid.inc = client_inc;
468 } else if (header.version < 7) {
469 decode(client_inc, p);
470 decode(osdmap_epoch, p);
471 decode(flags, p);
472 decode(mtime, p);
473 eversion_t reassert_version;
474 decode(reassert_version, p);
475
476 object_locator_t oloc;
477 decode(oloc, p);
478
479 if (header.version < 3) {
480 old_pg_t opgid;
481 ceph::decode_raw(opgid, p);
482 pgid.pgid = opgid;
483 } else {
484 decode(pgid.pgid, p);
485 }
486
487 decode(hobj.oid, p);
488
489 //::decode(ops, p);
490 __u16 num_ops;
491 decode(num_ops, p);
492 ops.resize(num_ops);
493 for (unsigned i = 0; i < num_ops; i++)
494 decode(ops[i].op, p);
495
496 decode(hobj.snap, p);
497 decode(snap_seq, p);
498 decode(snaps, p);
499
500 if (header.version >= 4)
501 decode(retry_attempt, p);
502 else
503 retry_attempt = -1;
504
505 if (header.version >= 5)
506 decode(features, p);
507 else
508 features = 0;
509
510 if (header.version >= 6)
511 decode(reqid, p);
512 else
513 reqid = osd_reqid_t();
514
515 hobj.pool = pgid.pgid.pool();
516 hobj.set_key(oloc.key);
517 hobj.nspace = oloc.nspace;
518 hobj.set_hash(pgid.pgid.ps());
519
520 OSDOp::split_osd_op_vector_in_data(ops, data);
521
522 // we did the full decode
523 final_decode_needed = false;
524
525 // put client_inc in reqid.inc for get_reqid()'s benefit
526 if (reqid.name == entity_name_t() && reqid.tid == 0)
527 reqid.inc = client_inc;
528 }
529
530 partial_decode_needed = false;
531 }
532
533 bool finish_decode() {
534 using ceph::decode;
535 ceph_assert(!partial_decode_needed); // partial decoding required
536 if (!final_decode_needed)
537 return false; // Message is already final decoded
538 ceph_assert(header.version >= 7);
539
540 decode(client_inc, p);
541 decode(mtime, p);
542 object_locator_t oloc;
543 decode(oloc, p);
544 decode(hobj.oid, p);
545
546 __u16 num_ops;
547 decode(num_ops, p);
548 ops.resize(num_ops);
549 for (unsigned i = 0; i < num_ops; i++)
550 decode(ops[i].op, p);
551
552 decode(hobj.snap, p);
553 decode(snap_seq, p);
554 decode(snaps, p);
555
556 decode(retry_attempt, p);
557
558 decode(features, p);
559
560 hobj.pool = pgid.pgid.pool();
561 hobj.set_key(oloc.key);
562 hobj.nspace = oloc.nspace;
563
564 OSDOp::split_osd_op_vector_in_data(ops, data);
565
566 final_decode_needed = false;
567 return true;
568 }
569
570 void clear_buffers() override {
571 OSDOp::clear_data(ops);
572 bdata_encode = false;
573 }
574
575 std::string_view get_type_name() const override { return "osd_op"; }
576 void print(std::ostream& out) const override {
577 out << "osd_op(";
578 if (!partial_decode_needed) {
579 out << get_reqid() << ' ';
580 out << pgid;
581 if (!final_decode_needed) {
582 out << ' ';
583 out << hobj
584 << " " << ops
585 << " snapc " << get_snap_seq() << "=" << snaps;
586 if (is_retry_attempt())
587 out << " RETRY=" << get_retry_attempt();
588 } else {
589 out << " " << get_raw_pg() << " (undecoded)";
590 }
591 out << " " << ceph_osd_flag_string(get_flags());
592 out << " e" << osdmap_epoch;
593 }
594 out << ")";
595 }
596
597 private:
598 template<class T, typename... Args>
599 friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
600 };
601
602
603 #endif
604