1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software 
11   	 * Foundation.  See file COPYING.
12   	 * 
13   	 */
14   	
15   	#include <atomic>
16   	#include <cstring>
17   	#include <errno.h>
18   	#include <limits.h>
19   	
20   	#include <sys/uio.h>
21   	
22   	#include "include/ceph_assert.h"
23   	#include "include/types.h"
24   	#include "include/buffer_raw.h"
25   	#include "include/compat.h"
26   	#include "include/mempool.h"
27   	#include "armor.h"
28   	#include "common/environment.h"
29   	#include "common/errno.h"
30   	#include "common/safe_io.h"
31   	#include "common/strtol.h"
32   	#include "common/likely.h"
33   	#include "common/valgrind.h"
34   	#include "common/deleter.h"
35   	#include "common/RWLock.h"
36   	#include "include/spinlock.h"
37   	#include "include/scope_guard.h"
38   	
39   	using namespace ceph;
40   	
// Allocation unit, in bytes, used when sizing buffers.
#define CEPH_BUFFER_ALLOC_UNIT  4096u
// One allocation unit minus the space taken by the raw_combined header that
// shares the allocation with the data.
#define CEPH_BUFFER_APPEND_SIZE (CEPH_BUFFER_ALLOC_UNIT - sizeof(raw_combined))

// bdout ... bendl brackets one debug trace statement.  With BUFFER_DEBUG the
// statement runs under debug_lock; otherwise it is wrapped in `if (0)` and
// never executes.
#ifdef BUFFER_DEBUG
static ceph::spinlock debug_lock;
#define bdout { std::lock_guard<ceph::spinlock> lg(debug_lock); std::cout
#define bendl std::endl; }
#else
#define bdout if (0) { std::cout
#define bendl std::endl; }
#endif
52   	
53   	  static std::atomic<unsigned> buffer_cached_crc { 0 };
54   	  static std::atomic<unsigned> buffer_cached_crc_adjusted { 0 };
55   	  static std::atomic<unsigned> buffer_missed_crc { 0 };
56   	
57   	  static bool buffer_track_crc = get_env_bool("CEPH_BUFFER_TRACK");
58   	
59   	  void buffer::track_cached_crc(bool b) {
60   	    buffer_track_crc = b;
61   	  }
62   	  int buffer::get_cached_crc() {
63   	    return buffer_cached_crc;
64   	  }
65   	  int buffer::get_cached_crc_adjusted() {
66   	    return buffer_cached_crc_adjusted;
67   	  }
68   	
69   	  int buffer::get_missed_crc() {
70   	    return buffer_missed_crc;
71   	  }
72   	
73   	  const char * buffer::error::what() const throw () {
74   	    return "buffer::exception";
75   	  }
76   	  const char * buffer::bad_alloc::what() const throw () {
77   	    return "buffer::bad_alloc";
78   	  }
79   	  const char * buffer::end_of_buffer::what() const throw () {
80   	    return "buffer::end_of_buffer";
81   	  }
82   	  const char * buffer::malformed_input::what() const throw () {
83   	    return buf;
84   	  }
85   	  buffer::error_code::error_code(int error) :
86   	    buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {}
87   	
88   	  /*
89   	   * raw_combined is always placed within a single allocation along
90   	   * with the data buffer.  the data goes at the beginning, and
91   	   * raw_combined at the end.
92   	   */
93   	  class buffer::raw_combined : public buffer::raw {
94   	    size_t alignment;
95   	  public:
96   	    raw_combined(char *dataptr, unsigned l, unsigned align,
97   			 int mempool)
98   	      : raw(dataptr, l, mempool),
99   		alignment(align) {
100  	    }
101  	    raw* clone_empty() override {
102  	      return create(len, alignment);
103  	    }
104  	
105  	    static raw_combined *create(unsigned len,
106  					unsigned align,
107  					int mempool = mempool::mempool_buffer_anon) {
108  	      if (!align)
109  		align = sizeof(size_t);
110  	      size_t rawlen = round_up_to(sizeof(buffer::raw_combined),
111  					  alignof(buffer::raw_combined));
112  	      size_t datalen = round_up_to(len, alignof(buffer::raw_combined));
113  	
114  	#ifdef DARWIN
115  	      char *ptr = (char *) valloc(rawlen + datalen);
116  	#else
117  	      char *ptr = 0;
118  	      int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen);
119  	      if (r)
120  		throw bad_alloc();
121  	#endif /* DARWIN */
122  	      if (!ptr)
123  		throw bad_alloc();
124  	
125  	      // actual data first, since it has presumably larger alignment restriction
126  	      // then put the raw_combined at the end
127  	      return new (ptr + datalen) raw_combined(ptr, len, align, mempool);
128  	    }
129  	
130  	    static void operator delete(void *ptr) {
131  	      raw_combined *raw = (raw_combined *)ptr;
132  	      ::free((void *)raw->data);
133  	    }
134  	  };
135  	
136  	  class buffer::raw_malloc : public buffer::raw {
137  	  public:
138  	    MEMPOOL_CLASS_HELPERS();
139  	
140  	    explicit raw_malloc(unsigned l) : raw(l) {
141  	      if (len) {
142  		data = (char *)malloc(len);
143  	        if (!data)
144  	          throw bad_alloc();
145  	      } else {
146  		data = 0;
147  	      }
148  	      bdout << "raw_malloc " << this << " alloc " << (void *)data << " " << l << bendl;
149  	    }
150  	    raw_malloc(unsigned l, char *b) : raw(b, l) {
151  	      bdout << "raw_malloc " << this << " alloc " << (void *)data << " " << l << bendl;
152  	    }
153  	    ~raw_malloc() override {
154  	      free(data);
155  	      bdout << "raw_malloc " << this << " free " << (void *)data << " " << bendl;
156  	    }
157  	    raw* clone_empty() override {
158  	      return new raw_malloc(len);
159  	    }
160  	  };
161  	
162  	#ifndef __CYGWIN__
163  	  class buffer::raw_posix_aligned : public buffer::raw {
164  	    unsigned align;
165  	  public:
166  	    MEMPOOL_CLASS_HELPERS();
167  	
168  	    raw_posix_aligned(unsigned l, unsigned _align) : raw(l) {
169  	      align = _align;
170  	      ceph_assert((align >= sizeof(void *)) && (align & (align - 1)) == 0);
171  	#ifdef DARWIN
172  	      data = (char *) valloc(len);
173  	#else
174  	      int r = ::posix_memalign((void**)(void*)&data, align, len);
175  	      if (r)
176  		throw bad_alloc();
177  	#endif /* DARWIN */
178  	      if (!data)
179  		throw bad_alloc();
180  	      bdout << "raw_posix_aligned " << this << " alloc " << (void *)data
181  		    << " l=" << l << ", align=" << align << bendl;
182  	    }
183  	    ~raw_posix_aligned() override {
184  	      ::free(data);
185  	      bdout << "raw_posix_aligned " << this << " free " << (void *)data << bendl;
186  	    }
187  	    raw* clone_empty() override {
188  	      return new raw_posix_aligned(len, align);
189  	    }
190  	  };
191  	#endif
192  	
#ifdef __CYGWIN__
  // Aligned buffer for Cygwin: over-allocates with new[] and advances the
  // data pointer to the first address that is a multiple of `align`.
  class buffer::raw_hack_aligned : public buffer::raw {
    unsigned align;
    char *realdata;   // start of the new[] allocation; this is what is deleted
  public:
    // Allocate l bytes aligned to _align (the mask arithmetic below assumes
    // a power of two).
    raw_hack_aligned(unsigned l, unsigned _align) : raw(l) {
      align = _align;
      realdata = new char[len+align-1];
      // Do the address arithmetic in a pointer-sized integer: casting the
      // pointer to `unsigned` truncates it on 64-bit targets.
      const size_t mask = align - 1;
      const size_t off = reinterpret_cast<size_t>(realdata) & mask;
      if (off)
        data = realdata + align - off;
      else
        data = realdata;
      ceph_assert((reinterpret_cast<size_t>(data) & mask) == 0);
    }
    ~raw_hack_aligned() override {
      delete[] realdata;
    }
    // New buffer with the same length and alignment.
    raw* clone_empty() override {
      return new raw_hack_aligned(len, align);
    }
  };
