1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "journal/ObjectRecorder.h"
5    	#include "journal/Future.h"
6    	#include "journal/Utils.h"
7    	#include "include/ceph_assert.h"
8    	#include "common/Timer.h"
9    	#include "common/errno.h"
10   	#include "cls/journal/cls_journal_client.h"
11   	
12   	#define dout_subsys ceph_subsys_journaler
13   	#undef dout_prefix
14   	#define dout_prefix *_dout << "ObjectRecorder: " << this << " " \
15   	                           << __func__ << " (" << m_oid << "): "
16   	
17   	using namespace cls::journal;
18   	using std::shared_ptr;
19   	
20   	namespace journal {
21   	
22   	ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid,
23   	                               uint64_t object_number, ceph::mutex* lock,
24   	                               ContextWQ *work_queue, Handler *handler,
25   	                               uint8_t order, int32_t max_in_flight_appends)
26   	  : m_oid(oid), m_object_number(object_number),
27   	    m_op_work_queue(work_queue), m_handler(handler),
28   	    m_order(order), m_soft_max_size(1 << m_order),
29   	    m_max_in_flight_appends(max_in_flight_appends),
30   	    m_lock(lock), m_last_flush_time(ceph_clock_now())
31   	{
32   	  m_ioctx.dup(ioctx);
33   	  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
34   	  ceph_assert(m_handler != NULL);
35   	
36   	  librados::Rados rados(m_ioctx);
37   	  int8_t require_osd_release = 0;
38   	  int r = rados.get_min_compatible_osd(&require_osd_release);
39   	  if (r < 0) {
40   	    ldout(m_cct, 0) << "failed to retrieve min OSD release: "
41   	                    << cpp_strerror(r) << dendl;
42   	  }
43   	  m_compat_mode = require_osd_release < CEPH_RELEASE_OCTOPUS;
44   	
45   	  ldout(m_cct, 20) << dendl;
46   	}
47   	
(1) Event exn_spec_violation: An exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
Also see events: [fun_call_w_exception]
48   	ObjectRecorder::~ObjectRecorder() {
49   	  ldout(m_cct, 20) << dendl;
50   	  ceph_assert(m_pending_buffers.empty());
51   	  ceph_assert(m_in_flight_tids.empty());
(2) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
Also see events: [exn_spec_violation]
52   	  ceph_assert(m_in_flight_appends.empty());
53   	}
54   	
55   	void ObjectRecorder::set_append_batch_options(int flush_interval,
56   	                                              uint64_t flush_bytes,
57   	                                              double flush_age) {
58   	  ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
59   	                  << "flush_bytes=" << flush_bytes << ", "
60   	                  << "flush_age=" << flush_age << dendl;
61   	
62   	  ceph_assert(ceph_mutex_is_locked(*m_lock));
63   	  m_flush_interval = flush_interval;
64   	  m_flush_bytes = flush_bytes;
65   	  m_flush_age = flush_age;
66   	}
67   	
68   	bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
69   	  ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
70   	
71   	  ceph_assert(ceph_mutex_is_locked(*m_lock));
72   	
73   	  ceph::ref_t<FutureImpl> last_flushed_future;
74   	  auto flush_handler = get_flush_handler();
75   	  for (auto& append_buffer : append_buffers) {
76   	    ldout(m_cct, 20) << *append_buffer.first << ", "
77   	                     << "size=" << append_buffer.second.length() << dendl;
78   	    bool flush_requested = append_buffer.first->attach(flush_handler);
79   	    if (flush_requested) {
80   	      last_flushed_future = append_buffer.first;
81   	    }
82   	
83   	    m_pending_buffers.push_back(append_buffer);
84   	    m_pending_bytes += append_buffer.second.length();
85   	  }
86   	
87   	  return send_appends(!!last_flushed_future, last_flushed_future);
88   	}
89   	
90   	void ObjectRecorder::flush(Context *on_safe) {
91   	  ldout(m_cct, 20) << dendl;
92   	
93   	  Future future;
94   	  {
95   	    std::unique_lock locker{*m_lock};
96   	
97   	    // if currently handling flush notifications, wait so that
98   	    // we notify in the correct order (since lock is dropped on
99   	    // callback)
100  	    if (m_in_flight_flushes) {
101  	      m_in_flight_flushes_cond.wait(locker);
102  	    }
103  	
104  	    // attach the flush to the most recent append
105  	    if (!m_pending_buffers.empty()) {
106  	      future = Future(m_pending_buffers.rbegin()->first);
107  	    } else if (!m_in_flight_appends.empty()) {
108  	      AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
109  	      ceph_assert(!append_buffers.empty());
110  	      future = Future(append_buffers.rbegin()->first);
111  	    }
112  	  }
113  	
114  	  if (future.is_valid()) {
115  	    // cannot be invoked while the same lock context
116  	    m_op_work_queue->queue(new LambdaContext(
117  	      [future, on_safe] (int r) mutable {
118  	        future.flush(on_safe);
119  	      }));
120  	  } else {
121  	    on_safe->complete(0);
122  	  }
123  	}
124  	
125  	void ObjectRecorder::flush(const ceph::ref_t<FutureImpl>& future) {
126  	  ldout(m_cct, 20) << "flushing " << *future << dendl;
127  	
128  	  m_lock->lock();
129  	  {
130  	    auto flush_handler = future->get_flush_handler();
131  	    auto my_handler = get_flush_handler();
132  	    if (flush_handler != my_handler) {
133  	      // if we don't own this future, re-issue the flush so that it hits the
134  	      // correct journal object owner
135  	      future->flush();
136  	      m_lock->unlock();
137  	      return;
138  	    } else if (future->is_flush_in_progress()) {
139  	      m_lock->unlock();
140  	      return;
141  	    }
142  	  }
143  	
144  	  bool overflowed = send_appends(true, future);
145  	  if (overflowed) {
146  	    notify_handler_unlock();
147  	  } else {
148  	    m_lock->unlock();
149  	  }
150  	}
151  	
152  	void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
153  	  ldout(m_cct, 20) << dendl;
154  	
155  	  ceph_assert(ceph_mutex_is_locked(*m_lock));
156  	  ceph_assert(m_in_flight_tids.empty());
157  	  ceph_assert(m_in_flight_appends.empty());
158  	  ceph_assert(m_object_closed || m_overflowed);
159  	
160  	  for (auto& append_buffer : m_pending_buffers) {
161  	    ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl;
162  	    append_buffer.first->detach();
163  	  }
164  	  append_buffers->splice(append_buffers->end(), m_pending_buffers,
165  	                         m_pending_buffers.begin(), m_pending_buffers.end());
166  	}
167  	
168  	bool ObjectRecorder::close() {
169  	  ceph_assert(ceph_mutex_is_locked(*m_lock));
170  	
171  	  ldout(m_cct, 20) << dendl;
172  	  send_appends(true, {});
173  	
174  	  ceph_assert(!m_object_closed);
175  	  m_object_closed = true;
176  	  return (m_in_flight_tids.empty() && !m_in_flight_flushes);
177  	}
178  	
179  	void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
180  	  ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl;
181  	
182  	  AppendBuffers append_buffers;
183  	  {
184  	    m_lock->lock();
185  	    auto tid_iter = m_in_flight_tids.find(tid);
186  	    ceph_assert(tid_iter != m_in_flight_tids.end());
187  	    m_in_flight_tids.erase(tid_iter);
188  	
189  	    InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
190  	    ceph_assert(iter != m_in_flight_appends.end());
191  	
192  	    if (r == -EOVERFLOW) {
193  	      ldout(m_cct, 10) << "append overflowed" << dendl;
194  	      m_overflowed = true;
195  	
196  	      // notify of overflow once all in-flight ops are complete
197  	      if (m_in_flight_tids.empty()) {
198  	        append_overflowed();
199  	        notify_handler_unlock();
200  	      } else {
201  	        m_lock->unlock();
202  	      }
203  	      return;
204  	    }
205  	
206  	    append_buffers.swap(iter->second);
207  	    ceph_assert(!append_buffers.empty());
208  	
209  	    for (auto& append_buffer : append_buffers) {
210  	      m_object_bytes += append_buffer.second.length();
211  	    }
212  	    ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl;
213  	
214  	    m_in_flight_appends.erase(iter);
215  	    m_in_flight_flushes = true;
216  	    m_lock->unlock();
217  	  }
218  	
219  	  // Flag the associated futures as complete.
220  	  for (auto& append_buffer : append_buffers) {
221  	    ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl;
222  	    append_buffer.first->safe(r);
223  	  }
224  	
225  	  // wake up any flush requests that raced with a RADOS callback
226  	  m_lock->lock();
227  	  m_in_flight_flushes = false;
228  	  m_in_flight_flushes_cond.notify_all();
229  	
230  	  if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
231  	    // all remaining unsent appends should be redirected to new object
232  	    notify_handler_unlock();
233  	  } else {
234  	    bool overflowed = send_appends(false, {});
235  	    if (overflowed) {
236  	      notify_handler_unlock();
237  	    } else {
238  	      m_lock->unlock();
239  	    }
240  	  }
241  	}
242  	
243  	void ObjectRecorder::append_overflowed() {
244  	  ldout(m_cct, 10) << dendl;
245  	
246  	  ceph_assert(ceph_mutex_is_locked(*m_lock));
247  	  ceph_assert(!m_in_flight_appends.empty());
248  	
249  	  InFlightAppends in_flight_appends;
250  	  in_flight_appends.swap(m_in_flight_appends);
251  	
252  	  AppendBuffers restart_append_buffers;
253  	  for (InFlightAppends::iterator it = in_flight_appends.begin();
254  	       it != in_flight_appends.end(); ++it) {
255  	    restart_append_buffers.insert(restart_append_buffers.end(),
256  	                                  it->second.begin(), it->second.end());
257  	  }
258  	
259  	  restart_append_buffers.splice(restart_append_buffers.end(),
260  	                                m_pending_buffers,
261  	                                m_pending_buffers.begin(),
262  	                                m_pending_buffers.end());
263  	  restart_append_buffers.swap(m_pending_buffers);
264  	}
265  	
266  	bool ObjectRecorder::send_appends(bool force, ceph::ref_t<FutureImpl> flush_future) {
267  	  ldout(m_cct, 20) << dendl;
268  	
269  	  ceph_assert(ceph_mutex_is_locked(*m_lock));
270  	  if (m_object_closed || m_overflowed) {
271  	    ldout(m_cct, 20) << "already closed or overflowed" << dendl;
272  	    return false;
273  	  }
274  	
275  	  if (m_pending_buffers.empty()) {
276  	    ldout(m_cct, 20) << "append buffers empty" << dendl;
277  	    return false;
278  	  }
279  	
280  	  if (!force &&
281  	      ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) ||
282  	       (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) ||
283  	       (m_flush_age > 0 &&
284  	        m_last_flush_time + m_flush_age >= ceph_clock_now()))) {
285  	    ldout(m_cct, 20) << "forcing batch flush" << dendl;
286  	    force = true;
287  	  }
288  	
289  	  auto max_in_flight_appends = m_max_in_flight_appends;
290  	  if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) {
291  	    if (!force && max_in_flight_appends == 0) {
292  	      ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl;
293  	      max_in_flight_appends = 1;
294  	    }
295  	  } else if (max_in_flight_appends < 0) {
296  	    max_in_flight_appends = 0;
297  	  }
298  	
299  	  if (!force && max_in_flight_appends != 0 &&
300  	      static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) {
301  	    ldout(m_cct, 10) << "max in flight appends reached" << dendl;
302  	    return false;
303  	  }
304  	
305  	  librados::ObjectWriteOperation op;
306  	  if (m_compat_mode) {
307  	    client::guard_append(&op, m_soft_max_size);
308  	  }
309  	
310  	  size_t append_bytes = 0;
311  	  AppendBuffers append_buffers;
312  	  bufferlist append_bl;
313  	  for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
314  	    auto& future = it->first;
315  	    auto& bl = it->second;
316  	    auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length();
317  	    if (size == m_soft_max_size) {
318  	      ldout(m_cct, 10) << "object at capacity " << *future << dendl;
319  	      m_overflowed = true;
320  	    } else if (size > m_soft_max_size) {
321  	      ldout(m_cct, 10) << "object beyond capacity " << *future << dendl;
322  	      m_overflowed = true;
323  	      break;
324  	    }
325  	
326  	    bool flush_break = (force && flush_future && flush_future == future);
327  	    ldout(m_cct, 20) << "flushing " << *future << dendl;
328  	    future->set_flush_in_progress();
329  	
330  	    if (m_compat_mode) {
331  	      op.append(bl);
332  	      op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
333  	    } else {
334  	      append_bl.append(bl);
335  	    }
336  	
337  	    append_bytes += bl.length();
338  	    append_buffers.push_back(*it);
339  	    it = m_pending_buffers.erase(it);
340  	
341  	    if (flush_break) {
342  	      ldout(m_cct, 20) << "stopping at requested flush future" << dendl;
343  	      break;
344  	    }
345  	  }
346  	
347  	  if (append_bytes > 0) {
348  	    m_last_flush_time = ceph_clock_now();
349  	
350  	    uint64_t append_tid = m_append_tid++;
351  	    m_in_flight_tids.insert(append_tid);
352  	    m_in_flight_appends[append_tid].swap(append_buffers);
353  	    m_in_flight_bytes += append_bytes;
354  	
355  	    ceph_assert(m_pending_bytes >= append_bytes);
356  	    m_pending_bytes -= append_bytes;
357  	
358  	    if (!m_compat_mode) {
359  	      client::append(&op, m_soft_max_size, append_bl);
360  	    }
361  	
362  	    auto rados_completion = librados::Rados::aio_create_completion(
363  	      new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback);
364  	    int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
365  	    ceph_assert(r == 0);
366  	    rados_completion->release();
367  	    ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", "
368  	                     << "append_bytes=" << append_bytes << ", "
369  	                     << "in_flight_bytes=" << m_in_flight_bytes << ", "
370  	                     << "pending_bytes=" << m_pending_bytes << dendl;
371  	  }
372  	
373  	  return m_overflowed;
374  	}
375  	
376  	void ObjectRecorder::notify_handler_unlock() {
377  	  ceph_assert(ceph_mutex_is_locked(*m_lock));
378  	  if (m_object_closed) {
379  	    m_lock->unlock();
380  	    m_handler->closed(this);
381  	  } else {
382  	    // TODO need to delay completion until after aio_notify completes
383  	    m_lock->unlock();
384  	    m_handler->overflow(this);
385  	  }
386  	}
387  	
388  	} // namespace journal
389