pion-net  4.0.9
PionScheduler.cpp
1 // -----------------------------------------------------------------------
2 // pion-common: a collection of common libraries used by the Pion Platform
3 // -----------------------------------------------------------------------
4 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #include <boost/date_time/posix_time/posix_time_duration.hpp>
11 #include <pion/PionScheduler.hpp>
12 
13 namespace pion { // begin namespace pion
14 
15 
16 // static members of PionScheduler
17 
18 const boost::uint32_t PionScheduler::DEFAULT_NUM_THREADS = 8;
19 const boost::uint32_t PionScheduler::NSEC_IN_SECOND = 1000000000; // (10^9)
20 const boost::uint32_t PionScheduler::MICROSEC_IN_SECOND = 1000000; // (10^6)
21 const boost::uint32_t PionScheduler::KEEP_RUNNING_TIMER_SECONDS = 5;
22 
23 
24 // PionScheduler member functions
25 
27 {
28  // lock mutex for thread safety
29  boost::mutex::scoped_lock scheduler_lock(m_mutex);
30 
31  if (m_is_running) {
32 
33  PION_LOG_INFO(m_logger, "Shutting down the thread scheduler");
34 
35  while (m_active_users > 0) {
36  // first, wait for any active users to exit
37  PION_LOG_INFO(m_logger, "Waiting for " << m_active_users << " scheduler users to finish");
38  m_no_more_active_users.wait(scheduler_lock);
39  }
40 
41  // shut everything down
42  m_is_running = false;
43  stopServices();
44  stopThreads();
46  finishThreads();
47 
48  PION_LOG_INFO(m_logger, "The thread scheduler has shutdown");
49 
50  // Make sure anyone waiting on shutdown gets notified
51  m_scheduler_has_stopped.notify_all();
52 
53  } else {
54 
55  // stop and finish everything to be certain that no events are pending
56  stopServices();
57  stopThreads();
59  finishThreads();
60 
61  // Make sure anyone waiting on shutdown gets notified
62  // even if the scheduler did not startup successfully
63  m_scheduler_has_stopped.notify_all();
64  }
65 }
66 
68 {
69  boost::mutex::scoped_lock scheduler_lock(m_mutex);
70  while (m_is_running) {
71  // sleep until scheduler_has_stopped condition is signaled
72  m_scheduler_has_stopped.wait(scheduler_lock);
73  }
74 }
75 
76 void PionScheduler::keepRunning(boost::asio::io_service& my_service,
77  boost::asio::deadline_timer& my_timer)
78 {
79  if (m_is_running) {
80  // schedule this again to make sure the service doesn't complete
81  my_timer.expires_from_now(boost::posix_time::seconds(KEEP_RUNNING_TIMER_SECONDS));
82  my_timer.async_wait(boost::bind(&PionScheduler::keepRunning, this,
83  boost::ref(my_service), boost::ref(my_timer)));
84  }
85 }
86 
88 {
89  if (!m_is_running) startup();
90  boost::mutex::scoped_lock scheduler_lock(m_mutex);
92 }
93 
95 {
96  boost::mutex::scoped_lock scheduler_lock(m_mutex);
97  if (--m_active_users == 0)
98  m_no_more_active_users.notify_all();
99 }
100 
101 boost::xtime PionScheduler::getWakeupTime(boost::uint32_t sleep_sec,
102  boost::uint32_t sleep_nsec)
103 {
104  boost::xtime wakeup_time;
105 #ifdef TIME_UTC
106  boost::xtime_get(&wakeup_time, TIME_UTC);
107 #else
108  boost::xtime_get(&wakeup_time, boost::TIME_UTC);
109 #endif
110  wakeup_time.sec += sleep_sec;
111  wakeup_time.nsec += sleep_nsec;
112  if (static_cast<boost::uint32_t>(wakeup_time.nsec) >= NSEC_IN_SECOND) {
113  wakeup_time.sec++;
114  wakeup_time.nsec -= NSEC_IN_SECOND;
115  }
116  return wakeup_time;
117 }
118 
119 void PionScheduler::processServiceWork(boost::asio::io_service& service) {
120  while (m_is_running) {
121  try {
122  service.run();
123  } catch (std::exception& e) {
124  PION_LOG_ERROR(m_logger, e.what());
125  } catch (...) {
126  PION_LOG_ERROR(m_logger, "caught unrecognized exception");
127  }
128  }
129 }
130 
131 
132 // PionSingleServiceScheduler member functions
133 
135 {
136  // lock mutex for thread safety
137  boost::mutex::scoped_lock scheduler_lock(m_mutex);
138 
139  if (! m_is_running) {
140  PION_LOG_INFO(m_logger, "Starting thread scheduler");
141  m_is_running = true;
142 
143  // schedule a work item to make sure that the service doesn't complete
144  m_service.reset();
146 
147  // start multiple threads to handle async tasks
148  for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
149  boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&PionScheduler::processServiceWork,
150  this, boost::ref(m_service)) ));
151  m_thread_pool.push_back(new_thread);
152  }
153  }
154 }
155 
156 
157 // PionOneToOneScheduler member functions
158 
160 {
161  // lock mutex for thread safety
162  boost::mutex::scoped_lock scheduler_lock(m_mutex);
163 
164  if (! m_is_running) {
165  PION_LOG_INFO(m_logger, "Starting thread scheduler");
166  m_is_running = true;
167 
168  // make sure there are enough services initialized
169  while (m_service_pool.size() < m_num_threads) {
170  boost::shared_ptr<ServicePair> service_ptr(new ServicePair());
171  m_service_pool.push_back(service_ptr);
172  }
173 
174  // schedule a work item for each service to make sure that it doesn't complete
175  for (ServicePool::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
176  keepRunning((*i)->first, (*i)->second);
177  }
178 
179  // start multiple threads to handle async tasks
180  for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
181  boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&PionScheduler::processServiceWork,
182  this, boost::ref(m_service_pool[n]->first)) ));
183  m_thread_pool.push_back(new_thread);
184  }
185  }
186 }
187 
188 
189 } // end namespace pion
void addActiveUser(void)
static const boost::uint32_t NSEC_IN_SECOND
number of nanoseconds in one full second (10 ^ 9)
boost::asio::io_service m_service
service used to manage async I/O events
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
void keepRunning(boost::asio::io_service &my_service, boost::asio::deadline_timer &my_timer)
boost::condition m_no_more_active_users
condition triggered when there are no more active users
static boost::xtime getWakeupTime(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
void join(void)
the calling thread will sleep until the scheduler has stopped
PionLogger m_logger
primary logging interface used by this class
bool m_is_running
true if the thread scheduler is running
ServicePool m_service_pool
pool of IO services used to schedule work
virtual void finishServices(void)
finishes all services used to schedule work
virtual void stopThreads(void)
stops all threads used to perform work
static const boost::uint32_t MICROSEC_IN_SECOND
number of microseconds in one full second (10 ^ 6)
typedef for a pair object where first is an IO service and second is a deadline timer ...
void processServiceWork(boost::asio::io_service &service)
processes work passed to the asio service & handles uncaught exceptions
static const boost::uint32_t DEFAULT_NUM_THREADS
default number of worker threads in the thread pool
void removeActiveUser(void)
unregisters an active user with the thread scheduler
boost::condition m_scheduler_has_stopped
condition triggered when the scheduler has stopped
boost::uint32_t m_num_threads
total number of worker threads in the pool
boost::mutex m_mutex
mutex to make class thread-safe
boost::asio::deadline_timer m_timer
timer used to periodically check for shutdown
virtual void shutdown(void)
Stops the thread scheduler (this is called automatically when the program exits)
virtual void stopServices(void)
stops all services used to schedule work
virtual void finishThreads(void)
finishes all threads used to perform work
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
ThreadPool m_thread_pool
pool of threads used to perform work
boost::uint32_t m_active_users
the scheduler will not shutdown until there are no more active users
static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS
number of seconds a timer should wait for to keep the IO services running