#endif
219  	
220  	  /*
221  	   * primitive buffer types
222  	   */
223  	  class buffer::raw_char : public buffer::raw {
224  	  public:
225  	    MEMPOOL_CLASS_HELPERS();
226  	
227  	    explicit raw_char(unsigned l) : raw(l) {
228  	      if (len)
229  		data = new char[len];
230  	      else
231  		data = 0;
232  	      bdout << "raw_char " << this << " alloc " << (void *)data << " " << l << bendl;
233  	    }
234  	    raw_char(unsigned l, char *b) : raw(b, l) {
235  	      bdout << "raw_char " << this << " alloc " << (void *)data << " " << l << bendl;
236  	    }
237  	    ~raw_char() override {
238  	      delete[] data;
239  	      bdout << "raw_char " << this << " free " << (void *)data << bendl;
240  	    }
241  	    raw* clone_empty() override {
242  	      return new raw_char(len);
243  	    }
244  	  };
245  	
246  	  class buffer::raw_claimed_char : public buffer::raw {
247  	  public:
248  	    MEMPOOL_CLASS_HELPERS();
249  	
250  	    explicit raw_claimed_char(unsigned l, char *b) : raw(b, l) {
251  	      bdout << "raw_claimed_char " << this << " alloc " << (void *)data
252  		    << " " << l << bendl;
253  	    }
254  	    ~raw_claimed_char() override {
255  	      bdout << "raw_claimed_char " << this << " free " << (void *)data
256  		    << bendl;
257  	    }
258  	    raw* clone_empty() override {
259  	      return new raw_char(len);
260  	    }
261  	  };
262  	
263  	  class buffer::raw_unshareable : public buffer::raw {
264  	  public:
265  	    MEMPOOL_CLASS_HELPERS();
266  	
267  	    explicit raw_unshareable(unsigned l) : raw(l) {
268  	      if (len)
269  		data = new char[len];
270  	      else
271  		data = 0;
272  	    }
273  	    raw_unshareable(unsigned l, char *b) : raw(b, l) {
274  	    }
275  	    raw* clone_empty() override {
276  	      return new raw_char(len);
277  	    }
278  	    bool is_shareable() const override {
279  	      return false; // !shareable, will force make_shareable()
280  	    }
281  	    ~raw_unshareable() override {
282  	      delete[] data;
283  	    }
284  	  };
285  	
286  	  class buffer::raw_static : public buffer::raw {
287  	  public:
288  	    MEMPOOL_CLASS_HELPERS();
289  	
290  	    raw_static(const char *d, unsigned l) : raw((char*)d, l) { }
291  	    ~raw_static() override {}
292  	    raw* clone_empty() override {
293  	      return new buffer::raw_char(len);
294  	    }
295  	  };
296  	
297  	  class buffer::raw_claim_buffer : public buffer::raw {
298  	    deleter del;
299  	   public:
300  	    raw_claim_buffer(const char *b, unsigned l, deleter d)
301  	        : raw((char*)b, l), del(std::move(d)) { }
302  	    ~raw_claim_buffer() override {}
303  	    raw* clone_empty() override {
304  	      return new buffer::raw_char(len);
305  	    }
306  	  };
307  	
308  	  ceph::unique_leakable_ptr<buffer::raw> buffer::copy(const char *c, unsigned len) {
309  	    auto r = buffer::create_aligned(len, sizeof(size_t));
310  	    memcpy(r->data, c, len);
311  	    return r;
312  	  }
313  	
314  	  ceph::unique_leakable_ptr<buffer::raw> buffer::create(unsigned len) {
315  	    return buffer::create_aligned(len, sizeof(size_t));
316  	  }
317  	  ceph::unique_leakable_ptr<buffer::raw> buffer::create_in_mempool(unsigned len, int mempool) {
318  	    return buffer::create_aligned_in_mempool(len, sizeof(size_t), mempool);
319  	  }
320  	  buffer::raw* buffer::claim_char(unsigned len, char *buf) {
321  	    return new raw_claimed_char(len, buf);
322  	  }
323  	  buffer::raw* buffer::create_malloc(unsigned len) {
324  	    return new raw_malloc(len);
325  	  }
326  	  buffer::raw* buffer::claim_malloc(unsigned len, char *buf) {
327  	    return new raw_malloc(len, buf);
328  	  }
329  	  buffer::raw* buffer::create_static(unsigned len, char *buf) {
330  	    return new raw_static(buf, len);
331  	  }
332  	  buffer::raw* buffer::claim_buffer(unsigned len, char *buf, deleter del) {
333  	    return new raw_claim_buffer(buf, len, std::move(del));
334  	  }
335  	
336  	  ceph::unique_leakable_ptr<buffer::raw> buffer::create_aligned_in_mempool(
337  	    unsigned len, unsigned align, int mempool) {
338  	    // If alignment is a page multiple, use a separate buffer::raw to
339  	    // avoid fragmenting the heap.
340  	    //
341  	    // Somewhat unexpectedly, I see consistently better performance
342  	    // from raw_combined than from raw even when the allocation size is
343  	    // a page multiple (but alignment is not).
344  	    //
345  	    // I also see better performance from a separate buffer::raw once the
346  	    // size passes 8KB.
347  	    if ((align & ~CEPH_PAGE_MASK) == 0 ||
348  		len >= CEPH_PAGE_SIZE * 2) {
349  	#ifndef __CYGWIN__
350  	      return ceph::unique_leakable_ptr<buffer::raw>(new raw_posix_aligned(len, align));
351  	#else
352  	      return ceph::unique_leakable_ptr<buffer::raw>(new raw_hack_aligned(len, align));
353  	#endif
354  	    }
355  	    return ceph::unique_leakable_ptr<buffer::raw>(
356  	      raw_combined::create(len, align, mempool));
357  	  }
358  	  ceph::unique_leakable_ptr<buffer::raw> buffer::create_aligned(
359  	    unsigned len, unsigned align) {
360  	    return create_aligned_in_mempool(len, align,
361  					     mempool::mempool_buffer_anon);
362  	  }
363  	
364  	  ceph::unique_leakable_ptr<buffer::raw> buffer::create_page_aligned(unsigned len) {
365  	    return create_aligned(len, CEPH_PAGE_SIZE);
366  	  }
367  	  ceph::unique_leakable_ptr<buffer::raw> buffer::create_small_page_aligned(unsigned len) {
368  	    if (len < CEPH_PAGE_SIZE) {
369  	      return create_aligned(len, CEPH_BUFFER_ALLOC_UNIT);
370  	    } else
371  	      return create_aligned(len, CEPH_PAGE_SIZE);
372  	  }
373  	
374  	  buffer::raw* buffer::create_unshareable(unsigned len) {
375  	    return new raw_unshareable(len);
376  	  }
377  	
378  	  buffer::ptr::ptr(raw* r) : _raw(r), _off(0), _len(r->len)   // no lock needed; this is an unref raw.
379  	  {
380  	    r->nref++;
381  	    bdout << "ptr " << this << " get " << _raw << bendl;
382  	  }
383  	  buffer::ptr::ptr(ceph::unique_leakable_ptr<raw> r)
384  	    : _raw(r.release()),
385  	      _off(0),
386  	      _len(_raw->len)
387  	  {
388  	    _raw->nref.store(1, std::memory_order_release);
389  	    bdout << "ptr " << this << " get " << _raw << bendl;
390  	  }
391  	  buffer::ptr::ptr(unsigned l) : _off(0), _len(l)
392  	  {
393  	    _raw = buffer::create(l).release();
394  	    _raw->nref.store(1, std::memory_order_release);
395  	    bdout << "ptr " << this << " get " << _raw << bendl;
396  	  }
397  	  buffer::ptr::ptr(const char *d, unsigned l) : _off(0), _len(l)    // ditto.
398  	  {
399  	    _raw = buffer::copy(d, l).release();
400  	    _raw->nref.store(1, std::memory_order_release);
401  	    bdout << "ptr " << this << " get " << _raw << bendl;
402  	  }
403  	  buffer::ptr::ptr(const ptr& p) : _raw(p._raw), _off(p._off), _len(p._len)
404  	  {
405  	    if (_raw) {
406  	      _raw->nref++;
407  	      bdout << "ptr " << this << " get " << _raw << bendl;
408  	    }
409  	  }
410  	  buffer::ptr::ptr(ptr&& p) noexcept : _raw(p._raw), _off(p._off), _len(p._len)
411  	  {
412  	    p._raw = nullptr;
413  	    p._off = p._len = 0;
414  	  }
415  	  buffer::ptr::ptr(const ptr& p, unsigned o, unsigned l)
416  	    : _raw(p._raw), _off(p._off + o), _len(l)
417  	  {
418  	    ceph_assert(o+l <= p._len);
419  	    ceph_assert(_raw);
420  	    _raw->nref++;
421  	    bdout << "ptr " << this << " get " << _raw << bendl;
422  	  }
423  	  buffer::ptr::ptr(const ptr& p, ceph::unique_leakable_ptr<raw> r)
424  	    : _raw(r.release()),
425  	      _off(p._off),
426  	      _len(p._len)
427  	  {
428  	    _raw->nref.store(1, std::memory_order_release);
429  	    bdout << "ptr " << this << " get " << _raw << bendl;
430  	  }
431  	  buffer::ptr& buffer::ptr::operator= (const ptr& p)
432  	  {
433  	    if (p._raw) {
434  	      p._raw->nref++;
435  	      bdout << "ptr " << this << " get " << _raw << bendl;
436  	    }
437  	    buffer::raw *raw = p._raw; 
438  	    release();
439  	    if (raw) {
440  	      _raw = raw;
441  	      _off = p._off;
442  	      _len = p._len;
443  	    } else {
444  	      _off = _len = 0;
445  	    }
446  	    return *this;
447  	  }
448  	  buffer::ptr& buffer::ptr::operator= (ptr&& p) noexcept
449  	  {
450  	    release();
451  	    buffer::raw *raw = p._raw;
452  	    if (raw) {
453  	      _raw = raw;
454  	      _off = p._off;
455  	      _len = p._len;
456  	      p._raw = nullptr;
457  	      p._off = p._len = 0;
458  	    } else {
459  	      _off = _len = 0;
460  	    }
461  	    return *this;
462  	  }
463  	
464  	  ceph::unique_leakable_ptr<buffer::raw> buffer::ptr::clone()
465  	  {
466  	    return _raw->clone();
467  	  }
468  	
469  	  void buffer::ptr::swap(ptr& other) noexcept
470  	  {
471  	    raw *r = _raw;
472  	    unsigned o = _off;
473  	    unsigned l = _len;
474  	    _raw = other._raw;
475  	    _off = other._off;
476  	    _len = other._len;
477  	    other._raw = r;
478  	    other._off = o;
479  	    other._len = l;
480  	  }
481  	
482  	  void buffer::ptr::release()
483  	  {
484  	    if (_raw) {
485  	      bdout << "ptr " << this << " release " << _raw << bendl;
486  	      const bool last_one = (1 == _raw->nref.load(std::memory_order_acquire));
487  	      if (likely(last_one) || --_raw->nref == 0) {
488  	        // BE CAREFUL: this is called also for hypercombined ptr_node. After
489  	        // freeing underlying raw, `*this` can become inaccessible as well!
490  	        const auto* delete_raw = _raw;
491  	        _raw = nullptr;
492  		//cout << "hosing raw " << (void*)_raw << " len " << _raw->len << std::endl;
493  	        ANNOTATE_HAPPENS_AFTER(&_raw->nref);
494  	        ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&_raw->nref);
495  		delete delete_raw;  // dealloc old (if any)
496  	      } else {
497  	        ANNOTATE_HAPPENS_BEFORE(&_raw->nref);
498  	        _raw = nullptr;
499  	      }
500  	    }
501  	  }
502  	
503  	  int buffer::ptr::get_mempool() const {
504  	    if (_raw) {
505  	      return _raw->mempool;
506  	    }
507  	    return mempool::mempool_buffer_anon;
508  	  }
509  	
510  	  void buffer::ptr::reassign_to_mempool(int pool) {
511  	    if (_raw) {
512  	      _raw->reassign_to_mempool(pool);
513  	    }
514  	  }
515  	  void buffer::ptr::try_assign_to_mempool(int pool) {
516  	    if (_raw) {
517  	      _raw->try_assign_to_mempool(pool);
518  	    }
519  	  }
520  	
521  	  const char *buffer::ptr::c_str() const {
522  	    ceph_assert(_raw);
523  	    return _raw->get_data() + _off;
524  	  }
525  	  char *buffer::ptr::c_str() {
526  	    ceph_assert(_raw);
527  	    return _raw->get_data() + _off;
528  	  }
529  	  const char *buffer::ptr::end_c_str() const {
530  	    ceph_assert(_raw);
531  	    return _raw->get_data() + _off + _len;
532  	  }
533  	  char *buffer::ptr::end_c_str() {
534  	    ceph_assert(_raw);
535  	    return _raw->get_data() + _off + _len;
536  	  }
537  	
538  	  unsigned buffer::ptr::unused_tail_length() const
539  	  {
540  	    if (_raw)
541  	      return _raw->len - (_off+_len);
542  	    else
543  	      return 0;
544  	  }
545  	  const char& buffer::ptr::operator[](unsigned n) const
546  	  {
547  	    ceph_assert(_raw);
548  	    ceph_assert(n < _len);
549  	    return _raw->get_data()[_off + n];
550  	  }
551  	  char& buffer::ptr::operator[](unsigned n)
552  	  {
553  	    ceph_assert(_raw);
554  	    ceph_assert(n < _len);
555  	    return _raw->get_data()[_off + n];
556  	  }
557  	
558  	  const char *buffer::ptr::raw_c_str() const { ceph_assert(_raw); return _raw->data; }
559  	  unsigned buffer::ptr::raw_length() const { ceph_assert(_raw); return _raw->len; }
560  	  int buffer::ptr::raw_nref() const { ceph_assert(_raw); return _raw->nref; }
561  	
562  	  void buffer::ptr::copy_out(unsigned o, unsigned l, char *dest) const {
563  	    ceph_assert(_raw);
564  	    if (o+l > _len)
565  	        throw end_of_buffer();
566  	    char* src =  _raw->data + _off + o;
567  	    maybe_inline_memcpy(dest, src, l, 8);
568  	  }
569  	
570  	  unsigned buffer::ptr::wasted() const
571  	  {
572  	    return _raw->len - _len;
573  	  }
574  	
575  	  int buffer::ptr::cmp(const ptr& o) const
576  	  {
577  	    int l = _len < o._len ? _len : o._len;
578  	    if (l) {
579  	      int r = memcmp(c_str(), o.c_str(), l);
580  	      if (r)
581  		return r;
582  	    }
583  	    if (_len < o._len)
584  	      return -1;
585  	    if (_len > o._len)
586  	      return 1;
587  	    return 0;
588  	  }
589  	
590  	  bool buffer::ptr::is_zero() const
591  	  {
592  	    return mem_is_zero(c_str(), _len);
593  	  }
594  	
595  	  unsigned buffer::ptr::append(char c)
596  	  {
597  	    ceph_assert(_raw);
598  	    ceph_assert(1 <= unused_tail_length());
599  	    char* ptr = _raw->data + _off + _len;
600  	    *ptr = c;
601  	    _len++;
602  	    return _len + _off;
603  	  }
604  	
605  	  unsigned buffer::ptr::append(const char *p, unsigned l)
606  	  {
| (1) Event cond_true: | Condition "this->_raw", taking true branch. | 
607  	    ceph_assert(_raw);
| (2) Event cond_true: | Condition "l <= this->unused_tail_length()", taking true branch. | 
608  	    ceph_assert(l <= unused_tail_length());
609  	    char* c = _raw->data + _off + _len;
| (3) Event access_dbuff_const: | Calling "maybe_inline_memcpy" indexes array "p" at byte position 7. [details] | 
610  	    maybe_inline_memcpy(c, p, l, 32);
611  	    _len += l;
612  	    return _len + _off;
613  	  }
614  	
615  	  unsigned buffer::ptr::append_zeros(unsigned l)
616  	  {
617  	    ceph_assert(_raw);
618  	    ceph_assert(l <= unused_tail_length());
619  	    char* c = _raw->data + _off + _len;
620  	    memset(c, 0, l);
621  	    _len += l;
622  	    return _len + _off;
623  	  }
624  	
625  	  void buffer::ptr::copy_in(unsigned o, unsigned l, const char *src, bool crc_reset)
626  	  {
627  	    ceph_assert(_raw);
628  	    ceph_assert(o <= _len);
629  	    ceph_assert(o+l <= _len);
630  	    char* dest = _raw->data + _off + o;
631  	    if (crc_reset)
632  	        _raw->invalidate_crc();
633  	    maybe_inline_memcpy(dest, src, l, 64);
634  	  }
635  	
636  	  void buffer::ptr::zero(bool crc_reset)
637  	  {
638  	    if (crc_reset)
639  	        _raw->invalidate_crc();
640  	    memset(c_str(), 0, _len);
641  	  }
642  	
643  	  void buffer::ptr::zero(unsigned o, unsigned l, bool crc_reset)
644  	  {
645  	    ceph_assert(o+l <= _len);
646  	    if (crc_reset)
647  	        _raw->invalidate_crc();
648  	    memset(c_str()+o, 0, l);
649  	  }
650  	
651  	  // -- buffer::list::iterator --
652  	  /*
653  	  buffer::list::iterator operator=(const buffer::list::iterator& other)
654  	  {
655  	    if (this != &other) {
656  	      bl = other.bl;
657  	      ls = other.ls;
658  	      off = other.off;
659  	      p = other.p;
660  	      p_off = other.p_off;
661  	    }
662  	    return *this;
663  	    }*/
664  	
665  	  template<bool is_const>
666  	  buffer::list::iterator_impl<is_const>::iterator_impl(bl_t *l, unsigned o)
667  	    : bl(l), ls(&bl->_buffers), p(ls->begin()), off(0), p_off(0)
668  	  {
669  	    advance(o);
670  	  }
671  	
672  	  template<bool is_const>
673  	  buffer::list::iterator_impl<is_const>::iterator_impl(const buffer::list::iterator& i)
674  	    : iterator_impl<is_const>(i.bl, i.off, i.p, i.p_off) {}
675  	
676  	  template<bool is_const>
677  	  void buffer::list::iterator_impl<is_const>::advance(unsigned o)
678  	  {
679  	    //cout << this << " advance " << o << " from " << off
680  	    //     << " (p_off " << p_off << " in " << p->length() << ")"
681  	    //     << std::endl;
682  	
683  	    p_off +=o;
684  	    while (p != ls->end()) {
685  	      if (p_off >= p->length()) {
686  	        // skip this buffer
687  	        p_off -= p->length();
688  	        p++;
689  	      } else {
690  	        // somewhere in this buffer!
691  	        break;
692  	      }
693  	    }
694  	    if (p == ls->end() && p_off) {
695  	      throw end_of_buffer();
696  	    }
697  	    off += o;
698  	  }
699  	
700  	  template<bool is_const>
701  	  void buffer::list::iterator_impl<is_const>::seek(unsigned o)
702  	  {
703  	    p = ls->begin();
704  	    off = p_off = 0;
705  	    advance(o);
706  	  }
707  	
708  	  template<bool is_const>
709  	  char buffer::list::iterator_impl<is_const>::operator*() const
710  	  {
711  	    if (p == ls->end())
712  	      throw end_of_buffer();
713  	    return (*p)[p_off];
714  	  }
715  	
716  	  template<bool is_const>
717  	  buffer::list::iterator_impl<is_const>&
718  	  buffer::list::iterator_impl<is_const>::operator++()
719  	  {
720  	    if (p == ls->end())
721  	      throw end_of_buffer();
722  	    advance(1u);
723  	    return *this;
724  	  }
725  	
726  	  template<bool is_const>
727  	  buffer::ptr buffer::list::iterator_impl<is_const>::get_current_ptr() const
728  	  {
729  	    if (p == ls->end())
730  	      throw end_of_buffer();
731  	    return ptr(*p, p_off, p->length() - p_off);
732  	  }
733  	
734  	  template<bool is_const>
735  	  bool buffer::list::iterator_impl<is_const>::is_pointing_same_raw(
736  	    const ptr& other) const
737  	  {
738  	    if (p == ls->end())
739  	      throw end_of_buffer();
740  	    return p->get_raw() == other.get_raw();
741  	  }
742  	
743  	  // copy data out.
744  	  // note that these all _append_ to dest!
745  	  template<bool is_const>
746  	  void buffer::list::iterator_impl<is_const>::copy(unsigned len, char *dest)
747  	  {
748  	    if (p == ls->end()) seek(off);
749  	    while (len > 0) {
750  	      if (p == ls->end())
751  		throw end_of_buffer();
752  	
753  	      unsigned howmuch = p->length() - p_off;
754  	      if (len < howmuch) howmuch = len;
755  	      p->copy_out(p_off, howmuch, dest);
756  	      dest += howmuch;
757  	
758  	      len -= howmuch;
759  	      advance(howmuch);
760  	    }
761  	  }
762  	
763  	  template<bool is_const>
764  	  void buffer::list::iterator_impl<is_const>::copy(unsigned len, ptr &dest)
765  	  {
766  	    copy_deep(len, dest);
767  	  }
768  	
769  	  template<bool is_const>
770  	  void buffer::list::iterator_impl<is_const>::copy_deep(unsigned len, ptr &dest)
771  	  {
772  	    if (!len) {
773  	      return;
774  	    }
775  	    if (p == ls->end())
776  	      throw end_of_buffer();
777  	    dest = create(len);
778  	    copy(len, dest.c_str());
779  	  }
780  	  template<bool is_const>
781  	  void buffer::list::iterator_impl<is_const>::copy_shallow(unsigned len,
782  								   ptr &dest)
783  	  {
784  	    if (!len) {
785  	      return;
786  	    }
787  	    if (p == ls->end())
788  	      throw end_of_buffer();
789  	    unsigned howmuch = p->length() - p_off;
790  	    if (howmuch < len) {
791  	      dest = create(len);
792  	      copy(len, dest.c_str());
793  	    } else {
794  	      dest = ptr(*p, p_off, len);
795  	      advance(len);
796  	    }
797  	  }
798  	
799  	  template<bool is_const>
800  	  void buffer::list::iterator_impl<is_const>::copy(unsigned len, list &dest)
801  	  {
802  	    if (p == ls->end())
803  	      seek(off);
804  	    while (len > 0) {
805  	      if (p == ls->end())
806  		throw end_of_buffer();
807  	
808  	      unsigned howmuch = p->length() - p_off;
809  	      if (len < howmuch)
810  		howmuch = len;
811  	      dest.append(*p, p_off, howmuch);
812  	
813  	      len -= howmuch;
814  	      advance(howmuch);
815  	    }
816  	  }
817  	
818  	  template<bool is_const>
819  	  void buffer::list::iterator_impl<is_const>::copy(unsigned len, std::string &dest)
820  	  {
821  	    if (p == ls->end())
822  	      seek(off);
823  	    while (len > 0) {
824  	      if (p == ls->end())
825  		throw end_of_buffer();
826  	
827  	      unsigned howmuch = p->length() - p_off;
828  	      const char *c_str = p->c_str();
829  	      if (len < howmuch)
830  		howmuch = len;
831  	      dest.append(c_str + p_off, howmuch);
832  	
833  	      len -= howmuch;
834  	      advance(howmuch);
835  	    }
836  	  }
837  	
838  	  template<bool is_const>
839  	  void buffer::list::iterator_impl<is_const>::copy_all(list &dest)
840  	  {
841  	    if (p == ls->end())
842  	      seek(off);
843  	    while (1) {
844  	      if (p == ls->end())
845  		return;
846  	
847  	      unsigned howmuch = p->length() - p_off;
848  	      const char *c_str = p->c_str();
849  	      dest.append(c_str + p_off, howmuch);
850  	
851  	      advance(howmuch);
852  	    }
853  	  }
854  	
855  	  template<bool is_const>
856  	  size_t buffer::list::iterator_impl<is_const>::get_ptr_and_advance(
857  	    size_t want, const char **data)
858  	  {
859  	    if (p == ls->end()) {
860  	      seek(off);
861  	      if (p == ls->end()) {
862  		return 0;
863  	      }
864  	    }
865  	    *data = p->c_str() + p_off;
866  	    size_t l = std::min<size_t>(p->length() - p_off, want);
867  	    p_off += l;
868  	    if (p_off == p->length()) {
869  	      ++p;
870  	      p_off = 0;
871  	    }
872  	    off += l;
873  	    return l;
874  	  }
875  	
876  	  template<bool is_const>
877  	  uint32_t buffer::list::iterator_impl<is_const>::crc32c(
878  	    size_t length, uint32_t crc)
879  	  {
880  	    length = std::min<size_t>(length, get_remaining());
881  	    while (length > 0) {
882  	      const char *p;
883  	      size_t l = get_ptr_and_advance(length, &p);
884  	      crc = ceph_crc32c(crc, (unsigned char*)p, l);
885  	      length -= l;
886  	    }
887  	    return crc;
888  	  }
889  	
890  	  // explicitly instantiate only the iterator types we need, so we can hide the
891  	  // details in this compilation unit without introducing unnecessary link time
892  	  // dependencies.
893  	  template class buffer::list::iterator_impl<true>;
894  	  template class buffer::list::iterator_impl<false>;
895  	
896  	  buffer::list::iterator::iterator(bl_t *l, unsigned o)
897  	    : iterator_impl(l, o)
898  	  {}
899  	
900  	  buffer::list::iterator::iterator(bl_t *l, unsigned o, list_iter_t ip, unsigned po)
901  	    : iterator_impl(l, o, ip, po)
902  	  {}
903  	
904  	  // copy data in
905  	  void buffer::list::iterator::copy_in(unsigned len, const char *src, bool crc_reset)
906  	  {
907  	    // copy
908  	    if (p == ls->end())
909  	      seek(off);
910  	    while (len > 0) {
911  	      if (p == ls->end())
912  		throw end_of_buffer();
913  	      
914  	      unsigned howmuch = p->length() - p_off;
915  	      if (len < howmuch)
916  		howmuch = len;
917  	      p->copy_in(p_off, howmuch, src, crc_reset);
918  		
919  	      src += howmuch;
920  	      len -= howmuch;
921  	      advance(howmuch);
922  	    }
923  	  }
924  	  
925  	  void buffer::list::iterator::copy_in(unsigned len, const list& otherl)
926  	  {
927  	    if (p == ls->end())
928  	      seek(off);
929  	    unsigned left = len;
930  	    for (const auto& node : otherl._buffers) {
931  	      unsigned l = node.length();
932  	      if (left < l)
933  		l = left;
934  	      copy_in(l, node.c_str());
935  	      left -= l;
936  	      if (left == 0)
937  		break;
938  	    }
939  	  }
940  	
941  	  // -- buffer::list --
942  	
943  	  buffer::list::list(list&& other) noexcept
944  	    : _buffers(std::move(other._buffers)),
945  	      _carriage(&always_empty_bptr),
946  	      _len(other._len),
947  	      _memcopy_count(other._memcopy_count),
948  	      last_p(this) {
949  	    other.clear();
950  	  }
951  	
952  	  void buffer::list::swap(list& other) noexcept
953  	  {
954  	    std::swap(_len, other._len);
955  	    std::swap(_memcopy_count, other._memcopy_count);
956  	    std::swap(_carriage, other._carriage);
957  	    _buffers.swap(other._buffers);
958  	    //last_p.swap(other.last_p);
959  	    last_p = begin();
960  	    other.last_p = other.begin();
961  	  }
962  	
963  	  bool buffer::list::contents_equal(const ceph::buffer::list& other) const
964  	  {
965  	    if (length() != other.length())
966  	      return false;
967  	
968  	    // buffer-wise comparison
969  	    if (true) {
970  	      auto a = std::cbegin(_buffers);
971  	      auto b = std::cbegin(other._buffers);
972  	      unsigned aoff = 0, boff = 0;
973  	      while (a != std::cend(_buffers)) {
974  		unsigned len = a->length() - aoff;
975  		if (len > b->length() - boff)
976  		  len = b->length() - boff;
977  		if (memcmp(a->c_str() + aoff, b->c_str() + boff, len) != 0)
978  		  return false;
979  		aoff += len;
980  		if (aoff == a->length()) {
981  		  aoff = 0;
982  		  ++a;
983  		}
984  		boff += len;
985  		if (boff == b->length()) {
986  		  boff = 0;
987  		  ++b;
988  		}
989  	      }
990  	      return true;
991  	    }
992  	
993  	    // byte-wise comparison
994  	    if (false) {
995  	      bufferlist::const_iterator me = begin();
996  	      bufferlist::const_iterator him = other.begin();
997  	      while (!me.end()) {
998  		if (*me != *him)
999  		  return false;
1000 		++me;
1001 		++him;
1002 	      }
1003 	      return true;
1004 	    }
1005 	  }
1006 	
1007 	  bool buffer::list::contents_equal(const void* const other,
1008 	                                    size_t length) const
1009 	  {
1010 	    if (this->length() != length) {
1011 	      return false;
1012 	    }
1013 	
1014 	    const auto* other_buf = reinterpret_cast<const char*>(other);
1015 	    for (const auto& bp : buffers()) {
1016 	      const auto round_length = std::min<size_t>(length, bp.length());
1017 	      if (std::memcmp(bp.c_str(), other_buf, round_length) != 0) {
1018 	        return false;
1019 	      } else {
1020 	        length -= round_length;
1021 	        other_buf += round_length;
1022 	      }
1023 	    }
1024 	
1025 	    return true;
1026 	  }
1027 	
1028 	  bool buffer::list::is_provided_buffer(const char* const dst) const
1029 	  {
1030 	    if (_buffers.empty()) {
1031 	      return false;
1032 	    }
1033 	    return (is_contiguous() && (_buffers.front().c_str() == dst));
1034 	  }
1035 	
1036 	  bool buffer::list::is_aligned(const unsigned align) const
1037 	  {
1038 	    for (const auto& node : _buffers) {
1039 	      if (!node.is_aligned(align)) {
1040 		return false;
1041 	      }
1042 	    }
1043 	    return true;
1044 	  }
1045 	
1046 	  bool buffer::list::is_n_align_sized(const unsigned align) const
1047 	  {
1048 	    for (const auto& node : _buffers) {
1049 	      if (!node.is_n_align_sized(align)) {
1050 		return false;
1051 	      }
1052 	    }
1053 	    return true;
1054 	  }
1055 	
1056 	  bool buffer::list::is_aligned_size_and_memory(
1057 	    const unsigned align_size,
1058 	    const unsigned align_memory) const
1059 	  {
1060 	    for (const auto& node : _buffers) {
1061 	      if (!node.is_aligned(align_memory) || !node.is_n_align_sized(align_size)) {
1062 		return false;
1063 	      }
1064 	    }
1065 	    return true;
1066 	  }
1067 	
1068 	  bool buffer::list::is_zero() const {
1069 	    for (const auto& node : _buffers) {
1070 	      if (!node.is_zero()) {
1071 		return false;
1072 	      }
1073 	    }
1074 	    return true;
1075 	  }
1076 	
1077 	  void buffer::list::zero()
1078 	  {
1079 	    for (auto& node : _buffers) {
1080 	      node.zero();
1081 	    }
1082 	  }
1083 	
  /*
   * zero the byte range [o, o+l) of the list in place.
   *
   * o is the starting offset within the list, l the number of bytes.
   * the range must lie within the list (asserted).
   *
   * p tracks the list offset at which the current node starts.  each
   * node that reaches past o is zeroed fully or partially, depending on
   * how it overlaps the range; the walk stops once p has passed o+l.
   */
  void buffer::list::zero(const unsigned o, const unsigned l)
  {
    ceph_assert(o+l <= _len);
    unsigned p = 0;
    for (auto& node : _buffers) {
      // only nodes that end after 'o' can overlap the range at all.
      if (p + node.length() > o) {
        if (p >= o && p+node.length() <= o+l) {
          // node lies completely inside the range:
          // 'o'------------- l -----------|
          //      'p'-- node.length() --|
          node.zero();
        } else if (p >= o) {
          // node starts inside the range but extends past its end:
          // 'o'------------- l -----------|
          //    'p'------- node.length() -------|
          node.zero(0, o+l-p);
        } else if (p + node.length() <= o+l) {
          // node starts before the range and ends inside it:
          //     'o'------------- l -----------|
          // 'p'------- node.length() -------|
          node.zero(o-p, node.length()-(o-p));
        } else {
          // the range lies completely inside this node:
          //       'o'----------- l -----------|
          // 'p'---------- node.length() ----------|
          node.zero(o-p, l);
        }
      }
      p += node.length();
      if (o+l <= p) {
        break;  // done
      }
    }
  }
