1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	#ifndef CEPH_MDSTYPES_H
4    	#define CEPH_MDSTYPES_H
5    	
6    	#include "include/int_types.h"
7    	
8    	#include <math.h>
9    	#include <ostream>
10   	#include <set>
11   	#include <map>
12   	#include <string_view>
13   	
14   	#include "common/config.h"
15   	#include "common/Clock.h"
16   	#include "common/DecayCounter.h"
17   	#include "common/entity_name.h"
18   	
19   	#include "include/Context.h"
20   	#include "include/frag.h"
21   	#include "include/xlist.h"
22   	#include "include/interval_set.h"
23   	#include "include/compact_map.h"
24   	#include "include/compact_set.h"
25   	#include "include/fs_types.h"
26   	
(1) Event include_recursion: #include file "../../src/mds/inode_backtrace.h" includes itself: inode_backtrace.h -> mdstypes.h -> inode_backtrace.h
(2) Event caretline: ^
27   	#include "inode_backtrace.h"
28   	
29   	#include <boost/spirit/include/qi.hpp>
30   	#include <boost/pool/pool.hpp>
31   	#include "include/ceph_assert.h"
32   	#include <boost/serialization/strong_typedef.hpp>
33   	
#define CEPH_FS_ONDISK_MAGIC "ceph fs volume v011"

#define MDS_PORT_CACHE   0x200
#define MDS_PORT_LOCKER  0x300
#define MDS_PORT_MIGRATOR 0x400

// MAX_MDS is the width of each reserved inode-number window below;
// NUM_STRAY is the number of stray slots per rank inside the stray window.
#define MAX_MDS                   0x100
#define NUM_STRAY                 10

#define MDS_INO_ROOT              1

// No longer created but recognised in existing filesystems
// so that we don't try to fragment it.
#define MDS_INO_CEPH              2

#define MDS_INO_GLOBAL_SNAPREALM  3

// Start of the mdsdir window (MAX_MDS wide) and of the stray window
// (MAX_MDS * NUM_STRAY wide).
#define MDS_INO_MDSDIR_OFFSET     (1*MAX_MDS)
#define MDS_INO_STRAY_OFFSET      (6*MAX_MDS)

// Locations for journal data
#define MDS_INO_LOG_OFFSET        (2*MAX_MDS)
#define MDS_INO_LOG_BACKUP_OFFSET (3*MAX_MDS)
#define MDS_INO_LOG_POINTER_OFFSET    (4*MAX_MDS)
#define MDS_INO_PURGE_QUEUE       (5*MAX_MDS)

// First value past the stray window.
#define MDS_INO_SYSTEM_BASE       ((6*MAX_MDS) + (MAX_MDS * NUM_STRAY))

// Build a reserved inode number from a rank `x` (and stray slot `i`).
// Every argument is parenthesised before the cast so that an expression
// argument (e.g. `a < b`, `v >> 32`) is cast as a whole, not just its
// first operand.
#define MDS_INO_STRAY(x,i)  (MDS_INO_STRAY_OFFSET+((((unsigned)(x))*NUM_STRAY)+((unsigned)(i))))
#define MDS_INO_MDSDIR(x) (MDS_INO_MDSDIR_OFFSET+((unsigned)(x)))

// Range tests and inverse mappings for the windows above.
// Note: the IS_* macros evaluate `i` more than once.
#define MDS_INO_IS_STRAY(i)  ((i) >= MDS_INO_STRAY_OFFSET  && (i) < (MDS_INO_STRAY_OFFSET+(MAX_MDS*NUM_STRAY)))
#define MDS_INO_IS_MDSDIR(i) ((i) >= MDS_INO_MDSDIR_OFFSET && (i) < (MDS_INO_MDSDIR_OFFSET+MAX_MDS))
#define MDS_INO_MDSDIR_OWNER(i) (signed ((unsigned (i)) - MDS_INO_MDSDIR_OFFSET))
#define MDS_INO_IS_BASE(i)   ((i) == MDS_INO_ROOT || (i) == MDS_INO_GLOBAL_SNAPREALM || MDS_INO_IS_MDSDIR(i))
#define MDS_INO_STRAY_OWNER(i) (signed (((unsigned (i)) - MDS_INO_STRAY_OFFSET) / NUM_STRAY))
#define MDS_INO_STRAY_INDEX(i) (((unsigned (i)) - MDS_INO_STRAY_OFFSET) % NUM_STRAY)

