1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2011 New Dream Network
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #include <poll.h>
15 #include <sys/un.h>
16
17 #include "common/admin_socket.h"
18 #include "common/admin_socket_client.h"
19 #include "common/dout.h"
20 #include "common/errno.h"
21 #include "common/safe_io.h"
22 #include "common/Thread.h"
23 #include "common/version.h"
24 #include "common/ceph_mutex.h"
25
26 #ifndef WITH_SEASTAR
27 #include "common/Cond.h"
28 #endif
29
30 #include "messages/MCommand.h"
31 #include "messages/MCommandReply.h"
32
33 // re-include our assert to clobber the system one; fix dout:
34 #include "include/ceph_assert.h"
35 #include "include/compat.h"
36 #include "include/sock_compat.h"
37
38 #define dout_subsys ceph_subsys_asok
39 #undef dout_prefix
40 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
41
42
43 using std::ostringstream;
44
45 /*
46 * UNIX domain sockets created by an application persist even after that
47 * application closes, unless they're explicitly unlinked. This is because the
48 * directory containing the socket keeps a reference to the socket.
49 *
50 * This code makes things a little nicer by unlinking those dead sockets when
51 * the application exits normally.
52 */
53
/**
 * Invoke a system-call-like callable and repeat it for as long as it
 * reports failure (a negative return value) with errno set to EINTR.
 *
 * @param f     callable to invoke
 * @param args  arguments handed (by value) to every invocation
 * @return the value returned by the last invocation
 */
