1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	
#include "amqp_mock.h"
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdarg.h>
#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>
#include <boost/lockfree/queue.hpp>
11   	
namespace amqp_mock {

// Serializes access to the VALID_* connection parameters below.
std::mutex set_valid_lock;

// Connection parameters the mock accepts; anything else is rejected by the
// mocked socket-open/login calls later in this file.
int VALID_PORT = 5672;
std::string VALID_HOST = "localhost";
std::string VALID_VHOST = "/";
std::string VALID_USER = "guest";
std::string VALID_PASSWORD = "guest";

// Replace the port that the mock accepts.
void set_valid_port(int port) {
  const std::lock_guard<std::mutex> guard(set_valid_lock);
  VALID_PORT = port;
}

// Replace the host name that the mock accepts.
void set_valid_host(const std::string& host) {
  const std::lock_guard<std::mutex> guard(set_valid_lock);
  VALID_HOST = host;
}

// Replace the virtual host that the mock accepts.
void set_valid_vhost(const std::string& vhost) {
  const std::lock_guard<std::mutex> guard(set_valid_lock);
  VALID_VHOST = vhost;
}

// Replace the user/password pair that the mock accepts.
// Both values are updated under a single lock acquisition.
void set_valid_user(const std::string& user, const std::string& password) {
  const std::lock_guard<std::mutex> guard(set_valid_lock);
  VALID_USER = user;
  VALID_PASSWORD = password;
}

// Number of queued replies to consume before answering in "multiple" mode.
std::atomic<unsigned> g_tag_skip{0};
// Non-zero while "multiple" mode is armed.
std::atomic<int> g_multiple{0};

// Arm "multiple" mode: the flag is raised first, then the skip count is stored
// (same store order as before).
void set_multiple(unsigned tag_skip) {
  g_multiple.store(1);
  g_tag_skip.store(tag_skip);
}

// Disarm "multiple" mode and clear the skip count.
void reset_multiple() {
  g_multiple.store(0);
  g_tag_skip.store(0);
}

// Test knobs read by the mocked publish/wait-frame calls.
bool FAIL_NEXT_WRITE = false;
bool FAIL_NEXT_READ = false;
bool REPLY_ACK = true;

} // namespace amqp_mock
59   	
60   	using namespace amqp_mock;
61   	
62   	struct amqp_connection_state_t_ {
63   	  amqp_socket_t* socket;
64   	  amqp_channel_open_ok_t* channel1;
65   	  amqp_channel_open_ok_t* channel2;
66   	  amqp_exchange_declare_ok_t* exchange;
67   	  amqp_queue_declare_ok_t* queue;
68   	  amqp_confirm_select_ok_t* confirm;
69   	  amqp_basic_consume_ok_t* consume;
70   	  bool login_called;
71   	  boost::lockfree::queue<amqp_basic_ack_t> ack_list;
72   	  boost::lockfree::queue<amqp_basic_nack_t> nack_list;
73   	  std::atomic<uint64_t> delivery_tag;
74   	  amqp_rpc_reply_t reply;
75   	  amqp_basic_ack_t ack;
76   	  amqp_basic_nack_t nack;
77   	  // ctor
78   	  amqp_connection_state_t_() : 
79   	    socket(nullptr), 
80   	    channel1(nullptr),
81   	    channel2(nullptr),
82   	    exchange(nullptr),
83   	    queue(nullptr),
84   	    confirm(nullptr),
85   	    consume(nullptr),
86   	    login_called(false),
87   	    ack_list(1024),
88   	    nack_list(1024),
89   	    delivery_tag(1) {
90   	      reply.reply_type = AMQP_RESPONSE_NONE;
(2) Event uninit_member: Non-static class member field "reply.reply" is not initialized in this constructor nor in any functions that it calls.
(4) Event uninit_member: Non-static class member field "reply.library_error" is not initialized in this constructor nor in any functions that it calls.
(6) Event uninit_member: Non-static class member field "ack.delivery_tag" is not initialized in this constructor nor in any functions that it calls.
(8) Event uninit_member: Non-static class member field "ack.multiple" is not initialized in this constructor nor in any functions that it calls.
(10) Event uninit_member: Non-static class member field "nack.delivery_tag" is not initialized in this constructor nor in any functions that it calls.
(12) Event uninit_member: Non-static class member field "nack.multiple" is not initialized in this constructor nor in any functions that it calls.
(14) Event uninit_member: Non-static class member field "nack.requeue" is not initialized in this constructor nor in any functions that it calls.
Also see events: [member_decl][member_decl][member_decl][member_decl][member_decl][member_decl][member_decl]
91   	    }
92   	};
93   	
// Mock definition of the socket type that rabbitmq-c keeps opaque.
struct amqp_socket_t_ {
  // true once amqp_socket_open() succeeded on this socket
  bool open_called;

