1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <cerrno>
16
17 #include "Objecter.h"
18 #include "osd/OSDMap.h"
19 #include "Filer.h"
20
21 #include "mon/MonClient.h"
22
23 #include "msg/Messenger.h"
24 #include "msg/Message.h"
25
26 #include "messages/MPing.h"
27 #include "messages/MOSDOp.h"
28 #include "messages/MOSDOpReply.h"
29 #include "messages/MOSDBackoff.h"
30 #include "messages/MOSDMap.h"
31
32 #include "messages/MPoolOp.h"
33 #include "messages/MPoolOpReply.h"
34
35 #include "messages/MGetPoolStats.h"
36 #include "messages/MGetPoolStatsReply.h"
37 #include "messages/MStatfs.h"
38 #include "messages/MStatfsReply.h"
39
40 #include "messages/MMonCommand.h"
41
42 #include "messages/MCommand.h"
43 #include "messages/MCommandReply.h"
44
45 #include "messages/MWatchNotify.h"
46
47
48 #include "common/config.h"
49 #include "common/perf_counters.h"
50 #include "common/scrub_types.h"
51 #include "include/str_list.h"
52 #include "common/errno.h"
53 #include "common/EventTrace.h"
54
55 using std::list;
56 using std::make_pair;
57 using std::map;
58 using std::ostream;
59 using std::ostringstream;
60 using std::pair;
61 using std::set;
62 using std::string;
63 using std::stringstream;
64 using std::vector;
65
66 using ceph::decode;
67 using ceph::encode;
68 using ceph::Formatter;
69
70 using std::defer_lock;
71
72 using ceph::real_time;
73 using ceph::real_clock;
74
75 using ceph::mono_clock;
76 using ceph::mono_time;
77
78 using ceph::timespan;
79
80 using ceph::shunique_lock;
81 using ceph::acquire_shared;
82 using ceph::acquire_unique;
83
84 #define dout_subsys ceph_subsys_objecter
85 #undef dout_prefix
86 #define dout_prefix *_dout << messenger->get_myname() << ".objecter "
87
88
// Perf counter indices for the objecter's PerfCounters instance
// (registered in Objecter::init()).  Values are assigned
// consecutively starting at l_osdc_first; l_osdc_first/l_osdc_last
// bound the id range handed to PerfCountersBuilder, so new entries
// must be added immediately before l_osdc_last to keep existing
// indices stable.
enum {
  l_osdc_first = 123200,

  // overall op flow
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,

  // op category totals (read/write/read-modify-write/pg)
  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  // per-OSD-op-type breakdown
  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  // linger (watch/notify) ops
  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  // pool ops (create/delete/etc.)
  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  // pool stat requests
  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  // statfs requests
  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  // OSD commands
  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  // osdmap handling
  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  // OSD session bookkeeping
  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  // omap operation counts
  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};
166
167
168 // config obs ----------------------------
169
// Config options this Objecter observes (see get_tracked_conf_keys()
// and handle_conf_change()).  NULL-terminated, as required by the
// config observer interface.
static const char *config_keys[] = {
  "crush_location",
  NULL
};
174
// Admin socket hook backing the "objecter_requests" command
// (registered in Objecter::init(), unregistered in shutdown()).
// Dumps in-progress OSD requests for debugging.
class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;  // back-pointer; not owned
public:
  explicit RequestStateHook(Objecter *objecter);
  int call(std::string_view command, const cmdmap_t& cmdmap,
	   Formatter *f,
	   std::ostream& ss,
	   ceph::buffer::list& out) override;
};
184
185 /**
186 * This is a more limited form of C_Contexts, but that requires
187 * a ceph_context which we don't have here.
188 */
189 class ObjectOperation::C_TwoContexts : public Context {
190 Context *first;
191 Context *second;
192 public:
193 C_TwoContexts(Context *first, Context *second) :
194 first(first), second(second) {}
195 void finish(int r) override {
196 first->complete(r);
197 second->complete(r);
198 first = NULL;
199 second = NULL;
200 }
201
202 ~C_TwoContexts() override {
203 delete first;
204 delete second;
205 }
206 };
207
208 void ObjectOperation::add_handler(Context *extra) {
209 size_t last = out_handler.size() - 1;
210 Context *orig = out_handler[last];
211 if (orig) {
212 Context *wrapper = new C_TwoContexts(orig, extra);
213 out_handler[last] = wrapper;
214 } else {
215 out_handler[last] = extra;
216 }
217 }
218
219 Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
220 object_t& oid)
221 {
222 if (oid.name.empty())
223 return unique_completion_lock();
224
225 static constexpr uint32_t HASH_PRIME = 1021;
226 uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
227 % HASH_PRIME;
228
229 return unique_completion_lock(completion_locks[h % num_locks],
230 std::defer_lock);
231 }
232
// Config observer interface: the NULL-terminated list of config keys
// we want change notifications for.
const char** Objecter::get_tracked_conf_keys() const
{
  return config_keys;
}
237
238
239 void Objecter::handle_conf_change(const ConfigProxy& conf,
240 const std::set <std::string> &changed)
241 {
242 if (changed.count("crush_location")) {
243 update_crush_location();
244 }
245 }
246
// Re-read the crush location from the config under the write lock;
// called at init and whenever the "crush_location" option changes.
void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}
252
253 // messages ------------------------------
254
255 /*
256 * initialize only internal data structures, don't initiate cluster interaction
257 */
258 void Objecter::init()
259 {
260 ceph_assert(!initialized);
261
262 if (!logger) {
263 PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
264
265 pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
266 PerfCountersBuilder::PRIO_CRITICAL);
267 pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
268 pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
269 pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
270 pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
271 pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
272
273 pcb.add_u64_counter(l_osdc_op, "op", "Operations");
274 pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
275 PerfCountersBuilder::PRIO_CRITICAL);
276 pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
277 PerfCountersBuilder::PRIO_CRITICAL);
278 pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
279 "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
280 pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");
281
282 pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
283 pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
284 "Create object operations");
285 pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
286 pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
287 pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
288 "Write full object operations");
289 pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
290 "Write same operations");
291 pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
292 "Append operation");
293 pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
294 "Set object to zero operations");
295 pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
296 "Truncate object operations");
297 pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
298 "Delete object operations");
299 pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
300 "Map extent operations");
301 pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
302 "Sparse read operations");
303 pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
304 "Clone range operations");
305 pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
306 "Get xattr operations");
307 pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
308 "Set xattr operations");
309 pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
310 "Xattr comparison operations");
311 pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
312 "Remove xattr operations");
313 pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
314 "Reset xattr operations");
315 pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
316 "Call (execute) operations");
317 pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
318 "Watch by object operations");
319 pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
320 "Notify about object operations");
321 pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
322 "Extended attribute comparison in multi operations");
323 pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
324 pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
325 pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");
326
327 pcb.add_u64(l_osdc_linger_active, "linger_active",
328 "Active lingering operations");
329 pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
330 "Sent lingering operations");
331 pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
332 "Resent lingering operations");
333 pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
334 "Sent pings to lingering operations");
335
336 pcb.add_u64(l_osdc_poolop_active, "poolop_active",
337 "Active pool operations");
338 pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
339 "Sent pool operations");
340 pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
341 "Resent pool operations");
342
343 pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
344 "Active get pool stat operations");
345 pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
346 "Pool stat operations sent");
347 pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
348 "Resent pool stats");
349
350 pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
351 pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
352 pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
353 "Resent FS stats");
354
355 pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
356 pcb.add_u64_counter(l_osdc_command_send, "command_send",
357 "Sent commands");
358 pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
359 "Resent commands");
360
361 pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
362 pcb.add_u64_counter(l_osdc_map_full, "map_full",
363 "Full OSD maps received");
364 pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
365 "Incremental OSD maps received");
366
367 pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
368 "Open sessions"); // open sessions
369 pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
370 "Sessions opened");
371 pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
372 "Sessions closed");
373 pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");
374
375 pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
376 "OSD OMAP write operations");
377 pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
378 "OSD OMAP read operations");
379 pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
380 "OSD OMAP delete operations");
381
382 logger = pcb.create_perf_counters();
383 cct->get_perfcounters_collection()->add(logger);
384 }
385
386 m_request_state_hook = new RequestStateHook(this);
387 AdminSocket* admin_socket = cct->get_admin_socket();
388 int ret = admin_socket->register_command("objecter_requests",
389 m_request_state_hook,
390 "show in-progress osd requests");
391
392 /* Don't warn on EEXIST, happens if multiple ceph clients
393 * are instantiated from one process */
394 if (ret < 0 && ret != -EEXIST) {
395 lderr(cct) << "error registering admin socket command: "
396 << cpp_strerror(ret) << dendl;
397 }
398
399 update_crush_location();
400
401 cct->_conf.add_observer(this);
402
403 initialized = true;
404 }
405
406 /*
407 * ok, cluster interaction can happen
408 */
409 void Objecter::start(const OSDMap* o)
410 {
411 shared_lock rl(rwlock);
412
413 start_tick();
414 if (o) {
415 osdmap->deepish_copy_from(*o);
416 prune_pg_mapping(osdmap->get_pools());
417 } else if (osdmap->get_epoch() == 0) {
418 _maybe_request_map();
419 }
420 }
421
// Tear down the objecter: close all sessions, drop every category of
// pending operation, stop the tick timer, and unregister the perf
// counters, config observer, and admin socket hook.  The teardown
// order below is deliberate; do not reorder casually.
void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

  // Drop rwlock around remove_observer() to avoid lock-ordering
  // problems with the config observer machinery.
  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  map<int,OSDSession*>::iterator p;
  while (!osd_sessions.empty()) {
    p = osd_sessions.begin();
    close_session(p->second);
  }

  // Drop references held by pending "check latest map" lookups.
  while(!check_latest_map_lingers.empty()) {
    map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    map<ceph_tid_t, CommandOp*>::iterator i
      = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  // These op types are plain-owned (deleted, not refcounted).
  while(!poolstat_ops.empty()) {
    map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  // The homeless session holds ops that had no target OSD; unlink and
  // release each kind (linger, regular, command).
  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i
      = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    Op *op = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i
      = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    CommandOp *cop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    AdminSocket* admin_socket = cct->get_admin_socket();
    admin_socket->unregister_commands(m_request_state_hook);
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}
541
542 void Objecter::_send_linger(LingerOp *info,
543 shunique_lock& sul)
544 {
545 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
546
547 vector<OSDOp> opv;
548 Context *oncommit = NULL;
549 LingerOp::shared_lock watchl(info->watch_lock);
550 ceph::buffer::list *poutbl = NULL;
551 if (info->registered && info->is_watch) {
552 ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
553 << dendl;
554 opv.push_back(OSDOp());
555 opv.back().op.op = CEPH_OSD_OP_WATCH;
556 opv.back().op.watch.cookie = info->get_cookie();
557 opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
558 opv.back().op.watch.gen = ++info->register_gen;
559 oncommit = new C_Linger_Reconnect(this, info);
560 } else {
561 ldout(cct, 15) << "send_linger " << info->linger_id << " register"
562 << dendl;
563 opv = info->ops;
564 C_Linger_Commit *c = new C_Linger_Commit(this, info);
565 if (!info->is_watch) {
566 info->notify_id = 0;
567 poutbl = &c->outbl;
568 }
569 oncommit = c;
570 }
571 watchl.unlock();
572 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
573 opv, info->target.flags | CEPH_OSD_FLAG_READ,
574 oncommit, info->pobjver);
575 o->outbl = poutbl;
576 o->snapid = info->snap;
577 o->snapc = info->snapc;
578 o->mtime = info->mtime;
579
580 o->target = info->target;
581 o->tid = ++last_tid;
582
583 // do not resend this; we will send a new op to reregister
584 o->should_resend = false;
585 o->ctx_budgeted = true;
586
587 if (info->register_tid) {
588 // repeat send. cancel old registration op, if any.
589 OSDSession::unique_lock sl(info->session->lock);
590 if (info->session->ops.count(info->register_tid)) {
591 Op *o = info->session->ops[info->register_tid];
592 _op_cancel_map_check(o);
593 _cancel_linger_op(o);
594 }
595 sl.unlock();
596 }
597
598 _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);
599
600 logger->inc(l_osdc_linger_send);
601 }
602
// Completion for the linger registration op (C_Linger_Commit).
// Notifies the registration waiter, propagates failures to a pending
// notify waiter, and for notifies decodes the notify_id from the
// reply payload.
void Objecter::_linger_commit(LingerOp *info, int r, ceph::buffer::list& outbl)
{
  LingerOp::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->complete(r);
    info->on_reg_commit = NULL;
  }
  // A failed registration also fails any pending notify completion.
  if (r < 0 && info->on_notify_finish) {
    info->on_notify_finish->complete(r);
    info->on_notify_finish = nullptr;
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
		     << dendl;
    }
    catch (ceph::buffer::error& e) {
      // Best-effort: a malformed/short payload just leaves
      // notify_id at 0; deliberately ignored.
    }
  }
}
632
633 struct C_DoWatchError : public Context {
634 Objecter *objecter;
635 Objecter::LingerOp *info;
636 int err;
637 C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
638 : objecter(o), info(i), err(r) {
639 info->get();
640 info->_queued_async();
641 }
642 void finish(int r) override {
643 Objecter::unique_lock wl(objecter->rwlock);
644 bool canceled = info->canceled;
645 wl.unlock();
646
647 if (!canceled) {
648 info->watch_context->handle_error(info->get_cookie(), err);
649 }
650
651 info->finished_async();
652 info->put();
653 }
654 };
655
656 int Objecter::_normalize_watch_error(int r)
657 {
658 // translate ENOENT -> ENOTCONN so that a delete->disconnection
659 // notification and a failure to reconnect because we raced with
660 // the delete appear the same to the user.
661 if (r == -ENOENT)
662 r = -ENOTCONN;
663 return r;
664 }
665
666 void Objecter::_linger_reconnect(LingerOp *info, int r)
667 {
668 ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
669 << " (last_error " << info->last_error << ")" << dendl;
670 if (r < 0) {
671 LingerOp::unique_lock wl(info->watch_lock);
672 if (!info->last_error) {
673 r = _normalize_watch_error(r);
674 info->last_error = r;
675 if (info->watch_context) {
676 finisher->queue(new C_DoWatchError(this, info, r));
677 }
678 }
679 wl.unlock();
680 }
681 }
682
// Send a WATCH_OP_PING for an active watch so we can detect a dead
// watch (see _linger_ping / linger_check).
void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  // Test hook: deliberately skip pings to simulate a stuck watch.
  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
		   << dendl;
    return;
  }
  // Reads are paused cluster-wide; don't ping.
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
		 << dendl;

  vector<OSDOp> opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;
  C_Linger_Ping *onack = new C_Linger_Ping(this, info);
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 opv, info->target.flags | CEPH_OSD_FLAG_READ,
		 onack, NULL, NULL);
  o->target = info->target;
  // Pings are fire-once; a new ping is sent on the next tick instead
  // of resending this one.
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  // Record the send time so _linger_ping can advance watch_valid_thru.
  onack->sent = now;
  logger->inc(l_osdc_linger_ping);
}
722
// Completion for a watch ping (C_Linger_Ping).  On success, advance
// watch_valid_thru to the ping's send time; on first failure, record
// the error and queue the user's error callback.  Replies from a
// previous registration generation are ignored.
void Objecter::_linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
			    uint32_t register_gen)
{
  LingerOp::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " sent " << sent << " gen " << register_gen << " = " << r
		 << " (last_error " << info->last_error
		 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (r == 0) {
      // The watch was definitely alive when this ping was sent.
      info->watch_valid_thru = sent;
    } else if (r < 0 && !info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
  } else {
    // Stale reply from before a reconnect; the new generation's pings
    // are authoritative.
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}
745
746 int Objecter::linger_check(LingerOp *info)
747 {
748 LingerOp::shared_lock l(info->watch_lock);
749
750 ceph::coarse_mono_time stamp = info->watch_valid_thru;
751 if (!info->watch_pending_async.empty())
752 stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
753 auto age = ceph::coarse_mono_clock::now() - stamp;
754
755 ldout(cct, 10) << __func__ << " " << info->linger_id
756 << " err " << info->last_error
757 << " age " << age << dendl;
758 if (info->last_error)
759 return info->last_error;
760 // return a safe upper bound (we are truncating to ms)
761 return
762 1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
763 }
764
// Public entry: cancel a linger op and drop the caller's reference
// (the one handed out by linger_register).
void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}
771
// Unlink a linger op from its session and the global tables, mark it
// canceled, and drop the tables' reference.  Idempotent via the
// canceled flag.
void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    OSDSession::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    // The map and the set must always track each other.
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    // Drop the reference held by the linger_ops tables.
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}
792
793
794
// Allocate and register a new linger (watch/notify) op for an object.
// Returns the op with an extra reference held for the caller; release
// it via linger_cancel().
Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
					      const object_locator_t& oloc,
					      int flags)
{
  LingerOp *info = new LingerOp(this);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  // A locator key equal to the object name is redundant; drop it.
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  // Start out "valid now"; pings will advance this.
  info->watch_valid_thru = ceph::coarse_mono_clock::now();

  unique_lock l(rwlock);

  // Acquire linger ID
  info->linger_id = ++max_linger_id;
  ldout(cct, 10) << __func__ << " info " << info
		 << " linger_id " << info->linger_id
		 << " cookie " << info->get_cookie()
		 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}
