1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software 
11   	 * Foundation.  See file COPYING.
12   	 * 
13   	 */
14   	
15   	
16   	#ifndef CEPH_MOSDOP_H
17   	#define CEPH_MOSDOP_H
18   	
19   	#include <atomic>
20   	
21   	#include "MOSDFastDispatchOp.h"
22   	#include "include/ceph_features.h"
23   	#include "common/hobject.h"
24   	
25   	/*
26   	 * OSD op
27   	 *
28   	 * oid - object id
29   	 * op  - OSD_OP_DELETE, etc.
30   	 *
31   	 */
32   	
33   	class OSD;
34   	
35   	class MOSDOp : public MOSDFastDispatchOp {
36   	private:
37   	  static constexpr int HEAD_VERSION = 8;
38   	  static constexpr int COMPAT_VERSION = 3;
39   	
40   	private:
41   	  uint32_t client_inc = 0;
42   	  __u32 osdmap_epoch = 0;
43   	  __u32 flags = 0;
44   	  utime_t mtime;
45   	  int32_t retry_attempt = -1;   // 0 is first attempt.  -1 if we don't know.
46   	
47   	  hobject_t hobj;
48   	  spg_t pgid;
49   	  ceph::buffer::list::const_iterator p;
50   	  // Decoding flags. Decoding is only needed for messages caught by pipe reader.
51   	  // Transition from true -> false without locks being held
52   	  // Can never see final_decode_needed == false and partial_decode_needed == true
53   	  std::atomic<bool> partial_decode_needed;
54   	  std::atomic<bool> final_decode_needed;
55   	  //
56   	public:
57   	  std::vector<OSDOp> ops;
58   	private:
59   	  snapid_t snap_seq;
60   	  std::vector<snapid_t> snaps;
61   	
(1) Event member_decl: Class member declaration for "features".
Also see events: [uninit_member]
62   	  uint64_t features;
63   	  bool bdata_encode;
64   	  osd_reqid_t reqid; // reqid explicitly set by sender
65   	
66   	public:
67   	  friend class MOSDOpReply;
68   	
69   	  ceph_tid_t get_client_tid() { return header.tid; }
70   	  void set_snapid(const snapid_t& s) {
71   	    hobj.snap = s;
72   	  }
73   	  void set_snaps(const std::vector<snapid_t>& i) {
74   	    snaps = i;
75   	  }
76   	  void set_snap_seq(const snapid_t& s) { snap_seq = s; }
77   	  void set_reqid(const osd_reqid_t rid) {
78   	    reqid = rid;
79   	  }
80   	  void set_spg(spg_t p) {
81   	    pgid = p;
82   	  }
83   	
84   	  // Fields decoded in partial decoding
85   	  pg_t get_pg() const {
86   	    ceph_assert(!partial_decode_needed);
87   	    return pgid.pgid;
88   	  }
89   	  spg_t get_spg() const override {
90   	    ceph_assert(!partial_decode_needed);
91   	    return pgid;
92   	  }
93   	  pg_t get_raw_pg() const {
94   	    ceph_assert(!partial_decode_needed);
95   	    return pg_t(hobj.get_hash(), pgid.pgid.pool());
96   	  }
97   	  epoch_t get_map_epoch() const override {
98   	    ceph_assert(!partial_decode_needed);
99   	    return osdmap_epoch;
100  	  }
101  	  int get_flags() const {
102  	    ceph_assert(!partial_decode_needed);
103  	    return flags;
104  	  }
105  	  osd_reqid_t get_reqid() const {
106  	    ceph_assert(!partial_decode_needed);
107  	    if (reqid.name != entity_name_t() || reqid.tid != 0) {
108  	      return reqid;
109  	    } else {
110  	      if (!final_decode_needed)
111  		ceph_assert(reqid.inc == (int32_t)client_inc);  // decode() should have done this
112  	      return osd_reqid_t(get_orig_source(),
113  	                         reqid.inc,
114  				 header.tid);
115  	    }
116  	  }
117  	
118  	  // Fields decoded in final decoding
119  	  int get_client_inc() const {
120  	    ceph_assert(!final_decode_needed);
121  	    return client_inc;
122  	  }
123  	  utime_t get_mtime() const {
124  	    ceph_assert(!final_decode_needed);
125  	    return mtime;
126  	  }
127  	  object_locator_t get_object_locator() const {
128  	    ceph_assert(!final_decode_needed);
129  	    if (hobj.oid.name.empty())
130  	      return object_locator_t(hobj.pool, hobj.nspace, hobj.get_hash());
131  	    else
132  	      return object_locator_t(hobj);
133  	  }
134  	  const object_t& get_oid() const {
135  	    ceph_assert(!final_decode_needed);
136  	    return hobj.oid;
137  	  }
138  	  const hobject_t &get_hobj() const {
139  	    return hobj;
140  	  }
141  	  snapid_t get_snapid() const {
142  	    ceph_assert(!final_decode_needed);
143  	    return hobj.snap;
144  	  }
145  	  const snapid_t& get_snap_seq() const {
146  	    ceph_assert(!final_decode_needed);
147  	    return snap_seq;
148  	  }
149  	  const std::vector<snapid_t> &get_snaps() const {
150  	    ceph_assert(!final_decode_needed);
151  	    return snaps;
152  	  }
153  	
154  	  /**
155  	   * get retry attempt
156  	   *
157  	   * 0 is the first attempt.
158  	   *
159  	   * @return retry attempt, or -1 if we don't know
160  	   */
161  	  int get_retry_attempt() const {
162  	    return retry_attempt;
163  	  }
164  	  uint64_t get_features() const {
165  	    if (features)
166  	      return features;
167  	    return get_connection()->get_features();
168  	  }
169  	
170  	  MOSDOp()
171  	    : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
172  	      partial_decode_needed(true),
173  	      final_decode_needed(true),
(2) Event uninit_member: Non-static class member "features" is not initialized in this constructor nor in any functions that it calls.
Also see events: [member_decl]
174  	      bdata_encode(false) { }
175  	  MOSDOp(int inc, long tid, const hobject_t& ho, spg_t& _pgid,
176  		 epoch_t _osdmap_epoch,
177  		 int _flags, uint64_t feat)
178  	    : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
179  	      client_inc(inc),
180  	      osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
181  	      hobj(ho),
182  	      pgid(_pgid),
183  	      partial_decode_needed(false),
184  	      final_decode_needed(false),
185  	      features(feat),
186  	      bdata_encode(false) {
187  	    set_tid(tid);
188  	
189  	    // also put the client_inc in reqid.inc, so that get_reqid() can
190  	    // be used before the full message is decoded.
191  	    reqid.inc = inc;
192  	  }
193  	private:
194  	  ~MOSDOp() override {}
195  	
196  	public:
197  	  void set_mtime(utime_t mt) { mtime = mt; }
198  	  void set_mtime(ceph::real_time mt) {
199  	    mtime = ceph::real_clock::to_timespec(mt);
200  	  }
201  	
202  	  // ops
203  	  void add_simple_op(int o, uint64_t off, uint64_t len) {
204  	    OSDOp osd_op;
205  	    osd_op.op.op = o;
206  	    osd_op.op.extent.offset = off;
207  	    osd_op.op.extent.length = len;
208  	    ops.push_back(osd_op);
209  	  }
210  	  void write(uint64_t off, uint64_t len, ceph::buffer::list& bl) {
211  	    add_simple_op(CEPH_OSD_OP_WRITE, off, len);
212  	    data.claim(bl);
213  	    header.data_off = off;
214  	  }
215  	  void writefull(ceph::buffer::list& bl) {
216  	    add_simple_op(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
217  	    data.claim(bl);
218  	    header.data_off = 0;
219  	  }
220  	  void zero(uint64_t off, uint64_t len) {
221  	    add_simple_op(CEPH_OSD_OP_ZERO, off, len);
222  	  }
223  	  void truncate(uint64_t off) {
224  	    add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
225  	  }
226  	  void remove() {
227  	    add_simple_op(CEPH_OSD_OP_DELETE, 0, 0);
228  	  }
229  	
230  	  void read(uint64_t off, uint64_t len) {
231  	    add_simple_op(CEPH_OSD_OP_READ, off, len);
232  	  }
233  	  void stat() {
234  	    add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
235  	  }
236  	
237  	  bool has_flag(__u32 flag) const { return flags & flag; };
238  	
239  	  bool is_retry_attempt() const { return flags & CEPH_OSD_FLAG_RETRY; }
240  	  void set_retry_attempt(unsigned a) { 
241  	    if (a)
242  	      flags |= CEPH_OSD_FLAG_RETRY;
243  	    else
244  	      flags &= ~CEPH_OSD_FLAG_RETRY;
245  	    retry_attempt = a;
246  	  }
247  	
248  	  // marshalling
249  	  void encode_payload(uint64_t features) override {
250  	    using ceph::encode;
251  	    if( false == bdata_encode ) {
252  	      OSDOp::merge_osd_op_vector_in_data(ops, data);
253  	      bdata_encode = true;
254  	    }
255  	
256  	    if ((features & CEPH_FEATURE_OBJECTLOCATOR) == 0) {
257  	      // here is the old structure we are encoding to: //
258  	#if 0
259  	struct ceph_osd_request_head {
260  		ceph_le32 client_inc;              /* client incarnation */
261  		struct ceph_object_layout layout;  /* pgid */
262  		ceph_le32 osdmap_epoch;            /* client's osdmap epoch */
263  	
264  		ceph_le32 flags;
265  	
266  		struct ceph_timespec mtime;        /* for mutations only */
267  		struct ceph_eversion reassert_version; /* if we are replaying op */
268  	
269  		ceph_le32 object_len;     /* length of object name */
270  	
271  		ceph_le64 snapid;         /* snapid to read */
272  		ceph_le64 snap_seq;       /* writer's snap context */
273  		ceph_le32 num_snaps;
274  	
275  		ceph_le16 num_ops;
276  		struct ceph_osd_op ops[];  /* followed by ops[], obj, ticket, snaps */
277  	} __attribute__ ((packed));
278  	#endif
279  	      header.version = 1;
280  	
281  	      encode(client_inc, payload);
282  	
283  	      __u32 su = 0;
284  	      encode(get_raw_pg(), payload);
285  	      encode(su, payload);
286  	
287  	      encode(osdmap_epoch, payload);
288  	      encode(flags, payload);
289  	      encode(mtime, payload);
290  	      encode(eversion_t(), payload);  // reassert_version
291  	
292  	      __u32 oid_len = hobj.oid.name.length();
293  	      encode(oid_len, payload);
294  	      encode(hobj.snap, payload);
295  	      encode(snap_seq, payload);
296  	      __u32 num_snaps = snaps.size();
297  	      encode(num_snaps, payload);
298  	      
299  	      //::encode(ops, payload);
300  	      __u16 num_ops = ops.size();
301  	      encode(num_ops, payload);
302  	      for (unsigned i = 0; i < ops.size(); i++)
303  		encode(ops[i].op, payload);
304  	
305  	      ceph::encode_nohead(hobj.oid.name, payload);
306  	      ceph::encode_nohead(snaps, payload);
307  	    } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
308  	      header.version = 6;
309  	      encode(client_inc, payload);
310  	      encode(osdmap_epoch, payload);
311  	      encode(flags, payload);
312  	      encode(mtime, payload);
313  	      encode(eversion_t(), payload); // reassert_version
314  	      encode(get_object_locator(), payload);
315  	      encode(get_raw_pg(), payload);
316  	
317  	      encode(hobj.oid, payload);
318  	
319  	      __u16 num_ops = ops.size();
320  	      encode(num_ops, payload);
321  	      for (unsigned i = 0; i < ops.size(); i++)
322  	        encode(ops[i].op, payload);
323  	
324  	      encode(hobj.snap, payload);
325  	      encode(snap_seq, payload);
326  	      encode(snaps, payload);
327  	
328  	      encode(retry_attempt, payload);
329  	      encode(features, payload);
330  	      if (reqid.name != entity_name_t() || reqid.tid != 0) {
331  		encode(reqid, payload);
332  	      } else {
333  		// don't include client_inc in the reqid for the legacy v6
334  		// encoding or else we'll confuse older peers.
335  		encode(osd_reqid_t(), payload);
336  	      }
337  	    } else if (!HAVE_FEATURE(features, RESEND_ON_SPLIT)) {
338  	      // reordered, v7 message encoding
339  	      header.version = 7;
340  	      encode(get_raw_pg(), payload);
341  	      encode(osdmap_epoch, payload);
342  	      encode(flags, payload);
343  	      encode(eversion_t(), payload); // reassert_version
344  	      encode(reqid, payload);
345  	      encode(client_inc, payload);
346  	      encode(mtime, payload);
347  	      encode(get_object_locator(), payload);
348  	      encode(hobj.oid, payload);
349  	
350  	      __u16 num_ops = ops.size();
351  	      encode(num_ops, payload);
352  	      for (unsigned i = 0; i < ops.size(); i++)
353  		encode(ops[i].op, payload);
354  	
355  	      encode(hobj.snap, payload);
356  	      encode(snap_seq, payload);
357  	      encode(snaps, payload);
358  	
359  	      encode(retry_attempt, payload);
360  	      encode(features, payload);
361  	    } else {
362  	      // latest v8 encoding with hobject_t hash separate from pgid, no
363  	      // reassert version
364  	      header.version = HEAD_VERSION;
365  	
366  	      encode(pgid, payload);
367  	      encode(hobj.get_hash(), payload);
368  	      encode(osdmap_epoch, payload);
369  	      encode(flags, payload);
370  	      encode(reqid, payload);
371  	      encode_trace(payload, features);
372  	
373  	      // -- above decoded up front; below decoded post-dispatch thread --
374  	
375  	      encode(client_inc, payload);
376  	      encode(mtime, payload);
377  	      encode(get_object_locator(), payload);
378  	      encode(hobj.oid, payload);
379  	
380  	      __u16 num_ops = ops.size();
381  	      encode(num_ops, payload);
382  	      for (unsigned i = 0; i < ops.size(); i++)
383  		encode(ops[i].op, payload);
384  	
385  	      encode(hobj.snap, payload);
386  	      encode(snap_seq, payload);
387  	      encode(snaps, payload);
388  	
389  	      encode(retry_attempt, payload);
390  	      encode(features, payload);
391  	    }
392  	  }
393  	
394  	  void decode_payload() override {
395  	    using ceph::decode;
396  	    ceph_assert(partial_decode_needed && final_decode_needed);
397  	    p = std::cbegin(payload);
398  	
399  	    // Always keep here the newest version of decoding order/rule
400  	    if (header.version == HEAD_VERSION) {
401  	      decode(pgid, p);      // actual pgid
402  	      uint32_t hash;
403  	      decode(hash, p); // raw hash value
404  	      hobj.set_hash(hash);
405  	      decode(osdmap_epoch, p);
406  	      decode(flags, p);
407  	      decode(reqid, p);
408  	      decode_trace(p);
409  	    } else if (header.version == 7) {
410  	      decode(pgid.pgid, p);      // raw pgid
411  	      hobj.set_hash(pgid.pgid.ps());
412  	      decode(osdmap_epoch, p);
413  	      decode(flags, p);
414  	      eversion_t reassert_version;
415  	      decode(reassert_version, p);
416  	      decode(reqid, p);
417  	    } else if (header.version < 2) {
418  	      // old decode
419  	      decode(client_inc, p);
420  	
421  	      old_pg_t opgid;
422  	      ceph::decode_raw(opgid, p);
423  	      pgid.pgid = opgid;
424  	
425  	      __u32 su;
426  	      decode(su, p);
427  	
428  	      decode(osdmap_epoch, p);
429  	      decode(flags, p);
430  	      decode(mtime, p);
431  	      eversion_t reassert_version;
432  	      decode(reassert_version, p);
433  	
434  	      __u32 oid_len;
435  	      decode(oid_len, p);
436  	      decode(hobj.snap, p);
437  	      decode(snap_seq, p);
438  	      __u32 num_snaps;
439  	      decode(num_snaps, p);
440  	      
441  	      //::decode(ops, p);
442  	      __u16 num_ops;
443  	      decode(num_ops, p);
444  	      ops.resize(num_ops);
445  	      for (unsigned i = 0; i < num_ops; i++)
446  		decode(ops[i].op, p);
447  	
448  	      ceph::decode_nohead(oid_len, hobj.oid.name, p);
449  	      ceph::decode_nohead(num_snaps, snaps, p);
450  	
451  	      // recalculate pgid hash value
452  	      pgid.pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
453  					     hobj.oid.name.c_str(),
454  					     hobj.oid.name.length()));
455  	      hobj.pool = pgid.pgid.pool();
456  	      hobj.set_hash(pgid.pgid.ps());
457  	
458  	      retry_attempt = -1;
459  	      features = 0;
460  	      OSDOp::split_osd_op_vector_in_data(ops, data);
461  	
462  	      // we did the full decode
463  	      final_decode_needed = false;
464  	
465  	      // put client_inc in reqid.inc for get_reqid()'s benefit
466  	      reqid = osd_reqid_t();
467  	      reqid.inc = client_inc;
468  	    } else if (header.version < 7) {
469  	      decode(client_inc, p);
470  	      decode(osdmap_epoch, p);
471  	      decode(flags, p);
472  	      decode(mtime, p);
473  	      eversion_t reassert_version;
474  	      decode(reassert_version, p);
475  	
476  	      object_locator_t oloc;
477  	      decode(oloc, p);
478  	
479  	      if (header.version < 3) {
480  		old_pg_t opgid;
481  		ceph::decode_raw(opgid, p);
482  		pgid.pgid = opgid;
483  	      } else {
484  		decode(pgid.pgid, p);
485  	      }
486  	
487  	      decode(hobj.oid, p);
488  	
489  	      //::decode(ops, p);
490  	      __u16 num_ops;
491  	      decode(num_ops, p);
492  	      ops.resize(num_ops);
493  	      for (unsigned i = 0; i < num_ops; i++)
494  	        decode(ops[i].op, p);
495  	
496  	      decode(hobj.snap, p);
497  	      decode(snap_seq, p);
498  	      decode(snaps, p);
499  	
500  	      if (header.version >= 4)
501  	        decode(retry_attempt, p);
502  	      else
503  	        retry_attempt = -1;
504  	
505  	      if (header.version >= 5)
506  	        decode(features, p);
507  	      else
508  		features = 0;
509  	
510  	      if (header.version >= 6)
511  		decode(reqid, p);
512  	      else
513  		reqid = osd_reqid_t();
514  	
515  	      hobj.pool = pgid.pgid.pool();
516  	      hobj.set_key(oloc.key);
517  	      hobj.nspace = oloc.nspace;
518  	      hobj.set_hash(pgid.pgid.ps());
519  	
520  	      OSDOp::split_osd_op_vector_in_data(ops, data);
521  	
522  	      // we did the full decode
523  	      final_decode_needed = false;
524  	
525  	      // put client_inc in reqid.inc for get_reqid()'s benefit
526  	      if (reqid.name == entity_name_t() && reqid.tid == 0)
527  		reqid.inc = client_inc;
528  	    }
529  	
530  	    partial_decode_needed = false;
531  	  }
532  	
533  	  bool finish_decode() {
534  	    using ceph::decode;
535  	    ceph_assert(!partial_decode_needed); // partial decoding required
536  	    if (!final_decode_needed)
537  	      return false; // Message is already final decoded
538  	    ceph_assert(header.version >= 7);
539  	
540  	    decode(client_inc, p);
541  	    decode(mtime, p);
542  	    object_locator_t oloc;
543  	    decode(oloc, p);
544  	    decode(hobj.oid, p);
545  	
546  	    __u16 num_ops;
547  	    decode(num_ops, p);
548  	    ops.resize(num_ops);
549  	    for (unsigned i = 0; i < num_ops; i++)
550  	      decode(ops[i].op, p);
551  	
552  	    decode(hobj.snap, p);
553  	    decode(snap_seq, p);
554  	    decode(snaps, p);
555  	
556  	    decode(retry_attempt, p);
557  	
558  	    decode(features, p);
559  	
560  	    hobj.pool = pgid.pgid.pool();
561  	    hobj.set_key(oloc.key);
562  	    hobj.nspace = oloc.nspace;
563  	
564  	    OSDOp::split_osd_op_vector_in_data(ops, data);
565  	
566  	    final_decode_needed = false;
567  	    return true;
568  	  }
569  	
570  	  void clear_buffers() override {
571  	    OSDOp::clear_data(ops);
572  	    bdata_encode = false;
573  	  }
574  	
575  	  std::string_view get_type_name() const override { return "osd_op"; }
576  	  void print(std::ostream& out) const override {
577  	    out << "osd_op(";
578  	    if (!partial_decode_needed) {
579  	      out << get_reqid() << ' ';
580  	      out << pgid;
581  	      if (!final_decode_needed) {
582  		out << ' ';
583  		out << hobj
584  		    << " " << ops
585  		    << " snapc " << get_snap_seq() << "=" << snaps;
586  		if (is_retry_attempt())
587  		  out << " RETRY=" << get_retry_attempt();
588  	      } else {
589  		out << " " << get_raw_pg() << " (undecoded)";
590  	      }
591  	      out << " " << ceph_osd_flag_string(get_flags());
592  	      out << " e" << osdmap_epoch;
593  	    }
594  	    out << ")";
595  	  }
596  	
597  	private:
598  	  template<class T, typename... Args>
599  	  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
600  	};
601  	
602  	
603  	#endif
604