00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _DeliveryRecord_
00022 #define _DeliveryRecord_
00023
00024 #include <algorithm>
00025 #include <list>
00026 #include <vector>
00027 #include <ostream>
00028 #include "qpid/framing/SequenceSet.h"
00029 #include "Queue.h"
00030 #include "Consumer.h"
00031 #include "DeliveryId.h"
00032 #include "DeliveryToken.h"
00033 #include "Message.h"
00034
00035 namespace qpid {
00036 namespace broker {
00037 class SemanticState;
00038
00042 class DeliveryRecord{
00043 QueuedMessage msg;
00044 mutable Queue::shared_ptr queue;
00045 const std::string tag;
00046 DeliveryToken::shared_ptr token;
00047 DeliveryId id;
00048 bool acquired;
00049 const bool pull;
00050 bool cancelled;
00051 const uint32_t credit;
00052 const uint64_t size;
00053
00054 bool completed;
00055 bool ended;
00056 const bool windowing;
00057
00058 public:
00059 DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
00060 const DeliveryId id, bool acquired, bool confirmed, bool windowing);
00061 bool matches(DeliveryId tag) const;
00062 bool matchOrAfter(DeliveryId tag) const;
00063 bool after(DeliveryId tag) const;
00064 bool coveredBy(const framing::SequenceSet* const range) const;
00065
00066 void dequeue(TransactionContext* ctxt = 0) const;
00067 void requeue() const;
00068 void release(bool setRedelivered);
00069 void reject();
00070 void cancel(const std::string& tag);
00071 void redeliver(SemanticState* const);
00072 void acquire(DeliveryIds& results);
00073 void complete();
00074 void accept(TransactionContext* ctxt);
00075 void setEnded();
00076
00077 bool isAcquired() const { return acquired; }
00078 bool isComplete() const { return completed; }
00079 bool isRedundant() const { return ended && (!windowing || completed); }
00080
00081 uint32_t getCredit() const;
00082 const std::string& getTag() const { return tag; }
00083 bool isPull() const { return pull; }
00084 friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);
00085 friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
00086 };
00087
00088 typedef std::list<DeliveryRecord> DeliveryRecords;
00089 typedef std::list<DeliveryRecord>::iterator ack_iterator;
00090
00091 struct AckRange
00092 {
00093 ack_iterator start;
00094 ack_iterator end;
00095 AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {}
00096 };
00097
00098 struct AcquireFunctor
00099 {
00100 DeliveryIds& results;
00101
00102 AcquireFunctor(DeliveryIds& _results) : results(_results) {}
00103
00104 void operator()(DeliveryRecord& record)
00105 {
00106 record.acquire(results);
00107 }
00108 };
00109
00110 }
00111 }
00112
00113
00114 #endif