Alexandria  2.16
Please provide a description of the project.
ThreadPool.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012-2020 Euclid Science Ground Segment
3  *
4  * This library is free software; you can redistribute it and/or modify it under
5  * the terms of the GNU Lesser General Public License as published by the Free
6  * Software Foundation; either version 3.0 of the License, or (at your option)
7  * any later version.
8  *
9  * This library is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12  * details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
27 
28 namespace Euclid {
29 
30 namespace {
31 
// Callable run by a pool thread. While its run flag is set and no exception
// has been recorded, it takes the task at the front of the shared queue and
// executes it; when the loop ends it raises its sleeping and done flags.
// NOTE(review): this listing is missing several lines of the class (listing
// lines 50, 62, 66 and 77-83), so the comments below describe only the
// statements that are visible here.
32 class Worker {
33 
34 public:
35 
   // Binds the worker to state it does not own: the task queue and its mutex,
   // this worker's run/sleeping/done flags, and the exception slot. All of
   // them are received by reference; empty_queue_wait_time is taken by value.
36  Worker(std::mutex& queue_mutex, std::deque<ThreadPool::Task>& queue,
37  std::atomic<bool>& run_flag, std::atomic<bool>& sleeping_flag,
38  std::atomic<bool>& done_flag, unsigned int empty_queue_wait_time,
39  std::exception_ptr& exception_ptr)
40  : m_queue_mutex(queue_mutex), m_queue(queue), m_run_flag(run_flag),
41  m_sleeping_flag(sleeping_flag), m_done_flag(done_flag),
42  m_empty_queue_wait_time(empty_queue_wait_time),
43  m_exception_ptr(exception_ptr) {
44  }
45 
   // Worker loop. Exits when the run flag becomes false or when the exception
   // slot is no longer null.
46  void operator()() {
47  while (m_run_flag.get() && m_exception_ptr == nullptr) {
48  // Check if there is anything in the queue to be done and get it
49  std::unique_ptr<ThreadPool::Task> task_ptr = nullptr;
   // NOTE(review): `lock` is declared on a line missing from this listing
   // (listing line 50); presumably a std::unique_lock on m_queue_mutex - confirm.
51  if (!m_queue.get().empty()) {
   // Copy the front task out of the queue, then remove it from the queue.
52  task_ptr = make_unique<ThreadPool::Task>(m_queue.get().front());
53  m_queue.get().pop_front();
54  }
   // The lock is released before the task is executed.
55  lock.unlock();
56 
57  // If we have some work to do, do it. Otherwise sleep for some time.
58  if (task_ptr) {
59  try {
60  (*task_ptr)();
61  } catch(...) {
   // NOTE(review): the handler body (listing line 62) is missing from this
   // listing; presumably it stores std::current_exception() in
   // m_exception_ptr - confirm against the original file.
63  }
64  } else {
   // The sleeping flag is set around the idle wait. The wait statement itself
   // (listing line 66) is missing from this listing; presumably a sleep_for of
   // m_empty_queue_wait_time milliseconds - confirm.
65  m_sleeping_flag.get() = true;
67  m_sleeping_flag.get() = false;
68  }
69  }
70  // Indicate that the worker is done
71  m_sleeping_flag.get() = true;
72  m_done_flag.get() = true;
73  }
74 
75 private:
76 
   // NOTE(review): the member declarations (listing lines 77-83) are missing
   // from this listing. The constructor initializes m_queue_mutex, m_queue,
   // m_run_flag, m_sleeping_flag, m_done_flag, m_empty_queue_wait_time and
   // m_exception_ptr.
84 
85 };
86 
87 } // end of anonymous namespace
88 
// Constructs the pool. The three per-worker flag vectors are sized to
// thread_count and empty_queue_wait_time is stored; the loop then sets, for
// each worker index, run = true, sleeping = false and done = false.
// NOTE(review): listing lines 96-97 are missing from the loop body; presumably
// they create and start the worker thread for index i - confirm against the
// original file.
89 ThreadPool::ThreadPool(unsigned int thread_count, unsigned int empty_queue_wait_time)
90  : m_worker_run_flags(thread_count), m_worker_sleeping_flags(thread_count),
91  m_worker_done_flags(thread_count), m_empty_queue_wait_time(empty_queue_wait_time) {
92  for (unsigned int i = 0; i < thread_count; ++i) {
93  m_worker_run_flags.at(i) = true;
94  m_worker_sleeping_flags.at(i) = false;
95  m_worker_done_flags.at(i) = false;
98  }
99 }
100 
101 namespace {
102 
// Blocks the caller until every flag in worker_flags reads true. The flags are
// checked one after the other, each one being polled until it is set.
//   worker_flags: the atomic flags to wait on.
//   wait_time:    not used by any visible line of this listing.
// NOTE(review): the body of the polling loop (listing line 107) is missing;
// presumably it sleeps for wait_time milliseconds between checks - confirm.
103 void waitWorkers(std::vector<std::atomic<bool>>& worker_flags, unsigned int wait_time) {
104  // Now wait until all the workers have finished any current tasks
105  for (auto& flag : worker_flags) {
106  while (!flag) {
108  }
109  }
110 }
111 
112 }
113 
// Reports whether an exception is stored in m_exception_ptr.
//   rethrow: when false, the function returns true if an exception is stored.
//            When true, the branch body (listing line 117) is missing from
//            this listing; presumably it calls
//            std::rethrow_exception(m_exception_ptr) - confirm.
// Returns false when no exception is stored.
114 bool ThreadPool::checkForException(bool rethrow) {
115  if (m_exception_ptr) {
116  if (rethrow) {
118  } else {
119  return true;
120  }
121  }
122  return false;
123 }
124 
126  // Wait for the queue to be empty
127  bool queue_is_empty = false;
128  while (!queue_is_empty && m_exception_ptr == nullptr) {
130  queue_is_empty = m_queue.empty();
131  lock.unlock();
132  if (!queue_is_empty) {
134  }
135  }
136  // Wait for the workers to finish the currently executing tasks
138  // Check if any worker finished with an exception
139  checkForException(true);
140 }
141 
142 
144  // Stop all the workers. Each one will stop right after it finishes the task
145  // it is currently running.
146  for (auto& flag : m_worker_run_flags) {
147  flag = false;
148  }
149  // Now wait until all the workers have finished any current tasks
151 }
152 
155  m_queue.emplace_back(std::move(task));
156 }
157 
158 } // Euclid namespace
159 
160 
161 
std::exception_ptr
std::this_thread::sleep_for
T sleep_for(T... args)
std::lock
T lock(T... args)
std::move
T move(T... args)
m_done_flag
std::reference_wrapper< std::atomic< bool > > m_done_flag
Definition: ThreadPool.cpp:81
Euclid::ThreadPool::~ThreadPool
virtual ~ThreadPool()
Definition: ThreadPool.cpp:143
std::vector
STL class.
Euclid::ThreadPool::ThreadPool
ThreadPool(unsigned int thread_count=std::thread::hardware_concurrency(), unsigned int empty_queue_wait_time=50)
Constructs a new ThreadPool.
Definition: ThreadPool.cpp:89
std::chrono::milliseconds
Euclid::ThreadPool::m_worker_run_flags
std::vector< std::atomic< bool > > m_worker_run_flags
Definition: ThreadPool.h:102
std::lock_guard
STL class.
Euclid::ThreadPool::m_worker_done_flags
std::vector< std::atomic< bool > > m_worker_done_flags
Definition: ThreadPool.h:104
std::function
m_sleeping_flag
std::reference_wrapper< std::atomic< bool > > m_sleeping_flag
Definition: ThreadPool.cpp:80
std::current_exception
T current_exception(T... args)
m_queue
std::reference_wrapper< std::deque< ThreadPool::Task > > m_queue
Definition: ThreadPool.cpp:78
std::reference_wrapper
Euclid::ThreadPool::m_queue
std::deque< Task > m_queue
Definition: ThreadPool.h:105
std::thread
STL class.
std::vector::at
T at(T... args)
ThreadPool.h
std::unique_lock
STL class.
m_run_flag
std::reference_wrapper< std::atomic< bool > > m_run_flag
Definition: ThreadPool.cpp:79
Euclid::ThreadPool::checkForException
bool checkForException(bool rethrow=false)
Checks if any task has thrown an exception and optionally rethrows it.
Definition: ThreadPool.cpp:114
std::deque
STL class.
std::atomic< bool >
m_empty_queue_wait_time
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.cpp:82
Euclid::ThreadPool::m_empty_queue_wait_time
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.h:106
Euclid::ThreadPool::block
void block()
Definition: ThreadPool.cpp:125
std::mutex
STL class.
Euclid::ThreadPool::m_queue_mutex
std::mutex m_queue_mutex
Definition: ThreadPool.h:101
Euclid::ThreadPool::m_worker_sleeping_flags
std::vector< std::atomic< bool > > m_worker_sleeping_flags
Definition: ThreadPool.h:103
Euclid::ThreadPool::m_exception_ptr
std::exception_ptr m_exception_ptr
Definition: ThreadPool.h:107
Euclid::ThreadPool::submit
void submit(Task task)
Submit a task to be executed.
Definition: ThreadPool.cpp:153
std::rethrow_exception
T rethrow_exception(T... args)
m_exception_ptr
std::reference_wrapper< std::exception_ptr > m_exception_ptr
Definition: ThreadPool.cpp:83
memory_tools.h
std::unique_ptr
STL class.
Euclid
Definition: InstOrRefHolder.h:29
m_queue_mutex
std::reference_wrapper< std::mutex > m_queue_mutex
Definition: ThreadPool.cpp:77