Fawkes API  Fawkes Development Version
sync_thread.cpp
1 
2 /***************************************************************************
3  * sync_thread.cpp - Fawkes BlackBoard Synchronization Thread
4  *
5  * Created: Thu Jun 04 18:13:06 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "sync_thread.h"
24 
25 #include <blackboard/remote.h>
26 #include <core/threading/mutex_locker.h>
27 #include <utils/time/wait.h>
28 
29 #include <cstring>
30 
31 using namespace std;
32 using namespace fawkes;
33 
34 /** @class BlackBoardSynchronizationThread "sync_thread.h"
35  * Thread to synchronize two BlackBoards.
36  * @author Tim Niemueller
37  */
38 
39 /** Constructor.
40  * @param bbsync_cfg_prefix Configuration prefix for the whole bbsync plugin
41  * @param peer_cfg_prefix The configuration prefix for the peer this sync thread
42  * has been created for.
43  * @param peer name of the peer configuration for this thread
44  */
46  std::string &peer_cfg_prefix,
47  std::string &peer)
48 : Thread("", Thread::OPMODE_CONTINUOUS)
49 {
50  set_name("BBSyncThread[%s]", peer.c_str());
52 
53  bbsync_cfg_prefix_ = bbsync_cfg_prefix;
54  peer_cfg_prefix_ = peer_cfg_prefix;
55  peer_ = peer;
56 
57  remote_bb_ = NULL;
58 }
59 
60 /** Destructor. */
62 {
63 }
64 
65 void
67 {
68  logger->log_debug(name(), "Initializing");
69  unsigned int check_interval = 0;
70  try {
71  host_ = config->get_string((peer_cfg_prefix_ + "host").c_str());
72  port_ = config->get_uint((peer_cfg_prefix_ + "port").c_str());
73 
74  check_interval = config->get_uint((bbsync_cfg_prefix_ + "check_interval").c_str());
75  } catch (Exception &e) {
76  e.append("Host or port not specified for peer");
77  throw;
78  }
79 
80  try {
81  check_interval = config->get_uint((peer_cfg_prefix_ + "check_interval").c_str());
82  logger->log_debug(name(), "Peer check interval set, overriding default.");
83  } catch (Exception &e) {
84  logger->log_debug(name(), "No per-peer check interval set, using default");
85  }
86 
87  read_config_combos(peer_cfg_prefix_ + "reading/", /* writing */ false);
88  read_config_combos(peer_cfg_prefix_ + "writing/", /* writing */ true);
89 
90  for (ComboMap::iterator i = combos_.begin(); i != combos_.end(); ++i) {
92  "Combo: %s, %s (%s, R) -> %s (%s, W)",
93  i->second.type.c_str(),
94  i->second.reader_id.c_str(),
95  i->second.remote_writer ? "local" : "remote",
96  i->second.writer_id.c_str(),
97  i->second.remote_writer ? "remote" : "local");
98  }
99 
100  wsl_local_ = new SyncWriterInterfaceListener(this, logger, (peer_ + "/local").c_str());
101  wsl_remote_ = new SyncWriterInterfaceListener(this, logger, (peer_ + "/remote").c_str());
102 
103  if (!check_connection()) {
104  logger->log_warn(name(), "Remote peer not reachable, will keep trying");
105  }
106 
107  logger->log_debug(name(), "Checking for remote aliveness every %u ms", check_interval);
108  timewait_ = new TimeWait(clock, check_interval * 1000);
109 }
110 
111 void
113 {
114  delete timewait_;
115 
116  close_interfaces();
117 
118  delete wsl_local_;
119  delete wsl_remote_;
120  delete remote_bb_;
121  remote_bb_ = NULL;
122 }
123 
124 void
126 {
127  timewait_->mark_start();
128  check_connection();
129  timewait_->wait_systime();
130 }
131 
132 bool
133 BlackBoardSynchronizationThread::check_connection()
134 {
135  if (!remote_bb_ || !remote_bb_->is_alive()) {
136  if (remote_bb_) {
137  logger->log_warn(name(),
138  "Lost connection via remote BB to %s (%s:%u), will try to re-establish",
139  peer_.c_str(),
140  host_.c_str(),
141  port_);
142  blackboard->unregister_listener(wsl_local_);
143  remote_bb_->unregister_listener(wsl_remote_);
144  close_interfaces();
145  delete remote_bb_;
146  remote_bb_ = NULL;
147  }
148 
149  try {
150  remote_bb_ = new RemoteBlackBoard(host_.c_str(), port_);
151  logger->log_info(name(),
152  "Successfully connected via remote BB to %s (%s:%u)",
153  peer_.c_str(),
154  host_.c_str(),
155  port_);
156 
157  open_interfaces();
158  blackboard->register_listener(wsl_local_, BlackBoard::BBIL_FLAG_WRITER);
159  remote_bb_->register_listener(wsl_remote_, BlackBoard::BBIL_FLAG_WRITER);
160  } catch (Exception &e) {
161  e.print_trace();
162  return false;
163  }
164  }
165  return true;
166 }
167 
168 void
169 BlackBoardSynchronizationThread::read_config_combos(std::string prefix, bool writing)
170 {
171  Configuration::ValueIterator *i = config->search(prefix.c_str());
172  while (i->next()) {
173  if (strcmp(i->type(), "string") != 0) {
174  TypeMismatchException e("Only values of type string may occur in %s, "
175  "but found value of type %s",
176  prefix.c_str(),
177  i->type());
178  delete i;
179  throw e;
180  }
181 
182  std::string varname = std::string(i->path()).substr(prefix.length());
183  std::string uid = i->get_string();
184  size_t sf;
185 
186  if ((sf = uid.find("::")) == std::string::npos) {
187  delete i;
188  throw Exception("Interface UID '%s' at %s is not valid, missing double colon",
189  uid.c_str(),
190  i->path());
191  }
192 
193  std::string type = uid.substr(0, sf);
194  std::string id = uid.substr(sf + 2);
195  combo_t combo = {type, id, id, writing};
196 
197  if ((sf = id.find("=")) != std::string::npos) {
198  // we got a mapping
199  combo.reader_id = id.substr(0, sf);
200  combo.writer_id = id.substr(sf + 1);
201  }
202 
203  combos_[varname] = combo;
204  }
205  delete i;
206 }
207 
208 void
209 BlackBoardSynchronizationThread::open_interfaces()
210 {
211  logger->log_debug(name(), "Opening interfaces");
212  MutexLocker lock(interfaces_.mutex());
213 
214  ComboMap::iterator i;
215  for (i = combos_.begin(); i != combos_.end(); ++i) {
216  Interface *iface_reader = NULL, *iface_writer = NULL;
217 
218  BlackBoard *writer_bb = i->second.remote_writer ? remote_bb_ : blackboard;
219  BlackBoard *reader_bb = i->second.remote_writer ? blackboard : remote_bb_;
220 
221  try {
222  logger->log_debug(name(),
223  "Opening reading %s (%s:%s)",
224  i->second.remote_writer ? "locally" : "remotely",
225  i->second.type.c_str(),
226  i->second.reader_id.c_str());
227  iface_reader =
228  reader_bb->open_for_reading(i->second.type.c_str(), i->second.reader_id.c_str());
229 
230  if (iface_reader->has_writer()) {
231  logger->log_debug(name(),
232  "Opening writing on %s (%s:%s)",
233  i->second.remote_writer ? "remotely" : "locally",
234  i->second.type.c_str(),
235  i->second.writer_id.c_str());
236  iface_writer =
237  writer_bb->open_for_writing(i->second.type.c_str(), i->second.writer_id.c_str());
238  }
239 
240  InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
241  interfaces_[iface_reader] = ii;
242 
243  } catch (Exception &e) {
244  reader_bb->close(iface_reader);
245  writer_bb->close(iface_writer);
246  throw;
247  }
248 
249  SyncInterfaceListener *sync_listener = NULL;
250  if (iface_writer) {
251  logger->log_debug(name(), "Creating sync listener");
252  sync_listener =
253  new SyncInterfaceListener(logger, iface_reader, iface_writer, reader_bb, writer_bb);
254  }
255  sync_listeners_[iface_reader] = sync_listener;
256 
257  if (i->second.remote_writer) {
258  wsl_local_->add_interface(iface_reader);
259  } else {
260  wsl_remote_->add_interface(iface_reader);
261  }
262  }
263 }
264 
265 void
266 BlackBoardSynchronizationThread::close_interfaces()
267 {
268  SyncListenerMap::iterator s;
269  for (s = sync_listeners_.begin(); s != sync_listeners_.end(); ++s) {
270  if (s->second) {
271  logger->log_debug(name(), "Closing sync listener %s", s->second->bbil_name());
272  delete s->second;
273  }
274  }
275  MutexLocker lock(interfaces_.mutex());
276  InterfaceMap::iterator i;
277  for (i = interfaces_.begin(); i != interfaces_.end(); ++i) {
278  logger->log_debug(name(),
279  "Closing %s reading interface %s",
280  i->second.combo->remote_writer ? "local" : "remote",
281  i->first->uid());
282  if (i->second.combo->remote_writer) {
283  wsl_local_->remove_interface(i->first);
284  blackboard->close(i->first);
285  } else {
286  wsl_remote_->remove_interface(i->first);
287  remote_bb_->close(i->first);
288  }
289  if (i->second.writer) {
290  logger->log_debug(name(),
291  "Closing %s writing interface %s",
292  i->second.combo->remote_writer ? "remote" : "local",
293  i->second.writer->uid());
294  if (i->second.combo->remote_writer) {
295  remote_bb_->close(i->second.writer);
296  } else {
297  blackboard->close(i->second.writer);
298  }
299  }
300  }
301  interfaces_.clear();
302  sync_listeners_.clear();
303 }
304 
305 /** A writer has been added for an interface.
306  * To be called only by SyncWriterInterfaceListener.
307  * @param interface the interface a writer has been added for.
308  */
309 void
311 {
312  MutexLocker lock(interfaces_.mutex());
313 
314  if (interfaces_[interface].writer) {
315  // There exists a writer!?
316  logger->log_warn(name(),
317  "Writer added for %s, but relay exists already. Bug?",
318  interface->uid());
319  } else {
320  logger->log_warn(name(), "Writer added for %s, opening relay writer", interface->uid());
321 
322  Interface * iface = NULL;
323  SyncInterfaceListener *sync_listener = NULL;
324  InterfaceInfo & ii = interfaces_[interface];
325  try {
326  iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(), ii.combo->writer_id.c_str());
327 
328  logger->log_debug(name(),
329  "Creating sync listener for %s:%s-%s",
330  ii.combo->type.c_str(),
331  ii.combo->reader_id.c_str(),
332  ii.combo->writer_id.c_str());
333 
334  sync_listener =
335  new SyncInterfaceListener(logger, interface, iface, ii.reader_bb, ii.writer_bb);
336 
337  sync_listeners_[interface] = sync_listener;
338  ii.writer = iface;
339 
340  } catch (Exception &e) {
341  delete sync_listener;
342  ii.writer_bb->close(iface);
343  logger->log_error(name(),
344  "Failed to open writer for %s:%s-%s, sync broken",
345  ii.combo->type.c_str(),
346  ii.combo->reader_id.c_str(),
347  ii.combo->writer_id.c_str());
348  logger->log_error(name(), e);
349  }
350  }
351 }
352 
353 /** A writer has been removed for an interface.
354  * To be called only by SyncWriterInterfaceListener.
355  * @param interface the interface a writer has been removed for.
356  */
357 void
359 {
360  MutexLocker lock(interfaces_.mutex());
361 
362  if (!interfaces_[interface].writer) {
363  // We do not have a writer!?
364  logger->log_warn(name(), "Writer removed for %s, but no relay exists. Bug?", interface->uid());
365  } else {
366  logger->log_warn(name(), "Writer removed for %s, closing relay writer", interface->uid());
367 
368  InterfaceInfo &ii = interfaces_[interface];
369  try {
370  delete sync_listeners_[interface];
371  sync_listeners_[interface] = NULL;
372 
373  ii.writer_bb->close(ii.writer);
374  ii.writer = NULL;
375 
376  } catch (Exception &e) {
377  logger->log_error(name(),
378  "Failed to close writer for %s:%s-%s, sync broken",
379  ii.combo->type.c_str(),
380  ii.combo->reader_id.c_str(),
381  ii.combo->writer_id.c_str());
382  logger->log_error(name(), e);
383  }
384  }
385 }
BlackBoardSynchronizationThread::loop
virtual void loop()
Code to execute in the thread.
Definition: sync_thread.cpp:125
BlackBoardSynchronizationThread::~BlackBoardSynchronizationThread
virtual ~BlackBoardSynchronizationThread()
Destructor.
Definition: sync_thread.cpp:61
fawkes::MultiLogger::log_error
virtual void log_error(const char *component, const char *format,...)
Log error message.
Definition: multi.cpp:237
fawkes::Thread::set_prepfin_conc_loop
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
fawkes::LockMap::mutex
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:133
fawkes::BlackBoard::register_listener
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:185
BlackBoardSynchronizationThread::finalize
virtual void finalize()
Finalize the thread.
Definition: sync_thread.cpp:112
fawkes::Logger::log_info
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
fawkes::MutexLocker
Mutex locking helper.
Definition: mutex_locker.h:34
fawkes::Configuration::ValueIterator::get_string
virtual std::string get_string() const =0
Get string value.
fawkes::BlackBoard::unregister_listener
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:212
fawkes::BlackBoard
The BlackBoard abstract class.
Definition: blackboard.h:46
BlackBoardSynchronizationThread::BlackBoardSynchronizationThread
BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix, std::string &peer_cfg_prefix, std::string &peer)
Constructor.
Definition: sync_thread.cpp:45
fawkes::RemoteBlackBoard
Remote BlackBoard.
Definition: remote.h:49
fawkes::Thread::name
const char * name() const
Get name of thread.
Definition: thread.h:100
fawkes::ClockAspect::clock
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
fawkes::Configuration::ValueIterator
Iterator interface to iterate over config values.
Definition: config.h:72
fawkes::Exception::append
void append(const char *format,...)
Append messages to the message list.
Definition: exception.cpp:333
fawkes::Configuration::search
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
fawkes::MultiLogger::log_debug
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Definition: multi.cpp:174
fawkes::MultiLogger::log_warn
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
fawkes::TypeMismatchException
Type mismatch.
Definition: software.h:44
BlackBoardSynchronizationThread::writer_added
void writer_added(fawkes::Interface *interface)
A writer has been added for an interface.
Definition: sync_thread.cpp:310
fawkes::LoggingAspect::logger
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
fawkes::BlackBoard::close
virtual void close(Interface *interface)=0
Close interface.
fawkes
Fawkes library namespace.
BlackBoardSynchronizationThread::writer_removed
void writer_removed(fawkes::Interface *interface)
A writer has been removed for an interface.
Definition: sync_thread.cpp:358
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
fawkes::Interface
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:79
fawkes::Interface::has_writer
bool has_writer() const
Check if there is a writer for the interface.
Definition: interface.cpp:817
fawkes::Exception::print_trace
void print_trace()
Prints trace to stderr.
Definition: exception.cpp:601
fawkes::TimeWait::mark_start
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
fawkes::ConfigurableAspect::config
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
SyncWriterInterfaceListener
Listener for writer events in bbsync plugin.
Definition: writer_listener.h:36
fawkes::BlackBoard::is_alive
virtual bool is_alive() const =0
Check if the BlackBoard is still alive.
SyncWriterInterfaceListener::remove_interface
void remove_interface(fawkes::Interface *interface)
Remove an interface to listen to.
Definition: writer_listener.cpp:68
fawkes::Thread
Thread class encapsulation of pthreads.
Definition: thread.h:46
fawkes::TimeWait
Time wait utility.
Definition: wait.h:33
fawkes::BlackBoardAspect::blackboard
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
fawkes::Configuration::ValueIterator::type
virtual const char * type() const =0
Type of value.
BlackBoardSynchronizationThread::init
virtual void init()
Initialize the thread.
Definition: sync_thread.cpp:66
fawkes::Configuration::get_uint
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
fawkes::TimeWait::wait_systime
void wait_systime()
Wait until minimum loop time has been reached in real time.
Definition: wait.cpp:96
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
fawkes::BlackBoard::open_for_reading
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
SyncInterfaceListener
Synchronize two interfaces.
Definition: sync_listener.h:34
SyncWriterInterfaceListener::add_interface
void add_interface(fawkes::Interface *interface)
Add an interface to listen to.
Definition: writer_listener.cpp:59
fawkes::Configuration::ValueIterator::path
virtual const char * path() const =0
Path of value.
fawkes::Logger::log_debug
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
fawkes::InterfaceInfo
Interface info.
Definition: interface_info.h:35
fawkes::BlackBoard::open_for_writing
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
fawkes::Configuration::ValueIterator::next
virtual bool next()=0
Check if there is another element and advance to this if possible.
fawkes::Thread::set_name
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
fawkes::Exception
Base class for exceptions in Fawkes.
Definition: exception.h:36