00001 #ifndef _broker_Queue_h
00002 #define _broker_Queue_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "OwnershipToken.h"
00025 #include "Consumer.h"
00026 #include "Message.h"
00027 #include "PersistableQueue.h"
00028 #include "QueuePolicy.h"
00029 #include "QueueBindings.h"
00030 #include "RateTracker.h"
00031
00032 #include "qpid/framing/FieldTable.h"
00033 #include "qpid/sys/Monitor.h"
00034 #include "qpid/management/Manageable.h"
00035 #include "qmf/org/apache/qpid/broker/Queue.h"
00036 #include "qpid/framing/amqp_types.h"
00037
00038 #include <boost/shared_ptr.hpp>
00039 #include <boost/enable_shared_from_this.hpp>
00040 #include <boost/intrusive_ptr.hpp>
00041
00042 #include <list>
00043 #include <vector>
00044 #include <memory>
00045 #include <deque>
00046 #include <algorithm>
00047
00048 namespace qpid {
00049 namespace broker {
00050 class Broker;
00051 class MessageStore;
00052 class QueueRegistry;
00053 class TransactionContext;
00054 class Exchange;
00055
00056 using std::string;
00057
00064 class Queue : public boost::enable_shared_from_this<Queue>,
00065 public PersistableQueue, public management::Manageable {
00066
00067 typedef std::list<Consumer::shared_ptr> Listeners;
00068 typedef std::deque<QueuedMessage> Messages;
00069 typedef std::map<string,QueuedMessage*> LVQ;
00070
00071 const string name;
00072 const bool autodelete;
00073 MessageStore* store;
00074 const OwnershipToken* owner;
00075 uint32_t consumerCount;
00076 OwnershipToken* exclusive;
00077 bool noLocal;
00078 bool lastValueQueue;
00079 bool optimisticConsume;
00080 bool persistLastNode;
00081 bool inLastNodeFailure;
00082 std::string traceId;
00083 std::vector<std::string> traceExclude;
00084 Listeners listeners;
00085 Messages messages;
00086 LVQ lvq;
00087 mutable qpid::sys::Mutex consumerLock;
00088 mutable qpid::sys::Mutex messageLock;
00089 mutable qpid::sys::Mutex ownershipLock;
00090 mutable uint64_t persistenceId;
00091 framing::FieldTable settings;
00092 std::auto_ptr<QueuePolicy> policy;
00093 bool policyExceeded;
00094 QueueBindings bindings;
00095 boost::shared_ptr<Exchange> alternateExchange;
00096 framing::SequenceNumber sequence;
00097 qmf::org::apache::qpid::broker::Queue* mgmtObject;
00098 RateTracker dequeueTracker;
00099
00100 void push(boost::intrusive_ptr<Message>& msg);
00101 void setPolicy(std::auto_ptr<QueuePolicy> policy);
00102 bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
00103 bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
00104 bool consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
00105 bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
00106
00107 void removeListener(Consumer::shared_ptr);
00108 void addListener(Consumer::shared_ptr);
00109
00110 bool isExcluded(boost::intrusive_ptr<Message>& msg);
00111
00112 void dequeued(const QueuedMessage& msg);
00113 void popAndDequeue();
00114
00115 inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
00116 {
00117 if (mgmtObject != 0) {
00118 mgmtObject->inc_msgTotalEnqueues ();
00119 mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
00120 if (msg->isPersistent ()) {
00121 mgmtObject->inc_msgPersistEnqueues ();
00122 mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
00123 }
00124 }
00125 }
00126 inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg)
00127 {
00128 if (mgmtObject != 0){
00129 mgmtObject->inc_msgTotalDequeues ();
00130 mgmtObject->inc_byteTotalDequeues (msg->contentSize());
00131 if (msg->isPersistent ()){
00132 mgmtObject->inc_msgPersistDequeues ();
00133 mgmtObject->inc_bytePersistDequeues (msg->contentSize());
00134 }
00135 }
00136 }
00137
00138 public:
00139
00140 virtual void notifyDurableIOComplete();
00141 typedef boost::shared_ptr<Queue> shared_ptr;
00142
00143 typedef std::vector<shared_ptr> vector;
00144
00145 Queue(const string& name, bool autodelete = false,
00146 MessageStore* const store = 0,
00147 const OwnershipToken* const owner = 0,
00148 management::Manageable* parent = 0);
00149 ~Queue();
00150
00151 bool dispatch(Consumer::shared_ptr);
00158 bool checkForMessages(Consumer::shared_ptr);
00159
00160 void create(const qpid::framing::FieldTable& settings);
00161 void configure(const qpid::framing::FieldTable& settings);
00162 void destroy();
00163 void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args);
00164 void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
00165
00166 bool acquire(const QueuedMessage& msg);
00167
00172 void deliver(boost::intrusive_ptr<Message>& msg);
00177 void process(boost::intrusive_ptr<Message>& msg);
00184 void requeue(const QueuedMessage& msg);
00188 void recover(boost::intrusive_ptr<Message>& msg);
00189
00190 void consume(Consumer::shared_ptr c, bool exclusive = false);
00191 void cancel(Consumer::shared_ptr c);
00192
00193 uint32_t purge(const uint32_t purge_request = 0);
00194 void purgeExpired();
00195
00196
00197 uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
00198
00199 uint32_t getMessageCount() const;
00200 uint32_t getConsumerCount() const;
00201 inline const string& getName() const { return name; }
00202 bool isExclusiveOwner(const OwnershipToken* const o) const;
00203 void releaseExclusiveOwnership();
00204 bool setExclusiveOwner(const OwnershipToken* const o);
00205 bool hasExclusiveConsumer() const;
00206 bool hasExclusiveOwner() const;
00207 inline bool isDurable() const { return store != 0; }
00208 inline const framing::FieldTable& getSettings() const { return settings; }
00209 inline bool isAutoDelete() const { return autodelete; }
00210 bool canAutoDelete() const;
00211 const QueueBindings& getBindings() const { return bindings; }
00212
00216 void setLastNodeFailure();
00217 void clearLastNodeFailure();
00218
00219 bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
00223 bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
00224
00228 QueuedMessage get();
00229
00230 const QueuePolicy* getPolicy();
00231
00232 void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
00233 boost::shared_ptr<Exchange> getAlternateExchange();
00234 bool isLocal(boost::intrusive_ptr<Message>& msg);
00235
00236
00237 uint64_t getPersistenceId() const;
00238 void setPersistenceId(uint64_t persistenceId) const;
00239 void encode(framing::Buffer& buffer) const;
00240 uint32_t encodedSize() const;
00241
00242 static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer);
00243 static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
00244
00245 virtual void setExternalQueueStore(ExternalQueueStore* inst);
00246
00247
00248 management::ManagementObject* GetManagementObject (void) const;
00249 management::Manageable::status_t
00250 ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
00251
00253 template <class F> void eachMessage(F f) const {
00254 sys::Mutex::ScopedLock l(messageLock);
00255 std::for_each(messages.begin(), messages.end(), f);
00256 }
00257
00259 template <class F> void eachBinding(F f) {
00260 bindings.eachBinding(f);
00261 }
00262
00263 bool releaseMessageContent(const QueuedMessage&);
00264
00265 void popMsg(QueuedMessage& qmsg);
00266
00267 };
00268 }
00269 }
00270
00271
00272 #endif