1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "osd/osd_types.h"
5    	#include "common/debug.h"
6    	#include "common/Formatter.h"
7    	#include "common/errno.h"
8    	#include "common/TextTable.h"
9    	#include "include/stringify.h"
10   	
11   	#include "CrushWrapper.h"
12   	#include "CrushTreeDumper.h"
13   	
14   	#define dout_subsys ceph_subsys_crush
15   	
16   	using std::cout;
17   	using std::list;
18   	using std::map;
19   	using std::make_pair;
20   	using std::ostream;
21   	using std::ostringstream;
22   	using std::pair;
23   	using std::set;
24   	using std::string;
25   	using std::vector;
26   	
27   	using ceph::bufferlist;
28   	using ceph::decode;
29   	using ceph::decode_nohead;
30   	using ceph::encode;
31   	using ceph::Formatter;
32   	
33   	bool CrushWrapper::has_legacy_rule_ids() const
34   	{
35   	  for (unsigned i=0; i<crush->max_rules; i++) {
36   	    crush_rule *r = crush->rules[i];
37   	    if (r &&
38   		r->mask.ruleset != i) {
39   	      return true;
40   	    }
41   	  }
42   	  return false;
43   	}
44   	
45   	std::map<int, int> CrushWrapper::renumber_rules()
46   	{
47   	  std::map<int, int> result;
48   	  for (unsigned i=0; i<crush->max_rules; i++) {
49   	    crush_rule *r = crush->rules[i];
50   	    if (r && r->mask.ruleset != i) {
51   	      result[r->mask.ruleset] = i;
52   	      r->mask.ruleset = i;
53   	    }
54   	  }
55   	  return result;
56   	}
57   	
58   	bool CrushWrapper::has_non_straw2_buckets() const
59   	{
60   	  for (int i=0; i<crush->max_buckets; ++i) {
61   	    crush_bucket *b = crush->buckets[i];
62   	    if (!b)
63   	      continue;
64   	    if (b->alg != CRUSH_BUCKET_STRAW2)
65   	      return true;
66   	  }
67   	  return false;
68   	}
69   	
70   	bool CrushWrapper::has_v2_rules() const
71   	{
72   	  for (unsigned i=0; i<crush->max_rules; i++) {
73   	    if (is_v2_rule(i)) {
74   	      return true;
75   	    }
76   	  }
77   	  return false;
78   	}
79   	
80   	bool CrushWrapper::is_v2_rule(unsigned ruleid) const
81   	{
82   	  // check rule for use of indep or new SET_* rule steps
83   	  if (ruleid >= crush->max_rules)
84   	    return false;
85   	  crush_rule *r = crush->rules[ruleid];
86   	  if (!r)
87   	    return false;
88   	  for (unsigned j=0; j<r->len; j++) {
89   	    if (r->steps[j].op == CRUSH_RULE_CHOOSE_INDEP ||
90   		r->steps[j].op == CRUSH_RULE_CHOOSELEAF_INDEP ||
91   		r->steps[j].op == CRUSH_RULE_SET_CHOOSE_TRIES ||
92   		r->steps[j].op == CRUSH_RULE_SET_CHOOSELEAF_TRIES) {
93   	      return true;
94   	    }
95   	  }
96   	  return false;
97   	}
98   	
99   	bool CrushWrapper::has_v3_rules() const
100  	{
101  	  for (unsigned i=0; i<crush->max_rules; i++) {
102  	    if (is_v3_rule(i)) {
103  	      return true;
104  	    }
105  	  }
106  	  return false;
107  	}
108  	
109  	bool CrushWrapper::is_v3_rule(unsigned ruleid) const
110  	{
111  	  // check rule for use of SET_CHOOSELEAF_VARY_R step
112  	  if (ruleid >= crush->max_rules)
113  	    return false;
114  	  crush_rule *r = crush->rules[ruleid];
115  	  if (!r)
116  	    return false;
117  	  for (unsigned j=0; j<r->len; j++) {
118  	    if (r->steps[j].op == CRUSH_RULE_SET_CHOOSELEAF_VARY_R) {
119  	      return true;
120  	    }
121  	  }
122  	  return false;
123  	}
124  	
125  	bool CrushWrapper::has_v4_buckets() const
126  	{
127  	  for (int i=0; i<crush->max_buckets; ++i) {
128  	    crush_bucket *b = crush->buckets[i];
129  	    if (!b)
130  	      continue;
131  	    if (b->alg == CRUSH_BUCKET_STRAW2)
132  	      return true;
133  	  }
134  	  return false;
135  	}
136  	
137  	bool CrushWrapper::has_v5_rules() const
138  	{
139  	  for (unsigned i=0; i<crush->max_rules; i++) {
140  	    if (is_v5_rule(i)) {
141  	      return true;
142  	    }
143  	  }
144  	  return false;
145  	}
146  	
147  	bool CrushWrapper::is_v5_rule(unsigned ruleid) const
148  	{
149  	  // check rule for use of SET_CHOOSELEAF_STABLE step
150  	  if (ruleid >= crush->max_rules)
151  	    return false;
152  	  crush_rule *r = crush->rules[ruleid];
153  	  if (!r)
154  	    return false;
155  	  for (unsigned j=0; j<r->len; j++) {
156  	    if (r->steps[j].op == CRUSH_RULE_SET_CHOOSELEAF_STABLE) {
157  	      return true;
158  	    }
159  	  }
160  	  return false;
161  	}
162  	
163  	bool CrushWrapper::has_choose_args() const
164  	{
165  	  return !choose_args.empty();
166  	}
167  	
168  	bool CrushWrapper::has_incompat_choose_args() const
169  	{
170  	  if (choose_args.empty())
171  	    return false;
172  	  if (choose_args.size() > 1)
173  	    return true;
174  	  if (choose_args.begin()->first != DEFAULT_CHOOSE_ARGS)
175  	    return true;
176  	  crush_choose_arg_map arg_map = choose_args.begin()->second;
177  	  for (__u32 i = 0; i < arg_map.size; i++) {
178  	    crush_choose_arg *arg = &arg_map.args[i];
179  	    if (arg->weight_set_positions == 0 &&
180  		arg->ids_size == 0)
181  		continue;
182  	    if (arg->weight_set_positions != 1)
183  	      return true;
184  	    if (arg->ids_size != 0)
185  	      return true;
186  	  }
187  	  return false;
188  	}
189  	
190  	int CrushWrapper::split_id_class(int i, int *idout, int *classout) const
191  	{
192  	  if (!item_exists(i))
193  	    return -EINVAL;
194  	  string name = get_item_name(i);
195  	  size_t pos = name.find("~");
196  	  if (pos == string::npos) {
197  	    *idout = i;
198  	    *classout = -1;
199  	    return 0;
200  	  }
201  	  string name_no_class = name.substr(0, pos);
202  	  if (!name_exists(name_no_class))
203  	    return -ENOENT;
204  	  string class_name = name.substr(pos + 1);
205  	  if (!class_exists(class_name))
206  	    return -ENOENT;
207  	  *idout = get_item_id(name_no_class);
208  	  *classout = get_class_id(class_name);
209  	  return 0;
210  	}
211  	
212  	int CrushWrapper::can_rename_item(const string& srcname,
213  	                                  const string& dstname,
214  	                                  ostream *ss) const
215  	{
216  	  if (name_exists(srcname)) {
217  	    if (name_exists(dstname)) {
218  	      *ss << "dstname = '" << dstname << "' already exists";
219  	      return -EEXIST;
220  	    }
221  	    if (is_valid_crush_name(dstname)) {
222  	      return 0;
223  	    } else {
224  	      *ss << "dstname = '" << dstname << "' does not match [-_.0-9a-zA-Z]+";
225  	      return -EINVAL;
226  	    }
227  	  } else {
228  	    if (name_exists(dstname)) {
229  	      *ss << "srcname = '" << srcname << "' does not exist "
230  	          << "and dstname = '" << dstname << "' already exists";
231  	      return -EALREADY;
232  	    } else {
233  	      *ss << "srcname = '" << srcname << "' does not exist";
234  	      return -ENOENT;
235  	    }
236  	  }
237  	}
238  	
239  	int CrushWrapper::rename_item(const string& srcname,
240  	                              const string& dstname,
241  	                              ostream *ss)
242  	{
243  	  int ret = can_rename_item(srcname, dstname, ss);
244  	  if (ret < 0)
245  	    return ret;
246  	  int oldid = get_item_id(srcname);
247  	  return set_item_name(oldid, dstname);
248  	}
249  	
250  	int CrushWrapper::can_rename_bucket(const string& srcname,
251  	                                    const string& dstname,
252  	                                    ostream *ss) const
253  	{
254  	  int ret = can_rename_item(srcname, dstname, ss);
255  	  if (ret)
256  	    return ret;
257  	  int srcid = get_item_id(srcname);
258  	  if (srcid >= 0) {
259  	    *ss << "srcname = '" << srcname << "' is not a bucket "
260  	        << "because its id = " << srcid << " is >= 0";
261  	    return -ENOTDIR;
262  	  }
263  	  return 0;
264  	}
265  	
266  	int CrushWrapper::rename_bucket(const string& srcname,
267  	                                const string& dstname,
268  	                                ostream *ss)
269  	{
270  	  int ret = can_rename_bucket(srcname, dstname, ss);
271  	  if (ret < 0)
272  	    return ret;
273  	  int oldid = get_item_id(srcname);
274  	  return set_item_name(oldid, dstname);
275  	}
276  	
277  	int CrushWrapper::rename_rule(const string& srcname,
278  	                              const string& dstname,
279  	                              ostream *ss)
280  	{
281  	  if (!rule_exists(srcname)) {
282  	    if (ss) {
283  	      *ss << "source rule name '" << srcname << "' does not exist";
284  	    }
285  	    return -ENOENT;
286  	  }
287  	  if (rule_exists(dstname)) {
288  	    if (ss) {
289  	      *ss << "destination rule name '" << dstname << "' already exists";
290  	    }
291  	    return -EEXIST;
292  	  }
293  	  int rule_id = get_rule_id(srcname);
294  	  auto it = rule_name_map.find(rule_id);
295  	  ceph_assert(it != rule_name_map.end());
296  	  it->second = dstname;
297  	  if (have_rmaps) {
298  	    rule_name_rmap.erase(srcname);
299  	    rule_name_rmap[dstname] = rule_id;
300  	  }
301  	  return 0;
302  	}
303  	
304  	void CrushWrapper::find_takes(set<int> *roots) const
305  	{
306  	  for (unsigned i=0; i<crush->max_rules; i++) {
307  	    crush_rule *r = crush->rules[i];
308  	    if (!r)
309  	      continue;
310  	    for (unsigned j=0; j<r->len; j++) {
311  	      if (r->steps[j].op == CRUSH_RULE_TAKE)
312  		roots->insert(r->steps[j].arg1);
313  	    }
314  	  }
315  	}
316  	
317  	void CrushWrapper::find_takes_by_rule(int rule, set<int> *roots) const
318  	{
319  	  if (rule < 0 || rule >= (int)crush->max_rules)
320  	    return;
321  	  crush_rule *r = crush->rules[rule];
322  	  if (!r)
323  	    return;
324  	  for (unsigned i = 0; i < r->len; i++) {
325  	    if (r->steps[i].op == CRUSH_RULE_TAKE)
326  	      roots->insert(r->steps[i].arg1);
327  	  }
328  	}
329  	
330  	void CrushWrapper::find_roots(set<int> *roots) const
331  	{
332  	  for (int i = 0; i < crush->max_buckets; i++) {
333  	    if (!crush->buckets[i])
334  	      continue;
335  	    crush_bucket *b = crush->buckets[i];
336  	    if (!_search_item_exists(b->id))
337  	      roots->insert(b->id);
338  	  }
339  	}
340  	
341  	bool CrushWrapper::subtree_contains(int root, int item) const
342  	{
343  	  if (root == item)
344  	    return true;
345  	
346  	  if (root >= 0)
347  	    return false;  // root is a leaf
348  	
349  	  const crush_bucket *b = get_bucket(root);
350  	  if (IS_ERR(b))
351  	    return false;
352  	
353  	  for (unsigned j=0; j<b->size; j++) {
354  	    if (subtree_contains(b->items[j], item))
355  	      return true;
356  	  }
357  	  return false;
358  	}
359  	
360  	bool CrushWrapper::_maybe_remove_last_instance(CephContext *cct, int item, bool unlink_only)
361  	{
362  	  // last instance?
363  	  if (_search_item_exists(item)) {
364  	    return false;
365  	  }
366  	  if (item < 0 && _bucket_is_in_use(item)) {
367  	    return false;
368  	  }
369  	
370  	  if (item < 0 && !unlink_only) {
371  	    crush_bucket *t = get_bucket(item);
372  	    ldout(cct, 5) << "_maybe_remove_last_instance removing bucket " << item << dendl;
373  	    crush_remove_bucket(crush, t);
374  	    if (class_bucket.count(item) != 0)
375  	      class_bucket.erase(item);
376  	    class_remove_item(item);
377  	    update_choose_args(cct);
378  	  }
379  	  if ((item >= 0 || !unlink_only) && name_map.count(item)) {
380  	    ldout(cct, 5) << "_maybe_remove_last_instance removing name for item " << item << dendl;
381  	    name_map.erase(item);
382  	    have_rmaps = false;
383  	    if (item >= 0 && !unlink_only) {
384  	      class_remove_item(item);
385  	    }
386  	  }
387  	  rebuild_roots_with_classes(cct);
388  	  return true;
389  	}
390  	
391  	int CrushWrapper::remove_root(CephContext *cct, int item)
392  	{
393  	  crush_bucket *b = get_bucket(item);
394  	  if (IS_ERR(b)) {
395  	    // should be idempotent
396  	    // e.g.: we use 'crush link' to link same host into
397  	    // different roots, which as a result can cause different
398  	    // shadow trees reference same hosts too. This means
399  	    // we may need to destory the same buckets(hosts, racks, etc.)
400  	    // multiple times during rebuilding all shadow trees.
401  	    return 0;
402  	  }
403  	
404  	  for (unsigned n = 0; n < b->size; n++) {
405  	    if (b->items[n] >= 0)
406  	      continue;
407  	    int r = remove_root(cct, b->items[n]);
408  	    if (r < 0)
409  	      return r;
410  	  }
411  	
412  	  crush_remove_bucket(crush, b);
413  	  if (name_map.count(item) != 0) {
414  	    name_map.erase(item);
415  	    have_rmaps = false;
416  	  }
417  	  if (class_bucket.count(item) != 0)
418  	    class_bucket.erase(item);
419  	  class_remove_item(item);
420  	  update_choose_args(cct);
421  	  return 0;
422  	}
423  	
424  	void CrushWrapper::update_choose_args(CephContext *cct)
425  	{
426  	  for (auto& i : choose_args) {
427  	    crush_choose_arg_map &arg_map = i.second;
428  	    assert(arg_map.size == (unsigned)crush->max_buckets);
429  	    unsigned positions = get_choose_args_positions(arg_map);
430  	    for (int j = 0; j < crush->max_buckets; ++j) {
431  	      crush_bucket *b = crush->buckets[j];
432  	      assert(j < (int)arg_map.size);
433  	      auto& carg = arg_map.args[j];
434  	      // strip out choose_args for any buckets that no longer exist
435  	      if (!b || b->alg != CRUSH_BUCKET_STRAW2) {
436  		if (carg.ids) {
437  		  if (cct)
438  		    ldout(cct,10) << __func__ << " removing " << i.first << " bucket "
439  				  << (-1-j) << " ids" << dendl;
440  		  free(carg.ids);
441  		  carg.ids = 0;
442  		  carg.ids_size = 0;
443  		}
444  		if (carg.weight_set) {
445  		  if (cct)
446  		    ldout(cct,10) << __func__ << " removing " << i.first << " bucket "
447  				  << (-1-j) << " weight_sets" << dendl;
448  		  for (unsigned p = 0; p < carg.weight_set_positions; ++p) {
449  		    free(carg.weight_set[p].weights);
450  		  }
451  		  free(carg.weight_set);
452  		  carg.weight_set = 0;
453  		  carg.weight_set_positions = 0;
454  		}
455  		continue;
456  	      }
457  	      if (carg.weight_set_positions == 0) {
458  		continue;	// skip it
459  	      }
460  	      if (carg.weight_set_positions != positions) {
461  		if (cct)
462  		  lderr(cct) << __func__ << " " << i.first << " bucket "
463  			     << (-1-j) << " positions " << carg.weight_set_positions
464  			     << " -> " << positions << dendl;
465  		continue;	// wth... skip!
466  	      }
467  	      // mis-sized weight_sets?  this shouldn't ever happen.
468  	      for (unsigned p = 0; p < positions; ++p) {
469  		if (carg.weight_set[p].size != b->size) {
470  		  if (cct)
471  		    lderr(cct) << __func__ << " fixing " << i.first << " bucket "
472  			       << (-1-j) << " position " << p
473  			       << " size " << carg.weight_set[p].size << " -> "
474  			       << b->size << dendl;
475  		  auto old_ws = carg.weight_set[p];
476  		  carg.weight_set[p].size = b->size;
477  		  carg.weight_set[p].weights = (__u32*)calloc(b->size, sizeof(__u32));
478  		  auto max = std::min<unsigned>(old_ws.size, b->size);
479  		  for (unsigned k = 0; k < max; ++k) {
480  		    carg.weight_set[p].weights[k] = old_ws.weights[k];
481  		  }
482  		  free(old_ws.weights);
483  		}
484  	      }
485  	    }
486  	  }
487  	}
488  	
489  	int CrushWrapper::remove_item(CephContext *cct, int item, bool unlink_only)
490  	{
491  	  ldout(cct, 5) << "remove_item " << item
492  			<< (unlink_only ? " unlink_only":"") << dendl;
493  	
494  	  int ret = -ENOENT;
495  	
496  	  if (item < 0 && !unlink_only) {
497  	    crush_bucket *t = get_bucket(item);
498  	    if (IS_ERR(t)) {
499  	      ldout(cct, 1) << "remove_item bucket " << item << " does not exist"
500  			    << dendl;
501  	      return -ENOENT;
502  	    }
503  	
504  	    if (t->size) {
505  	      ldout(cct, 1) << "remove_item bucket " << item << " has " << t->size
506  			    << " items, not empty" << dendl;
507  	      return -ENOTEMPTY;
508  	    }
509  	    if (_bucket_is_in_use(item)) {
510  	      return -EBUSY;
511  	    }
512  	  }
513  	
514  	  for (int i = 0; i < crush->max_buckets; i++) {
515  	    if (!crush->buckets[i])
516  	      continue;
517  	    crush_bucket *b = crush->buckets[i];
518  	
519  	    for (unsigned i=0; i<b->size; ++i) {
520  	      int id = b->items[i];
521  	      if (id == item) {
522  		ldout(cct, 5) << "remove_item removing item " << item
523  			      << " from bucket " << b->id << dendl;
524  		adjust_item_weight_in_bucket(cct, item, 0, b->id, true);
525  		bucket_remove_item(b, item);
526  		ret = 0;
527  	      }
528  	    }
529  	  }
530  	
531  	  if (_maybe_remove_last_instance(cct, item, unlink_only))
532  	    ret = 0;
533  	  
534  	  return ret;
535  	}
536  	
537  	bool CrushWrapper::_search_item_exists(int item) const
538  	{
539  	  for (int i = 0; i < crush->max_buckets; i++) {
540  	    if (!crush->buckets[i])
541  	      continue;
542  	    crush_bucket *b = crush->buckets[i];
543  	    for (unsigned j=0; j<b->size; ++j) {
544  	      if (b->items[j] == item)
545  		return true;
546  	    }
547  	  }
548  	  return false;
549  	}
550  	
551  	bool CrushWrapper::_bucket_is_in_use(int item)
552  	{
553  	  for (auto &i : class_bucket)
554  	    for (auto &j : i.second)
555  	      if (j.second == item)
556  		return true;
557  	  for (unsigned i = 0; i < crush->max_rules; ++i) {
558  	    crush_rule *r = crush->rules[i];
559  	    if (!r)
560  	      continue;
561  	    for (unsigned j = 0; j < r->len; ++j) {
562  	      if (r->steps[j].op == CRUSH_RULE_TAKE) {
563  		int step_item = r->steps[j].arg1;
564  		int original_item;
565  		int c;
566  		int res = split_id_class(step_item, &original_item, &c);
567  		if (res < 0)
568  		  return false;
569  		if (step_item == item || original_item == item)
570  		  return true;
571  	      }
572  	    }
573  	  }
574  	  return false;
575  	}
576  	
577  	int CrushWrapper::_remove_item_under(
578  	  CephContext *cct, int item, int ancestor, bool unlink_only)
579  	{
580  	  ldout(cct, 5) << "_remove_item_under " << item << " under " << ancestor
581  			<< (unlink_only ? " unlink_only":"") << dendl;
582  	
583  	  if (ancestor >= 0) {
584  	    return -EINVAL;
585  	  }
586  	
587  	  if (!bucket_exists(ancestor))
588  	    return -EINVAL;
589  	
590  	  int ret = -ENOENT;
591  	
592  	  crush_bucket *b = get_bucket(ancestor);
593  	  for (unsigned i=0; i<b->size; ++i) {
594  	    int id = b->items[i];
595  	    if (id == item) {
596  	      ldout(cct, 5) << "_remove_item_under removing item " << item
597  			    << " from bucket " << b->id << dendl;
598  	      adjust_item_weight_in_bucket(cct, item, 0, b->id, true);
599  	      bucket_remove_item(b, item);
600  	      ret = 0;
601  	    } else if (id < 0) {
602  	      int r = remove_item_under(cct, item, id, unlink_only);
603  	      if (r == 0)
604  		ret = 0;
605  	    }
606  	  }
607  	  return ret;
608  	}
609  	
610  	int CrushWrapper::remove_item_under(
611  	  CephContext *cct, int item, int ancestor, bool unlink_only)
612  	{
613  	  ldout(cct, 5) << "remove_item_under " << item << " under " << ancestor
614  			<< (unlink_only ? " unlink_only":"") << dendl;
615  	
616  	  if (!unlink_only && _bucket_is_in_use(item)) {
617  	    return -EBUSY;
618  	  }
619  	
620  	  int ret = _remove_item_under(cct, item, ancestor, unlink_only);
621  	  if (ret < 0)
622  	    return ret;
623  	
624  	  if (item < 0 && !unlink_only) {
625  	    crush_bucket *t = get_bucket(item);
626  	    if (IS_ERR(t)) {
627  	      ldout(cct, 1) << "remove_item_under bucket " << item
628  	                    << " does not exist" << dendl;
629  	      return -ENOENT;
630  	    }
631  	
632  	    if (t->size) {
633  	      ldout(cct, 1) << "remove_item_under bucket " << item << " has " << t->size
634  			    << " items, not empty" << dendl;
635  	      return -ENOTEMPTY;
636  	    }
637  	  }
638  	
639  	  if (_maybe_remove_last_instance(cct, item, unlink_only))
640  	    ret = 0;
641  	
642  	  return ret;
643  	}
644  	
645  	int CrushWrapper::get_common_ancestor_distance(CephContext *cct, int id,
646  				       const std::multimap<string,string>& loc) const
647  	{
648  	  ldout(cct, 5) << __func__ << " " << id << " " << loc << dendl;
649  	  if (!item_exists(id))
650  	    return -ENOENT;
651  	  map<string,string> id_loc = get_full_location(id);
652  	  ldout(cct, 20) << " id is at " << id_loc << dendl;
653  	
654  	  for (map<int,string>::const_iterator p = type_map.begin();
655  	       p != type_map.end();
656  	       ++p) {
657  	    map<string,string>::iterator ip = id_loc.find(p->second);
658  	    if (ip == id_loc.end())
659  	      continue;
660  	    for (std::multimap<string,string>::const_iterator q = loc.find(p->second);
661  		 q != loc.end();
662  		 ++q) {
663  	      if (q->first != p->second)
664  		break;
665  	      if (q->second == ip->second)
666  		return p->first;
667  	    }
668  	  }
669  	  return -ERANGE;
670  	}
671  	
672  	int CrushWrapper::parse_loc_map(const std::vector<string>& args,
673  					std::map<string,string> *ploc)
674  	{
675  	  ploc->clear();
676  	  for (unsigned i = 0; i < args.size(); ++i) {
677  	    const char *s = args[i].c_str();
678  	    const char *pos = strchr(s, '=');
679  	    if (!pos)
680  	      return -EINVAL;
681  	    string key(s, 0, pos-s);
682  	    string value(pos+1);
683  	    if (value.length())
684  	      (*ploc)[key] = value;
685  	    else
686  	      return -EINVAL;
687  	  }
688  	  return 0;
689  	}
690  	
691  	int CrushWrapper::parse_loc_multimap(const std::vector<string>& args,
692  						    std::multimap<string,string> *ploc)
693  	{
694  	  ploc->clear();
695  	  for (unsigned i = 0; i < args.size(); ++i) {
696  	    const char *s = args[i].c_str();
697  	    const char *pos = strchr(s, '=');
698  	    if (!pos)
699  	      return -EINVAL;
700  	    string key(s, 0, pos-s);
701  	    string value(pos+1);
702  	    if (value.length())
703  	      ploc->insert(make_pair(key, value));
704  	    else
705  	      return -EINVAL;
706  	  }
707  	  return 0;
708  	}
709  	
710  	bool CrushWrapper::check_item_loc(CephContext *cct, int item, const map<string,string>& loc,
711  					  int *weight)
712  	{
713  	  ldout(cct, 5) << "check_item_loc item " << item << " loc " << loc << dendl;
714  	
715  	  for (map<int,string>::const_iterator p = type_map.begin(); p != type_map.end(); ++p) {
716  	    // ignore device
717  	    if (p->first == 0)
718  	      continue;
719  	
720  	    // ignore types that aren't specified in loc
721  	    map<string,string>::const_iterator q = loc.find(p->second);
722  	    if (q == loc.end()) {
723  	      ldout(cct, 2) << "warning: did not specify location for '" << p->second << "' level (levels are "
724  			    << type_map << ")" << dendl;
725  	      continue;
726  	    }
727  	
728  	    if (!name_exists(q->second)) {
729  	      ldout(cct, 5) << "check_item_loc bucket " << q->second << " dne" << dendl;
730  	      return false;
731  	    }
732  	
733  	    int id = get_item_id(q->second);
734  	    if (id >= 0) {
735  	      ldout(cct, 5) << "check_item_loc requested " << q->second << " for type " << p->second
736  			    << " is a device, not bucket" << dendl;
737  	      return false;
738  	    }
739  	
740  	    ceph_assert(bucket_exists(id));
741  	    crush_bucket *b = get_bucket(id);
742  	
743  	    // see if item exists in this bucket
744  	    for (unsigned j=0; j<b->size; j++) {
745  	      if (b->items[j] == item) {
746  		ldout(cct, 2) << "check_item_loc " << item << " exists in bucket " << b->id << dendl;
747  		if (weight)
748  		  *weight = crush_get_bucket_item_weight(b, j);
749  		return true;
750  	      }
751  	    }
752  	    return false;
753  	  }
754  	  
755  	  ldout(cct, 2) << __func__ << " item " << item << " loc " << loc << dendl;
756  	  return false;
757  	}
758  	
759  	map<string, string> CrushWrapper::get_full_location(int id) const
760  	{
761  	  vector<pair<string, string> > full_location_ordered;
762  	  map<string,string> full_location;
763  	
764  	  get_full_location_ordered(id, full_location_ordered);
765  	
766  	  std::copy(full_location_ordered.begin(),
767  	      full_location_ordered.end(),
768  	      std::inserter(full_location, full_location.begin()));
769  	
770  	  return full_location;
771  	}
772  	
773  	int CrushWrapper::get_full_location(const string& name,
774  					    map<string,string> *ploc)
775  	{
776  	  build_rmaps();
777  	  auto p = name_rmap.find(name);
778  	  if (p == name_rmap.end()) {
779  	    return -ENOENT;
780  	  }
781  	  *ploc = get_full_location(p->second);
782  	  return 0;
783  	}
784  	
785  	int CrushWrapper::get_full_location_ordered(int id, vector<pair<string, string> >& path) const
786  	{
787  	  if (!item_exists(id))
788  	    return -ENOENT;
789  	  int cur = id;
790  	  int ret;
791  	  while (true) {
792  	    pair<string, string> parent_coord = get_immediate_parent(cur, &ret);
793  	    if (ret != 0)
794  	      break;
795  	    path.push_back(parent_coord);
796  	    cur = get_item_id(parent_coord.second);
797  	  }
798  	  return 0;
799  	}
800  	
801  	string CrushWrapper::get_full_location_ordered_string(int id) const
802  	{
803  	  vector<pair<string, string> > full_location_ordered;
804  	  string full_location;
805  	  get_full_location_ordered(id, full_location_ordered);
806  	  reverse(begin(full_location_ordered), end(full_location_ordered));
807  	  for(auto i = full_location_ordered.begin(); i != full_location_ordered.end(); i++) {
808  	    full_location = full_location + i->first + "=" + i->second;
809  	    if (i != full_location_ordered.end() - 1) {
810  	      full_location = full_location + ",";
811  	    }
812  	  }
813  	  return full_location;
814  	}
815  	
816  	map<int, string> CrushWrapper::get_parent_hierarchy(int id) const
817  	{
818  	  map<int,string> parent_hierarchy;
819  	  pair<string, string> parent_coord = get_immediate_parent(id);
820  	  int parent_id;
821  	
822  	  // get the integer type for id and create a counter from there
823  	  int type_counter = get_bucket_type(id);
824  	
825  	  // if we get a negative type then we can assume that we have an OSD
826  	  // change behavior in get_item_type FIXME
827  	  if (type_counter < 0)
828  	    type_counter = 0;
829  	
830  	  // read the type map and get the name of the type with the largest ID
831  	  int high_type = 0;
832  	  if (!type_map.empty())
833  	    high_type = type_map.rbegin()->first;
834  	
835  	  parent_id = get_item_id(parent_coord.second);
836  	
837  	  while (type_counter < high_type) {
838  	    type_counter++;
839  	    parent_hierarchy[ type_counter ] = parent_coord.first;
840  	
841  	    if (type_counter < high_type){
842  	      // get the coordinate information for the next parent
843  	      parent_coord = get_immediate_parent(parent_id);
844  	      parent_id = get_item_id(parent_coord.second);
845  	    }
846  	  }
847  	
848  	  return parent_hierarchy;
849  	}
850  	
851  	int CrushWrapper::get_children(int id, list<int> *children) const
852  	{
853  	  // leaf?
854  	  if (id >= 0) {
855  	    return 0;
856  	  }
857  	
858  	  auto *b = get_bucket(id);
859  	  if (IS_ERR(b)) {
860  	    return -ENOENT;
861  	  }
862  	
863  	  for (unsigned n=0; n<b->size; n++) {
864  	    children->push_back(b->items[n]);
865  	  }
866  	  return b->size;
867  	}
868  	
869  	int CrushWrapper::get_all_children(int id, set<int> *children) const
870  	{
871  	  // leaf?
872  	  if (id >= 0) {
873  	    return 0;
874  	  }
875  	
876  	  auto *b = get_bucket(id);
877  	  if (IS_ERR(b)) {
878  	    return -ENOENT;
879  	  }
880  	
881  	  int c = 0;
882  	  for (unsigned n = 0; n < b->size; n++) {
883  	    children->insert(b->items[n]);
884  	    c++;
885  	    auto r = get_all_children(b->items[n], children);
886  	    if (r < 0)
887  	      return r;
888  	    c += r;
889  	  }
890  	  return c;
891  	}
892  	
893  	void CrushWrapper::get_children_of_type(int id,
894  	                                        int type,
895  						vector<int> *children,
896  						bool exclude_shadow) const
897  	{
898  	  if (id >= 0) {
899  	    if (type == 0) {
900  	      // want leaf?
901  	      children->push_back(id);
902  	    }
903  	    return;
904  	  }
905  	  auto b = get_bucket(id);
906  	  if (IS_ERR(b)) {
907  	    return;
908  	  }
909  	  if (b->type < type) {
910  	    // give up
911  	    return;
912  	  } else if (b->type == type) {
913  	    if (!is_shadow_item(b->id) || !exclude_shadow) {
914  	      children->push_back(b->id);
915  	    }
916  	    return;
917  	  }
918  	  for (unsigned n = 0; n < b->size; n++) {
919  	    get_children_of_type(b->items[n], type, children, exclude_shadow);
920  	  }
921  	}
922  	
923  	int CrushWrapper::verify_upmap(CephContext *cct,
924  	                               int rule_id,
925  	                               int pool_size,
926  	                               const vector<int>& up)
927  	{
928  	  auto rule = get_rule(rule_id);
929  	  if (IS_ERR(rule) || !rule) {
930  	    lderr(cct) << __func__ << " rule " << rule_id << " does not exist"
931  	               << dendl;
932  	    return -ENOENT;
933  	  }
934  	  for (unsigned step = 0; step < rule->len; ++step) {
935  	    auto curstep = &rule->steps[step];
936  	    ldout(cct, 10) << __func__ << " step " << step << dendl;
937  	    switch (curstep->op) {
938  	    case CRUSH_RULE_CHOOSELEAF_FIRSTN:
939  	    case CRUSH_RULE_CHOOSELEAF_INDEP:
940  	      {
941  	        int type = curstep->arg2;
942  	        if (type == 0) // osd
943  	          break;
944  	        map<int, set<int>> osds_by_parent; // parent_of_desired_type -> osds
945  	        for (auto osd : up) {
946  	          auto parent = get_parent_of_type(osd, type, rule_id);
947  	          if (parent < 0) {
948  	            osds_by_parent[parent].insert(osd);
949  	          } else {
950  	            ldout(cct, 1) << __func__ << " unable to get parent of osd." << osd
951  	                          << ", skipping for now"
952  	                          << dendl;
953  	          }
954  	        }
955  	        for (auto i : osds_by_parent) {
956  	          if (i.second.size() > 1) {
957  	            lderr(cct) << __func__ << " multiple osds " << i.second
958  	                       << " come from same failure domain " << i.first
959  	                       << dendl;
960  	            return -EINVAL;
961  	          }
962  	        }
963  	      }
964  	      break;
965  	
966  	    case CRUSH_RULE_CHOOSE_FIRSTN:
967  	    case CRUSH_RULE_CHOOSE_INDEP:
968  	      {
969  	        int numrep = curstep->arg1;
970  	        int type = curstep->arg2;
971  	        if (type == 0) // osd
972  	          break;
973  	        if (numrep <= 0)
974  	          numrep += pool_size;
975  	        set<int> parents_of_type;
976  	        for (auto osd : up) {
977  	          auto parent = get_parent_of_type(osd, type, rule_id);
978  	          if (parent < 0) {
979  	            parents_of_type.insert(parent);
980  	          } else {
981  	            ldout(cct, 1) << __func__ << " unable to get parent of osd." << osd
982  	                          << ", skipping for now"
983  	                          << dendl;
984  	          }
985  	        }
986  	        if ((int)parents_of_type.size() > numrep) {
987  	          lderr(cct) << __func__ << " number of buckets "
988  	                     << parents_of_type.size() << " exceeds desired " << numrep
989  	                     << dendl;
990  	          return -EINVAL;
991  	        }
992  	      }
993  	      break;
994  	
995  	    default:
996  	      // ignore
997  	      break;
998  	    }
999  	  }
1000 	  return 0;
1001 	}
1002 	
1003 	int CrushWrapper::_get_leaves(int id, list<int> *leaves) const
1004 	{
1005 	  ceph_assert(leaves);
1006 	
1007 	  // Already leaf?
1008 	  if (id >= 0) {
1009 	    leaves->push_back(id);
1010 	    return 0;
1011 	  }
1012 	
1013 	  auto b = get_bucket(id);
1014 	  if (IS_ERR(b)) {
1015 	    return -ENOENT;
1016 	  }
1017 	
1018 	  for (unsigned n = 0; n < b->size; n++) {
1019 	    if (b->items[n] >= 0) {
1020 	      leaves->push_back(b->items[n]);
1021 	    } else {
1022 	      // is a bucket, do recursive call
1023 	      int r = _get_leaves(b->items[n], leaves);
1024 	      if (r < 0) {
1025 	        return r;
1026 	      }
1027 	    }
1028 	  }
1029 	
1030 	  return 0; // all is well
1031 	}
1032 	
1033 	int CrushWrapper::get_leaves(const string &name, set<int> *leaves) const
1034 	{
1035 	  ceph_assert(leaves);
1036 	  leaves->clear();
1037 	
1038 	  if (!name_exists(name)) {
1039 	    return -ENOENT;
1040 	  }
1041 	
1042 	  int id = get_item_id(name);
1043 	  if (id >= 0) {
1044 	    // already leaf
1045 	    leaves->insert(id);
1046 	    return 0;
1047 	  }
1048 	
1049 	  list<int> unordered;
1050 	  int r = _get_leaves(id, &unordered);
1051 	  if (r < 0) {
1052 	    return r;
1053 	  }
1054 	
1055 	  for (auto &p : unordered) {
1056 	    leaves->insert(p);
1057 	  }
1058 	
1059 	  return 0;
1060 	}
1061 	
1062 	int CrushWrapper::insert_item(
1063 	  CephContext *cct, int item, float weight, string name,
1064 	  const map<string,string>& loc,  // typename -> bucketname
1065 	  bool init_weight_sets)
1066 	{
1067 	  ldout(cct, 5) << "insert_item item " << item << " weight " << weight
1068 			<< " name " << name << " loc " << loc << dendl;
1069 	
1070 	  if (!is_valid_crush_name(name))
1071 	    return -EINVAL;
1072 	
1073 	  if (!is_valid_crush_loc(cct, loc))
1074 	    return -EINVAL;
1075 	
1076 	  int r = validate_weightf(weight);
1077 	  if (r < 0) {
1078 	    return r;
1079 	  }
1080 	
1081 	  if (name_exists(name)) {
1082 	    if (get_item_id(name) != item) {
1083 	      ldout(cct, 10) << "device name '" << name << "' already exists as id "
1084 			     << get_item_id(name) << dendl;
1085 	      return -EEXIST;
1086 	    }
1087 	  } else {
1088 	    set_item_name(item, name);
1089 	  }
1090 	
1091 	  int cur = item;
1092 	
1093 	  // create locations if locations don't exist and add child in
1094 	  // location with 0 weight the more detail in the insert_item method
1095 	  // declaration in CrushWrapper.h
1096 	  for (auto p = type_map.begin(); p != type_map.end(); ++p) {
1097 	    // ignore device type
1098 	    if (p->first == 0)
1099 	      continue;
1100 	
1101 	    // skip types that are unspecified
1102 	    map<string,string>::const_iterator q = loc.find(p->second);
1103 	    if (q == loc.end()) {
1104 	      ldout(cct, 2) << "warning: did not specify location for '"
1105 			    << p->second << "' level (levels are "
1106 			    << type_map << ")" << dendl;
1107 	      continue;
1108 	    }
1109 	
1110 	    if (!name_exists(q->second)) {
1111 	      ldout(cct, 5) << "insert_item creating bucket " << q->second << dendl;
1112 	      int empty = 0, newid;
1113 	      int r = add_bucket(0, 0,
1114 				 CRUSH_HASH_DEFAULT, p->first, 1, &cur, &empty, &newid);
1115 	      if (r < 0) {
1116 	        ldout(cct, 1) << "add_bucket failure error: " << cpp_strerror(r)
1117 			      << dendl;
1118 	        return r;
1119 	      }
1120 	      set_item_name(newid, q->second);
1121 	      
1122 	      cur = newid;
1123 	      continue;
1124 	    }
1125 	
1126 	    // add to an existing bucket
1127 	    int id = get_item_id(q->second);
1128 	    if (!bucket_exists(id)) {
1129 	      ldout(cct, 1) << "insert_item doesn't have bucket " << id << dendl;
1130 	      return -EINVAL;
1131 	    }
1132 	
1133 	    // check that we aren't creating a cycle.
1134 	    if (subtree_contains(id, cur)) {
1135 	      ldout(cct, 1) << "insert_item item " << cur << " already exists beneath "
1136 			    << id << dendl;
1137 	      return -EINVAL;
1138 	    }
1139 	
1140 	    // we have done sanity check above
1141 	    crush_bucket *b = get_bucket(id);
1142 	
1143 	    if (p->first != b->type) {
1144 	      ldout(cct, 1) << "insert_item existing bucket has type "
1145 		<< "'" << type_map[b->type] << "' != "
1146 		<< "'" << type_map[p->first] << "'" << dendl;
1147 	      return -EINVAL;
1148 	    }
1149 	
1150 	    // are we forming a loop?
1151 	    if (subtree_contains(cur, b->id)) {
1152 	      ldout(cct, 1) << "insert_item " << cur << " already contains " << b->id
1153 			    << "; cannot form loop" << dendl;
1154 	      return -ELOOP;
1155 	    }
1156 	
1157 	    ldout(cct, 5) << "insert_item adding " << cur << " weight " << weight
1158 			  << " to bucket " << id << dendl;
1159 	    [[maybe_unused]] int r = bucket_add_item(b, cur, 0);
1160 	    ceph_assert(!r);
1161 	    break;
1162 	  }
1163 	
1164 	  // adjust the item's weight in location
1165 	  if (adjust_item_weightf_in_loc(cct, item, weight, loc,
1166 					 item >= 0 && init_weight_sets) > 0) {
1167 	    if (item >= crush->max_devices) {
1168 	      crush->max_devices = item + 1;
1169 	      ldout(cct, 5) << "insert_item max_devices now " << crush->max_devices
1170 			    << dendl;
1171 	    }
1172 	    r = rebuild_roots_with_classes(cct);
1173 	    if (r < 0) {
1174 	      ldout(cct, 0) << __func__ << " unable to rebuild roots with classes: "
1175 	                    << cpp_strerror(r) << dendl;
1176 	      return r;
1177 	    }
1178 	    return 0;
1179 	  }
1180 	
1181 	  ldout(cct, 1) << "error: didn't find anywhere to add item " << item
1182 			<< " in " << loc << dendl;
1183 	  return -EINVAL;
1184 	}
1185 	
1186 	
1187 	int CrushWrapper::move_bucket(
1188 	  CephContext *cct, int id, const map<string,string>& loc)
1189 	{
1190 	  // sorry this only works for buckets
1191 	  if (id >= 0)
1192 	    return -EINVAL;
1193 	
1194 	  if (!item_exists(id))
1195 	    return -ENOENT;
1196 	
1197 	  // get the name of the bucket we are trying to move for later
1198 	  string id_name = get_item_name(id);
1199 	
1200 	  // detach the bucket
1201 	  int bucket_weight = detach_bucket(cct, id);
1202 	
1203 	  // insert the bucket back into the hierarchy
1204 	  return insert_item(cct, id, bucket_weight / (float)0x10000, id_name, loc,
1205 			     false);
1206 	}
1207 	
1208 	int CrushWrapper::detach_bucket(CephContext *cct, int item)
1209 	{
1210 	  if (!crush)
1211 	    return (-EINVAL);
1212 	
1213 	  if (item >= 0)
1214 	    return (-EINVAL);
1215 	
1216 	  // check that the bucket that we want to detach exists
1217 	  ceph_assert(bucket_exists(item));
1218 	
1219 	  // get the bucket's weight
1220 	  crush_bucket *b = get_bucket(item);
1221 	  unsigned bucket_weight = b->weight;
1222 	
1223 	  // get where the bucket is located
1224 	  pair<string, string> bucket_location = get_immediate_parent(item);
1225 	
1226 	  // get the id of the parent bucket
1227 	  int parent_id = get_item_id(bucket_location.second);
1228 	
1229 	  // get the parent bucket
1230 	  crush_bucket *parent_bucket = get_bucket(parent_id);
1231 	
1232 	  if (!IS_ERR(parent_bucket)) {
1233 	    // zero out the bucket weight
1234 	    adjust_item_weight_in_bucket(cct, item, 0, parent_bucket->id, true);
1235 	
1236 	    // remove the bucket from the parent
1237 	    bucket_remove_item(parent_bucket, item);
1238 	  } else if (PTR_ERR(parent_bucket) != -ENOENT) {
1239 	    return PTR_ERR(parent_bucket);
1240 	  }
1241 	
1242 	  // check that we're happy
1243 	  int test_weight = 0;
1244 	  map<string,string> test_location;
1245 	  test_location[ bucket_location.first ] = (bucket_location.second);
1246 	
1247 	  bool successful_detach = !(check_item_loc(cct, item, test_location,
1248 						    &test_weight));
1249 	  ceph_assert(successful_detach);
1250 	  ceph_assert(test_weight == 0);
1251 	
1252 	  return bucket_weight;
1253 	}
1254 	
1255 	bool CrushWrapper::is_parent_of(int child, int p) const
1256 	{
1257 	  int parent = 0;
1258 	  while (!get_immediate_parent_id(child, &parent)) {
1259 	    if (parent == p) {
1260 	      return true;
1261 	    }
1262 	    child = parent;
1263 	  }
1264 	  return false;
1265 	}
1266 	
1267 	int CrushWrapper::swap_bucket(CephContext *cct, int src, int dst)
1268 	{
1269 	  if (src >= 0 || dst >= 0)
1270 	    return -EINVAL;
1271 	  if (!item_exists(src) || !item_exists(dst))
1272 	    return -EINVAL;
1273 	  crush_bucket *a = get_bucket(src);
1274 	  crush_bucket *b = get_bucket(dst);
1275 	  if (is_parent_of(a->id, b->id) || is_parent_of(b->id, a->id)) {
1276 	    return -EINVAL;
1277 	  }
1278 	  unsigned aw = a->weight;
1279 	  unsigned bw = b->weight;
1280 	
1281 	  // swap weights
1282 	  adjust_item_weight(cct, a->id, bw);
1283 	  adjust_item_weight(cct, b->id, aw);
1284 	
1285 	  // swap items
1286 	  map<int,unsigned> tmp;
1287 	  unsigned as = a->size;
1288 	  unsigned bs = b->size;
1289 	  for (unsigned i = 0; i < as; ++i) {
1290 	    int item = a->items[0];
1291 	    int itemw = crush_get_bucket_item_weight(a, 0);
1292 	    tmp[item] = itemw;
1293 	    bucket_remove_item(a, item);
1294 	  }
1295 	  ceph_assert(a->size == 0);
1296 	  ceph_assert(b->size == bs);
1297 	  for (unsigned i = 0; i < bs; ++i) {
1298 	    int item = b->items[0];
1299 	    int itemw = crush_get_bucket_item_weight(b, 0);
1300 	    bucket_remove_item(b, item);
1301 	    bucket_add_item(a, item, itemw);
1302 	  }
1303 	  ceph_assert(a->size == bs);
1304 	  ceph_assert(b->size == 0);
1305 	  for (auto t : tmp) {
1306 	    bucket_add_item(b, t.first, t.second);
1307 	  }
1308 	  ceph_assert(a->size == bs);
1309 	  ceph_assert(b->size == as);
1310 	
1311 	  // swap names
1312 	  swap_names(src, dst);
1313 	  return rebuild_roots_with_classes(cct);
1314 	}
1315 	
1316 	int CrushWrapper::link_bucket(
1317 	  CephContext *cct, int id, const map<string,string>& loc)
1318 	{
1319 	  // sorry this only works for buckets
1320 	  if (id >= 0)
1321 	    return -EINVAL;
1322 	
1323 	  if (!item_exists(id))
1324 	    return -ENOENT;
1325 	
1326 	  // get the name of the bucket we are trying to move for later
1327 	  string id_name = get_item_name(id);
1328 	
1329 	  crush_bucket *b = get_bucket(id);
1330 	  unsigned bucket_weight = b->weight;
1331 	
1332 	  return insert_item(cct, id, bucket_weight / (float)0x10000, id_name, loc);
1333 	}
1334 	
1335 	int CrushWrapper::create_or_move_item(
1336 	  CephContext *cct, int item, float weight, string name,
1337 	  const map<string,string>& loc,  // typename -> bucketname
1338 	  bool init_weight_sets)
1339 	{
1340 	  int ret = 0;
1341 	  int old_iweight;
1342 	
1343 	  if (!is_valid_crush_name(name))
1344 	    return -EINVAL;
1345 	
1346 	  if (check_item_loc(cct, item, loc, &old_iweight)) {
1347 	    ldout(cct, 5) << "create_or_move_item " << item << " already at " << loc
1348 			  << dendl;
1349 	  } else {
1350 	    if (_search_item_exists(item)) {
1351 	      weight = get_item_weightf(item);
1352 	      ldout(cct, 10) << "create_or_move_item " << item
1353 			     << " exists with weight " << weight << dendl;
1354 	      remove_item(cct, item, true);
1355 	    }
1356 	    ldout(cct, 5) << "create_or_move_item adding " << item
1357 			  << " weight " << weight
1358 			  << " at " << loc << dendl;
1359 	    ret = insert_item(cct, item, weight, name, loc,
1360 			      item >= 0 && init_weight_sets);
1361 	    if (ret == 0)
1362 	      ret = 1;  // changed
1363 	  }
1364 	  return ret;
1365 	}
1366 	
1367 	int CrushWrapper::update_item(
1368 	  CephContext *cct, int item, float weight, string name,
1369 	  const map<string,string>& loc)  // typename -> bucketname
1370 	{
1371 	  ldout(cct, 5) << "update_item item " << item << " weight " << weight
1372 			<< " name " << name << " loc " << loc << dendl;
1373 	  int ret = 0;
1374 	
1375 	  if (!is_valid_crush_name(name))
1376 	    return -EINVAL;
1377 	
1378 	  if (!is_valid_crush_loc(cct, loc))
1379 	    return -EINVAL;
1380 	
1381 	  ret = validate_weightf(weight);
1382 	  if (ret < 0) {
1383 	    return ret;
1384 	  }
1385 	
1386 	  // compare quantized (fixed-point integer) weights!  
1387 	  int iweight = (int)(weight * (float)0x10000);
1388 	  int old_iweight;
1389 	  if (check_item_loc(cct, item, loc, &old_iweight)) {
1390 	    ldout(cct, 5) << "update_item " << item << " already at " << loc << dendl;
1391 	    if (old_iweight != iweight) {
1392 	      ldout(cct, 5) << "update_item " << item << " adjusting weight "
1393 			    << ((float)old_iweight/(float)0x10000) << " -> " << weight
1394 			    << dendl;
1395 	      adjust_item_weight_in_loc(cct, item, iweight, loc);
1396 	      ret = 1;
1397 	    }
1398 	    if (get_item_name(item) != name) {
1399 	      ldout(cct, 5) << "update_item setting " << item << " name to " << name
1400 			    << dendl;
1401 	      set_item_name(item, name);
1402 	      ret = 1;
1403 	    }
1404 	  } else {
1405 	    if (item_exists(item)) {
1406 	      remove_item(cct, item, true);
1407 	    }
1408 	    ldout(cct, 5) << "update_item adding " << item << " weight " << weight
1409 			  << " at " << loc << dendl;
1410 	    ret = insert_item(cct, item, weight, name, loc);
1411 	    if (ret == 0)
1412 	      ret = 1;  // changed
1413 	  }
1414 	  return ret;
1415 	}
1416 	
1417 	int CrushWrapper::get_item_weight(int id) const
1418 	{
1419 	  for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1420 	    crush_bucket *b = crush->buckets[bidx];
1421 	    if (b == NULL)
1422 	      continue;
1423 	    if (b->id == id)
1424 	      return b->weight;
1425 	    for (unsigned i = 0; i < b->size; i++)
1426 	      if (b->items[i] == id)
1427 		return crush_get_bucket_item_weight(b, i);
1428 	  }
1429 	  return -ENOENT;
1430 	}
1431 	
1432 	int CrushWrapper::get_item_weight_in_loc(int id, const map<string,string> &loc)
1433 	{
1434 	  for (map<string,string>::const_iterator l = loc.begin(); l != loc.end(); ++l) {
1435 	
1436 	    int bid = get_item_id(l->second);
1437 	    if (!bucket_exists(bid))
1438 	      continue;
1439 	    crush_bucket *b = get_bucket(bid);
1440 	    for (unsigned int i = 0; i < b->size; i++) {
1441 	      if (b->items[i] == id) {
1442 		return crush_get_bucket_item_weight(b, i);
1443 	      }
1444 	    }
1445 	  }
1446 	  return -ENOENT;
1447 	}
1448 	
1449 	int CrushWrapper::adjust_item_weight(CephContext *cct, int id, int weight,
1450 					     bool update_weight_sets)
1451 	{
1452 	  ldout(cct, 5) << __func__ << " " << id << " weight " << weight
1453 			<< " update_weight_sets=" << (int)update_weight_sets
1454 			<< dendl;
1455 	  int changed = 0;
1456 	  for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1457 	    if (!crush->buckets[bidx]) {
1458 	      continue;
1459 	    }
1460 	    int r = adjust_item_weight_in_bucket(cct, id, weight, -1-bidx,
1461 						 update_weight_sets);
1462 	    if (r > 0) {
1463 	      ++changed;
1464 	    }
1465 	  }
1466 	  if (!changed) {
1467 	    return -ENOENT;
1468 	  }
1469 	  return changed;
1470 	}
1471 	
1472 	int CrushWrapper::adjust_item_weight_in_bucket(
1473 	  CephContext *cct, int id, int weight,
1474 	  int bucket_id,
1475 	  bool update_weight_sets)
1476 	{
1477 	  ldout(cct, 5) << __func__ << " " << id << " weight " << weight
1478 			<< " in bucket " << bucket_id
1479 			<< " update_weight_sets=" << (int)update_weight_sets
1480 			<< dendl;
1481 	  int changed = 0;
1482 	  if (!bucket_exists(bucket_id)) {
1483 	    return -ENOENT;
1484 	  }
1485 	  crush_bucket *b = get_bucket(bucket_id);
1486 	  for (unsigned int i = 0; i < b->size; i++) {
1487 	    if (b->items[i] == id) {
1488 	      int diff = bucket_adjust_item_weight(cct, b, id, weight,
1489 						   update_weight_sets);
1490 	      ldout(cct, 5) << __func__ << " " << id << " diff " << diff
1491 			    << " in bucket " << bucket_id << dendl;
1492 	      adjust_item_weight(cct, bucket_id, b->weight, false);
1493 	      changed++;
1494 	    }
1495 	  }
1496 	  // update weight-sets so they continue to sum
1497 	  for (auto& p : choose_args) {
1498 	    auto &cmap = p.second;
1499 	    if (!cmap.args) {
1500 	      continue;
1501 	    }
1502 	    crush_choose_arg *arg = &cmap.args[-1 - bucket_id];
1503 	    if (!arg->weight_set) {
1504 	      continue;
1505 	    }
1506 	    ceph_assert(arg->weight_set_positions > 0);
1507 	    vector<int> w(arg->weight_set_positions);
1508 	    for (unsigned i = 0; i < b->size; ++i) {
1509 	      for (unsigned j = 0; j < arg->weight_set_positions; ++j) {
1510 		crush_weight_set *weight_set = &arg->weight_set[j];
1511 		w[j] += weight_set->weights[i];
1512 	      }
1513 	    }
1514 	    ldout(cct,5) << __func__ << "  adjusting bucket " << bucket_id
1515 			 << " cmap " << p.first << " weights to " << w << dendl;
1516 	    ostringstream ss;
1517 	    choose_args_adjust_item_weight(cct, cmap, bucket_id, w, &ss);
1518 	  }
1519 	  if (!changed) {
1520 	    return -ENOENT;
1521 	  }
1522 	  return changed;
1523 	}
1524 	
1525 	int CrushWrapper::adjust_item_weight_in_loc(
1526 	  CephContext *cct, int id, int weight,
1527 	  const map<string,string>& loc,
1528 	  bool update_weight_sets)
1529 	{
1530 	  ldout(cct, 5) << "adjust_item_weight_in_loc " << id << " weight " << weight
1531 			<< " in " << loc
1532 			<< " update_weight_sets=" << (int)update_weight_sets
1533 			<< dendl;
1534 	  int changed = 0;
1535 	  for (auto l = loc.begin(); l != loc.end(); ++l) {
1536 	    int bid = get_item_id(l->second);
1537 	    if (!bucket_exists(bid))
1538 	      continue;
1539 	    int r = adjust_item_weight_in_bucket(cct, id, weight, bid,
1540 						 update_weight_sets);
1541 	    if (r > 0) {
1542 	      ++changed;
1543 	    }
1544 	  }
1545 	  if (!changed) {
1546 	    return -ENOENT;
1547 	  }
1548 	  return changed;
1549 	}
1550 	
1551 	int CrushWrapper::adjust_subtree_weight(CephContext *cct, int id, int weight,
1552 						bool update_weight_sets)
1553 	{
1554 	  ldout(cct, 5) << __func__ << " " << id << " weight " << weight << dendl;
1555 	  crush_bucket *b = get_bucket(id);
1556 	  if (IS_ERR(b))
1557 	    return PTR_ERR(b);
1558 	  int changed = 0;
1559 	  list<crush_bucket*> q;
1560 	  q.push_back(b);
1561 	  while (!q.empty()) {
1562 	    b = q.front();
1563 	    q.pop_front();
1564 	    int local_changed = 0;
1565 	    for (unsigned i=0; i<b->size; ++i) {
1566 	      int n = b->items[i];
1567 	      if (n >= 0) {
1568 		adjust_item_weight_in_bucket(cct, n, weight, b->id, update_weight_sets);
1569 		++changed;
1570 		++local_changed;
1571 	      } else {
1572 		crush_bucket *sub = get_bucket(n);
1573 		if (IS_ERR(sub))
1574 		  continue;
1575 		q.push_back(sub);
1576 	      }
1577 	    }
1578 	  }
1579 	  return changed;
1580 	}
1581 	
1582 	bool CrushWrapper::check_item_present(int id) const
1583 	{
1584 	  bool found = false;
1585 	
1586 	  for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1587 	    crush_bucket *b = crush->buckets[bidx];
1588 	    if (b == 0)
1589 	      continue;
1590 	    for (unsigned i = 0; i < b->size; i++)
1591 	      if (b->items[i] == id)
1592 		found = true;
1593 	  }
1594 	  return found;
1595 	}
1596 	
1597 	
1598 	pair<string,string> CrushWrapper::get_immediate_parent(int id, int *_ret) const
1599 	{
1600 	
1601 	  for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1602 	    crush_bucket *b = crush->buckets[bidx];
1603 	    if (b == 0)
1604 	      continue;
1605 	   if (is_shadow_item(b->id))
1606 	      continue;
1607 	    for (unsigned i = 0; i < b->size; i++)
1608 	      if (b->items[i] == id) {
1609 	        string parent_id = name_map.at(b->id);
1610 	        string parent_bucket_type = type_map.at(b->type);
1611 	        if (_ret)
1612 	          *_ret = 0;
1613 	        return make_pair(parent_bucket_type, parent_id);
1614 	      }
1615 	  }
1616 	
1617 	  if (_ret)
1618 	    *_ret = -ENOENT;
1619 	
1620 	  return pair<string, string>();
1621 	}
1622 	
1623 	int CrushWrapper::get_immediate_parent_id(int id, int *parent) const
1624 	{
1625 	  for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1626 	    crush_bucket *b = crush->buckets[bidx];
1627 	    if (b == 0)
1628 	      continue;
1629 	    if (is_shadow_item(b->id))
1630 	      continue;
1631 	    for (unsigned i = 0; i < b->size; i++) {
1632 	      if (b->items[i] == id) {
1633 		*parent = b->id;
1634 		return 0;
1635 	      }
1636 	    }
1637 	  }
1638 	  return -ENOENT;
1639 	}
1640 	
1641 	int CrushWrapper::get_parent_of_type(int item, int type, int rule) const
1642 	{
1643 	  if (rule < 0) {
1644 	    // no rule specified
1645 	    do {
1646 	      int r = get_immediate_parent_id(item, &item);
1647 	      if (r < 0) {
1648 	        return 0;
1649 	      }
1650 	    } while (get_bucket_type(item) != type);
1651 	    return item;
1652 	  }
1653 	  set<int> roots;
1654 	  find_takes_by_rule(rule, &roots);
1655 	  for (auto root : roots) {
1656 	    vector<int> candidates;
1657 	    get_children_of_type(root, type, &candidates, false);
1658 	    for (auto candidate : candidates) {
1659 	      if (subtree_contains(candidate, item)) {
1660 		// note that here we assure that no two different buckets
1661 		// from a single crush rule will share a same device,
1662 		// which should generally be true.
1663 	        return candidate;
1664 	      }
1665 	    }
1666 	  }
1667 	  return 0; // not found
1668 	}
1669 	
1670 	void CrushWrapper::get_subtree_of_type(int type, vector<int> *subtrees)
1671 	{
1672 	  set<int> roots;
1673 	  find_roots(&roots);
1674 	  for (auto r: roots) {
1675 	    crush_bucket *b = get_bucket(r);
1676 	    if (IS_ERR(b))
1677 	      continue;
1678 	    get_children_of_type(b->id, type, subtrees);
1679 	  }
1680 	}
1681 	
1682 	bool CrushWrapper::class_is_in_use(int class_id, ostream *ss)
1683 	{
1684 	  list<unsigned> rules;
1685 	  for (unsigned i = 0; i < crush->max_rules; ++i) {
1686 	    crush_rule *r = crush->rules[i];
1687 	    if (!r)
1688 	      continue;
1689 	    for (unsigned j = 0; j < r->len; ++j) {
1690 	      if (r->steps[j].op == CRUSH_RULE_TAKE) {
1691 	        int root = r->steps[j].arg1;
1692 	        for (auto &p : class_bucket) {
1693 	          auto& q = p.second;
1694 	          if (q.count(class_id) && q[class_id] == root) {
1695 	            rules.push_back(i);
1696 	          }
1697 	        }
1698 	      }
1699 	    }
1700 	  }
1701 	  if (rules.empty()) {
1702 	    return false;
1703 	  }
1704 	  if (ss) {
1705 	    ostringstream os;
1706 	    for (auto &p: rules) {
1707 	      os << "'" << get_rule_name(p) <<"',";
1708 	    }
1709 	    string out(os.str());
1710 	    out.resize(out.size() - 1); // drop last ','
1711 	    *ss << "still referenced by crush_rule(s): " << out;
1712 	  }
1713 	  return true;
1714 	}
1715 	
1716 	int CrushWrapper::rename_class(const string& srcname, const string& dstname)
1717 	{
1718 	  auto i = class_rname.find(srcname);
1719 	  if (i == class_rname.end())
1720 	    return -ENOENT;
1721 	  auto j = class_rname.find(dstname);
1722 	  if (j != class_rname.end())
1723 	    return -EEXIST;
1724 	
1725 	  int class_id = i->second;
1726 	  ceph_assert(class_name.count(class_id));
1727 	  // rename any shadow buckets of old class name
1728 	  for (auto &it: class_map) {
1729 	    if (it.first < 0 && it.second == class_id) {
1730 	        string old_name = get_item_name(it.first);
1731 	        size_t pos = old_name.find("~");
1732 	        ceph_assert(pos != string::npos);
1733 	        string name_no_class = old_name.substr(0, pos);
1734 	        string old_class_name = old_name.substr(pos + 1);
1735 	        ceph_assert(old_class_name == srcname);
1736 	        string new_name = name_no_class + "~" + dstname;
1737 	        // we do not use set_item_name
1738 	        // because the name is intentionally invalid
1739 	        name_map[it.first] = new_name;
1740 	        have_rmaps = false;
1741 	    }
1742 	  }
1743 	
1744 	  // rename class
1745 	  class_rname.erase(srcname);
1746 	  class_name.erase(class_id);
1747 	  class_rname[dstname] = class_id;
1748 	  class_name[class_id] = dstname;
1749 	  return 0;
1750 	}
1751 	
1752 	int CrushWrapper::populate_classes(
1753 	  const std::map<int32_t, map<int32_t, int32_t>>& old_class_bucket)
1754 	{
1755 	  // build set of previous used shadow ids
1756 	  set<int32_t> used_ids;
1757 	  for (auto& p : old_class_bucket) {
1758 	    for (auto& q : p.second) {
1759 	      used_ids.insert(q.second);
1760 	    }
1761 	  }
1762 	  // accumulate weight values for each carg and bucket as we go. because it is
1763 	  // depth first, we will have the nested bucket weights we need when we
1764 	  // finish constructing the containing buckets.
1765 	  map<int,map<int,vector<int>>> cmap_item_weight; // cargs -> bno -> [bucket weight for each position]
1766 	  set<int> roots;
1767 	  find_nonshadow_roots(&roots);
1768 	  for (auto &r : roots) {
1769 	    if (r >= 0)
1770 	      continue;
1771 	    for (auto &c : class_name) {
1772 	      int clone;
1773 	      int res = device_class_clone(r, c.first, old_class_bucket, used_ids,
1774 					   &clone, &cmap_item_weight);
1775 	      if (res < 0)
1776 		return res;
1777 	    }
1778 	  }
1779 	  return 0;
1780 	}
1781 	
1782 	int CrushWrapper::trim_roots_with_class(CephContext *cct)
1783 	{
1784 	  set<int> roots;
1785 	  find_shadow_roots(&roots);
1786 	  for (auto &r : roots) {
1787 	    if (r >= 0)
1788 	      continue;
1789 	    int res = remove_root(cct, r);
1790 	    if (res)
1791 	      return res;
1792 	  }
1793 	  // there is no need to reweight because we only remove from the
1794 	  // root and down
1795 	  return 0;
1796 	}
1797 	
1798 	int32_t CrushWrapper::_alloc_class_id() const {
1799 	  if (class_name.empty()) {
1800 	    return 0;
1801 	  }
1802 	  int32_t class_id = class_name.rbegin()->first + 1;
1803 	  if (class_id >= 0) {
1804 	    return class_id;
1805 	  }
1806 	  // wrapped, pick a random start and do exhaustive search
1807 	  uint32_t upperlimit = std::numeric_limits<int32_t>::max();
1808 	  upperlimit++;
1809 	  class_id = rand() % upperlimit;
1810 	  const auto start = class_id;
1811 	  do {
1812 	    if (!class_name.count(class_id)) {
1813 	      return class_id;
1814 	    } else {
1815 	      class_id++;
1816 	      if (class_id < 0) {
1817 	        class_id = 0;
1818 	      }
1819 	    }
1820 	  } while (class_id != start);
1821 	  ceph_abort_msg("no available class id");
1822 	}
1823 	
1824 	int CrushWrapper::set_subtree_class(
1825 	  const string& subtree,
1826 	  const string& new_class)
1827 	{
1828 	  if (!name_exists(subtree)) {
1829 	    return -ENOENT;
1830 	  }
1831 	
1832 	  int new_class_id = get_or_create_class_id(new_class);
1833 	  int id = get_item_id(subtree);
1834 	  list<int> q = { id };
1835 	  while (!q.empty()) {
1836 	    int id = q.front();
1837 	    q.pop_front();
1838 	    crush_bucket *b = get_bucket(id);
1839 	    if (IS_ERR(b)) {
1840 	      return PTR_ERR(b);
1841 	    }
1842 	    for (unsigned i = 0; i < b->size; ++i) {
1843 	      int item = b->items[i];
1844 	      if (item >= 0) {
1845 		class_map[item] = new_class_id;
1846 	      } else {
1847 		q.push_back(item);
1848 	      }
1849 	    }
1850 	  }
1851 	  return 0;
1852 	}
1853 	
1854 	int CrushWrapper::reclassify(
1855 	  CephContext *cct,
1856 	  ostream& out,
1857 	  const map<string,string>& classify_root,
1858 	  const map<string,pair<string,string>>& classify_bucket
1859 	  )
1860 	{
1861 	  map<int,string> reclassified_bucket; // orig_id -> class
1862 	
1863 	  // classify_root
1864 	  for (auto& i : classify_root) {
1865 	    string root = i.first;
1866 	    if (!name_exists(root)) {
1867 	      out << "root " << root << " does not exist" << std::endl;
1868 	      return -EINVAL;
1869 	    }
1870 	    int root_id = get_item_id(root);
1871 	    string new_class = i.second;
1872 	    int new_class_id = get_or_create_class_id(new_class);
1873 	    out << "classify_root " << root << " (" << root_id
1874 		<< ") as " << new_class << std::endl;
1875 	
1876 	    // validate rules
1877 	    for (unsigned j = 0; j < crush->max_rules; j++) {
1878 	      if (crush->rules[j]) {
1879 		auto rule = crush->rules[j];
1880 		for (unsigned k = 0; k < rule->len; ++k) {
1881 		  if (rule->steps[k].op == CRUSH_RULE_TAKE) {
1882 		    int step_item = get_rule_arg1(j, k);
1883 		    int original_item;
1884 		    int c;
1885 		    int res = split_id_class(step_item, &original_item, &c);
1886 		    if (res < 0)
1887 		      return res;
1888 		    if (c >= 0) {
1889 		      if (original_item == root_id) {
1890 			out << "  rule " << j << " includes take on root "
1891 			    << root << " class " << c << std::endl;
1892 			return -EINVAL;
1893 		      }
1894 		    }
1895 		  }
1896 		}
1897 	      }
1898 	    }
1899 	
1900 	    // rebuild new buckets for root
1901 	    //cout << "before class_bucket: " << class_bucket << std::endl;
1902 	    map<int,int> renumber;
1903 	    list<int> q;
1904 	    q.push_back(root_id);
1905 	    while (!q.empty()) {
1906 	      int id = q.front();
1907 	      q.pop_front();
1908 	      crush_bucket *bucket = get_bucket(id);
1909 	      if (IS_ERR(bucket)) {
1910 		out << "cannot find bucket " << id
1911 		    << ": " << cpp_strerror(PTR_ERR(bucket)) << std::endl;
1912 		return PTR_ERR(bucket);
1913 	      }
1914 	
1915 	      // move bucket
1916 	      int new_id = get_new_bucket_id();
1917 	      out << "  renumbering bucket " << id << " -> " << new_id << std::endl;
1918 	      renumber[id] = new_id;
1919 	      crush->buckets[-1-new_id] = bucket;
1920 	      bucket->id = new_id;
1921 	      crush->buckets[-1-id] = crush_make_bucket(crush,
1922 							bucket->alg,
1923 							bucket->hash,
1924 							bucket->type,
1925 							0, NULL, NULL);
1926 	      crush->buckets[-1-id]->id = id;
1927 	      for (auto& i : choose_args) {
1928 		i.second.args[-1-new_id] = i.second.args[-1-id];
1929 		memset(&i.second.args[-1-id], 0, sizeof(i.second.args[0]));
1930 	      }
1931 	      class_bucket.erase(id);
1932 	      class_bucket[new_id][new_class_id] = id;
1933 	      name_map[new_id] = string(get_item_name(id));
1934 	      name_map[id] = string(get_item_name(id)) + "~" + new_class;
1935 	
1936 	      for (unsigned j = 0; j < bucket->size; ++j) {
1937 		if (bucket->items[j] < 0) {
1938 		  q.push_front(bucket->items[j]);
1939 		} else {
1940 		  // we don't reclassify the device here; if the users wants that,
1941 		  // they can pass --set-subtree-class separately.
1942 		}
1943 	      }
1944 	    }
1945 	    //cout << "mid class_bucket: " << class_bucket << std::endl;
1946 	
1947 	    for (int i = 0; i < crush->max_buckets; ++i) {
1948 	      crush_bucket *b = crush->buckets[i];
1949 	      if (!b) {
1950 		continue;
1951 	      }
1952 	      for (unsigned j = 0; j < b->size; ++j) {
1953 		if (renumber.count(b->items[j])) {
1954 		  b->items[j] = renumber[b->items[j]];
1955 		}
1956 	      }
1957 	    }
1958 	
1959 	    int r = rebuild_roots_with_classes(cct);
1960 	    if (r < 0) {
1961 	      out << "failed to rebuild_roots_with_classes: " << cpp_strerror(r)
1962 		  << std::endl;
1963 	      return r;
1964 	    }
1965 	    //cout << "final class_bucket: " << class_bucket << std::endl;
1966 	  }
1967 	
1968 	  // classify_bucket
1969 	  map<int,int> send_to;  // source bucket -> dest bucket
1970 	  map<int,map<int,int>> new_class_bucket;
1971 	  map<int,string> new_bucket_names;
1972 	  map<int,map<string,string>> new_buckets;
1973 	  map<string,int> new_bucket_by_name;
1974 	  for (auto& i : classify_bucket) {
1975 	    const string& match = i.first;  // prefix% or %suffix
1976 	    const string& new_class = i.second.first;
1977 	    const string& default_parent = i.second.second;
1978 	    if (!name_exists(default_parent)) {
1979 	      out << "default parent " << default_parent << " does not exist"
1980 		  << std::endl;
1981 	      return -EINVAL;
1982 	    }
1983 	    int default_parent_id = get_item_id(default_parent);
1984 	    crush_bucket *default_parent_bucket = get_bucket(default_parent_id);
1985 	    assert(default_parent_bucket);
1986 	    string default_parent_type_name = get_type_name(default_parent_bucket->type);
1987 	
1988 	    out << "classify_bucket " << match << " as " << new_class
1989 		<< " default bucket " << default_parent
1990 		<< " (" << default_parent_type_name << ")" << std::endl;
1991 	
1992 	    int new_class_id = get_or_create_class_id(new_class);
1993 	    for (int j = 0; j < crush->max_buckets; ++j) {
1994 	      crush_bucket *b = crush->buckets[j];
1995 	      if (!b || is_shadow_item(b->id)) {
1996 		continue;
1997 	      }
1998 	      string name = get_item_name(b->id);
1999 	      if (name.length() < match.length()) {
2000 		continue;
2001 	      }
2002 	      string basename;
2003 	      if (match[0] == '%') {
2004 		if (match.substr(1) != name.substr(name.size() - match.size() + 1)) {
2005 		  continue;
2006 		}
2007 		basename = name.substr(0, name.size() - match.size() + 1);
2008 	      } else if (match[match.size() - 1] == '%') {
2009 		if (match.substr(0, match.size() - 1) !=
2010 		    name.substr(0, match.size() - 1)) {
2011 		  continue;
2012 		}
2013 		basename = name.substr(match.size() - 1);
2014 	      } else if (match == name) {
2015 		basename = default_parent;
2016 	      } else {
2017 		continue;
2018 	      }
2019 	      cout << "match " << match << " to " << name << " basename " << basename
2020 		   << std::endl;
2021 	      // look up or create basename bucket
2022 	      int base_id;
2023 	      if (name_exists(basename)) {
2024 		base_id = get_item_id(basename);
2025 		cout << "  have base " << base_id << std::endl;
2026 	      } else if (new_bucket_by_name.count(basename)) {
2027 		base_id = new_bucket_by_name[basename];
2028 		cout << "  already creating base " << base_id << std::endl;
2029 	      } else {
2030 		base_id = get_new_bucket_id();
2031 		crush->buckets[-1-base_id] = crush_make_bucket(crush,
2032 							       b->alg,
2033 							       b->hash,
2034 							       b->type,
2035 							       0, NULL, NULL);
2036 		crush->buckets[-1-base_id]->id = base_id;
2037 		name_map[base_id] = basename;
2038 		new_bucket_by_name[basename] = base_id;
2039 		cout << "  created base " << base_id << std::endl;
2040 	
2041 		new_buckets[base_id][default_parent_type_name] = default_parent;
2042 	      }
2043 	      send_to[b->id] = base_id;
2044 	      new_class_bucket[base_id][new_class_id] = b->id;
2045 	      new_bucket_names[b->id] = basename + "~" + get_class_name(new_class_id);
2046 	
2047 	      // make sure devices are classified
2048 	      for (unsigned i = 0; i < b->size; ++i) {
2049 		int item = b->items[i];
2050 		if (item >= 0) {
2051 		  class_map[item] = new_class_id;
2052 		}
2053 	      }
2054 	    }
2055 	  }
2056 	
2057 	  // no name_exists() works below,
2058 	  have_rmaps = false;
2059 	
2060 	  // copy items around
2061 	  //cout << "send_to " << send_to << std::endl;
2062 	  set<int> roots;
2063 	  find_roots(&roots);
2064 	  for (auto& i : send_to) {
2065 	    crush_bucket *from = get_bucket(i.first);
2066 	    crush_bucket *to = get_bucket(i.second);
2067 	    cout << "moving items from " << from->id << " (" << get_item_name(from->id)
2068 		 << ") to " << to->id << " (" << get_item_name(to->id) << ")"
2069 		 << std::endl;
2070 	    for (unsigned j = 0; j < from->size; ++j) {
2071 	      int item = from->items[j];
2072 	      int r;
2073 	      map<string,string> to_loc;
2074 	      to_loc[get_type_name(to->type)] = get_item_name(to->id);
2075 	      if (item >= 0) {
2076 		if (subtree_contains(to->id, item)) {
2077 		  continue;
2078 		}
2079 		map<string,string> from_loc;
2080 		from_loc[get_type_name(from->type)] = get_item_name(from->id);
2081 		auto w = get_item_weightf_in_loc(item, from_loc);
2082 		r = insert_item(cct, item,
2083 				w,
2084 				get_item_name(item),
2085 				to_loc);
2086 	      } else {
2087 		if (!send_to.count(item)) {
2088 		  lderr(cct) << "item " << item << " in bucket " << from->id
2089 		       << " is not also a reclassified bucket" << dendl;
2090 		  return -EINVAL;
2091 		}
2092 		int newitem = send_to[item];
2093 		if (subtree_contains(to->id, newitem)) {
2094 		  continue;
2095 		}
2096 		r = link_bucket(cct, newitem, to_loc);
2097 	      }
2098 	      if (r != 0) {
2099 		cout << __func__ << " err from insert_item: " << cpp_strerror(r)
2100 		     << std::endl;
2101 		return r;
2102 	      }
2103 	    }
2104 	  }
2105 	
2106 	  // make sure new buckets have parents
2107 	  for (auto& i : new_buckets) {
2108 	    int parent;
2109 	    if (get_immediate_parent_id(i.first, &parent) < 0) {
2110 	      cout << "new bucket " << i.first << " missing parent, adding at "
2111 		   << i.second << std::endl;
2112 	      int r = link_bucket(cct, i.first, i.second);
2113 	      if (r != 0) {
2114 		cout << __func__ << " err from insert_item: " << cpp_strerror(r)
2115 		     << std::endl;
2116 		return r;
2117 	      }
2118 	    }
2119 	  }
2120 	
2121 	  // set class mappings
2122 	  //cout << "pre class_bucket: " << class_bucket << std::endl;
2123 	  for (auto& i : new_class_bucket) {
2124 	    for (auto& j : i.second) {
2125 	      class_bucket[i.first][j.first] = j.second;
2126 	    }
2127 	
2128 	  }
2129 	  //cout << "post class_bucket: " << class_bucket << std::endl;
2130 	  for (auto& i : new_bucket_names) {
2131 	    name_map[i.first] = i.second;
2132 	  }
2133 	
2134 	  int r = rebuild_roots_with_classes(cct);
2135 	  if (r < 0) {
2136 	    out << "failed to rebuild_roots_with_classes: " << cpp_strerror(r)
2137 		<< std::endl;
2138 	    return r;
2139 	  }
2140 	  //cout << "final class_bucket: " << class_bucket << std::endl;
2141 	
2142 	  return 0;
2143 	}
2144 	
2145 	int CrushWrapper::get_new_bucket_id()
2146 	{
2147 	  int id = -1;
2148 	  while (crush->buckets[-1-id] &&
2149 		 -1-id < crush->max_buckets) {
2150 	    id--;
2151 	  }
2152 	  if (-1-id == crush->max_buckets) {
2153 	    ++crush->max_buckets;
2154 	    crush->buckets = (struct crush_bucket**)realloc(
2155 	      crush->buckets,
2156 	      sizeof(crush->buckets[0]) * crush->max_buckets);
2157 	    for (auto& i : choose_args) {
2158 	      assert(i.second.size == (__u32)crush->max_buckets - 1);
2159 	      ++i.second.size;
2160 	      i.second.args = (struct crush_choose_arg*)realloc(
2161 		i.second.args,
2162 		sizeof(i.second.args[0]) * i.second.size);
2163 	    }
2164 	  }
2165 	  return id;
2166 	}
2167 	
2168 	void CrushWrapper::reweight(CephContext *cct)
2169 	{
2170 	  set<int> roots;
2171 	  find_nonshadow_roots(&roots);
2172 	  for (auto id : roots) {
2173 	    if (id >= 0)
2174 	      continue;
2175 	    crush_bucket *b = get_bucket(id);
2176 	    ldout(cct, 5) << "reweight root bucket " << id << dendl;
2177 	    int r = crush_reweight_bucket(crush, b);
2178 	    ceph_assert(r == 0);
2179 	
2180 	    for (auto& i : choose_args) {
2181 	      //cout << "carg " << i.first << std::endl;
2182 	      vector<uint32_t> w;  // discard top-level weights
2183 	      reweight_bucket(b, i.second, &w);
2184 	    }
2185 	  }
2186 	  int r = rebuild_roots_with_classes(cct);
2187 	  ceph_assert(r == 0);
2188 	}
2189 	
2190 	void CrushWrapper::reweight_bucket(
2191 	  crush_bucket *b,
2192 	  crush_choose_arg_map& arg_map,
2193 	  vector<uint32_t> *weightv)
2194 	{
2195 	  int idx = -1 - b->id;
2196 	  unsigned npos = arg_map.args[idx].weight_set_positions;
2197 	  //cout << __func__ << " " << b->id << " npos " << npos << std::endl;
2198 	  weightv->resize(npos);
2199 	  for (unsigned i = 0; i < b->size; ++i) {
2200 	    int item = b->items[i];
2201 	    if (item >= 0) {
2202 	      for (unsigned pos = 0; pos < npos; ++pos) {
2203 		(*weightv)[pos] += arg_map.args[idx].weight_set->weights[i];
2204 	      }
2205 	    } else {
2206 	      vector<uint32_t> subw(npos);
2207 	      crush_bucket *sub = get_bucket(item);
2208 	      assert(sub);
2209 	      reweight_bucket(sub, arg_map, &subw);
2210 	      for (unsigned pos = 0; pos < npos; ++pos) {
2211 		(*weightv)[pos] += subw[pos];
2212 		// strash the real bucket weight as the weights for this reference
2213 		arg_map.args[idx].weight_set->weights[i] = subw[pos];
2214 	      }
2215 	    }
2216 	  }
2217 	  //cout << __func__ << " finish " << b->id << " " << *weightv << std::endl;
2218 	}
2219 	
2220 	int CrushWrapper::add_simple_rule_at(
2221 	  string name, string root_name,
2222 	  string failure_domain_name,
2223 	  string device_class,
2224 	  string mode, int rule_type,
2225 	  int rno,
2226 	  ostream *err)
2227 	{
2228 	  if (rule_exists(name)) {
2229 	    if (err)
2230 	      *err << "rule " << name << " exists";
2231 	    return -EEXIST;
2232 	  }
2233 	  if (rno >= 0) {
2234 	    if (rule_exists(rno)) {
2235 	      if (err)
2236 	        *err << "rule with ruleno " << rno << " exists";
2237 	      return -EEXIST;
2238 	    }
2239 	    if (ruleset_exists(rno)) {
2240 	      if (err)
2241 	        *err << "ruleset " << rno << " exists";
2242 	      return -EEXIST;
2243 	    }
2244 	  } else {
2245 	    for (rno = 0; rno < get_max_rules(); rno++) {
2246 	      if (!rule_exists(rno) && !ruleset_exists(rno))
2247 	        break;
2248 	    }
2249 	  }
2250 	  if (!name_exists(root_name)) {
2251 	    if (err)
2252 	      *err << "root item " << root_name << " does not exist";
2253 	    return -ENOENT;
2254 	  }
2255 	  int root = get_item_id(root_name);
2256 	  int type = 0;
2257 	  if (failure_domain_name.length()) {
2258 	    type = get_type_id(failure_domain_name);
2259 	    if (type < 0) {
2260 	      if (err)
2261 		*err << "unknown type " << failure_domain_name;
2262 	      return -EINVAL;
2263 	    }
2264 	  }
2265 	  if (device_class.size()) {
2266 	    if (!class_exists(device_class)) {
2267 	      if (err)
2268 		*err << "device class " << device_class << " does not exist";
2269 	      return -EINVAL;
2270 	    }
2271 	    int c = get_class_id(device_class);
2272 	    if (class_bucket.count(root) == 0 ||
2273 		class_bucket[root].count(c) == 0) {
2274 	      if (err)
2275 		*err << "root " << root_name << " has no devices with class "
2276 		     << device_class;
2277 	      return -EINVAL;
2278 	    }
2279 	    root = class_bucket[root][c];
2280 	  }
2281 	  if (mode != "firstn" && mode != "indep") {
2282 	    if (err)
2283 	      *err << "unknown mode " << mode;
2284 	    return -EINVAL;
2285 	  }
2286 	
2287 	  int steps = 3;
2288 	  if (mode == "indep")
2289 	    steps = 5;
2290 	  int min_rep = mode == "firstn" ? 1 : 3;
2291 	  int max_rep = mode == "firstn" ? 10 : 20;
2292 	  //set the ruleset the same as rule_id(rno)
2293 	  crush_rule *rule = crush_make_rule(steps, rno, rule_type, min_rep, max_rep);
2294 	  ceph_assert(rule);
2295 	  int step = 0;
2296 	  if (mode == "indep") {
2297 	    crush_rule_set_step(rule, step++, CRUSH_RULE_SET_CHOOSELEAF_TRIES, 5, 0);
2298 	    crush_rule_set_step(rule, step++, CRUSH_RULE_SET_CHOOSE_TRIES, 100, 0);
2299 	  }
2300 	  crush_rule_set_step(rule, step++, CRUSH_RULE_TAKE, root, 0);
2301 	  if (type)
2302 	    crush_rule_set_step(rule, step++,
2303 				mode == "firstn" ? CRUSH_RULE_CHOOSELEAF_FIRSTN :
2304 				CRUSH_RULE_CHOOSELEAF_INDEP,
2305 				CRUSH_CHOOSE_N,
2306 				type);
2307 	  else
2308 	    crush_rule_set_step(rule, step++,
2309 				mode == "firstn" ? CRUSH_RULE_CHOOSE_FIRSTN :
2310 				CRUSH_RULE_CHOOSE_INDEP,
2311 				CRUSH_CHOOSE_N,
2312 				0);
2313 	  crush_rule_set_step(rule, step++, CRUSH_RULE_EMIT, 0, 0);
2314 	
2315 	  int ret = crush_add_rule(crush, rule, rno);
2316 	  if(ret < 0) {
2317 	    *err << "failed to add rule " << rno << " because " << cpp_strerror(ret);
2318 	    return ret;
2319 	  }
2320 	  set_rule_name(rno, name);
2321 	  have_rmaps = false;
2322 	  return rno;
2323 	}
2324 	
2325 	int CrushWrapper::add_simple_rule(
2326 	  string name, string root_name,
2327 	  string failure_domain_name,
2328 	  string device_class,
2329 	  string mode, int rule_type,
2330 	  ostream *err)
2331 	{
2332 	  return add_simple_rule_at(name, root_name, failure_domain_name, device_class,
2333 				    mode,
2334 				    rule_type, -1, err);
2335 	}
2336 	
2337 	float CrushWrapper::_get_take_weight_osd_map(int root,
2338 						     map<int,float> *pmap) const
2339 	{
2340 	  float sum = 0.0;
2341 	  list<int> q;
2342 	  q.push_back(root);
2343 	  //breadth first iterate the OSD tree
2344 	  while (!q.empty()) {
2345 	    int bno = q.front();
2346 	    q.pop_front();
2347 	    crush_bucket *b = crush->buckets[-1-bno];
2348 	    ceph_assert(b);
2349 	    for (unsigned j=0; j<b->size; ++j) {
2350 	      int item_id = b->items[j];
2351 	      if (item_id >= 0) { //it's an OSD
2352 		float w = crush_get_bucket_item_weight(b, j);
2353 		(*pmap)[item_id] = w;
2354 		sum += w;
2355 	      } else { //not an OSD, expand the child later
2356 		q.push_back(item_id);
2357 	      }
2358 	    }
2359 	  }
2360 	  return sum;
2361 	}
2362 	
2363 	void CrushWrapper::_normalize_weight_map(float sum,
2364 						 const map<int,float>& m,
2365 						 map<int,float> *pmap) const
2366 	{
2367 	  for (auto& p : m) {
2368 	    map<int,float>::iterator q = pmap->find(p.first);
2369 	    if (q == pmap->end()) {
2370 	      (*pmap)[p.first] = p.second / sum;
2371 	    } else {
2372 	      q->second += p.second / sum;
2373 	    }
2374 	  }
2375 	}
2376 	
2377 	int CrushWrapper::get_take_weight_osd_map(int root, map<int,float> *pmap) const
2378 	{
2379 	  map<int,float> m;
2380 	  float sum = _get_take_weight_osd_map(root, &m);
2381 	  _normalize_weight_map(sum, m, pmap);
2382 	  return 0;
2383 	}
2384 	
2385 	int CrushWrapper::get_rule_weight_osd_map(unsigned ruleno,
2386 						  map<int,float> *pmap) const
2387 	{
2388 	  if (ruleno >= crush->max_rules)
2389 	    return -ENOENT;
2390 	  if (crush->rules[ruleno] == NULL)
2391 	    return -ENOENT;
2392 	  crush_rule *rule = crush->rules[ruleno];
2393 	
2394 	  // build a weight map for each TAKE in the rule, and then merge them
2395 	
2396 	  // FIXME: if there are multiple takes that place a different number of
2397 	  // objects we do not take that into account.  (Also, note that doing this
2398 	  // right is also a function of the pool, since the crush rule
2399 	  // might choose 2 + choose 2 but pool size may only be 3.)
2400 	  for (unsigned i=0; i<rule->len; ++i) {
2401 	    map<int,float> m;
2402 	    float sum = 0;
2403 	    if (rule->steps[i].op == CRUSH_RULE_TAKE) {
2404 	      int n = rule->steps[i].arg1;
2405 	      if (n >= 0) {
2406 		m[n] = 1.0;
2407 		sum = 1.0;
2408 	      } else {
2409 		sum += _get_take_weight_osd_map(n, &m);
2410 	      }
2411 	    }
2412 	    _normalize_weight_map(sum, m, pmap);
2413 	  }
2414 	
2415 	  return 0;
2416 	}
2417 	
2418 	int CrushWrapper::remove_rule(int ruleno)
2419 	{
2420 	  if (ruleno >= (int)crush->max_rules)
2421 	    return -ENOENT;
2422 	  if (crush->rules[ruleno] == NULL)
2423 	    return -ENOENT;
2424 	  crush_destroy_rule(crush->rules[ruleno]);
2425 	  crush->rules[ruleno] = NULL;
2426 	  rule_name_map.erase(ruleno);
2427 	  have_rmaps = false;
2428 	  return rebuild_roots_with_classes(nullptr);
2429 	}
2430 	
2431 	int CrushWrapper::bucket_adjust_item_weight(
2432 	  CephContext *cct, crush_bucket *bucket, int item, int weight,
2433 	  bool adjust_weight_sets)
2434 	{
2435 	  if (adjust_weight_sets) {
2436 	    unsigned position;
2437 	    for (position = 0; position < bucket->size; position++)
2438 	      if (bucket->items[position] == item)
2439 		break;
2440 	    ceph_assert(position != bucket->size);
2441 	    for (auto &w : choose_args) {
2442 	      crush_choose_arg_map &arg_map = w.second;
2443 	      crush_choose_arg *arg = &arg_map.args[-1-bucket->id];
2444 	      for (__u32 j = 0; j < arg->weight_set_positions; j++) {
2445 		crush_weight_set *weight_set = &arg->weight_set[j];
2446 		weight_set->weights[position] = weight;
2447 	      }
2448 	    }
2449 	  }
2450 	  return crush_bucket_adjust_item_weight(crush, bucket, item, weight);
2451 	}
2452 	
2453 	int CrushWrapper::add_bucket(
2454 	  int bucketno, int alg, int hash, int type, int size,
2455 	  int *items, int *weights, int *idout)
2456 	{
2457 	  if (alg == 0) {
2458 	    alg = get_default_bucket_alg();
2459 	    if (alg == 0)
2460 	      return -EINVAL;
2461 	  }
2462 	  crush_bucket *b = crush_make_bucket(crush, alg, hash, type, size, items,
2463 					      weights);
2464 	  ceph_assert(b);
2465 	  ceph_assert(idout);
2466 	  int r = crush_add_bucket(crush, bucketno, b, idout);
2467 	  int pos = -1 - *idout;
2468 	  for (auto& p : choose_args) {
2469 	    crush_choose_arg_map& cmap = p.second;
2470 	    unsigned new_size = crush->max_buckets;
2471 	    if (cmap.args) {
2472 	      if ((int)cmap.size < crush->max_buckets) {
2473 		cmap.args = static_cast<crush_choose_arg*>(realloc(
2474 		  cmap.args,
2475 		  sizeof(crush_choose_arg) * new_size));
2476 	        ceph_assert(cmap.args);
2477 		memset(&cmap.args[cmap.size], 0,
2478 		       sizeof(crush_choose_arg) * (new_size - cmap.size));
2479 		cmap.size = new_size;
2480 	      }
2481 	    } else {
2482 	      cmap.args = static_cast<crush_choose_arg*>(calloc(sizeof(crush_choose_arg),
2483 								new_size));
2484 	      ceph_assert(cmap.args);
2485 	      cmap.size = new_size;
2486 	    }
2487 	    if (size > 0) {
2488 	      int positions = get_choose_args_positions(cmap);
2489 	      crush_choose_arg& carg = cmap.args[pos];
2490 	      carg.weight_set = static_cast<crush_weight_set*>(calloc(sizeof(crush_weight_set),
2491 							  size));
2492 	      carg.weight_set_positions = positions;
2493 	      for (int ppos = 0; ppos < positions; ++ppos) {
2494 		carg.weight_set[ppos].weights = (__u32*)calloc(sizeof(__u32), size);
2495 		carg.weight_set[ppos].size = size;
2496 		for (int bpos = 0; bpos < size; ++bpos) {
2497 		  carg.weight_set[ppos].weights[bpos] = weights[bpos];
2498 		}
2499 	      }
2500 	    }
2501 	    assert(crush->max_buckets == (int)cmap.size);
2502 	  }
2503 	  return r;
2504 	}
2505 	
2506 	int CrushWrapper::bucket_add_item(crush_bucket *bucket, int item, int weight)
2507 	{
2508 	  __u32 new_size = bucket->size + 1;
2509 	  int r = crush_bucket_add_item(crush, bucket, item, weight);
2510 	  if (r < 0) {
2511 	    return r;
2512 	  }
2513 	  for (auto &w : choose_args) {
2514 	    crush_choose_arg_map &arg_map = w.second;
2515 	    crush_choose_arg *arg = &arg_map.args[-1-bucket->id];
2516 	    for (__u32 j = 0; j < arg->weight_set_positions; j++) {
2517 	      crush_weight_set *weight_set = &arg->weight_set[j];
2518 	      weight_set->weights = (__u32*)realloc(weight_set->weights,
2519 						    new_size * sizeof(__u32));
2520 	      ceph_assert(weight_set->size + 1 == new_size);
2521 	      weight_set->weights[weight_set->size] = weight;
2522 	      weight_set->size = new_size;
2523 	    }
2524 	    if (arg->ids_size) {
2525 	      arg->ids = (__s32 *)realloc(arg->ids, new_size * sizeof(__s32));
2526 	      ceph_assert(arg->ids_size + 1 == new_size);
2527 	      arg->ids[arg->ids_size] = item;
2528 	      arg->ids_size = new_size;
2529 	    }
2530 	  }
2531 	  return 0;
2532 	}
2533 	
2534 	int CrushWrapper::bucket_remove_item(crush_bucket *bucket, int item)
2535 	{
2536 	  __u32 new_size = bucket->size - 1;
2537 	  unsigned position;
2538 	  for (position = 0; position < bucket->size; position++)
2539 	    if (bucket->items[position] == item)
2540 	      break;
2541 	  ceph_assert(position != bucket->size);
2542 	  int r = crush_bucket_remove_item(crush, bucket, item);
2543 	  if (r < 0) {
2544 	    return r;
2545 	  }
2546 	  for (auto &w : choose_args) {
2547 	    crush_choose_arg_map &arg_map = w.second;
2548 	    crush_choose_arg *arg = &arg_map.args[-1-bucket->id];
2549 	    for (__u32 j = 0; j < arg->weight_set_positions; j++) {
2550 	      crush_weight_set *weight_set = &arg->weight_set[j];
2551 	      ceph_assert(weight_set->size - 1 == new_size);
2552 	      for (__u32 k = position; k < new_size; k++)
2553 		weight_set->weights[k] = weight_set->weights[k+1];
2554 	      if (new_size) {
2555 		weight_set->weights = (__u32*)realloc(weight_set->weights,
2556 						      new_size * sizeof(__u32));
2557 	      } else {
2558 	        free(weight_set->weights);
2559 		weight_set->weights = NULL;
2560 	      }
2561 	      weight_set->size = new_size;
2562 	    }
2563 	    if (arg->ids_size) {
2564 	      ceph_assert(arg->ids_size - 1 == new_size);
2565 	      for (__u32 k = position; k < new_size; k++)
2566 		arg->ids[k] = arg->ids[k+1];
2567 	      if (new_size) {
2568 		arg->ids = (__s32 *)realloc(arg->ids, new_size * sizeof(__s32));
2569 	      } else {
2570 	        free(arg->ids);
2571 		arg->ids = NULL;
2572 	      }
2573 	      arg->ids_size = new_size;
2574 	    }
2575 	  }
2576 	  return 0;
2577 	}
2578 	
2579 	int CrushWrapper::bucket_set_alg(int bid, int alg)
2580 	{
2581 	  crush_bucket *b = get_bucket(bid);
2582 	  if (!b) {
2583 	    return -ENOENT;
2584 	  }
2585 	  b->alg = alg;
2586 	  return 0;
2587 	}
2588 	
2589 	int CrushWrapper::update_device_class(int id,
2590 	                                      const string& class_name,
2591 	                                      const string& name,
2592 	                                      ostream *ss)
2593 	{
2594 	  ceph_assert(item_exists(id));
2595 	  auto old_class_name = get_item_class(id);
2596 	  if (old_class_name && old_class_name != class_name) {
2597 	    *ss << "osd." << id << " has already bound to class '" << old_class_name
2598 	        << "', can not reset class to '" << class_name  << "'; "
2599 	        << "use 'ceph osd crush rm-device-class <id>' to "
2600 	        << "remove old class first";
2601 	    return -EBUSY;
2602 	  }
2603 	
2604 	  int class_id = get_or_create_class_id(class_name);
2605 	  if (id < 0) {
2606 	    *ss << name << " id " << id << " is negative";
2607 	    return -EINVAL;
2608 	  }
2609 	
2610 	  if (class_map.count(id) != 0 && class_map[id] == class_id) {
2611 	    *ss << name << " already set to class " << class_name << ". ";
2612 	    return 0;
2613 	  }
2614 	
2615 	  set_item_class(id, class_id);
2616 	
2617 	  int r = rebuild_roots_with_classes(nullptr);
2618 	  if (r < 0)
2619 	    return r;
2620 	  return 1;
2621 	}
2622 	
2623 	int CrushWrapper::remove_device_class(CephContext *cct, int id, ostream *ss)
2624 	{
2625 	  ceph_assert(ss);
2626 	  const char *name = get_item_name(id);
2627 	  if (!name) {
2628 	    *ss << "osd." << id << " does not have a name";
2629 	    return -ENOENT;
2630 	  }
2631 	
2632 	  const char *class_name = get_item_class(id);
2633 	  if (!class_name) {
2634 	    *ss << "osd." << id << " has not been bound to a specific class yet";
2635 	    return 0;
2636 	  }
2637 	  class_remove_item(id);
2638 	
2639 	  int r = rebuild_roots_with_classes(cct);
2640 	  if (r < 0) {
2641 	    *ss << "unable to rebuild roots with class '" << class_name << "' "
2642 	        << "of osd." << id << ": " << cpp_strerror(r);
2643 	    return r;
2644 	  }
2645 	  return 0;
2646 	}
2647 	
2648 	int CrushWrapper::device_class_clone(
2649 	  int original_id, int device_class,
2650 	  const std::map<int32_t, map<int32_t, int32_t>>& old_class_bucket,
2651 	  const std::set<int32_t>& used_ids,
2652 	  int *clone,
2653 	  map<int,map<int,vector<int>>> *cmap_item_weight)
2654 	{
2655 	  const char *item_name = get_item_name(original_id);
2656 	  if (item_name == NULL)
2657 	    return -ECHILD;
2658 	  const char *class_name = get_class_name(device_class);
2659 	  if (class_name == NULL)
2660 	    return -EBADF;
2661 	  string copy_name = item_name + string("~") + class_name;
2662 	  if (name_exists(copy_name)) {
2663 	    *clone = get_item_id(copy_name);
2664 	    return 0;
2665 	  }
2666 	
2667 	  crush_bucket *original = get_bucket(original_id);
2668 	  ceph_assert(!IS_ERR(original));
2669 	  crush_bucket *copy = crush_make_bucket(crush,
2670 						 original->alg,
2671 						 original->hash,
2672 						 original->type,
2673 						 0, NULL, NULL);
2674 	  ceph_assert(copy);
2675 	
2676 	  vector<unsigned> item_orig_pos;  // new item pos -> orig item pos
2677 	  for (unsigned i = 0; i < original->size; i++) {
2678 	    int item = original->items[i];
2679 	    int weight = crush_get_bucket_item_weight(original, i);
2680 	    if (item >= 0) {
2681 	      if (class_map.count(item) != 0 && class_map[item] == device_class) {
2682 		int res = crush_bucket_add_item(crush, copy, item, weight);
2683 		if (res)
2684 		  return res;
2685 	      } else {
2686 		continue;
2687 	      }
2688 	    } else {
2689 	      int child_copy_id;
2690 	      int res = device_class_clone(item, device_class, old_class_bucket,
2691 					   used_ids, &child_copy_id,
2692 					   cmap_item_weight);
2693 	      if (res < 0)
2694 		return res;
2695 	      crush_bucket *child_copy = get_bucket(child_copy_id);
2696 	      ceph_assert(!IS_ERR(child_copy));
2697 	      res = crush_bucket_add_item(crush, copy, child_copy_id,
2698 					  child_copy->weight);
2699 	      if (res)
2700 		return res;
2701 	    }
2702 	    item_orig_pos.push_back(i);
2703 	  }
2704 	  ceph_assert(item_orig_pos.size() == copy->size);
2705 	
2706 	  int bno = 0;
2707 	  if (old_class_bucket.count(original_id) &&
2708 	      old_class_bucket.at(original_id).count(device_class)) {
2709 	    bno = old_class_bucket.at(original_id).at(device_class);
2710 	  } else {
2711 	    // pick a new shadow bucket id that is not used by the current map
2712 	    // *or* any previous shadow buckets.
2713 	    bno = -1;
2714 	    while (((-1-bno) < crush->max_buckets && crush->buckets[-1-bno]) ||
2715 		   used_ids.count(bno)) {
2716 	      --bno;
2717 	    }
2718 	  }
2719 	  int res = crush_add_bucket(crush, bno, copy, clone);
2720 	  if (res)
2721 	    return res;
2722 	  ceph_assert(!bno || bno == *clone);
2723 	
2724 	  res = set_item_class(*clone, device_class);
2725 	  if (res < 0)
2726 	    return res;
2727 	
2728 	  // we do not use set_item_name because the name is intentionally invalid
2729 	  name_map[*clone] = copy_name;
2730 	  if (have_rmaps)
2731 	    name_rmap[copy_name] = *clone;
2732 	  class_bucket[original_id][device_class] = *clone;
2733 	
2734 	  // set up choose_args for the new bucket.
2735 	  for (auto& w : choose_args) {
2736 	    crush_choose_arg_map& cmap = w.second;
2737 	    if (crush->max_buckets > (int)cmap.size) {
2738 	      unsigned new_size = crush->max_buckets;
2739 	      cmap.args = static_cast<crush_choose_arg*>(realloc(cmap.args,
2740 						     new_size * sizeof(cmap.args[0])));
2741 	      ceph_assert(cmap.args);
2742 	      memset(cmap.args + cmap.size, 0,
2743 		     (new_size - cmap.size) * sizeof(cmap.args[0]));
2744 	      cmap.size = new_size;
2745 	    }
2746 	    auto& o = cmap.args[-1-original_id];
2747 	    auto& n = cmap.args[-1-bno];
2748 	    n.ids_size = 0; // FIXME: implement me someday
2749 	    n.weight_set_positions = o.weight_set_positions;
2750 	    n.weight_set = static_cast<crush_weight_set*>(calloc(
2751 	      n.weight_set_positions, sizeof(crush_weight_set)));
2752 	    for (size_t s = 0; s < n.weight_set_positions; ++s) {
2753 	      n.weight_set[s].size = copy->size;
2754 	      n.weight_set[s].weights = (__u32*)calloc(copy->size, sizeof(__u32));
2755 	    }
2756 	    for (size_t s = 0; s < n.weight_set_positions; ++s) {
2757 	      vector<int> bucket_weights(n.weight_set_positions);
2758 	      for (size_t i = 0; i < copy->size; ++i) {
2759 		int item = copy->items[i];
2760 		if (item >= 0) {
2761 		  n.weight_set[s].weights[i] = o.weight_set[s].weights[item_orig_pos[i]];
2762 		} else if ((*cmap_item_weight)[w.first].count(item)) {
2763 		  n.weight_set[s].weights[i] = (*cmap_item_weight)[w.first][item][s];
2764 		} else {
2765 		  n.weight_set[s].weights[i] = 0;
2766 		}
2767 		bucket_weights[s] += n.weight_set[s].weights[i];
2768 	      }
2769 	      (*cmap_item_weight)[w.first][bno] = bucket_weights;
2770 	    }
2771 	  }
2772 	  return 0;
2773 	}
2774 	
2775 	int CrushWrapper::get_rules_by_class(const string &class_name, set<int> *rules)
2776 	{
2777 	  ceph_assert(rules);
2778 	  rules->clear();
2779 	  if (!class_exists(class_name)) {
2780 	    return -ENOENT;
2781 	  }
2782 	  int class_id = get_class_id(class_name);
2783 	  for (unsigned i = 0; i < crush->max_rules; ++i) {
2784 	    crush_rule *r = crush->rules[i];
2785 	    if (!r)
2786 	      continue;
2787 	    for (unsigned j = 0; j < r->len; ++j) {
2788 	      if (r->steps[j].op == CRUSH_RULE_TAKE) {
2789 	        int step_item = r->steps[j].arg1;
2790 	        int original_item;
2791 	        int c;
2792 	        int res = split_id_class(step_item, &original_item, &c);
2793 	        if (res < 0) {
2794 	          return res;
2795 	        }
2796 	        if (c != -1 && c == class_id) {
2797 	          rules->insert(i);
2798 	          break;
2799 	        }
2800 	      }
2801 	    }
2802 	  }
2803 	  return 0;
2804 	}
2805 	
2806 	// return rules that might reference the given osd
2807 	int CrushWrapper::get_rules_by_osd(int osd, set<int> *rules)
2808 	{
2809 	  ceph_assert(rules);
2810 	  rules->clear();
2811 	  if (osd < 0) {
2812 	    return -EINVAL;
2813 	  }
2814 	  for (unsigned i = 0; i < crush->max_rules; ++i) {
2815 	    crush_rule *r = crush->rules[i];
2816 	    if (!r)
2817 	      continue;
2818 	    for (unsigned j = 0; j < r->len; ++j) {
2819 	      if (r->steps[j].op == CRUSH_RULE_TAKE) {
2820 	        int step_item = r->steps[j].arg1;
2821 	        list<int> unordered;
2822 	        int rc = _get_leaves(step_item, &unordered);
2823 	        if (rc < 0) {
2824 	          return rc; // propagate fatal errors!
2825 	        }
2826 	        bool match = false;
2827 	        for (auto &o: unordered) {
2828 	          ceph_assert(o >= 0);
2829 	          if (o == osd) {
2830 	            match = true;
2831 	            break;
2832 	          }
2833 	        }
2834 	        if (match) {
2835 	          rules->insert(i);
2836 	          break;
2837 	        }
2838 	      }
2839 	    }
2840 	  }
2841 	  return 0;
2842 	}
2843 	
2844 	bool CrushWrapper::_class_is_dead(int class_id)
2845 	{
2846 	  for (auto &p: class_map) {
2847 	    if (p.first >= 0 && p.second == class_id) {
2848 	      return false;
2849 	    }
2850 	  }
2851 	  for (unsigned i = 0; i < crush->max_rules; ++i) {
2852 	    crush_rule *r = crush->rules[i];
2853 	    if (!r)
2854 	      continue;
2855 	    for (unsigned j = 0; j < r->len; ++j) {
2856 	      if (r->steps[j].op == CRUSH_RULE_TAKE) {
2857 	        int root = r->steps[j].arg1;
2858 	        for (auto &p : class_bucket) {
2859 	          auto& q = p.second;
2860 	          if (q.count(class_id) && q[class_id] == root) {
2861 	            return false;
2862 	          }
2863 	        }
2864 	      }
2865 	    }
2866 	  }
2867 	  // no more referenced by any devices or crush rules
2868 	  return true;
2869 	}
2870 	
2871 	void CrushWrapper::cleanup_dead_classes()
2872 	{
2873 	  auto p = class_name.begin();
2874 	  while (p != class_name.end()) {
2875 	    if (_class_is_dead(p->first)) {
2876 	      string n = p->second;
2877 	      ++p;
2878 	      remove_class_name(n);
2879 	    } else {
2880 	      ++p;
2881 	    }
2882 	  }
2883 	}
2884 	
2885 	int CrushWrapper::rebuild_roots_with_classes(CephContext *cct)
2886 	{
2887 	  std::map<int32_t, map<int32_t, int32_t> > old_class_bucket = class_bucket;
2888 	  cleanup_dead_classes();
2889 	  int r = trim_roots_with_class(cct);
2890 	  if (r < 0)
2891 	    return r;
2892 	  class_bucket.clear();
2893 	  return populate_classes(old_class_bucket);
2894 	}
2895 	
2896 	void CrushWrapper::encode(bufferlist& bl, uint64_t features) const
2897 	{
2898 	  using ceph::encode;
2899 	  ceph_assert(crush);
2900 	
2901 	  __u32 magic = CRUSH_MAGIC;
2902 	  encode(magic, bl);
2903 	
2904 	  encode(crush->max_buckets, bl);
2905 	  encode(crush->max_rules, bl);
2906 	  encode(crush->max_devices, bl);
2907 	
2908 	  bool encode_compat_choose_args = false;
2909 	  crush_choose_arg_map arg_map;
2910 	  memset(&arg_map, '\0', sizeof(arg_map));
2911 	  if (has_choose_args() &&
2912 	      !HAVE_FEATURE(features, CRUSH_CHOOSE_ARGS)) {
2913 	    ceph_assert(!has_incompat_choose_args());
2914 	    encode_compat_choose_args = true;
2915 	    arg_map = choose_args.begin()->second;
2916 	  }
2917 	
2918 	  // buckets
2919 	  for (int i=0; i<crush->max_buckets; i++) {
2920 	    __u32 alg = 0;
2921 	    if (crush->buckets[i]) alg = crush->buckets[i]->alg;
2922 	    encode(alg, bl);
2923 	    if (!alg)
2924 	      continue;
2925 	
2926 	    encode(crush->buckets[i]->id, bl);
2927 	    encode(crush->buckets[i]->type, bl);
2928 	    encode(crush->buckets[i]->alg, bl);
2929 	    encode(crush->buckets[i]->hash, bl);
2930 	    encode(crush->buckets[i]->weight, bl);
2931 	    encode(crush->buckets[i]->size, bl);
2932 	    for (unsigned j=0; j<crush->buckets[i]->size; j++)
2933 	      encode(crush->buckets[i]->items[j], bl);
2934 	
2935 	    switch (crush->buckets[i]->alg) {
2936 	    case CRUSH_BUCKET_UNIFORM:
2937 	      encode((reinterpret_cast<crush_bucket_uniform*>(crush->buckets[i]))->item_weight, bl);
2938 	      break;
2939 	
2940 	    case CRUSH_BUCKET_LIST:
2941 	      for (unsigned j=0; j<crush->buckets[i]->size; j++) {
2942 		encode((reinterpret_cast<crush_bucket_list*>(crush->buckets[i]))->item_weights[j], bl);
2943 		encode((reinterpret_cast<crush_bucket_list*>(crush->buckets[i]))->sum_weights[j], bl);
2944 	      }
2945 	      break;
2946 	
2947 	    case CRUSH_BUCKET_TREE:
2948 	      encode((reinterpret_cast<crush_bucket_tree*>(crush->buckets[i]))->num_nodes, bl);
2949 	      for (unsigned j=0; j<(reinterpret_cast<crush_bucket_tree*>(crush->buckets[i]))->num_nodes; j++)
2950 		encode((reinterpret_cast<crush_bucket_tree*>(crush->buckets[i]))->node_weights[j], bl);
2951 	      break;
2952 	
2953 	    case CRUSH_BUCKET_STRAW:
2954 	      for (unsigned j=0; j<crush->buckets[i]->size; j++) {
2955 		encode((reinterpret_cast<crush_bucket_straw*>(crush->buckets[i]))->item_weights[j], bl);
2956 		encode((reinterpret_cast<crush_bucket_straw*>(crush->buckets[i]))->straws[j], bl);
2957 	      }
2958 	      break;
2959 	
2960 	    case CRUSH_BUCKET_STRAW2:
2961 	      {
2962 		__u32 *weights;
2963 		if (encode_compat_choose_args &&
2964 		    arg_map.args[i].weight_set_positions > 0) {
2965 		  weights = arg_map.args[i].weight_set[0].weights;
2966 		} else {
2967 		  weights = (reinterpret_cast<crush_bucket_straw2*>(crush->buckets[i]))->item_weights;
2968 		}
2969 		for (unsigned j=0; j<crush->buckets[i]->size; j++) {
2970 		  encode(weights[j], bl);
2971 		}
2972 	      }
2973 	      break;
2974 	
2975 	    default:
2976 	      ceph_abort();
2977 	      break;
2978 	    }
2979 	  }
2980 	
2981 	  // rules
2982 	  for (unsigned i=0; i<crush->max_rules; i++) {
2983 	    __u32 yes = crush->rules[i] ? 1:0;
2984 	    encode(yes, bl);
2985 	    if (!yes)
2986 	      continue;
2987 	
2988 	    encode(crush->rules[i]->len, bl);
2989 	    encode(crush->rules[i]->mask, bl);
2990 	    for (unsigned j=0; j<crush->rules[i]->len; j++)
2991 	      encode(crush->rules[i]->steps[j], bl);
2992 	  }
2993 	
2994 	  // name info
2995 	  encode(type_map, bl);
2996 	  encode(name_map, bl);
2997 	  encode(rule_name_map, bl);
2998 	
2999 	  // tunables
3000 	  encode(crush->choose_local_tries, bl);
3001 	  encode(crush->choose_local_fallback_tries, bl);
3002 	  encode(crush->choose_total_tries, bl);
3003 	  encode(crush->chooseleaf_descend_once, bl);
3004 	  encode(crush->chooseleaf_vary_r, bl);
3005 	  encode(crush->straw_calc_version, bl);
3006 	  encode(crush->allowed_bucket_algs, bl);
3007 	  if (features & CEPH_FEATURE_CRUSH_TUNABLES5) {
3008 	    encode(crush->chooseleaf_stable, bl);
3009 	  }
3010 	
3011 	  if (HAVE_FEATURE(features, SERVER_LUMINOUS)) {
3012 	    // device classes
3013 	    encode(class_map, bl);
3014 	    encode(class_name, bl);
3015 	    encode(class_bucket, bl);
3016 	
3017 	    // choose args
3018 	    __u32 size = (__u32)choose_args.size();
3019 	    encode(size, bl);
3020 	    for (auto c : choose_args) {
3021 	      encode(c.first, bl);
3022 	      crush_choose_arg_map arg_map = c.second;
3023 	      size = 0;
3024 	      for (__u32 i = 0; i < arg_map.size; i++) {
3025 		crush_choose_arg *arg = &arg_map.args[i];
3026 		if (arg->weight_set_positions == 0 &&
3027 		    arg->ids_size == 0)
3028 		  continue;
3029 		size++;
3030 	      }
3031 	      encode(size, bl);
3032 	      for (__u32 i = 0; i < arg_map.size; i++) {
3033 		crush_choose_arg *arg = &arg_map.args[i];
3034 		if (arg->weight_set_positions == 0 &&
3035 		    arg->ids_size == 0)
3036 		  continue;
3037 		encode(i, bl);
3038 		encode(arg->weight_set_positions, bl);
3039 		for (__u32 j = 0; j < arg->weight_set_positions; j++) {
3040 		  crush_weight_set *weight_set = &arg->weight_set[j];
3041 		  encode(weight_set->size, bl);
3042 		  for (__u32 k = 0; k < weight_set->size; k++)
3043 		    encode(weight_set->weights[k], bl);
3044 		}
3045 		encode(arg->ids_size, bl);
3046 		for (__u32 j = 0; j < arg->ids_size; j++)
3047 		  encode(arg->ids[j], bl);
3048 	      }
3049 	    }
3050 	  }
3051 	}
3052 	
3053 	static void decode_32_or_64_string_map(map<int32_t,string>& m, bufferlist::const_iterator& blp)
3054 	{
3055 	  m.clear();
3056 	  __u32 n;
3057 	  decode(n, blp);
3058 	  while (n--) {
3059 	    __s32 key;
3060 	    decode(key, blp);
3061 	
3062 	    __u32 strlen;
3063 	    decode(strlen, blp);
3064 	    if (strlen == 0) {
3065 	      // der, key was actually 64-bits!
3066 	      decode(strlen, blp);
3067 	    }
3068 	    decode_nohead(strlen, m[key], blp);
3069 	  }
3070 	}
3071 	
3072 	void CrushWrapper::decode(bufferlist::const_iterator& blp)
3073 	{
3074 	  using ceph::decode;
3075 	  create();
3076 	
3077 	  __u32 magic;
3078 	  decode(magic, blp);
3079 	  if (magic != CRUSH_MAGIC)
3080 	    throw ceph::buffer::malformed_input("bad magic number");
3081 	
3082 	  decode(crush->max_buckets, blp);
3083 	  decode(crush->max_rules, blp);
3084 	  decode(crush->max_devices, blp);
3085 	
3086 	  // legacy tunables, unless we decode something newer
3087 	  set_tunables_legacy();
3088 	
3089 	  try {
3090 	    // buckets
3091 	    crush->buckets = (crush_bucket**)calloc(1, crush->max_buckets * sizeof(crush_bucket*));
3092 	    for (int i=0; i<crush->max_buckets; i++) {
3093 	      decode_crush_bucket(&crush->buckets[i], blp);
3094 	    }
3095 	
3096 	    // rules
3097 	    crush->rules = (crush_rule**)calloc(1, crush->max_rules * sizeof(crush_rule*));
3098 	    for (unsigned i = 0; i < crush->max_rules; ++i) {
3099 	      __u32 yes;
3100 	      decode(yes, blp);
3101 	      if (!yes) {
3102 		crush->rules[i] = NULL;
3103 		continue;
3104 	      }
3105 	
3106 	      __u32 len;
3107 	      decode(len, blp);
3108 	      crush->rules[i] = reinterpret_cast<crush_rule*>(calloc(1, crush_rule_size(len)));
3109 	      crush->rules[i]->len = len;
3110 	      decode(crush->rules[i]->mask, blp);
3111 	      for (unsigned j=0; j<crush->rules[i]->len; j++)
3112 		decode(crush->rules[i]->steps[j], blp);
3113 	    }
3114 	
3115 	    // name info
3116 	    // NOTE: we had a bug where we were incoding int instead of int32, which means the
3117 	    // 'key' field for these maps may be either 32 or 64 bits, depending.  tolerate
3118 	    // both by assuming the string is always non-empty.
3119 	    decode_32_or_64_string_map(type_map, blp);
3120 	    decode_32_or_64_string_map(name_map, blp);
3121 	    decode_32_or_64_string_map(rule_name_map, blp);
3122 	
3123 	    // tunables
3124 	    if (!blp.end()) {
3125 	      decode(crush->choose_local_tries, blp);
3126 	      decode(crush->choose_local_fallback_tries, blp);
3127 	      decode(crush->choose_total_tries, blp);
3128 	    }
3129 	    if (!blp.end()) {
3130 	      decode(crush->chooseleaf_descend_once, blp);
3131 	    }
3132 	    if (!blp.end()) {
3133 	      decode(crush->chooseleaf_vary_r, blp);
3134 	    }
3135 	    if (!blp.end()) {
3136 	      decode(crush->straw_calc_version, blp);
3137 	    }
3138 	    if (!blp.end()) {
3139 	      decode(crush->allowed_bucket_algs, blp);
3140 	    }
3141 	    if (!blp.end()) {
3142 	      decode(crush->chooseleaf_stable, blp);
3143 	    }
3144 	    if (!blp.end()) {
3145 	      decode(class_map, blp);
3146 	      decode(class_name, blp);
3147 	      for (auto &c : class_name)
3148 		class_rname[c.second] = c.first;
3149 	      decode(class_bucket, blp);
3150 	    }
3151 	    if (!blp.end()) {
3152 	      __u32 choose_args_size;
3153 	      decode(choose_args_size, blp);
3154 	      for (__u32 i = 0; i < choose_args_size; i++) {
3155 	        typename decltype(choose_args)::key_type choose_args_index;
3156 		decode(choose_args_index, blp);
3157 		crush_choose_arg_map arg_map;
3158 		arg_map.size = crush->max_buckets;
3159 		arg_map.args = static_cast<crush_choose_arg*>(calloc(
3160 		  arg_map.size, sizeof(crush_choose_arg)));
3161 		__u32 size;
3162 		decode(size, blp);
3163 		for (__u32 j = 0; j < size; j++) {
3164 		  __u32 bucket_index;
3165 		  decode(bucket_index, blp);
3166 		  ceph_assert(bucket_index < arg_map.size);
3167 		  crush_choose_arg *arg = &arg_map.args[bucket_index];
3168 		  decode(arg->weight_set_positions, blp);
3169 		  if (arg->weight_set_positions) {
3170 		    arg->weight_set = static_cast<crush_weight_set*>(calloc(
3171 		      arg->weight_set_positions, sizeof(crush_weight_set)));
3172 		    for (__u32 k = 0; k < arg->weight_set_positions; k++) {
3173 		      crush_weight_set *weight_set = &arg->weight_set[k];
3174 		      decode(weight_set->size, blp);
3175 		      weight_set->weights = (__u32*)calloc(
3176 			weight_set->size, sizeof(__u32));
3177 		      for (__u32 l = 0; l < weight_set->size; l++)
3178 			decode(weight_set->weights[l], blp);
3179 		    }
3180 		  }
3181 		  decode(arg->ids_size, blp);
3182 		  if (arg->ids_size) {
3183 		    ceph_assert(arg->ids_size == crush->buckets[bucket_index]->size);
3184 		    arg->ids = (__s32 *)calloc(arg->ids_size, sizeof(__s32));
3185 		    for (__u32 k = 0; k < arg->ids_size; k++)
3186 		      decode(arg->ids[k], blp);
3187 		  }
3188 		}
3189 		choose_args[choose_args_index] = arg_map;
3190 	      }
3191 	    }
3192 	    update_choose_args(nullptr); // in case we decode a legacy "corrupted" map
3193 	    finalize();
3194 	  }
3195 	  catch (...) {
3196 	    crush_destroy(crush);
3197 	    throw;
3198 	  }
3199 	}
3200 	
3201 	void CrushWrapper::decode_crush_bucket(crush_bucket** bptr, bufferlist::const_iterator &blp)
3202 	{
3203 	  using ceph::decode;
3204 	  __u32 alg;
3205 	  decode(alg, blp);
3206 	  if (!alg) {
3207 	    *bptr = NULL;
3208 	    return;
3209 	  }
3210 	
3211 	  int size = 0;
3212 	  switch (alg) {
3213 	  case CRUSH_BUCKET_UNIFORM:
3214 	    size = sizeof(crush_bucket_uniform);
3215 	    break;
3216 	  case CRUSH_BUCKET_LIST:
3217 	    size = sizeof(crush_bucket_list);
3218 	    break;
3219 	  case CRUSH_BUCKET_TREE:
3220 	    size = sizeof(crush_bucket_tree);
3221 	    break;
3222 	  case CRUSH_BUCKET_STRAW:
3223 	    size = sizeof(crush_bucket_straw);
3224 	    break;
3225 	  case CRUSH_BUCKET_STRAW2:
3226 	    size = sizeof(crush_bucket_straw2);
3227 	    break;
3228 	  default:
3229 	    {
3230 	      char str[128];
3231 	      snprintf(str, sizeof(str), "unsupported bucket algorithm: %d", alg);
3232 	      throw ceph::buffer::malformed_input(str);
3233 	    }
3234 	  }
3235 	  crush_bucket *bucket = reinterpret_cast<crush_bucket*>(calloc(1, size));
3236 	  *bptr = bucket;
3237 	    
3238 	  decode(bucket->id, blp);
3239 	  decode(bucket->type, blp);
3240 	  decode(bucket->alg, blp);
3241 	  decode(bucket->hash, blp);
3242 	  decode(bucket->weight, blp);
3243 	  decode(bucket->size, blp);
3244 	
3245 	  bucket->items = (__s32*)calloc(1, bucket->size * sizeof(__s32));
3246 	  for (unsigned j = 0; j < bucket->size; ++j) {
3247 	    decode(bucket->items[j], blp);
3248 	  }
3249 	
3250 	  switch (bucket->alg) {
3251 	  case CRUSH_BUCKET_UNIFORM:
3252 	    decode((reinterpret_cast<crush_bucket_uniform*>(bucket))->item_weight, blp);
3253 	    break;
3254 	
3255 	  case CRUSH_BUCKET_LIST: {
3256 	    crush_bucket_list* cbl = reinterpret_cast<crush_bucket_list*>(bucket);
3257 	    cbl->item_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
3258 	    cbl->sum_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
3259 	
3260 	    for (unsigned j = 0; j < bucket->size; ++j) {
3261 	      decode(cbl->item_weights[j], blp);
3262 	      decode(cbl->sum_weights[j], blp);
3263 	    }
3264 	    break;
3265 	  }
3266 	
3267 	  case CRUSH_BUCKET_TREE: {
3268 	    crush_bucket_tree* cbt = reinterpret_cast<crush_bucket_tree*>(bucket);
3269 	    decode(cbt->num_nodes, blp);
3270 	    cbt->node_weights = (__u32*)calloc(1, cbt->num_nodes * sizeof(__u32));
3271 	    for (unsigned j=0; j<cbt->num_nodes; j++) {
3272 	      decode(cbt->node_weights[j], blp);
3273 	    }
3274 	    break;
3275 	  }
3276 	
3277 	  case CRUSH_BUCKET_STRAW: {
3278 	    crush_bucket_straw* cbs = reinterpret_cast<crush_bucket_straw*>(bucket);
3279 	    cbs->straws = (__u32*)calloc(1, bucket->size * sizeof(__u32));
3280 	    cbs->item_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
3281 	    for (unsigned j = 0; j < bucket->size; ++j) {
3282 	      decode(cbs->item_weights[j], blp);
3283 	      decode(cbs->straws[j], blp);
3284 	    }
3285 	    break;
3286 	  }
3287 	
3288 	  case CRUSH_BUCKET_STRAW2: {
3289 	    crush_bucket_straw2* cbs = reinterpret_cast<crush_bucket_straw2*>(bucket);
3290 	    cbs->item_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
3291 	    for (unsigned j = 0; j < bucket->size; ++j) {
3292 	      decode(cbs->item_weights[j], blp);
3293 	    }
3294 	    break;
3295 	  }
3296 	
3297 	  default:
3298 	    // We should have handled this case in the first switch statement
3299 	    ceph_abort();
3300 	    break;
3301 	  }
3302 	}
3303 	
3304 	  
3305 	void CrushWrapper::dump(Formatter *f) const
3306 	{
3307 	  f->open_array_section("devices");
3308 	  for (int i=0; i<get_max_devices(); i++) {
3309 	    f->open_object_section("device");
3310 	    f->dump_int("id", i);
3311 	    const char *n = get_item_name(i);
3312 	    if (n) {
3313 	      f->dump_string("name", n);
3314 	    } else {
3315 	      char name[20];
3316 	      sprintf(name, "device%d", i);
3317 	      f->dump_string("name", name);
3318 	    }
3319 	    const char *device_class = get_item_class(i);
3320 	    if (device_class != NULL)
3321 	      f->dump_string("class", device_class);
3322 	    f->close_section();
3323 	  }
3324 	  f->close_section();
3325 	
3326 	  f->open_array_section("types");
3327 	  int n = get_num_type_names();
3328 	  for (int i=0; n; i++) {
3329 	    const char *name = get_type_name(i);
3330 	    if (!name) {
3331 	      if (i == 0) {
3332 		f->open_object_section("type");
3333 		f->dump_int("type_id", 0);
3334 		f->dump_string("name", "device");
3335 		f->close_section();
3336 	      }
3337 	      continue;
3338 	    }
3339 	    n--;
3340 	    f->open_object_section("type");
3341 	    f->dump_int("type_id", i);
3342 	    f->dump_string("name", name);
3343 	    f->close_section();
3344 	  }
3345 	  f->close_section();
3346 	
3347 	  f->open_array_section("buckets");
3348 	  for (int bucket = -1; bucket > -1-get_max_buckets(); --bucket) {
3349 	    if (!bucket_exists(bucket))
3350 	      continue;
3351 	    f->open_object_section("bucket");
3352 	    f->dump_int("id", bucket);
3353 	    if (get_item_name(bucket))
3354 	      f->dump_string("name", get_item_name(bucket));
3355 	    f->dump_int("type_id", get_bucket_type(bucket));
3356 	    if (get_type_name(get_bucket_type(bucket)))
3357 	      f->dump_string("type_name", get_type_name(get_bucket_type(bucket)));
3358 	    f->dump_int("weight", get_bucket_weight(bucket));
3359 	    f->dump_string("alg", crush_bucket_alg_name(get_bucket_alg(bucket)));
3360 	    f->dump_string("hash", crush_hash_name(get_bucket_hash(bucket)));
3361 	    f->open_array_section("items");
3362 	    for (int j=0; j<get_bucket_size(bucket); j++) {
3363 	      f->open_object_section("item");
3364 	      f->dump_int("id", get_bucket_item(bucket, j));
3365 	      f->dump_int("weight", get_bucket_item_weight(bucket, j));
3366 	      f->dump_int("pos", j);
3367 	      f->close_section();
3368 	    }
3369 	    f->close_section();
3370 	    f->close_section();
3371 	  }
3372 	  f->close_section();
3373 	
3374 	  f->open_array_section("rules");
3375 	  dump_rules(f);
3376 	  f->close_section();
3377 	
3378 	  f->open_object_section("tunables");
3379 	  dump_tunables(f);
3380 	  f->close_section();
3381 	
3382 	  dump_choose_args(f);
3383 	}
3384 	
3385 	namespace {
3386 	  // depth first walker
3387 	  class TreeDumper {
3388 	    typedef CrushTreeDumper::Item Item;
3389 	    const CrushWrapper *crush;
3390 	    const CrushTreeDumper::name_map_t& weight_set_names;
3391 	  public:
3392 	    explicit TreeDumper(const CrushWrapper *crush,
3393 				const CrushTreeDumper::name_map_t& wsnames)
3394 	      : crush(crush), weight_set_names(wsnames) {}
3395 	
3396 	    void dump(Formatter *f) {
3397 	      set<int> roots;
3398 	      crush->find_roots(&roots);
3399 	      for (set<int>::iterator root = roots.begin(); root != roots.end(); ++root) {
3400 		dump_item(Item(*root, 0, 0, crush->get_bucket_weightf(*root)), f);
3401 	      }
3402 	    }
3403 	
3404 	  private:
3405 	    void dump_item(const Item& qi, Formatter* f) {
3406 	      if (qi.is_bucket()) {
3407 		f->open_object_section("bucket");
3408 		CrushTreeDumper::dump_item_fields(crush, weight_set_names, qi, f);
3409 		dump_bucket_children(qi, f);
3410 		f->close_section();
3411 	      } else {
3412 		f->open_object_section("device");
3413 		CrushTreeDumper::dump_item_fields(crush, weight_set_names, qi, f);
3414 		f->close_section();
3415 	      }
3416 	    }
3417 	
3418 	    void dump_bucket_children(const Item& parent, Formatter* f) {
3419 	      f->open_array_section("items");
3420 	      const int max_pos = crush->get_bucket_size(parent.id);
3421 	      for (int pos = 0; pos < max_pos; pos++) {
3422 		int id = crush->get_bucket_item(parent.id, pos);
3423 		float weight = crush->get_bucket_item_weightf(parent.id, pos);
3424 		dump_item(Item(id, parent.id, parent.depth + 1, weight), f);
3425 	      }
3426 	      f->close_section();
3427 	    }
3428 	  };
3429 	}
3430 	
3431 	void CrushWrapper::dump_tree(
3432 	  Formatter *f,
3433 	  const CrushTreeDumper::name_map_t& weight_set_names) const
3434 	{
3435 	  ceph_assert(f);
3436 	  TreeDumper(this, weight_set_names).dump(f);
3437 	}
3438 	
3439 	void CrushWrapper::dump_tunables(Formatter *f) const
3440 	{
3441 	  f->dump_int("choose_local_tries", get_choose_local_tries());
3442 	  f->dump_int("choose_local_fallback_tries", get_choose_local_fallback_tries());
3443 	  f->dump_int("choose_total_tries", get_choose_total_tries());
3444 	  f->dump_int("chooseleaf_descend_once", get_chooseleaf_descend_once());
3445 	  f->dump_int("chooseleaf_vary_r", get_chooseleaf_vary_r());
3446 	  f->dump_int("chooseleaf_stable", get_chooseleaf_stable());
3447 	  f->dump_int("straw_calc_version", get_straw_calc_version());
3448 	  f->dump_int("allowed_bucket_algs", get_allowed_bucket_algs());
3449 	
3450 	  // be helpful about it
3451 	  if (has_jewel_tunables())
3452 	    f->dump_string("profile", "jewel");
3453 	  else if (has_hammer_tunables())
3454 	    f->dump_string("profile", "hammer");
3455 	  else if (has_firefly_tunables())
3456 	    f->dump_string("profile", "firefly");
3457 	  else if (has_bobtail_tunables())
3458 	    f->dump_string("profile", "bobtail");
3459 	  else if (has_argonaut_tunables())
3460 	    f->dump_string("profile", "argonaut");
3461 	  else
3462 	    f->dump_string("profile", "unknown");
3463 	  f->dump_int("optimal_tunables", (int)has_optimal_tunables());
3464 	  f->dump_int("legacy_tunables", (int)has_legacy_tunables());
3465 	
3466 	  // be helpful about minimum version required
3467 	  f->dump_string("minimum_required_version", get_min_required_version());
3468 	
3469 	  f->dump_int("require_feature_tunables", (int)has_nondefault_tunables());
3470 	  f->dump_int("require_feature_tunables2", (int)has_nondefault_tunables2());
3471 	  f->dump_int("has_v2_rules", (int)has_v2_rules());
3472 	  f->dump_int("require_feature_tunables3", (int)has_nondefault_tunables3());
3473 	  f->dump_int("has_v3_rules", (int)has_v3_rules());
3474 	  f->dump_int("has_v4_buckets", (int)has_v4_buckets());
3475 	  f->dump_int("require_feature_tunables5", (int)has_nondefault_tunables5());
3476 	  f->dump_int("has_v5_rules", (int)has_v5_rules());
3477 	}
3478 	
3479 	void CrushWrapper::dump_choose_args(Formatter *f) const
3480 	{
3481 	  f->open_object_section("choose_args");
3482 	  for (auto c : choose_args) {
3483 	    crush_choose_arg_map arg_map = c.second;
3484 	    f->open_array_section(stringify(c.first).c_str());
3485 	    for (__u32 i = 0; i < arg_map.size; i++) {
3486 	      crush_choose_arg *arg = &arg_map.args[i];
3487 	      if (arg->weight_set_positions == 0 &&
3488 		  arg->ids_size == 0)
3489 		continue;
3490 	      f->open_object_section("choose_args");
3491 	      int bucket_index = i;
3492 	      f->dump_int("bucket_id", -1-bucket_index);
3493 	      if (arg->weight_set_positions > 0) {
3494 		f->open_array_section("weight_set");
3495 		for (__u32 j = 0; j < arg->weight_set_positions; j++) {
3496 		  f->open_array_section("weights");
3497 		  __u32 *weights = arg->weight_set[j].weights;
3498 		  __u32 size = arg->weight_set[j].size;
3499 		  for (__u32 k = 0; k < size; k++) {
3500 		    f->dump_float("weight", (float)weights[k]/(float)0x10000);
3501 		  }
3502 		  f->close_section();
3503 		}
3504 		f->close_section();
3505 	      }
3506 	      if (arg->ids_size > 0) {
3507 		f->open_array_section("ids");
3508 		for (__u32 j = 0; j < arg->ids_size; j++)
3509 		  f->dump_int("id", arg->ids[j]);
3510 		f->close_section();
3511 	      }
3512 	      f->close_section();
3513 	    }
3514 	    f->close_section();
3515 	  }
3516 	  f->close_section();
3517 	}
3518 	
3519 	void CrushWrapper::dump_rules(Formatter *f) const
3520 	{
3521 	  for (int i=0; i<get_max_rules(); i++) {
3522 	    if (!rule_exists(i))
3523 	      continue;
3524 	    dump_rule(i, f);
3525 	  }
3526 	}
3527 	
3528 	void CrushWrapper::dump_rule(int ruleset, Formatter *f) const
3529 	{
3530 	  f->open_object_section("rule");
3531 	  f->dump_int("rule_id", ruleset);
3532 	  if (get_rule_name(ruleset))
3533 	    f->dump_string("rule_name", get_rule_name(ruleset));
3534 	  f->dump_int("ruleset", get_rule_mask_ruleset(ruleset));
3535 	  f->dump_int("type", get_rule_mask_type(ruleset));
3536 	  f->dump_int("min_size", get_rule_mask_min_size(ruleset));
3537 	  f->dump_int("max_size", get_rule_mask_max_size(ruleset));
3538 	  f->open_array_section("steps");
3539 	  for (int j=0; j<get_rule_len(ruleset); j++) {
3540 	    f->open_object_section("step");
3541 	    switch (get_rule_op(ruleset, j)) {
3542 	    case CRUSH_RULE_NOOP:
3543 	      f->dump_string("op", "noop");
3544 	      break;
3545 	    case CRUSH_RULE_TAKE:
3546 	      f->dump_string("op", "take");
3547 	      {
3548 	        int item = get_rule_arg1(ruleset, j);
3549 	        f->dump_int("item", item);
3550 	
3551 	        const char *name = get_item_name(item);
3552 	        f->dump_string("item_name", name ? name : "");
3553 	      }
3554 	      break;
3555 	    case CRUSH_RULE_EMIT:
3556 	      f->dump_string("op", "emit");
3557 	      break;
3558 	    case CRUSH_RULE_CHOOSE_FIRSTN:
3559 	      f->dump_string("op", "choose_firstn");
3560 	      f->dump_int("num", get_rule_arg1(ruleset, j));
3561 	      f->dump_string("type", get_type_name(get_rule_arg2(ruleset, j)));
3562 	      break;
3563 	    case CRUSH_RULE_CHOOSE_INDEP:
3564 	      f->dump_string("op", "choose_indep");
3565 	      f->dump_int("num", get_rule_arg1(ruleset, j));
3566 	      f->dump_string("type", get_type_name(get_rule_arg2(ruleset, j)));
3567 	      break;
3568 	    case CRUSH_RULE_CHOOSELEAF_FIRSTN:
3569 	      f->dump_string("op", "chooseleaf_firstn");
3570 	      f->dump_int("num", get_rule_arg1(ruleset, j));
3571 	      f->dump_string("type", get_type_name(get_rule_arg2(ruleset, j)));
3572 	      break;
3573 	    case CRUSH_RULE_CHOOSELEAF_INDEP:
3574 	      f->dump_string("op", "chooseleaf_indep");
3575 	      f->dump_int("num", get_rule_arg1(ruleset, j));
3576 	      f->dump_string("type", get_type_name(get_rule_arg2(ruleset, j)));
3577 	      break;
3578 	    case CRUSH_RULE_SET_CHOOSE_TRIES:
3579 	      f->dump_string("op", "set_choose_tries");
3580 	      f->dump_int("num", get_rule_arg1(ruleset, j));
3581 	      break;
3582 	    case CRUSH_RULE_SET_CHOOSELEAF_TRIES:
3583 	      f->dump_string("op", "set_chooseleaf_tries");
3584 	      f->dump_int("num", get_rule_arg1(ruleset, j));
3585 	      break;
3586 	    default:
3587 	      f->dump_int("opcode", get_rule_op(ruleset, j));
3588 	      f->dump_int("arg1", get_rule_arg1(ruleset, j));
3589 	      f->dump_int("arg2", get_rule_arg2(ruleset, j));
3590 	    }
3591 	    f->close_section();
3592 	  }
3593 	  f->close_section();
3594 	  f->close_section();
3595 	}
3596 	
3597 	void CrushWrapper::list_rules(Formatter *f) const
3598 	{
3599 	  for (int rule = 0; rule < get_max_rules(); rule++) {
3600 	    if (!rule_exists(rule))
3601 	      continue;
3602 	    f->dump_string("name", get_rule_name(rule));
3603 	  }
3604 	}
3605 	
3606 	void CrushWrapper::list_rules(ostream *ss) const
3607 	{
3608 	  for (int rule = 0; rule < get_max_rules(); rule++) {
3609 	    if (!rule_exists(rule))
3610 	      continue;
3611 	    *ss << get_rule_name(rule) << "\n";
3612 	  }
3613 	}
3614 	
3615 	class CrushTreePlainDumper : public CrushTreeDumper::Dumper<TextTable> {
3616 	public:
3617 	  typedef CrushTreeDumper::Dumper<TextTable> Parent;
3618 	
3619 	  explicit CrushTreePlainDumper(const CrushWrapper *crush,
3620 					const CrushTreeDumper::name_map_t& wsnames)
3621 	    : Parent(crush, wsnames) {}
3622 	  explicit CrushTreePlainDumper(const CrushWrapper *crush,
3623 	                                const CrushTreeDumper::name_map_t& wsnames,
3624 	                                bool show_shadow)
3625 	    : Parent(crush, wsnames, show_shadow) {}
3626 	
3627 	
3628 	  void dump(TextTable *tbl) {
3629 	    tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
3630 	    tbl->define_column("CLASS", TextTable::LEFT, TextTable::RIGHT);
3631 	    tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
3632 	    for (auto& p : crush->choose_args) {
3633 	      if (p.first == CrushWrapper::DEFAULT_CHOOSE_ARGS) {
3634 		tbl->define_column("(compat)", TextTable::LEFT, TextTable::RIGHT);
3635 	      } else {
3636 		string name;
3637 		auto q = weight_set_names.find(p.first);
3638 		name = q != weight_set_names.end() ? q->second :
3639 		  stringify(p.first);
3640 		tbl->define_column(name.c_str(), TextTable::LEFT, TextTable::RIGHT);
3641 	      }
3642 	    }
3643 	    tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
3644 	    Parent::dump(tbl);
3645 	  }
3646 	
3647 	protected:
3648 	  void dump_item(const CrushTreeDumper::Item &qi, TextTable *tbl) override {
3649 	    const char *c = crush->get_item_class(qi.id);
3650 	    if (!c)
3651 	      c = "";
3652 	    *tbl << qi.id
3653 		 << c
3654 		 << weightf_t(qi.weight);
3655 	    for (auto& p : crush->choose_args) {
3656 	      if (qi.parent < 0) {
3657 		const crush_choose_arg_map cmap = crush->choose_args_get(p.first);
3658 		int bidx = -1 - qi.parent;
3659 		const crush_bucket *b = crush->get_bucket(qi.parent);
3660 		if (b &&
3661 		    bidx < (int)cmap.size &&
3662 		    cmap.args[bidx].weight_set &&
3663 		    cmap.args[bidx].weight_set_positions >= 1) {
3664 		  int pos;
3665 		  for (pos = 0;
3666 		       pos < (int)cmap.args[bidx].weight_set[0].size &&
3667 			 b->items[pos] != qi.id;
3668 		       ++pos) ;
3669 		  *tbl << weightf_t((float)cmap.args[bidx].weight_set[0].weights[pos] /
3670 				    (float)0x10000);
3671 		  continue;
3672 		}
3673 	      }
3674 	      *tbl << "";
3675 	    }
3676 	    ostringstream ss;
3677 	    for (int k=0; k < qi.depth; k++) {
3678 	      ss << "    ";
3679 	    }
3680 	    if (qi.is_bucket()) {
3681 	      ss << crush->get_type_name(crush->get_bucket_type(qi.id)) << " "
3682 		 << crush->get_item_name(qi.id);
3683 	    } else {
3684 	      ss << "osd." << qi.id;
3685 	    }
3686 	    *tbl << ss.str();
3687 	    *tbl << TextTable::endrow;
3688 	  }
3689 	};
3690 	
3691 	
3692 	class CrushTreeFormattingDumper : public CrushTreeDumper::FormattingDumper {
3693 	public:
3694 	  typedef CrushTreeDumper::FormattingDumper Parent;
3695 	
3696 	  explicit CrushTreeFormattingDumper(
3697 	    const CrushWrapper *crush,
3698 	    const CrushTreeDumper::name_map_t& wsnames)
3699 	    : Parent(crush, wsnames) {}
3700 	
3701 	  explicit CrushTreeFormattingDumper(
3702 	    const CrushWrapper *crush,
3703 	    const CrushTreeDumper::name_map_t& wsnames,
3704 	    bool show_shadow)
3705 	    : Parent(crush, wsnames, show_shadow) {}
3706 	
3707 	  void dump(Formatter *f) {
3708 	    f->open_array_section("nodes");
3709 	    Parent::dump(f);
3710 	    f->close_section();
3711 	
3712 	    // There is no stray bucket whose id is a negative number, so just get
3713 	    // the max_id and iterate from 0 to max_id to dump stray osds.
3714 	    f->open_array_section("stray");
3715 	    int32_t max_id = -1;
3716 	    if (!crush->name_map.empty()) {
3717 	      max_id = crush->name_map.rbegin()->first;
3718 	    }
3719 	    for (int32_t i = 0; i <= max_id; i++) {
3720 	      if (crush->item_exists(i) && !is_touched(i) && should_dump(i)) {
3721 	        dump_item(CrushTreeDumper::Item(i, 0, 0, 0), f);
3722 	      }
3723 	    }
3724 	    f->close_section();
3725 	  }
3726 	};
3727 	
3728 	
3729 	void CrushWrapper::dump_tree(
3730 	  ostream *out,
3731 	  Formatter *f,
3732 	  const CrushTreeDumper::name_map_t& weight_set_names,
3733 	  bool show_shadow) const
3734 	{
3735 	  if (out) {
3736 	    TextTable tbl;
3737 	    CrushTreePlainDumper(this, weight_set_names, show_shadow).dump(&tbl);
3738 	    *out << tbl;
3739 	  }
3740 	  if (f) {
3741 	    CrushTreeFormattingDumper(this, weight_set_names, show_shadow).dump(f);
3742 	  }
3743 	}
3744 	
3745 	void CrushWrapper::generate_test_instances(list<CrushWrapper*>& o)
3746 	{
3747 	  o.push_back(new CrushWrapper);
3748 	  // fixme
3749 	}
3750 	
3751 	/**
3752 	 * Determine the default CRUSH ruleset ID to be used with
3753 	 * newly created replicated pools.
3754 	 *
3755 	 * @returns a ruleset ID (>=0) or -1 if no suitable ruleset found
3756 	 */
3757 	int CrushWrapper::get_osd_pool_default_crush_replicated_ruleset(CephContext *cct)
3758 	{
3759 	  int crush_ruleset = cct->_conf.get_val<int64_t>("osd_pool_default_crush_rule");
3760 	  if (crush_ruleset < 0) {
3761 	    crush_ruleset = find_first_ruleset(pg_pool_t::TYPE_REPLICATED);
3762 	  } else if (!ruleset_exists(crush_ruleset)) {
3763 	    crush_ruleset = -1; // match find_first_ruleset() retval
3764 	  }
3765 	  return crush_ruleset;
3766 	}
3767 	
3768 	bool CrushWrapper::is_valid_crush_name(const string& s)
3769 	{
3770 	  if (s.empty())
3771 	    return false;
3772 	  for (string::const_iterator p = s.begin(); p != s.end(); ++p) {
3773 	    if (!(*p == '-') &&
3774 		!(*p == '_') &&
3775 		!(*p == '.') &&
3776 		!(*p >= '0' && *p <= '9') &&
3777 		!(*p >= 'A' && *p <= 'Z') &&
3778 		!(*p >= 'a' && *p <= 'z'))
3779 	      return false;
3780 	  }
3781 	  return true;
3782 	}
3783 	
3784 	bool CrushWrapper::is_valid_crush_loc(CephContext *cct,
3785 	                                      const map<string,string>& loc)
3786 	{
3787 	  for (map<string,string>::const_iterator l = loc.begin(); l != loc.end(); ++l) {
3788 	    if (!is_valid_crush_name(l->first) ||
3789 	        !is_valid_crush_name(l->second)) {
3790 	      ldout(cct, 1) << "loc["
3791 	                    << l->first << "] = '"
3792 	                    << l->second << "' not a valid crush name ([A-Za-z0-9_-.]+)"
3793 	                    << dendl;
3794 	      return false;
3795 	    }
3796 	  }
3797 	  return true;
3798 	}
3799 	
3800 	int CrushWrapper::_choose_type_stack(
3801 	  CephContext *cct,
3802 	  const vector<pair<int,int>>& stack,
3803 	  const set<int>& overfull,
3804 	  const vector<int>& underfull,
3805 	  const vector<int>& orig,
3806 	  vector<int>::const_iterator& i,
3807 	  set<int>& used,
3808 	  vector<int> *pw,
3809 	  int root_bucket) const
3810 	{
3811 	  vector<int> w = *pw;
3812 	  vector<int> o;
3813 	
(1) Event cond_true: Condition "should_gather", taking true branch.
3814 	  ldout(cct, 10) << __func__ << " stack " << stack
3815 			 << " orig " << orig
3816 			 << " at " << *i
3817 			 << " pw " << *pw
3818 			 << dendl;
(2) Event cond_true: Condition "root_bucket < 0", taking true branch.
3819 	  ceph_assert(root_bucket < 0);
3820 	  vector<int> cumulative_fanout(stack.size());
3821 	  int f = 1;
(3) Event cond_true: Condition "j >= 0", taking true branch.
(5) Event loop_begin: Jumped back to beginning of loop.
(6) Event cond_false: Condition "j >= 0", taking false branch.
3822 	  for (int j = (int)stack.size() - 1; j >= 0; --j) {
3823 	    cumulative_fanout[j] = f;
3824 	    f *= stack[j].second;
(4) Event loop: Jumping back to the beginning of the loop.
(7) Event loop_end: Reached end of loop.
3825 	  }
(8) Event cond_true: Condition "should_gather", taking true branch.
3826 	  ldout(cct, 10) << __func__ << " cumulative_fanout " << cumulative_fanout
3827 			 << dendl;
3828 	
3829 	  // identify underfull targets for each intermediate level.
3830 	  // this serves two purposes:
3831 	  //   1. we can tell when we are selecting a bucket that does not have any underfull
3832 	  //      devices beneath it.  that means that if the current input includes an overfull
3833 	  //      device, we won't be able to find an underfull device with this parent to
3834 	  //      swap for it.
3835 	  //   2. when we decide we should reject a bucket due to the above, this list gives us
3836 	  //      a list of peers to consider that *do* have underfull devices available..  (we
3837 	  //      are careful to pick one that has the same parent.)
3838 	  vector<set<int>> underfull_buckets; // level -> set of buckets with >0 underfull item(s)
3839 	  underfull_buckets.resize(stack.size() - 1);
(9) Event for_end: No elements left in "underfull", leaving loop.
3840 	  for (auto osd : underfull) {
3841 	    int item = osd;
3842 	    for (int j = (int)stack.size() - 2; j >= 0; --j) {
3843 	      int type = stack[j].first;
3844 	      item = get_parent_of_type(item, type);
3845 	      ldout(cct, 10) << __func__ << " underfull " << osd << " type " << type
3846 			     << " is " << item << dendl;
3847 	      if (!subtree_contains(root_bucket, item)) {
3848 	        ldout(cct, 20) << __func__ << " not in root subtree " << root_bucket << dendl;
3849 	        continue;
3850 	      }
3851 	      underfull_buckets[j].insert(item);
3852 	    }
(10) Event loop_end: Reached end of loop.
3853 	  }
(11) Event cond_true: Condition "should_gather", taking true branch.
3854 	  ldout(cct, 20) << __func__ << " underfull_buckets " << underfull_buckets << dendl;
3855 	
(12) Event cond_true: Condition "j < stack->size()", taking true branch.
3856 	  for (unsigned j = 0; j < stack.size(); ++j) {
3857 	    int type = stack[j].first;
3858 	    int fanout = stack[j].second;
3859 	    int cum_fanout = cumulative_fanout[j];
(13) Event cond_true: Condition "should_gather", taking true branch.
3860 	    ldout(cct, 10) << " level " << j << ": type " << type << " fanout " << fanout
3861 			   << " cumulative " << cum_fanout
3862 			   << " w " << w << dendl;
3863 	    vector<int> o;
3864 	    auto tmpi = i;
(14) Event cond_false: Condition "i == orig->end()", taking false branch.
3865 	    if (i == orig.end()) {
3866 	      ldout(cct, 10) << __func__ << " end of orig, break 0" << dendl;
3867 	      break;
(15) Event if_end: End of if statement.
3868 	    }
(16) Event for_loop: Iterating over another element of "w".
3869 	    for (auto from : w) {
(17) Event cond_true: Condition "should_gather", taking true branch.
3870 	      ldout(cct, 10) << " from " << from << dendl;
3871 	      // identify leaves under each choice.  we use this to check whether any of these
3872 	      // leaves are overfull.  (if so, we need to make sure there are underfull candidates
3873 	      // to swap for them.)
3874 	      vector<set<int>> leaves;
3875 	      leaves.resize(fanout);
(18) Event cond_true: Condition "pos < fanout", taking true branch.
(30) Event loop_begin: Jumped back to beginning of loop.
(31) Event cond_true: Condition "pos < fanout", taking true branch.
(43) Event loop_begin: Jumped back to beginning of loop.
(44) Event cond_true: Condition "pos < fanout", taking true branch.
(56) Event loop_begin: Jumped back to beginning of loop.
(57) Event cond_true: Condition "pos < fanout", taking true branch.
3876 	      for (int pos = 0; pos < fanout; ++pos) {
(19) Event cond_true: Condition "type > 0", taking true branch.
(32) Event cond_true: Condition "type > 0", taking true branch.
(45) Event cond_true: Condition "type > 0", taking true branch.
(58) Event cond_true: Condition "type > 0", taking true branch.
3877 		if (type > 0) {
3878 		  // non-leaf
(59) Event deref_iterator: Dereferencing iterator "tmpi" though it is already past the end of its container.
Also see events: [past_the_end][tested_end]
3879 		  int item = get_parent_of_type(*tmpi, type);
3880 		  o.push_back(item);
3881 		  int n = cum_fanout;
(20) Event cond_true: Condition "n--", taking true branch.
(21) Event cond_true: Condition "tmpi != orig->end()", taking true branch.
(23) Event loop_begin: Jumped back to beginning of loop.
(24) Event cond_false: Condition "n--", taking false branch.
(33) Event cond_true: Condition "n--", taking true branch.
(34) Event cond_true: Condition "tmpi != orig->end()", taking true branch.
(36) Event loop_begin: Jumped back to beginning of loop.
(37) Event cond_false: Condition "n--", taking false branch.
(46) Event cond_true: Condition "n--", taking true branch.
(47) Event past_the_end: Function "end" creates an iterator.
(48) Event cond_false: Condition "tmpi != orig->end()", taking false branch.
(50) Event tested_end: "tmpi" testing equal to "orig->end()".
Also see events: [deref_iterator]
3882 		  while (n-- && tmpi != orig.end()) {
3883 		    leaves[pos].insert(*tmpi++);
(22) Event loop: Jumping back to the beginning of the loop.
(25) Event loop_end: Reached end of loop.
(35) Event loop: Jumping back to the beginning of the loop.
(38) Event loop_end: Reached end of loop.
(49) Event loop_end: Reached end of loop.
3884 		  }
(26) Event cond_true: Condition "should_gather", taking true branch.
(39) Event cond_true: Condition "should_gather", taking true branch.
(51) Event cond_false: Condition "should_gather", taking false branch.
3885 		  ldout(cct, 10) << __func__ << "   from " << *tmpi << " got " << item
(52) Event if_end: End of if statement.
3886 				 << " of type " << type << " over leaves " << leaves[pos] << dendl;
(27) Event if_fallthrough: Falling through to end of if statement.
(40) Event if_fallthrough: Falling through to end of if statement.
(53) Event if_fallthrough: Falling through to end of if statement.
3887 		} else {
3888 		  // leaf
3889 		  bool replaced = false;
3890 		  if (overfull.count(*i)) {
3891 		    for (auto item : underfull) {
3892 		      ldout(cct, 10) << __func__ << " pos " << pos
3893 				     << " was " << *i << " considering " << item
3894 				     << dendl;
3895 		      if (used.count(item)) {
3896 			ldout(cct, 20) << __func__ << "   in used " << used << dendl;
3897 			continue;
3898 		      }
3899 		      if (!subtree_contains(from, item)) {
3900 			ldout(cct, 20) << __func__ << "   not in subtree " << from << dendl;
3901 			continue;
3902 		      }
3903 		      if (std::find(orig.begin(), orig.end(), item) != orig.end()) {
3904 			ldout(cct, 20) << __func__ << "   in orig " << orig << dendl;
3905 			continue;
3906 		      }
3907 		      o.push_back(item);
3908 		      used.insert(item);
3909 		      ldout(cct, 10) << __func__ << " pos " << pos << " replace "
3910 				     << *i << " -> " << item << dendl;
3911 		      replaced = true;
3912 	              ceph_assert(i != orig.end());
3913 		      ++i;
3914 		      break;
3915 		    }
3916 		  }
3917 		  if (!replaced) {
3918 		    ldout(cct, 10) << __func__ << " pos " << pos << " keep " << *i
3919 				   << dendl;
3920 	            ceph_assert(i != orig.end());
3921 		    o.push_back(*i);
3922 		    ++i;
3923 		  }
3924 		  if (i == orig.end()) {
3925 		    ldout(cct, 10) << __func__ << " end of orig, break 1" << dendl;
3926 		    break;
3927 		  }
(28) Event if_end: End of if statement.
(41) Event if_end: End of if statement.
(54) Event if_end: End of if statement.
3928 		}
(29) Event loop: Jumping back to the beginning of the loop.
(42) Event loop: Jumping back to the beginning of the loop.
(55) Event loop: Jumping back to the beginning of the loop.
3929 	      }
3930 	      if (j + 1 < stack.size()) {
3931 		// check if any buckets have overfull leaves but no underfull candidates
3932 		for (int pos = 0; pos < fanout; ++pos) {
3933 		  if (underfull_buckets[j].count(o[pos]) == 0) {
3934 		    // are any leaves overfull?
3935 		    bool any_overfull = false;
3936 		    for (auto osd : leaves[pos]) {
3937 		      if (overfull.count(osd)) {
3938 			any_overfull = true;
3939 	               break;
3940 		      }
3941 		    }
3942 		    if (any_overfull) {
3943 		      ldout(cct, 10) << " bucket " << o[pos] << " has no underfull targets and "
3944 				     << ">0 leaves " << leaves[pos] << " is overfull; alts "
3945 				     << underfull_buckets[j]
3946 				     << dendl;
3947 		      for (auto alt : underfull_buckets[j]) {
3948 			if (std::find(o.begin(), o.end(), alt) == o.end()) {
3949 			  // see if alt has the same parent
3950 			  if (j == 0 ||
3951 			      get_parent_of_type(o[pos], stack[j-1].first) ==
3952 			      get_parent_of_type(alt, stack[j-1].first)) {
3953 			    if (j)
3954 			      ldout(cct, 10) << "  replacing " << o[pos]
3955 					     << " (which has no underfull leaves) with " << alt
3956 					     << " (same parent "
3957 					     << get_parent_of_type(alt, stack[j-1].first) << " type "
3958 					     << type << ")" << dendl;
3959 			    else
3960 			      ldout(cct, 10) << "  replacing " << o[pos]
3961 					     << " (which has no underfull leaves) with " << alt
3962 					     << " (first level)" << dendl;
3963 			    o[pos] = alt;
3964 			    break;
3965 			  } else {
3966 			    ldout(cct, 30) << "  alt " << alt << " for " << o[pos]
3967 					   << " has different parent, skipping" << dendl;
3968 			  }
3969 			}
3970 		      }
3971 		    }
3972 		  }
3973 		}
3974 	      }
3975 	      if (i == orig.end()) {
3976 		ldout(cct, 10) << __func__ << " end of orig, break 2" << dendl;
3977 		break;
3978 	      }
3979 	    }
3980 	    ldout(cct, 10) << __func__ << "  w <- " << o << " was " << w << dendl;
3981 	    w.swap(o);
3982 	  }
3983 	  *pw = w;
3984 	  return 0;
3985 	}
3986 	
3987 	int CrushWrapper::try_remap_rule(
3988 	  CephContext *cct,
3989 	  int ruleno,
3990 	  int maxout,
3991 	  const set<int>& overfull,
3992 	  const vector<int>& underfull,
3993 	  const vector<int>& orig,
3994 	  vector<int> *out) const
3995 	{
3996 	  const crush_map *map = crush;
3997 	  const crush_rule *rule = get_rule(ruleno);
3998 	  ceph_assert(rule);
3999 	
4000 	  ldout(cct, 10) << __func__ << " ruleno " << ruleno
4001 			<< " numrep " << maxout << " overfull " << overfull
4002 			<< " underfull " << underfull << " orig " << orig
4003 			<< dendl;
4004 	  vector<int> w; // working set
4005 	  out->clear();
4006 	
4007 	  auto i = orig.begin();
4008 	  set<int> used;
4009 	
4010 	  vector<pair<int,int>> type_stack;  // (type, fan-out)
4011 	  int root_bucket = 0;
4012 	  for (unsigned step = 0; step < rule->len; ++step) {
4013 	    const crush_rule_step *curstep = &rule->steps[step];
4014 	    ldout(cct, 10) << __func__ << " step " << step << " w " << w << dendl;
4015 	    switch (curstep->op) {
4016 	    case CRUSH_RULE_TAKE:
4017 	      if ((curstep->arg1 >= 0 && curstep->arg1 < map->max_devices) ||
4018 		  (-1-curstep->arg1 >= 0 && -1-curstep->arg1 < map->max_buckets &&
4019 		   map->buckets[-1-curstep->arg1])) {
4020 		w.clear();
4021 		w.push_back(curstep->arg1);
4022 		root_bucket = curstep->arg1;
4023 		ldout(cct, 10) << __func__ << " take " << w << dendl;
4024 	      } else {
4025 		ldout(cct, 1) << " bad take value " << curstep->arg1 << dendl;
4026 	      }
4027 	      break;
4028 	
4029 	    case CRUSH_RULE_CHOOSELEAF_FIRSTN:
4030 	    case CRUSH_RULE_CHOOSELEAF_INDEP:
4031 	      {
4032 		int numrep = curstep->arg1;
4033 		int type = curstep->arg2;
4034 		if (numrep <= 0)
4035 		  numrep += maxout;
4036 		type_stack.push_back(make_pair(type, numrep));
4037 	        if (type > 0)
4038 		  type_stack.push_back(make_pair(0, 1));
4039 		int r = _choose_type_stack(cct, type_stack, overfull, underfull, orig,
4040 					   i, used, &w, root_bucket);
4041 		if (r < 0)
4042 		  return r;
4043 		type_stack.clear();
4044 	      }
4045 	      break;
4046 	
4047 	    case CRUSH_RULE_CHOOSE_FIRSTN:
4048 	    case CRUSH_RULE_CHOOSE_INDEP:
4049 	      {
4050 		int numrep = curstep->arg1;
4051 		int type = curstep->arg2;
4052 		if (numrep <= 0)
4053 		  numrep += maxout;
4054 		type_stack.push_back(make_pair(type, numrep));
4055 	      }
4056 	      break;
4057 	
4058 	    case CRUSH_RULE_EMIT:
4059 	      ldout(cct, 10) << " emit " << w << dendl;
4060 	      if (!type_stack.empty()) {
4061 		int r = _choose_type_stack(cct, type_stack, overfull, underfull, orig,
4062 					   i, used, &w, root_bucket);
4063 		if (r < 0)
4064 		  return r;
4065 		type_stack.clear();
4066 	      }
4067 	      for (auto item : w) {
4068 		out->push_back(item);
4069 	      }
4070 	      w.clear();
4071 	      break;
4072 	
4073 	    default:
4074 	      // ignore
4075 	      break;
4076 	    }
4077 	  }
4078 	
4079 	  return 0;
4080 	}
4081 	
4082 	
4083 	int CrushWrapper::_choose_args_adjust_item_weight_in_bucket(
4084 	  CephContext *cct,
4085 	  crush_choose_arg_map cmap,
4086 	  int bucketid,
4087 	  int id,
4088 	  const vector<int>& weight,
4089 	  ostream *ss)
4090 	{
4091 	  int changed = 0;
4092 	  int bidx = -1 - bucketid;
4093 	  crush_bucket *b = crush->buckets[bidx];
4094 	  if (bidx >= (int)cmap.size) {
4095 	    if (ss)
4096 	      *ss << "no weight-set for bucket " << b->id;
4097 	    ldout(cct, 10) << __func__ << "  no crush_choose_arg for bucket " << b->id
4098 			   << dendl;
4099 	    return 0;
4100 	  }
4101 	  crush_choose_arg *carg = &cmap.args[bidx];
4102 	  if (carg->weight_set == NULL) {
4103 	    // create a weight-set for this bucket and populate it with the
4104 	    // bucket weights
4105 	    unsigned positions = get_choose_args_positions(cmap);
4106 	    carg->weight_set_positions = positions;
4107 	    carg->weight_set = static_cast<crush_weight_set*>(
4108 	      calloc(sizeof(crush_weight_set), positions));
4109 	    for (unsigned p = 0; p < positions; ++p) {
4110 	      carg->weight_set[p].size = b->size;
4111 	      carg->weight_set[p].weights = (__u32*)calloc(b->size, sizeof(__u32));
4112 	      for (unsigned i = 0; i < b->size; ++i) {
4113 		carg->weight_set[p].weights[i] = crush_get_bucket_item_weight(b, i);
4114 	      }
4115 	    }
4116 	    changed++;
4117 	  }
4118 	  if (carg->weight_set_positions != weight.size()) {
4119 	    if (ss)
4120 	      *ss << "weight_set_positions != " << weight.size() << " for bucket " << b->id;
4121 	    ldout(cct, 10) << __func__ << "  weight_set_positions != " << weight.size()
4122 			   << " for bucket " << b->id << dendl;
4123 	    return 0;
4124 	  }
4125 	  for (unsigned i = 0; i < b->size; i++) {
4126 	    if (b->items[i] == id) {
4127 	      for (unsigned j = 0; j < weight.size(); ++j) {
4128 		carg->weight_set[j].weights[i] = weight[j];
4129 	      }
4130 	      ldout(cct, 5) << __func__ << "  set " << id << " to " << weight
4131 			    << " in bucket " << b->id << dendl;
4132 	      changed++;
4133 	    }
4134 	  }
4135 	  if (changed) {
4136 	    vector<int> bucket_weight(weight.size(), 0);
4137 	    for (unsigned i = 0; i < b->size; i++) {
4138 	      for (unsigned j = 0; j < weight.size(); ++j) {
4139 		bucket_weight[j] += carg->weight_set[j].weights[i];
4140 	      }
4141 	    }
4142 	    choose_args_adjust_item_weight(cct, cmap, b->id, bucket_weight, nullptr);
4143 	  }
4144 	  return changed;
4145 	}
4146 	
4147 	int CrushWrapper::choose_args_adjust_item_weight(
4148 	  CephContext *cct,
4149 	  crush_choose_arg_map cmap,
4150 	  int id,
4151 	  const vector<int>& weight,
4152 	  ostream *ss)
4153 	{
4154 	  ldout(cct, 5) << __func__ << " " << id << " weight " << weight << dendl;
4155 	  int changed = 0;
4156 	  for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
4157 	    crush_bucket *b = crush->buckets[bidx];
4158 	    if (b == nullptr) {
4159 	      continue;
4160 	    }
4161 	    changed += _choose_args_adjust_item_weight_in_bucket(
4162 	      cct, cmap, b->id, id, weight, ss);
4163 	  }
4164 	  if (!changed) {
4165 	    if (ss)
4166 	      *ss << "item " << id << " not found in crush map";
4167 	    return -ENOENT;
4168 	  }
4169 	  return changed;
4170 	}
4171