1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2017 Haomai Wang <haomaiwang@gmail.com>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software 
11   	 * Foundation.  See file COPYING.
12   	 *
13   	 */
14   	
15   	#ifndef CEPH_LZ4COMPRESSOR_H
16   	#define CEPH_LZ4COMPRESSOR_H
17   	
18   	#include <lz4.h>
19   	
20   	#include "compressor/Compressor.h"
21   	#include "include/buffer.h"
22   	#include "include/encoding.h"
23   	#include "common/config.h"
24   	#include "common/Tub.h"
25   	
26   	
27   	class LZ4Compressor : public Compressor {
28   	 public:
29   	  LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") {
30   	#ifdef HAVE_QATZIP
31   	    if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
32   	      qat_enabled = true;
33   	    else
34   	      qat_enabled = false;
35   	#endif
36   	  }
37   	
38   	  int compress(const bufferlist &src, bufferlist &dst) override {
39   	#ifdef HAVE_QATZIP
40   	    if (qat_enabled)
41   	      return qat_accel.compress(src, dst);
42   	#endif
43   	    bufferptr outptr = buffer::create_small_page_aligned(
44   	      LZ4_compressBound(src.length()));
(1) Event stack_use_local_overflow: Local variable "lz4_stream" uses 16416 bytes of stack space, which exceeds the maximum single use of 10000 bytes.
45   	    LZ4_stream_t lz4_stream;
46   	    LZ4_resetStream(&lz4_stream);
47   	
48   	    auto p = src.begin();
49   	    size_t left = src.length();
50   	    int pos = 0;
51   	    const char *data;
52   	    unsigned num = src.get_num_buffers();
53   	    encode((uint32_t)num, dst);
54   	    while (left) {
55   	      uint32_t origin_len = p.get_ptr_and_advance(left, &data);
56   	      int compressed_len = LZ4_compress_fast_continue(
57   	        &lz4_stream, data, outptr.c_str()+pos, origin_len,
58   	        outptr.length()-pos, 1);
59   	      if (compressed_len <= 0)
60   	        return -1;
61   	      pos += compressed_len;
62   	      left -= origin_len;
63   	      encode(origin_len, dst);
64   	      encode((uint32_t)compressed_len, dst);
65   	    }
66   	    ceph_assert(p.end());
67   	
68   	    dst.append(outptr, 0, pos);
69   	    return 0;
70   	  }
71   	
72   	  int decompress(const bufferlist &src, bufferlist &dst) override {
73   	#ifdef HAVE_QATZIP
74   	    if (qat_enabled)
75   	      return qat_accel.decompress(src, dst);
76   	#endif
77   	    auto i = std::cbegin(src);
78   	    return decompress(i, src.length(), dst);
79   	  }
80   	
81   	  int decompress(bufferlist::const_iterator &p,
82   			 size_t compressed_len,
83   			 bufferlist &dst) override {
84   	#ifdef HAVE_QATZIP
85   	    if (qat_enabled)
86   	      return qat_accel.decompress(p, compressed_len, dst);
87   	#endif
88   	    uint32_t count;
89   	    std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs;
90   	    decode(count, p);
91   	    compressed_pairs.resize(count);
92   	    uint32_t total_origin = 0;
93   	    for (unsigned i = 0; i < count; ++i) {
94   	      decode(compressed_pairs[i].first, p);
95   	      decode(compressed_pairs[i].second, p);
96   	      total_origin += compressed_pairs[i].first;
97   	    }
98   	    compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2);
99   	
100  	    bufferptr dstptr(total_origin);
101  	    LZ4_streamDecode_t lz4_stream_decode;
102  	    LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0);
103  	
104  	    bufferptr cur_ptr = p.get_current_ptr();
105  	    bufferptr *ptr = &cur_ptr;
106  	    Tub<bufferptr> data_holder;
107  	    if (compressed_len != cur_ptr.length()) {
108  	      data_holder.construct(compressed_len);
109  	      p.copy_deep(compressed_len, *data_holder);
110  	      ptr = data_holder.get();
111  	    }
112  	
113  	    char *c_in = ptr->c_str();
114  	    char *c_out = dstptr.c_str();
115  	    for (unsigned i = 0; i < count; ++i) {
116  	      int r = LZ4_decompress_safe_continue(
117  	          &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first);
118  	      if (r == (int)compressed_pairs[i].first) {
119  	        c_in += compressed_pairs[i].second;
120  	        c_out += compressed_pairs[i].first;
121  	      } else if (r < 0) {
122  	        return -1;
123  	      } else {
124  	        return -2;
125  	      }
126  	    }
127  	    dst.push_back(std::move(dstptr));
128  	    return 0;
129  	  }
130  	};
131  	
132  	#endif
133