#define MDS_TRAVERSE_FORWARD       1
#define MDS_TRAVERSE_DISCOVER      2    // skips permissions checks etc.
#define MDS_TRAVERSE_DISCOVERXLOCK 3    // succeeds on (foreign?) null, xlocked dentries.
75   	
76   	
// Scalar identifier types declared for the structures in this header, each
// paired with a reserved "none" value.
77   	typedef int32_t mds_rank_t;
78   	constexpr mds_rank_t MDS_RANK_NONE = -1;  // tested by mds_role_t::is_none()
79   	
80   	BOOST_STRONG_TYPEDEF(uint64_t, mds_gid_t)  // mds_gid_t wraps uint64_t as a distinct type
81   	extern const mds_gid_t MDS_GID_NONE;  // declaration only; defined in another translation unit
82   	
83   	typedef int32_t fs_cluster_id_t;
84   	constexpr fs_cluster_id_t FS_CLUSTER_ID_NONE = -1;  // fscid of a default-constructed mds_role_t
85   	// The namespace ID of the anonymous default filesystem from legacy systems
86   	constexpr fs_cluster_id_t FS_CLUSTER_ID_ANONYMOUS = 0;
87   	
88   	class mds_role_t
89   	{
90   	  public:
91   	  fs_cluster_id_t fscid;
92   	  mds_rank_t rank;
93   	
94   	  mds_role_t(fs_cluster_id_t fscid_, mds_rank_t rank_)
95   	    : fscid(fscid_), rank(rank_)
96   	  {}
97   	  mds_role_t()
98   	    : fscid(FS_CLUSTER_ID_NONE), rank(MDS_RANK_NONE)
99   	  {}
100  	  bool operator<(mds_role_t const &rhs) const
101  	  {
102  	    if (fscid < rhs.fscid) {
103  	      return true;
104  	    } else if (fscid == rhs.fscid) {
105  	      return rank < rhs.rank;
106  	    } else {
107  	      return false;
108  	    }
109  	  }
110  	
111  	  bool is_none() const
112  	  {
113  	    return (rank == MDS_RANK_NONE);
114  	  }
115  	};
116  	std::ostream& operator<<(std::ostream &out, const mds_role_t &role);
117  	
118  	
119  	// CAPS
120  	
121  	inline string gcap_string(int cap)
122  	{
123  	  string s;
124  	  if (cap & CEPH_CAP_GSHARED) s += "s";  
125  	  if (cap & CEPH_CAP_GEXCL) s += "x";
126  	  if (cap & CEPH_CAP_GCACHE) s += "c";
127  	  if (cap & CEPH_CAP_GRD) s += "r";
128  	  if (cap & CEPH_CAP_GWR) s += "w";
129  	  if (cap & CEPH_CAP_GBUFFER) s += "b";
130  	  if (cap & CEPH_CAP_GWREXTEND) s += "a";
131  	  if (cap & CEPH_CAP_GLAZYIO) s += "l";
132  	  return s;
133  	}
134  	inline string ccap_string(int cap)
135  	{
136  	  string s;
137  	  if (cap & CEPH_CAP_PIN) s += "p";
138  	
139  	  int a = (cap >> CEPH_CAP_SAUTH) & 3;
140  	  if (a) s += 'A' + gcap_string(a);
141  	
142  	  a = (cap >> CEPH_CAP_SLINK) & 3;
143  	  if (a) s += 'L' + gcap_string(a);
144  	
145  	  a = (cap >> CEPH_CAP_SXATTR) & 3;
146  	  if (a) s += 'X' + gcap_string(a);
147  	
148  	  a = cap >> CEPH_CAP_SFILE;
149  	  if (a) s += 'F' + gcap_string(a);
150  	
151  	  if (s.length() == 0)
152  	    s = "-";
153  	  return s;
154  	}
155  	
156  	
// Common base of frag_info_t and nest_info_t; contributes only a version
// counter.
157  	struct scatter_info_t {
158  	  version_t version = 0;  // compared by inode_t::older_is_consistent()
159  	
160  	  scatter_info_t() {}
161  	};
162  	
163  	struct frag_info_t : public scatter_info_t {
164  	  // this frag
165  	  utime_t mtime;
166  	  uint64_t change_attr = 0;
167  	  int64_t nfiles = 0;        // files
168  	  int64_t nsubdirs = 0;      // subdirs
169  	
170  	  frag_info_t() {}
171  	
172  	  int64_t size() const { return nfiles + nsubdirs; }
173  	
174  	  void zero() {
175  	    *this = frag_info_t();
176  	  }
177  	
178  	  // *this += cur - acc;
179  	  void add_delta(const frag_info_t &cur, const frag_info_t &acc, bool *touched_mtime=0, bool *touched_chattr=0) {
180  	    if (cur.mtime > mtime) {
181  	      mtime = cur.mtime;
182  	      if (touched_mtime)
183  		*touched_mtime = true;
184  	    }
185  	    if (cur.change_attr > change_attr) {
186  	      change_attr = cur.change_attr;
187  	      if (touched_chattr)
188  		*touched_chattr = true;
189  	    }
190  	    nfiles += cur.nfiles - acc.nfiles;
191  	    nsubdirs += cur.nsubdirs - acc.nsubdirs;
192  	  }
193  	
194  	  void add(const frag_info_t& other) {
195  	    if (other.mtime > mtime)
196  	      mtime = other.mtime;
197  	    if (other.change_attr > change_attr)
198  	      change_attr = other.change_attr;
199  	    nfiles += other.nfiles;
200  	    nsubdirs += other.nsubdirs;
201  	  }
202  	
203  	  bool same_sums(const frag_info_t &o) const {
204  	    return mtime <= o.mtime &&
205  		nfiles == o.nfiles &&
206  		nsubdirs == o.nsubdirs;
207  	  }
208  	
209  	  void encode(bufferlist &bl) const;
210  	  void decode(bufferlist::const_iterator& bl);
211  	  void dump(Formatter *f) const;
212  	  static void generate_test_instances(std::list<frag_info_t*>& ls);
213  	};
214  	WRITE_CLASS_ENCODER(frag_info_t)
215  	
216  	inline bool operator==(const frag_info_t &l, const frag_info_t &r) {
217  	  return memcmp(&l, &r, sizeof(l)) == 0;
218  	}
219  	inline bool operator!=(const frag_info_t &l, const frag_info_t &r) {
220  	  return !(l == r);
221  	}
222  	
223  	std::ostream& operator<<(std::ostream &out, const frag_info_t &f);
224  	
225  	
226  	struct nest_info_t : public scatter_info_t {
227  	  // this frag + children
228  	  utime_t rctime;
229  	  int64_t rbytes = 0;
230  	  int64_t rfiles = 0;
231  	  int64_t rsubdirs = 0;
232  	  int64_t rsize() const { return rfiles + rsubdirs; }
233  	
234  	  int64_t rsnaps = 0;
235  	
236  	  nest_info_t() {}
237  	
238  	  void zero() {
239  	    *this = nest_info_t();
240  	  }
241  	
242  	  void sub(const nest_info_t &other) {
243  	    add(other, -1);
244  	  }
245  	  void add(const nest_info_t &other, int fac=1) {
246  	    if (other.rctime > rctime)
247  	      rctime = other.rctime;
248  	    rbytes += fac*other.rbytes;
249  	    rfiles += fac*other.rfiles;
250  	    rsubdirs += fac*other.rsubdirs;
251  	    rsnaps += fac*other.rsnaps;
252  	  }
253  	
254  	  // *this += cur - acc;
255  	  void add_delta(const nest_info_t &cur, const nest_info_t &acc) {
256  	    if (cur.rctime > rctime)
257  	      rctime = cur.rctime;
258  	    rbytes += cur.rbytes - acc.rbytes;
259  	    rfiles += cur.rfiles - acc.rfiles;
260  	    rsubdirs += cur.rsubdirs - acc.rsubdirs;
261  	    rsnaps += cur.rsnaps - acc.rsnaps;
262  	  }
263  	
264  	  bool same_sums(const nest_info_t &o) const {
265  	    return rctime <= o.rctime &&
266  	        rbytes == o.rbytes &&
267  	        rfiles == o.rfiles &&
268  	        rsubdirs == o.rsubdirs &&
269  	        rsnaps == o.rsnaps;
270  	  }
271  	
272  	  void encode(bufferlist &bl) const;
273  	  void decode(bufferlist::const_iterator& bl);
274  	  void dump(Formatter *f) const;
275  	  static void generate_test_instances(std::list<nest_info_t*>& ls);
276  	};
277  	WRITE_CLASS_ENCODER(nest_info_t)
278  	
279  	inline bool operator==(const nest_info_t &l, const nest_info_t &r) {
280  	  return memcmp(&l, &r, sizeof(l)) == 0;
281  	}
282  	inline bool operator!=(const nest_info_t &l, const nest_info_t &r) {
283  	  return !(l == r);
284  	}
285  	
286  	std::ostream& operator<<(std::ostream &out, const nest_info_t &n);
287  	
288  	
289  	struct vinodeno_t {
290  	  inodeno_t ino;
291  	  snapid_t snapid;
292  	  vinodeno_t() {}
293  	  vinodeno_t(inodeno_t i, snapid_t s) : ino(i), snapid(s) {}
294  	
295  	  void encode(bufferlist& bl) const {
296  	    using ceph::encode;
297  	    encode(ino, bl);
298  	    encode(snapid, bl);
299  	  }
300  	  void decode(bufferlist::const_iterator& p) {
301  	    using ceph::decode;
302  	    decode(ino, p);
303  	    decode(snapid, p);
304  	  }
305  	};
306  	WRITE_CLASS_ENCODER(vinodeno_t)
307  	
308  	inline bool operator==(const vinodeno_t &l, const vinodeno_t &r) {
309  	  return l.ino == r.ino && l.snapid == r.snapid;
310  	}
311  	inline bool operator!=(const vinodeno_t &l, const vinodeno_t &r) {
312  	  return !(l == r);
313  	}
314  	inline bool operator<(const vinodeno_t &l, const vinodeno_t &r) {
315  	  return 
316  	    l.ino < r.ino ||
317  	    (l.ino == r.ino && l.snapid < r.snapid);
318  	}
319  	
320  	struct quota_info_t
321  	{
322  	  int64_t max_bytes = 0;
323  	  int64_t max_files = 0;
324  	 
325  	  quota_info_t() {}
326  	
327  	  void encode(bufferlist& bl) const {
328  	    ENCODE_START(1, 1, bl);
329  	    encode(max_bytes, bl);
330  	    encode(max_files, bl);
331  	    ENCODE_FINISH(bl);
332  	  }
333  	  void decode(bufferlist::const_iterator& p) {
334  	    DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, p);
335  	    decode(max_bytes, p);
336  	    decode(max_files, p);
337  	    DECODE_FINISH(p);
338  	  }
339  	
340  	  void dump(Formatter *f) const;
341  	  static void generate_test_instances(std::list<quota_info_t *>& ls);
342  	
343  	  bool is_valid() const {
344  	    return max_bytes >=0 && max_files >=0;
345  	  }
346  	  bool is_enable() const {
347  	    return max_bytes || max_files;
348  	  }
349  	};
350  	WRITE_CLASS_ENCODER(quota_info_t)
351  	
352  	inline bool operator==(const quota_info_t &l, const quota_info_t &r) {
353  	  return memcmp(&l, &r, sizeof(l)) == 0;
354  	}
355  	
356  	ostream& operator<<(ostream &out, const quota_info_t &n);
357  	
358  	namespace std {
359  	  template<> struct hash<vinodeno_t> {
360  	    size_t operator()(const vinodeno_t &vino) const { 
361  	      hash<inodeno_t> H;
362  	      hash<uint64_t> I;
363  	      return H(vino.ino) ^ I(vino.snapid);
364  	    }
365  	  };
366  	} // namespace std
367  	
368  	
369  	
370  	
371  	inline std::ostream& operator<<(std::ostream &out, const vinodeno_t &vino) {
372  	  out << vino.ino;
373  	  if (vino.snapid == CEPH_NOSNAP)
374  	    out << ".head";
375  	  else if (vino.snapid)
376  	    out << '.' << vino.snapid;
377  	  return out;
378  	}
379  	
380  	
381  	/*
382  	 * client_writeable_range_t
383  	 */
// A byte interval plus the snapid stored in `follows`.  inode_t keeps one of
// these per client in its client_ranges map.  encode/decode/dump are defined
// out of line.
384  	struct client_writeable_range_t {
385  	  struct byte_range_t {
386  	    uint64_t first = 0, last = 0;    // interval client can write to
387  	    byte_range_t() {}
388  	  };
389  	
390  	  byte_range_t range;
391  	  snapid_t follows = 0;     // aka "data+metadata flushed thru"
392  	
393  	  client_writeable_range_t() {}
394  	
395  	  void encode(bufferlist &bl) const;
396  	  void decode(bufferlist::const_iterator& bl);
397  	  void dump(Formatter *f) const;
398  	  static void generate_test_instances(std::list<client_writeable_range_t*>& ls);
399  	};
400  	
// Decode a bare byte_range_t: `first`, then `last`, with no version header.
// NOTE(review): appears to exist for the struct_v < 3 path of
// inode_t::decode(), which decodes a map of byte_range_t — confirm.
401  	inline void decode(client_writeable_range_t::byte_range_t& range, bufferlist::const_iterator& bl) {
402  	  decode(range.first, bl);
403  	  decode(range.last, bl);
404  	}
405  	
406  	WRITE_CLASS_ENCODER(client_writeable_range_t)
407  	
408  	std::ostream& operator<<(std::ostream& out, const client_writeable_range_t& r);
409  	
410  	inline bool operator==(const client_writeable_range_t& l,
411  			       const client_writeable_range_t& r) {
412  	  return l.range.first == r.range.first && l.range.last == r.range.last &&
413  	    l.follows == r.follows;
414  	}
415  	
416  	struct inline_data_t {
417  	private:
418  	  std::unique_ptr<bufferlist> blp;
419  	public:
420  	  version_t version = 1;
421  	
422  	  void free_data() {
423  	    blp.reset();
424  	  }
425  	  bufferlist& get_data() {
426  	    if (!blp)
427  	      blp.reset(new bufferlist);
428  	    return *blp;
429  	  }
430  	  size_t length() const { return blp ? blp->length() : 0; }
431  	
432  	  inline_data_t() {}
433  	  inline_data_t(const inline_data_t& o) : version(o.version) {
434  	    if (o.blp)
435  	      get_data() = *o.blp;
436  	  }
437  	  inline_data_t& operator=(const inline_data_t& o) {
438  	    version = o.version;
439  	    if (o.blp)
440  	      get_data() = *o.blp;
441  	    else
442  	      free_data();
443  	    return *this;
444  	  }
445  	  bool operator==(const inline_data_t& o) const {
446  	   return length() == o.length() &&
447  		  (length() == 0 ||
448  		   (*const_cast<bufferlist*>(blp.get()) == *const_cast<bufferlist*>(o.blp.get())));
449  	  }
450  	  bool operator!=(const inline_data_t& o) const {
451  	    return !(*this == o);
452  	  }
453  	  void encode(bufferlist &bl) const;
454  	  void decode(bufferlist::const_iterator& bl);
455  	};
456  	WRITE_CLASS_ENCODER(inline_data_t)
457  	
// Kinds of inode damage.  The enumerators take the default values 0, 1, 2.
// NOTE(review): presumably combined into a damage_flags_t value (as bit
// positions or masks) by users outside this section — confirm.
458  	enum {
459  	  DAMAGE_STATS,     // statistics (dirstat, size, etc)
460  	  DAMAGE_RSTATS,    // recursive statistics (rstat, accounted_rstat)
461  	  DAMAGE_FRAGTREE   // fragtree -- repair by searching
462  	};
463  	typedef uint32_t damage_flags_t;
464  	
465  	/*
466  	 * inode_t
467  	 */
468  	template<template<typename> class Allocator = std::allocator>
469  	struct inode_t {
470  	  /**
471  	   * ***************
472  	   * Do not forget to add any new fields to the compare() function.
473  	   * ***************
474  	   */
475  	  // base (immutable)
476  	  inodeno_t ino = 0;
477  	  uint32_t   rdev = 0;    // if special file
478  	
479  	  // affected by any inode change...
480  	  utime_t    ctime;   // inode change time
481  	  utime_t    btime;   // birth time
482  	
483  	  // perm (namespace permissions)
484  	  uint32_t   mode = 0;
485  	  uid_t      uid = 0;
486  	  gid_t      gid = 0;
487  	
488  	  // nlink
489  	  int32_t    nlink = 0;
490  	
491  	  // file (data access)
492  	  ceph_dir_layout  dir_layout;    // [dir only]
493  	  file_layout_t layout;
494  	  compact_set<int64_t, std::less<int64_t>, Allocator<int64_t>> old_pools;
495  	  uint64_t   size = 0;        // on directory, # dentries
496  	  uint64_t   max_size_ever = 0; // max size the file has ever been
497  	  uint32_t   truncate_seq = 0;
498  	  uint64_t   truncate_size = 0, truncate_from = 0;
499  	  uint32_t   truncate_pending = 0;
500  	  utime_t    mtime;   // file data modify time.
501  	  utime_t    atime;   // file data access time.
502  	  uint32_t   time_warp_seq = 0;  // count of (potential) mtime/atime timewarps (i.e., utimes())
503  	  inline_data_t inline_data; // FIXME check
504  	
505  	  // change attribute
506  	  uint64_t   change_attr = 0;
507  	
508  	  using client_range_map = std::map<client_t,client_writeable_range_t,std::less<client_t>,Allocator<std::pair<const client_t,client_writeable_range_t>>>;
509  	  client_range_map client_ranges;  // client(s) can write to these ranges
510  	
511  	  // dirfrag, recursive accountin
512  	  frag_info_t dirstat;         // protected by my filelock
513  	  nest_info_t rstat;           // protected by my nestlock
514  	  nest_info_t accounted_rstat; // protected by parent's nestlock
515  	
516  	  quota_info_t quota;
517  	
518  	  mds_rank_t export_pin = MDS_RANK_NONE;
519  	 
520  	  // special stuff
521  	  version_t version = 0;           // auth only
522  	  version_t file_data_version = 0; // auth only
523  	  version_t xattr_version = 0;
524  	
525  	  utime_t last_scrub_stamp;    // start time of last complete scrub
526  	  version_t last_scrub_version = 0;// (parent) start version of last complete scrub
527  	
528  	  version_t backtrace_version = 0;
529  	
530  	  snapid_t oldest_snap;
531  	
532  	  std::basic_string<char,std::char_traits<char>,Allocator<char>> stray_prior_path; //stores path before unlink
533  	
534  	  inode_t()
535  	  {
536  	    clear_layout();
537  	    memset(&dir_layout, 0, sizeof(dir_layout));
538  	  }
539  	
540  	  // file type
541  	  bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; }
542  	  bool is_dir()     const { return (mode & S_IFMT) == S_IFDIR; }
543  	  bool is_file()    const { return (mode & S_IFMT) == S_IFREG; }
544  	
545  	  bool is_truncating() const { return (truncate_pending > 0); }
546  	  void truncate(uint64_t old_size, uint64_t new_size) {
547  	    ceph_assert(new_size < old_size);
548  	    if (old_size > max_size_ever)
549  	      max_size_ever = old_size;
550  	    truncate_from = old_size;
551  	    size = new_size;
552  	    rstat.rbytes = new_size;
553  	    truncate_size = size;
554  	    truncate_seq++;
555  	    truncate_pending++;
556  	  }
557  	
558  	  bool has_layout() const {
559  	    return layout != file_layout_t();
560  	  }
561  	
562  	  void clear_layout() {
563  	    layout = file_layout_t();
564  	  }
565  	
566  	  uint64_t get_layout_size_increment() const {
567  	    return layout.get_period();
568  	  }
569  	
570  	  bool is_dirty_rstat() const { return !(rstat == accounted_rstat); }
571  	
572  	  uint64_t get_max_size() const {
573  	    uint64_t max = 0;
574  	      for (std::map<client_t,client_writeable_range_t>::const_iterator p = client_ranges.begin();
575  		   p != client_ranges.end();
576  		   ++p)
577  		if (p->second.range.last > max)
578  		  max = p->second.range.last;
579  	      return max;
580  	  }
581  	  void set_max_size(uint64_t new_max) {
582  	    if (new_max == 0) {
583  	      client_ranges.clear();
584  	    } else {
585  	      for (std::map<client_t,client_writeable_range_t>::iterator p = client_ranges.begin();
586  		   p != client_ranges.end();
587  		   ++p)
588  		p->second.range.last = new_max;
589  	    }
590  	  }
591  	
592  	  void trim_client_ranges(snapid_t last) {
593  	    std::map<client_t, client_writeable_range_t>::iterator p = client_ranges.begin();
594  	    while (p != client_ranges.end()) {
595  	      if (p->second.follows >= last)
596  		client_ranges.erase(p++);
597  	      else
598  		++p;
599  	    }
600  	  }
601  	
602  	  bool is_backtrace_updated() const {
603  	    return backtrace_version == version;
604  	  }
605  	  void update_backtrace(version_t pv=0) {
606  	    backtrace_version = pv ? pv : version;
607  	  }
608  	
609  	  void add_old_pool(int64_t l) {
610  	    backtrace_version = version;
611  	    old_pools.insert(l);
612  	  }
613  	
614  	  void encode(bufferlist &bl, uint64_t features) const;
615  	  void decode(bufferlist::const_iterator& bl);
616  	  void dump(Formatter *f) const;
617  	  static void generate_test_instances(std::list<inode_t*>& ls);
618  	  /**
619  	   * Compare this inode_t with another that represent *the same inode*
620  	   * at different points in time.
621  	   * @pre The inodes are the same ino
622  	   *
623  	   * @param other The inode_t to compare ourselves with
624  	   * @param divergent A bool pointer which will be set to true
625  	   * if the values are different in a way that can't be explained
626  	   * by one being a newer version than the other.
627  	   *
628  	   * @returns 1 if we are newer than the other, 0 if equal, -1 if older.
629  	   */
630  	  int compare(const inode_t &other, bool *divergent) const;
631  	private:
632  	  bool older_is_consistent(const inode_t &other) const;
633  	};
634  	
635  	// These methods may be moved back to mdstypes.cc when we have pmr
// Serialize this inode into bl, framed by ENCODE_START(15, 6, bl) and
// ENCODE_FINISH(bl).
//
// The field order below is the wire format and is mirrored, with version
// guards, by inode_t::decode(); new fields may only be appended.  `features`
// is forwarded to the encoding of `layout` and used nowhere else.
// oldest_snap is not part of the encoding.
636  	template<template<typename> class Allocator>
637  	void inode_t<Allocator>::encode(bufferlist &bl, uint64_t features) const
638  	{
639  	  ENCODE_START(15, 6, bl);
640  	
641  	  encode(ino, bl);
642  	  encode(rdev, bl);
643  	  encode(ctime, bl);
644  	
645  	  encode(mode, bl);
646  	  encode(uid, bl);
647  	  encode(gid, bl);
648  	
649  	  encode(nlink, bl);
650  	  {
651  	    // removed field
     	    // (a placeholder is still written so the layout stays compatible)
652  	    bool anchored = 0;
653  	    encode(anchored, bl);
654  	  }
655  	
656  	  encode(dir_layout, bl);
657  	  encode(layout, bl, features);
658  	  encode(size, bl);
659  	  encode(truncate_seq, bl);
660  	  encode(truncate_size, bl);
661  	  encode(truncate_from, bl);
662  	  encode(truncate_pending, bl);
663  	  encode(mtime, bl);
664  	  encode(atime, bl);
665  	  encode(time_warp_seq, bl);
666  	  encode(client_ranges, bl);
667  	
668  	  encode(dirstat, bl);
669  	  encode(rstat, bl);
670  	  encode(accounted_rstat, bl);
671  	
672  	  encode(version, bl);
673  	  encode(file_data_version, bl);
674  	  encode(xattr_version, bl);
675  	  encode(backtrace_version, bl);
676  	  encode(old_pools, bl);
677  	  encode(max_size_ever, bl);
678  	  encode(inline_data, bl);
679  	  encode(quota, bl);
680  	
681  	  encode(stray_prior_path, bl);
682  	
683  	  encode(last_scrub_version, bl);
684  	  encode(last_scrub_stamp, bl);
685  	
686  	  encode(btime, bl);
687  	  encode(change_attr, bl);
688  	
689  	  encode(export_pin, bl);
690  	
691  	  ENCODE_FINISH(bl);
692  	}
693  	
694  	template<template<typename> class Allocator>
695  	void inode_t<Allocator>::decode(bufferlist::const_iterator &p)
696  	{
697  	  DECODE_START_LEGACY_COMPAT_LEN(15, 6, 6, p);
698  	
699  	  decode(ino, p);
700  	  decode(rdev, p);
701  	  decode(ctime, p);
702  	
703  	  decode(mode, p);
704  	  decode(uid, p);
705  	  decode(gid, p);
706  	
707  	  decode(nlink, p);
708  	  {
709  	    bool anchored;
710  	    decode(anchored, p);
711  	  }
712  	
713  	  if (struct_v >= 4)
714  	    decode(dir_layout, p);
715  	  else
716  	    memset(&dir_layout, 0, sizeof(dir_layout));
717  	  decode(layout, p);
718  	  decode(size, p);
719  	  decode(truncate_seq, p);
720  	  decode(truncate_size, p);
721  	  decode(truncate_from, p);
722  	  if (struct_v >= 5)
723  	    decode(truncate_pending, p);
724  	  else
725  	    truncate_pending = 0;
726  	  decode(mtime, p);
727  	  decode(atime, p);
728  	  decode(time_warp_seq, p);
729  	  if (struct_v >= 3) {
730  	    decode(client_ranges, p);
731  	  } else {
732  	    map<client_t, client_writeable_range_t::byte_range_t> m;
733  	    decode(m, p);
734  	    for (map<client_t, client_writeable_range_t::byte_range_t>::iterator
735  		q = m.begin(); q != m.end(); ++q)
736  	      client_ranges[q->first].range = q->second;
737  	  }
738  	
739  	  decode(dirstat, p);
740  	  decode(rstat, p);
741  	  decode(accounted_rstat, p);
742  	
743  	  decode(version, p);
744  	  decode(file_data_version, p);
745  	  decode(xattr_version, p);
746  	  if (struct_v >= 2)
747  	    decode(backtrace_version, p);
748  	  if (struct_v >= 7)
749  	    decode(old_pools, p);
750  	  if (struct_v >= 8)
751  	    decode(max_size_ever, p);
752  	  if (struct_v >= 9) {
753  	    decode(inline_data, p);
754  	  } else {
755  	    inline_data.version = CEPH_INLINE_NONE;
756  	  }
757  	  if (struct_v < 10)
758  	    backtrace_version = 0; // force update backtrace
759  	  if (struct_v >= 11)
760  	    decode(quota, p);
761  	
762  	  if (struct_v >= 12) {
763  	    std::string tmp;
764  	    decode(tmp, p);
765  	    stray_prior_path = std::string_view(tmp);
766  	  }
767  	
768  	  if (struct_v >= 13) {
769  	    decode(last_scrub_version, p);
770  	    decode(last_scrub_stamp, p);
771  	  }
772  	  if (struct_v >= 14) {
773  	    decode(btime, p);
774  	    decode(change_attr, p);
775  	  } else {
776  	    btime = utime_t();
777  	    change_attr = 0;
778  	  }
779  	
780  	  if (struct_v >= 15) {
781  	    decode(export_pin, p);
782  	  } else {
783  	    export_pin = MDS_RANK_NONE;
784  	  }
785  	
786  	  DECODE_FINISH(p);
787  	}
788  	
// Write this inode's fields to the Formatter f.
//
// Scalars are emitted as individual keys; dir_layout, dirstat, rstat,
// accounted_rstat and quota each get their own object section, and
// old_pools and client_ranges are emitted as arrays.  inline_data and
// oldest_snap are not dumped.
789  	template<template<typename> class Allocator>
790  	void inode_t<Allocator>::dump(Formatter *f) const
791  	{
792  	  f->dump_unsigned("ino", ino);
793  	  f->dump_unsigned("rdev", rdev);
794  	  f->dump_stream("ctime") << ctime;
795  	  f->dump_stream("btime") << btime;
796  	  f->dump_unsigned("mode", mode);
797  	  f->dump_unsigned("uid", uid);
798  	  f->dump_unsigned("gid", gid);
799  	  f->dump_unsigned("nlink", nlink);
800  	
801  	  f->open_object_section("dir_layout");
802  	  ::dump(dir_layout, f);
803  	  f->close_section();
804  	
805  	  f->dump_object("layout", layout);
806  	
807  	  f->open_array_section("old_pools");
808  	  for (const auto &p : old_pools) {
809  	    f->dump_int("pool", p);
810  	  }
811  	  f->close_section();
812  	
813  	  f->dump_unsigned("size", size);
814  	  f->dump_unsigned("truncate_seq", truncate_seq);
815  	  f->dump_unsigned("truncate_size", truncate_size);
816  	  f->dump_unsigned("truncate_from", truncate_from);
817  	  f->dump_unsigned("truncate_pending", truncate_pending);
818  	  f->dump_stream("mtime") << mtime;
819  	  f->dump_stream("atime") << atime;
820  	  f->dump_unsigned("time_warp_seq", time_warp_seq);
821  	  f->dump_unsigned("change_attr", change_attr);
822  	  f->dump_int("export_pin", export_pin);
823  	
     	  // One "client" object per entry: the client id, then the range's own dump.
824  	  f->open_array_section("client_ranges");
825  	  for (const auto &p : client_ranges) {
826  	    f->open_object_section("client");
827  	    f->dump_unsigned("client", p.first.v);
828  	    p.second.dump(f);
829  	    f->close_section();
830  	  }
831  	  f->close_section();
832  	
833  	  f->open_object_section("dirstat");
834  	  dirstat.dump(f);
835  	  f->close_section();
836  	
837  	  f->open_object_section("rstat");
838  	  rstat.dump(f);
839  	  f->close_section();
840  	
841  	  f->open_object_section("accounted_rstat");
842  	  accounted_rstat.dump(f);
843  	  f->close_section();
844  	
845  	  f->dump_unsigned("version", version);
846  	  f->dump_unsigned("file_data_version", file_data_version);
847  	  f->dump_unsigned("xattr_version", xattr_version);
848  	  f->dump_unsigned("backtrace_version", backtrace_version);
849  	
850  	  f->dump_string("stray_prior_path", stray_prior_path);
851  	  f->dump_unsigned("max_size_ever", max_size_ever);
852  	
853  	  f->open_object_section("quota");
854  	  quota.dump(f);
855  	  f->close_section();
856  	
857  	  f->dump_stream("last_scrub_stamp") << last_scrub_stamp;
858  	  f->dump_unsigned("last_scrub_version", last_scrub_version);
859  	}
860  	
861  	template<template<typename> class Allocator>
862  	void inode_t<Allocator>::generate_test_instances(std::list<inode_t*>& ls)
863  	{
864  	  ls.push_back(new inode_t<Allocator>);
865  	  ls.push_back(new inode_t<Allocator>);
866  	  ls.back()->ino = 1;
867  	  // i am lazy.
868  	}
869  	
870  	template<template<typename> class Allocator>
871  	int inode_t<Allocator>::compare(const inode_t<Allocator> &other, bool *divergent) const
872  	{
873  	  ceph_assert(ino == other.ino);
874  	  *divergent = false;
875  	  if (version == other.version) {
876  	    if (rdev != other.rdev ||
877  	        ctime != other.ctime ||
878  	        btime != other.btime ||
879  	        mode != other.mode ||
880  	        uid != other.uid ||
881  	        gid != other.gid ||
882  	        nlink != other.nlink ||
883  	        memcmp(&dir_layout, &other.dir_layout, sizeof(dir_layout)) ||
884  	        layout != other.layout ||
885  	        old_pools != other.old_pools ||
886  	        size != other.size ||
887  	        max_size_ever != other.max_size_ever ||
888  	        truncate_seq != other.truncate_seq ||
889  	        truncate_size != other.truncate_size ||
890  	        truncate_from != other.truncate_from ||
891  	        truncate_pending != other.truncate_pending ||
892  		change_attr != other.change_attr ||
893  	        mtime != other.mtime ||
894  	        atime != other.atime ||
895  	        time_warp_seq != other.time_warp_seq ||
896  	        inline_data != other.inline_data ||
897  	        client_ranges != other.client_ranges ||
898  	        !(dirstat == other.dirstat) ||
899  	        !(rstat == other.rstat) ||
900  	        !(accounted_rstat == other.accounted_rstat) ||
901  	        file_data_version != other.file_data_version ||
902  	        xattr_version != other.xattr_version ||
903  	        backtrace_version != other.backtrace_version) {
904  	      *divergent = true;
905  	    }
906  	    return 0;
907  	  } else if (version > other.version) {
908  	    *divergent = !older_is_consistent(other);
909  	    return 1;
910  	  } else {
911  	    ceph_assert(version < other.version);
912  	    *divergent = !other.older_is_consistent(*this);
913  	    return -1;
914  	  }
915  	}
916  	
917  	template<template<typename> class Allocator>
918  	bool inode_t<Allocator>::older_is_consistent(const inode_t<Allocator> &other) const
919  	{
920  	  if (max_size_ever < other.max_size_ever ||
921  	      truncate_seq < other.truncate_seq ||
922  	      time_warp_seq < other.time_warp_seq ||
923  	      inline_data.version < other.inline_data.version ||
924  	      dirstat.version < other.dirstat.version ||
925  	      rstat.version < other.rstat.version ||
926  	      accounted_rstat.version < other.accounted_rstat.version ||
927  	      file_data_version < other.file_data_version ||
928  	      xattr_version < other.xattr_version ||
929  	      backtrace_version < other.backtrace_version) {
930  	    return false;
931  	  }
932  	  return true;
933  	}
934  	
// Free-function wrappers that forward to inode_t's member encode()/decode().
// The encode wrapper brackets the member call with the ENCODE_DUMP_PRE /
// ENCODE_DUMP_POST macros (their expansion is not visible here).
935  	template<template<typename> class Allocator>
936  	inline void encode(const inode_t<Allocator> &c, ::ceph::bufferlist &bl, uint64_t features)
937  	{
938  	  ENCODE_DUMP_PRE();
939  	  c.encode(bl, features);
940  	  ENCODE_DUMP_POST(cl);
941  	}
942  	template<template<typename> class Allocator>
943  	inline void decode(inode_t<Allocator> &c, ::ceph::bufferlist::const_iterator &p)
944  	{
945  	  c.decode(p);
946  	}
947  	
// alloc_string: a char string whose storage comes from Allocator<char>.
948  	template<template<typename> class Allocator>
949  	using alloc_string = std::basic_string<char,std::char_traits<char>,Allocator<char>>;
950  	
// xattr_map: compact_map from alloc_string to bufferptr, ordered by
// std::less and allocated through the same Allocator template.
951  	template<template<typename> class Allocator>
952  	using xattr_map = compact_map<alloc_string<Allocator>, bufferptr, std::less<alloc_string<Allocator>>, Allocator<std::pair<const alloc_string<Allocator>, bufferptr>>>; // FIXME bufferptr not in mempool
953  	
954  	/*
955  	 * old_inode_t
956  	 */
// Bundles an inode_t with an xattr map and the snapid stored in `first`.
// Encoded as first, inode, xattrs (see the out-of-line encode()/decode()).
957  	template<template<typename> class Allocator = std::allocator>
958  	struct old_inode_t {
959  	  snapid_t first;
960  	  inode_t<Allocator> inode;
961  	  xattr_map<Allocator> xattrs;
962  	
963  	  void encode(bufferlist &bl, uint64_t features) const;
964  	  void decode(bufferlist::const_iterator& bl);
965  	  void dump(Formatter *f) const;
966  	  static void generate_test_instances(std::list<old_inode_t*>& ls);
967  	};
968  	
969  	// These methods may be moved back to mdstypes.cc when we have pmr
// Serialises the members in the order first, inode, xattrs inside an
// ENCODE_START/ENCODE_FINISH envelope.  Only the inode encoding receives the
// feature mask.
// NOTE(review): the (2, 2) arguments are presumably struct version / compat
// version -- confirm against the ENCODE_START definition.
template<template<typename> class Allocator>
void old_inode_t<Allocator>::encode(bufferlist& bl, uint64_t features) const
{
  ENCODE_START(2, 2, bl);
  encode(first, bl);
  encode(inode, bl, features);
  encode(xattrs, bl);
  ENCODE_FINISH(bl);
}
979  	
// Reads the members back in the same order encode() writes them
// (first, inode, xattrs) inside a DECODE_START_LEGACY_COMPAT_LEN /
// DECODE_FINISH envelope.
template<template<typename> class Allocator>
void old_inode_t<Allocator>::decode(bufferlist::const_iterator& bl)
{
  DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
  decode(first, bl);
  decode(inode, bl);
  decode(xattrs, bl);
  DECODE_FINISH(bl);
}
989  	
990  	template<template<typename> class Allocator>
991  	void old_inode_t<Allocator>::dump(Formatter *f) const
992  	{
993  	  f->dump_unsigned("first", first);
994  	  inode.dump(f);
995  	  f->open_object_section("xattrs");
996  	  for (const auto &p : xattrs) {
997  	    std::string v(p.second.c_str(), p.second.length());
998  	    f->dump_string(p.first.c_str(), v);
999  	  }
1000 	  f->close_section();
1001 	}
1002 	
1003 	template<template<typename> class Allocator>
1004 	void old_inode_t<Allocator>::generate_test_instances(std::list<old_inode_t<Allocator>*>& ls)
1005 	{
1006 	  ls.push_back(new old_inode_t<Allocator>);
1007 	  ls.push_back(new old_inode_t<Allocator>);
1008 	  ls.back()->first = 2;
1009 	  std::list<inode_t<Allocator>*> ils;
1010 	  inode_t<Allocator>::generate_test_instances(ils);
1011 	  ls.back()->inode = *ils.back();
1012 	  ls.back()->xattrs["user.foo"] = buffer::copy("asdf", 4);
1013 	  ls.back()->xattrs["user.unprintable"] = buffer::copy("\000\001\002", 3);
1014 	}
1015 	
// Free-function encoder for old_inode_t: forwards to the member
// c.encode(bl, features), bracketed by the ENCODE_DUMP_PRE / ENCODE_DUMP_POST
// macros (defined elsewhere).
template<template<typename> class Allocator>
inline void encode(const old_inode_t<Allocator> &c, ::ceph::bufferlist &bl, uint64_t features)
{
  ENCODE_DUMP_PRE();
  c.encode(bl, features);
  ENCODE_DUMP_POST(cl);
}
// Free-function decoder for old_inode_t: forwards to the member c.decode(p).
template<template<typename> class Allocator>
inline void decode(old_inode_t<Allocator> &c, ::ceph::bufferlist::const_iterator &p)
{
  c.decode(p);
}
1028 	
1029 	
/*
 * like an inode, but for a dir frag
 *
 * All version_t members and damage_flags start at 0 via their in-class
 * initialisers.  WRITE_CLASS_ENCODER (defined elsewhere) is applied to the
 * type right after the definition.
 */
