// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab


#ifndef CEPH_OSDMAPMAPPING_H
#define CEPH_OSDMAPMAPPING_H

#include <vector>
#include <map>

#include "osd/osd_types.h"
#include "common/WorkQueue.h"
#include "common/Cond.h"

class OSDMap;

/// work queue to perform work on batches of pgids on multiple CPUs
class ParallelPGMapper {
public:
  struct Job {
    utime_t start, finish;
    unsigned shards = 0;
    const OSDMap *osdmap;
    bool aborted = false;
    Context *onfinish = nullptr;

    ceph::mutex lock = ceph::make_mutex("ParallelPGMapper::Job::lock");
    ceph::condition_variable cond;

    Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
    virtual ~Job() {
      ceph_assert(shards == 0);
    }

    // children must override both forms of process(); only the form
    // matching how the job was queued needs to do real work
    virtual void process(const std::vector<pg_t>& pgs) = 0;
    virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
    virtual void complete() = 0;

    void set_finish_event(Context *fin) {
      lock.lock();
      if (shards == 0) {
        // already done.
        lock.unlock();
        fin->complete(0);
      } else {
        // set finisher
        onfinish = fin;
        lock.unlock();
      }
    }
    bool is_done() {
      std::lock_guard l(lock);
      return shards == 0;
    }
    utime_t get_duration() {
      return finish - start;
    }
    void wait() {
      std::unique_lock l(lock);
      cond.wait(l, [this] { return shards == 0; });
    }
    bool wait_for(double duration) {
      utime_t until = start;
      until += duration;
      std::unique_lock l(lock);
      while (shards > 0) {
        if (ceph_clock_now() >= until) {
          return false;
        }
        cond.wait(l);
      }
      return true;
    }
    void abort() {
      Context *fin = nullptr;
      {
        std::unique_lock l(lock);
        aborted = true;
        fin = onfinish;
        onfinish = nullptr;
        cond.wait(l, [this] { return shards == 0; });
      }
      if (fin) {
        fin->complete(-ECANCELED);
      }
    }

    void start_one() {
      std::lock_guard l(lock);
      ++shards;
    }
    void finish_one();
  };
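
  // A minimal concrete Job, as a sketch: the names here ("CountPGsJob",
  // "count") are illustrative, not part of this interface.  process() may
  // run on several ThreadPool workers at once, hence the atomic.
  //
  //   struct CountPGsJob : public ParallelPGMapper::Job {
  //     std::atomic<size_t> count{0};
  //     explicit CountPGsJob(const OSDMap *om) : Job(om) {}
  //     void process(const std::vector<pg_t>& pgs) override {
  //       count += pgs.size();
  //     }
  //     void process(int64_t, unsigned, unsigned) override {}
  //     void complete() override {}
  //   };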

protected:
  CephContext *cct;

  struct Item {
    Job *job;
    // default these so the pg-list constructor below does not leave them
    // uninitialized; they are only meaningful for range items
    int64_t pool = -1;
    unsigned begin = 0, end = 0;
    std::vector<pg_t> pgs;

    Item(Job *j, std::vector<pg_t> pgs) : job(j), pgs(std::move(pgs)) {}
    Item(Job *j, int64_t p, unsigned b, unsigned e)
      : job(j),
        pool(p),
        begin(b),
        end(e) {}
  };
  std::deque<Item*> q;

  struct WQ : public ThreadPool::WorkQueue<Item> {
    ParallelPGMapper *m;

    WQ(ParallelPGMapper *m_, ThreadPool *tp)
      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
        m(m_) {}

    bool _enqueue(Item *i) override {
      m->q.push_back(i);
      return true;
    }
    void _dequeue(Item *i) override {
      ceph_abort();
    }
    Item *_dequeue() override {
      while (!m->q.empty()) {
        Item *i = m->q.front();
        m->q.pop_front();
        if (i->job->aborted) {
          i->job->finish_one();
          delete i;
        } else {
          return i;
        }
      }
      return nullptr;
    }

    void _process(Item *i, ThreadPool::TPHandle &h) override;

    void _clear() override {
      ceph_assert(_empty());
    }

    bool _empty() override {
      return m->q.empty();
    }
  } wq;

public:
  ParallelPGMapper(CephContext *cct, ThreadPool *tp)
    : cct(cct),
      wq(this, tp) {}

  void queue(
    Job *job,
    unsigned pgs_per_item,
    const std::vector<pg_t>& input_pgs);

  void drain() {
    wq.drain();
  }
};
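
// Typical use, as a sketch (assumes "tp" is a started ThreadPool, "cct" a
// valid CephContext, and "job" a std::unique_ptr to a concrete Job
// subclass):
//
//   ParallelPGMapper mapper(cct, &tp);
//   mapper.queue(job.get(), 128 /* pgs per work item */, {});
//   job->wait();   // or poll is_done(), or hook set_finish_event()
//
// With an empty input_pgs list, queue() is expected to shard the work by
// pool ranges via process(pool, ps_begin, ps_end); with an explicit pg
// list it batches pgs_per_item pgids per process(pgs) call.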


/// a precalculated mapping of every PG for a given OSDMap
class OSDMapMapping {
public:
  MEMPOOL_CLASS_HELPERS();
private:

  struct PoolMapping {
    MEMPOOL_CLASS_HELPERS();

    unsigned size = 0;
    unsigned pg_num = 0;
    bool erasure = false;
    mempool::osdmap_mapping::vector<int32_t> table;

    size_t row_size() const {
      return
        1 + // acting_primary
        1 + // up_primary
        1 + // num acting
        1 + // num up
        size + // acting
        size;  // up
    }
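
    // e.g. a replicated pool with size=3 uses 4 + 3 + 3 = 10 ints per PG:
    //   [acting_primary, up_primary, num_acting, num_up,
    //    acting[0..2], up[0..2]]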

    PoolMapping(int s, int p, bool e)
      : size(s),
        pg_num(p),
        erasure(e),
        table(pg_num * row_size()) {
    }

    void get(size_t ps,
             std::vector<int> *up,
             int *up_primary,
             std::vector<int> *acting,
             int *acting_primary) const {
      const int32_t *row = &table[row_size() * ps];
      if (acting_primary) {
        *acting_primary = row[0];
      }
      if (up_primary) {
        *up_primary = row[1];
      }
      if (acting) {
        acting->resize(row[2]);
        for (int i = 0; i < row[2]; ++i) {
          (*acting)[i] = row[4 + i];
        }
      }
      if (up) {
        up->resize(row[3]);
        for (int i = 0; i < row[3]; ++i) {
          (*up)[i] = row[4 + size + i];
        }
      }
    }

    void set(size_t ps,
             const std::vector<int>& up,
             int up_primary,
             const std::vector<int>& acting,
             int acting_primary) {
      int32_t *row = &table[row_size() * ps];
      row[0] = acting_primary;
      row[1] = up_primary;
      // these should always be <= the pool size, but just in case, avoid
      // blowing out the array.  Note that our mapping is not completely
      // accurate in this case--this is just to avoid crashing.
      row[2] = std::min<int32_t>(acting.size(), size);
      row[3] = std::min<int32_t>(up.size(), size);
      for (int i = 0; i < row[2]; ++i) {
        row[4 + i] = acting[i];
      }
      for (int i = 0; i < row[3]; ++i) {
        row[4 + size + i] = up[i];
      }
    }
  };

  mempool::osdmap_mapping::map<int64_t,PoolMapping> pools;
  mempool::osdmap_mapping::vector<
    mempool::osdmap_mapping::vector<pg_t>> acting_rmap;  // osd -> pg
  //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap;  // osd -> pg
  epoch_t epoch = 0;
  uint64_t num_pgs = 0;

  void _init_mappings(const OSDMap& osdmap);
  void _update_range(
    const OSDMap& map,
    int64_t pool,
    unsigned pg_begin, unsigned pg_end);

  void _build_rmap(const OSDMap& osdmap);

  void _start(const OSDMap& osdmap) {
    _init_mappings(osdmap);
  }
  void _finish(const OSDMap& osdmap);

  void _dump();

  friend class ParallelPGMapper;

  struct MappingJob : public ParallelPGMapper::Job {
    OSDMapMapping *mapping;
    MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
      : Job(osdmap), mapping(m) {
      mapping->_start(*osdmap);
    }
    void process(const std::vector<pg_t>& pgs) override {}
    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
      mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
    }
    void complete() override {
      mapping->_finish(*osdmap);
    }
  };

public:
  /// look up the up/acting mapping for a pg; the pool must be present in
  /// this mapping
  void get(pg_t pgid,
           std::vector<int> *up,
           int *up_primary,
           std::vector<int> *acting,
           int *acting_primary) const {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
  }

  /// look up the acting primary and its shard; returns false only for an
  /// erasure-coded pool whose primary is not in the acting set
  bool get_primary_and_shard(pg_t pgid,
                             int *acting_primary,
                             spg_t *spgid) {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    std::vector<int> acting;
    p->second.get(pgid.ps(), nullptr, nullptr, &acting, acting_primary);
    if (p->second.erasure) {
      for (uint8_t i = 0; i < acting.size(); ++i) {
        if (acting[i] == *acting_primary) {
          *spgid = spg_t(pgid, shard_id_t(i));
          return true;
        }
      }
      return false;
    } else {
      *spgid = spg_t(pgid);
      return true;
    }
  }

  const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) {
    ceph_assert(osd < acting_rmap.size());
    return acting_rmap[osd];
  }

  /// recompute the full mapping synchronously on the calling thread
  void update(const OSDMap& map);
  /// recompute the mapping for a single pg
  void update(const OSDMap& map, pg_t pgid);

  /// recompute the full mapping asynchronously via the given mapper
  std::unique_ptr<MappingJob> start_update(
    const OSDMap& map,
    ParallelPGMapper& mapper,
    unsigned pgs_per_item) {
    std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
    mapper.queue(job.get(), pgs_per_item, {});
    return job;
  }

  epoch_t get_epoch() const {
    return epoch;
  }

  uint64_t get_num_pgs() const {
    return num_pgs;
  }
};
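
// Typical use, as a sketch ("osdmap" and "mapper" are assumed to exist;
// the pg chosen is illustrative):
//
//   OSDMapMapping mapping;
//   mapping.update(osdmap);   // full recompute on the calling thread
//   // or, spread across a thread pool:
//   //   auto job = mapping.start_update(osdmap, mapper, 128);
//   //   job->wait();
//   std::vector<int> up, acting;
//   int up_primary, acting_primary;
//   mapping.get(pg_t(0, 1), &up, &up_primary, &acting, &acting_primary);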


#endif