1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H
16 #define CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H
17
18 #include "common/ceph_mutex.h"
19 #include "include/buffer.h"
20 #include "include/xlist.h"
21 #include "osd/osd_types.h"
22
23 class IoCtxImpl;
24
25 struct librados::AioCompletionImpl {
26 ceph::mutex lock = ceph::make_mutex("AioCompletionImpl lock", false);
27 ceph::condition_variable cond;
28 int ref = 1, rval = 0;
29 bool released = false;
30 bool complete = false;
31 version_t objver = 0;
32 ceph_tid_t tid = 0;
33
34 rados_callback_t callback_complete = nullptr, callback_safe = nullptr;
35 void *callback_complete_arg = nullptr, *callback_safe_arg = nullptr;
36
37 // for read
38 bool is_read = false;
39 bufferlist bl;
40 bufferlist *blp = nullptr;
41 char *out_buf = nullptr;
42
43 IoCtxImpl *io = nullptr;
44 ceph_tid_t aio_write_seq = 0;
45 xlist<AioCompletionImpl*>::item aio_write_list_item;
46
47 AioCompletionImpl() : aio_write_list_item(this) { }
48
49 int set_complete_callback(void *cb_arg, rados_callback_t cb) {
50 std::scoped_lock l{lock};
51 callback_complete = cb;
52 callback_complete_arg = cb_arg;
53 return 0;
54 }
55 int set_safe_callback(void *cb_arg, rados_callback_t cb) {
56 std::scoped_lock l{lock};
57 callback_safe = cb;
58 callback_safe_arg = cb_arg;
59 return 0;
60 }
61 int wait_for_complete() {
62 std::unique_lock l{lock};
63 cond.wait(l, [this] { return complete; });
64 return 0;
65 }
66 int wait_for_safe() {
67 return wait_for_complete();
68 }
69 int is_complete() {
70 std::scoped_lock l{lock};
71 return complete;
72 }
73 int is_safe() {
74 return is_complete();
75 }
76 int wait_for_complete_and_cb() {
77 std::unique_lock l{lock};
78 cond.wait(l, [this] { return complete && !callback_complete && !callback_safe; });
79 return 0;
80 }
81 int wait_for_safe_and_cb() {
82 return wait_for_complete_and_cb();
83 }
84 int is_complete_and_cb() {
85 std::scoped_lock l{lock};
86 return complete && !callback_complete && !callback_safe;
87 }
88 int is_safe_and_cb() {
89 return is_complete_and_cb();
90 }
91 int get_return_value() {
92 std::scoped_lock l{lock};
93 return rval;
94 }
95 uint64_t get_version() {
96 std::scoped_lock l{lock};
97 return objver;
98 }
99
100 void get() {
(1) Event fun_call_w_exception: |
Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details] |
101 std::scoped_lock l{lock};
102 _get();
103 }
104 void _get() {
105 ceph_assert(ceph_mutex_is_locked(lock));
106 ceph_assert(ref > 0);
107 ++ref;
108 }
109 void release() {
110 lock.lock();
111 ceph_assert(!released);
112 released = true;
113 put_unlock();
114 }
115 void put() {
116 lock.lock();
117 put_unlock();
118 }
119 void put_unlock() {
120 ceph_assert(ref > 0);
121 int n = --ref;
122 lock.unlock();
123 if (!n)
124 delete this;
125 }
126 };
127
128 namespace librados {
129 struct C_AioComplete : public Context {
130 AioCompletionImpl *c;
131
132 explicit C_AioComplete(AioCompletionImpl *cc) : c(cc) {
133 c->_get();
134 }
135
136 void finish(int r) override {
137 rados_callback_t cb_complete = c->callback_complete;
138 void *cb_complete_arg = c->callback_complete_arg;
139 if (cb_complete)
140 cb_complete(c, cb_complete_arg);
141
142 rados_callback_t cb_safe = c->callback_safe;
143 void *cb_safe_arg = c->callback_safe_arg;
144 if (cb_safe)
145 cb_safe(c, cb_safe_arg);
146
147 c->lock.lock();
148 c->callback_complete = NULL;
149 c->callback_safe = NULL;
150 c->cond.notify_all();
151 c->put_unlock();
152 }
153 };
154
155 /**
156 * Fills in all completed request data, and calls both
157 * complete and safe callbacks if they exist.
158 *
159 * Not useful for usual I/O, but for special things like
160 * flush where we only want to wait for things to be safe,
161 * but allow users to specify any of the callbacks.
162 */
163 struct C_AioCompleteAndSafe : public Context {
164 AioCompletionImpl *c;
165
166 explicit C_AioCompleteAndSafe(AioCompletionImpl *cc) : c(cc) {
167 c->get();
168 }
169
170 void finish(int r) override {
171 c->lock.lock();
172 c->rval = r;
173 c->complete = true;
174 c->lock.unlock();
175
176 rados_callback_t cb_complete = c->callback_complete;
177 void *cb_complete_arg = c->callback_complete_arg;
178 if (cb_complete)
179 cb_complete(c, cb_complete_arg);
180
181 rados_callback_t cb_safe = c->callback_safe;
182 void *cb_safe_arg = c->callback_safe_arg;
183 if (cb_safe)
184 cb_safe(c, cb_safe_arg);
185
186 c->lock.lock();
187 c->callback_complete = NULL;
188 c->callback_safe = NULL;
189 c->cond.notify_all();
190 c->put_unlock();
191 }
192 };
193
194 }
195
196 #endif
197