struct fnode_t {
  version_t version = 0;
  snapid_t snap_purged_thru;   // the max_last_destroy snapid we've been purged thru
  frag_info_t fragstat, accounted_fragstat;
  nest_info_t rstat, accounted_rstat;
  damage_flags_t damage_flags = 0;

  // we know we and all our descendants have been scrubbed since this version
  version_t recursive_scrub_version = 0;
  utime_t recursive_scrub_stamp;
  // version at which we last scrubbed our personal data structures
  version_t localized_scrub_version = 0;
  utime_t localized_scrub_stamp;

  void encode(bufferlist &bl) const;
  void decode(bufferlist::const_iterator& bl);
  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<fnode_t*>& ls);
  fnode_t() {}
};
WRITE_CLASS_ENCODER(fnode_t)
1054 	
1055 	
// A snapid (`first`) together with an rstat / accounted_rstat pair of
// nest_info_t values.  Encoding, decoding and dumping are implemented
// out of line.
struct old_rstat_t {
  snapid_t first;
  nest_info_t rstat, accounted_rstat;

  void encode(bufferlist& bl) const;
  void decode(bufferlist::const_iterator& p);
  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<old_rstat_t*>& ls);
};
WRITE_CLASS_ENCODER(old_rstat_t)
1066 	
1067 	inline std::ostream& operator<<(std::ostream& out, const old_rstat_t& o) {
1068 	  return out << "old_rstat(first " << o.first << " " << o.rstat << " " << o.accounted_rstat << ")";
1069 	}
1070 	
1071 	/*
1072 	 * feature_bitset_t
1073 	 */
1074 	class feature_bitset_t {
1075 	public:
1076 	  typedef uint64_t block_type;
1077 	  static const size_t bits_per_block = sizeof(block_type) * 8;
1078 	
1079 	  feature_bitset_t(const feature_bitset_t& other) : _vec(other._vec) {}
1080 	  feature_bitset_t(feature_bitset_t&& other) : _vec(std::move(other._vec)) {}
1081 	  feature_bitset_t(unsigned long value = 0);
1082 	  feature_bitset_t(const vector<size_t>& array);
1083 	  feature_bitset_t& operator=(const feature_bitset_t& other) {
1084 	    _vec = other._vec;
1085 	    return *this;
1086 	  }
1087 	  feature_bitset_t& operator=(feature_bitset_t&& other) {
1088 	    _vec = std::move(other._vec);
1089 	    return *this;
1090 	  }
1091 	  bool empty() const {
1092 	    for (auto& v : _vec) {
1093 	      if (v)
1094 		return false;
1095 	    }
1096 	    return true;
1097 	  }
1098 	  bool test(size_t bit) const {
1099 	    if (bit >= bits_per_block * _vec.size())
1100 	      return false;
1101 	    return _vec[bit / bits_per_block] & ((block_type)1 << (bit % bits_per_block));
1102 	  }
1103 	  void clear() {
1104 	    _vec.clear();
1105 	  }
1106 	  feature_bitset_t& operator-=(const feature_bitset_t& other);
1107 	  void encode(bufferlist& bl) const;
1108 	  void decode(bufferlist::const_iterator &p);
1109 	  void print(ostream& out) const;
1110 	private:
1111 	  vector<block_type> _vec;
1112 	};
1113 	WRITE_CLASS_ENCODER(feature_bitset_t)
1114 	
// Stream insertion for feature_bitset_t: delegates to s.print(out).
inline std::ostream& operator<<(std::ostream& out, const feature_bitset_t& s) {
  s.print(out);
  return out;
}
1119 	
1120 	/*
1121 	 * client_metadata_t
1122 	 */
1123 	struct client_metadata_t {
1124 	  using kv_map_t = std::map<std::string,std::string>;
1125 	  using iterator = kv_map_t::const_iterator;
1126 	
1127 	  kv_map_t kv_map;
1128 	  feature_bitset_t features;
1129 	
1130 	  client_metadata_t() {}
1131 	  client_metadata_t(const client_metadata_t& other) :
1132 	    kv_map(other.kv_map), features(other.features) {}
1133 	  client_metadata_t(client_metadata_t&& other) :
1134 	    kv_map(std::move(other.kv_map)), features(std::move(other.features)) {}
1135 	  client_metadata_t(kv_map_t&& kv, feature_bitset_t &&f) :
1136 	    kv_map(std::move(kv)), features(std::move(f)) {}
1137 	  client_metadata_t(const kv_map_t& kv, const feature_bitset_t &f) :
1138 	    kv_map(kv), features(f) {}
1139 	  client_metadata_t& operator=(const client_metadata_t& other) {
1140 	    kv_map = other.kv_map;
1141 	    features = other.features;
1142 	    return *this;
1143 	  }
1144 	
1145 	  bool empty() const { return kv_map.empty() && features.empty(); }
1146 	  iterator find(const std::string& key) const { return kv_map.find(key); }
1147 	  iterator begin() const { return kv_map.begin(); }
1148 	  iterator end() const { return kv_map.end(); }
1149 	  void erase(iterator it) { kv_map.erase(it); }
1150 	  std::string& operator[](const std::string& key) { return kv_map[key]; }
1151 	  void merge(const client_metadata_t& other) {
1152 	    kv_map.insert(other.kv_map.begin(), other.kv_map.end());
1153 	    features = other.features;
1154 	  }
1155 	  void clear() {
1156 	    kv_map.clear();
1157 	    features.clear();
1158 	  }
1159 	
1160 	  void encode(bufferlist& bl) const;
1161 	  void decode(bufferlist::const_iterator& p);
1162 	  void dump(Formatter *f) const;
1163 	};
1164 	WRITE_CLASS_ENCODER(client_metadata_t)
1165 	
/*
 * session_info_t
 *
 * Per-session record: the client's entity instance, request/flush
 * bookkeeping, preallocated and in-use inode number ranges, client metadata
 * and the auth name.  encode() takes a feature mask
 * (WRITE_CLASS_ENCODER_FEATURES).
 */