822
823 ceph_tid_t Objecter::linger_watch(LingerOp *info,
824 ObjectOperation& op,
825 const SnapContext& snapc,
826 real_time mtime,
827 ceph::buffer::list& inbl,
828 Context *oncommit,
829 version_t *objver)
830 {
831 info->is_watch = true;
832 info->snapc = snapc;
833 info->mtime = mtime;
834 info->target.flags |= CEPH_OSD_FLAG_WRITE;
835 info->ops = op.ops;
836 info->inbl = inbl;
837 info->poutbl = NULL;
838 info->pobjver = objver;
839 info->on_reg_commit = oncommit;
840
841 info->ctx_budget = take_linger_budget(info);
842
843 shunique_lock sul(rwlock, ceph::acquire_unique);
844 _linger_submit(info, sul);
845 logger->inc(l_osdc_linger_active);
846
847 return info->linger_id;
848 }
849
850 ceph_tid_t Objecter::linger_notify(LingerOp *info,
851 ObjectOperation& op,
852 snapid_t snap, ceph::buffer::list& inbl,
853 ceph::buffer::list *poutbl,
854 Context *onfinish,
855 version_t *objver)
856 {
857 info->snap = snap;
858 info->target.flags |= CEPH_OSD_FLAG_READ;
859 info->ops = op.ops;
860 info->inbl = inbl;
861 info->poutbl = poutbl;
862 info->pobjver = objver;
863 info->on_reg_commit = onfinish;
864
865 info->ctx_budget = take_linger_budget(info);
866
867 shunique_lock sul(rwlock, ceph::acquire_unique);
868 _linger_submit(info, sul);
869 logger->inc(l_osdc_linger_active);
870
871 return info->linger_id;
872 }
873
// Compute the target for a linger op, bind it to the matching OSD
// session, and send it.  Caller must hold rwlock uniquely and must
// already have taken the throttle budget (both asserted).
void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, nullptr);

  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  OSDSession::unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  // _get_session returned a ref; the session<->op link now holds it.
  put_session(s);

  _send_linger(info, sul);
}
894
// Finisher context that delivers a watch notify callback outside of
// fast-dispatch.  Construction pins the linger op (get()), marks an
// async event in flight (_queued_async()), and takes a ref on the
// message; _do_watch_notify() releases all three.
struct C_DoWatchNotify : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  MWatchNotify *msg;
  C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->get();
    info->_queued_async();
    msg->get();
  }
  void finish(int r) override {
    // _do_watch_notify drops the refs taken in the constructor.
    objecter->_do_watch_notify(info, msg);
  }
};
909
// Fast-dispatch handler for watch/notify events from the OSD.
// DISCONNECT and NOTIFY_COMPLETE are handled inline; NOTIFY events
// are bounced to the finisher via C_DoWatchNotify.
void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  // The cookie is the LingerOp pointer we handed to the OSD; validate
  // it against the live set before dereferencing.
  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  LingerOp::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    // Report only the first error to the user.
    if (!info->last_error) {
      info->last_error = -ENOTCONN;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
	info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
		     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->notify_result_bl->claim(m->get_data());
      info->on_notify_finish->complete(m->return_code);

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = NULL;
    }
  } else {
    // NOTIFY for a watch: deliver via the finisher, outside
    // fast-dispatch.
    finisher->queue(new C_DoWatchNotify(this, info, m));
  }
}
950
// Deliver a queued watch NOTIFY callback (runs in finisher context,
// via C_DoWatchNotify).  Always releases the references taken by the
// C_DoWatchNotify constructor, even when the op was canceled.
void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    // Canceled while queued; skip the callback but still drop refs.
    l.unlock();
    goto out;
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->watch_context);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  // Invoke the user callback outside the objecter lock.
  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->watch_context->handle_notify(m->notify_id, m->cookie,
				       m->notifier_gid, m->bl);
    break;
  }

 out:
  // Release what the C_DoWatchNotify constructor took.
  info->finished_async();
  info->put();
  m->put();
}
982
// Messenger dispatch: route incoming messages to the appropriate
// handler.  Returns true when the message was consumed here, false
// when other dispatchers should also see it (notably OSD maps).
bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    // handle_watch_notify takes its own ref if it needs to keep the
    // message; drop the dispatch ref here.
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    // Only OSD command replies are ours; MON/MGR command replies
    // belong to other dispatchers.
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

    // these we give others a chance to inspect

    // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}
1030
/*
 * Scan every op registered on session s (lingers first, then regular
 * ops, then commands) against the current osdmap and decide whether it
 * must be resent, cancelled, or failed because its pool no longer
 * exists.  Ops needing a resend are collected into the need_resend*
 * out-parameters; the caller performs the actual resend after all
 * sessions have been scanned.
 *
 * Caller must hold rwlock uniquely (via sul).
 */
void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,
  bool cluster_full,
  map<int64_t, bool> *pool_full_map,
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  // lingers to tear down; deferred until after s->lock is released,
  // because _linger_cancel() must not run under the session lock
  list<LingerOp*> unregister_lingers;

  OSDSession::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    LingerOp *op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    // a full pool forces writes to be resent even if the mapping did
    // not change
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
	ldout(cct, 10) << " need to unregister linger op "
		       << op->linger_id << dendl;
	// extra ref keeps op alive until _linger_cancel() below
	op->get();
	unregister_lingers.push_back(op);
      }
      break;
    }
  }

  // check for changed request mappings
  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    // drop snapids the new map says were removed
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
			 op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // ops that ignore the FULL flag are not forced to resend on full
      if (!skipped_map && !(force_resend_writes && op->respects_full()))
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    }
  }

  // commands
  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    CommandOp *c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  // now that the session lock is dropped, cancel the lingers we queued
  // and release the refs taken above
  for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}
1150
/*
 * Process an incoming MOSDMap from the monitor: apply incremental or
 * full maps epoch by epoch, rescan all sessions for ops whose mapping
 * changed, resend what needs resending, and wake any waiters that were
 * blocked on a map epoch we now have.
 */
void Objecter::handle_osd_map(MOSDMap *m)
{
  shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  // ignore maps from a different cluster
  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
		  << " != " << monc->get_fsid() << dendl;
    return;
  }

  // remember the pause/full state *before* applying the new maps, so we
  // can detect transitions below
  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (map<int64_t, pg_pool_t>::const_iterator it
	 = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      // skipped_map is set when we had to jump over a gap in the
      // message's epochs; it forces every op to be resent
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
	   e <= m->get_last();
	   e++) {

	if (osdmap->get_epoch() == e-1 &&
	    m->incremental_maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
			<< dendl;
	  OSDMap::Incremental inc(m->incremental_maps[e]);
	  osdmap->apply_incremental(inc);

	  emit_blacklist_events(inc);

	  logger->inc(l_osdc_map_inc);
	}
	else if (m->maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
	  auto new_osdmap = std::make_unique<OSDMap>();
	  new_osdmap->decode(m->maps[e]);

	  emit_blacklist_events(*osdmap, *new_osdmap);
	  osdmap = std::move(new_osdmap);

	  logger->inc(l_osdc_map_full);
	}
	else {
	  // epoch e is in neither incremental_maps nor maps
	  if (e >= m->get_oldest()) {
	    // the mon should have it; ask again
	    ldout(cct, 3) << "handle_osd_map requesting missing epoch "
			  << osdmap->get_epoch()+1 << dendl;
	    _maybe_request_map();
	    break;
	  }
	  // too old to fetch; jump forward to the oldest epoch the
	  // message carries and remember that we skipped
	  ldout(cct, 3) << "handle_osd_map missing epoch "
			<< osdmap->get_epoch()+1
			<< ", jumping to " << m->get_oldest() << dendl;
	  e = m->get_oldest() - 1;
	  skipped_map = true;
	  continue;
	}
	logger->set(l_osdc_map_epoch, osdmap->get_epoch());

	prune_pg_mapping(osdmap->get_pools());
	cluster_full = cluster_full || _osdmap_full_flag();
	update_pool_full_map(pool_full_map);

	// check all outstanding requests on every epoch
	for (auto& i : need_resend) {
	  _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
	}
	_scan_requests(homeless_session, skipped_map, cluster_full,
		       &pool_full_map, need_resend,
		       need_resend_linger, need_resend_command, sul);
	for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
	     p != osd_sessions.end(); ) {
	  OSDSession *s = p->second;
	  _scan_requests(s, skipped_map, cluster_full,
			 &pool_full_map, need_resend,
			 need_resend_linger, need_resend_command, sul);
	  ++p;
	  // osd down or addr change?
	  if (!osdmap->is_up(s->osd) ||
	      (s->con &&
	       s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
	    close_session(s);
	  }
	}

	ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map. we want the full thing.
      if (m->maps.count(m->get_last())) {
	for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
	     p != osd_sessions.end(); ++p) {
	  OSDSession *s = p->second;
	  _scan_requests(s, false, false, NULL, need_resend,
			 need_resend_linger, need_resend_command, sul);
	}
	ldout(cct, 3) << "handle_osd_map decoding full epoch "
		      << m->get_last() << dendl;
	osdmap->decode(m->maps[m->get_last()]);
	prune_pg_mapping(osdmap->get_pools());

	_scan_requests(homeless_session, false, false, NULL,
		       need_resend, need_resend_linger,
		       need_resend_command, sul);
      } else {
	ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
		      << dendl;
	monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
	monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
	p = need_resend.erase(p);
	_check_op_pool_dne(op, nullptr);
      } else {
	++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  // keep watching the map while paused (or below the epoch barrier) so
  // we notice as soon as the condition clears
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend requests
  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
       p != need_resend.end(); ++p) {
    Op *op = p->second;
    OSDSession *s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    OSDSession::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
	logger->inc(l_osdc_op_resend);
	_send_op(op);
      }
    } else {
      // resend was disabled for this op; drop it instead
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (list<LingerOp*>::iterator p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    CommandOp *c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
	_send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  map<epoch_t,list< pair< Context*, int > > >::iterator p =
    waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
	 p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (list<pair<Context*, int> >::iterator i = p->second.begin();
	 i != p->second.end(); ++i) {
      i->first->complete(i->second);
    }
    waiting_for_map.erase(p++);
  }

  // acknowledge the subscription so the mon stops resending this epoch
  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}
1385
1386 void Objecter::enable_blacklist_events()
1387 {
1388 unique_lock wl(rwlock);
1389
1390 blacklist_events_enabled = true;
1391 }
1392
1393 void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
1394 {
1395 unique_lock wl(rwlock);
1396
1397 if (events->empty()) {
1398 events->swap(blacklist_events);
1399 } else {
1400 for (const auto &i : blacklist_events) {
1401 events->insert(i);
1402 }
1403 blacklist_events.clear();
1404 }
1405 }
1406
1407 void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
1408 {
1409 if (!blacklist_events_enabled) {
1410 return;
1411 }
1412
1413 for (const auto &i : inc.new_blacklist) {
1414 blacklist_events.insert(i.first);
1415 }
1416 }
1417
1418 void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
1419 const OSDMap &new_osd_map)
1420 {
1421 if (!blacklist_events_enabled) {
1422 return;
1423 }
1424
1425 std::set<entity_addr_t> old_set;
1426 std::set<entity_addr_t> new_set;
1427
1428 old_osd_map.get_blacklist(&old_set);
1429 new_osd_map.get_blacklist(&new_set);
1430
1431 std::set<entity_addr_t> delta_set;
1432 std::set_difference(
1433 new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
1434 std::inserter(delta_set, delta_set.begin()));
1435 blacklist_events.insert(delta_set.begin(), delta_set.end());
1436 }
1437
1438 // op pool check
1439
1440 void Objecter::C_Op_Map_Latest::finish(int r)
1441 {
1442 if (r == -EAGAIN || r == -ECANCELED)
1443 return;
1444
1445 lgeneric_subdout(objecter->cct, objecter, 10)
1446 << "op_map_latest r=" << r << " tid=" << tid
1447 << " latest " << latest << dendl;
1448
1449 Objecter::unique_lock wl(objecter->rwlock);
1450
1451 map<ceph_tid_t, Op*>::iterator iter =
1452 objecter->check_latest_map_ops.find(tid);
1453 if (iter == objecter->check_latest_map_ops.end()) {
1454 lgeneric_subdout(objecter->cct, objecter, 10)
1455 << "op_map_latest op "<< tid << " not found" << dendl;
1456 return;
1457 }
1458
1459 Op *op = iter->second;
1460 objecter->check_latest_map_ops.erase(iter);
1461
1462 lgeneric_subdout(objecter->cct, objecter, 20)
1463 << "op_map_latest op "<< op << dendl;
1464
1465 if (op->map_dne_bound == 0)
1466 op->map_dne_bound = latest;
1467
1468 OSDSession::unique_lock sl(op->session->lock, defer_lock);
1469 objecter->_check_op_pool_dne(op, &sl);
1470
1471 op->put();
1472 }
1473
1474 int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
1475 snapid_t *snap) const
1476 {
1477 shared_lock rl(rwlock);
1478
1479 auto& pools = osdmap->get_pools();
1480 auto iter = pools.find(poolid);
1481 if (iter == pools.end()) {
1482 return -ENOENT;
1483 }
1484 const pg_pool_t& pg_pool = iter->second;
1485 for (auto p = pg_pool.snaps.begin();
1486 p != pg_pool.snaps.end();
1487 ++p) {
1488 if (p->second.name == snap_name) {
1489 *snap = p->first;
1490 return 0;
1491 }
1492 }
1493 return -ENOENT;
1494 }
1495
1496 int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
1497 pool_snap_info_t *info) const
1498 {
1499 shared_lock rl(rwlock);
1500
1501 auto& pools = osdmap->get_pools();
1502 auto iter = pools.find(poolid);
1503 if (iter == pools.end()) {
1504 return -ENOENT;
1505 }
1506 const pg_pool_t& pg_pool = iter->second;
1507 auto p = pg_pool.snaps.find(snap);
1508 if (p == pg_pool.snaps.end())
1509 return -ENOENT;
1510 *info = p->second;
1511
1512 return 0;
1513 }
1514
1515 int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
1516 {
1517 shared_lock rl(rwlock);
1518
1519 const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
1520 if (!pi)
1521 return -ENOENT;
1522 for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
1523 p != pi->snaps.end();
1524 ++p) {
1525 snaps->push_back(p->first);
1526 }
1527 return 0;
1528 }
1529
1530 // sl may be unlocked.
1531 void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
1532 {
1533 // rwlock is locked unique
1534
1535 if (op->target.pool_ever_existed) {
1536 // the pool previously existed and now it does not, which means it
1537 // was deleted.
1538 op->map_dne_bound = osdmap->get_epoch();
1539 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1540 << " pool previously exists but now does not"
1541 << dendl;
1542 } else {
1543 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1544 << " current " << osdmap->get_epoch()
1545 << " map_dne_bound " << op->map_dne_bound
1546 << dendl;
1547 }
1548 if (op->map_dne_bound > 0) {
1549 if (osdmap->get_epoch() >= op->map_dne_bound) {
1550 // we had a new enough map
1551 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1552 << " concluding pool " << op->target.base_pgid.pool()
1553 << " dne" << dendl;
1554 if (op->onfinish) {
1555 num_in_flight--;
1556 op->onfinish->complete(-ENOENT);
1557 }
1558
1559 OSDSession *s = op->session;
1560 if (s) {
1561 ceph_assert(s != NULL);
1562 ceph_assert(sl->mutex() == &s->lock);
1563 bool session_locked = sl->owns_lock();
1564 if (!session_locked) {
1565 sl->lock();
1566 }
1567 _finish_op(op, 0);
1568 if (!session_locked) {
1569 sl->unlock();
1570 }
1571 } else {
1572 _finish_op(op, 0); // no session
1573 }
1574 }
1575 } else {
1576 _send_op_map_check(op);
1577 }
1578 }
1579
1580 void Objecter::_send_op_map_check(Op *op)
1581 {
1582 // rwlock is locked unique
1583 // ask the monitor
1584 if (check_latest_map_ops.count(op->tid) == 0) {
1585 op->get();
1586 check_latest_map_ops[op->tid] = op;
1587 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
1588 monc->get_version("osdmap", &c->latest, NULL, c);
1589 }
1590 }
1591
1592 void Objecter::_op_cancel_map_check(Op *op)
1593 {
1594 // rwlock is locked unique
1595 map<ceph_tid_t, Op*>::iterator iter =
1596 check_latest_map_ops.find(op->tid);
1597 if (iter != check_latest_map_ops.end()) {
1598 Op *op = iter->second;
1599 op->put();
1600 check_latest_map_ops.erase(iter);
1601 }
1602 }
1603
1604 // linger pool check
1605
1606 void Objecter::C_Linger_Map_Latest::finish(int r)
1607 {
1608 if (r == -EAGAIN || r == -ECANCELED) {
1609 // ignore callback; we will retry in resend_mon_ops()
1610 return;
1611 }
1612
1613 unique_lock wl(objecter->rwlock);
1614
1615 map<uint64_t, LingerOp*>::iterator iter =
1616 objecter->check_latest_map_lingers.find(linger_id);
1617 if (iter == objecter->check_latest_map_lingers.end()) {
1618 return;
1619 }
1620
1621 LingerOp *op = iter->second;
1622 objecter->check_latest_map_lingers.erase(iter);
1623
1624 if (op->map_dne_bound == 0)
1625 op->map_dne_bound = latest;
1626
1627 bool unregister;
1628 objecter->_check_linger_pool_dne(op, &unregister);
1629
1630 if (unregister) {
1631 objecter->_linger_cancel(op);
1632 }
1633
1634 op->put();
1635 }
1636
1637 void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
1638 {
1639 // rwlock is locked unique
1640
1641 *need_unregister = false;
1642
1643 if (op->register_gen > 0) {
1644 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1645 << " pool previously existed but now does not"
1646 << dendl;
1647 op->map_dne_bound = osdmap->get_epoch();
1648 } else {
1649 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1650 << " current " << osdmap->get_epoch()
1651 << " map_dne_bound " << op->map_dne_bound
1652 << dendl;
1653 }
1654 if (op->map_dne_bound > 0) {
1655 if (osdmap->get_epoch() >= op->map_dne_bound) {
1656 LingerOp::unique_lock wl{op->watch_lock};
1657 if (op->on_reg_commit) {
1658 op->on_reg_commit->complete(-ENOENT);
1659 op->on_reg_commit = nullptr;
1660 }
1661 if (op->on_notify_finish) {
1662 op->on_notify_finish->complete(-ENOENT);
1663 op->on_notify_finish = nullptr;
1664 }
1665 *need_unregister = true;
1666 }
1667 } else {
1668 _send_linger_map_check(op);
1669 }
1670 }
1671
1672 void Objecter::_send_linger_map_check(LingerOp *op)
1673 {
1674 // ask the monitor
1675 if (check_latest_map_lingers.count(op->linger_id) == 0) {
1676 op->get();
1677 check_latest_map_lingers[op->linger_id] = op;
1678 C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
1679 monc->get_version("osdmap", &c->latest, NULL, c);
1680 }
1681 }
1682
1683 void Objecter::_linger_cancel_map_check(LingerOp *op)
1684 {
1685 // rwlock is locked unique
1686
1687 map<uint64_t, LingerOp*>::iterator iter =
1688 check_latest_map_lingers.find(op->linger_id);
1689 if (iter != check_latest_map_lingers.end()) {
1690 LingerOp *op = iter->second;
1691 op->put();
1692 check_latest_map_lingers.erase(iter);
1693 }
1694 }
1695
1696 // command pool check
1697
1698 void Objecter::C_Command_Map_Latest::finish(int r)
1699 {
1700 if (r == -EAGAIN || r == -ECANCELED) {
1701 // ignore callback; we will retry in resend_mon_ops()
1702 return;
1703 }
1704
1705 unique_lock wl(objecter->rwlock);
1706
1707 map<uint64_t, CommandOp*>::iterator iter =
1708 objecter->check_latest_map_commands.find(tid);
1709 if (iter == objecter->check_latest_map_commands.end()) {
1710 return;
1711 }
1712
1713 CommandOp *c = iter->second;
1714 objecter->check_latest_map_commands.erase(iter);
1715
1716 if (c->map_dne_bound == 0)
1717 c->map_dne_bound = latest;
1718
1719 OSDSession::unique_lock sul(c->session->lock);
1720 objecter->_check_command_map_dne(c);
1721 sul.unlock();
1722
1723 c->put();
1724 }
1725
1726 void Objecter::_check_command_map_dne(CommandOp *c)
1727 {
1728 // rwlock is locked unique
1729 // session is locked unique
1730
1731 ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1732 << " current " << osdmap->get_epoch()
1733 << " map_dne_bound " << c->map_dne_bound
1734 << dendl;
1735 if (c->map_dne_bound > 0) {
1736 if (osdmap->get_epoch() >= c->map_dne_bound) {
1737 _finish_command(c, c->map_check_error, c->map_check_error_str);
1738 }
1739 } else {
1740 _send_command_map_check(c);
1741 }
1742 }
1743
1744 void Objecter::_send_command_map_check(CommandOp *c)
1745 {
1746 // rwlock is locked unique
1747 // session is locked unique
1748
1749 // ask the monitor
1750 if (check_latest_map_commands.count(c->tid) == 0) {
1751 c->get();
1752 check_latest_map_commands[c->tid] = c;
1753 C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
1754 monc->get_version("osdmap", &f->latest, NULL, f);
1755 }
1756 }
1757
1758 void Objecter::_command_cancel_map_check(CommandOp *c)
1759 {
1760 // rwlock is locked uniqe
1761
1762 map<uint64_t, CommandOp*>::iterator iter =
1763 check_latest_map_commands.find(c->tid);
1764 if (iter != check_latest_map_commands.end()) {
1765 CommandOp *c = iter->second;
1766 c->put();
1767 check_latest_map_commands.erase(iter);
1768 }
1769 }
1770
1771
1772 /**
1773 * Look up OSDSession by OSD id.
1774 *
1775 * @returns 0 on success, or -EAGAIN if the lock context requires
1776 * promotion to write.
1777 */
1778 int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
1779 {
1780 ceph_assert(sul && sul.mutex() == &rwlock);
1781
1782 if (osd < 0) {
1783 *session = homeless_session;
1784 ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1785 << dendl;
1786 return 0;
1787 }
1788
1789 map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
1790 if (p != osd_sessions.end()) {
1791 OSDSession *s = p->second;
1792 s->get();
1793 *session = s;
1794 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1795 << s->get_nref() << dendl;
1796 return 0;
1797 }
1798 if (!sul.owns_lock()) {
1799 return -EAGAIN;
1800 }
1801 OSDSession *s = new OSDSession(cct, osd);
1802 osd_sessions[osd] = s;
1803 s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
1804 s->con->set_priv(RefCountedPtr{s});
1805 logger->inc(l_osdc_osd_session_open);
1806 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1807 s->get();
1808 *session = s;
1809 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1810 << s->get_nref() << dendl;
1811 return 0;
1812 }
1813
1814 void Objecter::put_session(Objecter::OSDSession *s)
1815 {
1816 if (s && !s->is_homeless()) {
1817 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1818 << s->get_nref() << dendl;
1819 s->put();
1820 }
1821 }
1822
1823 void Objecter::get_session(Objecter::OSDSession *s)
1824 {
1825 ceph_assert(s != NULL);
1826
1827 if (!s->is_homeless()) {
1828 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1829 << s->get_nref() << dendl;
1830 s->get();
1831 }
1832 }
1833
1834 void Objecter::_reopen_session(OSDSession *s)
1835 {
1836 // rwlock is locked unique
1837 // s->lock is locked
1838
1839 auto addrs = osdmap->get_addrs(s->osd);
1840 ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
1841 << addrs << dendl;
1842 if (s->con) {
1843 s->con->set_priv(NULL);
1844 s->con->mark_down();
1845 logger->inc(l_osdc_osd_session_close);
1846 }
1847 s->con = messenger->connect_to_osd(addrs);
1848 s->con->set_priv(RefCountedPtr{s});
1849 s->incarnation++;
1850 logger->inc(l_osdc_osd_session_open);
1851 }
1852
/*
 * Tear down a session: mark its connection down, detach every op
 * (linger, regular, command) registered on it, and re-home those ops on
 * the homeless session so they can be re-targeted later.
 *
 * rwlock must be held unique.
 */
