1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #ifndef CEPH_CONTEXT_H
17 #define CEPH_CONTEXT_H
18
19 #include "common/dout.h"
20
21 #include <boost/function.hpp>
22 #include <list>
23 #include <set>
24 #include <memory>
25
26 #include "include/ceph_assert.h"
27 #include "common/ceph_mutex.h"
28
29 #define mydout(cct, v) lgeneric_subdout(cct, context, v)
30
31 /*
32 * GenContext - abstract callback class
33 */
template <typename T>
class GenContext {
protected:
  // Callback body supplied by the subclass. complete() calls it once and
  // then destroys the object.
  virtual void finish(T t) = 0;

public:
  GenContext() = default;
  // complete() deletes through a base pointer, so the destructor must be
  // virtual.
  virtual ~GenContext() = default;

  // Not copyable: complete() ends with `delete this`, so each instance is
  // a one-shot, uniquely owned callback.
  GenContext(const GenContext& other) = delete;
  GenContext& operator=(const GenContext& other) = delete;

  // Forward t to finish() and then delete this object. The object must not
  // be touched after complete() returns.
  template <typename C>
  void complete(C &&t) {
    finish(std::forward<C>(t));
    delete this;
  }
};

// Owning handle for a GenContext<T> that has not been completed yet.
template <typename T>
using GenContextURef = std::unique_ptr<GenContext<T> >;
55
56 /*
57 * Context - abstract callback class
58 */
59 class Finisher;
class Context {
protected:
  // Callback body supplied by the subclass; run by complete().
  virtual void finish(int r) = 0;

  // variant of finish that is safe to call "synchronously."  override should
  // return true.  The default returns false, which makes sync_complete()
  // leave the object alive and untouched.
  virtual bool sync_finish(int r) {
    return false;
  }

public:
  Context() = default;
  // complete() deletes through a base pointer, so the destructor must be
  // virtual.
  virtual ~Context() = default;

  // Not copyable: complete() ends with `delete this`, so each instance is
  // a one-shot, uniquely owned callback.
  Context(const Context& other) = delete;
  Context& operator=(const Context& other) = delete;

  // Run finish(r) and then delete this object. The object must not be
  // touched after complete() returns.
  virtual void complete(int r) {
    finish(r);
    delete this;
  }