struct session_info_t {
  entity_inst_t inst;
  std::map<ceph_tid_t,inodeno_t> completed_requests;
  interval_set<inodeno_t> prealloc_inos;   // preallocated, ready to use.
  interval_set<inodeno_t> used_inos;       // journaling use
  client_metadata_t client_metadata;
  std::set<ceph_tid_t> completed_flushes;
  EntityName auth_name;

  // client id derived from the numeric part of inst.name
  client_t get_client() const { return client_t(inst.name.num()); }
  // true when the given feature bit is set in client_metadata.features
  bool has_feature(size_t bit) const { return client_metadata.features.test(bit); }
  const entity_name_t& get_source() const { return inst.name; }

  // Clears every member except inst and auth_name.
  void clear_meta() {
    prealloc_inos.clear();
    used_inos.clear();
    completed_requests.clear();
    completed_flushes.clear();
    client_metadata.clear();
  }

  void encode(bufferlist& bl, uint64_t features) const;
  void decode(bufferlist::const_iterator& p);
  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<session_info_t*>& ls);
};
WRITE_CLASS_ENCODER_FEATURES(session_info_t)
1196 	
1197 	
1198 	// =======
1199 	// dentries
1200 	
1201 	struct dentry_key_t {
1202 	  snapid_t snapid = 0;
1203 	  std::string_view name;
1204 	  __u32 hash = 0;
1205 	  dentry_key_t() {}
1206 	  dentry_key_t(snapid_t s, std::string_view n, __u32 h=0) :
1207 	    snapid(s), name(n), hash(h) {}
1208 	
1209 	  bool is_valid() { return name.length() || snapid; }
1210 	
1211 	  // encode into something that can be decoded as a string.
1212 	  // name_ (head) or name_%x (!head)
1213 	  void encode(bufferlist& bl) const {
1214 	    string key;
1215 	    encode(key);
1216 	    using ceph::encode;
1217 	    encode(key, bl);
1218 	  }
1219 	  void encode(string& key) const {
1220 	    char b[20];
1221 	    if (snapid != CEPH_NOSNAP) {
1222 	      uint64_t val(snapid);
1223 	      snprintf(b, sizeof(b), "%" PRIx64, val);
1224 	    } else {
1225 	      snprintf(b, sizeof(b), "%s", "head");
1226 	    }
1227 	    ostringstream oss;
1228 	    oss << name << "_" << b;
1229 	    key = oss.str();
1230 	  }
1231 	  static void decode_helper(bufferlist::const_iterator& bl, string& nm, snapid_t& sn) {
1232 	    string key;
1233 	    decode(key, bl);
1234 	    decode_helper(key, nm, sn);
1235 	  }
1236 	  static void decode_helper(std::string_view key, string& nm, snapid_t& sn) {
1237 	    size_t i = key.find_last_of('_');
1238 	    ceph_assert(i != string::npos);
1239 	    if (key.compare(i+1, std::string_view::npos, "head") == 0) {
1240 	      // name_head
1241 	      sn = CEPH_NOSNAP;
1242 	    } else {
1243 	      // name_%x
1244 	      long long unsigned x = 0;
1245 	      std::string x_str(key.substr(i+1));
1246 	      sscanf(x_str.c_str(), "%llx", &x);
1247 	      sn = x;
1248 	    }  
1249 	    nm = key.substr(0, i);
1250 	  }
1251 	};
1252 	
1253 	inline std::ostream& operator<<(std::ostream& out, const dentry_key_t &k)
1254 	{
1255 	  return out << "(" << k.name << "," << k.snapid << ")";
1256 	}
1257 	
1258 	inline bool operator<(const dentry_key_t& k1, const dentry_key_t& k2)
1259 	{
1260 	  /*
1261 	   * order by hash, name, snap
1262 	   */
1263 	  int c = ceph_frag_value(k1.hash) - ceph_frag_value(k2.hash);
1264 	  if (c)
1265 	    return c < 0;
1266 	  c = k1.name.compare(k2.name);
1267 	  if (c)
1268 	    return c < 0;
1269 	  return k1.snapid < k2.snapid;
1270 	}
1271 	
1272 	
/*
 * string_snap_t is a simple (string, snapid_t) pair
 *
 * Unlike dentry_key_t it stores its own copy of the name: `name` is a
 * string, and the std::string_view constructor argument is copied into it.
 */
