1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "CacheClient.h"
5 #include "common/Cond.h"
6
7 #define dout_context g_ceph_context
8 #define dout_subsys ceph_subsys_immutable_obj_cache
9 #undef dout_prefix
10 #define dout_prefix *_dout << "ceph::cache::CacheClient: " << this << " " \
11 << __func__ << ": "
12
13 namespace ceph {
14 namespace immutable_obj_cache {
15
16 CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
17 : m_cct(ceph_ctx), m_io_service_work(m_io_service),
18 m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
19 m_io_thread(nullptr), m_session_work(false), m_writing(false),
20 m_reading(false), m_sequence_id(0) {
21 m_worker_thread_num =
22 m_cct->_conf.get_val<uint64_t>(
23 "immutable_object_cache_client_dedicated_thread_num");
24
(1) Event cond_true: |
Condition "this->m_worker_thread_num != 0", taking true branch. |
25 if (m_worker_thread_num != 0) {
26 m_worker = new boost::asio::io_service();
(2) Event alloc_new: |
Allocating memory by calling "new boost::asio::io_context::work(this->m_worker)". |
(3) Event assign: |
Assigning: "this->m_worker_io_service_work" = "new boost::asio::io_context::work(this->m_worker)". |
(12) Event ctor_dtor_leak: |
The constructor allocates field "m_worker_io_service_work" of "ceph::immutable_obj_cache::CacheClient" but the destructor and whatever functions it calls do not free it. |
Also see events: |
[destructor] |
27 m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
(4) Event cond_true: |
Condition "i < this->m_worker_thread_num", taking true branch. |
(6) Event loop_begin: |
Jumped back to beginning of loop. |
(7) Event cond_true: |
Condition "i < this->m_worker_thread_num", taking true branch. |
(9) Event loop_begin: |
Jumped back to beginning of loop. |
(10) Event cond_false: |
Condition "i < this->m_worker_thread_num", taking false branch. |
28 for (uint64_t i = 0; i < m_worker_thread_num; i++) {
29 std::thread* thd = new std::thread([this](){m_worker->run();});
30 m_worker_threads.push_back(thd);
(5) Event loop: |
Jumping back to the beginning of the loop. |
(8) Event loop: |
Jumping back to the beginning of the loop. |
(11) Event loop_end: |
Reached end of loop. |
31 }
32 }
33 m_bp_header = buffer::create(get_header_size());
34 }
35
36 CacheClient::~CacheClient() {
37 stop();
38 }
39
40 void CacheClient::run() {
41 m_io_thread.reset(new std::thread([this](){m_io_service.run(); }));
42 }
43
44 bool CacheClient::is_session_work() {
45 return m_session_work.load() == true;
46 }
47
48 int CacheClient::stop() {
49 m_session_work.store(false);
50 m_io_service.stop();
51
52 if (m_io_thread != nullptr) {
53 m_io_thread->join();
54 }
55 if (m_worker_thread_num != 0) {
56 m_worker->stop();
57 for (auto thd : m_worker_threads) {
58 thd->join();
59 delete thd;
60 }
61 delete m_worker;
62 }
63 return 0;
64 }
65
66 // close domain socket
67 void CacheClient::close() {
68 m_session_work.store(false);
69 boost::system::error_code close_ec;
70 m_dm_socket.close(close_ec);
71 if (close_ec) {
72 ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
73 }
74 }
75
76 // sync connect
77 int CacheClient::connect() {
78 int ret = -1;
79 C_SaferCond cond;
80 Context* on_finish = new LambdaContext([&cond, &ret](int err) {
81 ret = err;
82 cond.complete(err);
83 });
84
85 connect(on_finish);
86 cond.wait();
87
88 return ret;
89 }
90
91 // async connect
92 void CacheClient::connect(Context* on_finish) {
93 m_dm_socket.async_connect(m_ep,
94 boost::bind(&CacheClient::handle_connect, this,
95 on_finish, boost::asio::placeholders::error));
96 }
97
98 void CacheClient::handle_connect(Context* on_finish,
99 const boost::system::error_code& err) {
100 if (err) {
101 ldout(m_cct, 20) << "fails to connect to cache server. error : "
102 << err.message() << dendl;
103 fault(ASIO_ERROR_CONNECT, err);
104 on_finish->complete(-1);
105 return;
106 }
107
108 ldout(m_cct, 20) << "successfully connected to cache server." << dendl;
109 on_finish->complete(0);
110 }
111
112 void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
113 uint64_t snap_id, std::string oid,
114 CacheGenContextURef&& on_finish) {
115 ldout(m_cct, 20) << dendl;
116 ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
117 ++m_sequence_id, 0, 0,
118 pool_id, snap_id, oid, pool_nspace);
119 req->process_msg = std::move(on_finish);
120 req->encode();
121
122 {
123 std::lock_guard locker{m_lock};
124 m_outcoming_bl.append(req->get_payload_bufferlist());
125 ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end());
126 m_seq_to_req[req->seq] = req;
127 }
128
129 // try to send message to server.
130 try_send();
131
132 // try to receive ack from server.
133 try_receive();
134 }
135
136 void CacheClient::try_send() {
137 ldout(m_cct, 20) << dendl;
138 if (!m_writing.load()) {
139 m_writing.store(true);
140 send_message();
141 }
142 }
143
144 void CacheClient::send_message() {
145 ldout(m_cct, 20) << dendl;
146 bufferlist bl;
147 {
148 std::lock_guard locker{m_lock};
149 bl.swap(m_outcoming_bl);
150 ceph_assert(m_outcoming_bl.length() == 0);
151 }
152
153 // send bytes as many as possible.
154 boost::asio::async_write(m_dm_socket,
155 boost::asio::buffer(bl.c_str(), bl.length()),
156 boost::asio::transfer_exactly(bl.length()),
157 [this, bl](const boost::system::error_code& err, size_t cb) {
158 if (err || cb != bl.length()) {
159 fault(ASIO_ERROR_WRITE, err);
160 return;
161 }
162
163 ceph_assert(cb == bl.length());
164
165 {
166 std::lock_guard locker{m_lock};
167 if (m_outcoming_bl.length() == 0) {
168 m_writing.store(false);
169 return;
170 }
171 }
172
173 // still have left bytes, continue to send.
174 send_message();
175 });
176 try_receive();
177 }
178
179 void CacheClient::try_receive() {
180 ldout(m_cct, 20) << dendl;
181 if (!m_reading.load()) {
182 m_reading.store(true);
183 receive_message();
184 }
185 }
186
187 void CacheClient::receive_message() {
188 ldout(m_cct, 20) << dendl;
189 ceph_assert(m_reading.load());
190 read_reply_header();
191 }
192
193 void CacheClient::read_reply_header() {
194 ldout(m_cct, 20) << dendl;
195 /* create new head buffer for every reply */
196 bufferptr bp_head(buffer::create(get_header_size()));
197 auto raw_ptr = bp_head.c_str();
198
199 boost::asio::async_read(m_dm_socket,
200 boost::asio::buffer(raw_ptr, get_header_size()),
201 boost::asio::transfer_exactly(get_header_size()),
202 boost::bind(&CacheClient::handle_reply_header,
203 this, bp_head,
204 boost::asio::placeholders::error,
205 boost::asio::placeholders::bytes_transferred));
206 }
207
208 void CacheClient::handle_reply_header(bufferptr bp_head,
209 const boost::system::error_code& ec,
210 size_t bytes_transferred) {
211 ldout(m_cct, 20) << dendl;
212 if (ec || bytes_transferred != get_header_size()) {
213 fault(ASIO_ERROR_READ, ec);
214 return;
215 }
216
217 ceph_assert(bytes_transferred == bp_head.length());
218
219 uint32_t data_len = get_data_len(bp_head.c_str());
220
221 bufferptr bp_data(buffer::create(data_len));
222 read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
223 }
224
225 void CacheClient::read_reply_data(bufferptr&& bp_head,
226 bufferptr&& bp_data,
227 const uint64_t data_len) {
228 ldout(m_cct, 20) << dendl;
229 auto raw_ptr = bp_data.c_str();
230 boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
231 boost::asio::transfer_exactly(data_len),
232 boost::bind(&CacheClient::handle_reply_data,
233 this, std::move(bp_head), std::move(bp_data), data_len,
234 boost::asio::placeholders::error,
235 boost::asio::placeholders::bytes_transferred));
236 }
237
238 void CacheClient::handle_reply_data(bufferptr bp_head,
239 bufferptr bp_data,
240 const uint64_t data_len,
241 const boost::system::error_code& ec,
242 size_t bytes_transferred) {
243 ldout(m_cct, 20) << dendl;
244 if (ec || bytes_transferred != data_len) {
245 fault(ASIO_ERROR_WRITE, ec);
246 return;
247 }
248 ceph_assert(bp_data.length() == data_len);
249
250 bufferlist data_buffer;
251 data_buffer.append(std::move(bp_head));
252 data_buffer.append(std::move(bp_data));
253
254 ObjectCacheRequest* reply = decode_object_cache_request(data_buffer);
255 data_buffer.clear();
256 ceph_assert(data_buffer.length() == 0);
257
258 process(reply, reply->seq);
259
260 {
261 std::lock_guard locker{m_lock};
262 if (m_seq_to_req.size() == 0 && m_outcoming_bl.length()) {
263 m_reading.store(false);
264 return;
265 }
266 }
267 if (is_session_work()) {
268 receive_message();
269 }
270 }
271
272 void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
273 ldout(m_cct, 20) << dendl;
274 ObjectCacheRequest* current_request = nullptr;
275 {
276 std::lock_guard locker{m_lock};
277 ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
278 current_request = m_seq_to_req[seq_id];
279 m_seq_to_req.erase(seq_id);
280 }
281
282 ceph_assert(current_request != nullptr);
283 auto process_reply = new LambdaContext([current_request, reply]
284 (bool dedicated) {
285 if (dedicated) {
286 // dedicated thrad to execute this context.
287 }
288 current_request->process_msg.release()->complete(reply);
289 delete current_request;
290 delete reply;
291 });
292
293 if (m_worker_thread_num != 0) {
294 m_worker->post([process_reply]() {
295 process_reply->complete(true);
296 });
297 } else {
298 process_reply->complete(false);
299 }
300 }
301
302 // if there is one request fails, just execute fault, then shutdown RO.
303 void CacheClient::fault(const int err_type,
304 const boost::system::error_code& ec) {
305 ldout(m_cct, 20) << "fault." << ec.message() << dendl;
306
307 if (err_type == ASIO_ERROR_CONNECT) {
308 ceph_assert(!m_session_work.load());
309 if (ec == boost::asio::error::connection_refused) {
310 ldout(m_cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
311 << ". Immutable-object-cache daemon is down ? "
312 << "Data will be read from ceph cluster " << dendl;
313 } else {
314 ldout(m_cct, 20) << "Connecting RO daemon fails : "
315 << ec.message() << dendl;
316 }
317
318 if (m_dm_socket.is_open()) {
319 // Set to indicate what error occurred, if any.
320 // Note that, even if the function indicates an error,
321 // the underlying descriptor is closed.
322 boost::system::error_code close_ec;
323 m_dm_socket.close(close_ec);
324 if (close_ec) {
325 ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
326 }
327 }
328 return;
329 }
330
331 if (!m_session_work.load()) {
332 return;
333 }
334
335 /* when current session don't work, ASIO will don't receive any new request from hook.
336 * On the other hand, for pending request of ASIO, cancle these request,
337 * then call their callback. these request which are cancled by this method,
338 * will be re-dispatched to RADOS layer.
339 * make sure just have one thread to modify execute below code. */
340 m_session_work.store(false);
341
342 if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
343 ldout(m_cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
344 ceph_assert(0);
345 }
346
347 if (err_type == ASIO_ERROR_READ) {
348 ldout(m_cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
349 }
350
351 if (err_type == ASIO_ERROR_WRITE) {
352 ldout(m_cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
353 // CacheClient should not occur this error.
354 ceph_assert(0);
355 }
356
357 // currently, for any asio error, just shutdown RO.
358 close();
359
360 /* all pending request, which have entered into ASIO,
361 * will be re-dispatched to RADOS.*/
362 {
363 std::lock_guard locker{m_lock};
364 for (auto it : m_seq_to_req) {
365 it.second->type = RBDSC_READ_RADOS;
366 it.second->process_msg->complete(it.second);
367 }
368 m_seq_to_req.clear();
369 }
370
371 ldout(m_cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\
372 Later all reading will be re-dispatched RADOS layer"
373 << ec.message() << dendl;
374 }
375
376 // TODO : re-implement this method
377 int CacheClient::register_client(Context* on_finish) {
378 ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
379 m_sequence_id++);
380 reg_req->encode();
381
382 bufferlist bl;
383 bl.append(reg_req->get_payload_bufferlist());
384
385 uint64_t ret;
386 boost::system::error_code ec;
387
388 ret = boost::asio::write(m_dm_socket,
389 boost::asio::buffer(bl.c_str(), bl.length()), ec);
390
391 if (ec || ret != bl.length()) {
392 fault(ASIO_ERROR_WRITE, ec);
393 return -1;
394 }
395 delete reg_req;
396
397 ret = boost::asio::read(m_dm_socket,
398 boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec);
399 if (ec || ret != get_header_size()) {
400 fault(ASIO_ERROR_READ, ec);
401 return -1;
402 }
403
404 uint64_t data_len = get_data_len(m_bp_header.c_str());
405 bufferptr bp_data(buffer::create(data_len));
406
407 ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(),
408 data_len), ec);
409 if (ec || ret != data_len) {
410 fault(ASIO_ERROR_READ, ec);
411 return -1;
412 }
413
414 bufferlist data_buffer;
415 data_buffer.append(m_bp_header);
416 data_buffer.append(std::move(bp_data));
417 ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
418 if (req->type == RBDSC_REGISTER_REPLY) {
419 m_session_work.store(true);
420 on_finish->complete(0);
421 } else {
422 on_finish->complete(-1);
423 }
424
425 delete req;
426 return 0;
427 }
428
429 } // namespace immutable_obj_cache
430 } // namespace ceph
431