1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 * Copyright (C) 2013,2014 Cloudwatt <libre.licensing@cloudwatt.com>
8    	 *
9    	 * Author: Loic Dachary <loic@dachary.org>
10   	 *
11   	 * This is free software; you can redistribute it and/or
12   	 * modify it under the terms of the GNU Lesser General Public
13   	 * License version 2.1, as published by the Free Software
14   	 * Foundation.  See file COPYING.
15   	 *
16   	 */
17   	
18   	#include <algorithm>
19   	#include <optional>
20   	#include <random>
21   	
22   	#include <boost/algorithm/string.hpp>
23   	
24   	#include "OSDMap.h"
25   	#include "common/config.h"
26   	#include "common/errno.h"
27   	#include "common/Formatter.h"
28   	#include "common/TextTable.h"
29   	#include "include/ceph_features.h"
30   	#include "include/str_map.h"
31   	
32   	#include "common/code_environment.h"
33   	#include "mon/health_check.h"
34   	
35   	#include "crush/CrushTreeDumper.h"
36   	#include "common/Clock.h"
37   	#include "mon/PGMap.h"
38   	
39   	using std::list;
40   	using std::make_pair;
41   	using std::map;
42   	using std::multimap;
43   	using std::ostream;
44   	using std::ostringstream;
45   	using std::pair;
46   	using std::set;
47   	using std::string;
48   	using std::stringstream;
49   	using std::unordered_map;
50   	using std::vector;
51   	
52   	using ceph::decode;
53   	using ceph::encode;
54   	using ceph::Formatter;
55   	
// Logging subsystem tag for this file; presumably consumed by the
// ldout()/lderr() macros used below -- confirm against the dout headers.
#define dout_subsys ceph_subsys_osd

// Object-factory definitions for OSDMap and OSDMap::Incremental.
// NOTE(review): the second/third macro arguments look like the factory name
// and the mempool name respectively -- confirm against the mempool header.
MEMPOOL_DEFINE_OBJECT_FACTORY(OSDMap, osdmap, osdmap);
MEMPOOL_DEFINE_OBJECT_FACTORY(OSDMap::Incremental, osdmap_inc, osdmap);
60   	
61   	
62   	// ----------------------------------
63   	// osd_info_t
64   	
65   	void osd_info_t::dump(Formatter *f) const
66   	{
67   	  f->dump_int("last_clean_begin", last_clean_begin);
68   	  f->dump_int("last_clean_end", last_clean_end);
69   	  f->dump_int("up_from", up_from);
70   	  f->dump_int("up_thru", up_thru);
71   	  f->dump_int("down_at", down_at);
72   	  f->dump_int("lost_at", lost_at);
73   	}
74   	
75   	void osd_info_t::encode(ceph::buffer::list& bl) const
76   	{
77   	  using ceph::encode;
78   	  __u8 struct_v = 1;
79   	  encode(struct_v, bl);
80   	  encode(last_clean_begin, bl);
81   	  encode(last_clean_end, bl);
82   	  encode(up_from, bl);
83   	  encode(up_thru, bl);
84   	  encode(down_at, bl);
85   	  encode(lost_at, bl);
86   	}
87   	
88   	void osd_info_t::decode(ceph::buffer::list::const_iterator& bl)
89   	{
90   	  using ceph::decode;
91   	  __u8 struct_v;
92   	  decode(struct_v, bl);
93   	  decode(last_clean_begin, bl);
94   	  decode(last_clean_end, bl);
95   	  decode(up_from, bl);
96   	  decode(up_thru, bl);
97   	  decode(down_at, bl);
98   	  decode(lost_at, bl);
99   	}
100  	
101  	void osd_info_t::generate_test_instances(list<osd_info_t*>& o)
102  	{
103  	  o.push_back(new osd_info_t);
104  	  o.push_back(new osd_info_t);
105  	  o.back()->last_clean_begin = 1;
106  	  o.back()->last_clean_end = 2;
107  	  o.back()->up_from = 30;
108  	  o.back()->up_thru = 40;
109  	  o.back()->down_at = 5;
110  	  o.back()->lost_at = 6;
111  	}
112  	
113  	ostream& operator<<(ostream& out, const osd_info_t& info)
114  	{
115  	  out << "up_from " << info.up_from
116  	      << " up_thru " << info.up_thru
117  	      << " down_at " << info.down_at
118  	      << " last_clean_interval [" << info.last_clean_begin << "," << info.last_clean_end << ")";
119  	  if (info.lost_at)
120  	    out << " lost_at " << info.lost_at;
121  	  return out;
122  	}
123  	
124  	// ----------------------------------
125  	// osd_xinfo_t
126  	
127  	void osd_xinfo_t::dump(Formatter *f) const
128  	{
129  	  f->dump_stream("down_stamp") << down_stamp;
130  	  f->dump_float("laggy_probability", laggy_probability);
131  	  f->dump_int("laggy_interval", laggy_interval);
132  	  f->dump_int("features", features);
133  	  f->dump_unsigned("old_weight", old_weight);
134  	  f->dump_stream("last_purged_snaps_scrub") << last_purged_snaps_scrub;
135  	  f->dump_int("dead_epoch", dead_epoch);
136  	}
137  	
/**
 * Serialize this osd_xinfo_t into @p bl.
 *
 * Struct version 4 is written unless @p enc_features lacks SERVER_OCTOPUS;
 * in that case version 3 is written and the two trailing fields
 * (last_purged_snaps_scrub, dead_epoch) are omitted.
 */
void osd_xinfo_t::encode(ceph::buffer::list& bl, uint64_t enc_features) const
{
  uint8_t v = 4;
  if (!HAVE_FEATURE(enc_features, SERVER_OCTOPUS)) {
    v = 3;
  }
  ENCODE_START(v, 1, bl);
  encode(down_stamp, bl);
  // laggy_probability goes on the wire as a 32-bit integer scaled by
  // 0xffffffff; decode() divides by the same constant.
  // NOTE(review): if laggy_probability is a float that can reach 1.0, the
  // product may round up to 2^32 and no longer fit in __u32 -- confirm the
  // value range before relying on this conversion.
  __u32 lp = laggy_probability * 0xfffffffful;
  encode(lp, bl);
  encode(laggy_interval, bl);
  encode(features, bl);
  encode(old_weight, bl);
  // fields added in struct version 4
  if (v >= 4) {
    encode(last_purged_snaps_scrub, bl);
    encode(dead_epoch, bl);
  }
  ENCODE_FINISH(bl);
}
157  	
158  	void osd_xinfo_t::decode(ceph::buffer::list::const_iterator& bl)
159  	{
160  	  DECODE_START(4, bl);
161  	  decode(down_stamp, bl);
162  	  __u32 lp;
163  	  decode(lp, bl);
164  	  laggy_probability = (float)lp / (float)0xffffffff;
165  	  decode(laggy_interval, bl);
166  	  if (struct_v >= 2)
167  	    decode(features, bl);
168  	  else
169  	    features = 0;
170  	  if (struct_v >= 3)
171  	    decode(old_weight, bl);
172  	  else
173  	    old_weight = 0;
174  	  if (struct_v >= 4) {
175  	    decode(last_purged_snaps_scrub, bl);
176  	    decode(dead_epoch, bl);
177  	  } else {
178  	    dead_epoch = 0;
179  	  }
180  	  DECODE_FINISH(bl);
181  	}
182  	
183  	void osd_xinfo_t::generate_test_instances(list<osd_xinfo_t*>& o)
184  	{
185  	  o.push_back(new osd_xinfo_t);
186  	  o.push_back(new osd_xinfo_t);
187  	  o.back()->down_stamp = utime_t(2, 3);
188  	  o.back()->laggy_probability = .123;
189  	  o.back()->laggy_interval = 123456;
190  	  o.back()->old_weight = 0x7fff;
191  	}
192  	
193  	ostream& operator<<(ostream& out, const osd_xinfo_t& xi)
194  	{
195  	  return out << "down_stamp " << xi.down_stamp
196  		     << " laggy_probability " << xi.laggy_probability
197  		     << " laggy_interval " << xi.laggy_interval
198  		     << " old_weight " << xi.old_weight
199  		     << " last_purged_snaps_scrub " << xi.last_purged_snaps_scrub
200  		     << " dead_epoch " << xi.dead_epoch;
201  	}
202  	
203  	// ----------------------------------
204  	// OSDMap::Incremental
205  	
206  	int OSDMap::Incremental::get_net_marked_out(const OSDMap *previous) const
207  	{
208  	  int n = 0;
209  	  for (auto &weight : new_weight) {
210  	    if (weight.second == CEPH_OSD_OUT && !previous->is_out(weight.first))
211  	      n++;  // marked out
212  	    else if (weight.second != CEPH_OSD_OUT && previous->is_out(weight.first))
213  	      n--;  // marked in
214  	  }
215  	  return n;
216  	}
217  	
218  	int OSDMap::Incremental::get_net_marked_down(const OSDMap *previous) const
219  	{
220  	  int n = 0;
221  	  for (auto &state : new_state) { // 
222  	    if (state.second & CEPH_OSD_UP) {
223  	      if (previous->is_up(state.first))
224  		n++;  // marked down
225  	      else
226  		n--;  // marked up
227  	    }
228  	  }
229  	  return n;
230  	}
231  	
232  	int OSDMap::Incremental::identify_osd(uuid_d u) const
233  	{
234  	  for (auto &uuid : new_uuid)
235  	    if (uuid.second == u)
236  	      return uuid.first;
237  	  return -1;
238  	}
239  	
240  	int OSDMap::Incremental::propagate_snaps_to_tiers(CephContext *cct,
241  							  const OSDMap& osdmap)
242  	{
243  	  ceph_assert(epoch == osdmap.get_epoch() + 1);
244  	
245  	  for (auto &new_pool : new_pools) {
246  	    if (!new_pool.second.tiers.empty()) {
247  	      pg_pool_t& base = new_pool.second;
248  	
249  	      auto new_rem_it = new_removed_snaps.find(new_pool.first);
250  	
251  	      for (const auto &tier_pool : base.tiers) {
252  		const auto &r = new_pools.find(tier_pool);
253  		pg_pool_t *tier = 0;
254  		if (r == new_pools.end()) {
255  		  const pg_pool_t *orig = osdmap.get_pg_pool(tier_pool);
256  		  if (!orig) {
257  		    lderr(cct) << __func__ << " no pool " << tier_pool << dendl;
258  		    return -EIO;
259  		  }
260  		  tier = get_new_pool(tier_pool, orig);
261  		} else {
262  		  tier = &r->second;
263  		}
264  		if (tier->tier_of != new_pool.first) {
265  		  lderr(cct) << __func__ << " " << r->first << " tier_of != " << new_pool.first << dendl;
266  		  return -EIO;
267  		}
268  	
269  	        ldout(cct, 10) << __func__ << " from " << new_pool.first << " to "
270  	                       << tier_pool << dendl;
271  		tier->snap_seq = base.snap_seq;
272  		tier->snap_epoch = base.snap_epoch;
273  		tier->snaps = base.snaps;
274  		tier->removed_snaps = base.removed_snaps;
275  		tier->flags |= base.flags & (pg_pool_t::FLAG_SELFMANAGED_SNAPS|
276  					     pg_pool_t::FLAG_POOL_SNAPS);
277  	
278  		if (new_rem_it != new_removed_snaps.end()) {
279  		  new_removed_snaps[tier_pool] = new_rem_it->second;
280  		}
281  	      }
282  	    }
283  	  }
284  	  return 0;
285  	}
286  	
287  	// ----------------------------------
288  	// OSDMap
289  	
290  	bool OSDMap::subtree_is_down(int id, set<int> *down_cache) const
291  	{
292  	  if (id >= 0)
293  	    return is_down(id);
294  	
295  	  if (down_cache &&
296  	      down_cache->count(id)) {
297  	    return true;
298  	  }
299  	
300  	  list<int> children;
301  	  crush->get_children(id, &children);
302  	  for (const auto &child : children) {
303  	    if (!subtree_is_down(child, down_cache)) {
304  	      return false;
305  	    }
306  	  }
307  	  if (down_cache) {
308  	    down_cache->insert(id);
309  	  }
310  	  return true;
311  	}
312  	
313  	bool OSDMap::containing_subtree_is_down(CephContext *cct, int id, int subtree_type, set<int> *down_cache) const
314  	{
315  	  // use a stack-local down_cache if we didn't get one from the
316  	  // caller.  then at least this particular call will avoid duplicated
317  	  // work.
318  	  set<int> local_down_cache;
319  	  if (!down_cache) {
320  	    down_cache = &local_down_cache;
321  	  }
322  	
323  	  int current = id;
324  	  while (true) {
325  	    int type;
326  	    if (current >= 0) {
327  	      type = 0;
328  	    } else {
329  	      type = crush->get_bucket_type(current);
330  	    }
331  	    ceph_assert(type >= 0);
332  	
333  	    if (!subtree_is_down(current, down_cache)) {
334  	      ldout(cct, 30) << "containing_subtree_is_down(" << id << ") = false" << dendl;
335  	      return false;
336  	    }
337  	
338  	    // is this a big enough subtree to be marked as down?
339  	    if (type >= subtree_type) {
340  	      ldout(cct, 30) << "containing_subtree_is_down(" << id << ") = true ... " << type << " >= " << subtree_type << dendl;
341  	      return true;
342  	    }
343  	
344  	    int r = crush->get_immediate_parent_id(current, &current);
345  	    if (r < 0) {
346  	      return false;
347  	    }
348  	  }
349  	}
350  	
351  	bool OSDMap::subtree_type_is_down(
352  	  CephContext *cct,
353  	  int id,
354  	  int subtree_type,
355  	  set<int> *down_in_osds,
356  	  set<int> *up_in_osds,
357  	  set<int> *subtree_up,
358  	  unordered_map<int, set<int> > *subtree_type_down) const
359  	{
360  	  if (id >= 0) {
361  	    bool is_down_ret = is_down(id);
362  	    if (!is_out(id)) {
363  	      if (is_down_ret) {
364  	        down_in_osds->insert(id);
365  	      } else {
366  	        up_in_osds->insert(id);
367  	      }
368  	    }
369  	    return is_down_ret;
370  	  }
371  	
372  	  if (subtree_type_down &&
373  	      (*subtree_type_down)[subtree_type].count(id)) {
374  	    return true;
375  	  }
376  	
377  	  list<int> children;
378  	  crush->get_children(id, &children);
379  	  for (const auto &child : children) {
380  	    if (!subtree_type_is_down(
381  		  cct, child, crush->get_bucket_type(child),
382  		  down_in_osds, up_in_osds, subtree_up, subtree_type_down)) {
383  	      subtree_up->insert(id);
384  	      return false;
385  	    }
386  	  }
387  	  if (subtree_type_down) {
388  	    (*subtree_type_down)[subtree_type].insert(id);
389  	  }
390  	  return true;
391  	}
392  	
/**
 * Encode this incremental into @p bl using the legacy client layout
 * (version 5).
 *
 * encode_classic() falls back to this when the peer's features lack
 * CEPH_FEATURE_PGID64.  Pool ids and the pool max are written as 32-bit
 * values, new_state entries are truncated to one byte each, and pg ids are
 * written as old_pg_t.  The field order below is the wire format; do not
 * reorder.
 */
void OSDMap::Incremental::encode_client_old(ceph::buffer::list& bl) const
{
  using ceph::encode;
  __u16 v = 5;
  encode(v, bl);
  encode(fsid, bl);
  encode(epoch, bl);
  encode(modified, bl);
  // pool max is written as a 32-bit value in this format
  int32_t new_t = new_pool_max;
  encode(new_t, bl);
  encode(new_flags, bl);
  encode(fullmap, bl);
  encode(crush, bl);

  encode(new_max_osd, bl);
  // for encode(new_pools, bl);
  // hand-rolled map encoding: count, then (32-bit pool id, pool) pairs
  __u32 n = new_pools.size();
  encode(n, bl);
  for (const auto &new_pool : new_pools) {
    n = new_pool.first;
    encode(n, bl);
    // pools are encoded with an empty feature set
    encode(new_pool.second, bl, 0);
  }
  // for encode(new_pool_names, bl);
  n = new_pool_names.size();
  encode(n, bl);

  for (const auto &new_pool_name : new_pool_names) {
    n = new_pool_name.first;
    encode(n, bl);
    encode(new_pool_name.second, bl);
  }
  // for encode(old_pools, bl);
  n = old_pools.size();
  encode(n, bl);
  for (auto &old_pool : old_pools) {
    n = old_pool;
    encode(n, bl);
  }
  encode(new_up_client, bl, 0);
  {
    // legacy is map<int32_t,uint8_t>
    uint32_t n = new_state.size();
    encode(n, bl);
    for (auto p : new_state) {
      encode(p.first, bl);
      encode((uint8_t)p.second, bl);
    }
  }
  encode(new_weight, bl);
  // for encode(new_pg_temp, bl);
  // pg ids are converted to old_pg_t for this format
  n = new_pg_temp.size();
  encode(n, bl);

  for (const auto &pg_temp : new_pg_temp) {
    old_pg_t opg = pg_temp.first.get_old_pg();
    encode(opg, bl);
    encode(pg_temp.second, bl);
  }
}
453  	
/**
 * Encode this incremental into @p bl using the classic (pre-OSDMAP_ENC)
 * layout: a base section with version 6 followed by an extended section with
 * version 10.
 *
 * If @p features lacks CEPH_FEATURE_PGID64 the even older client layout is
 * produced by encode_client_old() instead.  The field order below is the
 * wire format; do not reorder.
 */
void OSDMap::Incremental::encode_classic(ceph::buffer::list& bl, uint64_t features) const
{
  using ceph::encode;
  if ((features & CEPH_FEATURE_PGID64) == 0) {
    encode_client_old(bl);
    return;
  }

  // base
  __u16 v = 6;
  encode(v, bl);
  encode(fsid, bl);
  encode(epoch, bl);
  encode(modified, bl);
  encode(new_pool_max, bl);
  encode(new_flags, bl);
  encode(fullmap, bl);
  encode(crush, bl);

  encode(new_max_osd, bl);
  encode(new_pools, bl, features);
  encode(new_pool_names, bl);
  encode(old_pools, bl);
  encode(new_up_client, bl, features);
  {
    // new_state is written as a count followed by (osd id, one-byte state)
    // pairs; each state value is truncated to uint8_t
    uint32_t n = new_state.size();
    encode(n, bl);
    for (auto p : new_state) {
      encode(p.first, bl);
      encode((uint8_t)p.second, bl);
    }
  }
  encode(new_weight, bl);
  encode(new_pg_temp, bl);

  // extended
  __u16 ev = 10;
  encode(ev, bl);
  encode(new_hb_back_up, bl, features);
  encode(new_up_thru, bl);
  encode(new_last_clean_interval, bl);
  encode(new_lost, bl);
  encode(new_blacklist, bl, features);
  encode(old_blacklist, bl, features);
  encode(new_up_cluster, bl, features);
  encode(cluster_snapshot, bl);
  encode(new_uuid, bl);
  encode(new_xinfo, bl, features);
  encode(new_hb_front_up, bl, features);
}
504  	
505  	template<class T>
506  	static void encode_addrvec_map_as_addr(const T& m, ceph::buffer::list& bl, uint64_t f)
507  	{
508  	  uint32_t n = m.size();
509  	  encode(n, bl);
510  	  for (auto& i : m) {
511  	    encode(i.first, bl);
512  	    encode(i.second.legacy_addr(), bl, f);
513  	  }
514  	}
515  	
516  	template<class T>
517  	static void encode_addrvec_pvec_as_addr(const T& m, ceph::buffer::list& bl, uint64_t f)
518  	{
519  	  uint32_t n = m.size();
520  	  encode(n, bl);
521  	  for (auto& i : m) {
522  	    if (i) {
523  	      encode(i->legacy_addr(), bl, f);
524  	    } else {
525  	      encode(entity_addr_t(), bl, f);
526  	    }
527  	  }
528  	}
529  	
/* for a description of osdmap incremental versions, and when they were
 * introduced, please refer to
 *    doc/dev/osd_internals/osdmap_versions.txt
 *
 * Encode this incremental into bl.
 *
 * Without CEPH_FEATURE_OSDMAP_ENC in features the classic layout is written
 * via encode_classic().  Otherwise the output is a wrapper (version 8,
 * compat 7) containing, in order:
 *   - a "client-usable" section whose version depends on features,
 *   - an "extended, osd-only" section whose version depends on features,
 *   - a 4-byte inc_crc slot,
 *   - full_crc.
 * inc_crc is computed over everything written by this call except the
 * 4-byte slot itself, and is stored both in the buffer and in the inc_crc
 * member; have_crc is set to true.
 *
 * Callers of the modern path must set CEPH_FEATURE_RESERVED in features
 * (asserted below); the bit is stripped before any feature-dependent
 * decision is made.
 */
void OSDMap::Incremental::encode(ceph::buffer::list& bl, uint64_t features) const
{
  using ceph::encode;
  if ((features & CEPH_FEATURE_OSDMAP_ENC) == 0) {
    encode_classic(bl, features);
    return;
  }

  // only a select set of callers should *ever* be encoding new
  // OSDMaps.  others should be passing around the canonical encoded
  // buffers from on high.  select out those callers by passing in an
  // "impossible" feature bit.
  ceph_assert(features & CEPH_FEATURE_RESERVED);
  features &= ~CEPH_FEATURE_RESERVED;

  // offsets into bl used to compute the crc once encoding is complete
  size_t start_offset = bl.length();
  size_t tail_offset;
  size_t crc_offset;
  std::optional<ceph::buffer::list::contiguous_filler> crc_filler;

  // meta-encoding: how we include client-used and osd-specific data
  ENCODE_START(8, 7, bl);

  {
    // pick the client section version the peer can understand
    uint8_t v = 8;
    if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
      v = 3;
    } else if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
      v = 5;
    } else if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
      v = 6;
    }
    ENCODE_START(v, 1, bl); // client-usable data
    encode(fsid, bl);
    encode(epoch, bl);
    encode(modified, bl);
    encode(new_pool_max, bl);
    encode(new_flags, bl);
    encode(fullmap, bl);
    encode(crush, bl);

    encode(new_max_osd, bl);
    encode(new_pools, bl, features);
    encode(new_pool_names, bl);
    encode(old_pools, bl);
    // before v7 only the legacy address of each entry is written
    if (v >= 7) {
      encode(new_up_client, bl, features);
    } else {
      encode_addrvec_map_as_addr(new_up_client, bl, features);
    }
    // before v5 each new_state value is truncated to one byte
    if (v >= 5) {
      encode(new_state, bl);
    } else {
      uint32_t n = new_state.size();
      encode(n, bl);
      for (auto p : new_state) {
	encode(p.first, bl);
	encode((uint8_t)p.second, bl);
      }
    }
    encode(new_weight, bl);
    encode(new_pg_temp, bl);
    encode(new_primary_temp, bl);
    encode(new_primary_affinity, bl);
    encode(new_erasure_code_profiles, bl);
    encode(old_erasure_code_profiles, bl);
    if (v >= 4) {
      encode(new_pg_upmap, bl);
      encode(old_pg_upmap, bl);
      encode(new_pg_upmap_items, bl);
      encode(old_pg_upmap_items, bl);
    }
    if (v >= 6) {
      encode(new_removed_snaps, bl);
      encode(new_purged_snaps, bl);
    }
    if (v >= 8) {
      encode(new_last_up_change, bl);
      encode(new_last_in_change, bl);
    }
    ENCODE_FINISH(bl); // client-usable data
  }

  {
    // pick the osd-only section version the peer can understand
    uint8_t target_v = 9;
    if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
      target_v = 2;
    } else if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
      target_v = 6;
    }
    ENCODE_START(target_v, 1, bl); // extended, osd-only data
    // before v7 the address maps are written with legacy addresses only
    if (target_v < 7) {
      encode_addrvec_map_as_addr(new_hb_back_up, bl, features);
    } else {
      encode(new_hb_back_up, bl, features);
    }
    encode(new_up_thru, bl);
    encode(new_last_clean_interval, bl);
    encode(new_lost, bl);
    encode(new_blacklist, bl, features);
    encode(old_blacklist, bl, features);
    if (target_v < 7) {
      encode_addrvec_map_as_addr(new_up_cluster, bl, features);
    } else {
      encode(new_up_cluster, bl, features);
    }
    encode(cluster_snapshot, bl);
    encode(new_uuid, bl);
    encode(new_xinfo, bl, features);
    if (target_v < 7) {
      encode_addrvec_map_as_addr(new_hb_front_up, bl, features);
    } else {
      encode(new_hb_front_up, bl, features);
    }
    encode(features, bl);         // NOTE: features arg, not the member
    if (target_v >= 3) {
      encode(new_nearfull_ratio, bl);
      encode(new_full_ratio, bl);
      encode(new_backfillfull_ratio, bl);
    }
    // 5 was string-based new_require_min_compat_client
    if (target_v >= 6) {
      encode(new_require_min_compat_client, bl);
      encode(new_require_osd_release, bl);
    }
    if (target_v >= 8) {
      encode(new_crush_node_flags, bl);
    }
    if (target_v >= 9) {
      encode(new_device_class_flags, bl);
    }
    ENCODE_FINISH(bl); // osd-only data
  }

  // reserve 4 bytes for inc_crc; they are filled in once the rest of the
  // buffer (which the crc covers) has been written
  crc_offset = bl.length();
  crc_filler = bl.append_hole(sizeof(uint32_t));
  tail_offset = bl.length();

  encode(full_crc, bl);

  ENCODE_FINISH(bl); // meta-encoding wrapper

  // fill in crc
  // crc = crc32c(front) continued over tail, i.e. everything except the hole
  ceph::buffer::list front;
  front.substr_of(bl, start_offset, crc_offset - start_offset);
  inc_crc = front.crc32c(-1);
  ceph::buffer::list tail;
  tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
  inc_crc = tail.crc32c(inc_crc);
  ceph_le32 crc_le;
  crc_le = inc_crc;
  crc_filler->copy_in(4u, (char*)&crc_le);
  have_crc = true;
}
688  	
/**
 * Decode an incremental written in the classic layout (a 16-bit base version
 * followed, for most inputs, by an extended section).
 *
 * The base version v selects how pool ids, pool names and pg ids are read:
 * versions below 6 use 32-bit pool ids and old_pg_t pg ids.  A version-5
 * input that ends right after the base section is accepted as a "short"
 * map.  Fields of the extended section are read according to its own
 * version ev (0 when v < 5).
 *
 * @param p iterator positioned at the 16-bit base version
 */
void OSDMap::Incremental::decode_classic(ceph::buffer::list::const_iterator &p)
{
  using ceph::decode;
  // n: element counts / 32-bit scratch; t: 32-bit keys in legacy encodings
  __u32 n, t;
  // base
  __u16 v;
  decode(v, p);
  decode(fsid, p);
  decode(epoch, p);
  decode(modified, p);
  // v4/v5 carry a 32-bit pool max; older versions carry none
  if (v == 4 || v == 5) {
    decode(n, p);
    new_pool_max = n;
  } else if (v >= 6)
    decode(new_pool_max, p);
  decode(new_flags, p);
  decode(fullmap, p);
  decode(crush, p);

  decode(new_max_osd, p);
  if (v < 6) {
    // legacy: count followed by (32-bit pool id, pool) pairs
    new_pools.clear();
    decode(n, p);
    while (n--) {
      decode(t, p);
      decode(new_pools[t], p);
    }
  } else {
    decode(new_pools, p);
  }
  // for v < 5 the pool names are read later, inside the extended section
  if (v == 5) {
    new_pool_names.clear();
    decode(n, p);
    while (n--) {
      decode(t, p);
      decode(new_pool_names[t], p);
    }
  } else if (v >= 6) {
    decode(new_pool_names, p);
  }
  if (v < 6) {
    old_pools.clear();
    decode(n, p);
    while (n--) {
      decode(t, p);
      old_pools.insert(t);
    }
  } else {
    decode(old_pools, p);
  }
  decode(new_up_client, p);
  {
    // states are stored as one byte each in this layout
    map<int32_t,uint8_t> ns;
    decode(ns, p);
    for (auto q : ns) {
      new_state[q.first] = q.second;
    }
  }
  decode(new_weight, p);

  if (v < 6) {
    // legacy: pg ids are stored as raw old_pg_t
    new_pg_temp.clear();
    decode(n, p);
    while (n--) {
      old_pg_t opg;
      ceph::decode_raw(opg, p);
      decode(new_pg_temp[pg_t(opg)], p);
    }
  } else {
    decode(new_pg_temp, p);
  }

  // decode short map, too.
  if (v == 5 && p.end())
    return;

  // extended
  __u16 ev = 0;
  if (v >= 5)
    decode(ev, p);
  decode(new_hb_back_up, p);
  if (v < 5)
    decode(new_pool_names, p);
  decode(new_up_thru, p);
  decode(new_last_clean_interval, p);
  decode(new_lost, p);
  decode(new_blacklist, p);
  decode(old_blacklist, p);
  // the remaining fields were added one per extended version
  if (ev >= 6)
    decode(new_up_cluster, p);
  if (ev >= 7)
    decode(cluster_snapshot, p);
  if (ev >= 8)
    decode(new_uuid, p);
  if (ev >= 9)
    decode(new_xinfo, p);
  if (ev >= 10)
    decode(new_hb_front_up, p);
}
788  	
/* for a description of osdmap incremental versions, and when they were
 * introduced, please refer to
 *    doc/dev/osd_internals/osdmap_versions.txt
 *
 * Decode an incremental from bl.
 *
 * Inputs whose wrapper version is below 7 are handed to decode_classic()
 * after rewinding to the start.  Otherwise the client-usable section, the
 * osd-only section and (for wrapper version >= 8) inc_crc and full_crc are
 * read in that order.  When a crc is present it is recomputed over every
 * decoded byte except the stored inc_crc itself, and a
 * ceph::buffer::malformed_input exception is thrown on mismatch.
 */