struct string_snap_t {
  string name;
  snapid_t snapid;
  string_snap_t() {}
  string_snap_t(std::string_view n, snapid_t s) : name(n), snapid(s) {}

  void encode(bufferlist& bl) const;
  void decode(bufferlist::const_iterator& p);
  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<string_snap_t*>& ls);
};
WRITE_CLASS_ENCODER(string_snap_t)
1288 	
1289 	inline bool operator<(const string_snap_t& l, const string_snap_t& r) {
1290 	  int c = l.name.compare(r.name);
1291 	  return c < 0 || (c == 0 && l.snapid < r.snapid);
1292 	}
1293 	
1294 	inline std::ostream& operator<<(std::ostream& out, const string_snap_t &k)
1295 	{
1296 	  return out << "(" << k.name << "," << k.snapid << ")";
1297 	}
1298 	
1299 	/*
1300 	 * mds_table_pending_t
1301 	 *
1302 	 * mds's requesting any pending ops.  child needs to encode the corresponding
1303 	 * pending mutation state in the table.
1304 	 */
// Plain record of (reqid, mds, tid); every field starts at 0 via its
// in-class initialiser.  Encoding, decoding and dumping are implemented
// out of line.
struct mds_table_pending_t {
  uint64_t reqid = 0;
  __s32 mds = 0;
  version_t tid = 0;
  mds_table_pending_t() {}
  void encode(bufferlist& bl) const;
  void decode(bufferlist::const_iterator& bl);
  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<mds_table_pending_t*>& ls);
};
WRITE_CLASS_ENCODER(mds_table_pending_t)
1316 	
1317 	
1318 	// =========
1319 	// requests
1320 	
// Request identifier: the entity name plus a tid.
// The wire format is `name` followed by `tid`, with no version envelope.
struct metareqid_t {
  entity_name_t name;
  uint64_t tid = 0;
  metareqid_t() {}
  metareqid_t(entity_name_t n, ceph_tid_t t) : name(n), tid(t) {}
  void encode(bufferlist& bl) const {
    // bring the free-function encoders into scope; the member named
    // `encode` would otherwise hide them
    using ceph::encode;
    encode(name, bl);
    encode(tid, bl);
  }
  void decode(bufferlist::const_iterator &p) {
    using ceph::decode;
    decode(name, p);
    decode(tid, p);
  }
};
WRITE_CLASS_ENCODER(metareqid_t)
1338 	
1339 	inline std::ostream& operator<<(std::ostream& out, const metareqid_t& r) {
1340 	  return out << r.name << ":" << r.tid;
1341 	}
1342 	
1343 	inline bool operator==(const metareqid_t& l, const metareqid_t& r) {
1344 	  return (l.name == r.name) && (l.tid == r.tid);
1345 	}
1346 	inline bool operator!=(const metareqid_t& l, const metareqid_t& r) {
1347 	  return (l.name != r.name) || (l.tid != r.tid);
1348 	}
1349 	inline bool operator<(const metareqid_t& l, const metareqid_t& r) {
1350 	  return (l.name < r.name) || 
1351 	    (l.name == r.name && l.tid < r.tid);
1352 	}
1353 	inline bool operator<=(const metareqid_t& l, const metareqid_t& r) {
1354 	  return (l.name < r.name) ||
1355 	    (l.name == r.name && l.tid <= r.tid);
1356 	}
// Defined as the negation of operator<=.
inline bool operator>(const metareqid_t& l, const metareqid_t& r) { return !(l <= r); }
// Defined as the negation of operator<.
inline bool operator>=(const metareqid_t& l, const metareqid_t& r) { return !(l < r); }
1359 	
1360 	namespace std {
1361 	  template<> struct hash<metareqid_t> {
1362 	    size_t operator()(const metareqid_t &r) const { 
1363 	      hash<uint64_t> H;
1364 	      return H(r.name.num()) ^ H(r.name.type()) ^ H(r.tid);
1365 	    }
1366 	  };
1367 	} // namespace std
1368 	
1369 	
1370 	// cap info for client reconnect
// Wraps a ceph_mds_cap_reconnect record together with the path, the
// snap_follows id and a bufferlist (flockbl).
struct cap_reconnect_t {
  string path;
  mutable ceph_mds_cap_reconnect capinfo;   // mutable: writable from const member functions
  snapid_t snap_follows;
  bufferlist flockbl;