1114 	
1115 	  bool buffer::list::is_contiguous() const
1116 	  {
1117 	    return _buffers.size() <= 1;
1118 	  }
1119 	
1120 	  bool buffer::list::is_n_page_sized() const
1121 	  {
1122 	    return is_n_align_sized(CEPH_PAGE_SIZE);
1123 	  }
1124 	
1125 	  bool buffer::list::is_page_aligned() const
1126 	  {
1127 	    return is_aligned(CEPH_PAGE_SIZE);
1128 	  }
1129 	
1130 	  int buffer::list::get_mempool() const
1131 	  {
1132 	    if (_buffers.empty()) {
1133 	      return mempool::mempool_buffer_anon;
1134 	    }
1135 	    return _buffers.back().get_mempool();
1136 	  }
1137 	
1138 	  void buffer::list::reassign_to_mempool(int pool)
1139 	  {
1140 	    for (auto& p : _buffers) {
1141 	      p.get_raw()->reassign_to_mempool(pool);
1142 	    }
1143 	  }
1144 	
1145 	  void buffer::list::try_assign_to_mempool(int pool)
1146 	  {
1147 	    for (auto& p : _buffers) {
1148 	      p.get_raw()->try_assign_to_mempool(pool);
1149 	    }
1150 	  }
1151 	
1152 	  uint64_t buffer::list::get_wasted_space() const
1153 	  {
1154 	    if (_buffers.size() == 1)
1155 	      return _buffers.back().wasted();
1156 	
1157 	    std::vector<const raw*> raw_vec;
1158 	    raw_vec.reserve(_buffers.size());
1159 	    for (const auto& p : _buffers)
1160 	      raw_vec.push_back(p.get_raw());
1161 	    std::sort(raw_vec.begin(), raw_vec.end());
1162 	
1163 	    uint64_t total = 0;
1164 	    const raw *last = nullptr;
1165 	    for (const auto r : raw_vec) {
1166 	      if (r == last)
1167 		continue;
1168 	      last = r;
1169 	      total += r->len;
1170 	    }
1171 	    // If multiple buffers are sharing the same raw buffer and they overlap
1172 	    // with each other, the wasted space will be underestimated.
1173 	    if (total <= length())
1174 	      return 0;
1175 	    return total - length();
1176 	  }
1177 	
1178 	  void buffer::list::rebuild()
1179 	  {
1180 	    if (_len == 0) {
1181 	      _carriage = &always_empty_bptr;
1182 	      _buffers.clear_and_dispose();
1183 	      return;
1184 	    }
1185 	    if ((_len & ~CEPH_PAGE_MASK) == 0)
1186 	      rebuild(ptr_node::create(buffer::create_page_aligned(_len)));
1187 	    else
1188 	      rebuild(ptr_node::create(buffer::create(_len)));
1189 	  }
1190 	
1191 	  void buffer::list::rebuild(
1192 	    std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer> nb)
1193 	  {
1194 	    unsigned pos = 0;
1195 	    for (auto& node : _buffers) {
1196 	      nb->copy_in(pos, node.length(), node.c_str(), false);
1197 	      pos += node.length();
1198 	    }
1199 	    _memcopy_count += pos;
1200 	    _carriage = &always_empty_bptr;
1201 	    _buffers.clear_and_dispose();
1202 	    if (likely(nb->length())) {
1203 	      _carriage = nb.get();
1204 	      _buffers.push_back(*nb.release());
1205 	    }
1206 	    invalidate_crc();
1207 	    last_p = begin();
1208 	  }
1209 	
1210 	  bool buffer::list::rebuild_aligned(unsigned align)
1211 	  {
1212 	    return rebuild_aligned_size_and_memory(align, align);
1213 	  }
1214 	  
1215 	  bool buffer::list::rebuild_aligned_size_and_memory(unsigned align_size,
1216 							    unsigned align_memory,
1217 							    unsigned max_buffers)
1218 	  {
1219 	    unsigned old_memcopy_count = _memcopy_count;
1220 	
1221 	    if (max_buffers && _buffers.size() > max_buffers
1222 		&& _len > (max_buffers * align_size)) {
1223 	      align_size = round_up_to(round_up_to(_len, max_buffers) / max_buffers, align_size);
1224 	    }
1225 	    auto p = std::begin(_buffers);
1226 	    auto p_prev = _buffers.before_begin();
1227 	    while (p != std::end(_buffers)) {
1228 	      // keep anything that's already align and sized aligned
1229 	      if (p->is_aligned(align_memory) && p->is_n_align_sized(align_size)) {
1230 	        /*cout << " segment " << (void*)p->c_str()
1231 	  	     << " offset " << ((unsigned long)p->c_str() & (align - 1))
1232 	  	     << " length " << p->length()
1233 	  	     << " " << (p->length() & (align - 1)) << " ok" << std::endl;
1234 	        */
1235 	        p_prev = p++;
1236 	        continue;
1237 	      }
1238 	      
1239 	      // consolidate unaligned items, until we get something that is sized+aligned
1240 	      list unaligned;
1241 	      unsigned offset = 0;
1242 	      do {
1243 	        /*cout << " segment " << (void*)p->c_str()
1244 	               << " offset " << ((unsigned long)p->c_str() & (align - 1))
1245 	               << " length " << p->length() << " " << (p->length() & (align - 1))
1246 	               << " overall offset " << offset << " " << (offset & (align - 1))
1247 	  	     << " not ok" << std::endl;
1248 	        */
1249 	        offset += p->length();
1250 	        // no need to reallocate, relinking is enough thankfully to bi::list.
1251 	        auto p_after = _buffers.erase_after(p_prev);
1252 	        unaligned._buffers.push_back(*p);
1253 	        unaligned._len += p->length();
1254 	        p = p_after;
1255 	      } while (p != std::end(_buffers) &&
1256 	  	     (!p->is_aligned(align_memory) ||
1257 	  	      !p->is_n_align_sized(align_size) ||
1258 	  	      (offset % align_size)));
1259 	      if (!(unaligned.is_contiguous() && unaligned._buffers.front().is_aligned(align_memory))) {
1260 	        unaligned.rebuild(
1261 	          ptr_node::create(
1262 	            buffer::create_aligned(unaligned._len, align_memory)));
1263 	        _memcopy_count += unaligned._len;
1264 	      }
1265 	      _buffers.insert_after(p_prev, *ptr_node::create(unaligned._buffers.front()).release());
1266 	      ++p_prev;
1267 	    }
1268 	    last_p = begin();
1269 	
1270 	    return  (old_memcopy_count != _memcopy_count);
1271 	  }
1272 	  
1273 	  bool buffer::list::rebuild_page_aligned()
1274 	  {
1275 	   return  rebuild_aligned(CEPH_PAGE_SIZE);
1276 	  }
1277 	
1278 	  void buffer::list::reserve(size_t prealloc)
1279 	  {
1280 	    if (get_append_buffer_unused_tail_length() < prealloc) {
1281 	      auto ptr = ptr_node::create(buffer::create_page_aligned(prealloc));
1282 	      ptr->set_length(0);   // unused, so far.
1283 	      _carriage = ptr.get();
1284 	      _buffers.push_back(*ptr.release());
1285 	    }
1286 	  }
1287 	
1288 	  // sort-of-like-assignment-op
1289 	  void buffer::list::claim(list& bl, unsigned int flags)
1290 	  {
1291 	    // free my buffers
1292 	    clear();
1293 	    claim_append(bl, flags);
1294 	  }
1295 	
  /*
   * move all segments of bl to the end of this list, leaving bl empty.
   *
   * unless flags contains CLAIM_ALLOW_NONSHAREABLE, every segment of bl
   * whose raw buffer reports !is_shareable() is first replaced, in
   * place, by the node returned from ptr_node::copy_hypercombined().
   * afterwards bl has no segments, zero length, the empty carriage and
   * a rewound last_p.
   */
  void buffer::list::claim_append(list& bl, unsigned int flags)
  {
    // steal the other guy's buffers
    _len += bl._len;
    if (!(flags & CLAIM_ALLOW_NONSHAREABLE)) {
      // singly linked walk: curbuf_prev always trails curbuf by one node
      // so that erase_after/insert_after can be applied at curbuf.
      auto curbuf = bl._buffers.begin();
      auto curbuf_prev = bl._buffers.before_begin();

      while (curbuf != bl._buffers.end()) {
        const auto* const raw = curbuf->get_raw();
        if (unlikely(raw && !raw->is_shareable())) {
          // swap the non-shareable node for its clone; the clone is made
          // before the original is disposed.
          auto* clone = ptr_node::copy_hypercombined(*curbuf);
          curbuf = bl._buffers.erase_after_and_dispose(curbuf_prev);
          bl._buffers.insert_after(curbuf_prev++, *clone);
        } else {
          curbuf_prev = curbuf++;
        }
      }
    }
    _buffers.splice_back(bl._buffers);
    // reset bl to the empty state.
    bl._carriage = &always_empty_bptr;
    bl._buffers.clear_and_dispose();
    bl._len = 0;
    bl.last_p = bl.begin();
  }