template<typename F, typename... Args>
inline int retry_sys_call(F f, Args... args) {
  for (;;) {
    const int rc = f(args...);
    // Only an interrupted call is retried; success or any other error
    // is passed straight back to the caller.
    if (rc >= 0 || errno != EINTR) {
      return rc;
    }
  }
}
62
63
// Serializes every access to cleanup_files and cleanup_atexit below.
static std::mutex cleanup_lock;
// Paths that still have to be unlinked, either individually or at exit.
static std::vector<std::string> cleanup_files;
// Becomes true once remove_all_cleanup_files() was handed to atexit().
static bool cleanup_atexit{false};
67
68 static void remove_cleanup_file(std::string_view file) {
69 std::unique_lock l(cleanup_lock);
70
71 if (auto i = std::find(cleanup_files.cbegin(), cleanup_files.cend(), file);
72 i != cleanup_files.cend()) {
73 retry_sys_call(::unlink, i->c_str());
74 cleanup_files.erase(i);
75 }
76 }
77
78 void remove_all_cleanup_files() {
79 std::unique_lock l(cleanup_lock);
80 for (const auto& s : cleanup_files) {
81 retry_sys_call(::unlink, s.c_str());
82 }
83 cleanup_files.clear();
84 }
85
86 static void add_cleanup_file(std::string file) {
87 std::unique_lock l(cleanup_lock);
88 cleanup_files.push_back(std::move(file));
89 if (!cleanup_atexit) {
90 atexit(remove_all_cleanup_files);
91 cleanup_atexit = true;
92 }
93 }
94
95 AdminSocket::AdminSocket(CephContext *cct)
96 : m_cct(cct)
97 {}
98
99 AdminSocket::~AdminSocket()
100 {
101 shutdown();
102 }
103
104 /*
105 * This thread listens on the UNIX domain socket for incoming connections.
106 * It only handles one connection at a time at the moment. All I/O is nonblocking,
107 * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
108 *
109 * This thread also listens to m_wakeup_rd_fd. If there is any data sent to this
110 * pipe, the thread wakes up. If m_shutdown is set, the thread terminates
111 * itself gracefully, allowing the AdminSocketConfigObs class to join() it.
112 */
113
114 std::string AdminSocket::create_wakeup_pipe(int *pipe_rd, int *pipe_wr)
115 {
116 int pipefd[2];
117 if (pipe_cloexec(pipefd, O_NONBLOCK) < 0) {
118 int e = errno;
119 ostringstream oss;
120 oss << "AdminSocket::create_wakeup_pipe error: " << cpp_strerror(e);
121 return oss.str();
122 }
123
124 *pipe_rd = pipefd[0];
125 *pipe_wr = pipefd[1];
126 return "";
127 }
128
129 std::string AdminSocket::destroy_wakeup_pipe()
130 {
131 // Send a byte to the wakeup pipe that the thread is listening to
132 char buf[1] = { 0x0 };
133 int ret = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
134
135 // Close write end
136 retry_sys_call(::close, m_wakeup_wr_fd);
137 m_wakeup_wr_fd = -1;
138
139 if (ret != 0) {
140 ostringstream oss;
141 oss << "AdminSocket::destroy_shutdown_pipe error: failed to write"
142 "to thread shutdown pipe: error " << ret;
143 return oss.str();
144 }
145
146 th.join();
147
148 // Close read end. Doing this before join() blocks the listenter and prevents
149 // joining.
150 retry_sys_call(::close, m_wakeup_rd_fd);
151 m_wakeup_rd_fd = -1;
152
153 return "";
154 }
155
156 std::string AdminSocket::bind_and_listen(const std::string &sock_path, int *fd)
157 {
158 ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
159
160 struct sockaddr_un address;
161 if (sock_path.size() > sizeof(address.sun_path) - 1) {
162 ostringstream oss;
163 oss << "AdminSocket::bind_and_listen: "
164 << "The UNIX domain socket path " << sock_path << " is too long! The "
165 << "maximum length on this system is "
166 << (sizeof(address.sun_path) - 1);
167 return oss.str();
168 }
169 int sock_fd = socket_cloexec(PF_UNIX, SOCK_STREAM, 0);
170 if (sock_fd < 0) {
171 int err = errno;
172 ostringstream oss;
173 oss << "AdminSocket::bind_and_listen: "
174 << "failed to create socket: " << cpp_strerror(err);
175 return oss.str();
176 }
177 memset(&address, 0, sizeof(struct sockaddr_un));
178 address.sun_family = AF_UNIX;
179 snprintf(address.sun_path, sizeof(address.sun_path),
180 "%s", sock_path.c_str());
181 if (::bind(sock_fd, (struct sockaddr*)&address,
182 sizeof(struct sockaddr_un)) != 0) {
183 int err = errno;
184 if (err == EADDRINUSE) {
185 AdminSocketClient client(sock_path);
186 bool ok;
187 client.ping(&ok);
188 if (ok) {
189 ldout(m_cct, 20) << "socket " << sock_path << " is in use" << dendl;
190 err = EEXIST;
191 } else {
192 ldout(m_cct, 20) << "unlink stale file " << sock_path << dendl;
193 retry_sys_call(::unlink, sock_path.c_str());
194 if (::bind(sock_fd, (struct sockaddr*)&address,
195 sizeof(struct sockaddr_un)) == 0) {
196 err = 0;
197 } else {
198 err = errno;
199 }
200 }
201 }
202 if (err != 0) {
203 ostringstream oss;
204 oss << "AdminSocket::bind_and_listen: "
205 << "failed to bind the UNIX domain socket to '" << sock_path
206 << "': " << cpp_strerror(err);
207 close(sock_fd);
208 return oss.str();
209 }
210 }
211 if (listen(sock_fd, 5) != 0) {
212 int err = errno;
213 ostringstream oss;
214 oss << "AdminSocket::bind_and_listen: "
215 << "failed to listen to socket: " << cpp_strerror(err);
216 close(sock_fd);
217 retry_sys_call(::unlink, sock_path.c_str());
218 return oss.str();
219 }
220 *fd = sock_fd;
221 return "";
222 }
223
224 void AdminSocket::entry() noexcept
225 {
226 ldout(m_cct, 5) << "entry start" << dendl;
227 while (true) {
228 struct pollfd fds[2];
229 memset(fds, 0, sizeof(fds));
230 fds[0].fd = m_sock_fd;
231 fds[0].events = POLLIN | POLLRDBAND;
232 fds[1].fd = m_wakeup_rd_fd;
233 fds[1].events = POLLIN | POLLRDBAND;
234
235 ldout(m_cct,20) << __func__ << " waiting" << dendl;
236 int ret = poll(fds, 2, -1);
237 if (ret < 0) {
238 int err = errno;
239 if (err == EINTR) {
240 continue;
241 }
242 lderr(m_cct) << "AdminSocket: poll(2) error: '"
243 << cpp_strerror(err) << dendl;
244 return;
245 }
246 ldout(m_cct,20) << __func__ << " awake" << dendl;
247
248 if (fds[0].revents & POLLIN) {
249 // Send out some data
250 do_accept();
251 }
252 if (fds[1].revents & POLLIN) {
253 // read off one byte
254 char buf;
255 ::read(m_wakeup_rd_fd, &buf, 1);
256 do_tell_queue();
257 }
258 if (m_shutdown) {
259 // Parent wants us to shut down
260 return;
261 }
262 }
263 ldout(m_cct, 5) << "entry exit" << dendl;
264 }
265
266 void AdminSocket::chown(uid_t uid, gid_t gid)
267 {
268 if (m_sock_fd >= 0) {
269 int r = ::chown(m_path.c_str(), uid, gid);
270 if (r < 0) {
271 r = -errno;
272 lderr(m_cct) << "AdminSocket: failed to chown socket: "
273 << cpp_strerror(r) << dendl;
274 }
275 }
276 }
277
278 void AdminSocket::chmod(mode_t mode)
279 {
280 if (m_sock_fd >= 0) {
281 int r = ::chmod(m_path.c_str(), mode);
282 if (r < 0) {
283 r = -errno;
284 lderr(m_cct) << "AdminSocket: failed to chmod socket: "
285 << cpp_strerror(r) << dendl;
286 }
287 }
288 }
289
290 void AdminSocket::do_accept()
291 {
292 struct sockaddr_un address;
293 socklen_t address_length = sizeof(address);
294 ldout(m_cct, 30) << "AdminSocket: calling accept" << dendl;
295 int connection_fd = accept_cloexec(m_sock_fd, (struct sockaddr*) &address,
296 &address_length);
297 if (connection_fd < 0) {
298 int err = errno;
299 lderr(m_cct) << "AdminSocket: do_accept error: '"
300 << cpp_strerror(err) << dendl;
301 return;
302 }
303 ldout(m_cct, 30) << "AdminSocket: finished accept" << dendl;
304
305 char cmd[1024];
306 unsigned pos = 0;
307 string c;
308 while (1) {
309 int ret = safe_read(connection_fd, &cmd[pos], 1);
310 if (ret <= 0) {
311 if (ret < 0) {
312 lderr(m_cct) << "AdminSocket: error reading request code: "
313 << cpp_strerror(ret) << dendl;
314 }
315 retry_sys_call(::close, connection_fd);
316 return;
317 }
318 if (cmd[0] == '\0') {
319 // old protocol: __be32
320 if (pos == 3 && cmd[0] == '\0') {
321 switch (cmd[3]) {
322 case 0:
323 c = "0";
324 break;
325 case 1:
326 c = "perfcounters_dump";
327 break;
328 case 2:
329 c = "perfcounters_schema";
330 break;
331 default:
332 c = "foo";
333 break;
334 }
335 //wrap command with new protocol
336 c = "{\"prefix\": \"" + c + "\"}";
337 break;
338 }
339 } else {
340 // new protocol: null or \n terminated string
341 if (cmd[pos] == '\n' || cmd[pos] == '\0') {
342 cmd[pos] = '\0';
343 c = cmd;
344 break;
345 }
346 }
347 if (++pos >= sizeof(cmd)) {
348 lderr(m_cct) << "AdminSocket: error reading request too long" << dendl;
349 retry_sys_call(::close, connection_fd);
350 return;
351 }
352 }
353
354 std::vector<std::string> cmdvec = { c };
355 bufferlist empty, out;
356 ostringstream err;
357 int rval = execute_command(cmdvec, empty /* inbl */, err, &out);
358
359 // Unfortunately, the asok wire protocol does not let us pass an error code,
360 // and many asok command implementations return helpful error strings. So,
361 // let's prepend an error string to the output if there is an error code.
362 if (rval < 0) {
363 ostringstream ss;
364 ss << "ERROR: " << cpp_strerror(rval) << "\n";
365 bufferlist o;
366 o.append(ss.str());
367 o.claim_append(out);
368 out.claim_append(o);
369 }
370 uint32_t len = htonl(out.length());
371 int ret = safe_write(connection_fd, &len, sizeof(len));
372 if (ret < 0) {
373 lderr(m_cct) << "AdminSocket: error writing response length "
374 << cpp_strerror(ret) << dendl;
375 } else {
376 if (out.write_fd(connection_fd) < 0) {
377 lderr(m_cct) << "AdminSocket: error writing response payload "
378 << cpp_strerror(ret) << dendl;
379 }
380 }
381 retry_sys_call(::close, connection_fd);
382 }
383
384 void AdminSocket::do_tell_queue()
385 {
386 ldout(m_cct,10) << __func__ << dendl;
387 std::list<ref_t<MCommand>> q;
388 {
389 std::lock_guard l(tell_lock);
390 q.swap(tell_queue);
391 }
392 for (auto& m : q) {
393 bufferlist outbl;
394 execute_command(
395 m->cmd,
396 m->get_data(),
397 [m](int r, const std::string& err, bufferlist& outbl) {
398 auto reply = new MCommandReply(r, err);
399 reply->set_tid(m->get_tid());
400 reply->set_data(outbl);
401 #ifdef WITH_SEASTAR
402 #warning "fix message send with crimson"
403 #else
404 m->get_connection()->send_message(reply);
405 #endif
406 });
407 }
408 }
409
410 int AdminSocket::execute_command(
411 const std::vector<std::string>& cmd,
412 const bufferlist& inbl,
413 std::ostream& errss,
414 bufferlist *outbl)
415 {
416 #ifdef WITH_SEASTAR
417 #warning "must implement admin socket blocking execute_command() for crimson"
418 return -ENOSYS;
419 #else
420 bool done = false;
421 int rval = 0;
422 ceph::mutex mylock = ceph::make_mutex("admin_socket::excute_command::mylock");
423 ceph::condition_variable mycond;
424 C_SafeCond fin(mylock, mycond, &done, &rval);
425 execute_command(
426 cmd,
427 inbl,
428 [&errss, outbl, &fin](int r, const std::string& err, bufferlist& out) {
429 errss << err;
430 outbl->claim(out);
431 fin.finish(r);
432 });
433 {
434 std::unique_lock l{mylock};
435 mycond.wait(l, [&done] { return done;});
436 }
437 return rval;
438 #endif
439 }
440
441 void AdminSocket::execute_command(
442 const std::vector<std::string>& cmdvec,
443 const bufferlist& inbl,
444 std::function<void(int,const std::string&,bufferlist&)> on_finish)
445 {
446 cmdmap_t cmdmap;
447 string format;
448 stringstream errss;
449 bufferlist empty;
450 ldout(m_cct,10) << __func__ << " cmdvec='" << cmdvec << "'" << dendl;
451 if (!cmdmap_from_json(cmdvec, &cmdmap, errss)) {
452 ldout(m_cct, 0) << "AdminSocket: " << errss.str() << dendl;
453 return on_finish(-EINVAL, "invalid json", empty);
454 }
455 string prefix;
456 try {
457 cmd_getval(m_cct, cmdmap, "format", format);
458 cmd_getval(m_cct, cmdmap, "prefix", prefix);
459 } catch (const bad_cmd_get& e) {
460 return on_finish(-EINVAL, "invalid json, missing format and/or prefix",
461 empty);
462 }
463
464 Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
465
466 std::unique_lock l(lock);
467 decltype(hooks)::iterator p;
468 p = hooks.find(prefix);
469 if (p == hooks.cend()) {
470 lderr(m_cct) << "AdminSocket: request '" << cmdvec
471 << "' not defined" << dendl;
472 delete f;
473 return on_finish(-EINVAL, "unknown command prefix "s + prefix, empty);
474 }
475
476 // make sure one of the registered commands with this prefix validates.
477 while (!validate_cmd(m_cct, p->second.desc, cmdmap, errss)) {
478 ++p;
479 if (p->first != prefix) {
480 delete f;
481 return on_finish(-EINVAL, "invalid command json", empty);
482 }
483 }
484
485 // Drop lock to avoid cycles in cases where the hook takes
486 // the same lock that was held during calls to register/unregister,
487 // and set in_hook to allow unregister to wait for us before
488 // removing this hook.
489 in_hook = true;
490 auto hook = p->second.hook;
491 l.unlock();
492 hook->call_async(
493 prefix, cmdmap, f, inbl,
494 [f, on_finish](int r, const std::string& err, bufferlist& out) {
495 // handle either existing output in bufferlist *or* via formatter
496 if (r >= 0 && out.length() == 0) {
497 f->flush(out);
498 }
499 delete f;
500 on_finish(r, err, out);
501 });
502
503 l.lock();
504 in_hook = false;
505 in_hook_cond.notify_all();
506 }
507
508 void AdminSocket::queue_tell_command(ref_t<MCommand> m)
509 {
510 ldout(m_cct,10) << __func__ << " " << *m << dendl;
511 std::lock_guard l(tell_lock);
512 tell_queue.push_back(std::move(m));
513 wakeup();
514 }
515
516 int AdminSocket::register_command(std::string_view cmddesc,
517 AdminSocketHook *hook,
518 std::string_view help)
519 {
520 int ret;
521 std::unique_lock l(lock);
522 string prefix = cmddesc_get_prefix(cmddesc);
523 auto i = hooks.find(prefix);
524 if (i != hooks.cend() &&
525 i->second.desc == cmddesc) {
526 ldout(m_cct, 5) << "register_command " << prefix
527 << " cmddesc " << cmddesc << " hook " << hook
528 << " EEXIST" << dendl;
529 ret = -EEXIST;
530 } else {
531 ldout(m_cct, 5) << "register_command " << prefix << " hook " << hook
532 << dendl;
533 hooks.emplace_hint(i,
534 std::piecewise_construct,
535 std::forward_as_tuple(prefix),
536 std::forward_as_tuple(hook, cmddesc, help));
537 ret = 0;
538 }
539 return ret;
540 }
541
542 void AdminSocket::unregister_commands(const AdminSocketHook *hook)
543 {
544 std::unique_lock l(lock);
545 auto i = hooks.begin();
546 while (i != hooks.end()) {
547 if (i->second.hook == hook) {
548 ldout(m_cct, 5) << __func__ << " " << i->first << dendl;
549
550 // If we are currently processing a command, wait for it to
551 // complete in case it referenced the hook that we are
552 // unregistering.
553 in_hook_cond.wait(l, [this]() { return !in_hook; });
554 hooks.erase(i++);
555 } else {
556 i++;
557 }
558 }
559 }
560
561 class VersionHook : public AdminSocketHook {
562 public:
563 int call(std::string_view command, const cmdmap_t& cmdmap,
564 Formatter *f,
565 std::ostream& errss,
566 bufferlist& out) override {
567 if (command == "0"sv) {
568 out.append(CEPH_ADMIN_SOCK_VERSION);
569 } else {
570 f->open_object_section("version");
571 if (command == "version") {
572 f->dump_string("version", ceph_version_to_str());
573 f->dump_string("release", ceph_release_to_str());
574 f->dump_string("release_type", ceph_release_type());
575 } else if (command == "git_version") {
576 f->dump_string("git_version", git_version_to_str());
577 }
578 ostringstream ss;
579 f->close_section();
580 }
581 return 0;
582 }
583 };
584
585 class HelpHook : public AdminSocketHook {
586 AdminSocket *m_as;
587 public:
588 explicit HelpHook(AdminSocket *as) : m_as(as) {}
589 int call(std::string_view command, const cmdmap_t& cmdmap,
590 Formatter *f,
591 std::ostream& errss,
592 bufferlist& out) override {
593 f->open_object_section("help");
(1) Event parameter_hidden: |
declaration hides parameter "command" (declared at line 589) |
(2) Event caretline: |
^ |
594 for (const auto& [command, info] : m_as->hooks) {
595 if (info.help.length())
596 f->dump_string(command.c_str(), info.help);
597 }
598 f->close_section();
599 return 0;
600 }
601 };
602
603 class GetdescsHook : public AdminSocketHook {
604 AdminSocket *m_as;
605 public:
606 explicit GetdescsHook(AdminSocket *as) : m_as(as) {}
607 int call(std::string_view command, const cmdmap_t& cmdmap,
608 Formatter *f,
609 std::ostream& errss,
610 bufferlist& out) override {
611 int cmdnum = 0;
612 f->open_object_section("command_descriptions");
613 for (const auto& [command, info] : m_as->hooks) {
614 // GCC 8 actually has [[maybe_unused]] on a structured binding
615 // do what you'd expect. GCC 7 does not.
616 (void)command;
617 ostringstream secname;
618 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
619 dump_cmd_and_help_to_json(f,
620 CEPH_FEATURES_ALL,
621 secname.str().c_str(),
622 info.desc,
623 info.help);
624 cmdnum++;
625 }
626 f->close_section(); // command_descriptions
627 return 0;
628 }
629 };
630
631 bool AdminSocket::init(const std::string& path)
632 {
633 ldout(m_cct, 5) << "init " << path << dendl;
634
635 /* Set up things for the new thread */
636 std::string err;
637 int pipe_rd = -1, pipe_wr = -1;
638 err = create_wakeup_pipe(&pipe_rd, &pipe_wr);
639 if (!err.empty()) {
640 lderr(m_cct) << "AdminSocketConfigObs::init: error: " << err << dendl;
641 return false;
642 }
643 int sock_fd;
644 err = bind_and_listen(path, &sock_fd);
645 if (!err.empty()) {
646 lderr(m_cct) << "AdminSocketConfigObs::init: failed: " << err << dendl;
647 close(pipe_rd);
648 close(pipe_wr);
649 return false;
650 }
651
652 /* Create new thread */
653 m_sock_fd = sock_fd;
654 m_wakeup_rd_fd = pipe_rd;
655 m_wakeup_wr_fd = pipe_wr;
656 m_path = path;
657
658 version_hook = std::make_unique<VersionHook>();
659 register_command("0", version_hook.get(), "");
660 register_command("version", version_hook.get(), "get ceph version");
661 register_command("git_version", version_hook.get(),
662 "get git sha1");
663 help_hook = std::make_unique<HelpHook>(this);
664 register_command("help", help_hook.get(),
665 "list available commands");
666 getdescs_hook = std::make_unique<GetdescsHook>(this);
667 register_command("get_command_descriptions",
668 getdescs_hook.get(), "list available commands");
669
670 th = make_named_thread("admin_socket", &AdminSocket::entry, this);
671 add_cleanup_file(m_path.c_str());
672 return true;
673 }
674
675 void AdminSocket::shutdown()
676 {
677 // Under normal operation this is unlikely to occur. However for some unit
678 // tests, some object members are not initialized and so cannot be deleted
679 // without fault.
680 if (m_wakeup_wr_fd < 0)
681 return;
682
683 ldout(m_cct, 5) << "shutdown" << dendl;
684 m_shutdown = true;
685
686 auto err = destroy_wakeup_pipe();
687 if (!err.empty()) {
688 lderr(m_cct) << "AdminSocket::shutdown: error: " << err << dendl;
689 }
690
691 retry_sys_call(::close, m_sock_fd);
692
693 unregister_commands(version_hook.get());
694 version_hook.reset();
695
696 unregister_commands(help_hook.get());
697 help_hook.reset();
698
699 unregister_commands(getdescs_hook.get());
700 getdescs_hook.reset();
701
702 remove_cleanup_file(m_path);
703 m_path.clear();
704 }
705
706 void AdminSocket::wakeup()
707 {
708 // Send a byte to the wakeup pipe that the thread is listening to
709 char buf[1] = { 0x0 };
710 int r = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
711 (void)r;
712 }
713