  // Zero-fills capinfo and sets snap_follows to 0.
  cap_reconnect_t() {
    memset(&capinfo, 0, sizeof(capinfo));
    snap_follows = 0;
  }
  // Fills capinfo from the arguments (flock_len is set to 0) and takes over
  // the contents of `lb` via flockbl.claim(lb).
  // NOTE(review): capinfo is not zero-filled here; this relies on the six
  // assignments below covering every field of ceph_mds_cap_reconnect --
  // confirm against its definition.
  cap_reconnect_t(uint64_t cap_id, inodeno_t pino, std::string_view p, int w, int i,
		  inodeno_t sr, snapid_t sf, bufferlist& lb) :
    path(p) {
    capinfo.cap_id = cap_id;
    capinfo.wanted = w;
    capinfo.issued = i;
    capinfo.snaprealm = sr;
    capinfo.pathbase = pino;
    capinfo.flock_len = 0;
    snap_follows = sf;
    flockbl.claim(lb);
  }
  void encode(bufferlist& bl) const;
  void decode(bufferlist::const_iterator& bl);
  void encode_old(bufferlist& bl) const;
  void decode_old(bufferlist::const_iterator& bl);

  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<cap_reconnect_t*>& ls);
};
WRITE_CLASS_ENCODER(cap_reconnect_t)
1402 	
// Wraps a ceph_mds_snaprealm_reconnect record.
struct snaprealm_reconnect_t {
  mutable ceph_mds_snaprealm_reconnect realm;   // mutable: writable from const member functions