void Objecter::close_session(OSDSession *s)
{
  // rwlock is locked unique

  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  OSDSession::unique_lock sl(s->lock);

  // ops detached from s, pending reassignment below
  std::list<LingerOp*> homeless_lingers;
  std::list<CommandOp*> homeless_commands;
  std::list<Op*> homeless_ops;

  // the _session_*_remove() helpers erase from the maps we are
  // draining, hence the while-not-empty loops rather than iteration
  while (!s->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    homeless_lingers.push_back(i->second);
    _session_linger_op_remove(s, i->second);
  }

  while (!s->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    homeless_ops.push_back(i->second);
    _session_op_remove(s, i->second);
  }

  while (!s->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    homeless_commands.push_back(i->second);
    _session_command_op_remove(s, i->second);
  }

  osd_sessions.erase(s->osd);
  sl.unlock();
  put_session(s);  // drop the osd_sessions map's ref

  // Assign any leftover ops to the homeless session
  {
    OSDSession::unique_lock hsl(homeless_session->lock);
    for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
	 i != homeless_lingers.end(); ++i) {
      _session_linger_op_assign(homeless_session, *i);
    }
    for (std::list<Op*>::iterator i = homeless_ops.begin();
	 i != homeless_ops.end(); ++i) {
      _session_op_assign(homeless_session, *i);
    }
    for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
	 i != homeless_commands.end(); ++i) {
      _session_command_op_assign(homeless_session, *i);
    }
  }

  logger->set(l_osdc_osd_sessions, osd_sessions.size());
}
1913
1914 void Objecter::wait_for_osd_map()
1915 {
1916 unique_lock l(rwlock);
1917 if (osdmap->get_epoch()) {
1918 l.unlock();
1919 return;
1920 }
1921
1922 // Leave this since it goes with C_SafeCond
1923 ceph::mutex lock = ceph::make_mutex("");
1924 ceph::condition_variable cond;
1925 bool done;
1926 std::unique_lock mlock{lock};
1927 C_SafeCond *context = new C_SafeCond(lock, cond, &done, NULL);
1928 waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
1929 l.unlock();
1930 cond.wait(mlock, [&done] { return done; });
1931 }
1932
1933 struct C_Objecter_GetVersion : public Context {
1934 Objecter *objecter;
1935 uint64_t oldest, newest;
1936 Context *fin;
1937 C_Objecter_GetVersion(Objecter *o, Context *c)
1938 : objecter(o), oldest(0), newest(0), fin(c) {}
1939 void finish(int r) override {
1940 if (r >= 0) {
1941 objecter->get_latest_version(oldest, newest, fin);
1942 } else if (r == -EAGAIN) { // try again as instructed
1943 objecter->wait_for_latest_osdmap(fin);
1944 } else {
1945 // it doesn't return any other error codes!
1946 ceph_abort();
1947 }
1948 }
1949 };
1950
1951 void Objecter::wait_for_latest_osdmap(Context *fin)
1952 {
1953 ldout(cct, 10) << __func__ << dendl;
1954 C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1955 monc->get_version("osdmap", &c->newest, &c->oldest, c);
1956 }
1957
1958 void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1959 {
1960 unique_lock wl(rwlock);
1961 if (osdmap->get_epoch() >= newest) {
1962 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1963 wl.unlock();
1964 if (fin)
1965 fin->complete(0);
1966 return;
1967 }
1968
1969 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1970 _wait_for_new_map(fin, newest, 0);
1971 }
1972
1973 void Objecter::maybe_request_map()
1974 {
1975 shared_lock rl(rwlock);
1976 _maybe_request_map();
1977 }
1978
1979 void Objecter::_maybe_request_map()
1980 {
1981 // rwlock is locked
1982 int flag = 0;
1983 if (_osdmap_full_flag()
1984 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1985 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1986 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1987 "osd map (FULL flag is set)" << dendl;
1988 } else {
1989 ldout(cct, 10)
1990 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1991 flag = CEPH_SUBSCRIBE_ONETIME;
1992 }
1993 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
1994 if (monc->sub_want("osdmap", epoch, flag)) {
1995 monc->renew_subs();
1996 }
1997 }
1998
1999 void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
2000 {
2001 // rwlock is locked unique
2002 waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
2003 _maybe_request_map();
2004 }
2005
2006
2007 /**
2008 * Use this together with wait_for_map: this is a pre-check to avoid
2009 * allocating a Context for wait_for_map if we can see that we
2010 * definitely already have the epoch.
2011 *
2012 * This does *not* replace the need to handle the return value of
2013 * wait_for_map: just because we don't have it in this pre-check
2014 * doesn't mean we won't have it when calling back into wait_for_map,
2015 * since the objecter lock is dropped in between.
2016 */
2017 bool Objecter::have_map(const epoch_t epoch)
2018 {
2019 shared_lock rl(rwlock);
2020 if (osdmap->get_epoch() >= epoch) {
2021 return true;
2022 } else {
2023 return false;
2024 }
2025 }
2026
2027 bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
2028 {
2029 unique_lock wl(rwlock);
2030 if (osdmap->get_epoch() >= epoch) {
2031 return true;
2032 }
2033 _wait_for_new_map(c, epoch, err);
2034 return false;
2035 }
2036
/*
 * Resend everything registered on a session after its connection was
 * re-established: clear backoffs, resend ops and commands in tid order,
 * and collect linger ops into lresend for the caller to resend (via
 * _linger_ops_resend()) once it can take the locks that requires.
 *
 * rwlock must be held unique.
 */
void Objecter::_kick_requests(OSDSession *session,
			      map<uint64_t, LingerOp *>& lresend)
{
  // rwlock is locked unique

  // clear backoffs
  session->backoffs.clear();
  session->backoffs_by_id.clear();

  // resend ops
  map<ceph_tid_t,Op*> resend;  // resend in tid order
  for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
       p != session->ops.end();) {
    Op *op = p->second;
    ++p;  // _cancel_linger_op() below may erase from session->ops
    if (op->should_resend) {
      // paused ops stay queued; they resend when the pause lifts
      if (!op->target.paused)
	resend[op->tid] = op;
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
  }

  logger->inc(l_osdc_op_resend, resend.size());
  while (!resend.empty()) {
    _send_op(resend.begin()->second);
    resend.erase(resend.begin());
  }

  // resend lingers; each gets an extra ref that _linger_ops_resend()
  // drops after resending
  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
  for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
       j != session->linger_ops.end(); ++j) {
    LingerOp *op = j->second;
    op->get();
    ceph_assert(lresend.count(j->first) == 0);
    lresend[j->first] = op;
  }

  // resend commands
  logger->inc(l_osdc_command_resend, session->command_ops.size());
  map<uint64_t,CommandOp*> cresend;  // resend in order
  for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
       k != session->command_ops.end(); ++k) {
    cresend[k->first] = k->second;
  }
  while (!cresend.empty()) {
    _send_command(cresend.begin()->second);
    cresend.erase(cresend.begin());
  }
}
2089
/*
 * Resend the linger ops collected by _kick_requests(), dropping the
 * extra ref each entry carries.
 *
 * _send_linger() requires a shunique_lock, so the caller's unique_lock
 * is temporarily converted into one and handed back before returning.
 */
void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
				  unique_lock& ul)
{
  ceph_assert(ul.owns_lock());
  shunique_lock sul(std::move(ul));
  while (!lresend.empty()) {
    LingerOp *op = lresend.begin()->second;
    if (!op->canceled) {
      _send_linger(op, sul);
    }
    op->put();  // ref taken when queued into lresend
    lresend.erase(lresend.begin());
  }
  ul = sul.release_to_unique();  // hand the lock back to the caller
}
2105
2106 void Objecter::start_tick()
2107 {
2108 ceph_assert(tick_event == 0);
2109 tick_event =
2110 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2111 &Objecter::tick, this);
2112 }
2113
/*
 * Periodic timer callback: detect laggy ops (older than
 * objecter_timeout), ping osds that host lingers/commands so session
 * resets are noticed (the osd reply policy is lossy), update laggy-op
 * perf counters, and reschedule itself.
 */