1321 	
1322 	  void buffer::list::claim_append_piecewise(list& bl)
1323 	  {
1324 	    // steal the other guy's buffers
1325 	    for (const auto& node : bl.buffers()) {
1326 	      append(node, 0, node.length());
1327 	    }
1328 	    bl.clear();
1329 	  }
1330 	
1331 	  void buffer::list::copy(unsigned off, unsigned len, char *dest) const
1332 	  {
1333 	    if (off + len > length())
1334 	      throw end_of_buffer();
1335 	    if (last_p.get_off() != off) 
1336 	      last_p.seek(off);
1337 	    last_p.copy(len, dest);
1338 	  }
1339 	
1340 	  void buffer::list::copy(unsigned off, unsigned len, list &dest) const
1341 	  {
1342 	    if (off + len > length())
1343 	      throw end_of_buffer();
1344 	    if (last_p.get_off() != off) 
1345 	      last_p.seek(off);
1346 	    last_p.copy(len, dest);
1347 	  }
1348 	
1349 	  void buffer::list::copy(unsigned off, unsigned len, std::string& dest) const
1350 	  {
1351 	    if (last_p.get_off() != off) 
1352 	      last_p.seek(off);
1353 	    return last_p.copy(len, dest);
1354 	  }
1355 	    
1356 	  void buffer::list::copy_in(unsigned off, unsigned len, const char *src, bool crc_reset)
1357 	  {
1358 	    if (off + len > length())
1359 	      throw end_of_buffer();
1360 	    
1361 	    if (last_p.get_off() != off) 
1362 	      last_p.seek(off);
1363 	    last_p.copy_in(len, src, crc_reset);
1364 	  }
1365 	
1366 	  void buffer::list::copy_in(unsigned off, unsigned len, const list& src)
1367 	  {
1368 	    if (last_p.get_off() != off) 
1369 	      last_p.seek(off);
1370 	    last_p.copy_in(len, src);
1371 	  }
1372 	
1373 	  void buffer::list::append(char c)
1374 	  {
1375 	    // put what we can into the existing append_buffer.
1376 	    unsigned gap = get_append_buffer_unused_tail_length();
1377 	    if (!gap) {
1378 	      // make a new buffer!
1379 	      auto buf = ptr_node::create(
1380 		raw_combined::create(CEPH_BUFFER_APPEND_SIZE, 0, get_mempool()));
1381 	      buf->set_length(0);   // unused, so far.
1382 	      _carriage = buf.get();
1383 	      _buffers.push_back(*buf.release());
1384 	    } else if (unlikely(_carriage != &_buffers.back())) {
1385 	      auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
1386 	      _carriage = bptr.get();
1387 	      _buffers.push_back(*bptr.release());
1388 	    }
1389 	    _carriage->append(c);
1390 	    _len++;
1391 	  }
1392 	
  // definition of the static always_empty_bptr member.  _carriage is
  // pointed at this object whenever a list has no append buffer of its own.
  buffer::ptr buffer::list::always_empty_bptr;