  // a freshly created socket is not open
  amqp_socket_t_()
    : open_called(false)
  {}
};
100  	
101  	amqp_connection_state_t AMQP_CALL amqp_new_connection(void) {
102  	  auto s = new amqp_connection_state_t_;
103  	  return s;
104  	}
105  	
106  	int amqp_destroy_connection(amqp_connection_state_t state) {
107  	  delete state->socket;
108  	  delete state->channel1;
109  	  delete state->channel2;
110  	  delete state->exchange;
111  	  delete state->queue;
112  	  delete state->confirm;
113  	  delete state->consume;
114  	  delete state;
115  	  return 0;
116  	}
117  	
118  	amqp_socket_t* amqp_tcp_socket_new(amqp_connection_state_t state) {
119  	  state->socket = new amqp_socket_t;
120  	  return state->socket;
121  	}
122  	
123  	int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
124  	  if (!self) {
125  	    return -1;
126  	  }
127  	  {
128  	    std::lock_guard<std::mutex> lock(set_valid_lock);
129  	    if (std::string(host) != VALID_HOST) {
130  	      return -2;
131  	    } 
132  	    if (port != VALID_PORT) {
133  	      return -3;
134  	    }
135  	  }
136  	  self->open_called = true;
137  	  return 0;
138  	}
139  	
140  	amqp_rpc_reply_t amqp_login(
141  	    amqp_connection_state_t state, 
142  	    char const *vhost, 
143  	    int channel_max,
144  	    int frame_max, 
145  	    int heartbeat, 
146  	    amqp_sasl_method_enum sasl_method, ...) {
147  	  state->reply.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
148  	  state->reply.library_error = 0;
149  	  state->reply.reply.decoded = nullptr;
150  	  state->reply.reply.id = 0;
151  	  if (std::string(vhost) != VALID_VHOST) {
152  	    return state->reply;
153  	  }
154  	  if (sasl_method != AMQP_SASL_METHOD_PLAIN) {
155  	      return state->reply;
156  	  }
157  	  va_list args;
158  	  va_start(args, sasl_method);
159  	  char* user = va_arg(args, char*);
160  	  char* password = va_arg(args, char*);
161  	  va_end(args);
162  	  if (std::string(user) != VALID_USER) {
163  	    return state->reply;
164  	  }
165  	  if (std::string(password) != VALID_PASSWORD) {
166  	    return state->reply;
167  	  }
168  	  state->reply.reply_type = AMQP_RESPONSE_NORMAL;
169  	  state->login_called = true;
170  	  return state->reply;
171  	}
172  	
173  	amqp_channel_open_ok_t* amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel) {
174  	  state->reply.reply_type = AMQP_RESPONSE_NORMAL;
175  	  if (state->channel1 == nullptr) {
176  	    state->channel1 = new amqp_channel_open_ok_t;
177  	    return state->channel1;
178  	  }
179  	
180  	  state->channel2 = new amqp_channel_open_ok_t;
181  	  return state->channel2;
182  	}
183  	
184  	amqp_exchange_declare_ok_t* amqp_exchange_declare(
185  	    amqp_connection_state_t state, 
186  	    amqp_channel_t channel,
187  	    amqp_bytes_t exchange, 
188  	    amqp_bytes_t type, 
189  	    amqp_boolean_t passive,
190  	    amqp_boolean_t durable, 
191  	    amqp_boolean_t auto_delete, 
192  	    amqp_boolean_t internal,
193  	    amqp_table_t arguments) {
194  	  state->exchange = new amqp_exchange_declare_ok_t;
195  	  state->reply.reply_type = AMQP_RESPONSE_NORMAL;
196  	  return state->exchange;
197  	}
198  	
199  	amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) {
200  	  return state->reply;
201  	}
202  	
203  	int amqp_basic_publish(
204  	    amqp_connection_state_t state, 
205  	    amqp_channel_t channel,
206  	    amqp_bytes_t exchange, 
207  	    amqp_bytes_t routing_key, 
208  	    amqp_boolean_t mandatory,
209  	    amqp_boolean_t immediate, 
210  	    struct amqp_basic_properties_t_ const *properties,
211  	    amqp_bytes_t body) {
212  	  // make sure that all calls happened before publish
213  	  if (state->socket && state->socket->open_called &&
214  	      state->login_called && state->channel1 && state->channel2 && state->exchange &&
215  	      !FAIL_NEXT_WRITE) {
216  	    state->reply.reply_type = AMQP_RESPONSE_NORMAL;
217  	    if (properties) {
218  	      if (REPLY_ACK) {
219  	        state->ack_list.push(amqp_basic_ack_t{state->delivery_tag++, 0});
220  	      } else {
221  	        state->nack_list.push(amqp_basic_nack_t{state->delivery_tag++, 0});
222  	      }
223  	    }
224  	    return AMQP_STATUS_OK;
225  	  }
226  	  return AMQP_STATUS_CONNECTION_CLOSED;
227  	}
228  	
229  	const amqp_table_t amqp_empty_table = {0, NULL};
230  	const amqp_bytes_t amqp_empty_bytes = {0, NULL};
231  	
// The mock reports the same fixed text for every error code.
const char* amqp_error_string2(int code) {
  return "mock error";
}
236  	
237  	char const* amqp_method_name(amqp_method_number_t methodNumber) {
238  	  static const char* str = "mock method";
239  	  return str;
240  	}
241  	
242  	amqp_queue_declare_ok_t* amqp_queue_declare(
243  	    amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue,
244  	    amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive,
245  	    amqp_boolean_t auto_delete, amqp_table_t arguments) {
246  	  state->queue = new amqp_queue_declare_ok_t;
247  	  static const char* str = "tmp-queue";
248  	  state->queue->queue = amqp_cstring_bytes(str);
249  	  state->reply.reply_type = AMQP_RESPONSE_NORMAL;
250  	  return state->queue;
251  	}
252  	
253  	amqp_confirm_select_ok_t* amqp_confirm_select(amqp_connection_state_t state, amqp_channel_t channel) {
254  	  state->confirm = new amqp_confirm_select_ok_t;
255  	  state->reply.reply_type = AMQP_RESPONSE_NORMAL;
256  	  return state->confirm;
257  	}
258  	
259  	int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval* tv) {
260  	  if (state->socket && state->socket->open_called &&
261  	      state->login_called && state->channel1 && state->channel2 && state->exchange &&
262  	      state->queue && state->consume && state->confirm && !FAIL_NEXT_READ) {
263  	    // "wait" for queue
264  	    usleep(tv->tv_sec*1000000+tv->tv_usec);
265  	    // read from queue
266  	    if (g_multiple) {
267  	      // pop multiples and reply once at the end
268  	      for (auto i = 0U; i < g_tag_skip; ++i) {
269  	        if (REPLY_ACK && !state->ack_list.pop(state->ack)) {
270  	          // queue is empty
271  	          return AMQP_STATUS_TIMEOUT;
272  	        } else if (!REPLY_ACK && !state->nack_list.pop(state->nack)) {
273  	          // queue is empty
274  	          return AMQP_STATUS_TIMEOUT;
275  	        }
276  	      }
277  	      if (REPLY_ACK) {
278  	        state->ack.multiple = g_multiple;
279  	        decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD;
280  	        decoded_frame->payload.method.decoded = &state->ack;
281  	      } else {
282  	        state->nack.multiple = g_multiple;
283  	        decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD;
284  	        decoded_frame->payload.method.decoded = &state->nack;
285  	      }
286  	      decoded_frame->frame_type = AMQP_FRAME_METHOD;
287  	      state->reply.reply_type = AMQP_RESPONSE_NORMAL;
288  	      reset_multiple();
289  	      return AMQP_STATUS_OK;
290  	    }
291  	    // pop replies one by one
292  	    if (REPLY_ACK && state->ack_list.pop(state->ack)) {
293  	      state->ack.multiple = g_multiple;
294  	      decoded_frame->frame_type = AMQP_FRAME_METHOD;
295  	      decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD;
296  	      decoded_frame->payload.method.decoded = &state->ack;
297  	      state->reply.reply_type = AMQP_RESPONSE_NORMAL;
298  	      return AMQP_STATUS_OK;
299  	    } else if (!REPLY_ACK && state->nack_list.pop(state->nack)) {
300  	      state->nack.multiple = g_multiple;
301  	      decoded_frame->frame_type = AMQP_FRAME_METHOD;
302  	      decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD;
303  	      decoded_frame->payload.method.decoded = &state->nack;
304  	      state->reply.reply_type = AMQP_RESPONSE_NORMAL;
305  	      return AMQP_STATUS_OK;
306  	    } else {
307  	      // queue is empty
308  	      return AMQP_STATUS_TIMEOUT;
309  	    }
310  	  }
311  	  return AMQP_STATUS_CONNECTION_CLOSED;
312  	}
313  	
314  	amqp_basic_consume_ok_t* amqp_basic_consume(
315  	    amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue,
316  	    amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack,
317  	    amqp_boolean_t exclusive, amqp_table_t arguments) {
318  	  state->consume = new amqp_basic_consume_ok_t;
319  	  state->reply.reply_type = AMQP_RESPONSE_NORMAL;
320  	  return state->consume;
321  	}
322  	
323  	// amqp_parse_url() is linked via the actual rabbitmq-c library code. see: amqp_url.c
324  	
325  	// following functions are the actual implementation copied from rabbitmq-c library
326  	
327  	#include <string.h>
328  	
329  	amqp_bytes_t amqp_cstring_bytes(const char* cstr) {
330  	  amqp_bytes_t result;
331  	  result.len = strlen(cstr);
332  	  result.bytes = (void *)cstr;
333  	  return result;
334  	}
335  	
336  	void amqp_bytes_free(amqp_bytes_t bytes) { free(bytes.bytes); }
337  	
338  	amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) {
339  	  amqp_bytes_t result;
340  	  result.len = src.len;
341  	  result.bytes = malloc(src.len);
342  	  if (result.bytes != NULL) {
343  	    memcpy(result.bytes, src.bytes, src.len);
344  	  }
345  	  return result;
346  	}
347  	
348