void Objecter::tick()
{
  shared_lock rl(rwlock);

  ldout(cct, 10) << "tick" << dendl;

  // we are only called by C_Tick
  tick_event = 0;

  if (!initialized) {
    // we raced with shutdown
    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
    return;
  }

  // sessions that should receive a keepalive ping this tick
  set<OSDSession*> toping;


  // look for laggy requests
  auto cutoff = ceph::coarse_mono_clock::now();
  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout

  unsigned laggy_ops = 0;

  for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::lock_guard l(s->lock);
    bool found = false;
    // ops are laggy if they were sent before the cutoff
    for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
	 p != s->ops.end();
	 ++p) {
      Op *op = p->second;
      ceph_assert(op->session);
      if (op->stamp < cutoff) {
	ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
		      << " is laggy" << dendl;
	found = true;
	++laggy_ops;
      }
    }
    // lingers always warrant a ping; watches additionally get an
    // explicit linger ping to keep the watch alive
    for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
	 p != s->linger_ops.end();
	 ++p) {
      LingerOp *op = p->second;
      LingerOp::unique_lock wl(op->watch_lock);
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
		     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
      if (op->is_watch && op->registered && !op->last_error)
	_send_linger_ping(op);
    }
    // outstanding commands also keep the session pinged
    for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
	 p != s->command_ops.end();
	 ++p) {
      CommandOp *op = p->second;
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
		     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
    }
    if (found)
      toping.insert(s);
  }
  // anything stuck suggests we may be missing a newer map
  if (num_homeless_ops || !toping.empty()) {
    _maybe_request_map();
  }

  logger->set(l_osdc_op_laggy, laggy_ops);
  logger->set(l_osdc_osd_laggy, toping.size());

  if (!toping.empty()) {
    // send a ping to these osds, to ensure we detect any session resets
    // (osd reply message policy is lossy)
    for (set<OSDSession*>::const_iterator i = toping.begin();
	 i != toping.end();
	 ++i) {
      (*i)->con->send_message(new MPing);
    }
  }

  // Make sure we don't reschedule if we wake up after shutdown
  if (initialized) {
    tick_event = timer.reschedule_me(ceph::make_timespan(
				       cct->_conf->objecter_tick_interval));
  }
}
2202
2203 void Objecter::resend_mon_ops()
2204 {
2205 unique_lock wl(rwlock);
2206
2207 ldout(cct, 10) << "resend_mon_ops" << dendl;
2208
2209 for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2210 p != poolstat_ops.end();
2211 ++p) {
2212 _poolstat_submit(p->second);
2213 logger->inc(l_osdc_poolstat_resend);
2214 }
2215
2216 for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2217 p != statfs_ops.end();
2218 ++p) {
2219 _fs_stats_submit(p->second);
2220 logger->inc(l_osdc_statfs_resend);
2221 }
2222
2223 for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2224 p != pool_ops.end();
2225 ++p) {
2226 _pool_op_submit(p->second);
2227 logger->inc(l_osdc_poolop_resend);
2228 }
2229
2230 for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2231 p != check_latest_map_ops.end();
2232 ++p) {
2233 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2234 monc->get_version("osdmap", &c->latest, NULL, c);
2235 }
2236
2237 for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2238 p != check_latest_map_lingers.end();
2239 ++p) {
2240 C_Linger_Map_Latest *c
2241 = new C_Linger_Map_Latest(this, p->second->linger_id);
2242 monc->get_version("osdmap", &c->latest, NULL, c);
2243 }
2244
2245 for (map<uint64_t, CommandOp*>::iterator p
2246 = check_latest_map_commands.begin();
2247 p != check_latest_map_commands.end();
2248 ++p) {
2249 C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2250 monc->get_version("osdmap", &c->latest, NULL, c);
2251 }
2252 }
2253
2254 // read | write ---------------------------
2255
2256 void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2257 {
2258 shunique_lock rl(rwlock, ceph::acquire_shared);
2259 ceph_tid_t tid = 0;
2260 if (!ptid)
2261 ptid = &tid;
2262 op->trace.event("op submit");
2263 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2264 }
2265
void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
				      ceph_tid_t *ptid,
				      int *ctx_budget)
{
  // Acquire throttle budget (possibly blocking), arm the optional
  // client-side timeout, then hand off to _op_submit().
  ceph_assert(initialized);

  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_handler.size());

  // throttle.  before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  // arm a cancel-on-timeout event when the client configured one; the
  // tid must be assigned now so the lambda can capture it by value
  if (osd_timeout > timespan(0)) {
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
				    [this, tid]() {
				      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}
2298
void Objecter::_send_op_account(Op *op)
{
  // Update in-flight counts and perf counters for an op being submitted.
  inflight_ops++;

  // add to gather set(s)
  if (op->onfinish) {
    num_in_flight++;
  } else {
    ldout(cct, 20) << " note: not requesting reply" << dendl;
  }

  logger->inc(l_osdc_op_active);
  logger->inc(l_osdc_op);

  // classify by read/write flag combination
  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
    logger->inc(l_osdc_op_rmw);
  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
    logger->inc(l_osdc_op_w);
  else if (op->target.flags & CEPH_OSD_FLAG_READ)
    logger->inc(l_osdc_op_r);

  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
    logger->inc(l_osdc_op_pg);

  // bump a per-osd-op-type counter for each sub-op; anything not listed
  // below is counted as "other"
  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
    int code = l_osdc_osdop_other;
    switch (p->op.op) {
    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;

    // OMAP read operations
    case CEPH_OSD_OP_OMAPGETVALS:
    case CEPH_OSD_OP_OMAPGETKEYS:
    case CEPH_OSD_OP_OMAPGETHEADER:
    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;

    // OMAP write operations
    case CEPH_OSD_OP_OMAPSETVALS:
    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;

    // OMAP del operations
    case CEPH_OSD_OP_OMAPCLEAR:
    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;

    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
    }
    // code was initialized to l_osdc_osdop_other, so this guard is
    // presumably always taken — verify counter ids are nonzero
    if (code)
      logger->inc(code);
  }
}
2368
void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
{
  // rwlock is locked
  //
  // Compute the op's target, attach it to the right session, and either
  // send it or leave it paused (pause flags, full state, epoch barrier,
  // or a homeless target).  *ptid receives the assigned tid.

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  // POOL_DNE: ask the monitor whether a newer map knows the pool
  bool check_for_latest_map = _calc_target(&op->target, nullptr)
    == RECALC_OP_TARGET_POOL_DNE;

  // Try to get a session, including a retry if we need to take write lock
  int r = _get_session(op->target.osd, &s, sul);
  if (r == -EAGAIN ||
      (check_for_latest_map && sul.owns_lock_shared()) ||
      cct->_conf->objecter_debug_inject_relock_delay) {
    epoch_t orig_epoch = osdmap->get_epoch();
    sul.unlock();
    if (cct->_conf->objecter_debug_inject_relock_delay) {
      sleep(1);
    }
    sul.lock();
    if (orig_epoch != osdmap->get_epoch()) {
      // map changed; recalculate mapping
      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
		     << dendl;
      check_for_latest_map = _calc_target(&op->target, nullptr)
	== RECALC_OP_TARGET_POOL_DNE;
      if (s) {
	// the session we got may now be stale; drop it and re-fetch
	put_session(s);
	s = NULL;
	r = -EAGAIN;
      }
    }
  }
  if (r == -EAGAIN) {
    ceph_assert(s == NULL);
    r = _get_session(op->target.osd, &s, sul);
  }
  ceph_assert(r == 0);
  ceph_assert(s);  // may be homeless

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  if (pool_full_try) {
    op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
  }

  bool need_send = false;

  // hold the op (paused) rather than send if a barrier/pause/full
  // condition applies; request a newer map so we notice when it clears
  if (osdmap->get_epoch() < epoch_barrier) {
    ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
		   << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
	     osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
    ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
		   << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
	     osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << " paused read " << op << " tid " << op->tid
		   << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if (op->respects_full() &&
	     (_osdmap_full_flag() ||
	      _osdmap_pool_full(op->target.base_oloc.pool))) {
    ldout(cct, 0) << " FULL, paused modify " << op << " tid "
		  << op->tid << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  OSDSession::unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
		 << " '" << op->target.base_oloc << "' '"
		 << op->target.target_oloc << "' " << op->ops << " tid "
		 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
		 << dendl;

  _session_op_assign(s, op);

  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}
2486
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  // Cancel one op in session s, completing its callback (if any) with
  // error r.  Returns 0 on success, -ENOENT if tid is not in this
  // session (it may have migrated to another session meanwhile).
  ceph_assert(initialized);

  OSDSession::unique_lock sl(s->lock);

  map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
  if (p == s->ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
		   << s->osd << dendl;
    return -ENOENT;
  }

#if 0
  if (s->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
		   << " on " << s->con << dendl;
    s->con->revoke_rx_buffer(tid);
  }
#endif

  ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
		 << dendl;
  Op *op = p->second;
  if (op->onfinish) {
    // deliver the cancellation result to the waiter exactly once
    num_in_flight--;
    op->onfinish->complete(r);
    op->onfinish = NULL;
  }
  _op_cancel_map_check(op);
  _finish_op(op, r);
  sl.unlock();

  return 0;
}
2522
2523 int Objecter::op_cancel(ceph_tid_t tid, int r)
2524 {
2525 int ret = 0;
2526
2527 unique_lock wl(rwlock);
2528 ret = _op_cancel(tid, r);
2529
2530 return ret;
2531 }
2532
2533 int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
2534 {
2535 unique_lock wl(rwlock);
2536 ldout(cct,10) << __func__ << " " << tids << dendl;
2537 for (auto tid : tids) {
2538 _op_cancel(tid, r);
2539 }
2540 return 0;
2541 }
2542
int Objecter::_op_cancel(ceph_tid_t tid, int r)
{
  // Find which session currently owns tid and cancel it there.  We only
  // hold each session lock shared while searching, so the op can move
  // between sessions under us; -ENOENT from op_cancel() means we raced,
  // in which case we restart the scan from the top.
  int ret = 0;

  ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
		<< dendl;

 start:

  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    if (s->ops.find(tid) != s->ops.end()) {
      // drop the shared lock; op_cancel() takes it unique
      sl.unlock();
      ret = op_cancel(s, tid, r);
      if (ret == -ENOENT) {
	/* oh no! raced, maybe tid moved to another session, restarting */
	goto start;
      }
      return ret;
    }
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
		<< " not found in live sessions" << dendl;

  // Handle case where the op is in homeless session
  OSDSession::shared_lock sl(homeless_session->lock);
  if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
    sl.unlock();
    ret = op_cancel(homeless_session, tid, r);
    if (ret == -ENOENT) {
      /* oh no! raced, maybe tid moved to another session, restarting */
      goto start;
    } else {
      return ret;
    }
  } else {
    sl.unlock();
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
		<< " not found in homeless session" << dendl;

  return ret;
}
2590
2591
epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
{
  // Cancel every in-flight write op (optionally restricted to one
  // pool), completing each with error r.  Returns the osdmap epoch at
  // the time of cancellation if anything was cancelled, otherwise
  // (epoch_t)-1.
  unique_lock wl(rwlock);

  std::vector<ceph_tid_t> to_cancel;
  bool found = false;

  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    // collect matching tids under the (shared) session lock...
    for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
	 op_i != s->ops.end(); ++op_i) {
      if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
	  && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
	to_cancel.push_back(op_i->first);
      }
    }
    sl.unlock();

    // ...then cancel after dropping it; op_cancel() retakes it unique
    for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
	 titer != to_cancel.end();
	 ++titer) {
      int cancel_result = op_cancel(s, *titer, r);
      // We hold rwlock across search and cancellation, so cancels
      // should always succeed
      ceph_assert(cancel_result == 0);
    }
    if (!found && to_cancel.size())
      found = true;
    to_cancel.clear();
  }

  const epoch_t epoch = osdmap->get_epoch();

  wl.unlock();

  if (found) {
    return epoch;
  } else {
    return -1;
  }
}
2635
2636 bool Objecter::is_pg_changed(
2637 int oldprimary,
2638 const vector<int>& oldacting,
2639 int newprimary,
2640 const vector<int>& newacting,
2641 bool any_change)
2642 {
2643 if (OSDMap::primary_changed(
2644 oldprimary,
2645 oldacting,
2646 newprimary,
2647 newacting))
2648 return true;
2649 if (any_change && oldacting != newacting)
2650 return true;
2651 return false; // same primary (tho replicas may have changed)
2652 }
2653
bool Objecter::target_should_be_paused(op_target_t *t)
{
  // Decide whether ops to this target must be held back: global
  // pause-read/pause-write flags, full conditions, or an epoch barrier
  // we have not yet reached.
  // NOTE(review): dereferences *pi without a null check, so this assumes
  // t->base_oloc.pool exists in the current osdmap — callers appear to
  // validate the pool first (see _calc_target); confirm before reuse.
  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
    _osdmap_full_flag() || _osdmap_pool_full(*pi);

  return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
    (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
    (osdmap->get_epoch() < epoch_barrier);
}
2665
2666 /**
2667 * Locking public accessor for _osdmap_full_flag
2668 */
2669 bool Objecter::osdmap_full_flag() const
2670 {
2671 shared_lock rl(rwlock);
2672
2673 return _osdmap_full_flag();
2674 }
2675
2676 bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2677 {
2678 shared_lock rl(rwlock);
2679
2680 if (_osdmap_full_flag()) {
2681 return true;
2682 }
2683
2684 return _osdmap_pool_full(pool_id);
2685 }
2686
2687 bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2688 {
2689 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2690 if (pool == NULL) {
2691 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2692 return false;
2693 }
2694
2695 return _osdmap_pool_full(*pool);
2696 }
2697
2698 bool Objecter::_osdmap_has_pool_full() const
2699 {
2700 for (map<int64_t, pg_pool_t>::const_iterator it
2701 = osdmap->get_pools().begin();
2702 it != osdmap->get_pools().end(); ++it) {
2703 if (_osdmap_pool_full(it->second))
2704 return true;
2705 }
2706 return false;
2707 }
2708
2709 bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
2710 {
2711 return p.has_flag(pg_pool_t::FLAG_FULL) && honor_pool_full;
2712 }
2713
2714 /**
2715 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2716 */
2717 bool Objecter::_osdmap_full_flag() const
2718 {
2719 // Ignore the FULL flag if the caller does not have honor_osdmap_full
2720 return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
2721 }
2722
2723 void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2724 {
2725 for (map<int64_t, pg_pool_t>::const_iterator it
2726 = osdmap->get_pools().begin();
2727 it != osdmap->get_pools().end(); ++it) {
2728 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2729 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2730 } else {
2731 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2732 pool_full_map[it->first];
2733 }
2734 }
2735 }
2736
2737 int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2738 const string& ns)
2739 {
2740 shared_lock rl(rwlock);
2741 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2742 if (!p)
2743 return -ENOENT;
2744 return p->hash_key(key, ns);
2745 }
2746
2747 int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2748 const string& ns)
2749 {
2750 shared_lock rl(rwlock);
2751 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2752 if (!p)
2753 return -ENOENT;
2754 return p->raw_hash_to_pg(p->hash_key(key, ns));
2755 }
2756
void Objecter::_prune_snapc(
  const mempool::osdmap::map<int64_t,
			     snap_interval_set_t>& new_removed_snaps,
  Op *op)
{
  // Drop from the op's snap context any snaps that the incoming osdmap
  // says were removed from the op's pool.
  bool match = false;
  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
  if (i != new_removed_snaps.end()) {
    // first pass: cheap check whether anything actually needs pruning
    for (auto s : op->snapc.snaps) {
      if (i->second.contains(s)) {
	match = true;
	break;
      }
    }
    if (match) {
      // second pass: rebuild the snap vector without the removed snaps
      vector<snapid_t> new_snaps;
      for (auto s : op->snapc.snaps) {
	if (!i->second.contains(s)) {
	  new_snaps.push_back(s);
	}
      }
      op->snapc.snaps.swap(new_snaps);
      // after the swap, new_snaps holds the *old* snap list — which is
      // exactly what the "(was ...)" log text wants to show
      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
		    << " (was " << new_snaps << ")" << dendl;
    }
  }
}
2784
int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
  // rwlock is locked
  //
  // Map the target under the current osdmap: apply tiering, compute the
  // pgid and acting set, pick the destination osd, and record enough
  // state in *t to detect future interval changes.  Returns
  // RECALC_OP_TARGET_POOL_DNE if the pool (or object mapping) does not
  // exist, RECALC_OP_TARGET_NEED_RESEND if the op must be (re)sent, or
  // RECALC_OP_TARGET_NO_ACTION.
  bool is_read = t->flags & CEPH_OSD_FLAG_READ;
  bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
  t->epoch = osdmap->get_epoch();
  ldout(cct,20) << __func__ << " epoch " << t->epoch
		<< " base " << t->base_oid << " " << t->base_oloc
		<< " precalc_pgid " << (int)t->precalc_pgid
		<< " pgid " << t->base_pgid
		<< (is_read ? " is_read" : "")
		<< (is_write ? " is_write" : "")
		<< dendl;

  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  if (!pi) {
    t->osd = -1;
    return RECALC_OP_TARGET_POOL_DNE;
  }
  ldout(cct,30) << __func__ << " base pi " << pi
		<< " pg_num " << pi->get_pg_num() << dendl;

  // the pool may demand a one-time resend of everything at a specific
  // epoch (last_force_op_resend)
  bool force_resend = false;
  if (osdmap->get_epoch() == pi->last_force_op_resend) {
    if (t->last_force_resend < pi->last_force_op_resend) {
      t->last_force_resend = pi->last_force_op_resend;
      force_resend = true;
    } else if (t->last_force_resend == 0) {
      force_resend = true;
    }
  }

  // apply tiering
  t->target_oid = t->base_oid;
  t->target_oloc = t->base_oloc;
  if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
    // redirect to the read/write tier pool when one is configured
    if (is_read && pi->has_read_tier())
      t->target_oloc.pool = pi->read_tier;
    if (is_write && pi->has_write_tier())
      t->target_oloc.pool = pi->write_tier;
    pi = osdmap->get_pg_pool(t->target_oloc.pool);
    if (!pi) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }

  pg_t pgid;
  if (t->precalc_pgid) {
    // pg op: the caller supplied the pgid directly
    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
    pgid = t->base_pgid;
  } else {
    int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
					   pgid);
    if (ret == -ENOENT) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }
  ldout(cct,20) << __func__ << " target " << t->target_oid << " "
		<< t->target_oloc << " -> pgid " << pgid << dendl;
  ldout(cct,30) << __func__ << " target pi " << pi
		<< " pg_num " << pi->get_pg_num() << dendl;
  t->pool_ever_existed = true;

  int size = pi->size;
  int min_size = pi->min_size;
  unsigned pg_num = pi->get_pg_num();
  unsigned pg_num_mask = pi->get_pg_num_mask();
  unsigned pg_num_pending = pi->get_pg_num_pending();
  int up_primary, acting_primary;
  vector<int> up, acting;
  ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
  pg_t actual_pgid(actual_ps, pgid.pool());
  // consult (and on miss, populate) the per-epoch pg mapping cache
  pg_mapping_t pg_mapping;
  pg_mapping.epoch = osdmap->get_epoch();
  if (lookup_pg_mapping(actual_pgid, &pg_mapping)) {
    up = pg_mapping.up;
    up_primary = pg_mapping.up_primary;
    acting = pg_mapping.acting;
    acting_primary = pg_mapping.acting_primary;
  } else {
    osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
				 &acting, &acting_primary);
    pg_mapping_t pg_mapping(osdmap->get_epoch(),
			    up, up_primary, acting, acting_primary);
    update_pg_mapping(actual_pgid, std::move(pg_mapping));
  }
  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
  unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
  pg_t prev_pgid(prev_seed, pgid.pool());
  // if the caller cares about any change, a new past-interval (acting,
  // up, size, pg_num, sort order, ...) also forces a resend
  if (any_change && PastIntervals::is_new_interval(
	t->acting_primary,
	acting_primary,
	t->acting,
	acting,
	t->up_primary,
	up_primary,
	t->up,
	up,
	t->size,
	size,
	t->min_size,
	min_size,
	t->pg_num,
	pg_num,
	t->pg_num_pending,
	pg_num_pending,
	t->sort_bitwise,
	sort_bitwise,
	t->recovery_deletes,
	recovery_deletes,
	prev_pgid)) {
    force_resend = true;
  }

  bool unpaused = false;
  bool should_be_paused = target_should_be_paused(t);
  if (t->paused && !should_be_paused) {
    // pause condition cleared: this alone warrants a resend
    unpaused = true;
  }
  t->paused = should_be_paused;

  bool legacy_change =
    t->pgid != pgid ||
    is_pg_changed(
      t->acting_primary, t->acting, acting_primary, acting,
      t->used_replica || any_change);
  bool split_or_merge = false;
  if (t->pg_num) {
    split_or_merge =
      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_target(t->pg_num, pg_num);
  }

  if (legacy_change || split_or_merge || force_resend) {
    // record the new mapping state in the target
    t->pgid = pgid;
    t->acting = acting;
    t->acting_primary = acting_primary;
    t->up_primary = up_primary;
    t->up = up;
    t->size = size;
    t->min_size = min_size;
    t->pg_num = pg_num;
    t->pg_num_mask = pg_num_mask;
    t->pg_num_pending = pg_num_pending;
    spg_t spgid(actual_pgid);
    if (pi->is_erasure()) {
      // erasure-coded pools address the primary by shard id as well
      for (uint8_t i = 0; i < acting.size(); ++i) {
	if (acting[i] == acting_primary) {
	  spgid.reset_shard(shard_id_t(i));
	  break;
	}
      }
    }
    t->actual_pgid = spgid;
    t->sort_bitwise = sort_bitwise;
    t->recovery_deletes = recovery_deletes;
    ldout(cct, 10) << __func__ << " "
		   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
		   << " acting " << acting
		   << " primary " << acting_primary << dendl;
    t->used_replica = false;
    if (acting_primary == -1) {
      t->osd = -1;
    } else {
      int osd;
      bool read = is_read && !is_write;
      if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
	// spread pure reads across the whole acting set
	int p = rand() % acting.size();
	if (p)
	  t->used_replica = true;
	osd = acting[p];
	ldout(cct, 10) << " chose random osd." << osd << " of " << acting
		       << dendl;
      } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
		 acting.size() > 1) {
	// look for a local replica.  prefer the primary if the
	// distance is the same.
	int best = -1;
	int best_locality = 0;
	for (unsigned i = 0; i < acting.size(); ++i) {
	  int locality = osdmap->crush->get_common_ancestor_distance(
	    cct, acting[i], crush_location);
	  ldout(cct, 20) << __func__ << " localize: rank " << i
			 << " osd." << acting[i]
			 << " locality " << locality << dendl;
	  if (i == 0 ||
	      (locality >= 0 && best_locality >= 0 &&
	       locality < best_locality) ||
	      (best_locality < 0 && locality >= 0)) {
	    best = i;
	    best_locality = locality;
	    if (i)
	      t->used_replica = true;
	  }
	}
	ceph_assert(best >= 0);
	osd = acting[best];
      } else {
	osd = acting_primary;
      }
      t->osd = osd;
    }
  }
  if (legacy_change || unpaused || force_resend) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  // splits/merges only require a resend when the cluster/primary
  // supports resend-on-split semantics
  if (split_or_merge &&
      (osdmap->require_osd_release >= ceph_release_t::luminous ||
       HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
		    RESEND_ON_SPLIT))) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return RECALC_OP_TARGET_NO_ACTION;
}
3005
int Objecter::_map_session(op_target_t *target, OSDSession **s,
			   shunique_lock& sul)
{
  // Recompute the target's mapping, then fetch (or create) the session
  // for whatever osd it now maps to.  Returns _get_session()'s result.
  _calc_target(target, nullptr);
  return _get_session(target->osd, s, sul);
}
3012
void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
  // to->lock is locked
  // Attach op to session 'to', taking a session ref that is dropped by
  // _session_op_remove().
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  get_session(to);
  op->session = to;
  to->ops[op->tid] = op;

  // ops parked on the homeless session are counted so other paths know
  // work is waiting for a usable osdmap
  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}