1394 	
1395 	  buffer::ptr_node& buffer::list::refill_append_space(const unsigned len)
1396 	  {
1397 	    // make a new buffer.  fill out a complete page, factoring in the
1398 	    // raw_combined overhead.
1399 	    size_t need = round_up_to(len, sizeof(size_t)) + sizeof(raw_combined);
1400 	    size_t alen = round_up_to(need, CEPH_BUFFER_ALLOC_UNIT) -
1401 	      sizeof(raw_combined);
1402 	    auto new_back = \
1403 	      ptr_node::create(raw_combined::create(alen, 0, get_mempool()));
1404 	    new_back->set_length(0);   // unused, so far.
1405 	    _carriage = new_back.get();
1406 	    _buffers.push_back(*new_back.release());
1407 	    return _buffers.back();
1408 	  }
1409 	
1410 	  void buffer::list::append(const char *data, unsigned len)
1411 	  {
1412 	    _len += len;
1413 	
1414 	    const unsigned free_in_last = get_append_buffer_unused_tail_length();
1415 	    const unsigned first_round = std::min(len, free_in_last);
| (1) Event cond_true: | Condition "first_round", taking true branch. | 
1416 	    if (first_round) {
1417 	      // _buffers and carriage can desynchronize when 1) a new ptr
1418 	      // we don't own has been added into the _buffers 2) _buffers
1419 	      // has been emptied as as a result of std::move or stolen by
1420 	      // claim_append.
| (2) Event cond_true: | Condition "this->_carriage != this->_buffers.back()", taking true branch. | 
| (3) Event cond_true: | Condition "this->_carriage != this->_buffers.back()", taking true branch. | 
1421 	      if (unlikely(_carriage != &_buffers.back())) {
1422 	        auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
1423 		_carriage = bptr.get();
1424 		_buffers.push_back(*bptr.release());
1425 	      }
| (4) Event access_dbuff_const: | Calling "append" indexes array "data" at byte position 7. [details] | 
1426 	      _carriage->append(data, first_round);
1427 	    }
1428 	
1429 	    const unsigned second_round = len - first_round;
1430 	    if (second_round) {
1431 	      auto& new_back = refill_append_space(second_round);
1432 	      new_back.append(data + first_round, second_round);
1433 	    }
1434 	  }
1435 	
1436 	  buffer::list::reserve_t buffer::list::obtain_contiguous_space(
1437 	    const unsigned len)
1438 	  {
1439 	    // note: if len < the normal append_buffer size it *might*
1440 	    // be better to allocate a normal-sized append_buffer and
1441 	    // use part of it.  however, that optimizes for the case of
1442 	    // old-style types including new-style types.  and in most
1443 	    // such cases, this won't be the very first thing encoded to
1444 	    // the list, so append_buffer will already be allocated.
1445 	    // OTOH if everything is new-style, we *should* allocate
1446 	    // only what we need and conserve memory.
1447 	    if (unlikely(get_append_buffer_unused_tail_length() < len)) {
1448 	      auto new_back = \
1449 		buffer::ptr_node::create(buffer::create(len)).release();
1450 	      new_back->set_length(0);   // unused, so far.
1451 	      _buffers.push_back(*new_back);
1452 	      _carriage = new_back;
1453 	      return { new_back->c_str(), &new_back->_len, &_len };
1454 	    } else {
1455 	      if (unlikely(_carriage != &_buffers.back())) {
1456 	        auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
1457 		_carriage = bptr.get();
1458 		_buffers.push_back(*bptr.release());
1459 	      }
1460 	      return { _carriage->end_c_str(), &_carriage->_len, &_len };
1461 	    }
1462 	  }
1463 	
1464 	  void buffer::list::append(const ptr& bp)
1465 	  {
1466 	      push_back(bp);
1467 	  }
1468 	
1469 	  void buffer::list::append(ptr&& bp)
1470 	  {
1471 	      push_back(std::move(bp));
1472 	  }
1473 	
1474 	  void buffer::list::append(const ptr& bp, unsigned off, unsigned len)
1475 	  {
1476 	    ceph_assert(len+off <= bp.length());
1477 	    if (!_buffers.empty()) {
1478 	      ptr &l = _buffers.back();
1479 	      if (l.get_raw() == bp.get_raw() &&
1480 		  l.end() == bp.start() + off) {
1481 		// yay contiguous with tail bp!
1482 		l.set_length(l.length()+len);
1483 		_len += len;
1484 		return;
1485 	      }
1486 	    }
1487 	    // add new item to list
1488 	    _buffers.push_back(*ptr_node::create(bp, off, len).release());
1489 	    _len += len;
1490 	  }
1491 	
1492 	  void buffer::list::append(const list& bl)
1493 	  {
1494 	    _len += bl._len;
1495 	    for (const auto& node : bl._buffers) {
1496 	      _buffers.push_back(*ptr_node::create(node).release());
1497 	    }
1498 	  }
1499 	
1500 	  void buffer::list::append(std::istream& in)
1501 	  {
1502 	    while (!in.eof()) {
1503 	      std::string s;
1504 	      getline(in, s);
1505 	      append(s.c_str(), s.length());
1506 	      if (s.length())
1507 		append("\n", 1);
1508 	    }
1509 	  }
1510 	
1511 	  buffer::list::contiguous_filler buffer::list::append_hole(const unsigned len)
1512 	  {
1513 	    _len += len;
1514 	
1515 	    if (unlikely(get_append_buffer_unused_tail_length() < len)) {
1516 	      // make a new append_buffer.  fill out a complete page, factoring in
1517 	      // the raw_combined overhead.
1518 	      auto& new_back = refill_append_space(len);
1519 	      new_back.set_length(len);
1520 	      return { new_back.c_str() };
1521 	    } else if (unlikely(_carriage != &_buffers.back())) {
1522 	      auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
1523 	      _carriage = bptr.get();
1524 	      _buffers.push_back(*bptr.release());
1525 	    }
1526 	    _carriage->set_length(_carriage->length() + len);
1527 	    return { _carriage->end_c_str() - len };
1528 	  }
1529 	
1530 	  void buffer::list::prepend_zero(unsigned len)
1531 	  {
1532 	    auto bp = ptr_node::create(len);
1533 	    bp->zero(false);
1534 	    _len += len;
1535 	    _buffers.push_front(*bp.release());
1536 	  }
1537 	  
1538 	  void buffer::list::append_zero(unsigned len)
1539 	  {
1540 	    _len += len;
1541 	
1542 	    const unsigned free_in_last = get_append_buffer_unused_tail_length();
1543 	    const unsigned first_round = std::min(len, free_in_last);
1544 	    if (first_round) {
1545 	      if (unlikely(_carriage != &_buffers.back())) {
1546 	        auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
1547 		_carriage = bptr.get();
1548 		_buffers.push_back(*bptr.release());
1549 	      }
1550 	      _carriage->append_zeros(first_round);
1551 	    }
1552 	
1553 	    const unsigned second_round = len - first_round;
1554 	    if (second_round) {
1555 	      auto& new_back = refill_append_space(second_round);
1556 	      new_back.set_length(second_round);
1557 	      new_back.zero(false);
1558 	    }
1559 	  }
1560 	
1561 	  
1562 	  /*
1563 	   * get a char
1564 	   */
1565 	  const char& buffer::list::operator[](unsigned n) const
1566 	  {
1567 	    if (n >= _len)
1568 	      throw end_of_buffer();
1569 	    
1570 	    for (const auto& node : _buffers) {
1571 	      if (n >= node.length()) {
1572 		n -= node.length();
1573 		continue;
1574 	      }
1575 	      return node[n];
1576 	    }
1577 	    ceph_abort();
1578 	  }
1579 	
1580 	  /*
1581 	   * return a contiguous ptr to whole bufferlist contents.
1582 	   */
1583 	  char *buffer::list::c_str()
1584 	  {
1585 	    if (_buffers.empty())
1586 	      return 0;                         // no buffers
1587 	
1588 	    auto iter = std::cbegin(_buffers);
1589 	    ++iter;
1590 	
1591 	    if (iter != std::cend(_buffers)) {
1592 	      rebuild();
1593 	    }
1594 	    return _buffers.front().c_str();  // good, we're already contiguous.
1595 	  }
1596 	
1597 	  string buffer::list::to_str() const {
1598 	    string s;
1599 	    s.reserve(length());
1600 	    for (const auto& node : _buffers) {
1601 	      if (node.length()) {
1602 		s.append(node.c_str(), node.length());
1603 	      }
1604 	    }
1605 	    return s;
1606 	  }
1607 	
  /*
   * replace the contents of this list with bytes [off, off+len) of other.
   *
   * throws end_of_buffer if off + len exceeds other's length.
   *
   * this function does no byte copying itself: each piece is a new
   * ptr_node created from one of other's nodes plus an (offset, length)
   * pair.  clear() runs before other is read, so passing *this as other
   * is not supported.
   */
  void buffer::list::substr_of(const list& other, unsigned off, unsigned len)
  {
    if (off + len > other.length())
      throw end_of_buffer();

    clear();

    // skip off: after this loop, off is the offset within *curbuf.
    auto curbuf = std::cbegin(other._buffers);
    while (off > 0 && off >= curbuf->length()) {
      // skip this buffer
      //cout << "skipping over " << *curbuf << std::endl;
      off -= (*curbuf).length();
      ++curbuf;
    }
    ceph_assert(len == 0 || curbuf != std::cend(other._buffers));
    
    while (len > 0) {
      // partial?  the rest of the request ends inside this node.
      if (off + len < curbuf->length()) {
        //cout << "copying partial of " << *curbuf << std::endl;
        _buffers.push_back(*ptr_node::create( *curbuf, off, len ).release());
        _len += len;
        break;
      }
      
      // through end: take everything from off to the end of this node,
      // then continue at the start of the next one.
      //cout << "copying end (all?) of " << *curbuf << std::endl;
      unsigned howmuch = curbuf->length() - off;
      _buffers.push_back(*ptr_node::create( *curbuf, off, howmuch ).release());
      _len += howmuch;
      len -= howmuch;
      off = 0;
      ++curbuf;
    }
  }
