1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include "CacheController.h"

#include <cerrno>
#include <cstdio>
5
6 #define dout_context g_ceph_context
7 #define dout_subsys ceph_subsys_immutable_obj_cache
8 #undef dout_prefix
9 #define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \
10 << __func__ << ": "
11
12 namespace ceph {
13 namespace immutable_obj_cache {
14
15 CacheController::CacheController(CephContext *cct,
16 const std::vector<const char*> &args):
17 m_args(args), m_cct(cct) {
18 ldout(m_cct, 20) << dendl;
19 }
20
21 CacheController::~CacheController() {
22 delete m_cache_server;
23 delete m_object_cache_store;
24 }
25
26 int CacheController::init() {
27 ldout(m_cct, 20) << dendl;
28
29 m_object_cache_store = new ObjectCacheStore(m_cct);
30 // TODO(dehao): make this configurable
31 int r = m_object_cache_store->init(true);
32 if (r < 0) {
33 lderr(m_cct) << "init error\n" << dendl;
34 return r;
35 }
36
37 r = m_object_cache_store->init_cache();
38 if (r < 0) {
39 lderr(m_cct) << "init error\n" << dendl;
40 }
41
42 return r;
43 }
44
45 int CacheController::shutdown() {
46 ldout(m_cct, 20) << dendl;
47
48 int r = m_cache_server->stop();
49 if (r < 0) {
50 lderr(m_cct) << "stop error\n" << dendl;
51 return r;
52 }
53
54 r = m_object_cache_store->shutdown();
55 if (r < 0) {
56 lderr(m_cct) << "stop error\n" << dendl;
57 return r;
58 }
59
60 return r;
61 }
62
63 void CacheController::handle_signal(int signum) {
64 shutdown();
65 }
66
67 void CacheController::run() {
68 try {
69 std::string controller_path =
70 m_cct->_conf.get_val<std::string>("immutable_object_cache_sock");
(1) Event check_return: |
Calling "remove(controller_path.c_str())" without checking return value. This library function may fail and return an error code. |
71 std::remove(controller_path.c_str());
72
73 m_cache_server = new CacheServer(m_cct, controller_path,
74 std::bind(&CacheController::handle_request, this,
75 std::placeholders::_1, std::placeholders::_2));
76
77 int ret = m_cache_server->run();
78 if (ret != 0) {
79 throw std::runtime_error("io serivce run error");
80 }
81 } catch (std::exception& e) {
82 lderr(m_cct) << "Exception: " << e.what() << dendl;
83 }
84 }
85
86 void CacheController::handle_request(CacheSession* session,
87 ObjectCacheRequest* req) {
88 ldout(m_cct, 20) << dendl;
89
90 switch (req->get_request_type()) {
91 case RBDSC_REGISTER: {
92 // TODO(dehao): skip register and allow clients to lookup directly
93
94 ObjectCacheRequest* reply = new ObjectCacheRegReplyData(
95 RBDSC_REGISTER_REPLY, req->seq);
96 session->send(reply);
97 break;
98 }
99 case RBDSC_READ: {
100 // lookup object in local cache store
101 std::string cache_path;
102 ObjectCacheReadData* req_read_data =
103 reinterpret_cast <ObjectCacheReadData*> (req);
104 int ret = m_object_cache_store->lookup_object(
105 req_read_data->pool_namespace, req_read_data->pool_id,
106 req_read_data->snap_id, req_read_data->oid, cache_path);
107 ObjectCacheRequest* reply = nullptr;
108 if (ret != OBJ_CACHE_PROMOTED) {
109 reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
110 } else {
111 reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
112 req->seq, cache_path);
113 }
114 session->send(reply);
115 break;
116 }
117 default:
118 ldout(m_cct, 5) << "can't recongize request" << dendl;
119 ceph_assert(0);
120 }
121 }
122
123 } // namespace immutable_obj_cache
124 } // namespace ceph
125