  // Try the synchronous path. Returns true, and deletes this object, only
  // when sync_finish(r) reports success; otherwise returns false and the
  // caller still owns a live context.
  virtual bool sync_complete(int r) {
    if (sync_finish(r)) {
      delete this;
      return true;
    }
    return false;
  }
};
88
89 /**
90 * Simple context holding a single object
91 */
92 template<class T>
93 class ContainerContext : public Context {
94 T obj;
95 public:
96 ContainerContext(T &obj) : obj(obj) {}
97 void finish(int r) override {}
98 };
99 template <typename T>
100 ContainerContext<T> *make_container_context(T &&t) {
101 return new ContainerContext<T>(std::forward<T>(t));
102 }
103
104 template <class T>
105 struct Wrapper : public Context {
106 Context *to_run;
107 T val;
108 Wrapper(Context *to_run, T val) : to_run(to_run), val(val) {}
109 void finish(int r) override {
110 if (to_run)
111 to_run->complete(r);
112 }
113 };
114 struct RunOnDelete {
115 Context *to_run;
116 RunOnDelete(Context *to_run) : to_run(to_run) {}
117 ~RunOnDelete() {
118 if (to_run)
119 to_run->complete(0);
120 }
121 };
122 typedef std::shared_ptr<RunOnDelete> RunOnDeleteRef;
123
124 template <typename T>
125 class LambdaContext : public Context {
126 public:
127 LambdaContext(T &&t) : t(std::forward<T>(t)) {}
128 void finish(int r) override {
129 t(r);
130 }
131 private:
132 T t;
133 };
134
135 template <typename T>
136 LambdaContext<T> *make_lambda_context(T &&t) {
137 return new LambdaContext<T>(std::move(t));
138 }
139
140 template <typename F, typename T>
141 struct LambdaGenContext : GenContext<T> {
142 F f;
143 LambdaGenContext(F &&f) : f(std::forward<F>(f)) {}
144 void finish(T t) override {
145 f(std::forward<T>(t));
146 }
147 };
148 template <typename T, typename F>
149 GenContextURef<T> make_gen_lambda_context(F &&f) {
150 return GenContextURef<T>(new LambdaGenContext<F, T>(std::move(f)));
151 }
152
153 /*
154 * finish and destroy a list of Contexts
155 */
156 template<class C>
157 inline void finish_contexts(CephContext *cct, C& finished, int result = 0)
158 {
159 if (finished.empty())
160 return;
161
162 C ls;
163 ls.swap(finished); // swap out of place to avoid weird loops
164
165 if (cct)
166 mydout(cct,10) << ls.size() << " contexts to finish with " << result << dendl;
167 for (Context* c : ls) {
168 if (cct)
169 mydout(cct,10) << "---- " << c << dendl;
170 c->complete(result);
171 }
172 }
173
174 class C_NoopContext : public Context {
175 public:
176 void finish(int r) override { }
177 };
178
179
180 struct C_Lock : public Context {
181 ceph::mutex *lock;
182 Context *fin;
183 C_Lock(ceph::mutex *l, Context *c) : lock(l), fin(c) {}
184 ~C_Lock() override {
185 delete fin;
186 }
187 void finish(int r) override {
188 if (fin) {
189 std::lock_guard l{*lock};
190 fin->complete(r);
191 fin = NULL;
192 }
193 }
194 };
195
196 /*
197 * C_Contexts - set of Contexts
198 *
199 * ContextType must be an ancestor class of ContextInstanceType, or the same class.
200 * ContextInstanceType must be default-constructable.
201 */
202 template <class ContextType, class ContextInstanceType, class Container = std::list<ContextType *>>
203 class C_ContextsBase : public ContextInstanceType {
204 public:
205 CephContext *cct;
206 Container contexts;
207
208 C_ContextsBase(CephContext *cct_)
209 : cct(cct_)
210 {
211 }
212 ~C_ContextsBase() override {
213 for (auto c : contexts) {
214 delete c;
215 }
216 }
217 void add(ContextType* c) {
218 contexts.push_back(c);
219 }
220 void take(Container& ls) {
221 Container c;
222 c.swap(ls);
223 if constexpr (std::is_same_v<Container, std::list<ContextType *>>) {
224 contexts.splice(contexts.end(), c);
225 } else {
226 contexts.insert(contexts.end(), c.begin(), c.end());
227 }
228 }
229 void complete(int r) override {
230 // Neuter any ContextInstanceType custom complete(), because although
231 // I want to look like it, I don't actually want to run its code.
232 Context::complete(r);
233 }
234 void finish(int r) override {
235 finish_contexts(cct, contexts, r);
236 }
237 bool empty() { return contexts.empty(); }
238
239 template<class C>
240 static ContextType *list_to_context(C& cs) {
241 if (cs.size() == 0) {
242 return 0;
243 } else if (cs.size() == 1) {
244 ContextType *c = cs.front();
245 cs.clear();
246 return c;
247 } else {
248 C_ContextsBase<ContextType, ContextInstanceType> *c(new C_ContextsBase<ContextType, ContextInstanceType>(0));
249 c->take(cs);
250 return c;
251 }
252 }
253 };
254
255 typedef C_ContextsBase<Context, Context> C_Contexts;
256
257 /*
258 * C_Gather
259 *
260 * ContextType must be an ancestor class of ContextInstanceType, or the same class.
261 * ContextInstanceType must be default-constructable.
262 *
263 * BUG:? only reports error from last sub to have an error return
264 */
265 template <class ContextType, class ContextInstanceType>
266 class C_GatherBase {
267 private:
268 CephContext *cct;
269 int result = 0;
270 ContextType *onfinish;
271 #ifdef DEBUG_GATHER
272 std::set<ContextType*> waitfor;
273 #endif
274 int sub_created_count = 0;
275 int sub_existing_count = 0;
276 mutable ceph::recursive_mutex lock =
277 ceph::make_recursive_mutex("C_GatherBase::lock"); // disable lockdep
278 bool activated = false;
279
280 void sub_finish(ContextType* sub, int r) {
(1) Event fun_call_w_exception: |
Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details] |
281 lock.lock();
282 #ifdef DEBUG_GATHER
283 ceph_assert(waitfor.count(sub));
284 waitfor.erase(sub);
285 #endif
286 --sub_existing_count;
287 mydout(cct,10) << "C_GatherBase " << this << ".sub_finish(r=" << r << ") " << sub
288 #ifdef DEBUG_GATHER
289 << " (remaining " << waitfor << ")"
290 #endif
291 << dendl;
292 if (r < 0 && result == 0)
293 result = r;
294 if ((activated == false) || (sub_existing_count != 0)) {
295 lock.unlock();
296 return;
297 }
298 lock.unlock();
299 delete_me();
300 }
301
302 void delete_me() {
303 if (onfinish) {
304 onfinish->complete(result);
305 onfinish = 0;
306 }
307 delete this;
308 }
309
310 class C_GatherSub : public ContextInstanceType {
311 C_GatherBase *gather;
312 public:
313 C_GatherSub(C_GatherBase *g) : gather(g) {}
314 void complete(int r) override {
315 // Cancel any customized complete() functionality
316 // from the Context subclass we're templated for,
317 // we only want to hit that in onfinish, not at each
318 // sub finish. e.g. MDSInternalContext.
319 Context::complete(r);
320 }
321 void finish(int r) override {
322 gather->sub_finish(this, r);
323 gather = 0;
324 }
(1) Event exn_spec_violation: |
An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate(). |
Also see events: |
[fun_call_w_exception] |
325 ~C_GatherSub() override {
326 if (gather)
(2) Event fun_call_w_exception: |
Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details] |
Also see events: |
[exn_spec_violation] |
327 gather->sub_finish(this, 0);
328 }
329 };
330
331 public:
332 C_GatherBase(CephContext *cct_, ContextType *onfinish_)
333 : cct(cct_), onfinish(onfinish_)
334 {
335 mydout(cct,10) << "C_GatherBase " << this << ".new" << dendl;
336 }
337 ~C_GatherBase() {
338 mydout(cct,10) << "C_GatherBase " << this << ".delete" << dendl;
339 }
340 void set_finisher(ContextType *onfinish_) {
341 std::lock_guard l{lock};
342 ceph_assert(!onfinish);
343 onfinish = onfinish_;
344 }
345 void activate() {
346 lock.lock();
347 ceph_assert(activated == false);
348 activated = true;
349 if (sub_existing_count != 0) {
350 lock.unlock();
351 return;
352 }
353 lock.unlock();
354 delete_me();
355 }
356 ContextType *new_sub() {
357 std::lock_guard l{lock};
358 ceph_assert(activated == false);
359 sub_created_count++;
360 sub_existing_count++;
361 ContextType *s = new C_GatherSub(this);
362 #ifdef DEBUG_GATHER
363 waitfor.insert(s);
364 #endif
365 mydout(cct,10) << "C_GatherBase " << this << ".new_sub is " << sub_created_count << " " << s << dendl;
366 return s;
367 }
368
369 inline int get_sub_existing_count() const {
370 std::lock_guard l{lock};
371 return sub_existing_count;
372 }
373
374 inline int get_sub_created_count() const {
375 std::lock_guard l{lock};
376 return sub_created_count;
377 }
378 };
379
380 /*
381 * The C_GatherBuilder remembers each C_Context created by
382 * C_GatherBuilder.new_sub() in a C_Gather. When a C_Context created
383 * by new_sub() is complete(), C_Gather forgets about it. When
384 * C_GatherBuilder notices that there are no C_Context left in
385 * C_Gather, it calls complete() on the C_Context provided as the
386 * second argument of the constructor (finisher).
387 *
388 * How to use C_GatherBuilder:
389 *
390 * 1. Create a C_GatherBuilder on the stack
391 * 2. Call gather_bld.new_sub() as many times as you want to create new subs
392 * It is safe to call this 0 times, or 100, or anything in between.
393 * 3. If you didn't supply a finisher in the C_GatherBuilder constructor,
394 * set one with gather_bld.set_finisher(my_finisher)
395 * 4. Call gather_bld.activate()
396 *
397 * Example:
398 *
399 * C_SaferCond all_done;
400 * C_GatherBuilder gb(g_ceph_context, all_done);
401 * j.submit_entry(1, first, 0, gb.new_sub()); // add a C_Context to C_Gather
402 * j.submit_entry(2, first, 0, gb.new_sub()); // add a C_Context to C_Gather
403 * gb.activate(); // consume C_Context as soon as they complete()
404 * all_done.wait(); // all_done is complete() after all new_sub() are complete()
405 *
406 * The finisher may be called at any point after step 4, including immediately
407 * from the activate() function.
408 * The finisher will never be called before activate().
409 *
410 * Note: Currently, subs must be manually freed by the caller (for some reason.)
411 */
412 template <class ContextType, class GatherType>
413 class C_GatherBuilderBase
414 {
415 public:
416 C_GatherBuilderBase(CephContext *cct_)
417 : cct(cct_), c_gather(NULL), finisher(NULL), activated(false)
418 {
419 }
420 C_GatherBuilderBase(CephContext *cct_, ContextType *finisher_)
421 : cct(cct_), c_gather(NULL), finisher(finisher_), activated(false)
422 {
423 }
424 ~C_GatherBuilderBase() {
425 if (c_gather) {
426 ceph_assert(activated); // Don't forget to activate your C_Gather!
427 }
428 else {
429 delete finisher;
430 }
431 }
432 ContextType *new_sub() {
433 if (!c_gather) {
434 c_gather = new GatherType(cct, finisher);
435 }
436 return c_gather->new_sub();
437 }
438 void activate() {
439 if (!c_gather)
440 return;
441 ceph_assert(finisher != NULL);
442 activated = true;
443 c_gather->activate();
444 }
445 void set_finisher(ContextType *finisher_) {
446 finisher = finisher_;
447 if (c_gather)
448 c_gather->set_finisher(finisher);
449 }
450 GatherType *get() const {
451 return c_gather;
452 }
453 bool has_subs() const {
454 return (c_gather != NULL);
455 }
456 int num_subs_created() {
457 ceph_assert(!activated);
458 if (c_gather == NULL)
459 return 0;
460 return c_gather->get_sub_created_count();
461 }
462 int num_subs_remaining() {
463 ceph_assert(!activated);
464 if (c_gather == NULL)
465 return 0;
466 return c_gather->get_sub_existing_count();
467 }
468
469 private:
470 CephContext *cct;
471 GatherType *c_gather;
472 ContextType *finisher;
473 bool activated;
474 };
475
476 typedef C_GatherBase<Context, Context> C_Gather;
477 typedef C_GatherBuilderBase<Context, C_Gather > C_GatherBuilder;
478
// Abstract factory interface: build() returns a pointer to a ContextType.
template <class ContextType>
class ContextFactory {
public:
  // Virtual so implementations can be destroyed through a base pointer.
  virtual ~ContextFactory() = default;

  // Produce a context; implemented by subclasses.
  virtual ContextType *build() = 0;
};
485
486 #undef mydout
487
488 #endif
489