1644 	
  /*
   * funky modifier: remove len bytes starting at offset off from the
   * list.
   *
   * when claim_by is non-null, the removed bytes are appended to it.
   * len == 0 is a no-op.  throws end_of_buffer if off is not below the
   * list length.  the append carriage and last_p are both reset.
   *
   * NOTE(review): off + len is not checked against the list length; the
   * removal loop dereferences curbuf without testing for end(), so it
   * looks like callers must keep the range inside the list -- confirm.
   */
  void buffer::list::splice(unsigned off, unsigned len, list *claim_by /*, bufferlist& replace_with */)
  {    // fixme?
    if (len == 0)
      return;

    if (off >= length())
      throw end_of_buffer();

    ceph_assert(len > 0);
    //cout << "splice off " << off << " len " << len << " ... mylen = " << length() << std::endl;
      
    // skip off: curbuf_prev trails curbuf by one node so that
    // insert_after/erase_after can be applied at curbuf.
    auto curbuf = std::begin(_buffers);
    auto curbuf_prev = _buffers.before_begin();
    while (off > 0) {
      ceph_assert(curbuf != std::end(_buffers));
      if (off >= (*curbuf).length()) {
        // skip this buffer
        //cout << "off = " << off << " skipping over " << *curbuf << std::endl;
        off -= (*curbuf).length();
        curbuf_prev = curbuf++;
      } else {
        // somewhere in this buffer!
        //cout << "off = " << off << " somewhere in " << *curbuf << std::endl;
        break;
      }
    }
    
    if (off) {
      // add a reference to the front bit
      //  insert it before curbuf (which we'll hose)
      // _len is bumped by off here; the removal loop below subtracts the
      // same front part again together with the removed bytes.
      //cout << "keeping front " << off << " of " << *curbuf << std::endl;
      _buffers.insert_after(curbuf_prev,
                            *ptr_node::create(*curbuf, 0, off).release());
      _len += off;
      ++curbuf_prev;
    }
    
    _carriage = &always_empty_bptr;

    while (len > 0) {
      // partial?  the removed range ends inside this node, so its tail
      // is kept.
      if (off + len < (*curbuf).length()) {
        //cout << "keeping end of " << *curbuf << ", losing first " << off+len << std::endl;
        if (claim_by) 
          claim_by->append( *curbuf, off, len );
        (*curbuf).set_offset( off+len + (*curbuf).offset() );    // ignore beginning bit
        (*curbuf).set_length( (*curbuf).length() - (len+off) );
        _len -= off+len;
        //cout << " now " << *curbuf << std::endl;
        break;
      }
      
      // hose through the end: the whole node goes away.
      unsigned howmuch = (*curbuf).length() - off;
      //cout << "discarding " << howmuch << " of " << *curbuf << std::endl;
      if (claim_by) 
        claim_by->append( *curbuf, off, howmuch );
      _len -= (*curbuf).length();
      curbuf = _buffers.erase_after_and_dispose(curbuf_prev);
      len -= howmuch;
      off = 0;
    }
      
    // splice in *replace (implement me later?)
    
    last_p = begin();  // just in case we were in the removed region.
  }
