1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "journal/ObjectPlayer.h"
5    	#include "journal/Utils.h"
6    	#include "common/Timer.h"
7    	#include <limits>
8    	
9    	#define dout_subsys ceph_subsys_journaler
10   	#undef dout_prefix
11   	#define dout_prefix *_dout << "ObjectPlayer: " << this << " "
12   	
13   	namespace journal {
14   	
15   	namespace {
16   	
17   	bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
18   	                              uint32_t *pad_len, bool *partial_entry) {
19   	  const uint32_t MAX_PAD = 8;
20   	  auto pad_bytes = MAX_PAD - off % MAX_PAD;
21   	  auto next = *iter;
22   	
23   	  ceph_assert(!next.end());
24   	  if (*next != '\0') {
25   	    return false;
26   	  }
27   	
28   	  for (auto i = pad_bytes - 1; i > 0; i--) {
29   	    if ((++next).end()) {
30   	      *partial_entry = true;
31   	      return false;
32   	    }
33   	    if (*next != '\0') {
34   	      return false;
35   	    }
36   	  }
37   	
38   	  *iter = next;
39   	  *pad_len += pad_bytes;
40   	  return true;
41   	}
42   	
43   	} // anonymous namespace
44   	
45   	ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
46   	                           const std::string& object_oid_prefix,
47   	                           uint64_t object_num, SafeTimer &timer,
48   	                           ceph::mutex &timer_lock, uint8_t order,
49   	                           uint64_t max_fetch_bytes)
50   	  : m_object_num(object_num),
51   	    m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
52   	    m_timer(timer), m_timer_lock(timer_lock), m_order(order),
53   	    m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
54   	    m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this)))
55   	{
56   	  m_ioctx.dup(ioctx);
57   	  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
58   	}
59   	
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
60   	ObjectPlayer::~ObjectPlayer() {
61   	  {
62   	    std::lock_guard timer_locker{m_timer_lock};
63   	    std::lock_guard locker{m_lock};
64   	    ceph_assert(!m_fetch_in_progress);
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
65   	    ceph_assert(m_watch_ctx == nullptr);
66   	  }
67   	}
68   	
69   	void ObjectPlayer::fetch(Context *on_finish) {
70   	  ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
71   	
72   	  std::lock_guard locker{m_lock};
73   	  ceph_assert(!m_fetch_in_progress);
74   	  m_fetch_in_progress = true;
75   	
76   	  C_Fetch *context = new C_Fetch(this, on_finish);
77   	  librados::ObjectReadOperation op;
78   	  op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
79   	  op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
80   	
81   	  librados::AioCompletion *rados_completion =
82   	    librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
83   	                                           NULL);
84   	  int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
85   	  ceph_assert(r == 0);
86   	  rados_completion->release();
87   	}
88   	
89   	void ObjectPlayer::watch(Context *on_fetch, double interval) {
90   	  ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
91   	
92   	  std::lock_guard timer_locker{m_timer_lock};
93   	  m_watch_interval = interval;
94   	
95   	  ceph_assert(m_watch_ctx == nullptr);
96   	  m_watch_ctx = on_fetch;
97   	
98   	  schedule_watch();
99   	}
100  	
101  	void ObjectPlayer::unwatch() {
102  	  ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
103  	  Context *watch_ctx = nullptr;
104  	  {
105  	    std::lock_guard timer_locker{m_timer_lock};
106  	    ceph_assert(!m_unwatched);
107  	    m_unwatched = true;
108  	
109  	    if (!cancel_watch()) {
110  	      return;
111  	    }
112  	
113  	    std::swap(watch_ctx, m_watch_ctx);
114  	  }
115  	
116  	  if (watch_ctx != nullptr) {
117  	    watch_ctx->complete(-ECANCELED);
118  	  }
119  	}
120  	
121  	void ObjectPlayer::front(Entry *entry) const {
122  	  std::lock_guard locker{m_lock};
123  	  ceph_assert(!m_entries.empty());
124  	  *entry = m_entries.front();
125  	}
126  	
127  	void ObjectPlayer::pop_front() {
128  	  std::lock_guard locker{m_lock};
129  	  ceph_assert(!m_entries.empty());
130  	
131  	  auto &entry = m_entries.front();
132  	  m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
133  	  m_entries.pop_front();
134  	}
135  	
136  	int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
137  	                                        bool *refetch) {
138  	  ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
139  	                   << bl.length() << dendl;
140  	
141  	  *refetch = false;
142  	  if (r == -ENOENT) {
143  	    return 0;
144  	  } else if (r < 0) {
145  	    return r;
146  	  } else if (bl.length() == 0) {
147  	    return 0;
148  	  }
149  	
150  	  std::lock_guard locker{m_lock};
151  	  ceph_assert(m_fetch_in_progress);
152  	  m_read_off += bl.length();
153  	  m_read_bl.append(bl);
154  	  m_refetch_state = REFETCH_STATE_REQUIRED;
155  	
156  	  bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
157  	  bool partial_entry = false;
158  	  bool invalid = false;
159  	  uint32_t invalid_start_off = 0;
160  	
161  	  clear_invalid_range(m_read_bl_off, m_read_bl.length());
162  	  bufferlist::const_iterator iter{&m_read_bl, 0};
163  	  uint32_t pad_len = 0;
164  	  while (!iter.end()) {
165  	    uint32_t bytes_needed;
166  	    uint32_t bl_off = iter.get_off();
167  	    if (!Entry::is_readable(iter, &bytes_needed)) {
168  	      if (bytes_needed != 0) {
169  	        invalid_start_off = m_read_bl_off + bl_off;
170  	        invalid = true;
171  	        partial_entry = true;
172  	        if (full_fetch) {
173  	          lderr(m_cct) << ": partial record at offset " << invalid_start_off
174  	                       << dendl;
175  	        } else {
176  	          ldout(m_cct, 20) << ": partial record detected, will re-fetch"
177  	                           << dendl;
178  	        }
179  	        break;
180  	      }
181  	
182  	      if (!advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter,
183  	                                    &pad_len, &partial_entry)) {
184  	        invalid_start_off = m_read_bl_off + bl_off;
185  	        invalid = true;
186  	        if (partial_entry) {
187  	          if (full_fetch) {
188  	            lderr(m_cct) << ": partial pad at offset " << invalid_start_off
189  	                         << dendl;
190  	          } else {
191  	            ldout(m_cct, 20) << ": partial pad detected, will re-fetch"
192  	                             << dendl;
193  	          }
194  	        } else {
195  	          lderr(m_cct) << ": detected corrupt journal entry at offset "
196  	                       << invalid_start_off << dendl;
197  	        }
198  	        break;
199  	      }
200  	      ++iter;
201  	      continue;
202  	    }
203  	
204  	    Entry entry;
205  	    decode(entry, iter);
206  	    ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
207  	
208  	    uint32_t entry_len = iter.get_off() - bl_off;
209  	    if (invalid) {
210  	      // new corrupt region detected
211  	      uint32_t invalid_end_off = m_read_bl_off + bl_off;
212  	      lderr(m_cct) << ": corruption range [" << invalid_start_off
213  	                   << ", " << invalid_end_off << ")" << dendl;
214  	      m_invalid_ranges.insert(invalid_start_off,
215  	                              invalid_end_off - invalid_start_off);
216  	      invalid = false;
217  	
218  	      m_read_bl_off = invalid_end_off;
219  	    }
220  	
221  	    EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
222  	                                      entry.get_entry_tid()));
223  	    if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
224  	      m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
225  	    } else {
226  	      ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
227  	      *m_entry_keys[entry_key] = entry;
228  	    }
229  	
230  	    // prune decoded / corrupted journal entries from front of bl
231  	    bufferlist sub_bl;
232  	    sub_bl.substr_of(m_read_bl, iter.get_off(),
233  	                     m_read_bl.length() - iter.get_off());
234  	    sub_bl.swap(m_read_bl);
235  	    iter = bufferlist::iterator(&m_read_bl, 0);
236  	
237  	    // advance the decoded entry offset
238  	    m_read_bl_off += entry_len + pad_len;
239  	    pad_len = 0;
240  	  }
241  	
242  	  if (invalid) {
243  	    uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
244  	    if (!partial_entry) {
245  	      lderr(m_cct) << ": corruption range [" << invalid_start_off
246  	                   << ", " << invalid_end_off << ")" << dendl;
247  	    }
248  	    m_invalid_ranges.insert(invalid_start_off,
249  	                            invalid_end_off - invalid_start_off);
250  	  }
251  	
252  	  if (!m_invalid_ranges.empty() && !partial_entry) {
253  	    return -EBADMSG;
254  	  } else if (partial_entry && (full_fetch || m_entries.empty())) {
255  	    *refetch = true;
256  	    return -EAGAIN;
257  	  }
258  	
259  	  return 0;
260  	}
261  	
262  	void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
263  	  // possibly remove previously partial record region
264  	  InvalidRanges decode_range;
265  	  decode_range.insert(off, len);
266  	  InvalidRanges intersect_range;
267  	  intersect_range.intersection_of(m_invalid_ranges, decode_range);
268  	  if (!intersect_range.empty()) {
269  	    ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
270  	                     << dendl;
271  	    m_invalid_ranges.subtract(intersect_range);
272  	  }
273  	}
274  	
275  	void ObjectPlayer::schedule_watch() {
276  	  ceph_assert(ceph_mutex_is_locked(m_timer_lock));
277  	  if (m_watch_ctx == NULL) {
278  	    return;
279  	  }
280  	
281  	  ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
282  	  ceph_assert(m_watch_task == nullptr);
283  	  m_watch_task = m_timer.add_event_after(
284  	    m_watch_interval,
285  	    new LambdaContext([this](int) {
286  		handle_watch_task();
287  	      }));
288  	}
289  	
290  	bool ObjectPlayer::cancel_watch() {
291  	  ceph_assert(ceph_mutex_is_locked(m_timer_lock));
292  	  ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
293  	  if (m_watch_task != nullptr) {
294  	    bool canceled = m_timer.cancel_event(m_watch_task);
295  	    ceph_assert(canceled);
296  	
297  	    m_watch_task = nullptr;
298  	    return true;
299  	  }
300  	  return false;
301  	}
302  	
303  	void ObjectPlayer::handle_watch_task() {
304  	  ceph_assert(ceph_mutex_is_locked(m_timer_lock));
305  	
306  	  ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
307  	  ceph_assert(m_watch_ctx != nullptr);
308  	  ceph_assert(m_watch_task != nullptr);
309  	
310  	  m_watch_task = nullptr;
311  	  fetch(new C_WatchFetch(this));
312  	}
313  	
314  	void ObjectPlayer::handle_watch_fetched(int r) {
315  	  ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
316  	                   << dendl;
317  	
318  	  Context *watch_ctx = nullptr;
319  	  {
320  	    std::lock_guard timer_locker{m_timer_lock};
321  	    std::swap(watch_ctx, m_watch_ctx);
322  	
323  	    if (m_unwatched) {
324  	      m_unwatched = false;
325  	      r = -ECANCELED;
326  	    }
327  	  }
328  	
329  	  if (watch_ctx != nullptr) {
330  	    watch_ctx->complete(r);
331  	  }
332  	}
333  	
334  	void ObjectPlayer::C_Fetch::finish(int r) {
335  	  bool refetch = false;
336  	  r = object_player->handle_fetch_complete(r, read_bl, &refetch);
337  	
338  	  {
339  	    std::lock_guard locker{object_player->m_lock};
340  	    object_player->m_fetch_in_progress = false;
341  	  }
342  	
343  	  if (refetch) {
344  	    object_player->fetch(on_finish);
345  	    return;
346  	  }
347  	
348  	  object_player.reset();
349  	  on_finish->complete(r);
350  	}
351  	
352  	void ObjectPlayer::C_WatchFetch::finish(int r) {
353  	  object_player->handle_watch_fetched(r);
354  	}
355  	
356  	} // namespace journal
357