// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab


#ifndef CEPH_OSDMAPMAPPING_H
#define CEPH_OSDMAPMAPPING_H

#include <vector>
#include <map>
#include <deque>

#include "osd/osd_types.h"
#include "common/WorkQueue.h"
#include "common/Cond.h"

class OSDMap;

/// work queue to perform work on batches of pgids on multiple CPUs
class ParallelPGMapper {
public:
  struct Job {
    utime_t start, finish;
    unsigned shards = 0;
    const OSDMap *osdmap;
    bool aborted = false;
    Context *onfinish = nullptr;

    ceph::mutex lock = ceph::make_mutex("ParallelPGMapper::Job::lock");
    ceph::condition_variable cond;

    Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
    virtual ~Job() {
      ceph_assert(shards == 0);
    }

    // a child must override both forms of process; one of them may be a no-op
    virtual void process(const std::vector<pg_t>& pgs) = 0;
    virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
    virtual void complete() = 0;

    void set_finish_event(Context *fin) {
      lock.lock();
      if (shards == 0) {
	// already done.
	lock.unlock();
	fin->complete(0);
      } else {
	// set finisher
	onfinish = fin;
	lock.unlock();
      }
    }
    bool is_done() {
      std::lock_guard l(lock);
      return shards == 0;
    }
    utime_t get_duration() {
      return finish - start;
    }
    void wait() {
      std::unique_lock l(lock);
      cond.wait(l, [this] { return shards == 0; });
    }
    bool wait_for(double duration) {
      utime_t until = start;
      until += duration;
      std::unique_lock l(lock);
      while (shards > 0) {
	if (ceph_clock_now() >= until) {
	  return false;
	}
	cond.wait(l);
      }
      return true;
    }
    void abort() {
      Context *fin = nullptr;
      {
	std::unique_lock l(lock);
	aborted = true;
	fin = onfinish;
	onfinish = nullptr;
	cond.wait(l, [this] { return shards == 0; });
      }
      if (fin) {
	fin->complete(-ECANCELED);
      }
    }

    void start_one() {
      std::lock_guard l(lock);
      ++shards;
    }
    void finish_one();
  };
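
  // Job lifecycle: start_one() is called once for each shard of work handed
  // to the thread pool and finish_one() once per completed shard; when the
  // shard count returns to zero the job is done, waiters in wait(),
  // wait_for() and abort() are released, and any Context registered via
  // set_finish_event() is completed.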

protected:
  CephContext *cct;

  struct Item {
    Job *job;
    int64_t pool;
    unsigned begin, end;
    std::vector<pg_t> pgs;

    Item(Job *j, std::vector<pg_t> pgs) : job(j), pgs(pgs) {}
    Item(Job *j, int64_t p, unsigned b, unsigned e)
      : job(j),
	pool(p),
	begin(b),
	end(e) {}
  };
  std::deque<Item*> q;

  struct WQ : public ThreadPool::WorkQueue<Item> {
    ParallelPGMapper *m;

    WQ(ParallelPGMapper *m_, ThreadPool *tp)
      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
        m(m_) {}

    bool _enqueue(Item *i) override {
      m->q.push_back(i);
      return true;
    }
    void _dequeue(Item *i) override {
      ceph_abort();
    }
    Item *_dequeue() override {
      while (!m->q.empty()) {
	Item *i = m->q.front();
	m->q.pop_front();
	if (i->job->aborted) {
	  i->job->finish_one();
	  delete i;
	} else {
	  return i;
	}
      }
      return nullptr;
    }

    void _process(Item *i, ThreadPool::TPHandle &h) override;

    void _clear() override {
      ceph_assert(_empty());
    }

    bool _empty() override {
      return m->q.empty();
    }
  } wq;

public:
  ParallelPGMapper(CephContext *cct, ThreadPool *tp)
    : cct(cct),
      wq(this, tp) {}

  void queue(
    Job *job,
    unsigned pgs_per_item,
    const std::vector<pg_t>& input_pgs);

  void drain() {
    wq.drain();
  }
};
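
// Usage sketch (illustrative only; CountJob, tp, osdmap and pgs are
// hypothetical names, not part of this API): derive from
// ParallelPGMapper::Job, queue it, then wait for all shards to finish.
//
//   struct CountJob : ParallelPGMapper::Job {
//     std::atomic<size_t> count{0};
//     using Job::Job;
//     void process(const std::vector<pg_t>& pgs) override {
//       count += pgs.size();
//     }
//     void process(int64_t, unsigned, unsigned) override {}
//     void complete() override {}
//   };
//
//   ParallelPGMapper mapper(g_ceph_context, &tp);  // tp: a started ThreadPool
//   std::vector<pg_t> pgs = ...;                   // pgids to process
//   CountJob job(&osdmap);
//   mapper.queue(&job, 100, pgs);                  // ~100 pgids per work item
//   job.wait();                                    // blocks until all shards finish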


/// a precalculated mapping of every PG for a given OSDMap
class OSDMapMapping {
public:
  MEMPOOL_CLASS_HELPERS();
private:

  struct PoolMapping {
    MEMPOOL_CLASS_HELPERS();

    unsigned size = 0;
    unsigned pg_num = 0;
    bool erasure = false;
    mempool::osdmap_mapping::vector<int32_t> table;

    size_t row_size() const {
      return
	1 + // acting_primary
	1 + // up_primary
	1 + // num acting
	1 + // num up
	size + // acting
	size;  // up
    }
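
    // Each PG occupies one fixed-size row in 'table':
    //   row[0] = acting_primary, row[1] = up_primary,
    //   row[2] = number of acting entries, row[3] = number of up entries,
    //   row[4 .. 4+size)        = acting OSDs,
    //   row[4+size .. 4+2*size) = up OSDs.
    // e.g. with size == 3 each row holds 4 + 3 + 3 = 10 int32_t values.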

    PoolMapping(int s, int p, bool e)
      : size(s),
	pg_num(p),
	erasure(e),
	table(pg_num * row_size()) {
    }

    void get(size_t ps,
	     std::vector<int> *up,
	     int *up_primary,
	     std::vector<int> *acting,
	     int *acting_primary) const {
      const int32_t *row = &table[row_size() * ps];
      if (acting_primary) {
	*acting_primary = row[0];
      }
      if (up_primary) {
	*up_primary = row[1];
      }
      if (acting) {
	acting->resize(row[2]);
	for (int i = 0; i < row[2]; ++i) {
	  (*acting)[i] = row[4 + i];
	}
      }
      if (up) {
	up->resize(row[3]);
	for (int i = 0; i < row[3]; ++i) {
	  (*up)[i] = row[4 + size + i];
	}
      }
    }

    void set(size_t ps,
	     const std::vector<int>& up,
	     int up_primary,
	     const std::vector<int>& acting,
	     int acting_primary) {
      int32_t *row = &table[row_size() * ps];
      row[0] = acting_primary;
      row[1] = up_primary;
      // these should always be <= the pool size, but just in case, avoid
      // blowing out the array.  Note that our mapping is not completely
      // accurate in this case--this is just to avoid crashing.
      row[2] = std::min<int32_t>(acting.size(), size);
      row[3] = std::min<int32_t>(up.size(), size);
      for (int i = 0; i < row[2]; ++i) {
	row[4 + i] = acting[i];
      }
      for (int i = 0; i < row[3]; ++i) {
	row[4 + size + i] = up[i];
      }
    }
  };

  mempool::osdmap_mapping::map<int64_t,PoolMapping> pools;
  mempool::osdmap_mapping::vector<
    mempool::osdmap_mapping::vector<pg_t>> acting_rmap;  // osd -> pg
  //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap;  // osd -> pg
  epoch_t epoch = 0;
  uint64_t num_pgs = 0;

  void _init_mappings(const OSDMap& osdmap);
  void _update_range(
    const OSDMap& map,
    int64_t pool,
    unsigned pg_begin, unsigned pg_end);

  void _build_rmap(const OSDMap& osdmap);

  void _start(const OSDMap& osdmap) {
    _init_mappings(osdmap);
  }
  void _finish(const OSDMap& osdmap);

  void _dump();

  friend class ParallelPGMapper;

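  // MappingJob recomputes the mapping for one (pool, ps range) slice per
  // work item; start_update() queues it with an empty pg list, so the
  // pg-vector form of process() is deliberately a no-op.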
  struct MappingJob : public ParallelPGMapper::Job {
    OSDMapMapping *mapping;
    MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
      : Job(osdmap), mapping(m) {
      mapping->_start(*osdmap);
    }
    void process(const std::vector<pg_t>& pgs) override {}
    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
      mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
    }
    void complete() override {
      mapping->_finish(*osdmap);
    }
  };

public:
  void get(pg_t pgid,
	   std::vector<int> *up,
	   int *up_primary,
	   std::vector<int> *acting,
	   int *acting_primary) const {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
  }

  bool get_primary_and_shard(pg_t pgid,
			     int *acting_primary,
			     spg_t *spgid) {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    std::vector<int> acting;
    p->second.get(pgid.ps(), nullptr, nullptr, &acting, acting_primary);
    if (p->second.erasure) {
      for (uint8_t i = 0; i < acting.size(); ++i) {
	if (acting[i] == *acting_primary) {
	  *spgid = spg_t(pgid, shard_id_t(i));
	  return true;
	}
      }
      return false;
    } else {
      *spgid = spg_t(pgid);
      return true;
    }
  }

  const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) {
    ceph_assert(osd < acting_rmap.size());
    return acting_rmap[osd];
  }

  void update(const OSDMap& map);
  void update(const OSDMap& map, pg_t pgid);

  std::unique_ptr<MappingJob> start_update(
    const OSDMap& map,
    ParallelPGMapper& mapper,
    unsigned pgs_per_item) {
    std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
    mapper.queue(job.get(), pgs_per_item, {});
    return job;
  }

  epoch_t get_epoch() const {
    return epoch;
  }

  uint64_t get_num_pgs() const {
    return num_pgs;
  }
};
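
// Usage sketch (illustrative only; osdmap and mapper are hypothetical names):
// mappings can be rebuilt synchronously with update(), or in parallel via
// start_update() and a ParallelPGMapper.
//
//   OSDMapMapping mapping;
//   mapping.update(osdmap);                        // synchronous full rebuild
//
//   // or rebuild in parallel:
//   auto job = mapping.start_update(osdmap, mapper, 128);
//   job->wait();                                   // or job->set_finish_event(...)
//
//   std::vector<int> up, acting;
//   int up_primary, acting_primary;
//   mapping.get(pg_t(0, 1),                        // ps 0 of pool 1
//               &up, &up_primary, &acting, &acting_primary);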


#endif