1714 	
1715 	  void buffer::list::write(int off, int len, std::ostream& out) const
1716 	  {
1717 	    list s;
1718 	    s.substr_of(*this, off, len);
1719 	    for (const auto& node : s._buffers) {
1720 	      if (node.length()) {
1721 		out.write(node.c_str(), node.length());
1722 	      }
1723 	    }
1724 	  }
1725 	  
1726 	void buffer::list::encode_base64(buffer::list& o)
1727 	{
1728 	  bufferptr bp(length() * 4 / 3 + 3);
1729 	  int l = ceph_armor(bp.c_str(), bp.c_str() + bp.length(), c_str(), c_str() + length());
1730 	  bp.set_length(l);
1731 	  o.push_back(std::move(bp));
1732 	}
1733 	
1734 	void buffer::list::decode_base64(buffer::list& e)
1735 	{
1736 	  bufferptr bp(4 + ((e.length() * 3) / 4));
1737 	  int l = ceph_unarmor(bp.c_str(), bp.c_str() + bp.length(), e.c_str(), e.c_str() + e.length());
1738 	  if (l < 0) {
1739 	    std::ostringstream oss;
1740 	    oss << "decode_base64: decoding failed:\n";
1741 	    hexdump(oss);
1742 	    throw buffer::malformed_input(oss.str().c_str());
1743 	  }
1744 	  ceph_assert(l <= (int)bp.length());
1745 	  bp.set_length(l);
1746 	  push_back(std::move(bp));
1747 	}
1748 	
1749 	ssize_t buffer::list::pread_file(const char *fn, uint64_t off, uint64_t len, std::string *error)
1750 	{
1751 	  int fd = TEMP_FAILURE_RETRY(::open(fn, O_RDONLY|O_CLOEXEC));
1752 	  if (fd < 0) {
1753 	    int err = errno;
1754 	    std::ostringstream oss;
1755 	    oss << "can't open " << fn << ": " << cpp_strerror(err);
1756 	    *error = oss.str();
1757 	    return -err;
1758 	  }
1759 	
1760 	  struct stat st;
1761 	  memset(&st, 0, sizeof(st));
1762 	  if (::fstat(fd, &st) < 0) {
1763 	    int err = errno;
1764 	    std::ostringstream oss;
1765 	    oss << "bufferlist::read_file(" << fn << "): stat error: "
1766 	        << cpp_strerror(err);
1767 	    *error = oss.str();
1768 	    VOID_TEMP_FAILURE_RETRY(::close(fd));
1769 	    return -err;
1770 	  }
1771 	
1772 	  if (off > (uint64_t)st.st_size) {
1773 	    std::ostringstream oss;
1774 	    oss << "bufferlist::read_file(" << fn << "): read error: size < offset";
1775 	    *error = oss.str();
1776 	    VOID_TEMP_FAILURE_RETRY(::close(fd));
1777 	    return 0;
1778 	  }
1779 	
1780 	  if (len > st.st_size - off) {
1781 	    len = st.st_size - off;
1782 	  }
1783 	  ssize_t ret = lseek64(fd, off, SEEK_SET);
1784 	  if (ret != (ssize_t)off) {
1785 	    return -errno;
1786 	  }
1787 	
1788 	  ret = read_fd(fd, len);
1789 	  if (ret < 0) {
1790 	    std::ostringstream oss;
1791 	    oss << "bufferlist::read_file(" << fn << "): read error:"
1792 		<< cpp_strerror(ret);
1793 	    *error = oss.str();
1794 	    VOID_TEMP_FAILURE_RETRY(::close(fd));
1795 	    return ret;
1796 	  } else if (ret != (ssize_t)len) {
1797 	    // Premature EOF.
1798 	    // Perhaps the file changed between stat() and read()?
1799 	    std::ostringstream oss;
1800 	    oss << "bufferlist::read_file(" << fn << "): warning: got premature EOF.";
1801 	    *error = oss.str();
1802 	    // not actually an error, but weird
1803 	  }
1804 	  VOID_TEMP_FAILURE_RETRY(::close(fd));
1805 	  return 0;
1806 	}
1807 	
1808 	int buffer::list::read_file(const char *fn, std::string *error)
1809 	{
1810 	  int fd = TEMP_FAILURE_RETRY(::open(fn, O_RDONLY|O_CLOEXEC));
1811 	  if (fd < 0) {
1812 	    int err = errno;
1813 	    std::ostringstream oss;
1814 	    oss << "can't open " << fn << ": " << cpp_strerror(err);
1815 	    *error = oss.str();
1816 	    return -err;
1817 	  }
1818 	
1819 	  struct stat st;
1820 	  memset(&st, 0, sizeof(st));
1821 	  if (::fstat(fd, &st) < 0) {
1822 	    int err = errno;
1823 	    std::ostringstream oss;
1824 	    oss << "bufferlist::read_file(" << fn << "): stat error: "
1825 	        << cpp_strerror(err);
1826 	    *error = oss.str();
1827 	    VOID_TEMP_FAILURE_RETRY(::close(fd));
1828 	    return -err;
1829 	  }
1830 	
1831 	  ssize_t ret = read_fd(fd, st.st_size);
1832 	  if (ret < 0) {
1833 	    std::ostringstream oss;
1834 	    oss << "bufferlist::read_file(" << fn << "): read error:"
1835 		<< cpp_strerror(ret);
1836 	    *error = oss.str();
1837 	    VOID_TEMP_FAILURE_RETRY(::close(fd));
1838 	    return ret;
1839 	  }
1840 	  else if (ret != st.st_size) {
1841 	    // Premature EOF.
1842 	    // Perhaps the file changed between stat() and read()?
1843 	    std::ostringstream oss;
1844 	    oss << "bufferlist::read_file(" << fn << "): warning: got premature EOF.";
1845 	    *error = oss.str();
1846 	    // not actually an error, but weird
1847 	  }
1848 	  VOID_TEMP_FAILURE_RETRY(::close(fd));
1849 	  return 0;
1850 	}
1851 	
1852 	ssize_t buffer::list::read_fd(int fd, size_t len)
1853 	{
1854 	  auto bp = ptr_node::create(buffer::create(len));
1855 	  ssize_t ret = safe_read(fd, (void*)bp->c_str(), len);
1856 	  if (ret >= 0) {
1857 	    bp->set_length(ret);
1858 	    push_back(std::move(bp));
1859 	  }
1860 	  return ret;
1861 	}
1862 	
1863 	int buffer::list::write_file(const char *fn, int mode)
1864 	{
1865 	  int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode));
1866 	  if (fd < 0) {
1867 	    int err = errno;
1868 	    cerr << "bufferlist::write_file(" << fn << "): failed to open file: "
1869 		 << cpp_strerror(err) << std::endl;
1870 	    return -err;
1871 	  }
1872 	  int ret = write_fd(fd);
1873 	  if (ret) {
1874 	    cerr << "bufferlist::write_fd(" << fn << "): write_fd error: "
1875 		 << cpp_strerror(ret) << std::endl;
1876 	    VOID_TEMP_FAILURE_RETRY(::close(fd));
1877 	    return ret;
1878 	  }
1879 	  if (TEMP_FAILURE_RETRY(::close(fd))) {
1880 	    int err = errno;
1881 	    cerr << "bufferlist::write_file(" << fn << "): close error: "
1882 		 << cpp_strerror(err) << std::endl;
1883 	    return -err;
1884 	  }
1885 	  return 0;
1886 	}
1887 	
/*
 * write the first `bytes` bytes described by vec[0..veclen) to fd,
 * starting at file offset `offset`, retrying until everything is out.
 *
 * with HAVE_PWRITEV, pwritev() is used; otherwise the offset is set
 * with lseek64() and writev() is used.  the entries of vec are modified
 * in place while recovering from partial writes.
 *
 * returns 0 on success and a negative errno value on failure.
 */
