// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "CacheClient.h"
#include "common/Cond.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_immutable_obj_cache
#undef dout_prefix
#define dout_prefix *_dout << "ceph::cache::CacheClient: " << this << " " \
                           << __func__ << ": "

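// A minimal usage sketch; this is illustrative only, and the socket path is
// an assumed example (real callers, e.g. librbd's parent cache dispatch,
// derive it from configuration and wire completions into their own layer):
//
//   CacheClient client("/run/ceph/immutable_object_cache.sock", g_ceph_context);
//   client.run();                  // spawn the socket I/O thread
//   if (client.connect() == 0) {   // synchronous connect to the RO daemon
//     client.register_client(new LambdaContext([](int r) {
//       // r == 0: session established, lookup_object() may now be used
//     }));
//   }
//   client.stop();                 // join all threads before destruction
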
namespace ceph {
namespace immutable_obj_cache {

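  // The client owns two Boost.ASIO loops: m_io_service drives the domain
  // socket, while an optional pool of dedicated worker threads (sized by the
  // immutable_object_cache_client_dedicated_thread_num option) runs reply
  // callbacks so that slow consumers cannot stall socket I/O.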
  CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
    : m_cct(ceph_ctx), m_io_service_work(m_io_service),
      m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
      m_io_thread(nullptr), m_session_work(false), m_writing(false),
      m_reading(false), m_sequence_id(0),
      // null-initialize the optional worker pool members so they are always
      // in a defined state, even when no dedicated threads are configured
      m_worker(nullptr), m_worker_io_service_work(nullptr) {
    m_worker_thread_num =
      m_cct->_conf.get_val<uint64_t>(
        "immutable_object_cache_client_dedicated_thread_num");

    if (m_worker_thread_num != 0) {
      m_worker = new boost::asio::io_service();
      m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
      for (uint64_t i = 0; i < m_worker_thread_num; i++) {
        std::thread* thd = new std::thread([this](){m_worker->run();});
        m_worker_threads.push_back(thd);
      }
    }
    m_bp_header = buffer::create(get_header_size());
  }

  CacheClient::~CacheClient() {
    stop();
  }

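  // run() only spawns the socket I/O thread; the session itself is brought
  // up separately via connect() and register_client().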
  void CacheClient::run() {
    m_io_thread.reset(new std::thread([this](){m_io_service.run();}));
  }

  bool CacheClient::is_session_work() {
    return m_session_work.load();
  }

  int CacheClient::stop() {
    m_session_work.store(false);
    m_io_service.stop();

    if (m_io_thread != nullptr) {
      m_io_thread->join();
      m_io_thread.reset();
    }
    if (m_worker != nullptr) {
      // drop the work guard first so the loop can drain, then stop and join.
      // resetting the members afterwards keeps stop() safe to call twice
      // (the destructor always calls it).
      delete m_worker_io_service_work;
      m_worker_io_service_work = nullptr;
      m_worker->stop();
      for (auto thd : m_worker_threads) {
        thd->join();
        delete thd;
      }
      m_worker_threads.clear();
      delete m_worker;
      m_worker = nullptr;
    }
    return 0;
  }


  // close domain socket
  void CacheClient::close() {
    m_session_work.store(false);
    boost::system::error_code close_ec;
    m_dm_socket.close(close_ec);
    if (close_ec) {
      ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
    }
  }

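  // The synchronous connect() layers on the async variant: it blocks the
  // calling thread on a C_SaferCond until handle_connect() completes it.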
  // sync connect
  int CacheClient::connect() {
    int ret = -1;
    C_SaferCond cond;
    Context* on_finish = new LambdaContext([&cond, &ret](int err) {
      ret = err;
      cond.complete(err);
    });

    connect(on_finish);
    cond.wait();

    return ret;
  }

  // async connect
  void CacheClient::connect(Context* on_finish) {
    m_dm_socket.async_connect(m_ep,
      boost::bind(&CacheClient::handle_connect, this,
                  on_finish, boost::asio::placeholders::error));
  }

  void CacheClient::handle_connect(Context* on_finish,
                                   const boost::system::error_code& err) {
    if (err) {
      ldout(m_cct, 20) << "failed to connect to cache server, error: "
                       << err.message() << dendl;
      fault(ASIO_ERROR_CONNECT, err);
      on_finish->complete(-1);
      return;
    }

    ldout(m_cct, 20) << "successfully connected to cache server" << dendl;
    on_finish->complete(0);
  }

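  // Queue a lookup for a single object. Each request gets a monotonically
  // increasing sequence id; the encoded payload is appended to m_outcoming_bl
  // and the request is parked in m_seq_to_req until the matching reply
  // arrives (see process()).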
  void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
                                  uint64_t snap_id, std::string oid,
                                  CacheGenContextURef&& on_finish) {
    ldout(m_cct, 20) << dendl;
    ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
                                    ++m_sequence_id, 0, 0,
                                    pool_id, snap_id, oid, pool_nspace);
    req->process_msg = std::move(on_finish);
    req->encode();

    {
      std::lock_guard locker{m_lock};
      m_outcoming_bl.append(req->get_payload_bufferlist());
      ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end());
      m_seq_to_req[req->seq] = req;
    }

    // try to send the message to the server.
    try_send();

    // try to receive the ack from the server.
    try_receive();
  }

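  // m_writing and m_reading gate the send and receive loops so that at most
  // one async_write chain and one async_read chain are in flight at a time;
  // a compare-and-swap is used so that two threads cannot both observe the
  // flag as false and start a second concurrent loop.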
  void CacheClient::try_send() {
    ldout(m_cct, 20) << dendl;
    bool expected = false;
    if (m_writing.compare_exchange_strong(expected, true)) {
      send_message();
    }
  }

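  // Drain m_outcoming_bl: swap it out under the lock, write it in full, and
  // loop if new data accumulated during the write. m_writing is only cleared
  // once the queue is observed empty under the lock.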
  void CacheClient::send_message() {
    ldout(m_cct, 20) << dendl;
    bufferlist bl;
    {
      std::lock_guard locker{m_lock};
      bl.swap(m_outcoming_bl);
      ceph_assert(m_outcoming_bl.length() == 0);
    }

    // send as many bytes as possible.
    boost::asio::async_write(m_dm_socket,
        boost::asio::buffer(bl.c_str(), bl.length()),
        boost::asio::transfer_exactly(bl.length()),
        [this, bl](const boost::system::error_code& err, size_t cb) {
        if (err || cb != bl.length()) {
          fault(ASIO_ERROR_WRITE, err);
          return;
        }

        ceph_assert(cb == bl.length());

        {
          std::lock_guard locker{m_lock};
          if (m_outcoming_bl.length() == 0) {
            m_writing.store(false);
            return;
          }
        }

        // still have bytes left, continue sending.
        send_message();
    });
    try_receive();
  }

  void CacheClient::try_receive() {
    ldout(m_cct, 20) << dendl;
    bool expected = false;
    if (m_reading.compare_exchange_strong(expected, true)) {
      receive_message();
    }
  }

  void CacheClient::receive_message() {
    ldout(m_cct, 20) << dendl;
    ceph_assert(m_reading.load());
    read_reply_header();
  }

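  // Replies arrive as a fixed-size header followed by a variable-length
  // payload whose size is encoded in the header (see get_data_len()). The
  // reads are chained: read_reply_header() -> read_reply_data() ->
  // handle_reply_data(), which decodes header plus payload together.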
  void CacheClient::read_reply_header() {
    ldout(m_cct, 20) << dendl;
    /* create a new header buffer for every reply */
    bufferptr bp_head(buffer::create(get_header_size()));
    auto raw_ptr = bp_head.c_str();

    boost::asio::async_read(m_dm_socket,
      boost::asio::buffer(raw_ptr, get_header_size()),
      boost::asio::transfer_exactly(get_header_size()),
      boost::bind(&CacheClient::handle_reply_header,
                  this, bp_head,
                  boost::asio::placeholders::error,
                  boost::asio::placeholders::bytes_transferred));
  }

  void CacheClient::handle_reply_header(bufferptr bp_head,
         const boost::system::error_code& ec,
         size_t bytes_transferred) {
    ldout(m_cct, 20) << dendl;
    if (ec || bytes_transferred != get_header_size()) {
      fault(ASIO_ERROR_READ, ec);
      return;
    }

    ceph_assert(bytes_transferred == bp_head.length());

    uint32_t data_len = get_data_len(bp_head.c_str());

    bufferptr bp_data(buffer::create(data_len));
    read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
  }

  void CacheClient::read_reply_data(bufferptr&& bp_head,
                                    bufferptr&& bp_data,
                                    const uint64_t data_len) {
    ldout(m_cct, 20) << dendl;
    auto raw_ptr = bp_data.c_str();
    boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
      boost::asio::transfer_exactly(data_len),
      boost::bind(&CacheClient::handle_reply_data,
                  this, std::move(bp_head), std::move(bp_data), data_len,
                  boost::asio::placeholders::error,
                  boost::asio::placeholders::bytes_transferred));
  }

  void CacheClient::handle_reply_data(bufferptr bp_head,
                                      bufferptr bp_data,
                                      const uint64_t data_len,
                                      const boost::system::error_code& ec,
                                      size_t bytes_transferred) {
    ldout(m_cct, 20) << dendl;
    if (ec || bytes_transferred != data_len) {
      // this is the read path, so report a read fault; raising
      // ASIO_ERROR_WRITE here would trip the assert in fault().
      fault(ASIO_ERROR_READ, ec);
      return;
    }
    ceph_assert(bp_data.length() == data_len);

    bufferlist data_buffer;
    data_buffer.append(std::move(bp_head));
    data_buffer.append(std::move(bp_data));

    ObjectCacheRequest* reply = decode_object_cache_request(data_buffer);
    data_buffer.clear();
    ceph_assert(data_buffer.length() == 0);

    process(reply, reply->seq);

    {
      std::lock_guard locker{m_lock};
      // stop the read loop only when nothing is pending and nothing is
      // queued to be sent; otherwise keep reading for further replies.
      if (m_seq_to_req.size() == 0 && m_outcoming_bl.length() == 0) {
        m_reading.store(false);
        return;
      }
    }
    if (is_session_work()) {
      receive_message();
    }
  }

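  // Match a decoded reply to its pending request by sequence id and run the
  // caller's completion, either inline or on the dedicated worker pool.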
  void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
    ldout(m_cct, 20) << dendl;
    ObjectCacheRequest* current_request = nullptr;
    {
      std::lock_guard locker{m_lock};
      ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
      current_request = m_seq_to_req[seq_id];
      m_seq_to_req.erase(seq_id);
    }

    ceph_assert(current_request != nullptr);
    auto process_reply = new LambdaContext([current_request, reply]
      (bool dedicated) {
       if (dedicated) {
         // a dedicated thread executes this context.
       }
       current_request->process_msg.release()->complete(reply);
       delete current_request;
       delete reply;
    });

    if (m_worker_thread_num != 0) {
      m_worker->post([process_reply]() {
        process_reply->complete(true);
      });
    } else {
      process_reply->complete(false);
    }
  }

  // if any request fails, execute fault(), then shut down the RO session.
  void CacheClient::fault(const int err_type,
                          const boost::system::error_code& ec) {
    ldout(m_cct, 20) << "fault: " << ec.message() << dendl;

    if (err_type == ASIO_ERROR_CONNECT) {
      ceph_assert(!m_session_work.load());
      if (ec == boost::asio::error::connection_refused) {
        ldout(m_cct, 20) << "connecting to RO daemon failed: " << ec.message()
                         << ". Is the immutable-object-cache daemon down? "
                         << "Data will be read from the ceph cluster." << dendl;
      } else {
        ldout(m_cct, 20) << "connecting to RO daemon failed: "
                         << ec.message() << dendl;
      }

      if (m_dm_socket.is_open()) {
        // Set to indicate what error occurred, if any.
        // Note that, even if the function indicates an error,
        // the underlying descriptor is closed.
        boost::system::error_code close_ec;
        m_dm_socket.close(close_ec);
        if (close_ec) {
          ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
        }
      }
      return;
    }

    if (!m_session_work.load()) {
      return;
    }

    /* Once the current session stops working, ASIO will not accept any new
     * request from the hook. Requests already pending inside ASIO are
     * cancelled and their callbacks invoked; those cancelled requests are
     * re-dispatched to the RADOS layer.
     * Make sure only one thread executes the code below. */
    m_session_work.store(false);

    if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
      ldout(m_cct, 20) << "ASIO incomplete message: " << ec.message() << dendl;
      ceph_assert(0);
    }

    if (err_type == ASIO_ERROR_READ) {
      ldout(m_cct, 20) << "ASIO async read failed: " << ec.message() << dendl;
    }

    if (err_type == ASIO_ERROR_WRITE) {
      ldout(m_cct, 20) << "ASIO async write failed: " << ec.message() << dendl;
      // CacheClient should never hit this error.
      ceph_assert(0);
    }

    // currently, shut down the RO session on any asio error.
    close();

    /* all pending requests, which have already entered ASIO,
     * will be re-dispatched to RADOS. */
    {
      std::lock_guard locker{m_lock};
      for (auto it : m_seq_to_req) {
        it.second->type = RBDSC_READ_RADOS;
        it.second->process_msg->complete(it.second);
      }
      m_seq_to_req.clear();
    }

    ldout(m_cct, 20) << "shutting down RO session after ASIO domain socket "
                     << "failure; subsequent reads will be re-dispatched to "
                     << "the RADOS layer: " << ec.message() << dendl;
  }

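  // Register this client with the RO daemon. Unlike the rest of this class,
  // the handshake below uses blocking reads and writes on the caller's
  // thread, hence the TODO.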
  // TODO: re-implement this method
  int CacheClient::register_client(Context* on_finish) {
    ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
                                                         m_sequence_id++);
    reg_req->encode();

    bufferlist bl;
    bl.append(reg_req->get_payload_bufferlist());
    // bl shares the underlying refcounted buffers, so the request can be
    // freed here, which also avoids leaking it on the error paths below.
    delete reg_req;

    uint64_t ret;
    boost::system::error_code ec;

    ret = boost::asio::write(m_dm_socket,
      boost::asio::buffer(bl.c_str(), bl.length()), ec);

    if (ec || ret != bl.length()) {
      fault(ASIO_ERROR_WRITE, ec);
      return -1;
    }

    ret = boost::asio::read(m_dm_socket,
      boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec);
    if (ec || ret != get_header_size()) {
      fault(ASIO_ERROR_READ, ec);
      return -1;
    }

    uint64_t data_len = get_data_len(m_bp_header.c_str());
    bufferptr bp_data(buffer::create(data_len));

    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(),
                            data_len), ec);
    if (ec || ret != data_len) {
      fault(ASIO_ERROR_READ, ec);
      return -1;
    }

    bufferlist data_buffer;
    data_buffer.append(m_bp_header);
    data_buffer.append(std::move(bp_data));
    ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
    if (req->type == RBDSC_REGISTER_REPLY) {
      m_session_work.store(true);
      on_finish->complete(0);
    } else {
      on_finish->complete(-1);
    }

    delete req;
    return 0;
  }

}  // namespace immutable_obj_cache
}  // namespace ceph