3029
void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
  // Detach op from its session and drop the ref taken at assign time.
  ceph_assert(op->session == from);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}
3045
void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
  // to lock is locked unique
  // Attach a linger op to session 'to', taking a session ref that is
  // dropped by _session_linger_op_remove().
  ceph_assert(op->session == NULL);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->linger_ops[op->linger_id] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
		 << dendl;
}
3062
void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
  // Detach a linger op from its session and drop the assign-time ref.
  ceph_assert(from == op->session);
  // from->lock is locked unique

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->linger_ops.erase(op->linger_id);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
		 << dendl;
}
3079
void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
  // Detach a command op from its session and drop the assign-time ref.
  ceph_assert(from == op->session);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->command_ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}
3095
void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
  // to->lock is locked
  // Attach a command op to session 'to', taking a session ref that is
  // dropped by _session_command_op_remove().
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->command_ops[op->tid] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}
3112
int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
				       shunique_lock& sul)
{
  // rwlock is locked unique
  // Recompute the linger op's target after a map change; when it now
  // maps elsewhere, move it to the new session.  Returns the
  // RECALC_OP_TARGET_* result of _calc_target().

  int r = _calc_target(&linger_op->target, nullptr, true);
  if (r == RECALC_OP_TARGET_NEED_RESEND) {
    ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
		   << " pgid " << linger_op->target.pgid
		   << " acting " << linger_op->target.acting << dendl;

    OSDSession *s = NULL;
    r = _get_session(linger_op->target.osd, &s, sul);
    ceph_assert(r == 0);

    if (linger_op->session != s) {
      // NB locking two sessions (s and linger_op->session) at the
      // same time here is only safe because we are the only one that
      // takes two, and we are holding rwlock for write.  Disable
      // lockdep because it doesn't know that.
      OSDSession::unique_lock sl(s->lock);
      _session_linger_op_remove(linger_op->session, linger_op);
      _session_linger_op_assign(s, linger_op);
    }

    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return r;
}
3143
void Objecter::_cancel_linger_op(Op *op)
{
  // Tear down an op issued on behalf of a linger op, completing it
  // locally with success.  Only valid for ops marked not-resendable.
  ldout(cct, 15) << "cancel_op " << op->tid << dendl;

  ceph_assert(!op->should_resend);
  if (op->onfinish) {
    // the callback is dropped, not invoked; adjust the in-flight count
    delete op->onfinish;
    num_in_flight--;
  }

  _finish_op(op, 0);
}
3156
void Objecter::_finish_op(Op *op, int r)
{
  // Common teardown for a completed/cancelled op: return throttle
  // budget, cancel the timeout event, detach from the session, fix up
  // counters, and drop our ref.  r is the op's completion code.
  ldout(cct, 15) << __func__ << " " << op->tid << dendl;

  // op->session->lock is locked unique or op->session is null

  if (!op->ctx_budgeted && op->budget >= 0) {
    put_op_budget_bytes(op->budget);
    op->budget = -1;
  }

  // if we were cancelled *by* the timeout, that event already fired
  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  if (op->session) {
    _session_op_remove(op->session, op);
  }

  logger->dec(l_osdc_op_active);

  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());

  inflight_ops--;

  op->put();
}
3183
3184 MOSDOp *Objecter::_prepare_osd_op(Op *op)
3185 {
3186 // rwlock is locked
3187
3188 int flags = op->target.flags;
3189 flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3190
3191 // Nothing checks this any longer, but needed for compatibility with
3192 // pre-luminous osds
3193 flags |= CEPH_OSD_FLAG_ONDISK;
3194
3195 if (!honor_pool_full)
3196 flags |= CEPH_OSD_FLAG_FULL_FORCE;
3197
3198 op->target.paused = false;
3199 op->stamp = ceph::coarse_mono_clock::now();
3200
3201 hobject_t hobj = op->target.get_hobj();
3202 MOSDOp *m = new MOSDOp(client_inc, op->tid,
3203 hobj, op->target.actual_pgid,
3204 osdmap->get_epoch(),
3205 flags, op->features);
3206
3207 m->set_snapid(op->snapid);
3208 m->set_snap_seq(op->snapc.seq);
3209 m->set_snaps(op->snapc.snaps);
3210
3211 m->ops = op->ops;
3212 m->set_mtime(op->mtime);
3213 m->set_retry_attempt(op->attempts++);
3214
3215 if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3216 op->trace.init("op", &trace_endpoint);
3217 }
3218
3219 if (op->priority)
3220 m->set_priority(op->priority);
3221 else
3222 m->set_priority(cct->_conf->osd_client_op_priority);
3223
3224 if (op->reqid != osd_reqid_t()) {
3225 m->set_reqid(op->reqid);
3226 }
3227
3228 logger->inc(l_osdc_op_send);
3229 ssize_t sum = 0;
3230 for (unsigned i = 0; i < m->ops.size(); i++) {
3231 sum += m->ops[i].indata.length();
3232 }
3233 logger->inc(l_osdc_op_send_bytes, sum);
3234
3235 return m;
3236 }
3237
// Send (or queue) an already-targeted Op to its session's OSD.
// If the target object lies inside a registered backoff interval the op
// is left queued on the session and will be resent when the OSD
// unblocks the range (see handle_osd_backoff).
void Objecter::_send_op(Op *op)
{
  // rwlock is locked
  // op->session->lock is locked

  // backoff?
  auto p = op->session->backoffs.find(op->target.actual_pgid);
  if (p != op->session->backoffs.end()) {
    hobject_t hoid = op->target.get_hobj();
    auto q = p->second.lower_bound(hoid);
    if (q != p->second.begin()) {
      // the preceding interval may start before hoid yet still
      // contain it; step back and check its end bound
      --q;
      if (hoid >= q->second.end) {
	++q;
      }
    }
    if (q != p->second.end()) {
      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
		     << "," << q->second.end << ")" << dendl;
      int r = cmp(hoid, q->second.begin);
      if (r == 0 || (r > 0 && hoid < q->second.end)) {
	// hoid falls within [begin, end): do not send now
	ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
		       << " id " << q->second.id << " on " << hoid
		       << ", queuing " << op << " tid " << op->tid << dendl;
	return;
      }
    }
  }

  ceph_assert(op->tid > 0);
  MOSDOp *m = _prepare_osd_op(op);

  // _prepare_osd_op stamped the message with the pgid known at build
  // time; if the target has since moved, fix it up and force re-encode
  if (op->target.actual_pgid != m->get_spg()) {
    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
		   << m->get_spg() << " to " << op->target.actual_pgid
		   << ", updating and reencoding" << dendl;
    m->set_spg(op->target.actual_pgid);
    m->clear_payload();  // reencode
  }

  ldout(cct, 15) << "_send_op " << op->tid << " to "
		 << op->target.actual_pgid << " on osd." << op->session->osd
		 << dendl;

  ConnectionRef con = op->session->con;
  ceph_assert(con);

#if 0
  // preallocated rx ceph::buffer?
  if (op->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
		   << op->con << dendl;
    op->con->revoke_rx_buffer(op->tid);
  }
  if (op->outbl &&
      op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
      op->outbl->length()) {
    op->outbl->invalidate_crc();  // messenger writes through c_str()
    ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
		   << dendl;
    op->con = con;
    op->con->post_rx_buffer(op->tid, *op->outbl);
  }
#endif

  // remember the session incarnation this attempt went out on
  op->incarnation = op->session->incarnation;

  if (op->trace.valid()) {
    m->trace.init("op msg", nullptr, &op->trace);
  }
  op->session->con->send_message(m);
}
3310
3311 int Objecter::calc_op_budget(const vector<OSDOp>& ops)
3312 {
3313 int op_budget = 0;
3314 for (vector<OSDOp>::const_iterator i = ops.begin();
3315 i != ops.end();
3316 ++i) {
3317 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3318 op_budget += i->indata.length();
3319 } else if (ceph_osd_op_mode_read(i->op.op)) {
3320 if (ceph_osd_op_uses_extent(i->op.op)) {
3321 if ((int64_t)i->op.extent.length > 0)
3322 op_budget += (int64_t)i->op.extent.length;
3323 } else if (ceph_osd_op_type_attr(i->op.op)) {
3324 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
3325 }
3326 }
3327 }
3328 return op_budget;
3329 }
3330
// Charge an op against the byte and op-count throttles, blocking if a
// throttle is full.  While blocking we must not hold rwlock, so the
// shunique_lock is dropped and then reacquired in the same mode
// (shared vs unique) it was held in on entry.
void Objecter::_throttle_op(Op *op,
			    shunique_lock& sul,
			    int op_budget)
{
  ceph_assert(sul && sul.mutex() == &rwlock);
  bool locked_for_write = sul.owns_lock();

  if (!op_budget)
    op_budget = calc_op_budget(op->ops);
  if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
    // blocking acquire; drop rwlock across it to avoid stalling others
    sul.unlock();
    op_throttle_bytes.get(op_budget);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
  if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
    sul.unlock();
    op_throttle_ops.get(1);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
}
3357
3358 int Objecter::take_linger_budget(LingerOp *info)
3359 {
3360 return 1;
3361 }
3362
/* This function DOES put the passed message before returning */
// Match an MOSDOpReply to its in-flight Op, handle resend conditions
// (first-reply retry, stale attempt, redirect, EAGAIN), demux per-op
// results into the caller's out params, then finish the op and run its
// completion under the per-object completion lock.
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
  ldout(cct, 10) << "in handle_osd_op_reply" << dendl;

  // get pio
  ceph_tid_t tid = m->get_tid();

  shunique_lock sul(rwlock, ceph::acquire_shared);
  if (!initialized) {
    m->put();
    return;
  }

  // locate the session this connection belongs to; ignore replies on
  // connections we no longer consider current for the session
  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  OSDSession::unique_lock sl(s->lock);

  // the op may already have been completed or resent elsewhere
  map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
  if (iter == s->ops.end()) {
    ldout(cct, 7) << "handle_osd_op_reply " << tid
		  << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
						    " onnvram" : " ack"))
		  << " ... stray" << dendl;
    sl.unlock();
    m->put();
    return;
  }

  ldout(cct, 7) << "handle_osd_op_reply " << tid
		<< (m->is_ondisk() ? " ondisk" :
		    (m->is_onnvram() ? " onnvram" : " ack"))
		<< " uv " << m->get_user_version()
		<< " in " << m->get_pg()
		<< " attempt " << m->get_retry_attempt()
		<< dendl;
  Op *op = iter->second;
  op->trace.event("osd op reply");

  // test/debug knob: deliberately resend the first write reply
  if (retry_writes_after_first_reply && op->attempts == 1 &&
      (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
    ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
    if (op->onfinish) {
      num_in_flight--;
    }
    _session_op_remove(s, op);
    sl.unlock();

    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  // drop replies belonging to a previous attempt of this op
  if (m->get_retry_attempt() >= 0) {
    if (m->get_retry_attempt() != (op->attempts - 1)) {
      ldout(cct, 7) << " ignoring reply from attempt "
		    << m->get_retry_attempt()
		    << " from " << m->get_source_inst()
		    << "; last attempt " << (op->attempts - 1) << " sent to "
		    << op->session->con->get_peer_addr() << dendl;
      m->put();
      sl.unlock();
      return;
    }
  } else {
    // we don't know the request attempt because the server is old, so
    // just accept this one. we may do ACK callbacks we shouldn't
    // have, but that is better than doing callbacks out of order.
  }

  Context *onfinish = 0;

  int rc = m->get_result();

  // OSD told us to retarget elsewhere (e.g. cache tiering)
  if (m->is_redirect_reply()) {
    ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
    if (op->onfinish)
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    // FIXME: two redirects could race and reorder

    op->tid = 0;
    m->get_redirect().combine_with_locator(op->target.target_oloc,
					   op->target.target_oid.name);
    op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
			 CEPH_OSD_FLAG_IGNORE_CACHE |
			 CEPH_OSD_FLAG_IGNORE_OVERLAY);
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  // transient failure: resubmit from scratch, dropping read-placement
  // hints and the cached pgid so targeting is recomputed
  if (rc == -EAGAIN) {
    ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
    if (op->onfinish)
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    op->tid = 0;
    op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
			  CEPH_OSD_FLAG_LOCALIZE_READS);
    op->target.pgid = pg_t();
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  sul.unlock();

  // propagate reply metadata into caller-supplied out params
  if (op->objver)
    *op->objver = m->get_user_version();
  if (op->reply_epoch)
    *op->reply_epoch = m->get_map_epoch();
  if (op->data_offset)
    *op->data_offset = m->get_header().data_off;

  // got data?
  if (op->outbl) {
#if 0
    if (op->con)
      op->con->revoke_rx_buffer(op->tid);
#endif
    auto& bl = m->get_data();
    if (op->outbl->length() == bl.length() &&
	bl.get_num_buffers() <= 1) {
      // this is here to keep previous users that *relied* on getting data
      // read into existing buffers happy. Notably,
      // libradosstriper::RadosStriperImpl::aio_read().
      ldout(cct,10) << __func__ << " copying resulting " << bl.length()
		    << " into existing ceph::buffer of length " << op->outbl->length()
		    << dendl;
      ceph::buffer::list t;
      t.claim(*op->outbl);
      t.invalidate_crc();  // we're overwriting the raw buffers via c_str()
      bl.copy(0, bl.length(), t.c_str());
      op->outbl->substr_of(t, 0, bl.length());
    } else {
      // common case: just take ownership of the reply's buffers
      m->claim_data(*op->outbl);
    }
    op->outbl = 0;
  }

  // per-op result demuxing
  vector<OSDOp> out_ops;
  m->claim_ops(out_ops);

  if (out_ops.size() != op->ops.size())
    ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
		  << " != request ops " << op->ops
		  << " from " << m->get_source_inst() << dendl;

  // walk request/reply ops in lockstep, filling out_bl/out_rval and
  // running per-op handlers
  vector<ceph::buffer::list*>::iterator pb = op->out_bl.begin();
  vector<int*>::iterator pr = op->out_rval.begin();
  vector<Context*>::iterator ph = op->out_handler.begin();
  ceph_assert(op->out_bl.size() == op->out_rval.size());
  ceph_assert(op->out_bl.size() == op->out_handler.size());
  vector<OSDOp>::iterator p = out_ops.begin();
  for (unsigned i = 0;
       p != out_ops.end() && pb != op->out_bl.end();
       ++i, ++p, ++pb, ++pr, ++ph) {
    ldout(cct, 10) << " op " << i << " rval " << p->rval
		   << " len " << p->outdata.length() << dendl;
    if (*pb)
      **pb = p->outdata;
    // set rval before running handlers so that handlers
    // can change it if e.g. decoding fails
    if (*pr)
      **pr = ceph_to_hostos_errno(p->rval);
    if (*ph) {
      ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
      (*ph)->complete(ceph_to_hostos_errno(p->rval));
      *ph = NULL;
    }
  }

  // NOTE: we assume that since we only request ONDISK ever we will
  // only ever get back one (type of) ack ever.

  if (op->onfinish) {
    num_in_flight--;
    onfinish = op->onfinish;
    op->onfinish = NULL;
  }
  logger->inc(l_osdc_op_reply);

  /* get it before we call _finish_op() */
  auto completion_lock = s->get_lock(op->target.base_oid);

  ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
  _finish_op(op, 0);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;

  // serialize completions
  if (completion_lock.mutex()) {
    completion_lock.lock();
  }
  sl.unlock();

  // do callbacks
  if (onfinish) {
    onfinish->complete(rc);
  }
  if (completion_lock.mutex()) {
    completion_lock.unlock();
  }

  m->put();
}
3582
3583 void Objecter::handle_osd_backoff(MOSDBackoff *m)
3584 {
3585 ldout(cct, 10) << __func__ << " " << *m << dendl;
3586 shunique_lock sul(rwlock, ceph::acquire_shared);
3587 if (!initialized) {
3588 m->put();
3589 return;
3590 }
3591
3592 ConnectionRef con = m->get_connection();
3593 auto priv = con->get_priv();
3594 auto s = static_cast<OSDSession*>(priv.get());
3595 if (!s || s->con != con) {
3596 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3597 m->put();
3598 return;
3599 }
3600
3601 get_session(s);
3602
3603 OSDSession::unique_lock sl(s->lock);
3604
3605 switch (m->op) {
3606 case CEPH_OSD_BACKOFF_OP_BLOCK:
3607 {
3608 // register
3609 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3610 s->backoffs_by_id.insert(make_pair(m->id, &b));
3611 b.pgid = m->pgid;
3612 b.id = m->id;
3613 b.begin = m->begin;
3614 b.end = m->end;
3615
3616 // ack with original backoff's epoch so that the osd can discard this if
3617 // there was a pg split.
3618 Message *r = new MOSDBackoff(m->pgid,
3619 m->map_epoch,
3620 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3621 m->id, m->begin, m->end);
3622 // this priority must match the MOSDOps from _prepare_osd_op
3623 r->set_priority(cct->_conf->osd_client_op_priority);
3624 con->send_message(r);
3625 }
3626 break;
3627
3628 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3629 {
3630 auto p = s->backoffs_by_id.find(m->id);
3631 if (p != s->backoffs_by_id.end()) {
3632 OSDBackoff *b = p->second;
3633 if (b->begin != m->begin &&
3634 b->end != m->end) {
3635 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3636 << " unblock on ["
3637 << m->begin << "," << m->end << ") but backoff is ["
3638 << b->begin << "," << b->end << ")" << dendl;
3639 // hrmpf, unblock it anyway.
3640 }
3641 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3642 << " id " << b->id
3643 << " [" << b->begin << "," << b->end
3644 << ")" << dendl;
3645 auto spgp = s->backoffs.find(b->pgid);
3646 ceph_assert(spgp != s->backoffs.end());
3647 spgp->second.erase(b->begin);
3648 if (spgp->second.empty()) {
3649 s->backoffs.erase(spgp);
3650 }
3651 s->backoffs_by_id.erase(p);
3652
3653 // check for any ops to resend
3654 for (auto& q : s->ops) {
3655 if (q.second->target.actual_pgid == m->pgid) {
3656 int r = q.second->target.contained_by(m->begin, m->end);
3657 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3658 << q.second->target.get_hobj() << dendl;
3659 if (r) {
3660 _send_op(q.second);
3661 }
3662 }
3663 }
3664 } else {
3665 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3666 << " unblock on ["
3667 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3668 }
3669 }
3670 break;
3671
3672 default:
3673 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3674 }
3675
3676 sul.unlock();
3677 sl.unlock();
3678
3679 m->put();
3680 put_session(s);
3681 }
3682
// Reposition a pool listing to the given raw hash position, updating
// the context's current pg.  Returns the (unchanged) position.
uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
				      uint32_t pos)
{
  shared_lock rl(rwlock);
  // synthesize a cursor hobject at the requested hash position
  list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
				pos, list_context->pool_id, string());
  ldout(cct, 10) << __func__ << " " << list_context
		 << " pos " << pos << " -> " << list_context->pos << dendl;
  pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
  list_context->current_pg = actual.ps();
  list_context->at_end_of_pool = false;
  return pos;
}
3696
// Reposition a pool listing to an explicit hobject cursor (as produced
// by list_nobjects_get_cursor).  Returns the resulting pg seed.
uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
				      const hobject_t& cursor)
{
  shared_lock rl(rwlock);
  ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
  list_context->pos = cursor;
  list_context->at_end_of_pool = false;
  pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
  list_context->current_pg = actual.ps();
  // explicit-cursor seeks only make sense with bitwise hobject ordering
  list_context->sort_bitwise = true;
  return list_context->current_pg;
}
3709
// Export the listing's current position as an hobject cursor suitable
// for a later list_nobjects_seek().  If entries are still buffered, the
// cursor points at the first unconsumed entry rather than at pos.
void Objecter::list_nobjects_get_cursor(NListContext *list_context,
					hobject_t *cursor)
{
  shared_lock rl(rwlock);
  if (list_context->list.empty()) {
    *cursor = list_context->pos;
  } else {
    const librados::ListObjectImpl& entry = list_context->list.front();
    // hash on the locator key when present, else the object name
    const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
    uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
    *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
  }
}
3723
3724 void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3725 {
3726 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3727 << " pool_snap_seq " << list_context->pool_snap_seq
3728 << " max_entries " << list_context->max_entries
3729 << " list_context " << list_context
3730 << " onfinish " << onfinish
3731 << " current_pg " << list_context->current_pg
3732 << " pos " << list_context->pos << dendl;
3733
3734 shared_lock rl(rwlock);
3735 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3736 if (!pool) { // pool is gone
3737 rl.unlock();
3738 put_nlist_context_budget(list_context);
3739 onfinish->complete(-ENOENT);
3740 return;
3741 }
3742 int pg_num = pool->get_pg_num();
3743 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3744
3745 if (list_context->pos.is_min()) {
3746 list_context->starting_pg_num = 0;
3747 list_context->sort_bitwise = sort_bitwise;
3748 list_context->starting_pg_num = pg_num;
3749 }
3750 if (list_context->sort_bitwise != sort_bitwise) {
3751 list_context->pos = hobject_t(
3752 object_t(), string(), CEPH_NOSNAP,
3753 list_context->current_pg, list_context->pool_id, string());
3754 list_context->sort_bitwise = sort_bitwise;
3755 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3756 << list_context->pos << dendl;
3757 }
3758 if (list_context->starting_pg_num != pg_num) {
3759 if (!sort_bitwise) {
3760 // start reading from the beginning; the pgs have changed
3761 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3762 list_context->pos = collection_list_handle_t();
3763 }
3764 list_context->starting_pg_num = pg_num;
3765 }
3766
3767 if (list_context->pos.is_max()) {
3768 ldout(cct, 20) << __func__ << " end of pool, list "
3769 << list_context->list << dendl;
3770 if (list_context->list.empty()) {
3771 list_context->at_end_of_pool = true;
3772 }
3773 // release the listing context's budget once all
3774 // OPs (in the session) are finished
3775 put_nlist_context_budget(list_context);
3776 onfinish->complete(0);
3777 return;
3778 }
3779
3780 ObjectOperation op;
3781 op.pg_nls(list_context->max_entries, list_context->filter,
3782 list_context->pos, osdmap->get_epoch());
3783 list_context->bl.clear();
3784 C_NList *onack = new C_NList(list_context, onfinish, this);
3785 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3786
3787 // note current_pg in case we don't have (or lose) SORTBITWISE
3788 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3789 rl.unlock();
3790
3791 pg_read(list_context->current_pg, oloc, op,
3792 &list_context->bl, 0, onack, &onack->epoch,
3793 &list_context->ctx_budget);
3794 }
3795
// Completion path for the pg_nls read issued by list_nobjects():
// decode the batch, advance the cursor (with legacy !sortbitwise pg
// arithmetic), accumulate entries, and either finish or loop.
void Objecter::_nlist_reply(NListContext *list_context, int r,
			    Context *final_finish, epoch_t reply_epoch)
{
  ldout(cct, 10) << __func__ << " " << list_context << dendl;

  auto iter = list_context->bl.cbegin();
  pg_nls_response_t response;
  decode(response, iter);
  if (!iter.end()) {
    // we do this as legacy.
    ceph::buffer::list legacy_extra_info;
    decode(legacy_extra_info, iter);
  }

  // if the osd returns 1 (newer code), or handle MAX, it means we
  // hit the end of the pg.
  if ((response.handle.is_max() || r == 1) &&
      !list_context->sort_bitwise) {
    // legacy OSD and !sortbitwise, figure out the next PG on our own
    ++list_context->current_pg;
    if (list_context->current_pg == list_context->starting_pg_num) {
      // end of pool
      list_context->pos = hobject_t::get_max();
    } else {
      // next pg
      list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
				    list_context->current_pg,
				    list_context->pool_id, string());
    }
  } else {
    // modern path: the OSD hands us the next cursor directly
    list_context->pos = response.handle;
  }

  int response_size = response.entries.size();
  ldout(cct, 20) << " response.entries.size " << response_size
		 << ", response.entries " << response.entries
		 << ", handle " << response.handle
		 << ", tentative new pos " << list_context->pos << dendl;
  if (response_size) {
    // splice avoids copying the entries
    list_context->list.splice(list_context->list.end(), response.entries);
  }

  if (list_context->list.size() >= list_context->max_entries) {
    ldout(cct, 20) << " hit max, returning results so far, "
		   << list_context->list << dendl;
    // release the listing context's budget once all
    // OPs (in the session) are finished
    put_nlist_context_budget(list_context);
    final_finish->complete(0);
    return;
  }

  // continue!
  list_nobjects(list_context, final_finish);
}
3851
3852 void Objecter::put_nlist_context_budget(NListContext *list_context)
3853 {
3854 if (list_context->ctx_budget >= 0) {
3855 ldout(cct, 10) << " release listing context's budget " <<
3856 list_context->ctx_budget << dendl;
3857 put_op_budget_bytes(list_context->ctx_budget);
3858 list_context->ctx_budget = -1;
3859 }
3860 }
3861
3862 // snapshots
3863
3864 int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3865 Context *onfinish)
3866 {
3867 unique_lock wl(rwlock);
3868 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3869 << snap_name << dendl;
3870
3871 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3872 if (!p)
3873 return -EINVAL;
3874 if (p->snap_exists(snap_name.c_str()))
3875 return -EEXIST;
3876
3877 PoolOp *op = new PoolOp;
3878 if (!op)
3879 return -ENOMEM;
3880 op->tid = ++last_tid;
3881 op->pool = pool;
3882 op->name = snap_name;
3883 op->onfinish = onfinish;
3884 op->pool_op = POOL_OP_CREATE_SNAP;
3885 pool_ops[op->tid] = op;
3886
3887 pool_op_submit(op);
3888
3889 return 0;
3890 }
3891
// Completion shim for allocate_selfmanaged_snap(): decodes the
// monitor's reply payload into *psnapid before firing the caller's
// Context (which is always invoked, with -EIO on decode failure).
struct C_SelfmanagedSnap : public Context {
  ceph::buffer::list bl;  // reply payload, filled via PoolOp::blp
  snapid_t *psnapid;      // caller-owned output slot
  Context *fin;           // caller's completion
  C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
  void finish(int r) override {
    if (r == 0) {
      try {
	auto p = bl.cbegin();
	decode(*psnapid, p);
      } catch (ceph::buffer::error&) {
	// reply did not decode as a snapid; surface as an I/O error
	r = -EIO;
      }
    }
    fin->complete(r);
  }
};
3909
3910 int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3911 Context *onfinish)
3912 {
3913 unique_lock wl(rwlock);
3914 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3915 PoolOp *op = new PoolOp;
3916 if (!op) return -ENOMEM;
3917 op->tid = ++last_tid;
3918 op->pool = pool;
3919 C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3920 op->onfinish = fin;
3921 op->blp = &fin->bl;
3922 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3923 pool_ops[op->tid] = op;
3924
3925 pool_op_submit(op);
3926 return 0;
3927 }
3928
3929 int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3930 Context *onfinish)
3931 {
3932 unique_lock wl(rwlock);
3933 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3934 << snap_name << dendl;
3935
3936 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3937 if (!p)
3938 return -EINVAL;
3939 if (!p->snap_exists(snap_name.c_str()))
3940 return -ENOENT;
3941
3942 PoolOp *op = new PoolOp;
3943 if (!op)
3944 return -ENOMEM;
3945 op->tid = ++last_tid;
3946 op->pool = pool;
3947 op->name = snap_name;
3948 op->onfinish = onfinish;
3949 op->pool_op = POOL_OP_DELETE_SNAP;
3950 pool_ops[op->tid] = op;
3951
3952 pool_op_submit(op);
3953
3954 return 0;
3955 }
3956
3957 int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3958 Context *onfinish)
3959 {
3960 unique_lock wl(rwlock);
3961 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3962 << snap << dendl;
3963 PoolOp *op = new PoolOp;
3964 if (!op) return -ENOMEM;
3965 op->tid = ++last_tid;
3966 op->pool = pool;
3967 op->onfinish = onfinish;
3968 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3969 op->snapid = snap;
3970 pool_ops[op->tid] = op;
3971
3972 pool_op_submit(op);
3973
3974 return 0;
3975 }
3976
3977 int Objecter::create_pool(string& name, Context *onfinish,
3978 int crush_rule)
3979 {
3980 unique_lock wl(rwlock);
3981 ldout(cct, 10) << "create_pool name=" << name << dendl;
3982
3983 if (osdmap->lookup_pg_pool_name(name) >= 0)
3984 return -EEXIST;
3985
3986 PoolOp *op = new PoolOp;
3987 if (!op)
3988 return -ENOMEM;
3989 op->tid = ++last_tid;
3990 op->pool = 0;
3991 op->name = name;
3992 op->onfinish = onfinish;
3993 op->pool_op = POOL_OP_CREATE;
3994 pool_ops[op->tid] = op;
3995 op->crush_rule = crush_rule;
3996
3997 pool_op_submit(op);
3998
3999 return 0;
4000 }
4001
4002 int Objecter::delete_pool(int64_t pool, Context *onfinish)
4003 {
4004 unique_lock wl(rwlock);
4005 ldout(cct, 10) << "delete_pool " << pool << dendl;
4006
4007 if (!osdmap->have_pg_pool(pool))
4008 return -ENOENT;
4009
4010 _do_delete_pool(pool, onfinish);
4011 return 0;
4012 }
4013
4014 int Objecter::delete_pool(const string &pool_name, Context *onfinish)
4015 {
4016 unique_lock wl(rwlock);
4017 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
4018
4019 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
4020 if (pool < 0)
4021 return pool;
4022
4023 _do_delete_pool(pool, onfinish);
4024 return 0;
4025 }
4026
4027 void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
4028 {
4029 PoolOp *op = new PoolOp;
4030 op->tid = ++last_tid;
4031 op->pool = pool;
4032 op->name = "delete";
4033 op->onfinish = onfinish;
4034 op->pool_op = POOL_OP_DELETE;
4035 pool_ops[op->tid] = op;
4036 pool_op_submit(op);
4037 }
4038
// Submit a pool op, arming the mon-command timeout (if configured)
// before sending; on expiry the op is canceled with -ETIMEDOUT.
void Objecter::pool_op_submit(PoolOp *op)
{
  // rwlock is locked
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
				    [this, op]() {
				      pool_op_cancel(op->tid, -ETIMEDOUT); });
  }
  _pool_op_submit(op);
}
4049
// Encode the PoolOp into an MPoolOp and send it to the monitors.
// Caller must hold rwlock for write.
void Objecter::_pool_op_submit(PoolOp *op)
{
  // rwlock is locked unique

  ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
  MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
			   op->name, op->pool_op,
			   last_seen_osdmap_version);
  // snapid/crush_rule are only meaningful for some pool op types;
  // propagate them when set
  if (op->snapid) m->snapid = op->snapid;
  if (op->crush_rule) m->crush_rule = op->crush_rule;
  monc->send_mon_message(m);
  op->last_submit = ceph::coarse_mono_clock::now();

  logger->inc(l_osdc_poolop_send);
}
4065
4066 /**
4067 * Handle a reply to a PoolOp message. Check that we sent the message
4068 * and give the caller responsibility for the returned ceph::buffer::list.
4069 * Then either call the finisher or stash the PoolOp, depending on if we
4070 * have a new enough map.
4071 * Lastly, clean up the message and PoolOp.
4072 */
void Objecter::handle_pool_op_reply(MPoolOpReply *m)
{
  FUNCTRACE(cct);
  shunique_lock sul(rwlock, acquire_shared);
  if (!initialized) {
    sul.unlock();
    m->put();
    return;
  }

  ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
  ceph_tid_t tid = m->get_tid();
  map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
  if (iter != pool_ops.end()) {
    PoolOp *op = iter->second;
    ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
		   << ceph_pool_op_name(op->pool_op) << dendl;
    // hand the reply payload to the caller's buffer, if one was supplied
    if (op->blp)
      op->blp->claim(m->response_data);
    if (m->version > last_seen_osdmap_version)
      last_seen_osdmap_version = m->version;
    if (osdmap->get_epoch() < m->epoch) {
      // promote to the unique lock so we can wait for a new map;
      // this momentarily releases rwlock, so state must be rechecked
      sul.unlock();
      sul.lock();
      // recheck op existence since we have let go of rwlock
      // (for promotion) above.
      iter = pool_ops.find(tid);
      if (iter == pool_ops.end())
	goto done; // op is gone.
      if (osdmap->get_epoch() < m->epoch) {
	ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
		       << " before calling back" << dendl;
	// defer the callback until our osdmap catches up to the mon's
	_wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
      } else {
	// map epoch changed, probably because a MOSDMap message
	// sneaked in. Do caller-specified callback now or else
	// we lose it forever.
	ceph_assert(op->onfinish);
	op->onfinish->complete(m->replyCode);
      }
    } else {
      ceph_assert(op->onfinish);
      op->onfinish->complete(m->replyCode);
    }
    // ownership of the callback has been handed off (or it has run)
    op->onfinish = NULL;
    if (!sul.owns_lock()) {
      // _finish_pool_op requires the unique lock
      sul.unlock();
      sul.lock();
    }
    // re-find: the op may have been finished while rwlock was released
    iter = pool_ops.find(tid);
    if (iter != pool_ops.end()) {
      _finish_pool_op(op, 0);
    }
  } else {
    ldout(cct, 10) << "unknown request " << tid << dendl;
  }

done:
  // Not strictly necessary, since we'll release it on return.
  sul.unlock();

  ldout(cct, 10) << "done" << dendl;
  m->put();
}
4137
4138 int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4139 {
4140 ceph_assert(initialized);
4141
4142 unique_lock wl(rwlock);
4143
4144 map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4145 if (it == pool_ops.end()) {
4146 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4147 return -ENOENT;
4148 }
4149
4150 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4151
4152 PoolOp *op = it->second;
4153 if (op->onfinish)
4154 op->onfinish->complete(r);
4155
4156 _finish_pool_op(op, r);
4157 return 0;
4158 }
4159
4160 void Objecter::_finish_pool_op(PoolOp *op, int r)
4161 {
4162 // rwlock is locked unique
4163 pool_ops.erase(op->tid);
4164 logger->set(l_osdc_poolop_active, pool_ops.size());
4165
4166 if (op->ontimeout && r != -ETIMEDOUT) {
4167 timer.cancel_event(op->ontimeout);
4168 }
4169
4170 delete op;
4171 }
4172
4173 // pool stats
4174
// Request per-pool statistics from the monitors.  *result and
// *per_pool are filled in when the reply arrives, then onfinish runs.
void Objecter::get_pool_stats(list<string>& pools,
			      map<string,pool_stat_t> *result,
			      bool *per_pool,
			      Context *onfinish)
{
  ldout(cct, 10) << "get_pool_stats " << pools << dendl;

  PoolStatOp *op = new PoolStatOp;
  op->tid = ++last_tid;
  op->pools = pools;
  op->pool_stats = result;
  op->per_pool = per_pool;
  op->onfinish = onfinish;
  // NOTE(review): the timeout is armed before op is inserted into
  // poolstat_ops below; if it fires in that window, the cancel simply
  // returns -ENOENT — presumably benign, but worth confirming.
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
				    [this, op]() {
				      pool_stat_op_cancel(op->tid,
							  -ETIMEDOUT); });
  } else {
    op->ontimeout = 0;
  }

  unique_lock wl(rwlock);

  poolstat_ops[op->tid] = op;

  logger->set(l_osdc_poolstat_active, poolstat_ops.size());

  _poolstat_submit(op);
}
4205
// Send the MGetPoolStats request for an already-registered op.
void Objecter::_poolstat_submit(PoolStatOp *op)
{
  ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
  monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
					   op->pools,
					   last_seen_pgmap_version));
  // record submit time so tick()/resend logic can age the op
  op->last_submit = ceph::coarse_mono_clock::now();

  logger->inc(l_osdc_poolstat_send);
}
4216
4217 void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4218 {
4219 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4220 ceph_tid_t tid = m->get_tid();
4221
4222 unique_lock wl(rwlock);
4223 if (!initialized) {
4224 m->put();
4225 return;
4226 }
4227
4228 map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4229 if (iter != poolstat_ops.end()) {
4230 PoolStatOp *op = poolstat_ops[tid];
4231 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4232 *op->pool_stats = m->pool_stats;
4233 *op->per_pool = m->per_pool;
4234 if (m->version > last_seen_pgmap_version) {
4235 last_seen_pgmap_version = m->version;
4236 }
4237 op->onfinish->complete(0);
4238 _finish_pool_stat_op(op, 0);
4239 } else {
4240 ldout(cct, 10) << "unknown request " << tid << dendl;
4241 }
4242 ldout(cct, 10) << "done" << dendl;
4243 m->put();
4244 }
4245
4246 int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4247 {
4248 ceph_assert(initialized);
4249
4250 unique_lock wl(rwlock);
4251
4252 map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4253 if (it == poolstat_ops.end()) {
4254 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4255 return -ENOENT;
4256 }
4257
4258 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4259
4260 PoolStatOp *op = it->second;
4261 if (op->onfinish)
4262 op->onfinish->complete(r);
4263 _finish_pool_stat_op(op, r);
4264 return 0;
4265 }
4266
4267 void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4268 {
4269 // rwlock is locked unique
4270
4271 poolstat_ops.erase(op->tid);
4272 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4273
4274 if (op->ontimeout && r != -ETIMEDOUT)
4275 timer.cancel_event(op->ontimeout);
4276
4277 delete op;
4278 }
4279
// Request cluster-wide (or single-pool, if data_pool is set) statfs
// data from the monitors.  result is filled in when the reply arrives,
// then onfinish runs.
void Objecter::get_fs_stats(ceph_statfs& result,
			    boost::optional<int64_t> data_pool,
			    Context *onfinish)
{
  ldout(cct, 10) << "get_fs_stats" << dendl;
  unique_lock l(rwlock);

  StatfsOp *op = new StatfsOp;
  op->tid = ++last_tid;
  op->stats = &result;
  op->data_pool = data_pool;
  op->onfinish = onfinish;
  // arm the mon timeout, if configured; on expiry the op is canceled
  // with -ETIMEDOUT
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
				    [this, op]() {
				      statfs_op_cancel(op->tid,
						       -ETIMEDOUT); });
  } else {
    op->ontimeout = 0;
  }
  statfs_ops[op->tid] = op;

  logger->set(l_osdc_statfs_active, statfs_ops.size());

  _fs_stats_submit(op);
}
4306
4307 void Objecter::_fs_stats_submit(StatfsOp *op)
4308 {
4309 // rwlock is locked unique
4310
4311 ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
4312 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
4313 op->data_pool,
4314 last_seen_pgmap_version));
4315 op->last_submit = ceph::coarse_mono_clock::now();
4316
4317 logger->inc(l_osdc_statfs_send);
4318 }
4319
4320 void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4321 {
4322 unique_lock wl(rwlock);
4323 if (!initialized) {
4324 m->put();
4325 return;
4326 }
4327
4328 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4329 ceph_tid_t tid = m->get_tid();
4330
4331 if (statfs_ops.count(tid)) {
4332 StatfsOp *op = statfs_ops[tid];
4333 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4334 *(op->stats) = m->h.st;
4335 if (m->h.version > last_seen_pgmap_version)
4336 last_seen_pgmap_version = m->h.version;
4337 op->onfinish->complete(0);
4338 _finish_statfs_op(op, 0);
4339 } else {
4340 ldout(cct, 10) << "unknown request " << tid << dendl;
4341 }
4342 m->put();
4343 ldout(cct, 10) << "done" << dendl;
4344 }
4345
4346 int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4347 {
4348 ceph_assert(initialized);
4349
4350 unique_lock wl(rwlock);
4351
4352 map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4353 if (it == statfs_ops.end()) {
4354 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4355 return -ENOENT;
4356 }
4357
4358 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4359
4360 StatfsOp *op = it->second;
4361 if (op->onfinish)
4362 op->onfinish->complete(r);
4363 _finish_statfs_op(op, r);
4364 return 0;
4365 }
4366
4367 void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4368 {
4369 // rwlock is locked unique
4370
4371 statfs_ops.erase(op->tid);
4372 logger->set(l_osdc_statfs_active, statfs_ops.size());
4373
4374 if (op->ontimeout && r != -ETIMEDOUT)
4375 timer.cancel_event(op->ontimeout);
4376
4377 delete op;
4378 }
4379
4380 // scatter/gather
4381
4382 void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4383 vector<ceph::buffer::list>& resultbl,
4384 ceph::buffer::list *bl, Context *onfinish)
4385 {
4386 // all done
4387 ldout(cct, 15) << "_sg_read_finish" << dendl;
4388
4389 if (extents.size() > 1) {
4390 Striper::StripedReadResult r;
4391 vector<ceph::buffer::list>::iterator bit = resultbl.begin();
4392 for (vector<ObjectExtent>::iterator eit = extents.begin();
4393 eit != extents.end();
4394 ++eit, ++bit) {
4395 r.add_partial_result(cct, *bit, eit->buffer_extents);
4396 }
4397 bl->clear();
4398 r.assemble_result(cct, *bl, false);
4399 } else {
4400 ldout(cct, 15) << " only one frag" << dendl;
4401 bl->claim(resultbl[0]);
4402 }
4403
4404 // done
4405 uint64_t bytes_read = bl->length();
4406 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4407
4408 if (onfinish) {
4409 onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4410 }
4411 }
4412
4413
4414 void Objecter::ms_handle_connect(Connection *con)
4415 {
4416 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
4417 if (!initialized)
4418 return;
4419
4420 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4421 resend_mon_ops();
4422 }
4423
bool Objecter::ms_handle_reset(Connection *con)
{
  // Messenger callback: a connection was reset.  For OSD connections
  // we reopen the session and queue its ops for resend; returns true
  // iff the reset was for an OSD peer (i.e. we own handling it).
  if (!initialized)
    return false;
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    unique_lock wl(rwlock);

    auto priv = con->get_priv();
    auto session = static_cast<OSDSession*>(priv.get());
    if (session) {
      ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
		    << " osd." << session->osd << dendl;
      // the session maybe had been closed if new osdmap just handled
      // says the osd down
      if (!(initialized && osdmap->is_up(session->osd))) {
	ldout(cct, 1) << "ms_handle_reset aborted,initialized=" << initialized << dendl;
	wl.unlock();
	return false;
      }
      map<uint64_t, LingerOp *> lresend;
      OSDSession::unique_lock sl(session->lock);
      // reopen under the session lock; _kick_requests collects linger
      // ops into lresend for resend after the session lock is dropped
      _reopen_session(session);
      _kick_requests(session, lresend);
      sl.unlock();
      // lingers are resent while still holding the objecter write lock
      _linger_ops_resend(lresend, wl);
      wl.unlock();
      maybe_request_map();
    }
    return true;
  }
  return false;
}
4456
4457 void Objecter::ms_handle_remote_reset(Connection *con)
4458 {
4459 /*
4460 * treat these the same.
4461 */
4462 ms_handle_reset(con);
4463 }
4464
4465 bool Objecter::ms_handle_refused(Connection *con)
4466 {
4467 // just log for now
4468 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4469 int osd = osdmap->identify_osd(con->get_peer_addr());
4470 if (osd >= 0) {
4471 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4472 }
4473 }
4474 return false;
4475 }
4476
4477 void Objecter::op_target_t::dump(Formatter *f) const
4478 {
4479 f->dump_stream("pg") << pgid;
4480 f->dump_int("osd", osd);
4481 f->dump_stream("object_id") << base_oid;
4482 f->dump_stream("object_locator") << base_oloc;
4483 f->dump_stream("target_object_id") << target_oid;
4484 f->dump_stream("target_object_locator") << target_oloc;
4485 f->dump_int("paused", (int)paused);
4486 f->dump_int("used_replica", (int)used_replica);
4487 f->dump_int("precalc_pgid", (int)precalc_pgid);
4488 }
4489
4490 void Objecter::_dump_active(OSDSession *s)
4491 {
4492 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4493 p != s->ops.end();
4494 ++p) {
4495 Op *op = p->second;
4496 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4497 << "\tosd." << (op->session ? op->session->osd : -1)
4498 << "\t" << op->target.base_oid
4499 << "\t" << op->ops << dendl;
4500 }
4501 }
4502
4503 void Objecter::_dump_active()
4504 {
4505 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
4506 << dendl;
4507 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4508 siter != osd_sessions.end(); ++siter) {
4509 OSDSession *s = siter->second;
4510 OSDSession::shared_lock sl(s->lock);
4511 _dump_active(s);
4512 sl.unlock();
4513 }
4514 _dump_active(homeless_session);
4515 }
4516
4517 void Objecter::dump_active()
4518 {
4519 shared_lock rl(rwlock);
4520 _dump_active();
4521 rl.unlock();
4522 }
4523
void Objecter::dump_requests(Formatter *fmt)
{
  // Read-lock on Objecter held here.
  // Emit every category of in-flight request under a single
  // "requests" object; each dump_* helper takes the per-session
  // locks it needs.
  fmt->open_object_section("requests");
  dump_ops(fmt);
  dump_linger_ops(fmt);
  dump_pool_ops(fmt);
  dump_pool_stat_ops(fmt);
  dump_statfs_ops(fmt);
  dump_command_ops(fmt);
  fmt->close_section(); // requests object
}
4536
4537 void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4538 {
4539 for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4540 p != s->ops.end();
4541 ++p) {
4542 Op *op = p->second;
4543 auto age = std::chrono::duration<double>(coarse_mono_clock::now() - op->stamp);
4544 fmt->open_object_section("op");
4545 fmt->dump_unsigned("tid", op->tid);
4546 op->target.dump(fmt);
4547 fmt->dump_stream("last_sent") << op->stamp;
4548 fmt->dump_float("age", age.count());
4549 fmt->dump_int("attempts", op->attempts);
4550 fmt->dump_stream("snapid") << op->snapid;
4551 fmt->dump_stream("snap_context") << op->snapc;
4552 fmt->dump_stream("mtime") << op->mtime;
4553
4554 fmt->open_array_section("osd_ops");
4555 for (vector<OSDOp>::const_iterator it = op->ops.begin();
4556 it != op->ops.end();
4557 ++it) {
4558 fmt->dump_stream("osd_op") << *it;
4559 }
4560 fmt->close_section(); // osd_ops array
4561
4562 fmt->close_section(); // op object
4563 }
4564 }
4565
4566 void Objecter::dump_ops(Formatter *fmt)
4567 {
4568 // Read-lock on Objecter held
4569 fmt->open_array_section("ops");
4570 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4571 siter != osd_sessions.end(); ++siter) {
4572 OSDSession *s = siter->second;
4573 OSDSession::shared_lock sl(s->lock);
4574 _dump_ops(s, fmt);
4575 sl.unlock();
4576 }
4577 _dump_ops(homeless_session, fmt);
4578 fmt->close_section(); // ops array
4579 }
4580
4581 void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4582 {
4583 for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4584 p != s->linger_ops.end();
4585 ++p) {
4586 LingerOp *op = p->second;
4587 fmt->open_object_section("linger_op");
4588 fmt->dump_unsigned("linger_id", op->linger_id);
4589 op->target.dump(fmt);
4590 fmt->dump_stream("snapid") << op->snap;
4591 fmt->dump_stream("registered") << op->registered;
4592 fmt->close_section(); // linger_op object
4593 }
4594 }
4595
4596 void Objecter::dump_linger_ops(Formatter *fmt)
4597 {
4598 // We have a read-lock on the objecter
4599 fmt->open_array_section("linger_ops");
4600 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4601 siter != osd_sessions.end(); ++siter) {
4602 OSDSession *s = siter->second;
4603 OSDSession::shared_lock sl(s->lock);
4604 _dump_linger_ops(s, fmt);
4605 sl.unlock();
4606 }
4607 _dump_linger_ops(homeless_session, fmt);
4608 fmt->close_section(); // linger_ops array
4609 }
4610
4611 void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4612 {
4613 for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4614 p != s->command_ops.end();
4615 ++p) {
4616 CommandOp *op = p->second;
4617 fmt->open_object_section("command_op");
4618 fmt->dump_unsigned("command_id", op->tid);
4619 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4620 fmt->open_array_section("command");
4621 for (vector<string>::const_iterator q = op->cmd.begin();
4622 q != op->cmd.end(); ++q)
4623 fmt->dump_string("word", *q);
4624 fmt->close_section();
4625 if (op->target_osd >= 0)
4626 fmt->dump_int("target_osd", op->target_osd);
4627 else
4628 fmt->dump_stream("target_pg") << op->target_pg;
4629 fmt->close_section(); // command_op object
4630 }
4631 }
4632
4633 void Objecter::dump_command_ops(Formatter *fmt)
4634 {
4635 // We have a read-lock on the Objecter here
4636 fmt->open_array_section("command_ops");
4637 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4638 siter != osd_sessions.end(); ++siter) {
4639 OSDSession *s = siter->second;
4640 OSDSession::shared_lock sl(s->lock);
4641 _dump_command_ops(s, fmt);
4642 sl.unlock();
4643 }
4644 _dump_command_ops(homeless_session, fmt);
4645 fmt->close_section(); // command_ops array
4646 }
4647
4648 void Objecter::dump_pool_ops(Formatter *fmt) const
4649 {
4650 fmt->open_array_section("pool_ops");
4651 for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4652 p != pool_ops.end();
4653 ++p) {
4654 PoolOp *op = p->second;
4655 fmt->open_object_section("pool_op");
4656 fmt->dump_unsigned("tid", op->tid);
4657 fmt->dump_int("pool", op->pool);
4658 fmt->dump_string("name", op->name);
4659 fmt->dump_int("operation_type", op->pool_op);
4660 fmt->dump_unsigned("crush_rule", op->crush_rule);
4661 fmt->dump_stream("snapid") << op->snapid;
4662 fmt->dump_stream("last_sent") << op->last_submit;
4663 fmt->close_section(); // pool_op object
4664 }
4665 fmt->close_section(); // pool_ops array
4666 }
4667
4668 void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4669 {
4670 fmt->open_array_section("pool_stat_ops");
4671 for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4672 p != poolstat_ops.end();
4673 ++p) {
4674 PoolStatOp *op = p->second;
4675 fmt->open_object_section("pool_stat_op");
4676 fmt->dump_unsigned("tid", op->tid);
4677 fmt->dump_stream("last_sent") << op->last_submit;
4678
4679 fmt->open_array_section("pools");
4680 for (list<string>::const_iterator it = op->pools.begin();
4681 it != op->pools.end();
4682 ++it) {
4683 fmt->dump_string("pool", *it);
4684 }
4685 fmt->close_section(); // pools array
4686
4687 fmt->close_section(); // pool_stat_op object
4688 }
4689 fmt->close_section(); // pool_stat_ops array
4690 }
4691
4692 void Objecter::dump_statfs_ops(Formatter *fmt) const
4693 {
4694 fmt->open_array_section("statfs_ops");
4695 for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4696 p != statfs_ops.end();
4697 ++p) {
4698 StatfsOp *op = p->second;
4699 fmt->open_object_section("statfs_op");
4700 fmt->dump_unsigned("tid", op->tid);
4701 fmt->dump_stream("last_sent") << op->last_submit;
4702 fmt->close_section(); // statfs_op object
4703 }
4704 fmt->close_section(); // statfs_ops array
4705 }
4706
4707 Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4708 m_objecter(objecter)
4709 {
4710 }
4711
4712 int Objecter::RequestStateHook::call(std::string_view command,
4713 const cmdmap_t& cmdmap,
4714 Formatter *f,
4715 std::ostream& ss,
4716 ceph::buffer::list& out)
4717 {
4718 shared_lock rl(m_objecter->rwlock);
4719 m_objecter->dump_requests(f);
4720 return 0;
4721 }
4722
4723 void Objecter::blacklist_self(bool set)
4724 {
4725 ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4726
4727 vector<string> cmd;
4728 cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4729 if (set)
4730 cmd.push_back("\"blacklistop\":\"add\",");
4731 else
4732 cmd.push_back("\"blacklistop\":\"rm\",");
4733 stringstream ss;
4734 // this is somewhat imprecise in that we are blacklisting our first addr only
4735 ss << messenger->get_myaddrs().front().get_legacy_str();
4736 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4737
4738 MMonCommand *m = new MMonCommand(monc->get_fsid());
4739 m->cmd = cmd;
4740
4741 monc->send_mon_message(m);
4742 }
4743
4744 // commands
4745
void Objecter::handle_command_reply(MCommandReply *m)
{
  // Dispatch an OSD command reply: locate the session and op for the
  // tid, handle EAGAIN by resending, otherwise capture the output
  // buffer and finish the command.
  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    // stale or unknown connection; drop the reply
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  OSDSession::shared_lock sl(s->lock);
  map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
  if (p == s->command_ops.end()) {
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
		   << " not found" << dendl;
    m->put();
    sl.unlock();
    return;
  }

  CommandOp *c = p->second;
  if (!c->session ||
      m->get_connection() != c->session->con) {
    // reply arrived on a connection the op no longer belongs to
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
		   << " got reply from wrong connection "
		   << m->get_connection() << " " << m->get_source_inst()
		   << dendl;
    m->put();
    sl.unlock();
    return;
  }
  if (m->r == -EAGAIN) {
    ldout(cct,10) << __func__ << " tid " << m->get_tid()
		  << " got EAGAIN, requesting map and resending" << dendl;
    // NOTE: This might resend twice... once now, and once again when
    // we get an updated osdmap and the PG is found to have moved.
    _maybe_request_map();
    _send_command(c);
    m->put();
    sl.unlock();
    return;
  }

  if (c->poutbl) {
    // hand the reply payload to the caller's output bufferlist
    c->poutbl->claim(m->get_data());
  }

  sl.unlock();

  // _finish_command requires the session lock held for write, so
  // re-take it in unique mode
  OSDSession::unique_lock sul(s->lock);
  _finish_command(c, m->r, m->rs);
  sul.unlock();

  m->put();
}
4808
void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
  // Assign a tid, home the op on the homeless session, compute and
  // assign its real target session, then send (or wait for a map if
  // no target is available yet).  *ptid receives the new tid.
  shunique_lock sul(rwlock, ceph::acquire_unique);

  ceph_tid_t tid = ++last_tid;
  ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
  c->tid = tid;

  {
    OSDSession::unique_lock hs_wl(homeless_session->lock);
    _session_command_op_assign(homeless_session, c);
  }

  _calc_command_target(c, sul);
  _assign_command_session(c, sul);
  if (osd_timeout > timespan(0)) {
    // NOTE(review): the callback dereferences c->session at fire
    // time; presumably the timer is always cancelled in
    // _finish_command before c is freed — confirm.
    c->ontimeout = timer.add_event(osd_timeout,
				   [this, c, tid]() {
				     command_op_cancel(c->session, tid,
						       -ETIMEDOUT); });
  }

  if (!c->session->is_homeless()) {
    _send_command(c);
  } else {
    // no usable target yet; ask for a newer osdmap
    _maybe_request_map();
  }
  if (c->map_check_error)
    _send_command_map_check(c);
  *ptid = tid;

  logger->inc(l_osdc_command_active);
}
4842
4843 int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
4844 {
4845 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
4846
4847 c->map_check_error = 0;
4848
4849 // ignore overlays, just like we do with pg ops
4850 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4851
4852 if (c->target_osd >= 0) {
4853 if (!osdmap->exists(c->target_osd)) {
4854 c->map_check_error = -ENOENT;
4855 c->map_check_error_str = "osd dne";
4856 c->target.osd = -1;
4857 return RECALC_OP_TARGET_OSD_DNE;
4858 }
4859 if (osdmap->is_down(c->target_osd)) {
4860 c->map_check_error = -ENXIO;
4861 c->map_check_error_str = "osd down";
4862 c->target.osd = -1;
4863 return RECALC_OP_TARGET_OSD_DOWN;
4864 }
4865 c->target.osd = c->target_osd;
4866 } else {
4867 int ret = _calc_target(&(c->target), nullptr, true);
4868 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4869 c->map_check_error = -ENOENT;
4870 c->map_check_error_str = "pool dne";
4871 c->target.osd = -1;
4872 return ret;
4873 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4874 c->map_check_error = -ENXIO;
4875 c->map_check_error_str = "osd down";
4876 c->target.osd = -1;
4877 return ret;
4878 }
4879 }
4880
4881 OSDSession *s;
4882 int r = _get_session(c->target.osd, &s, sul);
4883 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4884
4885 if (c->session != s) {
4886 put_session(s);
4887 return RECALC_OP_TARGET_NEED_RESEND;
4888 }
4889
4890 put_session(s);
4891
4892 ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, "
4893 << c->session << dendl;
4894
4895 return RECALC_OP_TARGET_NO_ACTION;
4896 }
4897
void Objecter::_assign_command_session(CommandOp *c,
				       shunique_lock& sul)
{
  // Move the command op onto the session that matches its computed
  // target osd (possibly the homeless session).
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  OSDSession *s;
  int r = _get_session(c->target.osd, &s, sul);
  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

  if (c->session != s) {
    if (c->session) {
      // detach from the old session first, under its own lock
      OSDSession *cs = c->session;
      OSDSession::unique_lock csl(cs->lock);
      _session_command_op_remove(c->session, c);
      csl.unlock();
    }
    OSDSession::unique_lock sl(s->lock);
    _session_command_op_assign(s, c);
  }

  put_session(s); // drop the ref _get_session took
}
4920
4921 void Objecter::_send_command(CommandOp *c)
4922 {
4923 ldout(cct, 10) << "_send_command " << c->tid << dendl;
4924 ceph_assert(c->session);
4925 ceph_assert(c->session->con);
4926 MCommand *m = new MCommand(monc->monmap.fsid);
4927 m->cmd = c->cmd;
4928 m->set_data(c->inbl);
4929 m->set_tid(c->tid);
4930 c->session->con->send_message(m);
4931 logger->inc(l_osdc_command_send);
4932 }
4933
int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  // Cancel a pending OSD command on session s, completing it with r.
  // Returns -ENOENT if the tid is not (or no longer) pending.
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
  if (it == s->command_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  CommandOp *op = it->second;
  _command_cancel_map_check(op);
  // _finish_command needs the op's session lock held for write
  OSDSession::unique_lock sl(op->session->lock);
  _finish_command(op, r, "");
  sl.unlock();
  return 0;
}
4955
void Objecter::_finish_command(CommandOp *c, int r, string rs)
{
  // rwlock is locked unique
  // session lock is locked

  // Complete the command: deliver status string and return code,
  // cancel its timeout, detach it from the session, and drop our ref.
  ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
		 << rs << dendl;
  if (c->prs)
    *c->prs = rs;
  if (c->onfinish)
    c->onfinish->complete(r);

  // don't cancel the timer while running inside its own callback
  if (c->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(c->ontimeout);

  _session_command_op_remove(c->session, c);

  c->put();

  logger->dec(l_osdc_command_active);
}
4977
Objecter::OSDSession::~OSDSession()
{
  // Caller is responsible for re-assigning or destroying any ops
  // that were assigned to us; by destruction time the session must
  // hold no requests of any kind.
  ceph_assert(ops.empty());
  ceph_assert(linger_ops.empty());
  ceph_assert(command_ops.empty());
}
4986
// Construct the Objecter: wire up messenger/monclient/finisher,
// create an empty OSDMap and the homeless session, and set up the
// mon/osd timeouts and inflight-op throttles from configuration.
Objecter::Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
		   Finisher *fin,
		   double mon_timeout,
		   double osd_timeout) :
  Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
  trace_endpoint("0.0.0.0", 0, "Objecter"),
  osdmap{std::make_unique<OSDMap>()},
  homeless_session(new OSDSession(cct, -1)),
  mon_timeout(ceph::make_timespan(mon_timeout)),
  osd_timeout(ceph::make_timespan(osd_timeout)),
  op_throttle_bytes(cct, "objecter_bytes",
		    cct->_conf->objecter_inflight_op_bytes),
  op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
  retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
{}
5002
(1) Event exn_spec_violation: |
An exception of type "std::length_error" is thrown but the throw list "throw()" doesn't allow it to be thrown. This will cause a call to unexpected() which usually calls terminate(). |
Also see events: |
[fun_call_w_exception] |
5003 Objecter::~Objecter()
5004 {
5005 ceph_assert(homeless_session->get_nref() == 1);
5006 ceph_assert(num_homeless_ops == 0);
(2) Event fun_call_w_exception: |
Called function throws an exception of type "std::length_error". [details] |
Also see events: |
[exn_spec_violation] |
5007 homeless_session->put();
5008
5009 ceph_assert(osd_sessions.empty());
5010 ceph_assert(poolstat_ops.empty());
5011 ceph_assert(statfs_ops.empty());
5012 ceph_assert(pool_ops.empty());
5013 ceph_assert(waiting_for_map.empty());
5014 ceph_assert(linger_ops.empty());
5015 ceph_assert(check_latest_map_lingers.empty());
5016 ceph_assert(check_latest_map_ops.empty());
5017 ceph_assert(check_latest_map_commands.empty());
5018
5019 ceph_assert(!m_request_state_hook);
5020 ceph_assert(!logger);
5021 }
5022
5023 /**
5024 * Wait until this OSD map epoch is received before
5025 * sending any more operations to OSDs. Use this
5026 * when it is known that the client can't trust
5027 * anything from before this epoch (e.g. due to
5028 * client blacklist at this epoch).
5029 */
5030 void Objecter::set_epoch_barrier(epoch_t epoch)
5031 {
5032 unique_lock wl(rwlock);
5033
5034 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5035 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
5036 << dendl;
5037 if (epoch > epoch_barrier) {
5038 epoch_barrier = epoch;
5039 _maybe_request_map();
5040 }
5041 }
5042
5043
5044
5045 hobject_t Objecter::enumerate_objects_begin()
5046 {
5047 return hobject_t();
5048 }
5049
5050 hobject_t Objecter::enumerate_objects_end()
5051 {
5052 return hobject_t::get_max();
5053 }
5054
// Completion context for one enumerate_objects() round trip; carries
// the reply buffer plus the caller's output locations, and forwards
// everything to Objecter::_enumerate_reply on finish.
struct C_EnumerateReply : public Context {
  ceph::buffer::list bl;   // raw pg_nls reply payload

