00001 #ifndef _broker_PersistableMessage_h
00002 #define _broker_PersistableMessage_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <string>
00026 #include <list>
00027 #include <boost/shared_ptr.hpp>
00028 #include <boost/weak_ptr.hpp>
00029 #include "Persistable.h"
00030 #include "qpid/framing/amqp_types.h"
00031 #include "qpid/sys/Monitor.h"
00032 #include "PersistableQueue.h"
00033
00034 namespace qpid {
00035 namespace broker {
00036
00037 class MessageStore;
00038
00042 class PersistableMessage : public Persistable
00043 {
00044 sys::Monitor asyncEnqueueLock;
00045 sys::Monitor asyncDequeueLock;
00046 sys::Mutex storeLock;
00047
00055 int asyncEnqueueCounter;
00056
00064 int asyncDequeueCounter;
00065 protected:
00066 typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
00067 syncList synclist;
00068 MessageStore* store;
00069 bool contentReleased;
00070
00071 inline void setContentReleased() {contentReleased = true; }
00072
00073 public:
00074 typedef boost::shared_ptr<PersistableMessage> shared_ptr;
00075
00079 virtual uint32_t encodedHeaderSize() const = 0;
00080
00081 virtual ~PersistableMessage() {};
00082
00083 PersistableMessage():
00084 asyncEnqueueCounter(0),
00085 asyncDequeueCounter(0),
00086 store(0),
00087 contentReleased(false)
00088 {}
00089
00090 void flush();
00091
00092 inline bool isContentReleased()const {return contentReleased; }
00093
00094 inline void waitForEnqueueComplete() {
00095 sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00096 while (asyncEnqueueCounter > 0) {
00097 asyncEnqueueLock.wait();
00098 }
00099 }
00100
00101 inline bool isEnqueueComplete() {
00102 sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00103 return asyncEnqueueCounter == 0;
00104 }
00105
00106 inline void enqueueComplete() {
00107 bool notify = false;
00108 {
00109 sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00110 if (asyncEnqueueCounter > 0) {
00111 if (--asyncEnqueueCounter == 0) {
00112 asyncEnqueueLock.notify();
00113 notify = true;
00114 }
00115 }
00116 }
00117 if (notify) {
00118 sys::ScopedLock<sys::Mutex> l(storeLock);
00119 if (store) {
00120 for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
00121 PersistableQueue::shared_ptr q(i->lock());
00122 if (q) q->notifyDurableIOComplete();
00123 }
00124 }
00125 }
00126 }
00127
00128 inline void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
00129 if (_store){
00130 sys::ScopedLock<sys::Mutex> l(storeLock);
00131 store = _store;
00132 boost::weak_ptr<PersistableQueue> q(queue);
00133 synclist.push_back(q);
00134 }
00135 enqueueAsync();
00136 }
00137
00138 inline void enqueueAsync() {
00139 sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00140 asyncEnqueueCounter++;
00141 }
00142
00143 inline bool isDequeueComplete() {
00144 sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00145 return asyncDequeueCounter == 0;
00146 }
00147
00148 inline void dequeueComplete() {
00149
00150 sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00151 if (asyncDequeueCounter > 0) {
00152 if (--asyncDequeueCounter == 0) {
00153 asyncDequeueLock.notify();
00154 }
00155 }
00156 }
00157
00158 inline void waitForDequeueComplete() {
00159 sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00160 while (asyncDequeueCounter > 0) {
00161 asyncDequeueLock.wait();
00162 }
00163 }
00164
00165 inline void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
00166 if (_store){
00167 sys::ScopedLock<sys::Mutex> l(storeLock);
00168 store = _store;
00169 boost::weak_ptr<PersistableQueue> q(queue);
00170 synclist.push_back(q);
00171 }
00172 dequeueAsync();
00173 }
00174
00175 inline void dequeueAsync() {
00176 sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00177 asyncDequeueCounter++;
00178 }
00179
00180
00181 };
00182
00183 }}
00184
00185
00186 #endif