1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software
11   	 * Foundation.  See file COPYING.
12   	 *
13   	 */
14   	
15   	#ifndef CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H
16   	#define CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H
17   	
18   	#include "common/ceph_mutex.h"
19   	#include "include/buffer.h"
20   	#include "include/xlist.h"
21   	#include "osd/osd_types.h"
22   	
23   	class IoCtxImpl;
24   	
25   	struct librados::AioCompletionImpl {
26   	  ceph::mutex lock = ceph::make_mutex("AioCompletionImpl lock", false);
27   	  ceph::condition_variable cond;
28   	  int ref = 1, rval = 0;
29   	  bool released = false;
30   	  bool complete = false;
31   	  version_t objver = 0;
32   	  ceph_tid_t tid = 0;
33   	
34   	  rados_callback_t callback_complete = nullptr, callback_safe = nullptr;
35   	  void *callback_complete_arg = nullptr, *callback_safe_arg = nullptr;
36   	
37   	  // for read
38   	  bool is_read = false;
39   	  bufferlist bl;
40   	  bufferlist *blp = nullptr;
41   	  char *out_buf = nullptr;
42   	
43   	  IoCtxImpl *io = nullptr;
44   	  ceph_tid_t aio_write_seq = 0;
45   	  xlist<AioCompletionImpl*>::item aio_write_list_item;
46   	
47   	  AioCompletionImpl() : aio_write_list_item(this) { }
48   	
49   	  int set_complete_callback(void *cb_arg, rados_callback_t cb) {
50   	    std::scoped_lock l{lock};
51   	    callback_complete = cb;
52   	    callback_complete_arg = cb_arg;
53   	    return 0;
54   	  }
55   	  int set_safe_callback(void *cb_arg, rados_callback_t cb) {
56   	    std::scoped_lock l{lock};
57   	    callback_safe = cb;
58   	    callback_safe_arg = cb_arg;
59   	    return 0;
60   	  }
61   	  int wait_for_complete() {
62   	    std::unique_lock l{lock};
63   	    cond.wait(l, [this] { return complete; });
64   	    return 0;
65   	  }
66   	  int wait_for_safe() {
67   	    return wait_for_complete();
68   	  }
69   	  int is_complete() {
70   	    std::scoped_lock l{lock};
71   	    return complete;
72   	  }
73   	  int is_safe() {
74   	    return is_complete();
75   	  }
76   	  int wait_for_complete_and_cb() {
77   	    std::unique_lock l{lock};
78   	    cond.wait(l, [this] { return complete && !callback_complete && !callback_safe; });
79   	    return 0;
80   	  }
81   	  int wait_for_safe_and_cb() {
82   	    return wait_for_complete_and_cb();
83   	  }
84   	  int is_complete_and_cb() {
85   	    std::scoped_lock l{lock};
86   	    return complete && !callback_complete && !callback_safe;
87   	  }
88   	  int is_safe_and_cb() {
89   	    return is_complete_and_cb();
90   	  }
91   	  int get_return_value() {
92   	    std::scoped_lock l{lock};
93   	    return rval;
94   	  }
95   	  uint64_t get_version() {
96   	    std::scoped_lock l{lock};
97   	    return objver;
98   	  }
99   	
100  	  void get() {
(1) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
101  	    std::scoped_lock l{lock};
102  	    _get();
103  	  }
104  	  void _get() {
105  	    ceph_assert(ceph_mutex_is_locked(lock));
106  	    ceph_assert(ref > 0);
107  	    ++ref;
108  	  }
109  	  void release() {
110  	    lock.lock();
111  	    ceph_assert(!released);
112  	    released = true;
113  	    put_unlock();
114  	  }
115  	  void put() {
116  	    lock.lock();
117  	    put_unlock();
118  	  }
119  	  void put_unlock() {
120  	    ceph_assert(ref > 0);
121  	    int n = --ref;
122  	    lock.unlock();
123  	    if (!n)
124  	      delete this;
125  	  }
126  	};
127  	
128  	namespace librados {
129  	struct C_AioComplete : public Context {
130  	  AioCompletionImpl *c;
131  	
132  	  explicit C_AioComplete(AioCompletionImpl *cc) : c(cc) {
133  	    c->_get();
134  	  }
135  	
136  	  void finish(int r) override {
137  	    rados_callback_t cb_complete = c->callback_complete;
138  	    void *cb_complete_arg = c->callback_complete_arg;
139  	    if (cb_complete)
140  	      cb_complete(c, cb_complete_arg);
141  	
142  	    rados_callback_t cb_safe = c->callback_safe;
143  	    void *cb_safe_arg = c->callback_safe_arg;
144  	    if (cb_safe)
145  	      cb_safe(c, cb_safe_arg);
146  	
147  	    c->lock.lock();
148  	    c->callback_complete = NULL;
149  	    c->callback_safe = NULL;
150  	    c->cond.notify_all();
151  	    c->put_unlock();
152  	  }
153  	};
154  	
155  	/**
156  	  * Fills in all completed request data, and calls both
157  	  * complete and safe callbacks if they exist.
158  	  *
159  	  * Not useful for usual I/O, but for special things like
160  	  * flush where we only want to wait for things to be safe,
161  	  * but allow users to specify any of the callbacks.
162  	  */
163  	struct C_AioCompleteAndSafe : public Context {
164  	  AioCompletionImpl *c;
165  	
166  	  explicit C_AioCompleteAndSafe(AioCompletionImpl *cc) : c(cc) {
167  	    c->get();
168  	  }
169  	
170  	  void finish(int r) override {
171  	    c->lock.lock();
172  	    c->rval = r;
173  	    c->complete = true;
174  	    c->lock.unlock();
175  	
176  	    rados_callback_t cb_complete = c->callback_complete;
177  	    void *cb_complete_arg = c->callback_complete_arg;
178  	    if (cb_complete)
179  	      cb_complete(c, cb_complete_arg);
180  	
181  	    rados_callback_t cb_safe = c->callback_safe;
182  	    void *cb_safe_arg = c->callback_safe_arg;
183  	    if (cb_safe)
184  	      cb_safe(c, cb_safe_arg);
185  	
186  	    c->lock.lock();
187  	    c->callback_complete = NULL;
188  	    c->callback_safe = NULL;
189  	    c->cond.notify_all();
190  	    c->put_unlock();
191  	  }
192  	};
193  	
194  	}
195  	
196  	#endif
197