  Objecter *objecter;
  hobject_t *next;         // out: cursor for the next call
  std::list<librados::ListObjectImpl> *result;
  const hobject_t end;     // caller-requested upper bound
  const int64_t pool_id;
  Context *on_finish;

  epoch_t epoch;           // filled in by pg_read
  int budget;              // op budget to return; -1 = none taken

  C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
		   std::list<librados::ListObjectImpl> *result_,
		   const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
    objecter(objecter_), next(next_), result(result_),
    end(end_), pool_id(pool_id_), on_finish(on_finish_),
    epoch(0), budget(-1)
  {}

  // _enumerate_reply owns completing on_finish.
  void finish(int r) override {
    objecter->_enumerate_reply(
      bl, r, end, pool_id, budget, epoch, result, next, on_finish);
  }
};
5081
void Objecter::enumerate_objects(
  int64_t pool_id,
  const std::string &ns,
  const hobject_t &start,
  const hobject_t &end,
  const uint32_t max,
  const ceph::buffer::list &filter_bl,
  std::list<librados::ListObjectImpl> *result,
  hobject_t *next,
  Context *on_finish)
{
  // List up to `max` objects in [start, end) of pool/namespace,
  // appending to *result and setting *next to the resume cursor.
  // on_finish is completed with 0, or with a negative errno on any
  // validation/cluster-state failure.
  ceph_assert(result);

  if (!end.is_max() && start > end) {
    lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  if (max < 1) {
    lderr(cct) << __func__ << ": result size may not be zero" << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  // start at the max sentinel means the enumeration is already done
  if (start.is_max()) {
    on_finish->complete(0);
    return;
  }

  shared_lock rl(rwlock);
  ceph_assert(osdmap->get_epoch());
  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
    // hobject ordering below assumes bitwise sort
    rl.unlock();
    lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
    on_finish->complete(-EOPNOTSUPP);
    return;
  }
  const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
  if (!p) {
    lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
	       << osdmap->get_epoch() << dendl;
    rl.unlock();
    on_finish->complete(-ENOENT);
    return;
  } else {
    rl.unlock();
  }

  ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;

  // Stash completion state
  C_EnumerateReply *on_ack = new C_EnumerateReply(
    this, next, result, end, pool_id, on_finish);

  ObjectOperation op;
  op.pg_nls(max, filter_bl, start, 0);

  // Issue. See you later in _enumerate_reply
  object_locator_t oloc(pool_id, ns);
  pg_read(start.get_hash(), oloc, op,
	  &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
}
5145
5146 void Objecter::_enumerate_reply(
5147 ceph::buffer::list &bl,
5148 int r,
5149 const hobject_t &end,
5150 const int64_t pool_id,
5151 int budget,
5152 epoch_t reply_epoch,
5153 std::list<librados::ListObjectImpl> *result,
5154 hobject_t *next,
5155 Context *on_finish)
5156 {
5157 if (budget >= 0) {
5158 put_op_budget_bytes(budget);
5159 }
5160
5161 if (r < 0) {
5162 ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
5163 on_finish->complete(r);
5164 return;
5165 }
5166
5167 ceph_assert(next != NULL);
5168
5169 // Decode the results
5170 auto iter = bl.cbegin();
5171 pg_nls_response_t response;
5172
5173 decode(response, iter);
5174 if (!iter.end()) {
5175 // extra_info isn't used anywhere. We do this solely to preserve
5176 // backward compatibility
5177 ceph::buffer::list legacy_extra_info;
5178 decode(legacy_extra_info, iter);
5179 }
5180
5181 ldout(cct, 10) << __func__ << ": got " << response.entries.size()
5182 << " handle " << response.handle
5183 << " reply_epoch " << reply_epoch << dendl;
5184 ldout(cct, 20) << __func__ << ": response.entries.size "
5185 << response.entries.size() << ", response.entries "
5186 << response.entries << dendl;
5187 if (response.handle <= end) {
5188 *next = response.handle;
5189 } else {
5190 ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
5191 << dendl;
5192 *next = end;
5193
5194 // drop anything after 'end'
5195 shared_lock rl(rwlock);
5196 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
5197 if (!pool) {
5198 // pool is gone, drop any results which are now meaningless.
5199 rl.unlock();
5200 on_finish->complete(-ENOENT);
5201 return;
5202 }
5203 while (!response.entries.empty()) {
5204 uint32_t hash = response.entries.back().locator.empty() ?
5205 pool->hash_key(response.entries.back().oid,
5206 response.entries.back().nspace) :
5207 pool->hash_key(response.entries.back().locator,
5208 response.entries.back().nspace);
5209 hobject_t last(response.entries.back().oid,
5210 response.entries.back().locator,
5211 CEPH_NOSNAP,
5212 hash,
5213 pool_id,
5214 response.entries.back().nspace);
5215 if (last < end)
5216 break;
5217 ldout(cct, 20) << __func__ << " dropping item " << last
5218 << " >= end " << end << dendl;
5219 response.entries.pop_back();
5220 }
5221 rl.unlock();
5222 }
5223 if (!response.entries.empty()) {
5224 result->merge(response.entries);
5225 }
5226
5227 // release the listing context's budget once all
5228 // OPs (in the session) are finished
5229 #if 0
5230 put_nlist_context_budget(list_context);
5231 #endif
5232 on_finish->complete(r);
5233 return;
5234 }
5235
namespace {
using namespace librados;

// Decode one T from each bufferlist in bls and append it to items.
template <typename T>
void do_decode(std::vector<T>& items, std::vector<ceph::buffer::list>& bls)
{
  for (auto bl : bls) {
    auto p = bl.cbegin();
    T t;
    decode(t, p);
    items.push_back(t);
  }
}

// Out-handler for CEPH_OSD_OP_SCRUBLS: decodes the reply into either
// an inconsistent-object or inconsistent-snapset vector (exactly one
// of `objects`/`snapsets` is set, per the constructor used).
struct C_ObjectOperation_scrub_ls : public Context {
  ceph::buffer::list bl;   // raw reply payload, filled by the op machinery
  uint32_t *interval;      // out: scrub interval from the reply
  std::vector<inconsistent_obj_t> *objects = nullptr;
  std::vector<inconsistent_snapset_t> *snapsets = nullptr;
  int *rval;               // out: decode status reported to the caller

  C_ObjectOperation_scrub_ls(uint32_t *interval,
			     std::vector<inconsistent_obj_t> *objects,
			     int *rval)
    : interval(interval), objects(objects), rval(rval) {}
  C_ObjectOperation_scrub_ls(uint32_t *interval,
			     std::vector<inconsistent_snapset_t> *snapsets,
			     int *rval)
    : interval(interval), snapsets(snapsets), rval(rval) {}
  void finish(int r) override {
    // -EAGAIN still carries a decodable payload; other errors do not
    if (r < 0 && r != -EAGAIN) {
      if (rval)
	*rval = r;
      return;
    }

    if (rval)
      *rval = 0;

    try {
      decode();
    } catch (ceph::buffer::error&) {
      // malformed payload: surface as I/O error
      if (rval)
	*rval = -EIO;
    }
  }
private:
  void decode() {
    scrub_ls_result_t result;
    auto p = bl.cbegin();
    result.decode(p);
    *interval = result.interval;
    if (objects) {
      do_decode(*objects, result.vals);
    } else {
      do_decode(*snapsets, result.vals);
    }
  }
};

// Append a SCRUBLS op to *op, wiring up the out-handler that decodes
// the reply into *items and *interval.  The handler is owned by the
// op's out_handler slot.
template <typename T>
void do_scrub_ls(::ObjectOperation *op,
		 const scrub_ls_arg_t& arg,
		 std::vector<T> *items,
		 uint32_t *interval,
		 int *rval)
{
  OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
  op->flags |= CEPH_OSD_FLAG_PGOP;
  ceph_assert(interval);
  arg.encode(osd_op.indata);
  unsigned p = op->ops.size() - 1;
  auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
  op->out_handler[p] = h;
  op->out_bl[p] = &h->bl;
  op->out_rval[p] = rval;
}
}
5314
5315 void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5316 uint64_t max_to_get,
5317 std::vector<librados::inconsistent_obj_t> *objects,
5318 uint32_t *interval,
5319 int *rval)
5320 {
5321 scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5322 do_scrub_ls(this, arg, objects, interval, rval);
5323 }
5324
5325 void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5326 uint64_t max_to_get,
5327 std::vector<librados::inconsistent_snapset_t> *snapsets,
5328 uint32_t *interval,
5329 int *rval)
5330 {
5331 scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5332 do_scrub_ls(this, arg, snapsets, interval, rval);
5333 }
5334