static int do_writev(int fd, struct iovec *vec, uint64_t offset, unsigned veclen, unsigned bytes)
{
  while (bytes > 0) {
    ssize_t r = 0;
#ifdef HAVE_PWRITEV
    r = ::pwritev(fd, vec, veclen, offset);
#else
    r = ::lseek64(fd, offset, SEEK_SET);
    if (r != offset) {
      return -errno;
    }
    r = ::writev(fd, vec, veclen);
#endif
    if (r < 0) {
      // an interrupted call is simply retried with the same arguments.
      if (errno == EINTR)
        continue;
      return -errno;
    }

    bytes -= r;
    offset += r;
    if (bytes == 0) break;

    // partial write: advance vec past the r bytes that were written.
    while (r > 0) {
      if (vec[0].iov_len <= (size_t)r) {
        // drain this whole item
        r -= vec[0].iov_len;
        ++vec;
        --veclen;
      } else {
        // this item was written only in part; keep its unwritten tail.
        vec[0].iov_base = (char *)vec[0].iov_base + r;
        vec[0].iov_len -= r;
        break;
      }
    }
  }
  return 0;
}
1926 	
1927 	int buffer::list::write_fd(int fd) const
1928 	{
1929 	  // use writev!
1930 	  iovec iov[IOV_MAX];
1931 	  int iovlen = 0;
1932 	  ssize_t bytes = 0;
1933 	
1934 	  auto p = std::cbegin(_buffers);
1935 	  while (p != std::cend(_buffers)) {
1936 	    if (p->length() > 0) {
1937 	      iov[iovlen].iov_base = (void *)p->c_str();
1938 	      iov[iovlen].iov_len = p->length();
1939 	      bytes += p->length();
1940 	      iovlen++;
1941 	    }
1942 	    ++p;
1943 	
1944 	    if (iovlen == IOV_MAX ||
1945 		p == _buffers.end()) {
1946 	      iovec *start = iov;
1947 	      int num = iovlen;
1948 	      ssize_t wrote;
1949 	    retry:
1950 	      wrote = ::writev(fd, start, num);
1951 	      if (wrote < 0) {
1952 		int err = errno;
1953 		if (err == EINTR)
1954 		  goto retry;
1955 		return -err;
1956 	      }
1957 	      if (wrote < bytes) {
1958 		// partial write, recover!
1959 		while ((size_t)wrote >= start[0].iov_len) {
1960 		  wrote -= start[0].iov_len;
1961 		  bytes -= start[0].iov_len;
1962 		  start++;
1963 		  num--;
1964 		}
1965 		if (wrote > 0) {
1966 		  start[0].iov_len -= wrote;
1967 		  start[0].iov_base = (char *)start[0].iov_base + wrote;
1968 		  bytes -= wrote;
1969 		}
1970 		goto retry;
1971 	      }
1972 	      iovlen = 0;
1973 	      bytes = 0;
1974 	    }
1975 	  }
1976 	  return 0;
1977 	}
1978 	
1979 	int buffer::list::write_fd(int fd, uint64_t offset) const
1980 	{
1981 	  iovec iov[IOV_MAX];
1982 	
1983 	  auto p = std::cbegin(_buffers);
1984 	  uint64_t left_pbrs = std::size(_buffers);
1985 	  while (left_pbrs) {
1986 	    ssize_t bytes = 0;
1987 	    unsigned iovlen = 0;
1988 	    uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX);
1989 	    left_pbrs -= size;
1990 	    while (size > 0) {
1991 	      iov[iovlen].iov_base = (void *)p->c_str();
1992 	      iov[iovlen].iov_len = p->length();
1993 	      iovlen++;
1994 	      bytes += p->length();
1995 	      ++p;
1996 	      size--;
1997 	    }
1998 	
1999 	    int r = do_writev(fd, iov, offset, iovlen, bytes);
2000 	    if (r < 0)
2001 	      return r;
2002 	    offset += bytes;
2003 	  }
2004 	  return 0;
2005 	}
2006 	
2007 	__u32 buffer::list::crc32c(__u32 crc) const
2008 	{
2009 	  int cache_misses = 0;
2010 	  int cache_hits = 0;
2011 	  int cache_adjusts = 0;
2012 	
2013 	  for (const auto& node : _buffers) {
2014 	    if (node.length()) {
2015 	      raw* const r = node.get_raw();
2016 	      pair<size_t, size_t> ofs(node.offset(), node.offset() + node.length());
2017 	      pair<uint32_t, uint32_t> ccrc;
2018 	      if (r->get_crc(ofs, &ccrc)) {
2019 		if (ccrc.first == crc) {
2020 		  // got it already
2021 		  crc = ccrc.second;
2022 		  cache_hits++;
2023 		} else {
2024 		  /* If we have cached crc32c(buf, v) for initial value v,
2025 		   * we can convert this to a different initial value v' by:
2026 		   * crc32c(buf, v') = crc32c(buf, v) ^ adjustment
2027 		   * where adjustment = crc32c(0*len(buf), v ^ v')
2028 		   *
2029 		   * http://crcutil.googlecode.com/files/crc-doc.1.0.pdf
2030 		   * note, u for our crc32c implementation is 0
2031 		   */
2032 		  crc = ccrc.second ^ ceph_crc32c(ccrc.first ^ crc, NULL, node.length());
2033 		  cache_adjusts++;
2034 		}
2035 	      } else {
2036 		cache_misses++;
2037 		uint32_t base = crc;
2038 		crc = ceph_crc32c(crc, (unsigned char*)node.c_str(), node.length());
2039 		r->set_crc(ofs, make_pair(base, crc));
2040 	      }
2041 	    }
2042 	  }
2043 	
2044 	  if (buffer_track_crc) {
2045 	    if (cache_adjusts)
2046 	      buffer_cached_crc_adjusted += cache_adjusts;
2047 	    if (cache_hits)
2048 	      buffer_cached_crc += cache_hits;
2049 	    if (cache_misses)
2050 	      buffer_missed_crc += cache_misses;
2051 	  }
2052 	
2053 	  return crc;
2054 	}
2055 	
2056 	void buffer::list::invalidate_crc()
2057 	{
2058 	  for (const auto& node : _buffers) {
2059 	    raw* const r = node.get_raw();
2060 	    if (r) {
2061 	      r->invalidate_crc();
2062 	    }
2063 	  }
2064 	}
2065 	
2066 	/**
2067 	 * Binary write all contents to a C++ stream
2068 	 */
2069 	void buffer::list::write_stream(std::ostream &out) const
2070 	{
2071 	  for (const auto& node : _buffers) {
2072 	    if (node.length() > 0) {
2073 	      out.write(node.c_str(), node.length());
2074 	    }
2075 	  }
2076 	}
2077 	
2078 	
2079 	void buffer::list::hexdump(std::ostream &out, bool trailing_newline) const
2080 	{
2081 	  if (!length())
2082 	    return;
2083 	
2084 	  std::ios_base::fmtflags original_flags = out.flags();
2085 	
2086 	  // do our best to match the output of hexdump -C, for better
2087 	  // diff'ing!
2088 	
2089 	  out.setf(std::ios::right);
2090 	  out.fill('0');
2091 	
2092 	  unsigned per = 16;
2093 	  char last_row_char = '\0';
2094 	  bool was_same = false, did_star = false;
2095 	  for (unsigned o=0; o<length(); o += per) {
2096 	    if (o == 0) {
2097 	      last_row_char = (*this)[o];
2098 	    }
2099 	
2100 	    if (o + per < length()) {
2101 	      bool row_is_same = true;
2102 	      for (unsigned i=0; i<per && o+i<length(); i++) {
2103 	        char current_char = (*this)[o+i];
2104 	        if (current_char != last_row_char) {
2105 	          if (i == 0) {
2106 	            last_row_char = current_char;
2107 	            was_same = false;
2108 	            did_star = false;
2109 	          } else {
2110 		    row_is_same = false;
2111 	          }
2112 		}
2113 	      }
2114 	      if (row_is_same) {
2115 		if (was_same) {
2116 		  if (!did_star) {
2117 		    out << "\n*";
2118 		    did_star = true;
2119 		  }
2120 		  continue;
2121 		}
2122 		was_same = true;
2123 	      } else {
2124 		was_same = false;
2125 		did_star = false;
2126 	      }
2127 	    }
2128 	    if (o)
2129 	      out << "\n";
2130 	    out << std::hex << std::setw(8) << o << " ";
2131 	
2132 	    unsigned i;
2133 	    for (i=0; i<per && o+i<length(); i++) {
2134 	      if (i == 8)
2135 		out << ' ';
2136 	      out << " " << std::setw(2) << ((unsigned)(*this)[o+i] & 0xff);
2137 	    }
2138 	    for (; i<per; i++) {
2139 	      if (i == 8)
2140 		out << ' ';
2141 	      out << "   ";
2142 	    }
2143 	    
2144 	    out << "  |";
2145 	    for (i=0; i<per && o+i<length(); i++) {
2146 	      char c = (*this)[o+i];
2147 	      if (isupper(c) || islower(c) || isdigit(c) || c == ' ' || ispunct(c))
2148 		out << c;
2149 	      else
2150 		out << '.';
2151 	    }
2152 	    out << '|' << std::dec;
2153 	  }
2154 	  if (trailing_newline) {
2155 	    out << "\n" << std::hex << std::setw(8) << length();
2156 	    out << "\n";
2157 	  }
2158 	
2159 	  out.flags(original_flags);
2160 	}
2161 	
2162 	
2163 	buffer::list buffer::list::static_from_mem(char* c, size_t l) {
2164 	  list bl;
2165 	  bl.push_back(ptr_node::create(create_static(l, c)));
2166 	  return bl;
2167 	}
2168 	
2169 	buffer::list buffer::list::static_from_cstring(char* c) {
2170 	  return static_from_mem(c, std::strlen(c));
2171 	}
2172 	
2173 	buffer::list buffer::list::static_from_string(string& s) {
2174 	  // C++14 just has string::data return a char* from a non-const
2175 	  // string.
2176 	  return static_from_mem(const_cast<char*>(s.data()), s.length());
2177 	  // But the way buffer::list mostly doesn't work in a sane way with
2178 	  // const makes me generally sad.
2179 	}
2180 	
2181 	bool buffer::ptr_node::dispose_if_hypercombined(
2182 	  buffer::ptr_node* const delete_this)
2183 	{
2184 	  const bool is_hypercombined = static_cast<void*>(delete_this) == \
2185 	    static_cast<void*>(&delete_this->get_raw()->bptr_storage);
2186 	  if (is_hypercombined) {
2187 	    ceph_assert_always("hypercombining is currently disabled" == nullptr);
2188 	    delete_this->~ptr_node();
2189 	  }
2190 	  return is_hypercombined;
2191 	}
2192 	
2193 	std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>
2194 	buffer::ptr_node::create_hypercombined(ceph::unique_leakable_ptr<buffer::raw> r)
2195 	{
2196 	  // FIXME: we don't currently hypercombine buffers due to crashes
2197 	  // observed in the rados suite. After fixing we'll use placement
2198 	  // new to create ptr_node on buffer::raw::bptr_storage.
2199 	  return std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>(
2200 	    new ptr_node(std::move(r)));
2201 	}
2202 	
2203 	std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>
2204 	buffer::ptr_node::create_hypercombined(buffer::raw* const r)
2205 	{
2206 	  if (likely(r->nref == 0)) {
2207 	    // FIXME: we don't currently hypercombine buffers due to crashes
2208 	    // observed in the rados suite. After fixing we'll use placement
2209 	    // new to create ptr_node on buffer::raw::bptr_storage.
2210 	    return std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>(
2211 	      new ptr_node(r));
2212 	  } else {
2213 	    return std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>(
2214 	      new ptr_node(r));
2215 	  }
2216 	}
2217 	
2218 	buffer::ptr_node* buffer::ptr_node::copy_hypercombined(
2219 	  const buffer::ptr_node& copy_this)
2220 	{
2221 	  // FIXME: we don't currently hypercombine buffers due to crashes
2222 	  // observed in the rados suite. After fixing we'll use placement
2223 	  // new to create ptr_node on buffer::raw::bptr_storage.
2224 	  auto raw_new = copy_this.get_raw()->clone();
2225 	  return new ptr_node(copy_this, std::move(raw_new));
2226 	}
2227 	
2228 	buffer::ptr_node* buffer::ptr_node::cloner::operator()(
2229 	  const buffer::ptr_node& clone_this)
2230 	{
2231 	  const raw* const raw_this = clone_this.get_raw();
2232 	  if (likely(!raw_this || raw_this->is_shareable())) {
2233 	    return new ptr_node(clone_this);
2234 	  } else {
2235 	    // clone non-shareable buffers (make shareable)
2236 	   return copy_hypercombined(clone_this);
2237 	  }
2238 	}
2239 	
2240 	std::ostream& buffer::operator<<(std::ostream& out, const buffer::raw &r) {
2241 	  return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.load() << ")";
2242 	}
2243 	
2244 	std::ostream& buffer::operator<<(std::ostream& out, const buffer::ptr& bp) {
2245 	  if (bp.have_raw())
2246 	    out << "buffer::ptr(" << bp.offset() << "~" << bp.length()
2247 		<< " " << (void*)bp.c_str()
2248 		<< " in raw " << (void*)bp.raw_c_str()
2249 		<< " len " << bp.raw_length()
2250 		<< " nref " << bp.raw_nref() << ")";
2251 	  else
2252 	    out << "buffer:ptr(" << bp.offset() << "~" << bp.length() << " no raw)";
2253 	  return out;
2254 	}
2255 	
2256 	std::ostream& buffer::operator<<(std::ostream& out, const buffer::list& bl) {
2257 	  out << "buffer::list(len=" << bl.length() << "," << std::endl;
2258 	
2259 	  for (const auto& node : bl.buffers()) {
2260 	    out << "\t" << node;
2261 	    if (&node != &bl.buffers().back()) {
2262 	      out << "," << std::endl;
2263 	    }
2264 	  }
2265 	  out << std::endl << ")";
2266 	  return out;
2267 	}
2268 	
2269 	std::ostream& buffer::operator<<(std::ostream& out, const buffer::error& e)
2270 	{
2271 	  return out << e.what();
2272 	}
2273 	
2274 	MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_malloc, buffer_raw_malloc,
2275 				      buffer_meta);
2276 	MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_posix_aligned,
2277 				      buffer_raw_posix_aligned, buffer_meta);
2278 	MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_char, buffer_raw_char, buffer_meta);
2279 	MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_claimed_char, buffer_raw_claimed_char,
2280 				      buffer_meta);
2281 	MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_unshareable, buffer_raw_unshareable,
2282 				      buffer_meta);
2283 	MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_static, buffer_raw_static,
2284 				      buffer_meta);
2285 	
2286