1    	// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2    	// vim: ts=8 sw=2 smarttab
3    	/*
4    	 * Ceph - scalable distributed file system
5    	 *
6    	 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7    	 *
8    	 * This is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License version 2.1, as published by the Free Software
11   	 * Foundation.  See file COPYING.
12   	 *
13   	 */
14   	
15   	#ifndef PRIORITY_QUEUE_H
16   	#define PRIORITY_QUEUE_H
17   	
18   	#include "include/ceph_assert.h"
19   	
20   	#include "common/Formatter.h"
21   	#include "common/OpQueue.h"
22   	
23   	/**
24   	 * Manages queue for normal and strict priority items
25   	 *
26   	 * On dequeue, the queue will select the lowest priority queue
27   	 * such that the q has bucket > cost of front queue item.
28   	 *
29   	 * If there is no such queue, we choose the next queue item for
30   	 * the highest priority queue.
31   	 *
32   	 * Before returning a dequeued item, we place into each bucket
33   	 * cost * (priority/total_priority) tokens.
34   	 *
35   	 * enqueue_strict and enqueue_strict_front queue items into queues
36   	 * which are serviced in strict priority order before items queued
37   	 * with enqueue and enqueue_front
38   	 *
39   	 * Within a priority class, we schedule round robin based on the class
40   	 * of type K used to enqueue items.  e.g. you could use entity_inst_t
41   	 * to provide fairness for different clients.
42   	 */
43   	template <typename T, typename K>
44   	class PrioritizedQueue : public OpQueue <T, K> {
45   	  int64_t total_priority;
46   	  int64_t max_tokens_per_subqueue;
47   	  int64_t min_cost;
48   	
49   	  typedef std::list<std::pair<unsigned, T> > ListPairs;
50   	
51   	  struct SubQueue {
52   	  private:
53   	    typedef std::map<K, ListPairs> Classes;
54   	    Classes q;
55   	    unsigned tokens, max_tokens;
56   	    int64_t size;
57   	    typename Classes::iterator cur;
58   	  public:
59   	    SubQueue(const SubQueue &other)
60   	      : q(other.q),
61   		tokens(other.tokens),
62   		max_tokens(other.max_tokens),
63   		size(other.size),
64   		cur(q.begin()) {}
65   	    SubQueue()
66   	      : tokens(0),
67   		max_tokens(0),
68   		size(0), cur(q.begin()) {}
69   	    void set_max_tokens(unsigned mt) {
70   	      max_tokens = mt;
71   	    }
72   	    unsigned get_max_tokens() const {
73   	      return max_tokens;
74   	    }
75   	    unsigned num_tokens() const {
76   	      return tokens;
77   	    }
78   	    void put_tokens(unsigned t) {
79   	      tokens += t;
80   	      if (tokens > max_tokens) {
81   		tokens = max_tokens;
82   	      }
83   	    }
84   	    void take_tokens(unsigned t) {
85   	      if (tokens > t) {
86   		tokens -= t;
87   	      } else {
88   		tokens = 0;
89   	      }
90   	    }
91   	    void enqueue(K cl, unsigned cost, T &&item) {
92   	      q[cl].push_back(std::make_pair(cost, std::move(item)));
93   	      if (cur == q.end())
94   		cur = q.begin();
95   	      size++;
96   	    }
97   	    void enqueue_front(K cl, unsigned cost, T &&item) {
98   	      q[cl].push_front(std::make_pair(cost, std::move(item)));
99   	      if (cur == q.end())
100  		cur = q.begin();
101  	      size++;
102  	    }
103  	    std::pair<unsigned, T> &front() const {
104  	      ceph_assert(!(q.empty()));
105  	      ceph_assert(cur != q.end());
106  	      return cur->second.front();
107  	    }
108  	    T pop_front() {
109  	      ceph_assert(!(q.empty()));
110  	      ceph_assert(cur != q.end());
111  	      T ret = std::move(cur->second.front().second);
112  	      cur->second.pop_front();
113  	      if (cur->second.empty()) {
114  		q.erase(cur++);
115  	      } else {
116  		++cur;
117  	      }
118  	      if (cur == q.end()) {
119  		cur = q.begin();
120  	      }
121  	      size--;
122  	      return ret;
123  	    }
124  	    unsigned length() const {
125  	      ceph_assert(size >= 0);
126  	      return (unsigned)size;
127  	    }
128  	    bool empty() const {
129  	      return q.empty();
130  	    }
131  	    void remove_by_class(K k, std::list<T> *out) {
132  	      typename Classes::iterator i = q.find(k);
133  	      if (i == q.end()) {
134  		return;
135  	      }
136  	      size -= i->second.size();
137  	      if (i == cur) {
138  		++cur;
139  	      }
140  	      if (out) {
141  		for (typename ListPairs::reverse_iterator j =
142  		       i->second.rbegin();
143  		     j != i->second.rend();
144  		     ++j) {
145  		  out->push_front(std::move(j->second));
146  		}
147  	      }
148  	      q.erase(i);
149  	      if (cur == q.end()) {
150  		cur = q.begin();
151  	      }
152  	    }
153  	
154  	    void dump(ceph::Formatter *f) const {
155  	      f->dump_int("tokens", tokens);
156  	      f->dump_int("max_tokens", max_tokens);
157  	      f->dump_int("size", size);
158  	      f->dump_int("num_keys", q.size());
159  	      if (!empty()) {
160  		f->dump_int("first_item_cost", front().first);
161  	      }
162  	    }
163  	  };
164  	
165  	  typedef std::map<unsigned, SubQueue> SubQueues;
166  	  SubQueues high_queue;
167  	  SubQueues queue;
168  	
169  	  SubQueue *create_queue(unsigned priority) {
170  	    typename SubQueues::iterator p = queue.find(priority);
171  	    if (p != queue.end()) {
172  	      return &p->second;
173  	    }
174  	    total_priority += priority;
175  	    SubQueue *sq = &queue[priority];
176  	    sq->set_max_tokens(max_tokens_per_subqueue);
177  	    return sq;
178  	  }
179  	
180  	  void remove_queue(unsigned priority) {
181  	    ceph_assert(queue.count(priority));
182  	    queue.erase(priority);
183  	    total_priority -= priority;
184  	    ceph_assert(total_priority >= 0);
185  	  }
186  	
187  	  void distribute_tokens(unsigned cost) {
188  	    if (total_priority == 0) {
189  	      return;
190  	    }
191  	    for (typename SubQueues::iterator i = queue.begin();
192  		 i != queue.end();
193  		 ++i) {
194  	      i->second.put_tokens(((i->first * cost) / total_priority) + 1);
195  	    }
196  	  }
197  	
198  	public:
199  	  PrioritizedQueue(unsigned max_per, unsigned min_c)
200  	    : total_priority(0),
201  	      max_tokens_per_subqueue(max_per),
202  	      min_cost(min_c)
203  	  {}
204  	
205  	  unsigned length() const {
206  	    unsigned total = 0;
207  	    for (typename SubQueues::const_iterator i = queue.begin();
208  		 i != queue.end();
209  		 ++i) {
210  	      ceph_assert(i->second.length());
211  	      total += i->second.length();
212  	    }
213  	    for (typename SubQueues::const_iterator i = high_queue.begin();
214  		 i != high_queue.end();
215  		 ++i) {
216  	      ceph_assert(i->second.length());
217  	      total += i->second.length();
218  	    }
219  	    return total;
220  	  }
221  	
222  	  void remove_by_class(K k, std::list<T> *out = 0) final {
223  	    for (typename SubQueues::iterator i = queue.begin();
224  		 i != queue.end();
225  		 ) {
226  	      i->second.remove_by_class(k, out);
227  	      if (i->second.empty()) {
228  		unsigned priority = i->first;
229  		++i;
230  		remove_queue(priority);
231  	      } else {
232  		++i;
233  	      }
234  	    }
235  	    for (typename SubQueues::iterator i = high_queue.begin();
236  		 i != high_queue.end();
237  		 ) {
238  	      i->second.remove_by_class(k, out);
239  	      if (i->second.empty()) {
240  		high_queue.erase(i++);
241  	      } else {
242  		++i;
243  	      }
244  	    }
245  	  }
246  	
247  	  void enqueue_strict(K cl, unsigned priority, T&& item) final {
248  	    high_queue[priority].enqueue(cl, 0, std::move(item));
249  	  }
250  	
251  	  void enqueue_strict_front(K cl, unsigned priority, T&& item) final {
252  	    high_queue[priority].enqueue_front(cl, 0, std::move(item));
253  	  }
254  	
255  	  void enqueue(K cl, unsigned priority, unsigned cost, T&& item) final {
256  	    if (cost < min_cost)
257  	      cost = min_cost;
258  	    if (cost > max_tokens_per_subqueue)
259  	      cost = max_tokens_per_subqueue;
260  	    create_queue(priority)->enqueue(cl, cost, std::move(item));
261  	  }
262  	
263  	  void enqueue_front(K cl, unsigned priority, unsigned cost, T&& item) final {
264  	    if (cost < min_cost)
265  	      cost = min_cost;
266  	    if (cost > max_tokens_per_subqueue)
267  	      cost = max_tokens_per_subqueue;
268  	    create_queue(priority)->enqueue_front(cl, cost, std::move(item));
269  	  }
270  	
271  	  bool empty() const final {
(1) Event fun_call_w_exception: Called function throws an exception of type "_ZN5boost16exception_detail10clone_implINS0_19error_info_injectorINSt8ios_base7failureB5cxx11EEEEE". [details]
272  	    ceph_assert(total_priority >= 0);
273  	    ceph_assert((total_priority == 0) || !(queue.empty()));
274  	    return queue.empty() && high_queue.empty();
275  	  }
276  	
277  	  T dequeue() final {
278  	    ceph_assert(!empty());
279  	
280  	    if (!(high_queue.empty())) {
281  	      T ret = std::move(high_queue.rbegin()->second.front().second);
282  	      high_queue.rbegin()->second.pop_front();
283  	      if (high_queue.rbegin()->second.empty()) {
284  		high_queue.erase(high_queue.rbegin()->first);
285  	      }
286  	      return ret;
287  	    }
288  	
289  	    // if there are multiple buckets/subqueues with sufficient tokens,
290  	    // we behave like a strict priority queue among all subqueues that
291  	    // are eligible to run.
292  	    for (typename SubQueues::iterator i = queue.begin();
293  		 i != queue.end();
294  		 ++i) {
295  	      ceph_assert(!(i->second.empty()));
296  	      if (i->second.front().first < i->second.num_tokens()) {
297  		unsigned cost = i->second.front().first;
298  		i->second.take_tokens(cost);
299  		T ret = std::move(i->second.front().second);
300  		i->second.pop_front();
301  		if (i->second.empty()) {
302  		  remove_queue(i->first);
303  		}
304  		distribute_tokens(cost);
305  		return ret;
306  	      }
307  	    }
308  	
309  	    // if no subqueues have sufficient tokens, we behave like a strict
310  	    // priority queue.
311  	    unsigned cost = queue.rbegin()->second.front().first;
312  	    T ret = std::move(queue.rbegin()->second.front().second);
313  	    queue.rbegin()->second.pop_front();
314  	    if (queue.rbegin()->second.empty()) {
315  	      remove_queue(queue.rbegin()->first);
316  	    }
317  	    distribute_tokens(cost);
318  	    return ret;
319  	  }
320  	
321  	  void dump(ceph::Formatter *f) const final {
322  	    f->dump_int("total_priority", total_priority);
323  	    f->dump_int("max_tokens_per_subqueue", max_tokens_per_subqueue);
324  	    f->dump_int("min_cost", min_cost);
325  	    f->open_array_section("high_queues");
326  	    for (typename SubQueues::const_iterator p = high_queue.begin();
327  		 p != high_queue.end();
328  		 ++p) {
329  	      f->open_object_section("subqueue");
330  	      f->dump_int("priority", p->first);
331  	      p->second.dump(f);
332  	      f->close_section();
333  	    }
334  	    f->close_section();
335  	    f->open_array_section("queues");
336  	    for (typename SubQueues::const_iterator p = queue.begin();
337  		 p != queue.end();
338  		 ++p) {
339  	      f->open_object_section("subqueue");
340  	      f->dump_int("priority", p->first);
341  	      p->second.dump(f);
342  	      f->close_section();
343  	    }
344  	    f->close_section();
345  	  }
346  	};
347  	
348  	#endif
349