  // Zero-fills realm.
  snaprealm_reconnect_t() {
    memset(&realm, 0, sizeof(realm));
  }
  // Sets ino, seq and parent.
  // NOTE(review): realm is not zero-filled here; this relies on these three
  // being all the fields of ceph_mds_snaprealm_reconnect -- confirm.
  snaprealm_reconnect_t(inodeno_t ino, snapid_t seq, inodeno_t parent) {
    realm.ino = ino;
    realm.seq = seq;
    realm.parent = parent;
  }
  void encode(bufferlist& bl) const;
  void decode(bufferlist::const_iterator& bl);
  void encode_old(bufferlist& bl) const;
  void decode_old(bufferlist::const_iterator& bl);

  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<snaprealm_reconnect_t*>& ls);
};
WRITE_CLASS_ENCODER(snaprealm_reconnect_t)
1423 	
1424 	// compat for pre-FLOCK feature
// Packed record with explicitly little-endian fields (ceph_le*), encoded
// as-is via WRITE_RAW_ENCODER.  It carries old_size / old_mtime / old_atime,
// none of which old_cap_reconnect_t's conversions to or from cap_reconnect_t
// copy across.  Do not reorder the fields: with raw encoding the member
// order is the wire order.
struct old_ceph_mds_cap_reconnect {
	ceph_le64 cap_id;
	ceph_le32 wanted;
	ceph_le32 issued;
  ceph_le64 old_size;
  struct ceph_timespec old_mtime, old_atime;
	ceph_le64 snaprealm;
	ceph_le64 pathbase;        /* base ino for our path to this ino */
} __attribute__ ((packed));
WRITE_RAW_ENCODER(old_ceph_mds_cap_reconnect)
1435 	
1436 	struct old_cap_reconnect_t {
1437 	  string path;
1438 	  old_ceph_mds_cap_reconnect capinfo;
1439 	
1440 	  const old_cap_reconnect_t& operator=(const cap_reconnect_t& n) {
1441 	    path = n.path;
1442 	    capinfo.cap_id = n.capinfo.cap_id;
1443 	    capinfo.wanted = n.capinfo.wanted;
1444 	    capinfo.issued = n.capinfo.issued;
1445 	    capinfo.snaprealm = n.capinfo.snaprealm;
1446 	    capinfo.pathbase = n.capinfo.pathbase;
1447 	    return *this;
1448 	  }
1449 	  operator cap_reconnect_t() {
1450 	    cap_reconnect_t n;
1451 	    n.path = path;
1452 	    n.capinfo.cap_id = capinfo.cap_id;
1453 	    n.capinfo.wanted = capinfo.wanted;
1454 	    n.capinfo.issued = capinfo.issued;
1455 	    n.capinfo.snaprealm = capinfo.snaprealm;
1456 	    n.capinfo.pathbase = capinfo.pathbase;
1457 	    return n;
1458 	  }
1459 	
1460 	  void encode(bufferlist& bl) const {
1461 	    using ceph::encode;
1462 	    encode(path, bl);
1463 	    encode(capinfo, bl);
1464 	  }
1465 	  void decode(bufferlist::const_iterator& bl) {
1466 	    using ceph::decode;
1467 	    decode(path, bl);
1468 	    decode(capinfo, bl);
1469 	  }
1470 	};
1471 	WRITE_CLASS_ENCODER(old_cap_reconnect_t)
1472 	
1473 	
1474 	// ================================================================
1475 	// dir frag
1476 	
// Identifies a directory fragment by (ino, frag).
// The wire format is `ino` followed by `frag`, with no version envelope.
struct dirfrag_t {
  inodeno_t ino = 0;
  frag_t    frag;

  dirfrag_t() {}
  dirfrag_t(inodeno_t i, frag_t f) : ino(i), frag(f) { }

  void encode(bufferlist& bl) const {
    // bring the free-function encoders into scope; the member named
    // `encode` would otherwise hide them
    using ceph::encode;
    encode(ino, bl);
    encode(frag, bl);
  }
  void decode(bufferlist::const_iterator& bl) {
    using ceph::decode;
    decode(ino, bl);
    decode(frag, bl);
  }
};
WRITE_CLASS_ENCODER(dirfrag_t)
1496 	
1497 	
1498 	inline std::ostream& operator<<(std::ostream& out, const dirfrag_t &df) {
1499 	  out << df.ino;
1500 	  if (!df.frag.is_root()) out << "." << df.frag;
1501 	  return out;
1502 	}
1503 	inline bool operator<(dirfrag_t l, dirfrag_t r) {
1504 	  if (l.ino < r.ino) return true;
1505 	  if (l.ino == r.ino && l.frag < r.frag) return true;
1506 	  return false;
1507 	}
1508 	inline bool operator==(dirfrag_t l, dirfrag_t r) {
1509 	  return l.ino == r.ino && l.frag == r.frag;
1510 	}
1511 	
namespace std {
  // Hash for dirfrag_t: XOR of rjhash<uint64_t> applied to the ino and
  // rjhash<uint32_t> applied to the frag.  The two hasher objects are
  // function-local statics shared by all calls.
  template<> struct hash<dirfrag_t> {
    size_t operator()(const dirfrag_t &df) const { 
      static rjhash<uint64_t> H;
      static rjhash<uint32_t> I;
      return H(df.ino) ^ I(df.frag);
    }
  };
} // namespace std
1521 	
1522 	
1523 	
1524 	// ================================================================
1525 	
// Indices of the popularity counters; dirfrag_load_vec_t::meta_load() uses
// them to index its vector of counters.  META_NPOP is one past the largest
// index (matching dirfrag_load_vec_t::NUM == 5).
#define META_POP_IRD     0
#define META_POP_IWR     1
#define META_POP_READDIR 2
#define META_POP_FETCH   3
#define META_POP_STORE   4
#define META_NPOP        5
1532 	
// Fixed-size array of NUM (2) DecayCounters.
class inode_load_vec_t {
public:
  using time = DecayCounter::time;
  using clock = DecayCounter::clock;
  static const size_t NUM = 2;

  // Both counters are built from a default-constructed DecayRate ...
  inode_load_vec_t() : vec{DecayCounter(DecayRate()), DecayCounter(DecayRate())} {}
  // ... or from the supplied rate.
  inode_load_vec_t(const DecayRate &rate) : vec{DecayCounter(rate), DecayCounter(rate)} {}

  // Returns counter t; the index is not range-checked.
  DecayCounter &get(int t) { 
    return vec[t]; 
  }
  // Resets every counter.
  void zero() {
    for (auto &d : vec) {
      d.reset();
    }
  }
  void encode(bufferlist &bl) const;
  void decode(bufferlist::const_iterator& p);
  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<inode_load_vec_t*>& ls);

