1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/ObjectPlayer.h"
5 #include "journal/Utils.h"
6 #include "common/Timer.h"
7 #include <limits>
8
9 #define dout_subsys ceph_subsys_journaler
10 #undef dout_prefix
11 #define dout_prefix *_dout << "ObjectPlayer: " << this << " "
12
13 namespace journal {
14
15 namespace {
16
17 bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
18 uint32_t *pad_len, bool *partial_entry) {
19 const uint32_t MAX_PAD = 8;
20 auto pad_bytes = MAX_PAD - off % MAX_PAD;
21 auto next = *iter;
22
23 ceph_assert(!next.end());
24 if (*next != '\0') {
25 return false;
26 }
27
28 for (auto i = pad_bytes - 1; i > 0; i--) {
29 if ((++next).end()) {
30 *partial_entry = true;
31 return false;
32 }
33 if (*next != '\0') {
34 return false;
35 }
36 }
37
38 *iter = next;
39 *pad_len += pad_bytes;
40 return true;
41 }
42
43 } // anonymous namespace
44
45 ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
46 const std::string& object_oid_prefix,
47 uint64_t object_num, SafeTimer &timer,
48 ceph::mutex &timer_lock, uint8_t order,
49 uint64_t max_fetch_bytes)
50 : m_object_num(object_num),
51 m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
52 m_timer(timer), m_timer_lock(timer_lock), m_order(order),
53 m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
54 m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this)))
55 {
56 m_ioctx.dup(ioctx);
57 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
58 }
59
(1) Event exn_spec_violation: |
An exception of type "boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<std::ios_base::failure[abi:cxx11]>>" can be thrown, but this destructor's exception specification ("throw()", i.e. an implicitly noexcept destructor) does not allow it. An escaping exception will therefore call std::terminate() (for a dynamic throw() specification, via std::unexpected()). |
Also see events: |
[fun_call_w_exception] |
60 ObjectPlayer::~ObjectPlayer() {
61 {
(2) Event fun_call_w_exception: |
Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details] |
Also see events: |
[exn_spec_violation] |
62 std::lock_guard timer_locker{m_timer_lock};
63 std::lock_guard locker{m_lock};
64 ceph_assert(!m_fetch_in_progress);
65 ceph_assert(m_watch_ctx == nullptr);
66 }
67 }
68
69 void ObjectPlayer::fetch(Context *on_finish) {
70 ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
71
72 std::lock_guard locker{m_lock};
73 ceph_assert(!m_fetch_in_progress);
74 m_fetch_in_progress = true;
75
76 C_Fetch *context = new C_Fetch(this, on_finish);
77 librados::ObjectReadOperation op;
78 op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
79 op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
80
81 librados::AioCompletion *rados_completion =
82 librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
83 NULL);
84 int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
85 ceph_assert(r == 0);
86 rados_completion->release();
87 }
88
89 void ObjectPlayer::watch(Context *on_fetch, double interval) {
90 ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
91
92 std::lock_guard timer_locker{m_timer_lock};
93 m_watch_interval = interval;
94
95 ceph_assert(m_watch_ctx == nullptr);
96 m_watch_ctx = on_fetch;
97
98 schedule_watch();
99 }
100
101 void ObjectPlayer::unwatch() {
102 ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
103 Context *watch_ctx = nullptr;
104 {
105 std::lock_guard timer_locker{m_timer_lock};
106 ceph_assert(!m_unwatched);
107 m_unwatched = true;
108
109 if (!cancel_watch()) {
110 return;
111 }
112
113 std::swap(watch_ctx, m_watch_ctx);
114 }
115
116 if (watch_ctx != nullptr) {
117 watch_ctx->complete(-ECANCELED);
118 }
119 }
120
121 void ObjectPlayer::front(Entry *entry) const {
122 std::lock_guard locker{m_lock};
123 ceph_assert(!m_entries.empty());
124 *entry = m_entries.front();
125 }
126
127 void ObjectPlayer::pop_front() {
128 std::lock_guard locker{m_lock};
129 ceph_assert(!m_entries.empty());
130
131 auto &entry = m_entries.front();
132 m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
133 m_entries.pop_front();
134 }
135
// Consume the result of a read issued by fetch().
//
// Appends @bl to the buffered, not-yet-decoded data (m_read_bl, which starts
// at object offset m_read_bl_off), then decodes as many journal entries as
// possible, skipping zero padding between them. Each decoded entry is
// appended to m_entries (or overwrites a buffered entry with the same
// (tag tid, entry tid) key) and is pruned from the front of m_read_bl.
//
// @param r       result of the read
// @param bl      data returned by the read
// @param refetch out: set to true when the caller should fetch again
// @return 0 on success (also for -ENOENT and for an empty read);
//         r itself for any other negative read error;
//         -EBADMSG if any invalid range is recorded and no partial
//         entry/pad was hit;
//         -EAGAIN (with *refetch = true) if a partial entry/pad was hit and
//         either this is a default-size fetch or no entries are buffered.
int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
                                        bool *refetch) {
  ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
                   << bl.length() << dendl;

  *refetch = false;
  if (r == -ENOENT) {
    // missing object: nothing to decode, not an error
    return 0;
  } else if (r < 0) {
    return r;
  } else if (bl.length() == 0) {
    return 0;
  }

  std::lock_guard locker{m_lock};
  ceph_assert(m_fetch_in_progress);
  m_read_off += bl.length();
  m_read_bl.append(bl);
  m_refetch_state = REFETCH_STATE_REQUIRED;

  // true when the fetch size is the default 2 << m_order bytes; partial
  // records/pads are then logged as errors instead of expected re-fetches
  bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
  bool partial_entry = false;
  bool invalid = false;
  uint32_t invalid_start_off = 0;

  // the whole buffered region is re-scanned below, so forget any invalid
  // range previously recorded inside it
  clear_invalid_range(m_read_bl_off, m_read_bl.length());
  bufferlist::const_iterator iter{&m_read_bl, 0};
  // zero padding skipped since the last decoded entry
  uint32_t pad_len = 0;
  while (!iter.end()) {
    uint32_t bytes_needed;
    uint32_t bl_off = iter.get_off();
    if (!Entry::is_readable(iter, &bytes_needed)) {
      if (bytes_needed != 0) {
        // treated as a partial record: presumably the entry extends past the
        // buffered data
        invalid_start_off = m_read_bl_off + bl_off;
        invalid = true;
        partial_entry = true;
        if (full_fetch) {
          lderr(m_cct) << ": partial record at offset " << invalid_start_off
                       << dendl;
        } else {
          ldout(m_cct, 20) << ": partial record detected, will re-fetch"
                           << dendl;
        }
        break;
      }

      // not an entry: accept it only as zero padding up to the next 8-byte
      // boundary (absolute offset), otherwise the data is corrupt/partial
      if (!advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter,
                                    &pad_len, &partial_entry)) {
        invalid_start_off = m_read_bl_off + bl_off;
        invalid = true;
        if (partial_entry) {
          if (full_fetch) {
            lderr(m_cct) << ": partial pad at offset " << invalid_start_off
                         << dendl;
          } else {
            ldout(m_cct, 20) << ": partial pad detected, will re-fetch"
                             << dendl;
          }
        } else {
          lderr(m_cct) << ": detected corrupt journal entry at offset "
                       << invalid_start_off << dendl;
        }
        break;
      }
      // iter is on the last pad byte; step past it and keep scanning
      ++iter;
      continue;
    }

    Entry entry;
    decode(entry, iter);
    ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;

    uint32_t entry_len = iter.get_off() - bl_off;
    // NOTE(review): both paths above that set `invalid` also `break`, so this
    // branch appears to be unreachable.
    if (invalid) {
      // new corrupt region detected
      uint32_t invalid_end_off = m_read_bl_off + bl_off;
      lderr(m_cct) << ": corruption range [" << invalid_start_off
                   << ", " << invalid_end_off << ")" << dendl;
      m_invalid_ranges.insert(invalid_start_off,
                              invalid_end_off - invalid_start_off);
      invalid = false;

      m_read_bl_off = invalid_end_off;
    }

    // new keys are appended in decode order; a duplicate key overwrites the
    // already-buffered entry in place (m_entry_keys maps to its position)
    EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
                                      entry.get_entry_tid()));
    if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
      m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
    } else {
      ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
      *m_entry_keys[entry_key] = entry;
    }

    // prune decoded / corrupted journal entries from front of bl
    bufferlist sub_bl;
    sub_bl.substr_of(m_read_bl, iter.get_off(),
                     m_read_bl.length() - iter.get_off());
    sub_bl.swap(m_read_bl);
    iter = bufferlist::iterator(&m_read_bl, 0);

    // advance the decoded entry offset (entry plus any padding before it)
    m_read_bl_off += entry_len + pad_len;
    pad_len = 0;
  }

  if (invalid) {
    // the invalid region runs from the bad offset to the end of buffered data
    uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
    if (!partial_entry) {
      lderr(m_cct) << ": corruption range [" << invalid_start_off
                   << ", " << invalid_end_off << ")" << dendl;
    }
    m_invalid_ranges.insert(invalid_start_off,
                            invalid_end_off - invalid_start_off);
  }

  if (!m_invalid_ranges.empty() && !partial_entry) {
    return -EBADMSG;
  } else if (partial_entry && (full_fetch || m_entries.empty())) {
    *refetch = true;
    return -EAGAIN;
  }

  return 0;
}
261
262 void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
263 // possibly remove previously partial record region
264 InvalidRanges decode_range;
265 decode_range.insert(off, len);
266 InvalidRanges intersect_range;
267 intersect_range.intersection_of(m_invalid_ranges, decode_range);
268 if (!intersect_range.empty()) {
269 ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
270 << dendl;
271 m_invalid_ranges.subtract(intersect_range);
272 }
273 }
274
275 void ObjectPlayer::schedule_watch() {
276 ceph_assert(ceph_mutex_is_locked(m_timer_lock));
277 if (m_watch_ctx == NULL) {
278 return;
279 }
280
281 ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
282 ceph_assert(m_watch_task == nullptr);
283 m_watch_task = m_timer.add_event_after(
284 m_watch_interval,
285 new LambdaContext([this](int) {
286 handle_watch_task();
287 }));
288 }
289
290 bool ObjectPlayer::cancel_watch() {
291 ceph_assert(ceph_mutex_is_locked(m_timer_lock));
292 ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
293 if (m_watch_task != nullptr) {
294 bool canceled = m_timer.cancel_event(m_watch_task);
295 ceph_assert(canceled);
296
297 m_watch_task = nullptr;
298 return true;
299 }
300 return false;
301 }
302
303 void ObjectPlayer::handle_watch_task() {
304 ceph_assert(ceph_mutex_is_locked(m_timer_lock));
305
306 ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
307 ceph_assert(m_watch_ctx != nullptr);
308 ceph_assert(m_watch_task != nullptr);
309
310 m_watch_task = nullptr;
311 fetch(new C_WatchFetch(this));
312 }
313
314 void ObjectPlayer::handle_watch_fetched(int r) {
315 ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
316 << dendl;
317
318 Context *watch_ctx = nullptr;
319 {
320 std::lock_guard timer_locker{m_timer_lock};
321 std::swap(watch_ctx, m_watch_ctx);
322
323 if (m_unwatched) {
324 m_unwatched = false;
325 r = -ECANCELED;
326 }
327 }
328
329 if (watch_ctx != nullptr) {
330 watch_ctx->complete(r);
331 }
332 }
333
334 void ObjectPlayer::C_Fetch::finish(int r) {
335 bool refetch = false;
336 r = object_player->handle_fetch_complete(r, read_bl, &refetch);
337
338 {
339 std::lock_guard locker{object_player->m_lock};
340 object_player->m_fetch_in_progress = false;
341 }
342
343 if (refetch) {
344 object_player->fetch(on_finish);
345 return;
346 }
347
348 object_player.reset();
349 on_finish->complete(r);
350 }
351
352 void ObjectPlayer::C_WatchFetch::finish(int r) {
353 object_player->handle_watch_fetched(r);
354 }
355
356 } // namespace journal
357