10 #ifndef __PION_PIONSCHEDULER_HEADER__
11 #define __PION_PIONSCHEDULER_HEADER__
14 #include <boost/asio.hpp>
15 #include <boost/bind.hpp>
16 #include <boost/function/function0.hpp>
17 #include <boost/cstdint.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <boost/noncopyable.hpp>
20 #include <boost/thread/thread.hpp>
21 #include <boost/thread/mutex.hpp>
22 #include <boost/thread/xtime.hpp>
23 #include <boost/thread/condition.hpp>
24 #include <pion/PionConfig.hpp>
25 #include <pion/PionException.hpp>
26 #include <pion/PionLogger.hpp>
35 private boost::noncopyable
41 : m_logger(PION_GET_LOGGER(
"pion.PionScheduler")),
42 m_num_threads(DEFAULT_NUM_THREADS), m_active_users(0), m_is_running(false)
52 virtual void shutdown(
void);
60 void addActiveUser(
void);
63 void removeActiveUser(
void);
66 inline bool isRunning(
void)
const {
return m_is_running; }
69 inline void setNumThreads(
const boost::uint32_t n) { m_num_threads = n; }
72 inline boost::uint32_t
getNumThreads(
void)
const {
return m_num_threads; }
81 virtual boost::asio::io_service& getIOService(
void) = 0;
88 virtual void post(boost::function0<void> work_func) {
89 getIOService().post(work_func);
98 void keepRunning(boost::asio::io_service& my_service,
99 boost::asio::deadline_timer& my_timer);
107 inline static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) {
108 boost::xtime wakeup_time(getWakeupTime(sleep_sec, sleep_nsec));
109 boost::thread::sleep(wakeup_time);
121 template <
typename ConditionType,
typename LockType>
122 inline static void sleep(ConditionType& wakeup_condition, LockType& wakeup_lock,
123 boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
125 boost::xtime wakeup_time(getWakeupTime(sleep_sec, sleep_nsec));
126 wakeup_condition.timed_wait(wakeup_lock, wakeup_time);
131 void processServiceWork(boost::asio::io_service& service);
144 static boost::xtime getWakeupTime(boost::uint32_t sleep_sec,
145 boost::uint32_t sleep_nsec);
215 if (! m_thread_pool.empty()) {
216 PION_LOG_DEBUG(m_logger,
"Waiting for threads to shutdown");
219 boost::thread current_thread;
220 for (ThreadPool::iterator i = m_thread_pool.begin();
221 i != m_thread_pool.end(); ++i)
225 if (**i != current_thread) (*i)->join();
235 typedef std::vector<boost::shared_ptr<boost::thread> >
ThreadPool;
253 : m_service(), m_timer(m_service)
260 virtual boost::asio::io_service&
getIOService(
void) {
return m_service; }
263 virtual void startup(
void);
293 : m_service_pool(), m_next_service(0)
301 boost::mutex::scoped_lock scheduler_lock(m_mutex);
302 while (m_service_pool.size() < m_num_threads) {
303 boost::shared_ptr<ServicePair> service_ptr(
new ServicePair());
304 m_service_pool.push_back(service_ptr);
306 if (++m_next_service >= m_num_threads)
308 PION_ASSERT(m_next_service < m_num_threads);
309 return m_service_pool[m_next_service]->first;
319 PION_ASSERT(n < m_num_threads);
320 PION_ASSERT(n < m_service_pool.size());
321 return m_service_pool[n]->first;
325 virtual void startup(
void);
332 for (ServicePool::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
344 boost::asio::io_service first;
345 boost::asio::deadline_timer second;
virtual void finishThreads(void)
finishes all threads used to perform work
static const boost::uint32_t NSEC_IN_SECOND
number of nanoseconds in one full second (10 ^ 9)
void setLogger(PionLogger log_ptr)
sets the logger to be used
boost::asio::io_service m_service
service used to manage async I/O events
PionScheduler(void)
constructs a new PionScheduler
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
boost::condition m_no_more_active_users
condition triggered when there are no more active users
std::vector< boost::shared_ptr< boost::thread > > ThreadPool
typedef for a pool of worker threads
PionOneToOneScheduler(void)
constructs a new PionOneToOneScheduler
PionLogger m_logger
primary logging interface used by this class
std::vector< boost::shared_ptr< ServicePair > > ServicePool
typedef for a pool of IO services
virtual ~PionScheduler()
virtual destructor
bool m_is_running
true if the thread scheduler is running
virtual ~PionMultiThreadScheduler()
virtual destructor
static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
boost::uint32_t m_next_service
the next service to use for scheduling work
PionLogger getLogger(void)
returns the logger currently in use
ServicePool m_service_pool
pool of IO services used to schedule work
virtual void post(boost::function0< void > work_func)
bool isRunning(void) const
returns true if the scheduler is running
virtual ~PionOneToOneScheduler()
virtual destructor
virtual void finishServices(void)
finishes all services used to schedule work
virtual void finishServices(void)
finishes all services used to schedule work
void setNumThreads(const boost::uint32_t n)
sets the number of threads to be used (these are shared by all servers)
virtual void stopThreads(void)
stops all threads used to perform work
virtual boost::asio::io_service & getIOService(void)
returns an async I/O service used to schedule work
static const boost::uint32_t MICROSEC_IN_SECOND
number of microseconds in one full second (10 ^ 6)
typedef for a pair object where first is an IO service and second is a deadline timer ...
virtual void stopServices(void)
stops all services used to schedule work
virtual ~PionSingleServiceScheduler()
virtual destructor
static const boost::uint32_t DEFAULT_NUM_THREADS
default number of worker threads in the thread pool
boost::condition m_scheduler_has_stopped
condition triggered when the scheduler has stopped
boost::uint32_t m_num_threads
total number of worker threads in the pool
boost::mutex m_mutex
mutex to make class thread-safe
virtual void stopServices(void)
stops all services used to schedule work
boost::asio::deadline_timer m_timer
timer used to periodically check for shutdown
the following enables use of the lock-free cache
virtual boost::asio::io_service & getIOService(boost::uint32_t n)
virtual void stopServices(void)
stops all services used to schedule work
static void sleep(ConditionType &wakeup_condition, LockType &wakeup_lock, boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
virtual void finishServices(void)
finishes all services used to schedule work
PionSingleServiceScheduler(void)
constructs a new PionSingleServiceScheduler
virtual void finishThreads(void)
finishes all threads used to perform work
PionMultiThreadScheduler(void)
constructs a new PionSingleServiceScheduler
boost::uint32_t getNumThreads(void) const
returns the number of threads currently in use
ThreadPool m_thread_pool
pool of threads used to perform work
boost::uint32_t m_active_users
the scheduler will not shutdown until there are no more active users
virtual boost::asio::io_service & getIOService(void)
returns an async I/O service used to schedule work
static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS
number of seconds a timer should wait for to keep the IO services running
virtual void stopThreads(void)
stops all threads used to perform work