1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
4    	#include "test/librados_test_stub/TestWatchNotify.h"
5    	#include "include/Context.h"
6    	#include "common/Cond.h"
7    	#include "include/stringify.h"
8    	#include "common/Finisher.h"
9    	#include "test/librados_test_stub/TestCluster.h"
10   	#include "test/librados_test_stub/TestRadosClient.h"
11   	#include <boost/bind.hpp>
12   	#include <boost/function.hpp>
13   	#include "include/ceph_assert.h"
14   	
15   	#define dout_subsys ceph_subsys_rados
16   	#undef dout_prefix
17   	#define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
18   	
19   	namespace librados {
20   	
21   	std::ostream& operator<<(std::ostream& out,
22   				 const TestWatchNotify::WatcherID &watcher_id) {
23   	  out << "(" << watcher_id.first << "," << watcher_id.second << ")";
24   	  return out;
25   	}
26   	
27   	struct TestWatchNotify::ObjectHandler : public TestCluster::ObjectHandler {
28   	  TestWatchNotify* test_watch_notify;
29   	  int64_t pool_id;
30   	  std::string nspace;
31   	  std::string oid;
32   	
33   	  ObjectHandler(TestWatchNotify* test_watch_notify, int64_t pool_id,
34   	                const std::string& nspace, const std::string& oid)
35   	    : test_watch_notify(test_watch_notify), pool_id(pool_id),
36   	      nspace(nspace), oid(oid) {
37   	  }
38   	
39   	  void handle_removed(TestRadosClient* test_rados_client) override {
40   	    // copy member variables since this object might be deleted
41   	    auto _test_watch_notify = test_watch_notify;
42   	    auto _pool_id = pool_id;
43   	    auto _nspace = nspace;
44   	    auto _oid = oid;
45   	    auto ctx = new LambdaContext([_test_watch_notify, _pool_id, _nspace, _oid](int r) {
46   	        _test_watch_notify->handle_object_removed(_pool_id, _nspace, _oid);
47   	      });
48   	    test_rados_client->get_aio_finisher()->queue(ctx);
49   	  }
50   	};
51   	
52   	TestWatchNotify::TestWatchNotify(TestCluster* test_cluster)
53   	  : m_test_cluster(test_cluster) {
54   	}
55   	
56   	void TestWatchNotify::flush(TestRadosClient *rados_client) {
57   	  CephContext *cct = rados_client->cct();
58   	
59   	  ldout(cct, 20) << "enter" << dendl;
60   	  // block until we know no additional async notify callbacks will occur
61   	  C_SaferCond ctx;
62   	  m_async_op_tracker.wait_for_ops(&ctx);
63   	  ctx.wait();
64   	}
65   	
66   	int TestWatchNotify::list_watchers(int64_t pool_id, const std::string& nspace,
67   	                                   const std::string& o,
68   	                                   std::list<obj_watch_t> *out_watchers) {
69   	  std::lock_guard lock{m_lock};
70   	  SharedWatcher watcher = get_watcher(pool_id, nspace, o);
71   	  if (!watcher) {
72   	    return -ENOENT;
73   	  }
74   	
75   	  out_watchers->clear();
76   	  for (TestWatchNotify::WatchHandles::iterator it =
77   	         watcher->watch_handles.begin();
78   	       it != watcher->watch_handles.end(); ++it) {
79   	    obj_watch_t obj;
80   	    strncpy(obj.addr, it->second.addr.c_str(), sizeof(obj.addr) - 1);
81   	    obj.addr[sizeof(obj.addr) - 1] = '\0';
82   	    obj.watcher_id = static_cast<int64_t>(it->second.gid);
83   	    obj.cookie = it->second.handle;
84   	    obj.timeout_seconds = 30;
85   	    out_watchers->push_back(obj);
86   	  }
87   	  return 0;
88   	}
89   	
90   	void TestWatchNotify::aio_flush(TestRadosClient *rados_client,
91   	                                Context *on_finish) {
92   	  rados_client->get_aio_finisher()->queue(on_finish);
93   	}
94   	
95   	int TestWatchNotify::watch(TestRadosClient *rados_client, int64_t pool_id,
96   	                           const std::string& nspace, const std::string& o,
97   	                           uint64_t gid, uint64_t *handle,
98   	                           librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) {
99   	  C_SaferCond cond;
100  	  aio_watch(rados_client, pool_id, nspace, o, gid, handle, ctx, ctx2, &cond);
101  	  return cond.wait();
102  	}
103  	
104  	void TestWatchNotify::aio_watch(TestRadosClient *rados_client, int64_t pool_id,
105  	                                const std::string& nspace, const std::string& o,
106  	                                uint64_t gid, uint64_t *handle,
107  	                                librados::WatchCtx *watch_ctx,
108  	                                librados::WatchCtx2 *watch_ctx2,
109  	                                Context *on_finish) {
110  	  auto ctx = new LambdaContext([=](int) {
111  	      execute_watch(rados_client, pool_id, nspace, o, gid, handle, watch_ctx,
112  	                    watch_ctx2, on_finish);
113  	    });
114  	  rados_client->get_aio_finisher()->queue(ctx);
115  	}
116  	
117  	int TestWatchNotify::unwatch(TestRadosClient *rados_client,
118  	                             uint64_t handle) {
119  	  C_SaferCond ctx;
120  	  aio_unwatch(rados_client, handle, &ctx);
121  	  return ctx.wait();
122  	}
123  	
124  	void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
125  	                                  uint64_t handle, Context *on_finish) {
126  	  auto ctx = new LambdaContext([this, rados_client, handle, on_finish](int) {
127  	      execute_unwatch(rados_client, handle, on_finish);
128  	    });
129  	  rados_client->get_aio_finisher()->queue(ctx);
130  	}
131  	
132  	void TestWatchNotify::aio_notify(TestRadosClient *rados_client, int64_t pool_id,
133  	                                 const std::string& nspace,
134  	                                 const std::string& oid, const bufferlist& bl,
135  	                                 uint64_t timeout_ms, bufferlist *pbl,
136  	                                 Context *on_notify) {
(1) Event exn_spec_violation: An exception of type "ceph::buffer::v14_2_0::end_of_buffer" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate().
(2) Event fun_call_w_exception: Called function throws an exception of type "ceph::buffer::v14_2_0::end_of_buffer". [details]
137  	  auto ctx = new LambdaContext([=](int) {
138  	      execute_notify(rados_client, pool_id, nspace, oid, bl, pbl, on_notify);
139  	    });
140  	  rados_client->get_aio_finisher()->queue(ctx);
141  	}
142  	
143  	int TestWatchNotify::notify(TestRadosClient *rados_client, int64_t pool_id,
144  	                            const std::string& nspace, const std::string& oid,
145  	                            bufferlist& bl, uint64_t timeout_ms,
146  	                            bufferlist *pbl) {
147  	  C_SaferCond cond;
148  	  aio_notify(rados_client, pool_id, nspace, oid, bl, timeout_ms, pbl, &cond);
149  	  return cond.wait();
150  	}
151  	
152  	void TestWatchNotify::notify_ack(TestRadosClient *rados_client, int64_t pool_id,
153  	                                 const std::string& nspace,
154  	                                 const std::string& o, uint64_t notify_id,
155  	                                 uint64_t handle, uint64_t gid,
156  	                                 bufferlist& bl) {
157  	  CephContext *cct = rados_client->cct();
158  	  ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle
159  			 << ", gid=" << gid << dendl;
160  	  std::lock_guard lock{m_lock};
161  	  WatcherID watcher_id = std::make_pair(gid, handle);
162  	  ack_notify(rados_client, pool_id, nspace, o, notify_id, watcher_id, bl);
163  	  finish_notify(rados_client, pool_id, nspace, o, notify_id);
164  	}
165  	
166  	void TestWatchNotify::execute_watch(TestRadosClient *rados_client,
167  	                                    int64_t pool_id, const std::string& nspace,
168  	                                    const std::string& o, uint64_t gid,
169  	                                    uint64_t *handle, librados::WatchCtx *ctx,
170  	                                    librados::WatchCtx2 *ctx2,
171  	                                    Context* on_finish) {
172  	  CephContext *cct = rados_client->cct();
173  	
174  	  m_lock.lock();
175  	  SharedWatcher watcher = get_watcher(pool_id, nspace, o);
176  	  if (!watcher) {
177  	    m_lock.unlock();
178  	    on_finish->complete(-ENOENT);
179  	    return;
180  	  }
181  	
182  	  WatchHandle watch_handle;
183  	  watch_handle.rados_client = rados_client;
184  	  watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce());
185  	  watch_handle.nonce = rados_client->get_nonce();
186  	  watch_handle.gid = gid;
187  	  watch_handle.handle = ++m_handle;
188  	  watch_handle.watch_ctx = ctx;
189  	  watch_handle.watch_ctx2 = ctx2;
190  	  watcher->watch_handles[watch_handle.handle] = watch_handle;
191  	
192  	  *handle = watch_handle.handle;
193  	
194  	  ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle
195  		         << dendl;
196  	  m_lock.unlock();
197  	
198  	  on_finish->complete(0);
199  	}
200  	
201  	void TestWatchNotify::execute_unwatch(TestRadosClient *rados_client,
202  	                                      uint64_t handle, Context* on_finish) {
203  	  CephContext *cct = rados_client->cct();
204  	
205  	  ldout(cct, 20) << "handle=" << handle << dendl;
206  	  {
207  	    std::lock_guard locker{m_lock};
208  	    for (FileWatchers::iterator it = m_file_watchers.begin();
209  	         it != m_file_watchers.end(); ++it) {
210  	      SharedWatcher watcher = it->second;
211  	
212  	      WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
213  	      if (w_it != watcher->watch_handles.end()) {
214  	        watcher->watch_handles.erase(w_it);
215  	        maybe_remove_watcher(watcher);
216  	        break;
217  	      }
218  	    }
219  	  }
220  	  on_finish->complete(0);
221  	}
222  	
223  	TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
224  	    int64_t pool_id, const std::string& nspace, const std::string& oid) {
225  	  ceph_assert(ceph_mutex_is_locked(m_lock));
226  	
227  	  auto it = m_file_watchers.find({pool_id, nspace, oid});
228  	  if (it == m_file_watchers.end()) {
229  	    SharedWatcher watcher(new Watcher(pool_id, nspace, oid));
230  	    watcher->object_handler.reset(new ObjectHandler(
231  	      this, pool_id, nspace, oid));
232  	    int r = m_test_cluster->register_object_handler(
233  	      pool_id, {nspace, oid}, watcher->object_handler.get());
234  	    if (r < 0) {
235  	      // object doesn't exist
236  	      return SharedWatcher();
237  	    }
238  	    m_file_watchers[{pool_id, nspace, oid}] = watcher;
239  	    return watcher;
240  	  }
241  	
242  	  return it->second;
243  	}
244  	
245  	void TestWatchNotify::maybe_remove_watcher(SharedWatcher watcher) {
246  	  ceph_assert(ceph_mutex_is_locked(m_lock));
247  	
248  	  // TODO
249  	  if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
250  	    auto pool_id = watcher->pool_id;
251  	    auto& nspace = watcher->nspace;
252  	    auto& oid = watcher->oid;
253  	    if (watcher->object_handler) {
254  	      m_test_cluster->unregister_object_handler(pool_id, {nspace, oid},
255  	                                                watcher->object_handler.get());
256  	      watcher->object_handler.reset();
257  	    }
258  	
259  	    m_file_watchers.erase({pool_id, nspace, oid});
260  	  }
261  	}
262  	
263  	void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
264  	                                     int64_t pool_id, const std::string& nspace,
265  	                                     const std::string &oid,
266  	                                     const bufferlist &bl, bufferlist *pbl,
267  	                                     Context *on_notify) {
268  	  CephContext *cct = rados_client->cct();
269  	
270  	  m_lock.lock();
271  	  uint64_t notify_id = ++m_notify_id;
272  	
273  	  SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
274  	  if (!watcher) {
275  	    ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
276  	    m_lock.unlock();
277  	    on_notify->complete(-ENOENT);
278  	    return;
279  	  }
280  	
281  	  ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
282  	
283  	  SharedNotifyHandle notify_handle(new NotifyHandle());
284  	  notify_handle->rados_client = rados_client;
285  	  notify_handle->pbl = pbl;
286  	  notify_handle->on_notify = on_notify;
287  	
288  	  WatchHandles &watch_handles = watcher->watch_handles;
289  	  for (auto &watch_handle_pair : watch_handles) {
290  	    WatchHandle &watch_handle = watch_handle_pair.second;
291  	    notify_handle->pending_watcher_ids.insert(std::make_pair(
292  	      watch_handle.gid, watch_handle.handle));
293  	
294  	    m_async_op_tracker.start_op();
295  	    uint64_t notifier_id = rados_client->get_instance_id();
296  	    watch_handle.rados_client->get_aio_finisher()->queue(new LambdaContext(
297  	      [this, pool_id, nspace, oid, bl, notify_id, watch_handle, notifier_id](int r) {
298  	        bufferlist notify_bl;
299  	        notify_bl.append(bl);
300  	
301  	        if (watch_handle.watch_ctx2 != NULL) {
302  	          watch_handle.watch_ctx2->handle_notify(notify_id,
303  	                                                 watch_handle.handle,
304  	                                                 notifier_id, notify_bl);
305  	        } else if (watch_handle.watch_ctx != NULL) {
306  	          watch_handle.watch_ctx->notify(0, 0, notify_bl);
307  	
308  	          // auto ack old-style watch/notify clients
309  	          ack_notify(watch_handle.rados_client, pool_id, nspace, oid, notify_id,
310  	                     {watch_handle.gid, watch_handle.handle}, bufferlist());
311  	        }
312  	
313  	        m_async_op_tracker.finish_op();
314  	      }));
315  	  }
316  	  watcher->notify_handles[notify_id] = notify_handle;
317  	
318  	  finish_notify(rados_client, pool_id, nspace, oid, notify_id);
319  	  m_lock.unlock();
320  	}
321  	
322  	void TestWatchNotify::ack_notify(TestRadosClient *rados_client, int64_t pool_id,
323  	                                 const std::string& nspace,
324  	                                 const std::string &oid, uint64_t notify_id,
325  	                                 const WatcherID &watcher_id,
326  	                                 const bufferlist &bl) {
327  	  CephContext *cct = rados_client->cct();
328  	
329  	  ceph_assert(ceph_mutex_is_locked(m_lock));
330  	  SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
331  	  if (!watcher) {
332  	    ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
333  	    return;
334  	  }
335  	
336  	  NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
337  	  if (it == watcher->notify_handles.end()) {
338  	    ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
339  			  << ", WatcherID=" << watcher_id << ": not found" << dendl;
340  	    return;
341  	  }
342  	
343  	  ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
344  			 << ", WatcherID=" << watcher_id << dendl;
345  	
346  	  bufferlist response;
347  	  response.append(bl);
348  	
349  	  SharedNotifyHandle notify_handle = it->second;
350  	  notify_handle->notify_responses[watcher_id] = response;
351  	  notify_handle->pending_watcher_ids.erase(watcher_id);
352  	}
353  	
354  	void TestWatchNotify::finish_notify(TestRadosClient *rados_client,
355  	                                    int64_t pool_id, const std::string& nspace,
356  	                                    const std::string &oid,
357  	                                    uint64_t notify_id) {
358  	  CephContext *cct = rados_client->cct();
359  	
360  	  ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
361  	
362  	  ceph_assert(ceph_mutex_is_locked(m_lock));
363  	  SharedWatcher watcher = get_watcher(pool_id, nspace, oid);
364  	  if (!watcher) {
365  	    ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
366  	    return;
367  	  }
368  	
369  	  NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
370  	  if (it == watcher->notify_handles.end()) {
371  	    ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
372  		          << ": not found" << dendl;
373  	    return;
374  	  }
375  	
376  	  SharedNotifyHandle notify_handle = it->second;
377  	  if (!notify_handle->pending_watcher_ids.empty()) {
378  	    ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id
379  		           << ": pending watchers, returning" << dendl;
380  	    return;
381  	  }
382  	
383  	  ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
384  			 << ": completing" << dendl;
385  	
386  	  if (notify_handle->pbl != NULL) {
387  	    encode(notify_handle->notify_responses, *notify_handle->pbl);
388  	    encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);
389  	  }
390  	
391  	  notify_handle->rados_client->get_aio_finisher()->queue(
392  	    notify_handle->on_notify, 0);
393  	  watcher->notify_handles.erase(notify_id);
394  	  maybe_remove_watcher(watcher);
395  	}
396  	
397  	void TestWatchNotify::blacklist(uint32_t nonce) {
398  	  std::lock_guard locker{m_lock};
399  	
400  	  for (auto file_it = m_file_watchers.begin();
401  	       file_it != m_file_watchers.end(); ) {
402  	    auto &watcher = file_it->second;
403  	    for (auto w_it = watcher->watch_handles.begin();
404  	         w_it != watcher->watch_handles.end();) {
405  	      if (w_it->second.nonce == nonce) {
406  	        w_it = watcher->watch_handles.erase(w_it);
407  	      } else {
408  	        ++w_it;
409  	      }
410  	    }
411  	
412  	    ++file_it;
413  	    maybe_remove_watcher(watcher);
414  	  }
415  	}
416  	
417  	void TestWatchNotify::handle_object_removed(int64_t pool_id,
418  	                                            const std::string& nspace,
419  	                                            const std::string& oid) {
420  	  std::lock_guard locker{m_lock};
421  	  auto it = m_file_watchers.find({pool_id, nspace, oid});
422  	  if (it == m_file_watchers.end()) {
423  	    return;
424  	  }
425  	
426  	  auto watcher = it->second;
427  	
428  	  // cancel all in-flight notifications
429  	  for (auto& notify_handle_pair : watcher->notify_handles) {
430  	    auto notify_handle = notify_handle_pair.second;
431  	    notify_handle->rados_client->get_aio_finisher()->queue(
432  	      notify_handle->on_notify, -ENOENT);
433  	  }
434  	
435  	  // alert all watchers of the loss of connection
436  	  for (auto& watch_handle_pair : watcher->watch_handles) {
437  	    auto& watch_handle = watch_handle_pair.second;
438  	    auto handle = watch_handle.handle;
439  	    auto watch_ctx2 = watch_handle.watch_ctx2;
440  	    if (watch_ctx2 != nullptr) {
441  	      auto ctx = new LambdaContext([handle, watch_ctx2](int) {
442  	          watch_ctx2->handle_error(handle, -ENOTCONN);
443  	        });
444  	      watch_handle.rados_client->get_aio_finisher()->queue(ctx);
445  	    }
446  	  }
447  	  m_file_watchers.erase(it);
448  	}
449  	
450  	} // namespace librados
451