void OSDMap::Incremental::decode(ceph::buffer::list::const_iterator& bl)
{
  using ceph::decode;
  /**
   * Older encodings of the Incremental had a single struct_v which
   * covered the whole encoding, and was prior to our modern
   * stuff which includes a compatv and a size. So if we see
   * a struct_v < 7, we must rewind to the beginning and use our
   * classic decoder.
   */
  size_t start_offset = bl.get_off();
  size_t tail_offset = 0;
  ceph::buffer::list crc_front, crc_tail;

  DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
  if (struct_v < 7) {
    bl.seek(start_offset);
    decode_classic(bl);
    // record which feature bits the classic encoding implies
    // (the first assignment is redundant with the else branch below)
    encode_features = 0;
    if (struct_v >= 6)
      encode_features = CEPH_FEATURE_PGID64;
    else
      encode_features = 0;
    return;
  }
  {
    // struct_v inside this scope is the client section's own version
    DECODE_START(8, bl); // client-usable data
    decode(fsid, bl);
    decode(epoch, bl);
    decode(modified, bl);
    decode(new_pool_max, bl);
    decode(new_flags, bl);
    decode(fullmap, bl);
    decode(crush, bl);

    decode(new_max_osd, bl);
    decode(new_pools, bl);
    decode(new_pool_names, bl);
    decode(old_pools, bl);
    decode(new_up_client, bl);
    // before v5 each state value was stored as a single byte
    if (struct_v >= 5) {
      decode(new_state, bl);
    } else {
      map<int32_t,uint8_t> ns;
      decode(ns, bl);
      for (auto q : ns) {
	new_state[q.first] = q.second;
      }
    }
    decode(new_weight, bl);
    decode(new_pg_temp, bl);
    decode(new_primary_temp, bl);
    if (struct_v >= 2)
      decode(new_primary_affinity, bl);
    else
      new_primary_affinity.clear();
    if (struct_v >= 3) {
      decode(new_erasure_code_profiles, bl);
      decode(old_erasure_code_profiles, bl);
    } else {
      new_erasure_code_profiles.clear();
      old_erasure_code_profiles.clear();
    }
    if (struct_v >= 4) {
      decode(new_pg_upmap, bl);
      decode(old_pg_upmap, bl);
      decode(new_pg_upmap_items, bl);
      decode(old_pg_upmap_items, bl);
    }
    if (struct_v >= 6) {
      decode(new_removed_snaps, bl);
      decode(new_purged_snaps, bl);
    }
    if (struct_v >= 8) {
      decode(new_last_up_change, bl);
      decode(new_last_in_change, bl);
    }
    DECODE_FINISH(bl); // client-usable data
  }

  {
    // struct_v inside this scope is the osd-only section's own version
    DECODE_START(9, bl); // extended, osd-only data
    decode(new_hb_back_up, bl);
    decode(new_up_thru, bl);
    decode(new_last_clean_interval, bl);
    decode(new_lost, bl);
    decode(new_blacklist, bl);
    decode(old_blacklist, bl);
    decode(new_up_cluster, bl);
    decode(cluster_snapshot, bl);
    decode(new_uuid, bl);
    decode(new_xinfo, bl);
    decode(new_hb_front_up, bl);
    // v1 did not record the encoder's features; assume the two bits that
    // any input reaching this decoder path was encoded with
    if (struct_v >= 2)
      decode(encode_features, bl);
    else
      encode_features = CEPH_FEATURE_PGID64 | CEPH_FEATURE_OSDMAP_ENC;
    // -1 marks a ratio that the input does not carry
    if (struct_v >= 3) {
      decode(new_nearfull_ratio, bl);
      decode(new_full_ratio, bl);
    } else {
      new_nearfull_ratio = -1;
      new_full_ratio = -1;
    }
    if (struct_v >= 4) {
      decode(new_backfillfull_ratio, bl);
    } else {
      new_backfillfull_ratio = -1;
    }
    // v5 stored the min compat client as a release name string
    if (struct_v == 5) {
      string r;
      decode(r, bl);
      if (r.length()) {
	new_require_min_compat_client = ceph_release_from_name(r);
      }
    }
    if (struct_v >= 6) {
      decode(new_require_min_compat_client, bl);
      decode(new_require_osd_release, bl);
    } else {
      // older inputs: derive the required osd release from legacy flag bits
      if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
	// only for compat with post-kraken pre-luminous test clusters
	new_require_osd_release = ceph_release_t::luminous;
	new_flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
      } else if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_KRAKEN)) {
	new_require_osd_release = ceph_release_t::kraken;
      } else if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_JEWEL)) {
	new_require_osd_release = ceph_release_t::jewel;
      } else {
	new_require_osd_release = ceph_release_t::unknown;
      }
    }
    if (struct_v >= 8) {
      decode(new_crush_node_flags, bl);
    }
    if (struct_v >= 9) {
      decode(new_device_class_flags, bl);
    }
    DECODE_FINISH(bl); // osd-only data
  }

  // back at wrapper scope: struct_v is the wrapper version again
  if (struct_v >= 8) {
    have_crc = true;
    // everything from the start of the wrapper up to (not including) inc_crc
    crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
    decode(inc_crc, bl);
    tail_offset = bl.get_off();
    decode(full_crc, bl);
  } else {
    have_crc = false;
    full_crc = 0;
    inc_crc = 0;
  }

  DECODE_FINISH(bl); // wrapper

  if (have_crc) {
    // verify crc
    uint32_t actual = crc_front.crc32c(-1);
    // continue the crc over whatever follows the stored inc_crc
    if (tail_offset < bl.get_off()) {
      ceph::buffer::list tail;
      tail.substr_of(bl.get_bl(), tail_offset, bl.get_off() - tail_offset);
      actual = tail.crc32c(actual);
    }
    if (inc_crc != actual) {
      ostringstream ss;
      ss << "bad crc, actual " << actual << " != expected " << inc_crc;
      string s = ss.str();
      throw ceph::buffer::malformed_input(s.c_str());
    }
  }
}
964  	
965  	void OSDMap::Incremental::dump(Formatter *f) const
966  	{
967  	  f->dump_int("epoch", epoch);
968  	  f->dump_stream("fsid") << fsid;
969  	  f->dump_stream("modified") << modified;
970  	  f->dump_stream("new_last_up_change") << new_last_up_change;
971  	  f->dump_stream("new_last_in_change") << new_last_in_change;
972  	  f->dump_int("new_pool_max", new_pool_max);
973  	  f->dump_int("new_flags", new_flags);
974  	  f->dump_float("new_full_ratio", new_full_ratio);
975  	  f->dump_float("new_nearfull_ratio", new_nearfull_ratio);
976  	  f->dump_float("new_backfillfull_ratio", new_backfillfull_ratio);
977  	  f->dump_int("new_require_min_compat_client", ceph::to_integer<int>(new_require_min_compat_client));
978  	  f->dump_int("new_require_osd_release", ceph::to_integer<int>(new_require_osd_release));
979  	
980  	  if (fullmap.length()) {
981  	    f->open_object_section("full_map");
982  	    OSDMap full;
983  	    ceph::buffer::list fbl = fullmap;  // kludge around constness.
984  	    auto p = fbl.cbegin();
985  	    full.decode(p);
986  	    full.dump(f);
987  	    f->close_section();
988  	  }
989  	  if (crush.length()) {
990  	    f->open_object_section("crush");
991  	    CrushWrapper c;
992  	    ceph::buffer::list tbl = crush;  // kludge around constness.
993  	    auto p = tbl.cbegin();
994  	    c.decode(p);
995  	    c.dump(f);
996  	    f->close_section();
997  	  }
998  	
999  	  f->dump_int("new_max_osd", new_max_osd);
1000 	
1001 	  f->open_array_section("new_pools");
1002 	
1003 	  for (const auto &new_pool : new_pools) {
1004 	    f->open_object_section("pool");
1005 	    f->dump_int("pool", new_pool.first);
1006 	    new_pool.second.dump(f);
1007 	    f->close_section();
1008 	  }
1009 	  f->close_section();
1010 	  f->open_array_section("new_pool_names");
1011 	
1012 	  for (const auto &new_pool_name : new_pool_names) {
1013 	    f->open_object_section("pool_name");
1014 	    f->dump_int("pool", new_pool_name.first);
1015 	    f->dump_string("name", new_pool_name.second);
1016 	    f->close_section();
1017 	  }
1018 	  f->close_section();
1019 	  f->open_array_section("old_pools");
1020 	
1021 	  for (const auto &old_pool : old_pools)
1022 	    f->dump_int("pool", old_pool);
1023 	  f->close_section();
1024 	
1025 	  f->open_array_section("new_up_osds");
1026 	
1027 	  for (const auto &upclient : new_up_client) {
1028 	    f->open_object_section("osd");
1029 	    f->dump_int("osd", upclient.first);
1030 	    f->dump_stream("public_addr") << upclient.second.legacy_addr();
1031 	    f->dump_object("public_addrs", upclient.second);
1032 	    if (auto p = new_up_cluster.find(upclient.first);
1033 		p != new_up_cluster.end()) {
1034 	      f->dump_stream("cluster_addr") << p->second.legacy_addr();
1035 	      f->dump_object("cluster_addrs", p->second);
1036 	    }
1037 	    if (auto p = new_hb_back_up.find(upclient.first);
1038 		p != new_hb_back_up.end()) {
1039 	      f->dump_object("heartbeat_back_addrs", p->second);
1040 	    }
1041 	    if (auto p = new_hb_front_up.find(upclient.first);
1042 		p != new_hb_front_up.end()) {
1043 	      f->dump_object("heartbeat_front_addrs", p->second);
1044 	    }
1045 	    f->close_section();
1046 	  }
1047 	  f->close_section();
1048 	
1049 	  f->open_array_section("new_weight");
1050 	
1051 	  for (const auto &weight : new_weight) {
1052 	    f->open_object_section("osd");
1053 	    f->dump_int("osd", weight.first);
1054 	    f->dump_int("weight", weight.second);
1055 	    f->close_section();
1056 	  }
1057 	  f->close_section();
1058 	
1059 	  f->open_array_section("osd_state_xor");
1060 	  for (const auto &ns : new_state) {
1061 	    f->open_object_section("osd");
1062 	    f->dump_int("osd", ns.first);
1063 	    set<string> st;
1064 	    calc_state_set(new_state.find(ns.first)->second, st);
1065 	    f->open_array_section("state_xor");
1066 	    for (auto &state : st)
1067 	      f->dump_string("state", state);
1068 	    f->close_section();
1069 	    f->close_section();
1070 	  }
1071 	  f->close_section();
1072 	
1073 	  f->open_array_section("new_pg_temp");
1074 	
1075 	  for (const auto &pg_temp : new_pg_temp) {
1076 	    f->open_object_section("pg");
1077 	    f->dump_stream("pgid") << pg_temp.first;
1078 	    f->open_array_section("osds");
1079 	
1080 	    for (const auto &osd : pg_temp.second)
1081 	      f->dump_int("osd", osd);
1082 	    f->close_section();
1083 	    f->close_section();    
1084 	  }
1085 	  f->close_section();
1086 	
1087 	  f->open_array_section("primary_temp");
1088 	
1089 	  for (const auto &primary_temp : new_primary_temp) {
1090 	    f->dump_stream("pgid") << primary_temp.first;
1091 	    f->dump_int("osd", primary_temp.second);
1092 	  }
1093 	  f->close_section(); // primary_temp
1094 	
1095 	  f->open_array_section("new_pg_upmap");
1096 	  for (auto& i : new_pg_upmap) {
1097 	    f->open_object_section("mapping");
1098 	    f->dump_stream("pgid") << i.first;
1099 	    f->open_array_section("osds");
1100 	    for (auto osd : i.second) {
1101 	      f->dump_int("osd", osd);
1102 	    }
1103 	    f->close_section();
1104 	    f->close_section();
1105 	  }
1106 	  f->close_section();
1107 	  f->open_array_section("old_pg_upmap");
1108 	  for (auto& i : old_pg_upmap) {
1109 	    f->dump_stream("pgid") << i;
1110 	  }
1111 	  f->close_section();
1112 	
1113 	  f->open_array_section("new_pg_upmap_items");
1114 	  for (auto& i : new_pg_upmap_items) {
1115 	    f->open_object_section("mapping");
1116 	    f->dump_stream("pgid") << i.first;
1117 	    f->open_array_section("mappings");
1118 	    for (auto& p : i.second) {
1119 	      f->open_object_section("mapping");
1120 	      f->dump_int("from", p.first);
1121 	      f->dump_int("to", p.second);
1122 	      f->close_section();
1123 	    }
1124 	    f->close_section();
1125 	    f->close_section();
1126 	  }
1127 	  f->close_section();
1128 	  f->open_array_section("old_pg_upmap_items");
1129 	  for (auto& i : old_pg_upmap_items) {
1130 	    f->dump_stream("pgid") << i;
1131 	  }
1132 	  f->close_section();
1133 	
1134 	  f->open_array_section("new_up_thru");
1135 	
1136 	  for (const auto &up_thru : new_up_thru) {
1137 	    f->open_object_section("osd");
1138 	    f->dump_int("osd", up_thru.first);
1139 	    f->dump_int("up_thru", up_thru.second);
1140 	    f->close_section();
1141 	  }
1142 	  f->close_section();
1143 	
1144 	  f->open_array_section("new_lost");
1145 	
1146 	  for (const auto &lost : new_lost) {
1147 	    f->open_object_section("osd");
1148 	    f->dump_int("osd", lost.first);
1149 	    f->dump_int("epoch_lost", lost.second);
1150 	    f->close_section();
1151 	  }
1152 	  f->close_section();
1153 	
1154 	  f->open_array_section("new_last_clean_interval");
1155 	
1156 	  for (const auto &last_clean_interval : new_last_clean_interval) {
1157 	    f->open_object_section("osd");
1158 	    f->dump_int("osd", last_clean_interval.first);
1159 	    f->dump_int("first", last_clean_interval.second.first);
1160 	    f->dump_int("last", last_clean_interval.second.second);
1161 	    f->close_section();
1162 	  }
1163 	  f->close_section();
1164 	
1165 	  f->open_array_section("new_blacklist");
1166 	  for (const auto &blist : new_blacklist) {
1167 	    stringstream ss;
1168 	    ss << blist.first;
1169 	    f->dump_stream(ss.str().c_str()) << blist.second;
1170 	  }
1171 	  f->close_section();
1172 	  f->open_array_section("old_blacklist");
1173 	  for (const auto &blist : old_blacklist)
1174 	    f->dump_stream("addr") << blist;
1175 	  f->close_section();
1176 	
1177 	  f->open_array_section("new_xinfo");
1178 	  for (const auto &xinfo : new_xinfo) {
1179 	    f->open_object_section("xinfo");
1180 	    f->dump_int("osd", xinfo.first);
1181 	    xinfo.second.dump(f);
1182 	    f->close_section();
1183 	  }
1184 	  f->close_section();
1185 	
1186 	  if (cluster_snapshot.size())
1187 	    f->dump_string("cluster_snapshot", cluster_snapshot);
1188 	
1189 	  f->open_array_section("new_uuid");
1190 	  for (const auto &uuid : new_uuid) {
1191 	    f->open_object_section("osd");
1192 	    f->dump_int("osd", uuid.first);
1193 	    f->dump_stream("uuid") << uuid.second;
1194 	    f->close_section();
1195 	  }
1196 	  f->close_section();
1197 	
1198 	  OSDMap::dump_erasure_code_profiles(new_erasure_code_profiles, f);
1199 	  f->open_array_section("old_erasure_code_profiles");
1200 	  for (const auto &erasure_code_profile : old_erasure_code_profiles) {
1201 	    f->dump_string("old", erasure_code_profile.c_str());
1202 	  }
1203 	  f->close_section();
1204 	
1205 	  f->open_array_section("new_removed_snaps");
1206 	  for (auto& p : new_removed_snaps) {
1207 	    f->open_object_section("pool");
1208 	    f->dump_int("pool", p.first);
1209 	    f->open_array_section("snaps");
1210 	    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
1211 	      f->open_object_section("interval");
1212 	      f->dump_unsigned("begin", q.get_start());
1213 	      f->dump_unsigned("length", q.get_len());
1214 	      f->close_section();
1215 	    }
1216 	    f->close_section();
1217 	    f->close_section();
1218 	  }
1219 	  f->close_section();
1220 	  f->open_array_section("new_purged_snaps");
1221 	  for (auto& p : new_purged_snaps) {
1222 	    f->open_object_section("pool");
1223 	    f->dump_int("pool", p.first);
1224 	    f->open_array_section("snaps");
1225 	    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
1226 	      f->open_object_section("interval");
1227 	      f->dump_unsigned("begin", q.get_start());
1228 	      f->dump_unsigned("length", q.get_len());
1229 	      f->close_section();
1230 	    }
1231 	    f->close_section();
1232 	    f->close_section();
1233 	  }
1234 	  f->open_array_section("new_crush_node_flags");
1235 	  for (auto& i : new_crush_node_flags) {
1236 	    f->open_object_section("node");
1237 	    f->dump_int("id", i.first);
1238 	    set<string> st;
1239 	    calc_state_set(i.second, st);
1240 	    for (auto& j : st) {
1241 	      f->dump_string("flag", j);
1242 	    }
1243 	    f->close_section();
1244 	  }
1245 	  f->close_section();
1246 	  f->open_array_section("new_device_class_flags");
1247 	  for (auto& i : new_device_class_flags) {
1248 	    f->open_object_section("device_class");
1249 	    f->dump_int("id", i.first);
1250 	    set<string> st;
1251 	    calc_state_set(i.second, st);
1252 	    for (auto& j : st) {
1253 	      f->dump_string("flag", j);
1254 	    }
1255 	    f->close_section();
1256 	  }
1257 	  f->close_section();
1258 	  f->close_section();
1259 	}
1260 	
1261 	void OSDMap::Incremental::generate_test_instances(list<Incremental*>& o)
1262 	{
1263 	  o.push_back(new Incremental);
1264 	}
1265 	
1266 	// ----------------------------------
1267 	// OSDMap
1268 	
1269 	void OSDMap::set_epoch(epoch_t e)
1270 	{
1271 	  epoch = e;
1272 	  for (auto &pool : pools)
1273 	    pool.second.last_change = e;
1274 	}
1275 	
1276 	bool OSDMap::is_blacklisted(const entity_addr_t& orig) const
1277 	{
1278 	  if (blacklist.empty()) {
1279 	    return false;
1280 	  }
1281 	
1282 	  // all blacklist entries are type ANY for nautilus+
1283 	  // FIXME: avoid this copy!
1284 	  entity_addr_t a = orig;
1285 	  if (require_osd_release < ceph_release_t::nautilus) {
1286 	    a.set_type(entity_addr_t::TYPE_LEGACY);
1287 	  } else {
1288 	    a.set_type(entity_addr_t::TYPE_ANY);
1289 	  }
1290 	
1291 	  // this specific instance?
1292 	  if (blacklist.count(a)) {
1293 	    return true;
1294 	  }
1295 	
1296 	  // is entire ip blacklisted?
1297 	  if (a.is_ip()) {
1298 	    a.set_port(0);
1299 	    a.set_nonce(0);
1300 	    if (blacklist.count(a)) {
1301 	      return true;
1302 	    }
1303 	  }
1304 	
1305 	  return false;
1306 	}
1307 	
1308 	bool OSDMap::is_blacklisted(const entity_addrvec_t& av) const
1309 	{
1310 	  if (blacklist.empty())
1311 	    return false;
1312 	
1313 	  for (auto& a : av.v) {
1314 	    if (is_blacklisted(a)) {
1315 	      return true;
1316 	    }
1317 	  }
1318 	
1319 	  return false;
1320 	}
1321 	
1322 	void OSDMap::get_blacklist(list<pair<entity_addr_t,utime_t> > *bl) const
1323 	{
1324 	   std::copy(blacklist.begin(), blacklist.end(), std::back_inserter(*bl));
1325 	}
1326 	
1327 	void OSDMap::get_blacklist(std::set<entity_addr_t> *bl) const
1328 	{
1329 	  for (const auto &i : blacklist) {
1330 	    bl->insert(i.first);
1331 	  }
1332 	}
1333 	
1334 	void OSDMap::set_max_osd(int m)
1335 	{
1336 	  int o = max_osd;
1337 	  max_osd = m;
1338 	  osd_state.resize(m);
1339 	  osd_weight.resize(m);
1340 	  for (; o<max_osd; o++) {
1341 	    osd_state[o] = 0;
1342 	    osd_weight[o] = CEPH_OSD_OUT;
1343 	  }
1344 	  osd_info.resize(m);
1345 	  osd_xinfo.resize(m);
1346 	  osd_addrs->client_addrs.resize(m);
1347 	  osd_addrs->cluster_addrs.resize(m);
1348 	  osd_addrs->hb_back_addrs.resize(m);
1349 	  osd_addrs->hb_front_addrs.resize(m);
1350 	  osd_uuid->resize(m);
1351 	  if (osd_primary_affinity)
1352 	    osd_primary_affinity->resize(m, CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1353 	
1354 	  calc_num_osds();
1355 	}
1356 	
1357 	int OSDMap::calc_num_osds()
1358 	{
1359 	  num_osd = 0;
1360 	  num_up_osd = 0;
1361 	  num_in_osd = 0;
1362 	  for (int i=0; i<max_osd; i++) {
1363 	    if (osd_state[i] & CEPH_OSD_EXISTS) {
1364 	      ++num_osd;
1365 	      if (osd_state[i] & CEPH_OSD_UP) {
1366 		++num_up_osd;
1367 	      }
1368 	      if (get_weight(i) != CEPH_OSD_OUT) {
1369 		++num_in_osd;
1370 	      }
1371 	    }
1372 	  }
1373 	  return num_osd;
1374 	}
1375 	
1376 	void OSDMap::get_full_pools(CephContext *cct,
1377 	                            set<int64_t> *full,
1378 	                            set<int64_t> *backfillfull,
1379 	                            set<int64_t> *nearfull) const
1380 	{
1381 	  ceph_assert(full);
1382 	  ceph_assert(backfillfull);
1383 	  ceph_assert(nearfull);
1384 	  full->clear();
1385 	  backfillfull->clear();
1386 	  nearfull->clear();
1387 	
1388 	  vector<int> full_osds;
1389 	  vector<int> backfillfull_osds;
1390 	  vector<int> nearfull_osds;
1391 	  for (int i = 0; i < max_osd; ++i) {
1392 	    if (exists(i) && is_up(i) && is_in(i)) {
1393 	      if (osd_state[i] & CEPH_OSD_FULL)
1394 	        full_osds.push_back(i);
1395 	      else if (osd_state[i] & CEPH_OSD_BACKFILLFULL)
1396 		backfillfull_osds.push_back(i);
1397 	      else if (osd_state[i] & CEPH_OSD_NEARFULL)
1398 		nearfull_osds.push_back(i);
1399 	    }
1400 	  }
1401 	
1402 	  for (auto i: full_osds) {
1403 	    get_pool_ids_by_osd(cct, i, full);
1404 	  }
1405 	  for (auto i: backfillfull_osds) {
1406 	    get_pool_ids_by_osd(cct, i, backfillfull);
1407 	  }
1408 	  for (auto i: nearfull_osds) {
1409 	    get_pool_ids_by_osd(cct, i, nearfull);
1410 	  }
1411 	}
1412 	
1413 	void OSDMap::get_full_osd_counts(set<int> *full, set<int> *backfill,
1414 					 set<int> *nearfull) const
1415 	{
1416 	  full->clear();
1417 	  backfill->clear();
1418 	  nearfull->clear();
1419 	  for (int i = 0; i < max_osd; ++i) {
1420 	    if (exists(i) && is_up(i) && is_in(i)) {
1421 	      if (osd_state[i] & CEPH_OSD_FULL)
1422 		full->emplace(i);
1423 	      else if (osd_state[i] & CEPH_OSD_BACKFILLFULL)
1424 		backfill->emplace(i);
1425 	      else if (osd_state[i] & CEPH_OSD_NEARFULL)
1426 		nearfull->emplace(i);
1427 	    }
1428 	  }
1429 	}
1430 	
1431 	void OSDMap::get_all_osds(set<int32_t>& ls) const
1432 	{
1433 	  for (int i=0; i<max_osd; i++)
1434 	    if (exists(i))
1435 	      ls.insert(i);
1436 	}
1437 	
1438 	void OSDMap::get_up_osds(set<int32_t>& ls) const
1439 	{
1440 	  for (int i = 0; i < max_osd; i++) {
1441 	    if (is_up(i))
1442 	      ls.insert(i);
1443 	  }
1444 	}
1445 	
1446 	void OSDMap::get_out_existing_osds(set<int32_t>& ls) const
1447 	{
1448 	  for (int i = 0; i < max_osd; i++) {
1449 	    if (exists(i) && get_weight(i) == CEPH_OSD_OUT)
1450 	      ls.insert(i);
1451 	  }
1452 	}
1453 	
1454 	void OSDMap::get_flag_set(set<string> *flagset) const
1455 	{
1456 	  for (unsigned i = 0; i < sizeof(flags) * 8; ++i) {
1457 	    if (flags & (1<<i)) {
1458 	      flagset->insert(get_flag_string(flags & (1<<i)));
1459 	    }
1460 	  }
1461 	}
1462 	
1463 	void OSDMap::calc_state_set(int state, set<string>& st)
1464 	{
1465 	  unsigned t = state;
1466 	  for (unsigned s = 1; t; s <<= 1) {
1467 	    if (t & s) {
1468 	      t &= ~s;
1469 	      st.insert(ceph_osd_state_name(s));
1470 	    }
1471 	  }
1472 	}
1473 	
1474 	void OSDMap::adjust_osd_weights(const map<int,double>& weights, Incremental& inc) const
1475 	{
1476 	  float max = 0;
1477 	  for (const auto &weight : weights) {
1478 	    if (weight.second > max)
1479 	      max = weight.second;
1480 	  }
1481 	
1482 	  for (const auto &weight : weights) {
1483 	    inc.new_weight[weight.first] = (unsigned)((weight.second / max) * CEPH_OSD_IN);
1484 	  }
1485 	}
1486 	
1487 	int OSDMap::identify_osd(const entity_addr_t& addr) const
1488 	{
1489 	  for (int i=0; i<max_osd; i++)
1490 	    if (exists(i) && (get_addrs(i).contains(addr) ||
1491 			      get_cluster_addrs(i).contains(addr)))
1492 	      return i;
1493 	  return -1;
1494 	}
1495 	
1496 	int OSDMap::identify_osd(const uuid_d& u) const
1497 	{
1498 	  for (int i=0; i<max_osd; i++)
1499 	    if (exists(i) && get_uuid(i) == u)
1500 	      return i;
1501 	  return -1;
1502 	}
1503 	
1504 	int OSDMap::identify_osd_on_all_channels(const entity_addr_t& addr) const
1505 	{
1506 	  for (int i=0; i<max_osd; i++)
1507 	    if (exists(i) && (get_addrs(i).contains(addr) ||
1508 			      get_cluster_addrs(i).contains(addr) ||
1509 			      get_hb_back_addrs(i).contains(addr) ||
1510 			      get_hb_front_addrs(i).contains(addr)))
1511 	      return i;
1512 	  return -1;
1513 	}
1514 	
1515 	int OSDMap::find_osd_on_ip(const entity_addr_t& ip) const
1516 	{
1517 	  for (int i=0; i<max_osd; i++)
1518 	    if (exists(i) && (get_addrs(i).is_same_host(ip) ||
1519 			      get_cluster_addrs(i).is_same_host(ip)))
1520 	      return i;
1521 	  return -1;
1522 	}
1523 	
1524 	
1525 	uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
1526 	{
1527 	  uint64_t features = 0;  // things we actually have
1528 	  uint64_t mask = 0;      // things we could have
1529 	
1530 	  if (crush->has_nondefault_tunables())
1531 	    features |= CEPH_FEATURE_CRUSH_TUNABLES;
1532 	  if (crush->has_nondefault_tunables2())
1533 	    features |= CEPH_FEATURE_CRUSH_TUNABLES2;
1534 	  if (crush->has_nondefault_tunables3())
1535 	    features |= CEPH_FEATURE_CRUSH_TUNABLES3;
1536 	  if (crush->has_v4_buckets())
1537 	    features |= CEPH_FEATURE_CRUSH_V4;
1538 	  if (crush->has_nondefault_tunables5())
1539 	    features |= CEPH_FEATURE_CRUSH_TUNABLES5;
1540 	  if (crush->has_incompat_choose_args()) {
1541 	    features |= CEPH_FEATUREMASK_CRUSH_CHOOSE_ARGS;
1542 	  }
1543 	  mask |= CEPH_FEATURES_CRUSH;
1544 	
1545 	  if (!pg_upmap.empty() || !pg_upmap_items.empty())
1546 	    features |= CEPH_FEATUREMASK_OSDMAP_PG_UPMAP;
1547 	  mask |= CEPH_FEATUREMASK_OSDMAP_PG_UPMAP;
1548 	
1549 	  for (auto &pool: pools) {
1550 	    if (pool.second.has_flag(pg_pool_t::FLAG_HASHPSPOOL)) {
1551 	      features |= CEPH_FEATURE_OSDHASHPSPOOL;
1552 	    }
1553 	    if (!pool.second.tiers.empty() ||
1554 		pool.second.is_tier()) {
1555 	      features |= CEPH_FEATURE_OSD_CACHEPOOL;
1556 	    }
1557 	    int ruleid = crush->find_rule(pool.second.get_crush_rule(),
1558 					  pool.second.get_type(),
1559 					  pool.second.get_size());
1560 	    if (ruleid >= 0) {
1561 	      if (crush->is_v2_rule(ruleid))
1562 		features |= CEPH_FEATURE_CRUSH_V2;
1563 	      if (crush->is_v3_rule(ruleid))
1564 		features |= CEPH_FEATURE_CRUSH_TUNABLES3;
1565 	      if (crush->is_v5_rule(ruleid))
1566 		features |= CEPH_FEATURE_CRUSH_TUNABLES5;
1567 	    }
1568 	  }
1569 	  mask |= CEPH_FEATURE_OSDHASHPSPOOL | CEPH_FEATURE_OSD_CACHEPOOL;
1570 	
1571 	  if (osd_primary_affinity) {
1572 	    for (int i = 0; i < max_osd; ++i) {
1573 	      if ((*osd_primary_affinity)[i] != CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
1574 		features |= CEPH_FEATURE_OSD_PRIMARY_AFFINITY;
1575 		break;
1576 	      }
1577 	    }
1578 	  }
1579 	  mask |= CEPH_FEATURE_OSD_PRIMARY_AFFINITY;
1580 	
1581 	  if (entity_type == CEPH_ENTITY_TYPE_OSD) {
1582 	    const uint64_t jewel_features = CEPH_FEATURE_SERVER_JEWEL;
1583 	    if (require_osd_release >= ceph_release_t::jewel) {
1584 	      features |= jewel_features;
1585 	    }
1586 	    mask |= jewel_features;
1587 	
1588 	    const uint64_t kraken_features = CEPH_FEATUREMASK_SERVER_KRAKEN
1589 	      | CEPH_FEATURE_MSG_ADDR2;
1590 	    if (require_osd_release >= ceph_release_t::kraken) {
1591 	      features |= kraken_features;
1592 	    }
1593 	    mask |= kraken_features;
1594 	  }
1595 	
1596 	  if (require_min_compat_client >= ceph_release_t::nautilus) {
1597 	    // if min_compat_client is >= nautilus, require v2 cephx signatures
1598 	    // from everyone
1599 	    features |= CEPH_FEATUREMASK_CEPHX_V2;
1600 	  } else if (require_osd_release >= ceph_release_t::nautilus &&
1601 		     entity_type == CEPH_ENTITY_TYPE_OSD) {
1602 	    // if osds are >= nautilus, at least require the signatures from them
1603 	    features |= CEPH_FEATUREMASK_CEPHX_V2;
1604 	  }
1605 	  mask |= CEPH_FEATUREMASK_CEPHX_V2;
1606 	
1607 	  if (pmask)
1608 	    *pmask = mask;
1609 	  return features;
1610 	}
1611 	
1612 	ceph_release_t OSDMap::get_min_compat_client() const
1613 	{
1614 	  uint64_t f = get_features(CEPH_ENTITY_TYPE_CLIENT, nullptr);
1615 	
1616 	  if (HAVE_FEATURE(f, OSDMAP_PG_UPMAP) ||      // v12.0.0-1733-g27d6f43
1617 	      HAVE_FEATURE(f, CRUSH_CHOOSE_ARGS)) {    // v12.0.1-2172-gef1ef28
1618 	    return ceph_release_t::luminous;  // v12.2.0
1619 	  }
1620 	  if (HAVE_FEATURE(f, CRUSH_TUNABLES5)) {      // v10.0.0-612-g043a737
1621 	    return ceph_release_t::jewel;     // v10.2.0
1622 	  }
1623 	  if (HAVE_FEATURE(f, CRUSH_V4)) {             // v0.91-678-g325fc56
1624 	    return ceph_release_t::hammer;    // v0.94.0
1625 	  }
1626 	  if (HAVE_FEATURE(f, OSD_PRIMARY_AFFINITY) || // v0.76-553-gf825624
1627 	      HAVE_FEATURE(f, CRUSH_TUNABLES3) ||      // v0.76-395-ge20a55d
1628 	      HAVE_FEATURE(f, OSD_CACHEPOOL)) {        // v0.67-401-gb91c1c5
1629 	    return ceph_release_t::firefly;   // v0.80.0
1630 	  }
1631 	  if (HAVE_FEATURE(f, CRUSH_TUNABLES2) ||      // v0.54-684-g0cc47ff
1632 	      HAVE_FEATURE(f, OSDHASHPSPOOL)) {        // v0.57-398-g8cc2b0f
1633 	    return ceph_release_t::dumpling;  // v0.67.0
1634 	  }
1635 	  if (HAVE_FEATURE(f, CRUSH_TUNABLES)) {       // v0.48argonaut-206-g6f381af
1636 	    return ceph_release_t::argonaut;  // v0.48argonaut-206-g6f381af
1637 	  }
1638 	  return ceph_release_t::argonaut;    // v0.48argonaut-206-g6f381af
1639 	}
1640 	
1641 	ceph_release_t OSDMap::get_require_min_compat_client() const
1642 	{
1643 	  return require_min_compat_client;
1644 	}
1645 	
1646 	void OSDMap::_calc_up_osd_features()
1647 	{
1648 	  bool first = true;
1649 	  cached_up_osd_features = 0;
1650 	  for (int osd = 0; osd < max_osd; ++osd) {
1651 	    if (!is_up(osd))
1652 	      continue;
1653 	    const osd_xinfo_t &xi = get_xinfo(osd);
1654 	    if (xi.features == 0)
1655 	      continue;  // bogus xinfo, maybe #20751 or similar, skipping
1656 	    if (first) {
1657 	      cached_up_osd_features = xi.features;
1658 	      first = false;
1659 	    } else {
1660 	      cached_up_osd_features &= xi.features;
1661 	    }
1662 	  }
1663 	}
1664 	
1665 	uint64_t OSDMap::get_up_osd_features() const
1666 	{
1667 	  return cached_up_osd_features;
1668 	}
1669 	
1670 	void OSDMap::dedup(const OSDMap *o, OSDMap *n)
1671 	{
1672 	  using ceph::encode;
1673 	  if (o->epoch == n->epoch)
1674 	    return;
1675 	
1676 	  int diff = 0;
1677 	
1678 	  // do addrs match?
1679 	  if (o->max_osd != n->max_osd)
1680 	    diff++;
1681 	  for (int i = 0; i < o->max_osd && i < n->max_osd; i++) {
1682 	    if ( n->osd_addrs->client_addrs[i] &&  o->osd_addrs->client_addrs[i] &&
1683 		*n->osd_addrs->client_addrs[i] == *o->osd_addrs->client_addrs[i])
1684 	      n->osd_addrs->client_addrs[i] = o->osd_addrs->client_addrs[i];
1685 	    else
1686 	      diff++;
1687 	    if ( n->osd_addrs->cluster_addrs[i] &&  o->osd_addrs->cluster_addrs[i] &&
1688 		*n->osd_addrs->cluster_addrs[i] == *o->osd_addrs->cluster_addrs[i])
1689 	      n->osd_addrs->cluster_addrs[i] = o->osd_addrs->cluster_addrs[i];
1690 	    else
1691 	      diff++;
1692 	    if ( n->osd_addrs->hb_back_addrs[i] &&  o->osd_addrs->hb_back_addrs[i] &&
1693 		*n->osd_addrs->hb_back_addrs[i] == *o->osd_addrs->hb_back_addrs[i])
1694 	      n->osd_addrs->hb_back_addrs[i] = o->osd_addrs->hb_back_addrs[i];
1695 	    else
1696 	      diff++;
1697 	    if ( n->osd_addrs->hb_front_addrs[i] &&  o->osd_addrs->hb_front_addrs[i] &&
1698 		*n->osd_addrs->hb_front_addrs[i] == *o->osd_addrs->hb_front_addrs[i])
1699 	      n->osd_addrs->hb_front_addrs[i] = o->osd_addrs->hb_front_addrs[i];
1700 	    else
1701 	      diff++;
1702 	  }
1703 	  if (diff == 0) {
1704 	    // zoinks, no differences at all!
1705 	    n->osd_addrs = o->osd_addrs;
1706 	  }
1707 	
1708 	  // does crush match?
1709 	  ceph::buffer::list oc, nc;
1710 	  encode(*o->crush, oc, CEPH_FEATURES_SUPPORTED_DEFAULT);
1711 	  encode(*n->crush, nc, CEPH_FEATURES_SUPPORTED_DEFAULT);
1712 	  if (oc.contents_equal(nc)) {
1713 	    n->crush = o->crush;
1714 	  }
1715 	
1716 	  // does pg_temp match?
1717 	  if (*o->pg_temp == *n->pg_temp)
1718 	    n->pg_temp = o->pg_temp;
1719 	
1720 	  // does primary_temp match?
1721 	  if (o->primary_temp->size() == n->primary_temp->size()) {
1722 	    if (*o->primary_temp == *n->primary_temp)
1723 	      n->primary_temp = o->primary_temp;
1724 	  }
1725 	
1726 	  // do uuids match?
1727 	  if (o->osd_uuid->size() == n->osd_uuid->size() &&
1728 	      *o->osd_uuid == *n->osd_uuid)
1729 	    n->osd_uuid = o->osd_uuid;
1730 	}
1731 	
1732 	void OSDMap::clean_temps(CephContext *cct,
1733 				 const OSDMap& oldmap,
1734 				 const OSDMap& nextmap,
1735 				 Incremental *pending_inc)
1736 	{
1737 	  ldout(cct, 10) << __func__ << dendl;
1738 	
1739 	  for (auto pg : *nextmap.pg_temp) {
1740 	    // if pool does not exist, remove any existing pg_temps associated with
1741 	    // it.  we don't care about pg_temps on the pending_inc either; if there
1742 	    // are new_pg_temp entries on the pending, clear them out just as well.
1743 	    if (!nextmap.have_pg_pool(pg.first.pool())) {
1744 	      ldout(cct, 10) << __func__ << " removing pg_temp " << pg.first
1745 			     << " for nonexistent pool " << pg.first.pool() << dendl;
1746 	      pending_inc->new_pg_temp[pg.first].clear();
1747 	      continue;
1748 	    }
1749 	    // all osds down?
1750 	    unsigned num_up = 0;
1751 	    for (auto o : pg.second) {
1752 	      if (!nextmap.is_down(o)) {
1753 		++num_up;
1754 		break;
1755 	      }
1756 	    }
1757 	    if (num_up == 0) {
1758 	      ldout(cct, 10) << __func__ << "  removing pg_temp " << pg.first
1759 			     << " with all down osds" << pg.second << dendl;
1760 	      pending_inc->new_pg_temp[pg.first].clear();
1761 	      continue;
1762 	    }
1763 	    // redundant pg_temp?
1764 	    vector<int> raw_up;
1765 	    int primary;
1766 	    nextmap.pg_to_raw_up(pg.first, &raw_up, &primary);
1767 	    bool remove = false;
1768 	    if (raw_up == pg.second) {
1769 	      ldout(cct, 10) << __func__ << "  removing pg_temp " << pg.first << " "
1770 			     << pg.second << " that matches raw_up mapping" << dendl;
1771 	      remove = true;
1772 	    }
1773 	    // oversized pg_temp?
1774 	    if (pg.second.size() > nextmap.get_pg_pool(pg.first.pool())->get_size()) {
1775 	      ldout(cct, 10) << __func__ << "  removing pg_temp " << pg.first << " "
1776 			     << pg.second << " exceeds pool size" << dendl;
1777 	      remove = true;
1778 	    }
1779 	    if (remove) {
1780 	      if (oldmap.pg_temp->count(pg.first))
1781 		pending_inc->new_pg_temp[pg.first].clear();
1782 	      else
1783 		pending_inc->new_pg_temp.erase(pg.first);
1784 	    }
1785 	  }
1786 	  
1787 	  for (auto &pg : *nextmap.primary_temp) {
1788 	    // primary down?
1789 	    if (nextmap.is_down(pg.second)) {
1790 	      ldout(cct, 10) << __func__ << "  removing primary_temp " << pg.first
1791 			     << " to down " << pg.second << dendl;
1792 	      pending_inc->new_primary_temp[pg.first] = -1;
1793 	      continue;
1794 	    }
1795 	    // redundant primary_temp?
1796 	    vector<int> real_up, templess_up;
1797 	    int real_primary, templess_primary;
1798 	    pg_t pgid = pg.first;
1799 	    nextmap.pg_to_acting_osds(pgid, &real_up, &real_primary);
1800 	    nextmap.pg_to_raw_up(pgid, &templess_up, &templess_primary);
1801 	    if (real_primary == templess_primary){
1802 	      ldout(cct, 10) << __func__ << "  removing primary_temp "
1803 			     << pgid << " -> " << real_primary
1804 			     << " (unnecessary/redundant)" << dendl;
1805 	      if (oldmap.primary_temp->count(pgid))
1806 		pending_inc->new_primary_temp[pgid] = -1;
1807 	      else
1808 		pending_inc->new_primary_temp.erase(pgid);
1809 	    }
1810 	  }
1811 	}
1812 	
1813 	void OSDMap::get_upmap_pgs(vector<pg_t> *upmap_pgs) const
1814 	{
1815 	  upmap_pgs->reserve(pg_upmap.size() + pg_upmap_items.size());
1816 	  for (auto& p : pg_upmap)
1817 	    upmap_pgs->push_back(p.first);
1818 	  for (auto& p : pg_upmap_items)
1819 	    upmap_pgs->push_back(p.first);
1820 	}
1821 	
1822 	bool OSDMap::check_pg_upmaps(
1823 	  CephContext *cct,
1824 	  const vector<pg_t>& to_check,
1825 	  vector<pg_t> *to_cancel,
1826 	  map<pg_t, mempool::osdmap::vector<pair<int,int>>> *to_remap) const
1827 	{
1828 	  bool any_change = false;
1829 	  map<int, map<int, float>> rule_weight_map;
1830 	  for (auto& pg : to_check) {
1831 	    const pg_pool_t *pi = get_pg_pool(pg.pool());
1832 	    if (!pi || pg.ps() >= pi->get_pg_num_pending()) {
1833 	      ldout(cct, 0) << __func__ << " pg " << pg << " is gone or merge source"
1834 			    << dendl;
1835 	      to_cancel->push_back(pg);
1836 	      continue;
1837 	    }
1838 	    if (pi->is_pending_merge(pg, nullptr)) {
1839 	      ldout(cct, 0) << __func__ << " pg " << pg << " is pending merge"
1840 			    << dendl;
1841 	      to_cancel->push_back(pg);
1842 	      continue;
1843 	    }
1844 	    vector<int> raw, up;
1845 	    pg_to_raw_upmap(pg, &raw, &up);
1846 	    auto crush_rule = get_pg_pool_crush_rule(pg);
1847 	    auto r = crush->verify_upmap(cct,
1848 	                                 crush_rule,
1849 	                                 get_pg_pool_size(pg),
1850 	                                 up);
1851 	    if (r < 0) {
1852 	      ldout(cct, 0) << __func__ << " verify_upmap of pg " << pg
1853 	                    << " returning " << r
1854 	                    << dendl;
1855 	      to_cancel->push_back(pg);
1856 	      continue;
1857 	    }
1858 	    // below we check against crush-topology changing..
1859 	    map<int, float> weight_map;
1860 	    auto it = rule_weight_map.find(crush_rule);
1861 	    if (it == rule_weight_map.end()) {
1862 	      auto r = crush->get_rule_weight_osd_map(crush_rule, &weight_map);
1863 	      if (r < 0) {
1864 	        lderr(cct) << __func__ << " unable to get crush weight_map for "
1865 	                   << "crush_rule " << crush_rule
1866 	                   << dendl;
1867 	        continue;
1868 	      }
1869 	      rule_weight_map[crush_rule] = weight_map;
1870 	    } else {
1871 	      weight_map = it->second;
1872 	    }
1873 	    ldout(cct, 10) << __func__ << " pg " << pg
1874 	                   << " weight_map " << weight_map
1875 	                   << dendl;
1876 	    for (auto osd : up) {
1877 	      auto it = weight_map.find(osd);
1878 	      if (it == weight_map.end()) {
1879 	        // osd is gone or has been moved out of the specific crush-tree
1880 	        to_cancel->push_back(pg);
1881 	        break;
1882 	      }
1883 	      auto adjusted_weight = get_weightf(it->first) * it->second;
1884 	      if (adjusted_weight == 0) {
1885 	        // osd is out/crush-out
1886 	        to_cancel->push_back(pg);
1887 	        break;
1888 	      }
1889 	    }
1890 	    if (!to_cancel->empty() && to_cancel->back() == pg)
1891 	      continue;
1892 	    // okay, upmap is valid
1893 	    // continue to check if it is still necessary
1894 	    auto i = pg_upmap.find(pg);
1895 	    if (i != pg_upmap.end() && raw == i->second) {
1896 	      ldout(cct, 10) << " removing redundant pg_upmap "
1897 	                     << i->first << " " << i->second
1898 	                     << dendl;
1899 	      to_cancel->push_back(pg);
1900 	      continue;
1901 	    }
1902 	    auto j = pg_upmap_items.find(pg);
1903 	    if (j != pg_upmap_items.end()) {
1904 	      mempool::osdmap::vector<pair<int,int>> newmap;
1905 	      for (auto& p : j->second) {
1906 	        if (std::find(raw.begin(), raw.end(), p.first) == raw.end()) {
1907 	          // cancel mapping if source osd does not exist anymore
1908 	          continue;
1909 	        }
1910 	        if (p.second != CRUSH_ITEM_NONE && p.second < max_osd &&
1911 	            p.second >= 0 && osd_weight[p.second] == 0) {
1912 	          // cancel mapping if target osd is out
1913 	          continue;
1914 	        }
1915 	        newmap.push_back(p);
1916 	      }
1917 	      if (newmap.empty()) {
1918 	        ldout(cct, 10) << " removing no-op pg_upmap_items "
1919 	                       << j->first << " " << j->second
1920 	                       << dendl;
1921 	        to_cancel->push_back(pg);
1922 	      } else if (newmap != j->second) {
1923 	        ldout(cct, 10) << " simplifying partially no-op pg_upmap_items "
1924 	                       << j->first << " " << j->second
1925 	                       << " -> " << newmap
1926 	                       << dendl;
1927 	        to_remap->insert({pg, newmap});
1928 	        any_change = true;
1929 	      }
1930 	    }
1931 	  }
1932 	  any_change = any_change || !to_cancel->empty();
1933 	  return any_change;
1934 	}
1935 	
1936 	void OSDMap::clean_pg_upmaps(
1937 	  CephContext *cct,
1938 	  Incremental *pending_inc,
1939 	  const vector<pg_t>& to_cancel,
1940 	  const map<pg_t, mempool::osdmap::vector<pair<int,int>>>& to_remap) const
1941 	{
1942 	  for (auto &pg: to_cancel) {
1943 	    auto i = pending_inc->new_pg_upmap.find(pg);
1944 	    if (i != pending_inc->new_pg_upmap.end()) {
1945 	      ldout(cct, 10) << __func__ << " cancel invalid pending "
1946 	                     << "pg_upmap entry "
1947 	                     << i->first << "->" << i->second
1948 	                     << dendl;
1949 	      pending_inc->new_pg_upmap.erase(i);
1950 	    }
1951 	    auto j = pg_upmap.find(pg);
1952 	    if (j != pg_upmap.end()) {
1953 	      ldout(cct, 10) << __func__ << " cancel invalid pg_upmap entry "
1954 	                     << j->first << "->" << j->second
1955 	                     << dendl;
1956 	      pending_inc->old_pg_upmap.insert(pg);
1957 	    }
1958 	    auto p = pending_inc->new_pg_upmap_items.find(pg);
1959 	    if (p != pending_inc->new_pg_upmap_items.end()) {
1960 	      ldout(cct, 10) << __func__ << " cancel invalid pending "
1961 	                     << "pg_upmap_items entry "
1962 	                     << p->first << "->" << p->second
1963 	                     << dendl;
1964 	      pending_inc->new_pg_upmap_items.erase(p);
1965 	    }
1966 	    auto q = pg_upmap_items.find(pg);
1967 	    if (q != pg_upmap_items.end()) {
1968 	      ldout(cct, 10) << __func__ << " cancel invalid "
1969 	                     << "pg_upmap_items entry "
1970 	                     << q->first << "->" << q->second
1971 	                     << dendl;
1972 	      pending_inc->old_pg_upmap_items.insert(pg);
1973 	    }
1974 	  }
1975 	  for (auto& i : to_remap)
1976 	    pending_inc->new_pg_upmap_items[i.first] = i.second;
1977 	}
1978 	
1979 	bool OSDMap::clean_pg_upmaps(
1980 	  CephContext *cct,
1981 	  Incremental *pending_inc) const
1982 	{
1983 	  ldout(cct, 10) << __func__ << dendl;
1984 	  vector<pg_t> to_check;
1985 	  vector<pg_t> to_cancel;
1986 	  map<pg_t, mempool::osdmap::vector<pair<int,int>>> to_remap;
1987 	
1988 	  get_upmap_pgs(&to_check);
1989 	  auto any_change = check_pg_upmaps(cct, to_check, &to_cancel, &to_remap);
1990 	  clean_pg_upmaps(cct, pending_inc, to_cancel, to_remap);
1991 	  return any_change;
1992 	}
1993 	
1994 	int OSDMap::apply_incremental(const Incremental &inc)
1995 	{
1996 	  new_blacklist_entries = false;
1997 	  if (inc.epoch == 1)
1998 	    fsid = inc.fsid;
1999 	  else if (inc.fsid != fsid)
2000 	    return -EINVAL;
2001 	  
2002 	  ceph_assert(inc.epoch == epoch+1);
2003 	
2004 	  epoch++;
2005 	  modified = inc.modified;
2006 	
2007 	  // full map?
2008 	  if (inc.fullmap.length()) {
2009 	    ceph::buffer::list bl(inc.fullmap);
2010 	    decode(bl);
2011 	    return 0;
2012 	  }
2013 	
2014 	  // nope, incremental.
2015 	  if (inc.new_flags >= 0) {
2016 	    flags = inc.new_flags;
2017 	    // the below is just to cover a newly-upgraded luminous mon
2018 	    // cluster that has to set require_jewel_osds or
2019 	    // require_kraken_osds before the osds can be upgraded to
2020 	    // luminous.
2021 	    if (flags & CEPH_OSDMAP_REQUIRE_KRAKEN) {
2022 	      if (require_osd_release < ceph_release_t::kraken) {
2023 		require_osd_release = ceph_release_t::kraken;
2024 	      }
2025 	    } else if (flags & CEPH_OSDMAP_REQUIRE_JEWEL) {
2026 	      if (require_osd_release < ceph_release_t::jewel) {
2027 		require_osd_release = ceph_release_t::jewel;
2028 	      }
2029 	    }
2030 	  }
2031 	
2032 	  if (inc.new_max_osd >= 0)
2033 	    set_max_osd(inc.new_max_osd);
2034 	
2035 	  if (inc.new_pool_max != -1)
2036 	    pool_max = inc.new_pool_max;
2037 	
2038 	  for (const auto &pool : inc.new_pools) {
2039 	    pools[pool.first] = pool.second;
2040 	    pools[pool.first].last_change = epoch;
2041 	  }
2042 	
2043 	  new_removed_snaps = inc.new_removed_snaps;
2044 	  new_purged_snaps = inc.new_purged_snaps;
2045 	  for (auto p = new_removed_snaps.begin();
2046 	       p != new_removed_snaps.end();
2047 	       ++p) {
2048 	    removed_snaps_queue[p->first].union_of(p->second);
2049 	  }
2050 	  for (auto p = new_purged_snaps.begin();
2051 	       p != new_purged_snaps.end();
2052 	       ++p) {
2053 	    auto q = removed_snaps_queue.find(p->first);
2054 	    ceph_assert(q != removed_snaps_queue.end());
2055 	    q->second.subtract(p->second);
2056 	    if (q->second.empty()) {
2057 	      removed_snaps_queue.erase(q);
2058 	    }
2059 	  }
2060 	
2061 	  if (inc.new_last_up_change != utime_t()) {
2062 	    last_up_change = inc.new_last_up_change;
2063 	  }
2064 	  if (inc.new_last_in_change != utime_t()) {
2065 	    last_in_change = inc.new_last_in_change;
2066 	  }
2067 	
2068 	  for (const auto &pname : inc.new_pool_names) {
2069 	    auto pool_name_entry = pool_name.find(pname.first);
2070 	    if (pool_name_entry != pool_name.end()) {
2071 	      name_pool.erase(pool_name_entry->second);
2072 	      pool_name_entry->second = pname.second;
2073 	    } else {
2074 	      pool_name[pname.first] = pname.second;
2075 	    }
2076 	    name_pool[pname.second] = pname.first;
2077 	  }
2078 	  
2079 	  for (const auto &pool : inc.old_pools) {
2080 	    pools.erase(pool);
2081 	    name_pool.erase(pool_name[pool]);
2082 	    pool_name.erase(pool);
2083 	  }
2084 	
2085 	  for (const auto &weight : inc.new_weight) {
2086 	    set_weight(weight.first, weight.second);
2087 	
2088 	    // if we are marking in, clear the AUTOOUT and NEW bits, and clear
2089 	    // xinfo old_weight.
2090 	    if (weight.second) {
2091 	      osd_state[weight.first] &= ~(CEPH_OSD_AUTOOUT | CEPH_OSD_NEW);
2092 	      osd_xinfo[weight.first].old_weight = 0;
2093 	    }
2094 	  }
2095 	
2096 	  for (const auto &primary_affinity : inc.new_primary_affinity) {
2097 	    set_primary_affinity(primary_affinity.first, primary_affinity.second);
2098 	  }
2099 	
2100 	  // erasure_code_profiles
2101 	  for (const auto &profile : inc.old_erasure_code_profiles)
2102 	    erasure_code_profiles.erase(profile);
2103 	  
2104 	  for (const auto &profile : inc.new_erasure_code_profiles) {
2105 	    set_erasure_code_profile(profile.first, profile.second);
2106 	  }
2107 	  
2108 	  // up/down
2109 	  for (const auto &state : inc.new_state) {
2110 	    const auto osd = state.first;
2111 	    int s = state.second ? state.second : CEPH_OSD_UP;
2112 	    if ((osd_state[osd] & CEPH_OSD_UP) &&
2113 		(s & CEPH_OSD_UP)) {
2114 	      osd_info[osd].down_at = epoch;
2115 	      osd_xinfo[osd].down_stamp = modified;
2116 	    }
2117 	    if ((osd_state[osd] & CEPH_OSD_EXISTS) &&
2118 		(s & CEPH_OSD_EXISTS)) {
2119 	      // osd is destroyed; clear out anything interesting.
2120 	      (*osd_uuid)[osd] = uuid_d();
2121 	      osd_info[osd] = osd_info_t();
2122 	      osd_xinfo[osd] = osd_xinfo_t();
2123 	      set_primary_affinity(osd, CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
2124 	      osd_addrs->client_addrs[osd].reset(new entity_addrvec_t());
2125 	      osd_addrs->cluster_addrs[osd].reset(new entity_addrvec_t());
2126 	      osd_addrs->hb_front_addrs[osd].reset(new entity_addrvec_t());
2127 	      osd_addrs->hb_back_addrs[osd].reset(new entity_addrvec_t());
2128 	      osd_state[osd] = 0;
2129 	    } else {
2130 	      osd_state[osd] ^= s;
2131 	    }
2132 	  }
2133 	
2134 	  for (const auto &client : inc.new_up_client) {
2135 	    osd_state[client.first] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
2136 	    osd_state[client.first] &= ~CEPH_OSD_STOP; // if any
2137 	    osd_addrs->client_addrs[client.first].reset(
2138 	      new entity_addrvec_t(client.second));
2139 	    osd_addrs->hb_back_addrs[client.first].reset(
2140 	      new entity_addrvec_t(inc.new_hb_back_up.find(client.first)->second));
2141 	    osd_addrs->hb_front_addrs[client.first].reset(
2142 	      new entity_addrvec_t(inc.new_hb_front_up.find(client.first)->second));
2143 	
2144 	    osd_info[client.first].up_from = epoch;
2145 	  }
2146 	
2147 	  for (const auto &cluster : inc.new_up_cluster)
2148 	    osd_addrs->cluster_addrs[cluster.first].reset(
2149 	      new entity_addrvec_t(cluster.second));
2150 	
2151 	  // info
2152 	  for (const auto &thru : inc.new_up_thru)
2153 	    osd_info[thru.first].up_thru = thru.second;
2154 	  
2155 	  for (const auto &interval : inc.new_last_clean_interval) {
2156 	    osd_info[interval.first].last_clean_begin = interval.second.first;
2157 	    osd_info[interval.first].last_clean_end = interval.second.second;
2158 	  }
2159 	  
2160 	  for (const auto &lost : inc.new_lost)
2161 	    osd_info[lost.first].lost_at = lost.second;
2162 	
2163 	  // xinfo
2164 	  for (const auto &xinfo : inc.new_xinfo)
2165 	    osd_xinfo[xinfo.first] = xinfo.second;
2166 	
2167 	  // uuid
2168 	  for (const auto &uuid : inc.new_uuid)
2169 	    (*osd_uuid)[uuid.first] = uuid.second;
2170 	
2171 	  // pg rebuild
2172 	  for (const auto &pg : inc.new_pg_temp) {
2173 	    if (pg.second.empty())
2174 	      pg_temp->erase(pg.first);
2175 	    else
2176 	      pg_temp->set(pg.first, pg.second);
2177 	  }
2178 	  if (!inc.new_pg_temp.empty()) {
2179 	    // make sure pg_temp is efficiently stored
2180 	    pg_temp->rebuild();
2181 	  }
2182 	
2183 	  for (const auto &pg : inc.new_primary_temp) {
2184 	    if (pg.second == -1)
2185 	      primary_temp->erase(pg.first);
2186 	    else
2187 	      (*primary_temp)[pg.first] = pg.second;
2188 	  }
2189 	
2190 	  for (auto& p : inc.new_pg_upmap) {
2191 	    pg_upmap[p.first] = p.second;
2192 	  }
2193 	  for (auto& pg : inc.old_pg_upmap) {
2194 	    pg_upmap.erase(pg);
2195 	  }
2196 	  for (auto& p : inc.new_pg_upmap_items) {
2197 	    pg_upmap_items[p.first] = p.second;
2198 	  }
2199 	  for (auto& pg : inc.old_pg_upmap_items) {
2200 	    pg_upmap_items.erase(pg);
2201 	  }
2202 	
2203 	  // blacklist
2204 	  if (!inc.new_blacklist.empty()) {
2205 	    blacklist.insert(inc.new_blacklist.begin(),inc.new_blacklist.end());
2206 	    new_blacklist_entries = true;
2207 	  }
2208 	  for (const auto &addr : inc.old_blacklist)
2209 	    blacklist.erase(addr);
2210 	
2211 	  for (auto& i : inc.new_crush_node_flags) {
2212 	    if (i.second) {
2213 	      crush_node_flags[i.first] = i.second;
2214 	    } else {
2215 	      crush_node_flags.erase(i.first);
2216 	    }
2217 	  }
2218 	
2219 	  for (auto& i : inc.new_device_class_flags) {
2220 	    if (i.second) {
2221 	      device_class_flags[i.first] = i.second;
2222 	    } else {
2223 	      device_class_flags.erase(i.first);
2224 	    }
2225 	  }
2226 	
2227 	  // cluster snapshot?
2228 	  if (inc.cluster_snapshot.length()) {
2229 	    cluster_snapshot = inc.cluster_snapshot;
2230 	    cluster_snapshot_epoch = inc.epoch;
2231 	  } else {
2232 	    cluster_snapshot.clear();
2233 	    cluster_snapshot_epoch = 0;
2234 	  }
2235 	
2236 	  if (inc.new_nearfull_ratio >= 0) {
2237 	    nearfull_ratio = inc.new_nearfull_ratio;
2238 	  }
2239 	  if (inc.new_backfillfull_ratio >= 0) {
2240 	    backfillfull_ratio = inc.new_backfillfull_ratio;
2241 	  }
2242 	  if (inc.new_full_ratio >= 0) {
2243 	    full_ratio = inc.new_full_ratio;
2244 	  }
2245 	  if (inc.new_require_min_compat_client > ceph_release_t::unknown) {
2246 	    require_min_compat_client = inc.new_require_min_compat_client;
2247 	  }
2248 	  if (inc.new_require_osd_release >= ceph_release_t::unknown) {
2249 	    require_osd_release = inc.new_require_osd_release;
2250 	    if (require_osd_release >= ceph_release_t::luminous) {
2251 	      flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
2252 	      flags |= CEPH_OSDMAP_RECOVERY_DELETES;
2253 	    }
2254 	  }
2255 	
2256 	  if (inc.new_require_osd_release >= ceph_release_t::unknown) {
2257 	    require_osd_release = inc.new_require_osd_release;
2258 	    if (require_osd_release >= ceph_release_t::nautilus) {
2259 	      flags |= CEPH_OSDMAP_PGLOG_HARDLIMIT;
2260 	    }
2261 	  }
2262 	  // do new crush map last (after up/down stuff)
2263 	  if (inc.crush.length()) {
2264 	    ceph::buffer::list bl(inc.crush);
2265 	    auto blp = bl.cbegin();
2266 	    crush.reset(new CrushWrapper);
2267 	    crush->decode(blp);
2268 	    if (require_osd_release >= ceph_release_t::luminous) {
2269 	      // only increment if this is a luminous-encoded osdmap, lest
2270 	      // the mon's crush_version diverge from what the osds or others
2271 	      // are decoding and applying on their end.  if we won't encode
2272 	      // it in the canonical version, don't change it.
2273 	      ++crush_version;
2274 	    }
2275 	    for (auto it = device_class_flags.begin();
2276 	         it != device_class_flags.end();) {
2277 	      const char* class_name = crush->get_class_name(it->first);
2278 	      if (!class_name) // device class is gone
2279 	        it = device_class_flags.erase(it);
2280 	      else
2281 	        it++;
2282 	    }
2283 	  }
2284 	
2285 	  calc_num_osds();
2286 	  _calc_up_osd_features();
2287 	  return 0;
2288 	}
2289 	
2290 	// mapping
2291 	int OSDMap::map_to_pg(
2292 	  int64_t poolid,
2293 	  const string& name,
2294 	  const string& key,
2295 	  const string& nspace,
2296 	  pg_t *pg) const
2297 	{
2298 	  // calculate ps (placement seed)
2299 	  const pg_pool_t *pool = get_pg_pool(poolid);
2300 	  if (!pool)
2301 	    return -ENOENT;
2302 	  ps_t ps;
2303 	  if (!key.empty())
2304 	    ps = pool->hash_key(key, nspace);
2305 	  else
2306 	    ps = pool->hash_key(name, nspace);
2307 	  *pg = pg_t(ps, poolid);
2308 	  return 0;
2309 	}
2310 	
2311 	int OSDMap::object_locator_to_pg(
2312 	  const object_t& oid, const object_locator_t& loc, pg_t &pg) const
2313 	{
2314 	  if (loc.hash >= 0) {
2315 	    if (!get_pg_pool(loc.get_pool())) {
2316 	      return -ENOENT;
2317 	    }
2318 	    pg = pg_t(loc.hash, loc.get_pool());
2319 	    return 0;
2320 	  }
2321 	  return map_to_pg(loc.get_pool(), oid.name, loc.key, loc.nspace, &pg);
2322 	}
2323 	
2324 	ceph_object_layout OSDMap::make_object_layout(
2325 	  object_t oid, int pg_pool, string nspace) const
2326 	{
2327 	  object_locator_t loc(pg_pool, nspace);
2328 	
2329 	  ceph_object_layout ol;
2330 	  pg_t pgid = object_locator_to_pg(oid, loc);
2331 	  ol.ol_pgid = pgid.get_old_pg().v;
2332 	  ol.ol_stripe_unit = 0;
2333 	  return ol;
2334 	}
2335 	
2336 	void OSDMap::_remove_nonexistent_osds(const pg_pool_t& pool,
2337 					      vector<int>& osds) const
2338 	{
2339 	  if (pool.can_shift_osds()) {
2340 	    unsigned removed = 0;
2341 	    for (unsigned i = 0; i < osds.size(); i++) {
2342 	      if (!exists(osds[i])) {
2343 		removed++;
2344 		continue;
2345 	      }
2346 	      if (removed) {
2347 		osds[i - removed] = osds[i];
2348 	      }
2349 	    }
2350 	    if (removed)
2351 	      osds.resize(osds.size() - removed);
2352 	  } else {
2353 	    for (auto& osd : osds) {
2354 	      if (!exists(osd))
2355 		osd = CRUSH_ITEM_NONE;
2356 	    }
2357 	  }
2358 	}
2359 	
2360 	void OSDMap::_pg_to_raw_osds(
2361 	  const pg_pool_t& pool, pg_t pg,
2362 	  vector<int> *osds,
2363 	  ps_t *ppps) const
2364 	{
2365 	  // map to osds[]
2366 	  ps_t pps = pool.raw_pg_to_pps(pg);  // placement ps
2367 	  unsigned size = pool.get_size();
2368 	
2369 	  // what crush rule?
2370 	  int ruleno = crush->find_rule(pool.get_crush_rule(), pool.get_type(), size);
2371 	  if (ruleno >= 0)
2372 	    crush->do_rule(ruleno, pps, *osds, size, osd_weight, pg.pool());
2373 	
2374 	  _remove_nonexistent_osds(pool, *osds);
2375 	
2376 	  if (ppps)
2377 	    *ppps = pps;
2378 	}
2379 	
2380 	int OSDMap::_pick_primary(const vector<int>& osds) const
2381 	{
2382 	  for (auto osd : osds) {
2383 	    if (osd != CRUSH_ITEM_NONE) {
2384 	      return osd;
2385 	    }
2386 	  }
2387 	  return -1;
2388 	}
2389 	
2390 	void OSDMap::_apply_upmap(const pg_pool_t& pi, pg_t raw_pg, vector<int> *raw) const
2391 	{
2392 	  pg_t pg = pi.raw_pg_to_pg(raw_pg);
2393 	  auto p = pg_upmap.find(pg);
2394 	  if (p != pg_upmap.end()) {
2395 	    // make sure targets aren't marked out
2396 	    for (auto osd : p->second) {
2397 	      if (osd != CRUSH_ITEM_NONE && osd < max_osd && osd >= 0 &&
2398 	          osd_weight[osd] == 0) {
2399 		// reject/ignore the explicit mapping
2400 		return;
2401 	      }
2402 	    }
2403 	    *raw = vector<int>(p->second.begin(), p->second.end());
2404 	    // continue to check and apply pg_upmap_items if any
2405 	  }
2406 	
2407 	  auto q = pg_upmap_items.find(pg);
2408 	  if (q != pg_upmap_items.end()) {
2409 	    // NOTE: this approach does not allow a bidirectional swap,
2410 	    // e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
2411 	    for (auto& r : q->second) {
2412 	      // make sure the replacement value doesn't already appear
2413 	      bool exists = false;
2414 	      ssize_t pos = -1;
2415 	      for (unsigned i = 0; i < raw->size(); ++i) {
2416 		int osd = (*raw)[i];
2417 		if (osd == r.second) {
2418 		  exists = true;
2419 		  break;
2420 		}
2421 		// ignore mapping if target is marked out (or invalid osd id)
2422 		if (osd == r.first &&
2423 		    pos < 0 &&
2424 		    !(r.second != CRUSH_ITEM_NONE && r.second < max_osd &&
2425 		      r.second >= 0 && osd_weight[r.second] == 0)) {
2426 		  pos = i;
2427 		}
2428 	      }
2429 	      if (!exists && pos >= 0) {
2430 		(*raw)[pos] = r.second;
2431 	      }
2432 	    }
2433 	  }
2434 	}
2435 	
2436 	// pg -> (up osd list)
2437 	void OSDMap::_raw_to_up_osds(const pg_pool_t& pool, const vector<int>& raw,
2438 	                             vector<int> *up) const
2439 	{
2440 	  if (pool.can_shift_osds()) {
2441 	    // shift left
2442 	    up->clear();
2443 	    up->reserve(raw.size());
2444 	    for (unsigned i=0; i<raw.size(); i++) {
2445 	      if (!exists(raw[i]) || is_down(raw[i]))
2446 		continue;
2447 	      up->push_back(raw[i]);
2448 	    }
2449 	  } else {
2450 	    // set down/dne devices to NONE
2451 	    up->resize(raw.size());
2452 	    for (int i = raw.size() - 1; i >= 0; --i) {
2453 	      if (!exists(raw[i]) || is_down(raw[i])) {
2454 		(*up)[i] = CRUSH_ITEM_NONE;
2455 	      } else {
2456 		(*up)[i] = raw[i];
2457 	      }
2458 	    }
2459 	  }
2460 	}
2461 	
2462 	void OSDMap::_apply_primary_affinity(ps_t seed,
2463 					     const pg_pool_t& pool,
2464 					     vector<int> *osds,
2465 					     int *primary) const
2466 	{
2467 	  // do we have any non-default primary_affinity values for these osds?
2468 	  if (!osd_primary_affinity)
2469 	    return;
2470 	
2471 	  bool any = false;
2472 	  for (const auto osd : *osds) {
2473 	    if (osd != CRUSH_ITEM_NONE &&
2474 		(*osd_primary_affinity)[osd] != CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
2475 	      any = true;
2476 	      break;
2477 	    }
2478 	  }
2479 	  if (!any)
2480 	    return;
2481 	
2482 	  // pick the primary.  feed both the seed (for the pg) and the osd
2483 	  // into the hash/rng so that a proportional fraction of an osd's pgs
2484 	  // get rejected as primary.
2485 	  int pos = -1;
2486 	  for (unsigned i = 0; i < osds->size(); ++i) {
2487 	    int o = (*osds)[i];
2488 	    if (o == CRUSH_ITEM_NONE)
2489 	      continue;
2490 	    unsigned a = (*osd_primary_affinity)[o];
2491 	    if (a < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
2492 		(crush_hash32_2(CRUSH_HASH_RJENKINS1,
2493 				seed, o) >> 16) >= a) {
2494 	      // we chose not to use this primary.  note it anyway as a
2495 	      // fallback in case we don't pick anyone else, but keep looking.
2496 	      if (pos < 0)
2497 		pos = i;
2498 	    } else {
2499 	      pos = i;
2500 	      break;
2501 	    }
2502 	  }
2503 	  if (pos < 0)
2504 	    return;
2505 	
2506 	  *primary = (*osds)[pos];
2507 	
2508 	  if (pool.can_shift_osds() && pos > 0) {
2509 	    // move the new primary to the front.
2510 	    for (int i = pos; i > 0; --i) {
2511 	      (*osds)[i] = (*osds)[i-1];
2512 	    }
2513 	    (*osds)[0] = *primary;
2514 	  }
2515 	}
2516 	
2517 	void OSDMap::_get_temp_osds(const pg_pool_t& pool, pg_t pg,
2518 	                            vector<int> *temp_pg, int *temp_primary) const
2519 	{
2520 	  pg = pool.raw_pg_to_pg(pg);
2521 	  const auto p = pg_temp->find(pg);
2522 	  temp_pg->clear();
2523 	  if (p != pg_temp->end()) {
2524 	    for (unsigned i=0; i<p->second.size(); i++) {
2525 	      if (!exists(p->second[i]) || is_down(p->second[i])) {
2526 		if (pool.can_shift_osds()) {
2527 		  continue;
2528 		} else {
2529 		  temp_pg->push_back(CRUSH_ITEM_NONE);
2530 		}
2531 	      } else {
2532 		temp_pg->push_back(p->second[i]);
2533 	      }
2534 	    }
2535 	  }
2536 	  const auto &pp = primary_temp->find(pg);
2537 	  *temp_primary = -1;
2538 	  if (pp != primary_temp->end()) {
2539 	    *temp_primary = pp->second;
2540 	  } else if (!temp_pg->empty()) { // apply pg_temp's primary
2541 	    for (unsigned i = 0; i < temp_pg->size(); ++i) {
2542 	      if ((*temp_pg)[i] != CRUSH_ITEM_NONE) {
2543 		*temp_primary = (*temp_pg)[i];
2544 		break;
2545 	      }
2546 	    }
2547 	  }
2548 	}
2549 	
2550 	void OSDMap::pg_to_raw_osds(pg_t pg, vector<int> *raw, int *primary) const
2551 	{
2552 	  const pg_pool_t *pool = get_pg_pool(pg.pool());
2553 	  if (!pool) {
2554 	    *primary = -1;
2555 	    raw->clear();
2556 	    return;
2557 	  }
2558 	  _pg_to_raw_osds(*pool, pg, raw, NULL);
2559 	  *primary = _pick_primary(*raw);
2560 	}
2561 	
2562 	void OSDMap::pg_to_raw_upmap(pg_t pg, vector<int>*raw,
2563 	                             vector<int> *raw_upmap) const
2564 	{
2565 	  auto pool = get_pg_pool(pg.pool());
2566 	  if (!pool) {
2567 	    raw_upmap->clear();
2568 	    return;
2569 	  }
2570 	  _pg_to_raw_osds(*pool, pg, raw, NULL);
2571 	  *raw_upmap = *raw;
2572 	  _apply_upmap(*pool, pg, raw_upmap);
2573 	}
2574 	
2575 	void OSDMap::pg_to_raw_up(pg_t pg, vector<int> *up, int *primary) const
2576 	{
2577 	  const pg_pool_t *pool = get_pg_pool(pg.pool());
2578 	  if (!pool) {
2579 	    *primary = -1;
2580 	    up->clear();
2581 	    return;
2582 	  }
2583 	  vector<int> raw;
2584 	  ps_t pps;
2585 	  _pg_to_raw_osds(*pool, pg, &raw, &pps);
2586 	  _apply_upmap(*pool, pg, &raw);
2587 	  _raw_to_up_osds(*pool, raw, up);
2588 	  *primary = _pick_primary(raw);
2589 	  _apply_primary_affinity(pps, *pool, up, primary);
2590 	}
2591 	
2592 	void OSDMap::_pg_to_up_acting_osds(
2593 	  const pg_t& pg, vector<int> *up, int *up_primary,
2594 	  vector<int> *acting, int *acting_primary,
2595 	  bool raw_pg_to_pg) const
2596 	{
2597 	  const pg_pool_t *pool = get_pg_pool(pg.pool());
2598 	  if (!pool ||
2599 	      (!raw_pg_to_pg && pg.ps() >= pool->get_pg_num())) {
2600 	    if (up)
2601 	      up->clear();
2602 	    if (up_primary)
2603 	      *up_primary = -1;
2604 	    if (acting)
2605 	      acting->clear();
2606 	    if (acting_primary)
2607 	      *acting_primary = -1;
2608 	    return;
2609 	  }
2610 	  vector<int> raw;
2611 	  vector<int> _up;
2612 	  vector<int> _acting;
2613 	  int _up_primary;
2614 	  int _acting_primary;
2615 	  ps_t pps;
2616 	  _get_temp_osds(*pool, pg, &_acting, &_acting_primary);
2617 	  if (_acting.empty() || up || up_primary) {
2618 	    _pg_to_raw_osds(*pool, pg, &raw, &pps);
2619 	    _apply_upmap(*pool, pg, &raw);
2620 	    _raw_to_up_osds(*pool, raw, &_up);
2621 	    _up_primary = _pick_primary(_up);
2622 	    _apply_primary_affinity(pps, *pool, &_up, &_up_primary);
2623 	    if (_acting.empty()) {
2624 	      _acting = _up;
2625 	      if (_acting_primary == -1) {
2626 	        _acting_primary = _up_primary;
2627 	      }
2628 	    }
2629 	  
2630 	    if (up)
2631 	      up->swap(_up);
2632 	    if (up_primary)
2633 	      *up_primary = _up_primary;
2634 	  }
2635 	
2636 	  if (acting)
2637 	    acting->swap(_acting);
2638 	  if (acting_primary)
2639 	    *acting_primary = _acting_primary;
2640 	}
2641 	
2642 	int OSDMap::calc_pg_rank(int osd, const vector<int>& acting, int nrep)
2643 	{
2644 	  if (!nrep)
2645 	    nrep = acting.size();
2646 	  for (int i=0; i<nrep; i++) 
2647 	    if (acting[i] == osd)
2648 	      return i;
2649 	  return -1;
2650 	}
2651 	
2652 	int OSDMap::calc_pg_role(int osd, const vector<int>& acting, int nrep)
2653 	{
2654 	  return calc_pg_rank(osd, acting, nrep);
2655 	}
2656 	
2657 	bool OSDMap::primary_changed(
2658 	  int oldprimary,
2659 	  const vector<int> &oldacting,
2660 	  int newprimary,
2661 	  const vector<int> &newacting)
2662 	{
2663 	  if (oldacting.empty() && newacting.empty())
2664 	    return false;    // both still empty
2665 	  if (oldacting.empty() ^ newacting.empty())
2666 	    return true;     // was empty, now not, or vice versa
2667 	  if (oldprimary != newprimary)
2668 	    return true;     // primary changed
2669 	  if (calc_pg_rank(oldprimary, oldacting) !=
2670 	      calc_pg_rank(newprimary, newacting))
2671 	    return true;
2672 	  return false;      // same primary (tho replicas may have changed)
2673 	}
2674 	
2675 	uint64_t OSDMap::get_encoding_features() const
2676 	{
2677 	  uint64_t f = SIGNIFICANT_FEATURES;
2678 	  if (require_osd_release < ceph_release_t::octopus) {
2679 	    f &= ~CEPH_FEATURE_SERVER_OCTOPUS;
2680 	  }
2681 	  if (require_osd_release < ceph_release_t::nautilus) {
2682 	    f &= ~CEPH_FEATURE_SERVER_NAUTILUS;
2683 	  }
2684 	  if (require_osd_release < ceph_release_t::mimic) {
2685 	    f &= ~CEPH_FEATURE_SERVER_MIMIC;
2686 	  }
2687 	  if (require_osd_release < ceph_release_t::luminous) {
2688 	    f &= ~(CEPH_FEATURE_SERVER_LUMINOUS |
2689 		   CEPH_FEATURE_CRUSH_CHOOSE_ARGS);
2690 	  }
2691 	  if (require_osd_release < ceph_release_t::kraken) {
2692 	    f &= ~(CEPH_FEATURE_SERVER_KRAKEN |
2693 		   CEPH_FEATURE_MSG_ADDR2);
2694 	  }
2695 	  if (require_osd_release < ceph_release_t::jewel) {
2696 	    f &= ~(CEPH_FEATURE_SERVER_JEWEL |
2697 		   CEPH_FEATURE_NEW_OSDOP_ENCODING |
2698 		   CEPH_FEATURE_CRUSH_TUNABLES5);
2699 	  }
2700 	  return f;
2701 	}
2702 	
2703 	// serialize, unserialize
2704 	void OSDMap::encode_client_old(ceph::buffer::list& bl) const
2705 	{
2706 	  using ceph::encode;
2707 	  __u16 v = 5;
2708 	  encode(v, bl);
2709 	
2710 	  // base
2711 	  encode(fsid, bl);
2712 	  encode(epoch, bl);
2713 	  encode(created, bl);
2714 	  encode(modified, bl);
2715 	
2716 	  // for encode(pools, bl);
2717 	  __u32 n = pools.size();
2718 	  encode(n, bl);
2719 	
2720 	  for (const auto &pool : pools) {
2721 	    n = pool.first;
2722 	    encode(n, bl);
2723 	    encode(pool.second, bl, 0);
2724 	  }
2725 	  // for encode(pool_name, bl);
2726 	  n = pool_name.size();
2727 	  encode(n, bl);
2728 	  for (const auto &pname : pool_name) {
2729 	    n = pname.first;
2730 	    encode(n, bl);
2731 	    encode(pname.second, bl);
2732 	  }
2733 	  // for encode(pool_max, bl);
2734 	  n = pool_max;
2735 	  encode(n, bl);
2736 	
2737 	  encode(flags, bl);
2738 	
2739 	  encode(max_osd, bl);
2740 	  {
2741 	    uint32_t n = osd_state.size();
2742 	    encode(n, bl);
2743 	    for (auto s : osd_state) {
2744 	      encode((uint8_t)s, bl);
2745 	    }
2746 	  }
2747 	  encode(osd_weight, bl);
2748 	  encode(osd_addrs->client_addrs, bl, 0);
2749 	
2750 	  // for encode(pg_temp, bl);
2751 	  n = pg_temp->size();
2752 	  encode(n, bl);
2753 	  for (const auto pg : *pg_temp) {
2754 	    old_pg_t opg = pg.first.get_old_pg();
2755 	    encode(opg, bl);
2756 	    encode(pg.second, bl);
2757 	  }
2758 	
2759 	  // crush
2760 	  ceph::buffer::list cbl;
2761 	  crush->encode(cbl, 0 /* legacy (no) features */);
2762 	  encode(cbl, bl);
2763 	}
2764 	
2765 	void OSDMap::encode_classic(ceph::buffer::list& bl, uint64_t features) const
2766 	{
2767 	  using ceph::encode;
2768 	  if ((features & CEPH_FEATURE_PGID64) == 0) {
2769 	    encode_client_old(bl);
2770 	    return;
2771 	  }
2772 	
2773 	  __u16 v = 6;
2774 	  encode(v, bl);
2775 	
2776 	  // base
2777 	  encode(fsid, bl);
2778 	  encode(epoch, bl);
2779 	  encode(created, bl);
2780 	  encode(modified, bl);
2781 	
2782 	  encode(pools, bl, features);
2783 	  encode(pool_name, bl);
2784 	  encode(pool_max, bl);
2785 	
2786 	  encode(flags, bl);
2787 	
2788 	  encode(max_osd, bl);
2789 	  {
2790 	    uint32_t n = osd_state.size();
2791 	    encode(n, bl);
2792 	    for (auto s : osd_state) {
2793 	      encode((uint8_t)s, bl);
2794 	    }
2795 	  }
2796 	  encode(osd_weight, bl);
2797 	  encode(osd_addrs->client_addrs, bl, features);
2798 	
2799 	  encode(*pg_temp, bl);
2800 	
2801 	  // crush
2802 	  ceph::buffer::list cbl;
2803 	  crush->encode(cbl, 0 /* legacy (no) features */);
2804 	  encode(cbl, bl);
2805 	
2806 	  // extended
2807 	  __u16 ev = 10;
2808 	  encode(ev, bl);
2809 	  encode(osd_addrs->hb_back_addrs, bl, features);
2810 	  encode(osd_info, bl);
2811 	  encode(blacklist, bl, features);
2812 	  encode(osd_addrs->cluster_addrs, bl, features);
2813 	  encode(cluster_snapshot_epoch, bl);
2814 	  encode(cluster_snapshot, bl);
2815 	  encode(*osd_uuid, bl);
2816 	  encode(osd_xinfo, bl, features);
2817 	  encode(osd_addrs->hb_front_addrs, bl, features);
2818 	}
2819 	
2820 	/* for a description of osdmap versions, and when they were introduced, please
2821 	 * refer to
2822 	 *    doc/dev/osd_internals/osdmap_versions.txt
2823 	 */
2824 	void OSDMap::encode(ceph::buffer::list& bl, uint64_t features) const
2825 	{
2826 	  using ceph::encode;
(1) Event cond_false: Condition "(features & 549755813888UL /* CEPH_FEATURE_OSDMAP_ENC */) == 0", taking false branch.
2827 	  if ((features & CEPH_FEATURE_OSDMAP_ENC) == 0) {
2828 	    encode_classic(bl, features);
2829 	    return;
(2) Event if_end: End of if statement.
2830 	  }
2831 	
2832 	  // only a select set of callers should *ever* be encoding new
2833 	  // OSDMaps.  others should be passing around the canonical encoded
2834 	  // buffers from on high.  select out those callers by passing in an
2835 	  // "impossible" feature bit.
(3) Event cond_true: Condition "features & 4611686018427387904UL /* CEPH_FEATURE_RESERVED */", taking true branch.
2836 	  ceph_assert(features & CEPH_FEATURE_RESERVED);
2837 	  features &= ~CEPH_FEATURE_RESERVED;
2838 	
2839 	  size_t start_offset = bl.length();
2840 	  size_t tail_offset;
2841 	  size_t crc_offset;
2842 	  std::optional<ceph::buffer::list::contiguous_filler> crc_filler;
2843 	
2844 	  // meta-encoding: how we include client-used and osd-specific data
2845 	  ENCODE_START(8, 7, bl);
2846 	
2847 	  {
2848 	    // NOTE: any new encoding dependencies must be reflected by
2849 	    // SIGNIFICANT_FEATURES
2850 	    uint8_t v = 9;
(4) Event cond_true: Condition "!((features & 144115188077953024UL /* CEPH_FEATUREMASK_SERVER_LUMINOUS */) == 144115188077953024UL /* CEPH_FEATUREMASK_SERVER_LUMINOUS */)", taking true branch.
2851 	    if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
2852 	      v = 3;
(5) Event if_fallthrough: Falling through to end of if statement.
2853 	    } else if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
2854 	      v = 6;
2855 	    } else if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
2856 	      v = 7;
(6) Event if_end: End of if statement.
2857 	    }
2858 	    ENCODE_START(v, 1, bl); // client-usable data
2859 	    // base
2860 	    encode(fsid, bl);
2861 	    encode(epoch, bl);
2862 	    encode(created, bl);
2863 	    encode(modified, bl);
2864 	
2865 	    encode(pools, bl, features);
2866 	    encode(pool_name, bl);
2867 	    encode(pool_max, bl);
2868 	
(7) Event cond_true: Condition "v < 4", taking true branch.
2869 	    if (v < 4) {
2870 	      decltype(flags) f = flags;
(8) Event cond_false: Condition "this->require_osd_release >= ceph_release_t::luminous", taking false branch.
2871 	      if (require_osd_release >= ceph_release_t::luminous)
2872 		f |= CEPH_OSDMAP_REQUIRE_LUMINOUS | CEPH_OSDMAP_RECOVERY_DELETES;
(9) Event else_branch: Reached else branch.
(10) Event cond_true: Condition "this->require_osd_release == ceph_release_t::kraken", taking true branch.
2873 	      else if (require_osd_release == ceph_release_t::kraken)
(11) Event if_fallthrough: Falling through to end of if statement.
2874 		f |= CEPH_OSDMAP_REQUIRE_KRAKEN;
2875 	      else if (require_osd_release == ceph_release_t::jewel)
(12) Event if_end: End of if statement.
2876 		f |= CEPH_OSDMAP_REQUIRE_JEWEL;
2877 	      encode(f, bl);
(13) Event if_fallthrough: Falling through to end of if statement.
2878 	    } else {
2879 	      encode(flags, bl);
(14) Event if_end: End of if statement.
2880 	    }
2881 	
2882 	    encode(max_osd, bl);
(15) Event cond_false: Condition "v >= 5", taking false branch.
2883 	    if (v >= 5) {
2884 	      encode(osd_state, bl);
(16) Event else_branch: Reached else branch.
2885 	    } else {
2886 	      uint32_t n = osd_state.size();
2887 	      encode(n, bl);
(17) Event for_loop: Iterating over another element of "this->osd_state".
2888 	      for (auto s : osd_state) {
(18) Event overrun-buffer-val: Overrunning buffer pointed to by "__u8 const((uint8_t)s)" of 1 bytes by passing it to a function which accesses it at byte offset 7. [details]
2889 		encode((uint8_t)s, bl);
2890 	      }
2891 	    }
2892 	    encode(osd_weight, bl);
2893 	    if (v >= 8) {
2894 	      encode(osd_addrs->client_addrs, bl, features);
2895 	    } else {
2896 	      encode_addrvec_pvec_as_addr(osd_addrs->client_addrs, bl, features);
2897 	    }
2898 	
2899 	    encode(*pg_temp, bl);
2900 	    encode(*primary_temp, bl);
2901 	    if (osd_primary_affinity) {
2902 	      encode(*osd_primary_affinity, bl);
2903 	    } else {
2904 	      vector<__u32> v;
2905 	      encode(v, bl);
2906 	    }
2907 	
2908 	    // crush
2909 	    ceph::buffer::list cbl;
2910 	    crush->encode(cbl, features);
2911 	    encode(cbl, bl);
2912 	    encode(erasure_code_profiles, bl);
2913 	
2914 	    if (v >= 4) {
2915 	      encode(pg_upmap, bl);
2916 	      encode(pg_upmap_items, bl);
2917 	    } else {
2918 	      ceph_assert(pg_upmap.empty());
2919 	      ceph_assert(pg_upmap_items.empty());
2920 	    }
2921 	    if (v >= 6) {
2922 	      encode(crush_version, bl);
2923 	    }
2924 	    if (v >= 7) {
2925 	      encode(new_removed_snaps, bl);
2926 	      encode(new_purged_snaps, bl);
2927 	    }
2928 	    if (v >= 9) {
2929 	      encode(last_up_change, bl);
2930 	      encode(last_in_change, bl);
2931 	    }
2932 	    ENCODE_FINISH(bl); // client-usable data
2933 	  }
2934 	
2935 	  {
2936 	    // NOTE: any new encoding dependencies must be reflected by
2937 	    // SIGNIFICANT_FEATURES
2938 	    uint8_t target_v = 9;
2939 	    if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
2940 	      target_v = 1;
2941 	    } else if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
2942 	      target_v = 5;
2943 	    } else if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
2944 	      target_v = 6;
2945 	    }
2946 	    ENCODE_START(target_v, 1, bl); // extended, osd-only data
2947 	    if (target_v < 7) {
2948 	      encode_addrvec_pvec_as_addr(osd_addrs->hb_back_addrs, bl, features);
2949 	    } else {
2950 	      encode(osd_addrs->hb_back_addrs, bl, features);
2951 	    }
2952 	    encode(osd_info, bl);
2953 	    {
2954 	      // put this in a sorted, ordered map<> so that we encode in a
2955 	      // deterministic order.
2956 	      map<entity_addr_t,utime_t> blacklist_map;
2957 	      for (const auto &addr : blacklist)
2958 		blacklist_map.insert(make_pair(addr.first, addr.second));
2959 	      encode(blacklist_map, bl, features);
2960 	    }
2961 	    if (target_v < 7) {
2962 	      encode_addrvec_pvec_as_addr(osd_addrs->cluster_addrs, bl, features);
2963 	    } else {
2964 	      encode(osd_addrs->cluster_addrs, bl, features);
2965 	    }
2966 	    encode(cluster_snapshot_epoch, bl);
2967 	    encode(cluster_snapshot, bl);
2968 	    encode(*osd_uuid, bl);
2969 	    encode(osd_xinfo, bl, features);
2970 	    if (target_v < 7) {
2971 	      encode_addrvec_pvec_as_addr(osd_addrs->hb_front_addrs, bl, features);
2972 	    } else {
2973 	      encode(osd_addrs->hb_front_addrs, bl, features);
2974 	    }
2975 	    if (target_v >= 2) {
2976 	      encode(nearfull_ratio, bl);
2977 	      encode(full_ratio, bl);
2978 	      encode(backfillfull_ratio, bl);
2979 	    }
2980 	    // 4 was string-based new_require_min_compat_client
2981 	    if (target_v >= 5) {
2982 	      encode(require_min_compat_client, bl);
2983 	      encode(require_osd_release, bl);
2984 	    }
2985 	    if (target_v >= 6) {
2986 	      encode(removed_snaps_queue, bl);
2987 	    }
2988 	    if (target_v >= 8) {
2989 	      encode(crush_node_flags, bl);
2990 	    }
2991 	    if (target_v >= 9) {
2992 	      encode(device_class_flags, bl);
2993 	    }
2994 	    ENCODE_FINISH(bl); // osd-only data
2995 	  }
2996 	
2997 	  crc_offset = bl.length();
2998 	  crc_filler = bl.append_hole(sizeof(uint32_t));
2999 	  tail_offset = bl.length();
3000 	
3001 	  ENCODE_FINISH(bl); // meta-encoding wrapper
3002 	
3003 	  // fill in crc
3004 	  ceph::buffer::list front;
3005 	  front.substr_of(bl, start_offset, crc_offset - start_offset);
3006 	  crc = front.crc32c(-1);
3007 	  if (tail_offset < bl.length()) {
3008 	    ceph::buffer::list tail;
3009 	    tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
3010 	    crc = tail.crc32c(crc);
3011 	  }
3012 	  ceph_le32 crc_le;
3013 	  crc_le = crc;
3014 	  crc_filler->copy_in(4, (char*)&crc_le);
3015 	  crc_defined = true;
3016 	}
3017 	
3018 	/* for a description of osdmap versions, and when they were introduced, please
3019 	 * refer to
3020 	 *    doc/dev/osd_internals/osdmap_versions.txt
3021 	 */
3022 	void OSDMap::decode(ceph::buffer::list& bl)
3023 	{
3024 	  auto p = bl.cbegin();
3025 	  decode(p);
3026 	}
3027 	
3028 	void OSDMap::decode_classic(ceph::buffer::list::const_iterator& p)
3029 	{
3030 	  using ceph::decode;
3031 	  __u32 n, t;
3032 	  __u16 v;
3033 	  decode(v, p);
3034 	
3035 	  // base
3036 	  decode(fsid, p);
3037 	  decode(epoch, p);
3038 	  decode(created, p);
3039 	  decode(modified, p);
3040 	
3041 	  if (v < 6) {
3042 	    if (v < 4) {
3043 	      int32_t max_pools = 0;
3044 	      decode(max_pools, p);
3045 	      pool_max = max_pools;
3046 	    }
3047 	    pools.clear();
3048 	    decode(n, p);
3049 	    while (n--) {
3050 	      decode(t, p);
3051 	      decode(pools[t], p);
3052 	    }
3053 	    if (v == 4) {
3054 	      decode(n, p);
3055 	      pool_max = n;
3056 	    } else if (v == 5) {
3057 	      pool_name.clear();
3058 	      decode(n, p);
3059 	      while (n--) {
3060 		decode(t, p);
3061 		decode(pool_name[t], p);
3062 	      }
3063 	      decode(n, p);
3064 	      pool_max = n;
3065 	    }
3066 	  } else {
3067 	    decode(pools, p);
3068 	    decode(pool_name, p);
3069 	    decode(pool_max, p);
3070 	  }
3071 	  // kludge around some old bug that zeroed out pool_max (#2307)
3072 	  if (pools.size() && pool_max < pools.rbegin()->first) {
3073 	    pool_max = pools.rbegin()->first;
3074 	  }
3075 	
3076 	  decode(flags, p);
3077 	
3078 	  decode(max_osd, p);
3079 	  {
3080 	    vector<uint8_t> os;
3081 	    decode(os, p);
3082 	    osd_state.resize(os.size());
3083 	    for (unsigned i = 0; i < os.size(); ++i) {
3084 	      osd_state[i] = os[i];
3085 	    }
3086 	  }
3087 	  decode(osd_weight, p);
3088 	  decode(osd_addrs->client_addrs, p);
3089 	  if (v <= 5) {
3090 	    pg_temp->clear();
3091 	    decode(n, p);
3092 	    while (n--) {
3093 	      old_pg_t opg;
3094 	      ceph::decode_raw(opg, p);
3095 	      mempool::osdmap::vector<int32_t> v;
3096 	      decode(v, p);
3097 	      pg_temp->set(pg_t(opg), v);
3098 	    }
3099 	  } else {
3100 	    decode(*pg_temp, p);
3101 	  }
3102 	
3103 	  // crush
3104 	  ceph::buffer::list cbl;
3105 	  decode(cbl, p);
3106 	  auto cblp = cbl.cbegin();
3107 	  crush->decode(cblp);
3108 	
3109 	  // extended
3110 	  __u16 ev = 0;
3111 	  if (v >= 5)
3112 	    decode(ev, p);
3113 	  decode(osd_addrs->hb_back_addrs, p);
3114 	  decode(osd_info, p);
3115 	  if (v < 5)
3116 	    decode(pool_name, p);
3117 	
3118 	  decode(blacklist, p);
3119 	  if (ev >= 6)
3120 	    decode(osd_addrs->cluster_addrs, p);
3121 	  else
3122 	    osd_addrs->cluster_addrs.resize(osd_addrs->client_addrs.size());
3123 	
3124 	  if (ev >= 7) {
3125 	    decode(cluster_snapshot_epoch, p);
3126 	    decode(cluster_snapshot, p);
3127 	  }
3128 	
3129 	  if (ev >= 8) {
3130 	    decode(*osd_uuid, p);
3131 	  } else {
3132 	    osd_uuid->resize(max_osd);
3133 	  }
3134 	  if (ev >= 9)
3135 	    decode(osd_xinfo, p);
3136 	  else
3137 	    osd_xinfo.resize(max_osd);
3138 	
3139 	  if (ev >= 10)
3140 	    decode(osd_addrs->hb_front_addrs, p);
3141 	  else
3142 	    osd_addrs->hb_front_addrs.resize(osd_addrs->hb_back_addrs.size());
3143 	
3144 	  osd_primary_affinity.reset();
3145 	
3146 	  post_decode();
3147 	}
3148 	
3149 	void OSDMap::decode(ceph::buffer::list::const_iterator& bl)
3150 	{
3151 	  using ceph::decode;
3152 	  /**
3153 	   * Older encodings of the OSDMap had a single struct_v which
3154 	   * covered the whole encoding, and was prior to our modern
3155 	   * stuff which includes a compatv and a size. So if we see
3156 	   * a struct_v < 7, we must rewind to the beginning and use our
3157 	   * classic decoder.
3158 	   */
3159 	  size_t start_offset = bl.get_off();
3160 	  size_t tail_offset = 0;
3161 	  ceph::buffer::list crc_front, crc_tail;
3162 	
3163 	  DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
3164 	  if (struct_v < 7) {
3165 	    bl.seek(start_offset);
3166 	    decode_classic(bl);
3167 	    return;
3168 	  }
3169 	  /**
3170 	   * Since we made it past that hurdle, we can use our normal paths.
3171 	   */
3172 	  {
3173 	    DECODE_START(9, bl); // client-usable data
3174 	    // base
3175 	    decode(fsid, bl);
3176 	    decode(epoch, bl);
3177 	    decode(created, bl);
3178 	    decode(modified, bl);
3179 	
3180 	    decode(pools, bl);
3181 	    decode(pool_name, bl);
3182 	    decode(pool_max, bl);
3183 	
3184 	    decode(flags, bl);
3185 	
3186 	    decode(max_osd, bl);
3187 	    if (struct_v >= 5) {
3188 	      decode(osd_state, bl);
3189 	    } else {
3190 	      vector<uint8_t> os;
3191 	      decode(os, bl);
3192 	      osd_state.resize(os.size());
3193 	      for (unsigned i = 0; i < os.size(); ++i) {
3194 		osd_state[i] = os[i];
3195 	      }
3196 	    }
3197 	    decode(osd_weight, bl);
3198 	    decode(osd_addrs->client_addrs, bl);
3199 	
3200 	    decode(*pg_temp, bl);
3201 	    decode(*primary_temp, bl);
3202 	    // dates back to firefly. version increased from 2 to 3 still in firefly.
3203 	    // do we really still need to keep this around? even for old clients?
3204 	    if (struct_v >= 2) {
3205 	      osd_primary_affinity.reset(new mempool::osdmap::vector<__u32>);
3206 	      decode(*osd_primary_affinity, bl);
3207 	      if (osd_primary_affinity->empty())
3208 		osd_primary_affinity.reset();
3209 	    } else {
3210 	      osd_primary_affinity.reset();
3211 	    }
3212 	
3213 	    // crush
3214 	    ceph::buffer::list cbl;
3215 	    decode(cbl, bl);
3216 	    auto cblp = cbl.cbegin();
3217 	    crush->decode(cblp);
3218 	    // added in firefly; version increased in luminous, so it affects
3219 	    // giant, hammer, infernallis, jewel, and kraken. probably should be left
3220 	    // alone until we require clients to be all luminous?
3221 	    if (struct_v >= 3) {
3222 	      decode(erasure_code_profiles, bl);
3223 	    } else {
3224 	      erasure_code_profiles.clear();
3225 	    }
3226 	    // version increased from 3 to 4 still in luminous, so same as above
3227 	    // applies.
3228 	    if (struct_v >= 4) {
3229 	      decode(pg_upmap, bl);
3230 	      decode(pg_upmap_items, bl);
3231 	    } else {
3232 	      pg_upmap.clear();
3233 	      pg_upmap_items.clear();
3234 	    }
3235 	    // again, version increased from 5 to 6 still in luminous, so above
3236 	    // applies.
3237 	    if (struct_v >= 6) {
3238 	      decode(crush_version, bl);
3239 	    }
3240 	    // version increase from 6 to 7 in mimic
3241 	    if (struct_v >= 7) {
3242 	      decode(new_removed_snaps, bl);
3243 	      decode(new_purged_snaps, bl);
3244 	    }
3245 	    // version increase from 7 to 8, 8 to 9, in nautilus.
3246 	    if (struct_v >= 9) {
3247 	      decode(last_up_change, bl);
3248 	      decode(last_in_change, bl);
3249 	    }
3250 	    DECODE_FINISH(bl); // client-usable data
3251 	  }
3252 	
3253 	  {
3254 	    DECODE_START(9, bl); // extended, osd-only data
3255 	    decode(osd_addrs->hb_back_addrs, bl);
3256 	    decode(osd_info, bl);
3257 	    decode(blacklist, bl);
3258 	    decode(osd_addrs->cluster_addrs, bl);
3259 	    decode(cluster_snapshot_epoch, bl);
3260 	    decode(cluster_snapshot, bl);
3261 	    decode(*osd_uuid, bl);
3262 	    decode(osd_xinfo, bl);
3263 	    decode(osd_addrs->hb_front_addrs, bl);
3264 	    // 
3265 	    if (struct_v >= 2) {
3266 	      decode(nearfull_ratio, bl);
3267 	      decode(full_ratio, bl);
3268 	    } else {
3269 	      nearfull_ratio = 0;
3270 	      full_ratio = 0;
3271 	    }
3272 	    if (struct_v >= 3) {
3273 	      decode(backfillfull_ratio, bl);
3274 	    } else {
3275 	      backfillfull_ratio = 0;
3276 	    }
3277 	    if (struct_v == 4) {
3278 	      string r;
3279 	      decode(r, bl);
3280 	      if (r.length())
3281 		require_min_compat_client = ceph_release_from_name(r.c_str());
3282 	    }
3283 	    if (struct_v >= 5) {
3284 	      decode(require_min_compat_client, bl);
3285 	      decode(require_osd_release, bl);
3286 	      if (require_osd_release >= ceph_release_t::nautilus) {
3287 		flags |= CEPH_OSDMAP_PGLOG_HARDLIMIT;
3288 	      }
3289 	      if (require_osd_release >= ceph_release_t::luminous) {
3290 		flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
3291 		flags |= CEPH_OSDMAP_RECOVERY_DELETES;
3292 	      }
3293 	    } else {
3294 	      if (flags & CEPH_OSDMAP_REQUIRE_LUMINOUS) {
3295 		// only for compat with post-kraken pre-luminous test clusters
3296 		require_osd_release = ceph_release_t::luminous;
3297 		flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
3298 		flags |= CEPH_OSDMAP_RECOVERY_DELETES;
3299 	      } else if (flags & CEPH_OSDMAP_REQUIRE_KRAKEN) {
3300 		require_osd_release = ceph_release_t::kraken;
3301 	      } else if (flags & CEPH_OSDMAP_REQUIRE_JEWEL) {
3302 		require_osd_release = ceph_release_t::jewel;
3303 	      } else {
3304 		require_osd_release = ceph_release_t::unknown;
3305 	      }
3306 	    }
3307 	    if (struct_v >= 6) {
3308 	      decode(removed_snaps_queue, bl);
3309 	    }
3310 	    if (struct_v >= 8) {
3311 	      decode(crush_node_flags, bl);
3312 	    } else {
3313 	      crush_node_flags.clear();
3314 	    }
3315 	    if (struct_v >= 9) {
3316 	      decode(device_class_flags, bl);
3317 	    } else {
3318 	      device_class_flags.clear();
3319 	    }
3320 	    DECODE_FINISH(bl); // osd-only data
3321 	  }
3322 	
3323 	  if (struct_v >= 8) {
3324 	    crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
3325 	    decode(crc, bl);
3326 	    tail_offset = bl.get_off();
3327 	    crc_defined = true;
3328 	  } else {
3329 	    crc_defined = false;
3330 	    crc = 0;
3331 	  }
3332 	
3333 	  DECODE_FINISH(bl); // wrapper
3334 	
3335 	  if (tail_offset) {
3336 	    // verify crc
3337 	    uint32_t actual = crc_front.crc32c(-1);
3338 	    if (tail_offset < bl.get_off()) {
3339 	      ceph::buffer::list tail;
3340 	      tail.substr_of(bl.get_bl(), tail_offset, bl.get_off() - tail_offset);
3341 	      actual = tail.crc32c(actual);
3342 	    }
3343 	    if (crc != actual) {
3344 	      ostringstream ss;
3345 	      ss << "bad crc, actual " << actual << " != expected " << crc;
3346 	      string s = ss.str();
3347 	      throw ceph::buffer::malformed_input(s.c_str());
3348 	    }
3349 	  }
3350 	
3351 	  post_decode();
3352 	}
3353 	
3354 	void OSDMap::post_decode()
3355 	{
3356 	  // index pool names
3357 	  name_pool.clear();
3358 	  for (const auto &pname : pool_name) {
3359 	    name_pool[pname.second] = pname.first;
3360 	  }
3361 	
3362 	  calc_num_osds();
3363 	  _calc_up_osd_features();
3364 	}
3365 	
3366 	void OSDMap::dump_erasure_code_profiles(
3367 	  const mempool::osdmap::map<string,map<string,string>>& profiles,
3368 	  Formatter *f)
3369 	{
3370 	  f->open_object_section("erasure_code_profiles");
3371 	  for (const auto &profile : profiles) {
3372 	    f->open_object_section(profile.first.c_str());
3373 	    for (const auto &profm : profile.second) {
3374 	      f->dump_string(profm.first.c_str(), profm.second.c_str());
3375 	    }
3376 	    f->close_section();
3377 	  }
3378 	  f->close_section();
3379 	}
3380 	
3381 	void OSDMap::dump_osds(Formatter *f) const
3382 	{
3383 	  f->open_array_section("osds");
3384 	  for (int i=0; i<get_max_osd(); i++) {
3385 	    if (exists(i)) {
3386 	      dump_osd(i, f);
3387 	    }
3388 	  }
3389 	  f->close_section();
3390 	}
3391 	
3392 	void OSDMap::dump_osd(int id, Formatter *f) const
3393 	{
3394 	  ceph_assert(f != nullptr);
3395 	  if (!exists(id)) {
3396 	    return;
3397 	  }
3398 	
3399 	  f->open_object_section("osd_info");
3400 	  f->dump_int("osd", id);
3401 	  f->dump_stream("uuid") << get_uuid(id);
3402 	  f->dump_int("up", is_up(id));
3403 	  f->dump_int("in", is_in(id));
3404 	  f->dump_float("weight", get_weightf(id));
3405 	  f->dump_float("primary_affinity", get_primary_affinityf(id));
3406 	  get_info(id).dump(f);
3407 	  f->dump_object("public_addrs", get_addrs(id));
3408 	  f->dump_object("cluster_addrs", get_cluster_addrs(id));
3409 	  f->dump_object("heartbeat_back_addrs", get_hb_back_addrs(id));
3410 	  f->dump_object("heartbeat_front_addrs", get_hb_front_addrs(id));
3411 	  // compat
3412 	  f->dump_stream("public_addr") << get_addrs(id).get_legacy_str();
3413 	  f->dump_stream("cluster_addr") << get_cluster_addrs(id).get_legacy_str();
3414 	  f->dump_stream("heartbeat_back_addr")
3415 	    << get_hb_back_addrs(id).get_legacy_str();
3416 	  f->dump_stream("heartbeat_front_addr")
3417 	    << get_hb_front_addrs(id).get_legacy_str();
3418 	
3419 	  set<string> st;
3420 	  get_state(id, st);
3421 	  f->open_array_section("state");
3422 	  for (const auto &state : st)
3423 	    f->dump_string("state", state);
3424 	  f->close_section();
3425 	
3426 	  f->close_section();
3427 	}
3428 	
3429 	void OSDMap::dump(Formatter *f) const
3430 	{
3431 	  f->dump_int("epoch", get_epoch());
3432 	  f->dump_stream("fsid") << get_fsid();
3433 	  f->dump_stream("created") << get_created();
3434 	  f->dump_stream("modified") << get_modified();
3435 	  f->dump_stream("last_up_change") << last_up_change;
3436 	  f->dump_stream("last_in_change") << last_in_change;
3437 	  f->dump_string("flags", get_flag_string());
3438 	  f->dump_unsigned("flags_num", flags);
3439 	  f->open_array_section("flags_set");
3440 	  set<string> flagset;
3441 	  get_flag_set(&flagset);
3442 	  for (auto p : flagset) {
3443 	    f->dump_string("flag", p);
3444 	  }
3445 	  f->close_section();
3446 	  f->dump_unsigned("crush_version", get_crush_version());
3447 	  f->dump_float("full_ratio", full_ratio);
3448 	  f->dump_float("backfillfull_ratio", backfillfull_ratio);
3449 	  f->dump_float("nearfull_ratio", nearfull_ratio);
3450 	  f->dump_string("cluster_snapshot", get_cluster_snapshot());
3451 	  f->dump_int("pool_max", get_pool_max());
3452 	  f->dump_int("max_osd", get_max_osd());
3453 	  f->dump_string("require_min_compat_client",
3454 			 ceph::to_string(require_min_compat_client));
3455 	  f->dump_string("min_compat_client",
3456 			 ceph::to_string(get_min_compat_client()));
3457 	  f->dump_string("require_osd_release",
3458 			 ceph::to_string(require_osd_release));
3459 	
3460 	  f->open_array_section("pools");
3461 	  for (const auto &pool : pools) {
3462 	    std::string name("<unknown>");
3463 	    const auto &pni = pool_name.find(pool.first);
3464 	    if (pni != pool_name.end())
3465 	      name = pni->second;
3466 	    f->open_object_section("pool");
3467 	    f->dump_int("pool", pool.first);
3468 	    f->dump_string("pool_name", name);
3469 	    pool.second.dump(f);
3470 	    f->close_section();
3471 	  }
3472 	  f->close_section();
3473 	
3474 	  dump_osds(f);
3475 	
3476 	  f->open_array_section("osd_xinfo");
3477 	  for (int i=0; i<get_max_osd(); i++) {
3478 	    if (exists(i)) {
3479 	      f->open_object_section("xinfo");
3480 	      f->dump_int("osd", i);
3481 	      osd_xinfo[i].dump(f);
3482 	      f->close_section();
3483 	    }
3484 	  }
3485 	  f->close_section();
3486 	
3487 	  f->open_array_section("pg_upmap");
3488 	  for (auto& p : pg_upmap) {
3489 	    f->open_object_section("mapping");
3490 	    f->dump_stream("pgid") << p.first;
3491 	    f->open_array_section("osds");
3492 	    for (auto q : p.second) {
3493 	      f->dump_int("osd", q);
3494 	    }
3495 	    f->close_section();
3496 	    f->close_section();
3497 	  }
3498 	  f->close_section();
3499 	  f->open_array_section("pg_upmap_items");
3500 	  for (auto& p : pg_upmap_items) {
3501 	    f->open_object_section("mapping");
3502 	    f->dump_stream("pgid") << p.first;
3503 	    f->open_array_section("mappings");
3504 	    for (auto& q : p.second) {
3505 	      f->open_object_section("mapping");
3506 	      f->dump_int("from", q.first);
3507 	      f->dump_int("to", q.second);
3508 	      f->close_section();
3509 	    }
3510 	    f->close_section();
3511 	    f->close_section();
3512 	  }
3513 	  f->close_section();
3514 	  f->open_array_section("pg_temp");
3515 	  pg_temp->dump(f);
3516 	  f->close_section();
3517 	
3518 	  f->open_array_section("primary_temp");
3519 	  for (const auto &pg : *primary_temp) {
3520 	    f->dump_stream("pgid") << pg.first;
3521 	    f->dump_int("osd", pg.second);
3522 	  }
3523 	  f->close_section(); // primary_temp
3524 	
3525 	  f->open_object_section("blacklist");
3526 	  for (const auto &addr : blacklist) {
3527 	    stringstream ss;
3528 	    ss << addr.first;
3529 	    f->dump_stream(ss.str().c_str()) << addr.second;
3530 	  }
3531 	  f->close_section();
3532 	
3533 	  dump_erasure_code_profiles(erasure_code_profiles, f);
3534 	
3535 	  f->open_array_section("removed_snaps_queue");
3536 	  for (auto& p : removed_snaps_queue) {
3537 	    f->open_object_section("pool");
3538 	    f->dump_int("pool", p.first);
3539 	    f->open_array_section("snaps");
3540 	    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
3541 	      f->open_object_section("interval");
3542 	      f->dump_unsigned("begin", q.get_start());
3543 	      f->dump_unsigned("length", q.get_len());
3544 	      f->close_section();
3545 	    }
3546 	    f->close_section();
3547 	    f->close_section();
3548 	  }
3549 	  f->close_section();
3550 	  f->open_array_section("new_removed_snaps");
3551 	  for (auto& p : new_removed_snaps) {
3552 	    f->open_object_section("pool");
3553 	    f->dump_int("pool", p.first);
3554 	    f->open_array_section("snaps");
3555 	    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
3556 	      f->open_object_section("interval");
3557 	      f->dump_unsigned("begin", q.get_start());
3558 	      f->dump_unsigned("length", q.get_len());
3559 	      f->close_section();
3560 	    }
3561 	    f->close_section();
3562 	    f->close_section();
3563 	  }
3564 	  f->close_section();
3565 	  f->open_array_section("new_purged_snaps");
3566 	  for (auto& p : new_purged_snaps) {
3567 	    f->open_object_section("pool");
3568 	    f->dump_int("pool", p.first);
3569 	    f->open_array_section("snaps");
3570 	    for (auto q = p.second.begin(); q != p.second.end(); ++q) {
3571 	      f->open_object_section("interval");
3572 	      f->dump_unsigned("begin", q.get_start());
3573 	      f->dump_unsigned("length", q.get_len());
3574 	      f->close_section();
3575 	    }
3576 	    f->close_section();
3577 	    f->close_section();
3578 	  }
3579 	  f->close_section();
3580 	  f->open_object_section("crush_node_flags");
3581 	  for (auto& i : crush_node_flags) {
3582 	    string s = crush->item_exists(i.first) ? crush->get_item_name(i.first)
3583 	      : stringify(i.first);
3584 	    f->open_array_section(s.c_str());
3585 	    set<string> st;
3586 	    calc_state_set(i.second, st);
3587 	    for (auto& j : st) {
3588 	      f->dump_string("flag", j);
3589 	    }
3590 	    f->close_section();
3591 	  }
3592 	  f->close_section();
3593 	  f->open_object_section("device_class_flags");
3594 	  for (auto& i : device_class_flags) {
3595 	    const char* class_name = crush->get_class_name(i.first);
3596 	    string s = class_name ? class_name : stringify(i.first);
3597 	    f->open_array_section(s.c_str());
3598 	    set<string> st;
3599 	    calc_state_set(i.second, st);
3600 	    for (auto& j : st) {
3601 	      f->dump_string("flag", j);
3602 	    }
3603 	    f->close_section();
3604 	  }
3605 	  f->close_section();
3606 	}
3607 	
3608 	void OSDMap::generate_test_instances(list<OSDMap*>& o)
3609 	{
3610 	  o.push_back(new OSDMap);
3611 	
3612 	  CephContext *cct = new CephContext(CODE_ENVIRONMENT_UTILITY);
3613 	  o.push_back(new OSDMap);
3614 	  uuid_d fsid;
3615 	  o.back()->build_simple(cct, 1, fsid, 16);
3616 	  o.back()->created = o.back()->modified = utime_t(1, 2);  // fix timestamp
3617 	  o.back()->blacklist[entity_addr_t()] = utime_t(5, 6);
3618 	  cct->put();
3619 	}
3620 	
3621 	string OSDMap::get_flag_string(unsigned f)
3622 	{
3623 	  string s;
3624 	  if (f & CEPH_OSDMAP_PAUSERD)
3625 	    s += ",pauserd";
3626 	  if (f & CEPH_OSDMAP_PAUSEWR)
3627 	    s += ",pausewr";
3628 	  if (f & CEPH_OSDMAP_PAUSEREC)
3629 	    s += ",pauserec";
3630 	  if (f & CEPH_OSDMAP_NOUP)
3631 	    s += ",noup";
3632 	  if (f & CEPH_OSDMAP_NODOWN)
3633 	    s += ",nodown";
3634 	  if (f & CEPH_OSDMAP_NOOUT)
3635 	    s += ",noout";
3636 	  if (f & CEPH_OSDMAP_NOIN)
3637 	    s += ",noin";
3638 	  if (f & CEPH_OSDMAP_NOBACKFILL)
3639 	    s += ",nobackfill";
3640 	  if (f & CEPH_OSDMAP_NOREBALANCE)
3641 	    s += ",norebalance";
3642 	  if (f & CEPH_OSDMAP_NORECOVER)
3643 	    s += ",norecover";
3644 	  if (f & CEPH_OSDMAP_NOSCRUB)
3645 	    s += ",noscrub";
3646 	  if (f & CEPH_OSDMAP_NODEEP_SCRUB)
3647 	    s += ",nodeep-scrub";
3648 	  if (f & CEPH_OSDMAP_NOTIERAGENT)
3649 	    s += ",notieragent";
3650 	  if (f & CEPH_OSDMAP_NOSNAPTRIM)
3651 	    s += ",nosnaptrim";
3652 	  if (f & CEPH_OSDMAP_SORTBITWISE)
3653 	    s += ",sortbitwise";
3654 	  if (f & CEPH_OSDMAP_REQUIRE_JEWEL)
3655 	    s += ",require_jewel_osds";
3656 	  if (f & CEPH_OSDMAP_REQUIRE_KRAKEN)
3657 	    s += ",require_kraken_osds";
3658 	  if (f & CEPH_OSDMAP_REQUIRE_LUMINOUS)
3659 	    s += ",require_luminous_osds";
3660 	  if (f & CEPH_OSDMAP_RECOVERY_DELETES)
3661 	    s += ",recovery_deletes";
3662 	  if (f & CEPH_OSDMAP_PURGED_SNAPDIRS)
3663 	    s += ",purged_snapdirs";
3664 	  if (f & CEPH_OSDMAP_PGLOG_HARDLIMIT)
3665 	    s += ",pglog_hardlimit";
3666 	  if (s.length())
3667 	    s.erase(0, 1);
3668 	  return s;
3669 	}
3670 	
3671 	string OSDMap::get_flag_string() const
3672 	{
3673 	  return get_flag_string(flags);
3674 	}
3675 	
3676 	void OSDMap::print_pools(ostream& out) const
3677 	{
3678 	  for (const auto &pool : pools) {
3679 	    std::string name("<unknown>");
3680 	    const auto &pni = pool_name.find(pool.first);
3681 	    if (pni != pool_name.end())
3682 	      name = pni->second;
3683 	    out << "pool " << pool.first
3684 		<< " '" << name
3685 		<< "' " << pool.second << "\n";
3686 	
3687 	    for (const auto &snap : pool.second.snaps)
3688 	      out << "\tsnap " << snap.second.snapid << " '" << snap.second.name << "' " << snap.second.stamp << "\n";
3689 	
3690 	    if (!pool.second.removed_snaps.empty())
3691 	      out << "\tremoved_snaps " << pool.second.removed_snaps << "\n";
3692 	    auto p = removed_snaps_queue.find(pool.first);
3693 	    if (p != removed_snaps_queue.end()) {
3694 	      out << "\tremoved_snaps_queue " << p->second << "\n";
3695 	    }
3696 	  }
3697 	  out << std::endl;
3698 	}
3699 	
3700 	void OSDMap::print_osds(ostream& out) const
3701 	{
3702 	  for (int i=0; i<get_max_osd(); i++) {
3703 	    if (exists(i)) {
3704 	      print_osd(i, out);
3705 	    }
3706 	  }
3707 	}
3708 	void OSDMap::print_osd(int id, ostream& out) const
3709 	{
3710 	  if (!exists(id)) {
3711 	    return;
3712 	  }
3713 	
3714 	  out << "osd." << id;
3715 	  out << (is_up(id) ? " up  ":" down");
3716 	  out << (is_in(id) ? " in ":" out");
3717 	  out << " weight " << get_weightf(id);
3718 	  if (get_primary_affinity(id) != CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
3719 	    out << " primary_affinity " << get_primary_affinityf(id);
3720 	  }
3721 	  const osd_info_t& info(get_info(id));
3722 	  out << " " << info;
3723 	  out << " " << get_addrs(id) << " " << get_cluster_addrs(id);
3724 	  set<string> st;
3725 	  get_state(id, st);
3726 	  out << " " << st;
3727 	  if (!get_uuid(id).is_zero()) {
3728 	    out << " " << get_uuid(id);
3729 	  }
3730 	  out << "\n";
3731 	}
3732 	
3733 	void OSDMap::print(ostream& out) const
3734 	{
3735 	  out << "epoch " << get_epoch() << "\n"
3736 	      << "fsid " << get_fsid() << "\n"
3737 	      << "created " << get_created() << "\n"
3738 	      << "modified " << get_modified() << "\n";
3739 	
3740 	  out << "flags " << get_flag_string() << "\n";
3741 	  out << "crush_version " << get_crush_version() << "\n";
3742 	  out << "full_ratio " << full_ratio << "\n";
3743 	  out << "backfillfull_ratio " << backfillfull_ratio << "\n";
3744 	  out << "nearfull_ratio " << nearfull_ratio << "\n";
3745 	  if (require_min_compat_client != ceph_release_t::unknown) {
3746 	    out << "require_min_compat_client "
3747 		<< require_min_compat_client << "\n";
3748 	  }
3749 	  out << "min_compat_client " << get_min_compat_client()
3750 	      << "\n";
3751 	  if (require_osd_release > ceph_release_t::unknown) {
3752 	    out << "require_osd_release " << require_osd_release
3753 		<< "\n";
3754 	  }
3755 	  if (get_cluster_snapshot().length())
3756 	    out << "cluster_snapshot " << get_cluster_snapshot() << "\n";
3757 	  out << "\n";
3758 	
3759 	  print_pools(out);
3760 	
3761 	  out << "max_osd " << get_max_osd() << "\n";
3762 	  print_osds(out);
3763 	  out << std::endl;
3764 	
3765 	  for (auto& p : pg_upmap) {
3766 	    out << "pg_upmap " << p.first << " " << p.second << "\n";
3767 	  }
3768 	  for (auto& p : pg_upmap_items) {
3769 	    out << "pg_upmap_items " << p.first << " " << p.second << "\n";
3770 	  }
3771 	
3772 	  for (const auto pg : *pg_temp)
3773 	    out << "pg_temp " << pg.first << " " << pg.second << "\n";
3774 	
3775 	  for (const auto pg : *primary_temp)
3776 	    out << "primary_temp " << pg.first << " " << pg.second << "\n";
3777 	
3778 	  for (const auto &addr : blacklist)
3779 	    out << "blacklist " << addr.first << " expires " << addr.second << "\n";
3780 	}
3781 	
3782 	class OSDTreePlainDumper : public CrushTreeDumper::Dumper<TextTable> {
3783 	public:
3784 	  typedef CrushTreeDumper::Dumper<TextTable> Parent;
3785 	
3786 	  OSDTreePlainDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
3787 			     unsigned f)
3788 	    : Parent(crush, osdmap_->get_pool_names()), osdmap(osdmap_), filter(f) { }
3789 	
3790 	  bool should_dump_leaf(int i) const override {
3791 	    if (!filter) {
3792 	      return true; // normal case
3793 	    }
3794 	    if (((filter & OSDMap::DUMP_UP) && osdmap->is_up(i)) ||
3795 		((filter & OSDMap::DUMP_DOWN) && osdmap->is_down(i)) ||
3796 		((filter & OSDMap::DUMP_IN) && osdmap->is_in(i)) ||
3797 		((filter & OSDMap::DUMP_OUT) && osdmap->is_out(i)) ||
3798 	        ((filter & OSDMap::DUMP_DESTROYED) && osdmap->is_destroyed(i))) {
3799 	      return true;
3800 	    }
3801 	    return false;
3802 	  }
3803 	
3804 	  bool should_dump_empty_bucket() const override {
3805 	    return !filter;
3806 	  }
3807 	
3808 	  void init_table(TextTable *tbl) {
3809 	    tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
3810 	    tbl->define_column("CLASS", TextTable::LEFT, TextTable::RIGHT);
3811 	    tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
3812 	    tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
3813 	    tbl->define_column("STATUS", TextTable::LEFT, TextTable::RIGHT);
3814 	    tbl->define_column("REWEIGHT", TextTable::LEFT, TextTable::RIGHT);
3815 	    tbl->define_column("PRI-AFF", TextTable::LEFT, TextTable::RIGHT);
3816 	  }
3817 	  void dump(TextTable *tbl, string& bucket) {
3818 	    init_table(tbl);
3819 	
3820 	    if (!bucket.empty()) {
3821 	      set_root(bucket);
3822 	      Parent::dump(tbl);
3823 	    } else {
3824 	      Parent::dump(tbl);
3825 	      for (int i = 0; i < osdmap->get_max_osd(); i++) {
3826 		if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i)) {
3827 		  dump_item(CrushTreeDumper::Item(i, 0, 0, 0), tbl);
3828 		}
3829 	      }
3830 	    }
3831 	  }
3832 	
3833 	protected:
3834 	  void dump_item(const CrushTreeDumper::Item &qi, TextTable *tbl) override {
3835 	    const char *c = crush->get_item_class(qi.id);
3836 	    if (!c)
3837 	      c = "";
3838 	    *tbl << qi.id
3839 		 << c
3840 		 << weightf_t(qi.weight);
3841 	
3842 	    ostringstream name;
3843 	    for (int k = 0; k < qi.depth; k++)
3844 	      name << "    ";
3845 	    if (qi.is_bucket()) {
3846 	      name << crush->get_type_name(crush->get_bucket_type(qi.id)) << " "
3847 		   << crush->get_item_name(qi.id);
3848 	    } else {
3849 	      name << "osd." << qi.id;
3850 	    }
3851 	    *tbl << name.str();
3852 	
3853 	    if (!qi.is_bucket()) {
3854 	      if (!osdmap->exists(qi.id)) {
3855 		*tbl << "DNE"
3856 		     << 0;
3857 	      } else {
3858 	        string s;
3859 	        if (osdmap->is_up(qi.id)) {
3860 	          s = "up";
3861 	        } else if (osdmap->is_destroyed(qi.id)) {
3862 	          s = "destroyed";
3863 	        } else {
3864 	          s = "down";
3865 	        }
3866 		*tbl << s
3867 		     << weightf_t(osdmap->get_weightf(qi.id))
3868 		     << weightf_t(osdmap->get_primary_affinityf(qi.id));
3869 	      }
3870 	    }
3871 	    *tbl << TextTable::endrow;
3872 	  }
3873 	
3874 	private:
3875 	  const OSDMap *osdmap;
3876 	  const unsigned filter;
3877 	};
3878 	
3879 	class OSDTreeFormattingDumper : public CrushTreeDumper::FormattingDumper {
3880 	public:
3881 	  typedef CrushTreeDumper::FormattingDumper Parent;
3882 	
3883 	  OSDTreeFormattingDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
3884 				  unsigned f)
3885 	    : Parent(crush, osdmap_->get_pool_names()), osdmap(osdmap_), filter(f) { }
3886 	
3887 	  bool should_dump_leaf(int i) const override {
3888 	    if (!filter) {
3889 	      return true; // normal case
3890 	    }
3891 	    if (((filter & OSDMap::DUMP_UP) && osdmap->is_up(i)) ||
3892 	        ((filter & OSDMap::DUMP_DOWN) && osdmap->is_down(i)) ||
3893 	        ((filter & OSDMap::DUMP_IN) && osdmap->is_in(i)) ||
3894 	        ((filter & OSDMap::DUMP_OUT) && osdmap->is_out(i)) ||
3895 	        ((filter & OSDMap::DUMP_DESTROYED) && osdmap->is_destroyed(i))) {
3896 	      return true;
3897 	    }
3898 	    return false;
3899 	  }
3900 	
3901 	  bool should_dump_empty_bucket() const override {
3902 	    return !filter;
3903 	  }
3904 	
3905 	  void dump(Formatter *f, string& bucket) {
3906 	    if (!bucket.empty()) {
3907 	      set_root(bucket);
3908 	      f->open_array_section("nodes");
3909 	      Parent::dump(f);
3910 	      f->close_section();
3911 	    } else {
3912 	      f->open_array_section("nodes");
3913 	      Parent::dump(f);
3914 	      f->close_section();
3915 	      f->open_array_section("stray");
3916 	      for (int i = 0; i < osdmap->get_max_osd(); i++) {
3917 		if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i))
3918 		  dump_item(CrushTreeDumper::Item(i, 0, 0, 0), f);
3919 	      }
3920 	      f->close_section();
3921 	    }
3922 	  }
3923 	
3924 	protected:
3925 	  void dump_item_fields(const CrushTreeDumper::Item &qi, Formatter *f) override {
3926 	    Parent::dump_item_fields(qi, f);
3927 	    if (!qi.is_bucket())
3928 	    {
3929 	      string s;
3930 	      if (osdmap->is_up(qi.id)) {
3931 	        s = "up";
3932 	      } else if (osdmap->is_destroyed(qi.id)) {
3933 	        s = "destroyed";
3934 	      } else {
3935 	        s = "down";
3936 	      }
3937 	      f->dump_unsigned("exists", (int)osdmap->exists(qi.id));
3938 	      f->dump_string("status", s);
3939 	      f->dump_float("reweight", osdmap->get_weightf(qi.id));
3940 	      f->dump_float("primary_affinity", osdmap->get_primary_affinityf(qi.id));
3941 	    }
3942 	  }
3943 	
3944 	private:
3945 	  const OSDMap *osdmap;
3946 	  const unsigned filter;
3947 	};
3948 	
3949 	void OSDMap::print_tree(Formatter *f, ostream *out, unsigned filter, string bucket) const
3950 	{
3951 	  if (f) {
3952 	    OSDTreeFormattingDumper(crush.get(), this, filter).dump(f, bucket);
3953 	  } else {
3954 	    ceph_assert(out);
3955 	    TextTable tbl;
3956 	    OSDTreePlainDumper(crush.get(), this, filter).dump(&tbl, bucket);
3957 	    *out << tbl;
3958 	  }
3959 	}
3960 	
3961 	void OSDMap::print_summary(Formatter *f, ostream& out,
3962 				   const string& prefix, bool extra) const
3963 	{
3964 	  if (f) {
3965 	    f->dump_int("epoch", get_epoch());
3966 	    f->dump_int("num_osds", get_num_osds());
3967 	    f->dump_int("num_up_osds", get_num_up_osds());
3968 	    f->dump_int("num_in_osds", get_num_in_osds());
3969 	    f->dump_unsigned("num_remapped_pgs", get_num_pg_temp());
3970 	  } else {
3971 	    utime_t now = ceph_clock_now();
3972 	    out << get_num_osds() << " osds: "
3973 		<< get_num_up_osds() << " up";
3974 	    if (last_up_change != utime_t()) {
3975 	      out << " (since " << utimespan_str(now - last_up_change) << ")";
3976 	    }
3977 	    out << ", " << get_num_in_osds() << " in";
3978 	    if (last_in_change != utime_t()) {
3979 	      out << " (since " << utimespan_str(now - last_in_change) << ")";
3980 	    }
3981 	    if (extra)
3982 	      out << "; epoch: e" << get_epoch();
3983 	    if (get_num_pg_temp())
3984 	      out << "; " << get_num_pg_temp() << " remapped pgs";
3985 	    out << "\n";
3986 	    uint64_t important_flags = flags & ~CEPH_OSDMAP_SEMIHIDDEN_FLAGS;
3987 	    if (important_flags)
3988 	      out << prefix << "flags " << get_flag_string(important_flags) << "\n";
3989 	  }
3990 	}
3991 	
3992 	void OSDMap::print_oneline_summary(ostream& out) const
3993 	{
3994 	  out << "e" << get_epoch() << ": "
3995 	      << get_num_osds() << " total, "
3996 	      << get_num_up_osds() << " up, "
3997 	      << get_num_in_osds() << " in";
3998 	}
3999 	
4000 	bool OSDMap::crush_rule_in_use(int rule_id) const
4001 	{
4002 	  for (const auto &pool : pools) {
4003 	    if (pool.second.crush_rule == rule_id)
4004 	      return true;
4005 	  }
4006 	  return false;
4007 	}
4008 	
4009 	int OSDMap::validate_crush_rules(CrushWrapper *newcrush,
4010 					 ostream *ss) const
4011 	{
4012 	  for (auto& i : pools) {
4013 	    auto& pool = i.second;
4014 	    int ruleno = pool.get_crush_rule();
4015 	    if (!newcrush->rule_exists(ruleno)) {
4016 	      *ss << "pool " << i.first << " references crush_rule " << ruleno
4017 		  << " but it is not present";
4018 	      return -EINVAL;
4019 	    }
4020 	    if (newcrush->get_rule_mask_ruleset(ruleno) != ruleno) {
4021 	      *ss << "rule " << ruleno << " mask ruleset does not match rule id";
4022 	      return -EINVAL;
4023 	    }
4024 	    if (newcrush->get_rule_mask_type(ruleno) != (int)pool.get_type()) {
4025 	      *ss << "pool " << i.first << " type does not match rule " << ruleno;
4026 	      return -EINVAL;
4027 	    }
4028 	    int poolsize = pool.get_size();
4029 	    if (poolsize < newcrush->get_rule_mask_min_size(ruleno) ||
4030 		poolsize > newcrush->get_rule_mask_max_size(ruleno)) {
4031 	      *ss << "pool " << i.first << " size " << poolsize << " does not"
4032 		  << " fall within rule " << ruleno
4033 		  << " min_size " << newcrush->get_rule_mask_min_size(ruleno)
4034 		  << " and max_size " << newcrush->get_rule_mask_max_size(ruleno);
4035 	      return -EINVAL;
4036 	    }
4037 	  }
4038 	  return 0;
4039 	}
4040 	
4041 	int OSDMap::build_simple_optioned(CephContext *cct, epoch_t e, uuid_d &fsid,
4042 					  int nosd, int pg_bits, int pgp_bits,
4043 					  bool default_pool)
4044 	{
4045 	  ldout(cct, 10) << "build_simple on " << nosd
4046 			 << " osds" << dendl;
4047 	  epoch = e;
4048 	  set_fsid(fsid);
4049 	  created = modified = ceph_clock_now();
4050 	
4051 	  if (nosd >=  0) {
4052 	    set_max_osd(nosd);
4053 	  } else {
4054 	    // count osds
4055 	    int maxosd = 0;
4056 	    const auto& conf = cct->_conf;
4057 	    vector<string> sections;
4058 	    conf.get_all_sections(sections);
4059 	
4060 	    for (auto &section : sections) {
4061 	      if (section.find("osd.") != 0)
4062 		continue;
4063 	
4064 	      const char *begin = section.c_str() + 4;
4065 	      char *end = (char*)begin;
4066 	      int o = strtol(begin, &end, 10);
4067 	      if (*end != '\0')
4068 		continue;
4069 	
4070 	      if (o > cct->_conf->mon_max_osd) {
4071 		lderr(cct) << "[osd." << o << "] in config has id > mon_max_osd " << cct->_conf->mon_max_osd << dendl;
4072 		return -ERANGE;
4073 	      }
4074 	
4075 	      if (o > maxosd)
4076 		maxosd = o;
4077 	    }
4078 	
4079 	    set_max_osd(maxosd + 1);
4080 	  }
4081 	
4082 	
4083 	  stringstream ss;
4084 	  int r;
4085 	  if (nosd >= 0)
4086 	    r = build_simple_crush_map(cct, *crush, nosd, &ss);
4087 	  else
4088 	    r = build_simple_crush_map_from_conf(cct, *crush, &ss);
4089 	  ceph_assert(r == 0);
4090 	
4091 	  int poolbase = get_max_osd() ? get_max_osd() : 1;
4092 	
4093 	  const int default_replicated_rule = crush->get_osd_pool_default_crush_replicated_ruleset(cct);
4094 	  ceph_assert(default_replicated_rule >= 0);
4095 	
4096 	  if (default_pool) {
4097 	    // pgp_num <= pg_num
4098 	    if (pgp_bits > pg_bits)
4099 	      pgp_bits = pg_bits;
4100 	
4101 	    vector<string> pool_names;
4102 	    pool_names.push_back("rbd");
4103 	    for (auto &plname : pool_names) {
4104 	      int64_t pool = ++pool_max;
4105 	      pools[pool].type = pg_pool_t::TYPE_REPLICATED;
4106 	      pools[pool].flags = cct->_conf->osd_pool_default_flags;
4107 	      if (cct->_conf->osd_pool_default_flag_hashpspool)
4108 		pools[pool].set_flag(pg_pool_t::FLAG_HASHPSPOOL);
4109 	      if (cct->_conf->osd_pool_default_flag_nodelete)
4110 		pools[pool].set_flag(pg_pool_t::FLAG_NODELETE);
4111 	      if (cct->_conf->osd_pool_default_flag_nopgchange)
4112 		pools[pool].set_flag(pg_pool_t::FLAG_NOPGCHANGE);
4113 	      if (cct->_conf->osd_pool_default_flag_nosizechange)
4114 		pools[pool].set_flag(pg_pool_t::FLAG_NOSIZECHANGE);
4115 	      pools[pool].size = cct->_conf.get_val<uint64_t>("osd_pool_default_size");
4116 	      pools[pool].min_size = cct->_conf.get_osd_pool_default_min_size(
4117 	                                 pools[pool].size);
4118 	      pools[pool].crush_rule = default_replicated_rule;
4119 	      pools[pool].object_hash = CEPH_STR_HASH_RJENKINS;
4120 	      pools[pool].set_pg_num(poolbase << pg_bits);
4121 	      pools[pool].set_pgp_num(poolbase << pgp_bits);
4122 	      pools[pool].set_pg_num_target(poolbase << pg_bits);
4123 	      pools[pool].set_pgp_num_target(poolbase << pgp_bits);
4124 	      pools[pool].last_change = epoch;
4125 	      pools[pool].application_metadata.insert(
4126 	        {pg_pool_t::APPLICATION_NAME_RBD, {}});
4127 	      if (auto m = pg_pool_t::get_pg_autoscale_mode_by_name(
4128 	            cct->_conf.get_val<string>("osd_pool_default_pg_autoscale_mode"));
4129 		  m != pg_pool_t::pg_autoscale_mode_t::UNKNOWN) {
4130 		pools[pool].pg_autoscale_mode = m;
4131 	      } else {
4132 		pools[pool].pg_autoscale_mode = pg_pool_t::pg_autoscale_mode_t::OFF;
4133 	      }
4134 	      pool_name[pool] = plname;
4135 	      name_pool[plname] = pool;
4136 	    }
4137 	  }
4138 	
4139 	  for (int i=0; i<get_max_osd(); i++) {
4140 	    set_state(i, 0);
4141 	    set_weight(i, CEPH_OSD_OUT);
4142 	  }
4143 	
4144 	  map<string,string> profile_map;
4145 	  r = get_erasure_code_profile_default(cct, profile_map, &ss);
4146 	  if (r < 0) {
4147 	    lderr(cct) << ss.str() << dendl;
4148 	    return r;
4149 	  }
4150 	  set_erasure_code_profile("default", profile_map);
4151 	  return 0;
4152 	}
4153 	
4154 	int OSDMap::get_erasure_code_profile_default(CephContext *cct,
4155 						     map<string,string> &profile_map,
4156 						     ostream *ss)
4157 	{
4158 	  int r = get_json_str_map(cct->_conf.get_val<string>("osd_pool_default_erasure_code_profile"),
4159 			      *ss,
4160 			      &profile_map);
4161 	  return r;
4162 	}
4163 	
4164 	int OSDMap::_build_crush_types(CrushWrapper& crush)
4165 	{
4166 	  crush.set_type_name(0, "osd");
4167 	  crush.set_type_name(1, "host");
4168 	  crush.set_type_name(2, "chassis");
4169 	  crush.set_type_name(3, "rack");
4170 	  crush.set_type_name(4, "row");
4171 	  crush.set_type_name(5, "pdu");
4172 	  crush.set_type_name(6, "pod");
4173 	  crush.set_type_name(7, "room");
4174 	  crush.set_type_name(8, "datacenter");
4175 	  crush.set_type_name(9, "zone");
4176 	  crush.set_type_name(10, "region");
4177 	  crush.set_type_name(11, "root");
4178 	  return 11;
4179 	}
4180 	
4181 	int OSDMap::build_simple_crush_map(CephContext *cct, CrushWrapper& crush,
4182 					   int nosd, ostream *ss)
4183 	{
4184 	  crush.create();
4185 	
4186 	  // root
4187 	  int root_type = _build_crush_types(crush);
4188 	  int rootid;
4189 	  int r = crush.add_bucket(0, 0, CRUSH_HASH_DEFAULT,
4190 				   root_type, 0, NULL, NULL, &rootid);
4191 	  ceph_assert(r == 0);
4192 	  crush.set_item_name(rootid, "default");
4193 	
4194 	  for (int o=0; o<nosd; o++) {
4195 	    map<string,string> loc;
4196 	    loc["host"] = "localhost";
4197 	    loc["rack"] = "localrack";
4198 	    loc["root"] = "default";
4199 	    ldout(cct, 10) << " adding osd." << o << " at " << loc << dendl;
4200 	    char name[32];
4201 	    snprintf(name, sizeof(name), "osd.%d", o);
4202 	    crush.insert_item(cct, o, 1.0, name, loc);
4203 	  }
4204 	
4205 	  build_simple_crush_rules(cct, crush, "default", ss);
4206 	
4207 	  crush.finalize();
4208 	
4209 	  return 0;
4210 	}
4211 	
4212 	int OSDMap::build_simple_crush_map_from_conf(CephContext *cct,
4213 						     CrushWrapper& crush,
4214 						     ostream *ss)
4215 	{
4216 	  const auto& conf = cct->_conf;
4217 	
4218 	  crush.create();
4219 	
4220 	  // root
4221 	  int root_type = _build_crush_types(crush);
4222 	  int rootid;
4223 	  int r = crush.add_bucket(0, 0,
4224 				   CRUSH_HASH_DEFAULT,
4225 				   root_type, 0, NULL, NULL, &rootid);
4226 	  ceph_assert(r == 0);
4227 	  crush.set_item_name(rootid, "default");
4228 	
4229 	  // add osds
4230 	  vector<string> sections;
4231 	  conf.get_all_sections(sections);
4232 	
4233 	  for (auto &section : sections) {
4234 	    if (section.find("osd.") != 0)
4235 	      continue;
4236 	
4237 	    const char *begin = section.c_str() + 4;
4238 	    char *end = (char*)begin;
4239 	    int o = strtol(begin, &end, 10);
4240 	    if (*end != '\0')
4241 	      continue;
4242 	
4243 	    string host, rack, row, room, dc, pool;
4244 	    vector<string> sectiontmp;
4245 	    sectiontmp.push_back("osd");
4246 	    sectiontmp.push_back(section);
4247 	    conf.get_val_from_conf_file(sectiontmp, "host", host, false);
4248 	    conf.get_val_from_conf_file(sectiontmp, "rack", rack, false);
4249 	    conf.get_val_from_conf_file(sectiontmp, "row", row, false);
4250 	    conf.get_val_from_conf_file(sectiontmp, "room", room, false);
4251 	    conf.get_val_from_conf_file(sectiontmp, "datacenter", dc, false);
4252 	    conf.get_val_from_conf_file(sectiontmp, "root", pool, false);
4253 	
4254 	    if (host.length() == 0)
4255 	      host = "unknownhost";
4256 	    if (rack.length() == 0)
4257 	      rack = "unknownrack";
4258 	
4259 	    map<string,string> loc;
4260 	    loc["host"] = host;
4261 	    loc["rack"] = rack;
4262 	    if (row.size())
4263 	      loc["row"] = row;
4264 	    if (room.size())
4265 	      loc["room"] = room;
4266 	    if (dc.size())
4267 	      loc["datacenter"] = dc;
4268 	    loc["root"] = "default";
4269 	
4270 	    ldout(cct, 5) << " adding osd." << o << " at " << loc << dendl;
4271 	    crush.insert_item(cct, o, 1.0, section, loc);
4272 	  }
4273 	
4274 	  build_simple_crush_rules(cct, crush, "default", ss);
4275 	
4276 	  crush.finalize();
4277 	
4278 	  return 0;
4279 	}
4280 	
4281 	
4282 	int OSDMap::build_simple_crush_rules(
4283 	  CephContext *cct,
4284 	  CrushWrapper& crush,
4285 	  const string& root,
4286 	  ostream *ss)
4287 	{
4288 	  int crush_rule = crush.get_osd_pool_default_crush_replicated_ruleset(cct);
4289 	  string failure_domain =
4290 	    crush.get_type_name(cct->_conf->osd_crush_chooseleaf_type);
4291 	
4292 	  int r;
4293 	  r = crush.add_simple_rule_at(
4294 	    "replicated_rule", root, failure_domain, "",
4295 	    "firstn", pg_pool_t::TYPE_REPLICATED,
4296 	    crush_rule, ss);
4297 	  if (r < 0)
4298 	    return r;
4299 	  // do not add an erasure rule by default or else we will implicitly
4300 	  // require the crush_v2 feature of clients
4301 	  return 0;
4302 	}
4303 	
4304 	int OSDMap::summarize_mapping_stats(
4305 	  OSDMap *newmap,
4306 	  const set<int64_t> *pools,
4307 	  std::string *out,
4308 	  Formatter *f) const
4309 	{
4310 	  set<int64_t> ls;
4311 	  if (pools) {
4312 	    ls = *pools;
4313 	  } else {
4314 	    for (auto &p : get_pools())
4315 	      ls.insert(p.first);
4316 	  }
4317 	
4318 	  unsigned total_pg = 0;
4319 	  unsigned moved_pg = 0;
4320 	  vector<unsigned> base_by_osd(get_max_osd(), 0);
4321 	  vector<unsigned> new_by_osd(get_max_osd(), 0);
4322 	  for (int64_t pool_id : ls) {
4323 	    const pg_pool_t *pi = get_pg_pool(pool_id);
4324 	    vector<int> up, up2;
4325 	    int up_primary;
4326 	    for (unsigned ps = 0; ps < pi->get_pg_num(); ++ps) {
4327 	      pg_t pgid(ps, pool_id);
4328 	      total_pg += pi->get_size();
4329 	      pg_to_up_acting_osds(pgid, &up, &up_primary, nullptr, nullptr);
4330 	      for (int osd : up) {
4331 		if (osd >= 0 && osd < get_max_osd())
4332 		  ++base_by_osd[osd];
4333 	      }
4334 	      if (newmap) {
4335 		newmap->pg_to_up_acting_osds(pgid, &up2, &up_primary, nullptr, nullptr);
4336 		for (int osd : up2) {
4337 		  if (osd >= 0 && osd < get_max_osd())
4338 		    ++new_by_osd[osd];
4339 		}
4340 		if (pi->type == pg_pool_t::TYPE_ERASURE) {
4341 		  for (unsigned i=0; i<up.size(); ++i) {
4342 		    if (up[i] != up2[i]) {
4343 		      ++moved_pg;
4344 		    }
4345 		  }
4346 		} else if (pi->type == pg_pool_t::TYPE_REPLICATED) {
4347 		  for (int osd : up) {
4348 		    if (std::find(up2.begin(), up2.end(), osd) == up2.end()) {
4349 		      ++moved_pg;
4350 		    }
4351 		  }
4352 		} else {
4353 		  ceph_abort_msg("unhandled pool type");
4354 		}
4355 	      }
4356 	    }
4357 	  }
4358 	
4359 	  unsigned num_up_in = 0;
4360 	  for (int osd = 0; osd < get_max_osd(); ++osd) {
4361 	    if (is_up(osd) && is_in(osd))
4362 	      ++num_up_in;
4363 	  }
4364 	  if (!num_up_in) {
4365 	    return -EINVAL;
4366 	  }
4367 	
4368 	  float avg_pg = (float)total_pg / (float)num_up_in;
4369 	  float base_stddev = 0, new_stddev = 0;
4370 	  int min = -1, max = -1;
4371 	  unsigned min_base_pg = 0, max_base_pg = 0;
4372 	  unsigned min_new_pg = 0, max_new_pg = 0;
4373 	  for (int osd = 0; osd < get_max_osd(); ++osd) {
4374 	    if (is_up(osd) && is_in(osd)) {
4375 	      float base_diff = (float)base_by_osd[osd] - avg_pg;
4376 	      base_stddev += base_diff * base_diff;
4377 	      float new_diff = (float)new_by_osd[osd] - avg_pg;
4378 	      new_stddev += new_diff * new_diff;
4379 	      if (min < 0 || base_by_osd[osd] < min_base_pg) {
4380 		min = osd;
4381 		min_base_pg = base_by_osd[osd];
4382 		min_new_pg = new_by_osd[osd];
4383 	      }
4384 	      if (max < 0 || base_by_osd[osd] > max_base_pg) {
4385 		max = osd;
4386 		max_base_pg = base_by_osd[osd];
4387 		max_new_pg = new_by_osd[osd];
4388 	      }
4389 	    }
4390 	  }
4391 	  base_stddev = sqrt(base_stddev / num_up_in);
4392 	  new_stddev = sqrt(new_stddev / num_up_in);
4393 	
4394 	  float edev = sqrt(avg_pg * (1.0 - (1.0 / (double)num_up_in)));
4395 	
4396 	  ostringstream ss;
4397 	  if (f)
4398 	    f->open_object_section("utilization");
4399 	  if (newmap) {
4400 	    if (f) {
4401 	      f->dump_unsigned("moved_pgs", moved_pg);
4402 	      f->dump_unsigned("total_pgs", total_pg);
4403 	    } else {
4404 	      float percent = 0;
4405 	      if (total_pg)
4406 	        percent = (float)moved_pg * 100.0 / (float)total_pg;
4407 	      ss << "moved " << moved_pg << " / " << total_pg
4408 		 << " (" << percent << "%)\n";
4409 	    }
4410 	  }
4411 	  if (f) {
4412 	    f->dump_float("avg_pgs", avg_pg);
4413 	    f->dump_float("std_dev", base_stddev);
4414 	    f->dump_float("expected_baseline_std_dev", edev);
4415 	    if (newmap)
4416 	      f->dump_float("new_std_dev", new_stddev);
4417 	  } else {
4418 	    ss << "avg " << avg_pg << "\n";
4419 	    ss << "stddev " << base_stddev;
4420 	    if (newmap)
4421 	      ss << " -> " << new_stddev;
4422 	    ss << " (expected baseline " << edev << ")\n";
4423 	  }
4424 	  if (min >= 0) {
4425 	    if (f) {
4426 	      f->dump_unsigned("min_osd", min);
4427 	      f->dump_unsigned("min_osd_pgs", min_base_pg);
4428 	      if (newmap)
4429 		f->dump_unsigned("new_min_osd_pgs", min_new_pg);
4430 	    } else {
4431 	      ss << "min osd." << min << " with " << min_base_pg;
4432 	      if (newmap)
4433 		ss << " -> " << min_new_pg;
4434 	      ss << " pgs (" << (float)min_base_pg / avg_pg;
4435 	      if (newmap)
4436 		ss << " -> " << (float)min_new_pg / avg_pg;
4437 	      ss << " * mean)\n";
4438 	    }
4439 	  }
4440 	  if (max >= 0) {
4441 	    if (f) {
4442 	      f->dump_unsigned("max_osd", max);
4443 	      f->dump_unsigned("max_osd_pgs", max_base_pg);
4444 	      if (newmap)
4445 		f->dump_unsigned("new_max_osd_pgs", max_new_pg);
4446 	    } else {
4447 	      ss << "max osd." << max << " with " << max_base_pg;
4448 	      if (newmap)
4449 		ss << " -> " << max_new_pg;
4450 	      ss << " pgs (" << (float)max_base_pg / avg_pg;
4451 	      if (newmap)
4452 		ss << " -> " << (float)max_new_pg / avg_pg;
4453 	      ss << " * mean)\n";
4454 	    }
4455 	  }
4456 	  if (f)
4457 	    f->close_section();
4458 	  if (out)
4459 	    *out = ss.str();
4460 	  return 0;
4461 	}
4462 	
4463 	bool OSDMap::try_pg_upmap(
4464 	  CephContext *cct,
4465 	  pg_t pg,                       ///< pg to potentially remap
4466 	  const set<int>& overfull,      ///< osds we'd want to evacuate
4467 	  const vector<int>& underfull,  ///< osds to move to, in order of preference
4468 	  vector<int> *orig,
4469 	  vector<int> *out)              ///< resulting alternative mapping
4470 	{
4471 	  const pg_pool_t *pool = get_pg_pool(pg.pool());
4472 	  if (!pool)
4473 	    return false;
4474 	  int rule = crush->find_rule(pool->get_crush_rule(), pool->get_type(),
4475 				      pool->get_size());
4476 	  if (rule < 0)
4477 	    return false;
4478 	
4479 	  // make sure there is something there to remap
4480 	  bool any = false;
4481 	  for (auto osd : *orig) {
4482 	    if (overfull.count(osd)) {
4483 	      any = true;
4484 	      break;
4485 	    }
4486 	  }
4487 	  if (!any) {
4488 	    return false;
4489 	  }
4490 	
4491 	  int r = crush->try_remap_rule(
4492 	    cct,
4493 	    rule,
4494 	    pool->get_size(),
4495 	    overfull, underfull,
4496 	    *orig,
4497 	    out);
4498 	  if (r < 0)
4499 	    return false;
4500 	  if (*out == *orig)
4501 	    return false;
4502 	  return true;
4503 	}
4504 	
4505 	int OSDMap::calc_pg_upmaps(
4506 	  CephContext *cct,
4507 	  float max_deviation_ratio,
4508 	  int max,
4509 	  const set<int64_t>& only_pools,
4510 	  OSDMap::Incremental *pending_inc)
4511 	{
4512 	  ldout(cct, 10) << __func__ << " pools " << only_pools << dendl;
4513 	  OSDMap tmp;
4514 	  tmp.deepish_copy_from(*this);
4515 	  int num_changed = 0;
4516 	  map<int,set<pg_t>> pgs_by_osd;
4517 	  int total_pgs = 0;
4518 	  float osd_weight_total = 0;
4519 	  map<int,float> osd_weight;
4520 	  for (auto& i : pools) {
4521 	    if (!only_pools.empty() && !only_pools.count(i.first))
4522 	      continue;
4523 	    for (unsigned ps = 0; ps < i.second.get_pg_num(); ++ps) {
4524 	      pg_t pg(ps, i.first);
4525 	      vector<int> up;
4526 	      tmp.pg_to_up_acting_osds(pg, &up, nullptr, nullptr, nullptr);
4527 	      ldout(cct, 20) << __func__ << " " << pg << " up " << up << dendl;
4528 	      for (auto osd : up) {
4529 	        if (osd != CRUSH_ITEM_NONE)
4530 		  pgs_by_osd[osd].insert(pg);
4531 	      }
4532 	    }
4533 	    total_pgs += i.second.get_size() * i.second.get_pg_num();
4534 	
4535 	    map<int,float> pmap;
4536 	    int ruleno = tmp.crush->find_rule(i.second.get_crush_rule(),
4537 					      i.second.get_type(),
4538 					      i.second.get_size());
4539 	    tmp.crush->get_rule_weight_osd_map(ruleno, &pmap);
4540 	    ldout(cct,20) << __func__ << " pool " << i.first
4541 	                  << " ruleno " << ruleno
4542 	                  << " weight-map " << pmap
4543 	                  << dendl;
4544 	    for (auto p : pmap) {
4545 	      auto adjusted_weight = tmp.get_weightf(p.first) * p.second;
4546 	      if (adjusted_weight == 0) {
4547 	        continue;
4548 	      }
4549 	      osd_weight[p.first] += adjusted_weight;
4550 	      osd_weight_total += adjusted_weight;
4551 	    }
4552 	  }
4553 	  for (auto& i : osd_weight) {
4554 	    int pgs = 0;
4555 	    auto p = pgs_by_osd.find(i.first);
4556 	    if (p != pgs_by_osd.end())
4557 		pgs = p->second.size();
4558 	    else
4559 		pgs_by_osd.emplace(i.first, set<pg_t>());
4560 	    ldout(cct, 20) << " osd." << i.first << " weight " << i.second
4561 			     << " pgs " << pgs << dendl;
4562 	  }
4563 	  if (osd_weight_total == 0) {
4564 	    lderr(cct) << __func__ << " abort due to osd_weight_total == 0" << dendl;
4565 	    return 0;
4566 	  }
4567 	  float pgs_per_weight = total_pgs / osd_weight_total;
4568 	  ldout(cct, 10) << " osd_weight_total " << osd_weight_total << dendl;
4569 	  ldout(cct, 10) << " pgs_per_weight " << pgs_per_weight << dendl;
4570 	
4571 	  if (max <= 0) {
4572 	    lderr(cct) << __func__ << " abort due to max <= 0" << dendl;
4573 	    return 0;
4574 	  }
4575 	  float decay_factor = 1.0 / float(max);
4576 	  float stddev = 0;
4577 	  map<int,float> osd_deviation;       // osd, deviation(pgs)
4578 	  multimap<float,int> deviation_osd;  // deviation(pgs), osd
4579 	  for (auto& i : pgs_by_osd) {
4580 	    // make sure osd is still there (belongs to this crush-tree)
4581 	    ceph_assert(osd_weight.count(i.first));
4582 	    float target = osd_weight[i.first] * pgs_per_weight;
4583 	    float deviation = (float)i.second.size() - target;
4584 	    ldout(cct, 20) << " osd." << i.first
4585 	                   << "\tpgs " << i.second.size()
4586 	                   << "\ttarget " << target
4587 	                   << "\tdeviation " << deviation
4588 	                   << dendl;
4589 	    osd_deviation[i.first] = deviation;
4590 	    deviation_osd.insert(make_pair(deviation, i.first));
4591 	    stddev += deviation * deviation;
4592 	  }
4593 	  if (stddev <= cct->_conf.get_val<double>("osd_calc_pg_upmaps_max_stddev")) {
4594 	    ldout(cct, 10) << __func__ << " distribution is almost perfect"
4595 	                   << dendl;
4596 	    return 0;
4597 	  }
4598 	  bool skip_overfull = false;
4599 	  auto aggressive =
4600 	    cct->_conf.get_val<bool>("osd_calc_pg_upmaps_aggressively");
4601 	  auto local_fallback_retries =
4602 	    cct->_conf.get_val<uint64_t>("osd_calc_pg_upmaps_local_fallback_retries");
4603 	  while (max--) {
4604 	    // build overfull and underfull
4605 	    set<int> overfull;
4606 	    vector<int> underfull;
4607 	    float decay = 0;
4608 	    int decay_count = 0;
4609 	    while (overfull.empty()) {
4610 	      for (auto i = deviation_osd.rbegin(); i != deviation_osd.rend(); i++) {
4611 	        if (i->first >= (1.0 - decay))
4612 	          overfull.insert(i->second);
4613 	      }
4614 	      if (!overfull.empty())
4615 	        break;
4616 	      decay_count++;
4617 	      decay = decay_factor * decay_count;
4618 	      if (decay >= 1.0)
4619 	        break;
4620 	      ldout(cct, 30) << " decay_factor = " << decay_factor
4621 	                     << " decay_count = " << decay_count
4622 	                     << " decay (overfull) = " << decay
4623 	                     << dendl;
4624 	    }
4625 	    if (overfull.empty()) {
4626 	      lderr(cct) << __func__ << " failed to build overfull" << dendl;
4627 	      break;
4628 	    }
4629 	
4630 	    decay = 0;
4631 	    decay_count = 0;
4632 	    while (underfull.empty()) {
4633 	      for (auto i = deviation_osd.begin(); i != deviation_osd.end(); i++) {
4634 	        if (i->first >= (-.999 + decay))
4635 	          break;
4636 	        underfull.push_back(i->second);
4637 	      }
4638 	      if (!underfull.empty())
4639 	        break;
4640 	      decay_count++;
4641 	      decay = decay_factor * decay_count;
4642 	      if (decay >= .999)
4643 	        break;
4644 	      ldout(cct, 30) << " decay_factor = " << decay_factor
4645 	                     << " decay_count = " << decay_count
4646 	                     << " decay (underfull) = " << decay
4647 	                     << dendl;
4648 	    }
4649 	    if (underfull.empty()) {
4650 	      lderr(cct) << __func__ << " failed to build underfull" << dendl;
4651 	      break;
4652 	    }
4653 	
4654 	    ldout(cct, 10) << " overfull " << overfull
4655 	                   << " underfull " << underfull
4656 	                   << dendl;
4657 	    set<pg_t> to_skip;
4658 	    uint64_t local_fallback_retried = 0;
4659 	
4660 	  retry:
4661 	
4662 	    set<pg_t> to_unmap;
4663 	    map<pg_t, mempool::osdmap::vector<pair<int32_t,int32_t>>> to_upmap;
4664 	    auto temp_pgs_by_osd = pgs_by_osd;
4665 	    // always start with fullest, break if we find any changes to make
4666 	    for (auto p = deviation_osd.rbegin(); p != deviation_osd.rend(); ++p) {
4667 	      if (skip_overfull) {
4668 	        ldout(cct, 10) << " skipping overfull " << dendl;
4669 	        break; // fall through to check underfull
4670 	      }
4671 	      int osd = p->second;
4672 	      float deviation = p->first;
4673 	      float target = osd_weight[osd] * pgs_per_weight;
4674 	      ceph_assert(target > 0);
4675 	      float deviation_ratio = deviation / target;
4676 	      if (deviation_ratio < max_deviation_ratio) {
4677 		ldout(cct, 10) << " osd." << osd
4678 	                       << " target " << target
4679 	                       << " deviation " << deviation
4680 	                       << " -> ratio " << deviation_ratio
4681 	                       << " < max ratio " << max_deviation_ratio
4682 	                       << dendl;
4683 		break;
4684 	      }
4685 	
4686 	      vector<pg_t> pgs;
4687 	      pgs.reserve(pgs_by_osd[osd].size());
4688 	      for (auto& pg : pgs_by_osd[osd]) {
4689 	        if (to_skip.count(pg))
4690 	          continue;
4691 	        pgs.push_back(pg);
4692 	      }
4693 	      if (aggressive) {
4694 	        // shuffle PG list so they all get equal (in)attention
4695 	        std::random_device rd;
4696 	        std::default_random_engine rng{rd()};
4697 	        std::shuffle(pgs.begin(), pgs.end(), rng);
4698 	      }
4699 	      // look for remaps we can un-remap
4700 	      for (auto pg : pgs) {
4701 		auto p = tmp.pg_upmap_items.find(pg);
4702 	        if (p == tmp.pg_upmap_items.end())
4703 	          continue;
4704 	        mempool::osdmap::vector<pair<int32_t,int32_t>> new_upmap_items;
4705 	        for (auto q : p->second) {
4706 		  if (q.second == osd) {
4707 	            ldout(cct, 10) << " will try dropping existing"
4708 	                           << " remapping pair "
4709 	                           << q.first << " -> " << q.second
4710 	                           << " which remapped " << pg
4711 	                           << " into overfull osd." << osd
4712 	                           << dendl;
4713 	            temp_pgs_by_osd[q.second].erase(pg);
4714 	            temp_pgs_by_osd[q.first].insert(pg);
4715 	          } else {
4716 	            new_upmap_items.push_back(q);
4717 	          }
4718 	        }
4719 	        if (new_upmap_items.empty()) {
4720 	          // drop whole item
4721 	          ldout(cct, 10) << " existing pg_upmap_items " << p->second
4722 	                         << " remapped " << pg << " into overfull osd." << osd
4723 	                         << ", will try cancelling it entirely"
4724 	                         << dendl;
4725 	          to_unmap.insert(pg);
4726 	          goto test_change;
4727 	        } else if (new_upmap_items.size() != p->second.size()) {
4728 	          // drop single remapping pair, updating
4729 	          ceph_assert(new_upmap_items.size() < p->second.size());
4730 	          ldout(cct, 10) << " existing pg_upmap_items " << p->second
4731 	                         << " remapped " << pg << " into overfull osd." << osd
4732 	                         << ", new_pg_upmap_items now " << new_upmap_items
4733 	                         << dendl;
4734 	          to_upmap[pg] = new_upmap_items;
4735 	          goto test_change;
4736 	        }
4737 	      }
4738 	
4739 	      // try upmap
4740 	      for (auto pg : pgs) {
4741 	        auto temp_it = tmp.pg_upmap.find(pg);
4742 	        if (temp_it != tmp.pg_upmap.end()) {
4743 	          // leave pg_upmap alone
4744 	          // it must be specified by admin since balancer does not
4745 	          // support pg_upmap yet
4746 		  ldout(cct, 10) << " " << pg << " already has pg_upmap "
4747 	                         << temp_it->second << ", skipping"
4748 	                         << dendl;
4749 		  continue;
4750 		}
4751 	        auto pg_pool_size = tmp.get_pg_pool_size(pg);
4752 	        mempool::osdmap::vector<pair<int32_t,int32_t>> new_upmap_items;
4753 	        set<int> existing;
4754 	        auto it = tmp.pg_upmap_items.find(pg);
4755 	        if (it != tmp.pg_upmap_items.end() &&
4756 	            it->second.size() >= (size_t)pg_pool_size) {
4757 	          ldout(cct, 10) << " " << pg << " already has full-size pg_upmap_items "
4758 	                         << it->second << ", skipping"
4759 	                         << dendl;
4760 	          continue;
4761 	        } else if (it != tmp.pg_upmap_items.end()) {
4762 	          ldout(cct, 10) << " " << pg << " already has pg_upmap_items "
4763 	                         << it->second
4764 	                         << dendl;
4765 	          new_upmap_items = it->second;
4766 	          // build existing too (for dedup)
4767 	          for (auto i : it->second) {
4768 	            existing.insert(i.first);
4769 	            existing.insert(i.second);
4770 	          }
4771 	          // fall through
4772 	          // to see if we can append more remapping pairs
4773 	        }
4774 		ldout(cct, 10) << " trying " << pg << dendl;
4775 	        vector<int> raw, orig, out;
4776 	        tmp.pg_to_raw_upmap(pg, &raw, &orig); // including existing upmaps too
4777 		if (!try_pg_upmap(cct, pg, overfull, underfull, &orig, &out)) {
4778 		  continue;
4779 		}
4780 		ldout(cct, 10) << " " << pg << " " << orig << " -> " << out << dendl;
4781 		if (orig.size() != out.size()) {
4782 		  continue;
4783 		}
4784 		ceph_assert(orig != out);
4785 		for (unsigned i = 0; i < out.size(); ++i) {
4786 	          if (orig[i] == out[i])
4787 	            continue; // skip invalid remappings
4788 	          if (existing.count(orig[i]) || existing.count(out[i]))
4789 	            continue; // we want new remappings only!
4790 	          ldout(cct, 10) << " will try adding new remapping pair "
4791 	                         << orig[i] << " -> " << out[i] << " for " << pg
4792 	                         << dendl;
4793 	          existing.insert(orig[i]);
4794 	          existing.insert(out[i]);
4795 	          temp_pgs_by_osd[orig[i]].erase(pg);
4796 	          temp_pgs_by_osd[out[i]].insert(pg);
4797 	          ceph_assert(new_upmap_items.size() < (size_t)pg_pool_size);
4798 	          new_upmap_items.push_back(make_pair(orig[i], out[i]));
4799 	          // append new remapping pairs slowly
4800 	          // This way we can make sure that each tiny change will
4801 	          // definitely make distribution of PGs converging to
4802 	          // the perfect status.
4803 	          to_upmap[pg] = new_upmap_items;
4804 	          goto test_change;
4805 		}
4806 	      }
4807 	    }
4808 	
4809 	    ceph_assert(!(to_unmap.size() || to_upmap.size()));
4810 	    ldout(cct, 10) << " failed to find any changes for overfull osds"
4811 	                   << dendl;
4812 	    for (auto& p : deviation_osd) {
4813 	      if (std::find(underfull.begin(), underfull.end(), p.second) ==
4814 	                    underfull.end())
4815 	        break;
4816 	      int osd = p.second;
4817 	      float deviation = p.first;
4818 	      float target = osd_weight[osd] * pgs_per_weight;
4819 	      ceph_assert(target > 0);
4820 	      float deviation_ratio = abs(deviation / target);
4821 	      if (deviation_ratio < max_deviation_ratio) {
4822 	        // respect max_deviation_ratio too
4823 	        ldout(cct, 10) << " osd." << osd
4824 	                       << " target " << target
4825 	                       << " deviation " << deviation
4826 	                       << " -> absolute ratio " << deviation_ratio
4827 	                       << " < max ratio " << max_deviation_ratio
4828 	                       << dendl;
4829 	        break;
4830 	      }
4831 	      // look for remaps we can un-remap
4832 	      vector<pair<pg_t,
4833 	        mempool::osdmap::vector<pair<int32_t,int32_t>>>> candidates;
4834 	      candidates.reserve(tmp.pg_upmap_items.size());
4835 	      for (auto& i : tmp.pg_upmap_items) {
4836 	        if (to_skip.count(i.first))
4837 	          continue;
4838 	        if (!only_pools.empty() && !only_pools.count(i.first.pool()))
4839 	          continue;
4840 	        candidates.push_back(make_pair(i.first, i.second));
4841 	      }
4842 	      if (aggressive) {
4843 	        // shuffle candidates so they all get equal (in)attention
4844 	        std::random_device rd;
4845 	        std::default_random_engine rng{rd()};
4846 	        std::shuffle(candidates.begin(), candidates.end(), rng);
4847 	      }
4848 	      for (auto& i : candidates) {
4849 	        auto pg = i.first;
4850 	        mempool::osdmap::vector<pair<int32_t,int32_t>> new_upmap_items;
4851 	        for (auto& j : i.second) {
4852 	          if (j.first == osd) {
4853 	            ldout(cct, 10) << " will try dropping existing"
4854 	                           << " remapping pair "
4855 	                           << j.first << " -> " << j.second
4856 	                           << " which remapped " << pg
4857 	                           << " out from underfull osd." << osd
4858 	                           << dendl;
4859 	            temp_pgs_by_osd[j.second].erase(pg);
4860 	            temp_pgs_by_osd[j.first].insert(pg);
4861 	          } else {
4862 	            new_upmap_items.push_back(j);
4863 	          }
4864 	        }
4865 	        if (new_upmap_items.empty()) {
4866 	          // drop whole item
4867 	          ldout(cct, 10) << " existing pg_upmap_items " << i.second
4868 	                         << " remapped " << pg
4869 	                         << " out from underfull osd." << osd
4870 	                         << ", will try cancelling it entirely"
4871 	                         << dendl;
4872 	          to_unmap.insert(pg);
4873 	          goto test_change;
4874 	        } else if (new_upmap_items.size() != i.second.size()) {
4875 	          // drop single remapping pair, updating
4876 	          ceph_assert(new_upmap_items.size() < i.second.size());
4877 	          ldout(cct, 10) << " existing pg_upmap_items " << i.second
4878 	                         << " remapped " << pg
4879 	                         << " out from underfull osd." << osd
4880 	                         << ", new_pg_upmap_items now " << new_upmap_items
4881 	                         << dendl;
4882 	          to_upmap[pg] = new_upmap_items;
4883 	          goto test_change;
4884 	        }
4885 	      }
4886 	    }
4887 	
4888 	    ceph_assert(!(to_unmap.size() || to_upmap.size()));
4889 	    ldout(cct, 10) << " failed to find any changes for underfull osds"
4890 	                   << dendl;
4891 	    if (!aggressive) {
4892 	      ldout(cct, 10) << " break due to aggressive mode not enabled" << dendl;
4893 	      break;
4894 	    } else if (!skip_overfull) {
4895 	      // safe to quit because below here we know
4896 	      // we've done checking both overfull and underfull osds..
4897 	      ldout(cct, 10) << " break due to not being able to find any"
4898 	                     << " further optimizations"
4899 	                     << dendl;
4900 	      break;
4901 	    }
4902 	    // restart with fullest and do exhaustive searching
4903 	    skip_overfull = false;
4904 	    continue;
4905 	
4906 	  test_change:
4907 	
4908 	    // test change, apply if change is good
4909 	    ceph_assert(to_unmap.size() || to_upmap.size());
4910 	    float new_stddev = 0;
4911 	    map<int,float> temp_osd_deviation;
4912 	    multimap<float,int> temp_deviation_osd;
4913 	    for (auto& i : temp_pgs_by_osd) {
4914 	      // make sure osd is still there (belongs to this crush-tree)
4915 	      ceph_assert(osd_weight.count(i.first));
4916 	      float target = osd_weight[i.first] * pgs_per_weight;
4917 	      float deviation = (float)i.second.size() - target;
4918 	      ldout(cct, 20) << " osd." << i.first
4919 	                     << "\tpgs " << i.second.size()
4920 	                     << "\ttarget " << target
4921 	                     << "\tdeviation " << deviation
4922 	                     << dendl;
4923 	      temp_osd_deviation[i.first] = deviation;
4924 	      temp_deviation_osd.insert(make_pair(deviation, i.first));
4925 	      new_stddev += deviation * deviation;
4926 	    }
4927 	    ldout(cct, 10) << " stddev " << stddev << " -> " << new_stddev << dendl;
4928 	    if (new_stddev >= stddev) {
4929 	      if (!aggressive) {
4930 	        ldout(cct, 10) << " break because stddev is not decreasing"
4931 	                       << " and aggressive mode is not enabled"
4932 	                       << dendl;
4933 	        break;
4934 	      }
4935 	      local_fallback_retried++;
4936 	      if (local_fallback_retried >= local_fallback_retries) {
4937 	        // does not make progress
4938 	        // flip *skip_overfull* so both overfull and underfull
4939 	        // get equal (in)attention
4940 	        skip_overfull = !skip_overfull;
4941 	        ldout(cct, 10) << " hit local_fallback_retries "
4942 	                       << local_fallback_retries
4943 	                       << dendl;
4944 	        continue;
4945 	      }
4946 	      for (auto& i : to_unmap)
4947 	        to_skip.insert(i);
4948 	      for (auto& i : to_upmap)
4949 	        to_skip.insert(i.first);
4950 	      ldout(cct, 20) << " local_fallback_retried " << local_fallback_retried
4951 	                     << " to_skip " << to_skip
4952 	                     << dendl;
4953 	      goto retry;
4954 	    }
4955 	
4956 	    // ready to go
4957 	    ceph_assert(new_stddev < stddev);
4958 	    stddev = new_stddev;
4959 	    pgs_by_osd = temp_pgs_by_osd;
4960 	    osd_deviation = temp_osd_deviation;
4961 	    deviation_osd = temp_deviation_osd;
4962 	    for (auto& i : to_unmap) {
4963 	      ldout(cct, 10) << " unmap pg " << i << dendl;
4964 	      ceph_assert(tmp.pg_upmap_items.count(i));
4965 	      tmp.pg_upmap_items.erase(i);
4966 	      pending_inc->old_pg_upmap_items.insert(i);
4967 	      ++num_changed;
4968 	    }
4969 	    for (auto& i : to_upmap) {
4970 	      ldout(cct, 10) << " upmap pg " << i.first
4971 	                     << " new pg_upmap_items " << i.second
4972 	                     << dendl;
4973 	      tmp.pg_upmap_items[i.first] = i.second;
4974 	      pending_inc->new_pg_upmap_items[i.first] = i.second;
4975 	      ++num_changed;
4976 	    }
4977 	  }
4978 	  ldout(cct, 10) << " num_changed = " << num_changed << dendl;
4979 	  return num_changed;
4980 	}
4981 	
4982 	int OSDMap::get_osds_by_bucket_name(const string &name, set<int> *osds) const
4983 	{
4984 	  return crush->get_leaves(name, osds);
4985 	}
4986 	
4987 	// get pools whose crush rules might reference the given osd
4988 	void OSDMap::get_pool_ids_by_osd(CephContext *cct,
4989 	                                int osd,
4990 	                                set<int64_t> *pool_ids) const
4991 	{
4992 	  ceph_assert(pool_ids);
4993 	  set<int> raw_rules;
4994 	  int r = crush->get_rules_by_osd(osd, &raw_rules);
4995 	  if (r < 0) {
4996 	    lderr(cct) << __func__ << " get_rules_by_osd failed: " << cpp_strerror(r)
4997 	               << dendl;
4998 	    ceph_assert(r >= 0);
4999 	  }
5000 	  set<int> rules;
5001 	  for (auto &i: raw_rules) {
5002 	    // exclude any dead rule
5003 	    if (crush_rule_in_use(i)) {
5004 	      rules.insert(i);
5005 	    }
5006 	  }
5007 	  for (auto &r: rules) {
5008 	    get_pool_ids_by_rule(r, pool_ids);
5009 	  }
5010 	}
5011 	
5012 	template <typename F>
5013 	class OSDUtilizationDumper : public CrushTreeDumper::Dumper<F> {
5014 	public:
5015 	  typedef CrushTreeDumper::Dumper<F> Parent;
5016 	
5017 	  OSDUtilizationDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
5018 	                       const PGMap& pgmap_, bool tree_,
5019 	                       const string& filter) :
5020 	    Parent(crush, osdmap_->get_pool_names()),
5021 	    osdmap(osdmap_),
5022 	    pgmap(pgmap_),
5023 	    tree(tree_),
5024 	    min_var(-1),
5025 	    max_var(-1),
5026 	    stddev(0),
5027 	    sum(0) {
5028 	    if (osdmap->crush->name_exists(filter)) {
5029 	      // filter by crush node
5030 	      auto item_id = osdmap->crush->get_item_id(filter);
5031 	      allowed.insert(item_id);
5032 	      osdmap->crush->get_all_children(item_id, &allowed);
5033 	    } else if (osdmap->crush->class_exists(filter)) {
5034 	      // filter by device class
5035 	      class_id = osdmap->crush->get_class_id(filter);
5036 	    } else if (auto pool_id = osdmap->lookup_pg_pool_name(filter);
5037 	               pool_id >= 0) {
5038 	      // filter by pool
5039 	      auto crush_rule = osdmap->get_pool_crush_rule(pool_id);
5040 	      set<int> roots;
5041 	      osdmap->crush->find_takes_by_rule(crush_rule, &roots);
5042 	      allowed = roots;
5043 	      for (auto r : roots)
5044 	        osdmap->crush->get_all_children(r, &allowed);
5045 	    }
5046 	    average_util = average_utilization();
5047 	  }
5048 	
5049 	protected:
5050 	
5051 	  bool should_dump(int id) const {
5052 	    if (!allowed.empty() && !allowed.count(id)) // filter by name
5053 	      return false;
5054 	    if (id >= 0 && class_id >= 0) {
5055 	      auto item_class_id = osdmap->crush->get_item_class_id(id);
5056 	      if (item_class_id < 0 || // not bound to a class yet
5057 	          item_class_id != class_id) // or already bound to a different class
5058 	        return false;
5059 	    }
5060 	    return true;
5061 	  }
5062 	
5063 	  set<int> get_dumped_osds() {
5064 	    if (allowed.empty() && class_id < 0) {
5065 	      // old way, all
5066 	      return {};
5067 	    }
5068 	    return dumped_osds;
5069 	  }
5070 	
5071 	  void dump_stray(F *f) {
5072 	    for (int i = 0; i < osdmap->get_max_osd(); i++) {
5073 	      if (osdmap->exists(i) && !this->is_touched(i))
5074 		dump_item(CrushTreeDumper::Item(i, 0, 0, 0), f);
5075 	    }
5076 	  }
5077 	
5078 	  void dump_item(const CrushTreeDumper::Item &qi, F *f) override {
5079 	    if (!tree && qi.is_bucket())
5080 	      return;
5081 	    if (!should_dump(qi.id))
5082 	      return;
5083 	
5084 	    if (!qi.is_bucket())
5085 	      dumped_osds.insert(qi.id);
5086 	    float reweight = qi.is_bucket() ? -1 : osdmap->get_weightf(qi.id);
5087 	    int64_t kb = 0, kb_used = 0, kb_used_data = 0, kb_used_omap = 0,
5088 	      kb_used_meta = 0, kb_avail = 0;
5089 	    double util = 0;
5090 	    if (get_bucket_utilization(qi.id, &kb, &kb_used, &kb_used_data,
5091 				       &kb_used_omap, &kb_used_meta, &kb_avail))
5092 	      if (kb_used && kb)
5093 	        util = 100.0 * (double)kb_used / (double)kb;
5094 	
5095 	    double var = 1.0;
5096 	    if (average_util)
5097 	      var = util / average_util;
5098 	
5099 	    size_t num_pgs = qi.is_bucket() ? 0 : pgmap.get_num_pg_by_osd(qi.id);
5100 	
5101 	    dump_item(qi, reweight, kb, kb_used,
5102 		      kb_used_data, kb_used_omap, kb_used_meta,
5103 		      kb_avail, util, var, num_pgs, f);
5104 	
5105 	    if (!qi.is_bucket() && reweight > 0) {
5106 	      if (min_var < 0 || var < min_var)
5107 		min_var = var;
5108 	      if (max_var < 0 || var > max_var)
5109 		max_var = var;
5110 	
5111 	      double dev = util - average_util;
5112 	      dev *= dev;
5113 	      stddev += reweight * dev;
5114 	      sum += reweight;
5115 	    }
5116 	  }
5117 	
5118 	  virtual void dump_item(const CrushTreeDumper::Item &qi,
5119 				 float &reweight,
5120 				 int64_t kb,
5121 				 int64_t kb_used,
5122 				 int64_t kb_used_data,
5123 				 int64_t kb_used_omap,
5124 				 int64_t kb_used_meta,
5125 				 int64_t kb_avail,
5126 				 double& util,
5127 				 double& var,
5128 				 const size_t num_pgs,
5129 				 F *f) = 0;
5130 	
5131 	  double dev() {
5132 	    return sum > 0 ? sqrt(stddev / sum) : 0;
5133 	  }
5134 	
5135 	  double average_utilization() {
5136 	    int64_t kb = 0, kb_used = 0;
5137 	    for (int i = 0; i < osdmap->get_max_osd(); i++) {
5138 	      if (!osdmap->exists(i) ||
5139 	           osdmap->get_weight(i) == 0 ||
5140 	          !should_dump(i))
5141 		continue;
5142 	      int64_t kb_i, kb_used_i, kb_used_data_i, kb_used_omap_i, kb_used_meta_i,
5143 		kb_avail_i;
5144 	      if (get_osd_utilization(i, &kb_i, &kb_used_i, &kb_used_data_i,
5145 				      &kb_used_omap_i, &kb_used_meta_i, &kb_avail_i)) {
5146 		kb += kb_i;
5147 		kb_used += kb_used_i;
5148 	      }
5149 	    }
5150 	    return kb > 0 ? 100.0 * (double)kb_used / (double)kb : 0;
5151 	  }
5152 	
5153 	  bool get_osd_utilization(int id, int64_t* kb, int64_t* kb_used,
5154 				   int64_t* kb_used_data,
5155 				   int64_t* kb_used_omap,
5156 				   int64_t* kb_used_meta,
5157 				   int64_t* kb_avail) const {
5158 	    const osd_stat_t *p = pgmap.get_osd_stat(id);
5159 	    if (!p) return false;
5160 	    *kb = p->statfs.kb();
5161 	    *kb_used = p->statfs.kb_used_raw();
5162 	    *kb_used_data = p->statfs.kb_used_data();
5163 	    *kb_used_omap = p->statfs.kb_used_omap();
5164 	    *kb_used_meta = p->statfs.kb_used_internal_metadata();
5165 	    *kb_avail = p->statfs.kb_avail();
5166 	    
5167 	    return *kb > 0;
5168 	  }
5169 	
5170 	  bool get_bucket_utilization(int id, int64_t* kb, int64_t* kb_used,
5171 				      int64_t* kb_used_data,
5172 				      int64_t* kb_used_omap,
5173 				      int64_t* kb_used_meta,
5174 				      int64_t* kb_avail) const {
5175 	    if (id >= 0) {
5176 	      if (osdmap->is_out(id) || !should_dump(id)) {
5177 	        *kb = 0;
5178 	        *kb_used = 0;
5179 		*kb_used_data = 0;
5180 		*kb_used_omap = 0;
5181 		*kb_used_meta = 0;
5182 	        *kb_avail = 0;
5183 	        return true;
5184 	      }
5185 	      return get_osd_utilization(id, kb, kb_used, kb_used_data,
5186 					 kb_used_omap, kb_used_meta, kb_avail);
5187 	    }
5188 	
5189 	    *kb = 0;
5190 	    *kb_used = 0;
5191 	    *kb_used_data = 0;
5192 	    *kb_used_omap = 0;
5193 	    *kb_used_meta = 0;
5194 	    *kb_avail = 0;
5195 	
5196 	    for (int k = osdmap->crush->get_bucket_size(id) - 1; k >= 0; k--) {
5197 	      int item = osdmap->crush->get_bucket_item(id, k);
5198 	      int64_t kb_i = 0, kb_used_i = 0, kb_used_data_i = 0,
5199 		kb_used_omap_i = 0, kb_used_meta_i = 0, kb_avail_i = 0;
5200 	      if (!get_bucket_utilization(item, &kb_i, &kb_used_i,
5201 					  &kb_used_data_i, &kb_used_omap_i,
5202 					  &kb_used_meta_i, &kb_avail_i))
5203 		return false;
5204 	      *kb += kb_i;
5205 	      *kb_used += kb_used_i;
5206 	      *kb_used_data += kb_used_data_i;
5207 	      *kb_used_omap += kb_used_omap_i;
5208 	      *kb_used_meta += kb_used_meta_i;
5209 	      *kb_avail += kb_avail_i;
5210 	    }
5211 	    return *kb > 0;
5212 	  }
5213 	
5214 	protected:
5215 	  const OSDMap *osdmap;
5216 	  const PGMap& pgmap;
5217 	  bool tree;
5218 	  double average_util;
5219 	  double min_var;
5220 	  double max_var;
5221 	  double stddev;
5222 	  double sum;
5223 	  int class_id = -1;
5224 	  set<int> allowed;
5225 	  set<int> dumped_osds;
5226 	};
5227 	
5228 	
5229 	class OSDUtilizationPlainDumper : public OSDUtilizationDumper<TextTable> {
5230 	public:
5231 	  typedef OSDUtilizationDumper<TextTable> Parent;
5232 	
5233 	  OSDUtilizationPlainDumper(const CrushWrapper *crush, const OSDMap *osdmap,
5234 	                            const PGMap& pgmap, bool tree,
5235 	                            const string& filter) :
5236 	    Parent(crush, osdmap, pgmap, tree, filter) {}
5237 	
5238 	  void dump(TextTable *tbl) {
5239 	    tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
5240 	    tbl->define_column("CLASS", TextTable::LEFT, TextTable::RIGHT);
5241 	    tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
5242 	    tbl->define_column("REWEIGHT", TextTable::LEFT, TextTable::RIGHT);
5243 	    tbl->define_column("SIZE", TextTable::LEFT, TextTable::RIGHT);
5244 	    tbl->define_column("RAW USE", TextTable::LEFT, TextTable::RIGHT);
5245 	    tbl->define_column("DATA", TextTable::LEFT, TextTable::RIGHT);
5246 	    tbl->define_column("OMAP", TextTable::LEFT, TextTable::RIGHT);
5247 	    tbl->define_column("META", TextTable::LEFT, TextTable::RIGHT);
5248 	    tbl->define_column("AVAIL", TextTable::LEFT, TextTable::RIGHT);
5249 	    tbl->define_column("%USE", TextTable::LEFT, TextTable::RIGHT);
5250 	    tbl->define_column("VAR", TextTable::LEFT, TextTable::RIGHT);
5251 	    tbl->define_column("PGS", TextTable::LEFT, TextTable::RIGHT);
5252 	    tbl->define_column("STATUS", TextTable::LEFT, TextTable::RIGHT);
5253 	    if (tree)
5254 	      tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
5255 	
5256 	    Parent::dump(tbl);
5257 	
5258 	    dump_stray(tbl);
5259 	
5260 	    auto sum = pgmap.get_osd_sum(get_dumped_osds());
5261 	    *tbl << ""
5262 		 << ""
5263 		 << "" << "TOTAL"
5264 		 << byte_u_t(sum.statfs.total)
5265 		 << byte_u_t(sum.statfs.get_used_raw())
5266 		 << byte_u_t(sum.statfs.allocated)
5267 		 << byte_u_t(sum.statfs.omap_allocated)
5268 		 << byte_u_t(sum.statfs.internal_metadata)
5269 		 << byte_u_t(sum.statfs.available)
5270 		 << lowprecision_t(average_util)
5271 		 << ""
5272 		 << TextTable::endrow;
5273 	  }
5274 	
5275 	protected:
5276 	  struct lowprecision_t {
5277 	    float v;
5278 	    explicit lowprecision_t(float _v) : v(_v) {}
5279 	  };
5280 	  friend std::ostream &operator<<(ostream& out, const lowprecision_t& v);
5281 	
5282 	  using OSDUtilizationDumper<TextTable>::dump_item;
5283 	  void dump_item(const CrushTreeDumper::Item &qi,
5284 				 float &reweight,
5285 				 int64_t kb,
5286 				 int64_t kb_used,
5287 				 int64_t kb_used_data,
5288 				 int64_t kb_used_omap,
5289 				 int64_t kb_used_meta,
5290 				 int64_t kb_avail,
5291 				 double& util,
5292 				 double& var,
5293 				 const size_t num_pgs,
5294 				 TextTable *tbl) override {
5295 	    const char *c = crush->get_item_class(qi.id);
5296 	    if (!c)
5297 	      c = "";
5298 	    *tbl << qi.id
5299 		 << c
5300 		 << weightf_t(qi.weight)
5301 		 << weightf_t(reweight)
5302 		 << byte_u_t(kb << 10)
5303 		 << byte_u_t(kb_used << 10)
5304 		 << byte_u_t(kb_used_data << 10)
5305 		 << byte_u_t(kb_used_omap << 10)
5306 		 << byte_u_t(kb_used_meta << 10)
5307 		 << byte_u_t(kb_avail << 10)
5308 		 << lowprecision_t(util)
5309 		 << lowprecision_t(var);
5310 	
5311 	    if (qi.is_bucket()) {
5312 	      *tbl << "-";
5313 	      *tbl << "";
5314 	    } else {
5315 	      *tbl << num_pgs;
5316 	      if (osdmap->is_up(qi.id)) {
5317 	        *tbl << "up";
5318 	      } else if (osdmap->is_destroyed(qi.id)) {
5319 	        *tbl << "destroyed";
5320 	      } else {
5321 	        *tbl << "down";
5322 	      }
5323 	    }
5324 	
5325 	    if (tree) {
5326 	      ostringstream name;
5327 	      for (int k = 0; k < qi.depth; k++)
5328 		name << "    ";
5329 	      if (qi.is_bucket()) {
5330 		int type = crush->get_bucket_type(qi.id);
5331 		name << crush->get_type_name(type) << " "
5332 		     << crush->get_item_name(qi.id);
5333 	      } else {
5334 		name << "osd." << qi.id;
5335 	      }
5336 	      *tbl << name.str();
5337 	    }
5338 	
5339 	    *tbl << TextTable::endrow;
5340 	  }
5341 	
5342 	public:
5343 	  string summary() {
5344 	    ostringstream out;
5345 	    out << "MIN/MAX VAR: " << lowprecision_t(min_var)
5346 		<< "/" << lowprecision_t(max_var) << "  "
5347 		<< "STDDEV: " << lowprecision_t(dev());
5348 	    return out.str();
5349 	  }
5350 	};
5351 	
5352 	ostream& operator<<(ostream& out,
5353 			    const OSDUtilizationPlainDumper::lowprecision_t& v)
5354 	{
5355 	  if (v.v < -0.01) {
5356 	    return out << "-";
5357 	  } else if (v.v < 0.001) {
5358 	    return out << "0";
5359 	  } else {
5360 	    std::streamsize p = out.precision();
5361 	    return out << std::fixed << std::setprecision(2) << v.v << std::setprecision(p);
5362 	  }
5363 	}
5364 	
5365 	class OSDUtilizationFormatDumper : public OSDUtilizationDumper<Formatter> {
5366 	public:
5367 	  typedef OSDUtilizationDumper<Formatter> Parent;
5368 	
5369 	  OSDUtilizationFormatDumper(const CrushWrapper *crush, const OSDMap *osdmap,
5370 	                             const PGMap& pgmap, bool tree,
5371 	                             const string& filter) :
5372 	    Parent(crush, osdmap, pgmap, tree, filter) {}
5373 	
5374 	  void dump(Formatter *f) {
5375 	    f->open_array_section("nodes");
5376 	    Parent::dump(f);
5377 	    f->close_section();
5378 	
5379 	    f->open_array_section("stray");
5380 	    dump_stray(f);
5381 	    f->close_section();
5382 	  }
5383 	
5384 	protected:
5385 	  using OSDUtilizationDumper<Formatter>::dump_item;
5386 	  void dump_item(const CrushTreeDumper::Item &qi,
5387 			 float &reweight,
5388 			 int64_t kb,
5389 			 int64_t kb_used,
5390 			 int64_t kb_used_data,
5391 			 int64_t kb_used_omap,
5392 			 int64_t kb_used_meta,
5393 			 int64_t kb_avail,
5394 			 double& util,
5395 			 double& var,
5396 			 const size_t num_pgs,
5397 			 Formatter *f) override {
5398 	    f->open_object_section("item");
5399 	    CrushTreeDumper::dump_item_fields(crush, weight_set_names, qi, f);
5400 	    f->dump_float("reweight", reweight);
5401 	    f->dump_int("kb", kb);
5402 	    f->dump_int("kb_used", kb_used);
5403 	    f->dump_int("kb_used_data", kb_used_data);
5404 	    f->dump_int("kb_used_omap", kb_used_omap);
5405 	    f->dump_int("kb_used_meta", kb_used_meta);
5406 	    f->dump_int("kb_avail", kb_avail);
5407 	    f->dump_float("utilization", util);
5408 	    f->dump_float("var", var);
5409 	    f->dump_unsigned("pgs", num_pgs);
5410 	    if (!qi.is_bucket()) {
5411 	      if (osdmap->is_up(qi.id)) {
5412 	        f->dump_string("status", "up");
5413 	      } else if (osdmap->is_destroyed(qi.id)) {
5414 	        f->dump_string("status", "destroyed");
5415 	      } else {
5416 	        f->dump_string("status", "down");
5417 	      }
5418 	    }
5419 	    CrushTreeDumper::dump_bucket_children(crush, qi, f);
5420 	    f->close_section();
5421 	  }
5422 	
5423 	public:
5424 	  void summary(Formatter *f) {
5425 	    f->open_object_section("summary");
5426 	    auto sum = pgmap.get_osd_sum(get_dumped_osds());
5427 	    auto& s = sum.statfs;
5428 	
5429 	    f->dump_int("total_kb", s.kb());
5430 	    f->dump_int("total_kb_used", s.kb_used_raw());
5431 	    f->dump_int("total_kb_used_data", s.kb_used_data());
5432 	    f->dump_int("total_kb_used_omap", s.kb_used_omap());
5433 	    f->dump_int("total_kb_used_meta", s.kb_used_internal_metadata());
5434 	    f->dump_int("total_kb_avail", s.kb_avail());
5435 	    f->dump_float("average_utilization", average_util);
5436 	    f->dump_float("min_var", min_var);
5437 	    f->dump_float("max_var", max_var);
5438 	    f->dump_float("dev", dev());
5439 	    f->close_section();
5440 	  }
5441 	};
5442 	
5443 	void print_osd_utilization(const OSDMap& osdmap,
5444 	                           const PGMap& pgmap,
5445 	                           ostream& out,
5446 	                           Formatter *f,
5447 	                           bool tree,
5448 	                           const string& filter)
5449 	{
5450 	  const CrushWrapper *crush = osdmap.crush.get();
5451 	  if (f) {
5452 	    f->open_object_section("df");
5453 	    OSDUtilizationFormatDumper d(crush, &osdmap, pgmap, tree, filter);
5454 	    d.dump(f);
5455 	    d.summary(f);
5456 	    f->close_section();
5457 	    f->flush(out);
5458 	  } else {
5459 	    OSDUtilizationPlainDumper d(crush, &osdmap, pgmap, tree, filter);
5460 	    TextTable tbl;
5461 	    d.dump(&tbl);
5462 	    out << tbl << d.summary() << "\n";
5463 	  }
5464 	}
5465 	
5466 	void OSDMap::check_health(CephContext *cct,
5467 				  health_check_map_t *checks) const
5468 	{
5469 	  int num_osds = get_num_osds();
5470 	
5471 	  // OSD_DOWN
5472 	  // OSD_$subtree_DOWN
5473 	  // OSD_ORPHAN
5474 	  if (num_osds >= 0) {
5475 	    int num_in_osds = 0;
5476 	    int num_down_in_osds = 0;
5477 	    set<int> osds;
5478 	    set<int> down_in_osds;
5479 	    set<int> up_in_osds;
5480 	    set<int> subtree_up;
5481 	    unordered_map<int, set<int> > subtree_type_down;
5482 	    unordered_map<int, int> num_osds_subtree;
5483 	    int max_type = crush->get_max_type_id();
5484 	
5485 	    for (int i = 0; i < get_max_osd(); i++) {
5486 	      if (!exists(i)) {
5487 	        if (crush->item_exists(i)) {
5488 	          osds.insert(i);
5489 	        }
5490 		continue;
5491 	      }
5492 	      if (is_out(i))
5493 	        continue;
5494 	      ++num_in_osds;
5495 	      if (down_in_osds.count(i) || up_in_osds.count(i))
5496 		continue;
5497 	      if (!is_up(i)) {
5498 		down_in_osds.insert(i);
5499 		int parent_id = 0;
5500 		int current = i;
5501 		for (int type = 0; type <= max_type; type++) {
5502 		  if (!crush->get_type_name(type))
5503 		    continue;
5504 		  int r = crush->get_immediate_parent_id(current, &parent_id);
5505 		  if (r == -ENOENT)
5506 		    break;
5507 		  // break early if this parent is already marked as up
5508 		  if (subtree_up.count(parent_id))
5509 		    break;
5510 		  type = crush->get_bucket_type(parent_id);
5511 		  if (!subtree_type_is_down(
5512 			cct, parent_id, type,
5513 			&down_in_osds, &up_in_osds, &subtree_up, &subtree_type_down))
5514 		    break;
5515 		  current = parent_id;
5516 		}
5517 	      }
5518 	    }
5519 	
5520 	    // calculate the number of down osds in each down subtree and
5521 	    // store it in num_osds_subtree
5522 	    for (int type = 1; type <= max_type; type++) {
5523 	      if (!crush->get_type_name(type))
5524 		continue;
5525 	      for (auto j = subtree_type_down[type].begin();
5526 		   j != subtree_type_down[type].end();
5527 		   ++j) {
5528 		list<int> children;
5529 		int num = 0;
5530 		int num_children = crush->get_children(*j, &children);
5531 		if (num_children == 0)
5532 		  continue;
5533 		for (auto l = children.begin(); l != children.end(); ++l) {
5534 		  if (*l >= 0) {
5535 		    ++num;
5536 		  } else if (num_osds_subtree[*l] > 0) {
5537 		    num = num + num_osds_subtree[*l];
5538 		  }
5539 		}
5540 		num_osds_subtree[*j] = num;
5541 	      }
5542 	    }
5543 	    num_down_in_osds = down_in_osds.size();
5544 	    ceph_assert(num_down_in_osds <= num_in_osds);
5545 	    if (num_down_in_osds > 0) {
5546 	      // summary of down subtree types and osds
5547 	      for (int type = max_type; type > 0; type--) {
5548 		if (!crush->get_type_name(type))
5549 		  continue;
5550 		if (subtree_type_down[type].size() > 0) {
5551 		  ostringstream ss;
5552 		  ss << subtree_type_down[type].size() << " "
5553 		     << crush->get_type_name(type);
5554 		  if (subtree_type_down[type].size() > 1) {
5555 		    ss << "s";
5556 		  }
5557 		  int sum_down_osds = 0;
5558 		  for (auto j = subtree_type_down[type].begin();
5559 		       j != subtree_type_down[type].end();
5560 		       ++j) {
5561 		    sum_down_osds = sum_down_osds + num_osds_subtree[*j];
5562 		  }
5563 	          ss << " (" << sum_down_osds << " osds) down";
5564 		  string err = string("OSD_") +
5565 		    string(crush->get_type_name(type)) + "_DOWN";
5566 		  boost::to_upper(err);
5567 		  auto& d = checks->add(err, HEALTH_WARN, ss.str(),
5568 					subtree_type_down[type].size());
5569 		  for (auto j = subtree_type_down[type].rbegin();
5570 		       j != subtree_type_down[type].rend();
5571 		       ++j) {
5572 		    ostringstream ss;
5573 		    ss << crush->get_type_name(type);
5574 		    ss << " ";
5575 		    ss << crush->get_item_name(*j);
5576 		    // at the top level, do not print location
5577 		    if (type != max_type) {
5578 	              ss << " (";
5579 	              ss << crush->get_full_location_ordered_string(*j);
5580 	              ss << ")";
5581 		    }
5582 		    int num = num_osds_subtree[*j];
5583 		    ss << " (" << num << " osds)";
5584 		    ss << " is down";
5585 		    d.detail.push_back(ss.str());
5586 		  }
5587 		}
5588 	      }
5589 	      ostringstream ss;
5590 	      ss << down_in_osds.size() << " osds down";
5591 	      auto& d = checks->add("OSD_DOWN", HEALTH_WARN, ss.str(),
5592 				    down_in_osds.size());
5593 	      for (auto it = down_in_osds.begin(); it != down_in_osds.end(); ++it) {
5594 		ostringstream ss;
5595 		ss << "osd." << *it << " (";
5596 		ss << crush->get_full_location_ordered_string(*it);
5597 		ss << ") is down";
5598 		d.detail.push_back(ss.str());
5599 	      }
5600 	    }
5601 	
5602 	    if (!osds.empty()) {
5603 	      ostringstream ss;
5604 	      ss << osds.size() << " osds exist in the crush map but not in the osdmap";
5605 	      auto& d = checks->add("OSD_ORPHAN", HEALTH_WARN, ss.str(),
5606 				    osds.size());
5607 	      for (auto osd : osds) {
5608 		ostringstream ss;
5609 		ss << "osd." << osd << " exists in crush map but not in osdmap";
5610 		d.detail.push_back(ss.str());
5611 	      }
5612 	    }
5613 	  }
5614 	
5615 	  std::list<std::string> scrub_messages;
5616 	  bool noscrub = false, nodeepscrub = false;
5617 	  for (const auto &p : pools) {
5618 	    if (p.second.flags & pg_pool_t::FLAG_NOSCRUB) {
5619 	      ostringstream ss;
5620 	      ss << "Pool " << get_pool_name(p.first) << " has noscrub flag";
5621 	      scrub_messages.push_back(ss.str());
5622 	      noscrub = true;
5623 	    }
5624 	    if (p.second.flags & pg_pool_t::FLAG_NODEEP_SCRUB) {
5625 	      ostringstream ss;
5626 	      ss << "Pool " << get_pool_name(p.first) << " has nodeep-scrub flag";
5627 	      scrub_messages.push_back(ss.str());
5628 	      nodeepscrub = true;
5629 	    }
5630 	  }
5631 	  if (noscrub || nodeepscrub) {
5632 	    string out = "";
5633 	    out += noscrub ? string("noscrub") + (nodeepscrub ? ", " : "") : "";
5634 	    out += nodeepscrub ? "nodeep-scrub" : "";
5635 	    auto& d = checks->add("POOL_SCRUB_FLAGS", HEALTH_OK,
5636 				  "Some pool(s) have the " + out + " flag(s) set", 0);
5637 	    d.detail.splice(d.detail.end(), scrub_messages);
5638 	  }
5639 	
5640 	  // OSD_OUT_OF_ORDER_FULL
5641 	  {
5642 	    // An osd could configure failsafe ratio, to something different
5643 	    // but for now assume it is the same here.
5644 	    float fsr = cct->_conf->osd_failsafe_full_ratio;
5645 	    if (fsr > 1.0) fsr /= 100;
5646 	    float fr = get_full_ratio();
5647 	    float br = get_backfillfull_ratio();
5648 	    float nr = get_nearfull_ratio();
5649 	
5650 	    list<string> detail;
5651 	    // These checks correspond to how OSDService::check_full_status() in an OSD
5652 	    // handles the improper setting of these values.
5653 	    if (br < nr) {
5654 	      ostringstream ss;
5655 	      ss << "backfillfull_ratio (" << br
5656 		 << ") < nearfull_ratio (" << nr << "), increased";
5657 	      detail.push_back(ss.str());
5658 	      br = nr;
5659 	    }
5660 	    if (fr < br) {
5661 	      ostringstream ss;
5662 	      ss << "full_ratio (" << fr << ") < backfillfull_ratio (" << br
5663 		 << "), increased";
5664 	      detail.push_back(ss.str());
5665 	      fr = br;
5666 	    }
5667 	    if (fsr < fr) {
5668 	      ostringstream ss;
5669 	      ss << "osd_failsafe_full_ratio (" << fsr << ") < full_ratio (" << fr
5670 		 << "), increased";
5671 	      detail.push_back(ss.str());
5672 	    }
5673 	    if (!detail.empty()) {
5674 	      auto& d = checks->add("OSD_OUT_OF_ORDER_FULL", HEALTH_ERR,
5675 				    "full ratio(s) out of order", 0);
5676 	      d.detail.swap(detail);
5677 	    }
5678 	  }
5679 	
5680 	  // OSD_FULL
5681 	  // OSD_NEARFULL
5682 	  // OSD_BACKFILLFULL
5683 	  // OSD_FAILSAFE_FULL
5684 	  {
5685 	    set<int> full, backfillfull, nearfull;
5686 	    get_full_osd_counts(&full, &backfillfull, &nearfull);
5687 	    if (full.size()) {
5688 	      ostringstream ss;
5689 	      ss << full.size() << " full osd(s)";
5690 	      auto& d = checks->add("OSD_FULL", HEALTH_ERR, ss.str(), full.size());
5691 	      for (auto& i: full) {
5692 		ostringstream ss;
5693 		ss << "osd." << i << " is full";
5694 		d.detail.push_back(ss.str());
5695 	      }
5696 	    }
5697 	    if (backfillfull.size()) {
5698 	      ostringstream ss;
5699 	      ss << backfillfull.size() << " backfillfull osd(s)";
5700 	      auto& d = checks->add("OSD_BACKFILLFULL", HEALTH_WARN, ss.str(),
5701 				    backfillfull.size());
5702 	      for (auto& i: backfillfull) {
5703 		ostringstream ss;
5704 		ss << "osd." << i << " is backfill full";
5705 		d.detail.push_back(ss.str());
5706 	      }
5707 	    }
5708 	    if (nearfull.size()) {
5709 	      ostringstream ss;
5710 	      ss << nearfull.size() << " nearfull osd(s)";
5711 	      auto& d = checks->add("OSD_NEARFULL", HEALTH_WARN, ss.str(), nearfull.size());
5712 	      for (auto& i: nearfull) {
5713 		ostringstream ss;
5714 		ss << "osd." << i << " is near full";
5715 		d.detail.push_back(ss.str());
5716 	      }
5717 	    }
5718 	  }
5719 	
5720 	  // OSDMAP_FLAGS
5721 	  {
5722 	    // warn about flags
5723 	    uint64_t warn_flags =
5724 	      CEPH_OSDMAP_PAUSERD |
5725 	      CEPH_OSDMAP_PAUSEWR |
5726 	      CEPH_OSDMAP_PAUSEREC |
5727 	      CEPH_OSDMAP_NOUP |
5728 	      CEPH_OSDMAP_NODOWN |
5729 	      CEPH_OSDMAP_NOIN |
5730 	      CEPH_OSDMAP_NOOUT |
5731 	      CEPH_OSDMAP_NOBACKFILL |
5732 	      CEPH_OSDMAP_NORECOVER |
5733 	      CEPH_OSDMAP_NOSCRUB |
5734 	      CEPH_OSDMAP_NODEEP_SCRUB |
5735 	      CEPH_OSDMAP_NOTIERAGENT |
5736 	      CEPH_OSDMAP_NOSNAPTRIM |
5737 	      CEPH_OSDMAP_NOREBALANCE;
5738 	    if (test_flag(warn_flags)) {
5739 	      ostringstream ss;
5740 	      string s = get_flag_string(get_flags() & warn_flags);
5741 	      ss << s << " flag(s) set";
5742 	      checks->add("OSDMAP_FLAGS", HEALTH_WARN, ss.str(),
5743 			  s.size() /* kludgey but sufficient */);
5744 	    }
5745 	  }
5746 	
5747 	  // OSD_FLAGS
5748 	  {
5749 	    list<string> detail;
5750 	    const unsigned flags =
5751 	      CEPH_OSD_NOUP |
5752 	      CEPH_OSD_NOIN |
5753 	      CEPH_OSD_NODOWN |
5754 	      CEPH_OSD_NOOUT;
5755 	    for (int i = 0; i < max_osd; ++i) {
5756 	      if (osd_state[i] & flags) {
5757 		ostringstream ss;
5758 		set<string> states;
5759 		OSDMap::calc_state_set(osd_state[i] & flags, states);
5760 		ss << "osd." << i << " has flags " << states;
5761 		detail.push_back(ss.str());
5762 	      }
5763 	    }
5764 	    for (auto& i : crush_node_flags) {
5765 	      if (i.second && crush->item_exists(i.first)) {
5766 		ostringstream ss;
5767 		set<string> states;
5768 		OSDMap::calc_state_set(i.second, states);
5769 		int t = i.first >= 0 ? 0 : crush->get_bucket_type(i.first);
5770 		const char *tn = crush->get_type_name(t);
5771 		ss << (tn ? tn : "node") << " "
5772 		   << crush->get_item_name(i.first) << " has flags " << states;
5773 		detail.push_back(ss.str());
5774 	      }
5775 	    }
5776 	    for (auto& i : device_class_flags) {
5777 	      const char* class_name = crush->get_class_name(i.first);
5778 	      if (i.second && class_name) {
5779 	        ostringstream ss;
5780 	        set<string> states;
5781 	        OSDMap::calc_state_set(i.second, states);
5782 	        ss << "device class '" << class_name << "' has flags " << states;
5783 	        detail.push_back(ss.str());
5784 	      }
5785 	    }
5786 	    if (!detail.empty()) {
5787 	      ostringstream ss;
5788 	      ss << detail.size() << " OSDs or CRUSH {nodes, device-classes} have {NOUP,NODOWN,NOIN,NOOUT} flags set";
5789 	      auto& d = checks->add("OSD_FLAGS", HEALTH_WARN, ss.str(), detail.size());
5790 	      d.detail.swap(detail);
5791 	    }
5792 	  }
5793 	
5794 	  // OLD_CRUSH_TUNABLES
5795 	  if (cct->_conf->mon_warn_on_legacy_crush_tunables) {
5796 	    string min = crush->get_min_required_version();
5797 	    if (min < cct->_conf->mon_crush_min_required_version) {
5798 	      ostringstream ss;
5799 	      ss << "crush map has legacy tunables (require " << min
5800 		 << ", min is " << cct->_conf->mon_crush_min_required_version << ")";
5801 	      auto& d = checks->add("OLD_CRUSH_TUNABLES", HEALTH_WARN, ss.str(), 0);
5802 	      d.detail.push_back("see http://docs.ceph.com/docs/master/rados/operations/crush-map/#tunables");
5803 	    }
5804 	  }
5805 	
5806 	  // OLD_CRUSH_STRAW_CALC_VERSION
5807 	  if (cct->_conf->mon_warn_on_crush_straw_calc_version_zero) {
5808 	    if (crush->get_straw_calc_version() == 0) {
5809 	      ostringstream ss;
5810 	      ss << "crush map has straw_calc_version=0";
5811 	      auto& d = checks->add("OLD_CRUSH_STRAW_CALC_VERSION", HEALTH_WARN, ss.str(), 0);
5812 	      d.detail.push_back(
5813 		"see http://docs.ceph.com/docs/master/rados/operations/crush-map/#tunables");
5814 	    }
5815 	  }
5816 	
5817 	  // CACHE_POOL_NO_HIT_SET
5818 	  if (cct->_conf->mon_warn_on_cache_pools_without_hit_sets) {
5819 	    list<string> detail;
5820 	    for (auto p = pools.cbegin(); p != pools.cend(); ++p) {
5821 	      const pg_pool_t& info = p->second;
5822 	      if (info.cache_mode_requires_hit_set() &&
5823 		  info.hit_set_params.get_type() == HitSet::TYPE_NONE) {
5824 		ostringstream ss;
5825 		ss << "pool '" << get_pool_name(p->first)
5826 		   << "' with cache_mode " << info.get_cache_mode_name()
5827 		   << " needs hit_set_type to be set but it is not";
5828 		detail.push_back(ss.str());
5829 	      }
5830 	    }
5831 	    if (!detail.empty()) {
5832 	      ostringstream ss;
5833 	      ss << detail.size() << " cache pools are missing hit_sets";
5834 	      auto& d = checks->add("CACHE_POOL_NO_HIT_SET", HEALTH_WARN, ss.str(),
5835 				    detail.size());
5836 	      d.detail.swap(detail);
5837 	    }
5838 	  }
5839 	
5840 	  // OSD_NO_SORTBITWISE
5841 	  if (!test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5842 	    ostringstream ss;
5843 	    ss << "'sortbitwise' flag is not set";
5844 	    checks->add("OSD_NO_SORTBITWISE", HEALTH_WARN, ss.str(), 0);
5845 	  }
5846 	
5847 	  // OSD_UPGRADE_FINISHED
5848 	  // none of these (yet) since we don't run until luminous upgrade is done.
5849 	
5850 	  // POOL_NEARFULL/BACKFILLFULL/FULL
5851 	  {
5852 	    list<string> full_detail, backfillfull_detail, nearfull_detail;
5853 	    for (auto it : get_pools()) {
5854 	      const pg_pool_t &pool = it.second;
5855 	      const string& pool_name = get_pool_name(it.first);
5856 	      if (pool.has_flag(pg_pool_t::FLAG_FULL)) {
5857 		stringstream ss;
5858 	        if (pool.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) {
5859 	          // may run out of space too,
5860 	          // but we want EQUOTA taking precedence
5861 	          ss << "pool '" << pool_name << "' is full (running out of quota)";
5862 	        } else {
5863 	          ss << "pool '" << pool_name << "' is full (no space)";
5864 	        }
5865 		full_detail.push_back(ss.str());
5866 	      } else if (pool.has_flag(pg_pool_t::FLAG_BACKFILLFULL)) {
5867 	        stringstream ss;
5868 	        ss << "pool '" << pool_name << "' is backfillfull";
5869 	        backfillfull_detail.push_back(ss.str());
5870 	      } else if (pool.has_flag(pg_pool_t::FLAG_NEARFULL)) {
5871 	        stringstream ss;
5872 	        ss << "pool '" << pool_name << "' is nearfull";
5873 	        nearfull_detail.push_back(ss.str());
5874 	      }
5875 	    }
5876 	    if (!full_detail.empty()) {
5877 	      ostringstream ss;
5878 	      ss << full_detail.size() << " pool(s) full";
5879 	      auto& d = checks->add("POOL_FULL", HEALTH_WARN, ss.str(), full_detail.size());
5880 	      d.detail.swap(full_detail);
5881 	    }
5882 	    if (!backfillfull_detail.empty()) {
5883 	      ostringstream ss;
5884 	      ss << backfillfull_detail.size() << " pool(s) backfillfull";
5885 	      auto& d = checks->add("POOL_BACKFILLFULL", HEALTH_WARN, ss.str(),
5886 				    backfillfull_detail.size());
5887 	      d.detail.swap(backfillfull_detail);
5888 	    }
5889 	    if (!nearfull_detail.empty()) {
5890 	      ostringstream ss;
5891 	      ss << nearfull_detail.size() << " pool(s) nearfull";
5892 	      auto& d = checks->add("POOL_NEARFULL", HEALTH_WARN, ss.str(),
5893 				    nearfull_detail.size());
5894 	      d.detail.swap(nearfull_detail);
5895 	    }
5896 	  }
5897 	
5898 	  // POOL_PG_NUM_NOT_POWER_OF_TWO
5899 	  if (cct->_conf.get_val<bool>("mon_warn_on_pool_pg_num_not_power_of_two")) {
5900 	    list<string> detail;
5901 	    for (auto it : get_pools()) {
5902 	      if (!isp2(it.second.get_pg_num_target())) {
5903 		ostringstream ss;
5904 		ss << "pool '" << get_pool_name(it.first)
5905 		   << "' pg_num " << it.second.get_pg_num_target()
5906 		   << " is not a power of two";
5907 		detail.push_back(ss.str());
5908 	      }
5909 	    }
5910 	    if (!detail.empty()) {
5911 	      ostringstream ss;
5912 	      ss << detail.size() << " pool(s) have non-power-of-two pg_num";
5913 	      auto& d = checks->add("POOL_PG_NUM_NOT_POWER_OF_TWO", HEALTH_WARN,
5914 				    ss.str(), detail.size());
5915 	      d.detail.swap(detail);
5916 	    }
5917 	  }
5918 	}
5919 	
5920 	int OSDMap::parse_osd_id_list(const vector<string>& ls, set<int> *out,
5921 				      ostream *ss) const
5922 	{
5923 	  out->clear();
5924 	  for (auto i = ls.begin(); i != ls.end(); ++i) {
5925 	    if (i == ls.begin() &&
5926 		(*i == "any" || *i == "all" || *i == "*")) {
5927 	      get_all_osds(*out);
5928 	      break;
5929 	    }
5930 	    long osd = parse_osd_id(i->c_str(), ss);
5931 	    if (osd < 0) {
5932 	      *ss << "invalid osd id '" << *i << "'";
5933 	      return -EINVAL;
5934 	    }
5935 	    out->insert(osd);
5936 	  }
5937 	  return 0;
5938 	}
5939 	
5940 	void OSDMap::get_random_up_osds_by_subtree(int n,     // whoami
5941 	                                           string &subtree,
5942 	                                           int limit, // how many
5943 	                                           set<int> skip,
5944 	                                           set<int> *want) const {
5945 	  if (limit <= 0)
5946 	    return;
5947 	  int subtree_type = crush->get_type_id(subtree);
5948 	  if (subtree_type < 1)
5949 	    return;
5950 	  vector<int> subtrees;
5951 	  crush->get_subtree_of_type(subtree_type, &subtrees);
5952 	  std::random_device rd;
5953 	  std::default_random_engine rng{rd()};
5954 	  std::shuffle(subtrees.begin(), subtrees.end(), rng);
5955 	  for (auto s : subtrees) {
5956 	    if (limit <= 0)
5957 	      break;
5958 	    if (crush->subtree_contains(s, n))
5959 	      continue;
5960 	    vector<int> osds;
5961 	    crush->get_children_of_type(s, 0, &osds);
5962 	    if (osds.empty())
5963 	      continue;
5964 	    vector<int> up_osds;
5965 	    for (auto o : osds) {
5966 	      if (is_up(o) && !skip.count(o))
5967 	        up_osds.push_back(o);
5968 	    }
5969 	    if (up_osds.empty())
5970 	      continue;
5971 	    auto it = up_osds.begin();
5972 	    std::advance(it, (n % up_osds.size()));
5973 	    want->insert(*it);
5974 	    --limit;
5975 	  }
5976 	}
5977 	
5978 	float OSDMap::pool_raw_used_rate(int64_t poolid) const
5979 	{
5980 	  const pg_pool_t *pool = get_pg_pool(poolid);
5981 	  assert(pool != nullptr);
5982 	
5983 	  switch (pool->get_type()) {
5984 	  case pg_pool_t::TYPE_REPLICATED:
5985 	    return pool->get_size();
5986 	    break;
5987 	  case pg_pool_t::TYPE_ERASURE:
5988 	  {
5989 	    auto& ecp =
5990 	      get_erasure_code_profile(pool->erasure_code_profile);
5991 	    auto pm = ecp.find("m");
5992 	    auto pk = ecp.find("k");
5993 	    if (pm != ecp.end() && pk != ecp.end()) {
5994 	      int k = atoi(pk->second.c_str());
5995 	      int m = atoi(pm->second.c_str());
5996 	      int mk = m + k;
5997 	      ceph_assert(mk != 0);
5998 	      ceph_assert(k != 0);
5999 	      return (float)mk / k;
6000 	    } else {
6001 	      return 0.0;
6002 	    }
6003 	  }
6004 	  break;
6005 	  default:
6006 	    ceph_abort_msg("unrecognized pool type");
6007 	  }
6008 	}
6009 	
6010 	unsigned OSDMap::get_osd_crush_node_flags(int osd) const
6011 	{
6012 	  unsigned flags = 0;
6013 	  if (!crush_node_flags.empty()) {
6014 	    // the map will contain type -> name
6015 	    std::map<std::string,std::string> ploc = crush->get_full_location(osd);
6016 	    for (auto& i : ploc) {
6017 	      int id = crush->get_item_id(i.second);
6018 	      auto p = crush_node_flags.find(id);
6019 	      if (p != crush_node_flags.end()) {
6020 		flags |= p->second;
6021 	      }
6022 	    }
6023 	  }
6024 	  return flags;
6025 	}
6026 	
6027 	unsigned OSDMap::get_crush_node_flags(int id) const
6028 	{
6029 	  unsigned flags = 0;
6030 	  auto it = crush_node_flags.find(id);
6031 	  if (it != crush_node_flags.end())
6032 	    flags = it->second;
6033 	  return flags;
6034 	}
6035 	
6036 	unsigned OSDMap::get_device_class_flags(int id) const
6037 	{
6038 	  unsigned flags = 0;
6039 	  auto it = device_class_flags.find(id);
6040 	  if (it != device_class_flags.end())
6041 	    flags = it->second;
6042 	  return flags;
6043 	}
6044