private:
  std::array<DecayCounter, NUM> vec;
};
// Free-function encoder for inode_load_vec_t: forwards to c.encode(bl).
inline void encode(const inode_load_vec_t &c, bufferlist &bl) {
  c.encode(bl);
}
// Free-function decoder for inode_load_vec_t: forwards to c.decode(p).
inline void decode(inode_load_vec_t & c, bufferlist::const_iterator &p) {
  c.decode(p);
}
1564 	
1565 	class dirfrag_load_vec_t {
1566 	public:
1567 	  using time = DecayCounter::time;
1568 	  using clock = DecayCounter::clock;
1569 	  static const size_t NUM = 5;
1570 	
1571 	  dirfrag_load_vec_t() :
1572 	      vec{DecayCounter(DecayRate()),
1573 	          DecayCounter(DecayRate()),
1574 	          DecayCounter(DecayRate()),
1575 	          DecayCounter(DecayRate()),
1576 	          DecayCounter(DecayRate())
1577 	         }
1578 	  {}
1579 	  dirfrag_load_vec_t(const DecayRate &rate) : 
1580 	      vec{DecayCounter(rate), DecayCounter(rate), DecayCounter(rate), DecayCounter(rate), DecayCounter(rate)}
1581 	  {}
1582 	
1583 	  void encode(bufferlist &bl) const {
1584 	    ENCODE_START(2, 2, bl);
1585 	    for (const auto &i : vec) {
1586 	      encode(i, bl);
1587 	    }
1588 	    ENCODE_FINISH(bl);
1589 	  }
1590 	  void decode(bufferlist::const_iterator &p) {
1591 	    DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, p);
1592 	    for (auto &i : vec) {
1593 	      decode(i, p);
1594 	    }
1595 	    DECODE_FINISH(p);
1596 	  }
1597 	  void dump(Formatter *f) const;
1598 	  void dump(Formatter *f, const DecayRate& rate) const;
1599 	  static void generate_test_instances(std::list<dirfrag_load_vec_t*>& ls);
1600 	
1601 	  const DecayCounter &get(int t) const {
1602 	    return vec[t];
1603 	  }
1604 	  DecayCounter &get(int t) {
1605 	    return vec[t];
1606 	  }
1607 	  void adjust(double d) {
1608 	    for (auto &i : vec) {
1609 	      i.adjust(d);
1610 	    }
1611 	  }
1612 	  void zero() {
1613 	    for (auto &i : vec) {
1614 	      i.reset();
1615 	    }
1616 	  }
1617 	  double meta_load() const {
1618 	    return 
1619 	      1*vec[META_POP_IRD].get() + 
1620 	      2*vec[META_POP_IWR].get() +
1621 	      1*vec[META_POP_READDIR].get() +
1622 	      2*vec[META_POP_FETCH].get() +
1623 	      4*vec[META_POP_STORE].get();
1624 	  }
1625 	
1626 	  void add(dirfrag_load_vec_t& r) {
1627 	    for (size_t i=0; i<dirfrag_load_vec_t::NUM; i++)
1628 	      vec[i].adjust(r.vec[i].get());
1629 	  }
1630 	  void sub(dirfrag_load_vec_t& r) {
1631 	    for (size_t i=0; i<dirfrag_load_vec_t::NUM; i++)
1632 	      vec[i].adjust(-r.vec[i].get());
1633 	  }
1634 	  void scale(double f) {
1635 	    for (size_t i=0; i<dirfrag_load_vec_t::NUM; i++)
1636 	      vec[i].scale(f);
1637 	  }
1638 	
1639 	private:
1640 	  friend inline std::ostream& operator<<(std::ostream& out, const dirfrag_load_vec_t& dl);
1641 	  std::array<DecayCounter, NUM> vec;
1642 	};
1643 	
// Free-function encoder for dirfrag_load_vec_t: forwards to c.encode(bl).
inline void encode(const dirfrag_load_vec_t &c, bufferlist &bl) {
  c.encode(bl);
}
// Free-function decoder for dirfrag_load_vec_t: forwards to c.decode(p).
inline void decode(dirfrag_load_vec_t& c, bufferlist::const_iterator &p) {
  c.decode(p);
}
1650 	
1651 	inline std::ostream& operator<<(std::ostream& out, const dirfrag_load_vec_t& dl)
1652 	{
1653 	  std::ostringstream ss;
1654 	  ss << std::setprecision(1) << std::fixed
1655 	     << "[pop"
1656 	        " IRD:" << dl.vec[0]
1657 	     << " IWR:" << dl.vec[1]
1658 	     << " RDR:" << dl.vec[2]
1659 	     << " FET:" << dl.vec[3]
1660 	     << " STR:" << dl.vec[4]
1661 	     << " *LOAD:" << dl.meta_load() << "]";
1662 	  return out << ss.str() << std::endl;
1663 	}
1664 	
1665 	
1666 	/* mds_load_t
1667 	 * mds load
1668 	 */
1669 	
// Two dirfrag_load_vec_t vectors (auth, all) plus scalar load figures,
// which all start at 0.0.
struct mds_load_t {
  using clock = dirfrag_load_vec_t::clock;
  using time = dirfrag_load_vec_t::time;

  dirfrag_load_vec_t auth;
  dirfrag_load_vec_t all;

  // Both vectors use a default-constructed DecayRate ...
  mds_load_t() : auth(DecayRate()), all(DecayRate()) {}
  // ... or the supplied rate.
  mds_load_t(const DecayRate &rate) : auth(rate), all(rate) {}

  double req_rate = 0.0;
  double cache_hit_rate = 0.0;
  double queue_len = 0.0;

  double cpu_load_avg = 0.0;

  double mds_load() const;  // defined in MDBalancer.cc
  void encode(bufferlist& bl) const;
  void decode(bufferlist::const_iterator& bl);
  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<mds_load_t*>& ls);
};
// Free-function encoder for mds_load_t: forwards to c.encode(bl).
inline void encode(const mds_load_t &c, bufferlist &bl) {
  c.encode(bl);
}
// Free-function decoder for mds_load_t: forwards to c.decode(p).
inline void decode(mds_load_t &c, bufferlist::const_iterator &p) {
  c.decode(p);
}
1698 	
1699 	inline std::ostream& operator<<(std::ostream& out, const mds_load_t& load)
1700 	{
1701 	  return out << "mdsload<" << load.auth << "/" << load.all
1702 	             << ", req " << load.req_rate 
1703 	             << ", hr " << load.cache_hit_rate
1704 	             << ", qlen " << load.queue_len
1705 		     << ", cpu " << load.cpu_load_avg
1706 	             << ">";
1707 	}
1708 	
1709 	class load_spread_t {
1710 	public:
1711 	  using time = DecayCounter::time;
1712 	  using clock = DecayCounter::clock;
1713 	  static const int MAX = 4;
1714 	  int last[MAX];
1715 	  int p = 0, n = 0;
1716 	  DecayCounter count;
1717 	
1718 	public:
1719 	  load_spread_t() = delete;
1720 	  load_spread_t(const DecayRate &rate) : count(rate)
1721 	  {
1722 	    for (int i=0; i<MAX; i++)
1723 	      last[i] = -1;
1724 	  } 
1725 	
1726 	  double hit(int who) {
1727 	    for (int i=0; i<n; i++)
1728 	      if (last[i] == who) 
1729 		return count.get_last();
1730 	
1731 	    // we're new(ish)
1732 	    last[p++] = who;
1733 	    if (n < MAX) n++;
1734 	    if (n == 1) return 0.0;
1735 	
1736 	    if (p == MAX) p = 0;
1737 	
1738 	    return count.hit();
1739 	  }
1740 	  double get() const {
1741 	    return count.get();
1742 	  }
1743 	};
1744 	
1745 	
1746 	
1747 	// ================================================================
// A pair of MDS ranks; the CDIR_AUTH_DEFAULT / CDIR_AUTH_UNDEF macros below
// build values of this type from the CDIR_AUTH_PARENT / CDIR_AUTH_UNKNOWN
// sentinel ranks.
typedef std::pair<mds_rank_t, mds_rank_t> mds_authority_t;
1749 	
1750 	// -- authority delegation --
1751 	// directory authority types
1752 	//  >= 0 is the auth mds
1753 	#define CDIR_AUTH_PARENT   mds_rank_t(-1)   // default
1754 	#define CDIR_AUTH_UNKNOWN  mds_rank_t(-2)
1755 	#define CDIR_AUTH_DEFAULT  mds_authority_t(CDIR_AUTH_PARENT, CDIR_AUTH_UNKNOWN)
1756 	#define CDIR_AUTH_UNDEF    mds_authority_t(CDIR_AUTH_UNKNOWN, CDIR_AUTH_UNKNOWN)
1757 	//#define CDIR_AUTH_ROOTINODE pair<int,int>( 0, -2)
1758 	
// Describes a cache object by ino, dirfrag, dname and snapid.  Which fields
// are significant depends on which are set: operator<< and operator== for
// this type treat a non-zero ino as an inode reference and otherwise fall
// back to dirfrag (plus dname when it is non-empty).
class MDSCacheObjectInfo {
public:
  inodeno_t ino = 0;
  dirfrag_t dirfrag;
  string dname;
  snapid_t snapid;

  MDSCacheObjectInfo() {}

  void encode(bufferlist& bl) const;
  void decode(bufferlist::const_iterator& bl);
  void dump(Formatter *f) const;
  // Appends sample instances to ls.
  static void generate_test_instances(std::list<MDSCacheObjectInfo*>& ls);
};
1773 	
1774 	inline std::ostream& operator<<(std::ostream& out, const MDSCacheObjectInfo &info) {
1775 	  if (info.ino) return out << info.ino << "." << info.snapid;
1776 	  if (info.dname.length()) return out << info.dirfrag << "/" << info.dname
1777 	    << " snap " << info.snapid;
1778 	  return out << info.dirfrag;
1779 	}
1780 	
1781 	inline bool operator==(const MDSCacheObjectInfo& l, const MDSCacheObjectInfo& r) {
1782 	  if (l.ino || r.ino)
1783 	    return l.ino == r.ino && l.snapid == r.snapid;
1784 	  else
1785 	    return l.dirfrag == r.dirfrag && l.dname == r.dname;
1786 	}
1787 	WRITE_CLASS_ENCODER(MDSCacheObjectInfo)
1788 	
1789 	
1790 	// parse a map of keys/values.
1791 	namespace qi = boost::spirit::qi;
1792 	
// boost::spirit::qi grammar that parses "key=value key=value ..." into a
// std::map<string, string>:
//   query := pair (' ' pair)*      pairs separated by one literal space
//   pair  := key '=' value
//   key   := [a-zA-Z_][a-zA-Z_0-9]*
//   value := [a-zA-Z_0-9]+
template <typename Iterator>
struct keys_and_values
  : qi::grammar<Iterator, std::map<string, string>()>
{
    keys_and_values()
      : keys_and_values::base_type(query)   // `query` is the start rule
    {
      query =  pair >> *(qi::lit(' ') >> pair);
      pair  =  key >> '=' >> value;
      key   =  qi::char_("a-zA-Z_") >> *qi::char_("a-zA-Z_0-9");
      value = +qi::char_("a-zA-Z_0-9");
    }
    qi::rule<Iterator, std::map<string, string>()> query;
    qi::rule<Iterator, std::pair<string, string>()> pair;
    qi::rule<Iterator, string()> key, value;
};
1809 	
1810 	#endif
1811