10 #ifndef __PION_PIONLOCKEDQUEUE_HEADER__
11 #define __PION_PIONLOCKEDQUEUE_HEADER__
14 #include <boost/cstdint.hpp>
15 #include <boost/noncopyable.hpp>
16 #include <boost/thread/thread.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <boost/detail/atomic_count.hpp>
20 #include <pion/PionConfig.hpp>
21 #include <pion/PionException.hpp>
22 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
23 #include <boost/lockfree/detail/freelist.hpp>
41 boost::uint32_t MaxSize = 250000,
42 boost::uint32_t SleepMilliSec = 10 >
44 private boost::noncopyable
52 boost::uint32_t version;
57 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
58 return new (m_free_list.allocate())
QueueNode();
66 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
67 node_ptr->~QueueNode();
68 m_free_list.deallocate(node_ptr);
79 m_head_ptr->next = NULL;
80 m_head_ptr->version = 0;
91 inline bool dequeue(T& t, boost::uint32_t& version) {
93 boost::mutex::scoped_lock head_lock(m_head_mutex);
94 QueueNode *new_head_ptr = m_head_ptr->next;
96 version = m_head_ptr->version;
101 version = new_head_ptr->version;
102 t = new_head_ptr->data;
106 m_head_ptr = new_head_ptr;
131 m_wakeup_time(
boost::posix_time::not_a_date_time) {}
139 template <
typename DurationType>
141 : m_is_running(true), m_next_ptr(NULL), m_wakeup_time(d)
145 inline bool isRunning(
void)
const {
return m_is_running; }
148 inline void stop(
void) { m_is_running =
false; m_wakeup_event.notify_one(); }
151 inline void reset(
void) { m_is_running =
true; m_next_ptr = NULL; }
154 inline bool hasWakeupTimer(
void)
const {
return !m_wakeup_time.is_not_a_date_time(); }
158 return m_wakeup_time;
166 volatile bool m_is_running;
168 boost::condition m_wakeup_event;
169 boost::posix_time::time_duration m_wakeup_time;
175 : m_head_ptr(NULL), m_tail_ptr(NULL), m_idle_ptr(NULL),
176 m_next_version(1), m_size(0)
188 inline bool empty(
void)
const {
return (m_head_ptr->next == NULL); }
197 boost::mutex::scoped_lock tail_lock(m_tail_mutex);
198 boost::mutex::scoped_lock head_lock(m_head_mutex);
201 m_tail_ptr = m_head_ptr;
202 m_head_ptr = m_head_ptr->next;
218 boost::system_time wakeup_time;
219 while (
size() >= MaxSize) {
220 wakeup_time = boost::get_system_time()
221 + boost::posix_time::millisec(SleepMilliSec);
222 boost::thread::sleep(wakeup_time);
229 node_ptr->next = NULL;
230 node_ptr->version = 0;
233 boost::mutex::scoped_lock tail_lock(m_tail_mutex);
234 node_ptr->version = (m_next_version += 2);
235 m_tail_ptr->next = node_ptr;
238 m_tail_ptr = node_ptr;
246 m_idle_ptr = m_idle_ptr->m_next_ptr;
247 idle_ptr->m_wakeup_event.notify_one();
262 boost::uint32_t last_known_version;
265 if (
dequeue(t, last_known_version) )
269 boost::mutex::scoped_lock tail_lock(m_tail_mutex);
270 if (m_tail_ptr->version == last_known_version) {
272 thread_info.m_next_ptr = m_idle_ptr;
273 m_idle_ptr = & thread_info;
277 const boost::posix_time::ptime wakeup_time(boost::get_system_time() + thread_info.
getWakeupTimer());
278 if (!thread_info.m_wakeup_event.timed_wait(tail_lock, wakeup_time))
282 thread_info.m_wakeup_event.wait(tail_lock);
296 inline bool pop(T& t) { boost::uint32_t version;
return dequeue(t, version); }
301 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
307 boost::mutex m_head_mutex;
310 boost::mutex m_tail_mutex;
313 QueueNode * m_head_ptr;
316 QueueNode * m_tail_ptr;
319 ConsumerThread * m_idle_ptr;
322 boost::uint32_t m_next_version;
325 boost::detail::atomic_count m_size;
std::size_t size(void) const
returns the number of items that are currently in the queue
void initialize(void)
initializes head and tail pointers for empty queue
ConsumerThread(const DurationType &d)
bool empty(void) const
returns true if the queue is empty; false if it is not
bool hasWakeupTimer(void) const
returns true if an inactivity wakeup timer is set for the thread
bool isRunning(void) const
returns true while the consumer thread is active/running
data structure used to manage idle consumer threads waiting for items
void reset(void)
resets the thread – marks it as running again and clears its next pointer
data structure used to wrap each item in the queue
const boost::posix_time::time_duration & getWakeupTimer(void) const
returns the inactivity wakeup timer as a time duration (the caller adds it to the current time to obtain an absolute wakeup time)
void stop(void)
stops the thread – if it is waiting on pop(), pop() will return immediately
void clear(void)
clears the list by removing all remaining items
bool dequeue(T &t, boost::uint32_t &version)
the following enables use of the lock-free cache
PionLockedQueue(void)
constructs a new PionLockedQueue
QueueNode * createNode(void)
returns a new queue node item for use in the queue
bool pop(T &t, ConsumerThread &thread_info)
void destroyNode(QueueNode *node_ptr)
frees memory for an existing queue node item
virtual ~PionLockedQueue()
virtual destructor