Fawkes API  Fawkes Development Version
gazsim_comm_thread.cpp
1 /***************************************************************************
2  * gazsim_comm_plugin.cpp - Plugin simulates peer-to-peer communication over
3  * an network with configurable instability and manages
4  * the frowarding of messages to different ports on
5  * the same machine.
6  *
7  * Created: Thu Sep 12 11:09:48 2013
8  * Copyright 2013 Frederik Zwilling
9  *
10  ****************************************************************************/
11 
12 /* This program is free software; you can redistribute it and/or modify
13  * it under the terms of the GNU General Public License as published by
14  * the Free Software Foundation; either version 2 of the License, or
15  * (at your option) any later version.
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20  * GNU Library General Public License for more details.
21  *
22  * Read the full text in the LICENSE.GPL file in the doc directory.
23  */
24 
25 #include "gazsim_comm_thread.h"
26 
27 #include <aspect/blocked_timing.h>
28 #include <protobuf_comm/message_register.h>
29 #include <protobuf_comm/peer.h>
30 
31 #include <algorithm>
32 #include <stdlib.h>
33 
34 using namespace fawkes;
35 using namespace protobuf_comm;
36 using namespace boost::placeholders;
37 
38 /** @class GazsimCommThread "clips_thread.h"
39  * Plugin simulates and manages communication for Simulation in Gazebo
40  * @author Frederik Zwilling
41  */
42 
43 GazsimCommThread::GazsimCommThread()
44 : Thread("GazsimCommThread", Thread::OPMODE_WAITFORWAKEUP),
45  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE)
46 {
47 }
48 
49 GazsimCommThread::~GazsimCommThread()
50 {
51 }
52 
53 void
55 {
56  //logger->log_info(name(), "GazsimComm initializing");
57  initialized_ = false;
58 
59  //read config values
60  proto_dirs_ = config->get_strings("/gazsim/proto-dirs");
61  package_loss_ = config->get_float("/gazsim/comm/package-loss");
62  addresses_ = config->get_strings("/gazsim/comm/addresses");
63  send_ports_ = config->get_uints("/gazsim/comm/send-ports");
64  recv_ports_ = config->get_uints("/gazsim/comm/recv-ports");
65  use_crypto1_ = config->get_bool("/gazsim/comm/use-crypto1");
66  use_crypto2_ = config->get_bool("/gazsim/comm/use-crypto1");
67  send_ports_crypto1_ = config->get_uints("/gazsim/comm/send-ports-crypto1");
68  recv_ports_crypto1_ = config->get_uints("/gazsim/comm/recv-ports-crypto1");
69  send_ports_crypto2_ = config->get_uints("/gazsim/comm/send-ports-crypto2");
70  recv_ports_crypto2_ = config->get_uints("/gazsim/comm/recv-ports-crypto2");
71  if (addresses_.size() != send_ports_.size() || addresses_.size() != recv_ports_.size()
72  || (use_crypto1_ && addresses_.size() != send_ports_crypto1_.size())
73  || (use_crypto1_ && addresses_.size() != recv_ports_crypto1_.size())
74  || (use_crypto2_ && addresses_.size() != send_ports_crypto2_.size())
75  || (use_crypto2_ && addresses_.size() != recv_ports_crypto2_.size())) {
76  logger->log_warn(name(), "/gazsim/comm/ has an invalid configuration!");
77  }
78 
79  //resolve proto paths
80  try {
81  proto_dirs_ = config->get_strings("/clips-protobuf/proto-dirs");
82  for (size_t i = 0; i < proto_dirs_.size(); ++i) {
83  std::string::size_type pos;
84  if ((pos = proto_dirs_[i].find("@BASEDIR@")) != std::string::npos) {
85  proto_dirs_[i].replace(pos, 9, BASEDIR);
86  }
87  if ((pos = proto_dirs_[i].find("@FAWKES_BASEDIR@")) != std::string::npos) {
88  proto_dirs_[i].replace(pos, 16, FAWKES_BASEDIR);
89  }
90  if ((pos = proto_dirs_[i].find("@RESDIR@")) != std::string::npos) {
91  proto_dirs_[i].replace(pos, 8, RESDIR);
92  }
93  if ((pos = proto_dirs_[i].find("@CONFDIR@")) != std::string::npos) {
94  proto_dirs_[i].replace(pos, 9, CONFDIR);
95  }
96  if (proto_dirs_[i][proto_dirs_.size() - 1] != '/') {
97  proto_dirs_[i] += "/";
98  }
99  }
100  } catch (Exception &e) {
101  logger->log_warn(name(), "Failed to load proto paths from config, exception follows");
102  logger->log_warn(name(), e);
103  }
104 
105  //create peer connections
106  peers_.resize(addresses_.size());
107  peers_crypto1_.resize(addresses_.size());
108  peers_crypto2_.resize(addresses_.size());
109  for (unsigned int i = 0; i < addresses_.size(); i++) {
110  peers_[i] =
111  new ProtobufBroadcastPeer(addresses_[i], send_ports_[i], recv_ports_[i], proto_dirs_);
112  peers_[i]->signal_received_raw().connect(
113  boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
114  peers_[i]->signal_send_error().connect(
115  boost::bind(&GazsimCommThread::peer_send_error, this, addresses_[i], send_ports_[i], _1));
116  if (use_crypto1_) {
117  peers_crypto1_[i] = new ProtobufBroadcastPeer(addresses_[i],
118  send_ports_crypto1_[i],
119  recv_ports_crypto1_[i],
120  proto_dirs_);
121  peers_crypto1_[i]->signal_received_raw().connect(
122  boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
123  peers_crypto1_[i]->signal_send_error().connect(boost::bind(
124  &GazsimCommThread::peer_send_error, this, addresses_[i], send_ports_crypto1_[i], _1));
125  }
126  if (use_crypto2_) {
127  peers_crypto2_[i] = new ProtobufBroadcastPeer(addresses_[i],
128  send_ports_crypto2_[i],
129  recv_ports_crypto2_[i],
130  proto_dirs_);
131  peers_crypto2_[i]->signal_received_raw().connect(
132  boost::bind(&GazsimCommThread::receive_raw_msg, this, _1, _2, _3, _4));
133  peers_crypto2_[i]->signal_send_error().connect(boost::bind(
134  &GazsimCommThread::peer_send_error, this, addresses_[i], send_ports_crypto2_[i], _1));
135  }
136  }
137  initialized_ = true;
138 }
139 
140 void
142 {
143  for (unsigned int i = 0; i < peers_.size(); i++) {
144  delete peers_[i];
145  }
146 }
147 
148 void
150 {
151 }
152 
153 /**
154  * Receive and forward raw msg
155  * @param endpoint port msg received from
156  * @param header header of the msg
157  * @param data data stream
158  * @param length length of the data stream
159  */
160 void
161 GazsimCommThread::receive_raw_msg(boost::asio::ip::udp::endpoint &endpoint,
163  void * data,
164  size_t length)
165 {
166  //logger->log_info(name(), "Got raw Message from port %d", endpoint.port());
167  unsigned int incoming_peer_port = endpoint.port(); //this is suprisingly the send-port
168 
169  if (!initialized_) {
170  return;
171  }
172 
173  //simulate package loss
174  double rnd = ((double)rand()) / ((double)RAND_MAX); //0.0 <= rnd <= 1.0
175  if (rnd < package_loss_) {
176  return;
177  }
178 
179  //check which set of peers the message comes from
180  std::vector<protobuf_comm::ProtobufBroadcastPeer *> peers;
181  std::vector<unsigned int> send_ports;
182  if (std::find(send_ports_.begin(), send_ports_.end(), incoming_peer_port) != send_ports_.end()) {
183  peers = peers_;
184  send_ports = send_ports_;
185  } else if (use_crypto1_
186  && std::find(send_ports_crypto1_.begin(),
187  send_ports_crypto1_.end(),
188  incoming_peer_port)
189  != send_ports_crypto1_.end()) {
190  peers = peers_crypto1_;
191  send_ports = send_ports_crypto1_;
192  } else if (use_crypto2_
193  && std::find(send_ports_crypto2_.begin(),
194  send_ports_crypto2_.end(),
195  incoming_peer_port)
196  != send_ports_crypto2_.end()) {
197  peers = peers_crypto2_;
198  send_ports = send_ports_crypto2_;
199  }
200 
201  //send message to all other peers
202  for (unsigned int i = 0; i < peers.size(); i++) {
203  if (send_ports[i] != incoming_peer_port) {
204  peers[i]->send_raw(header, data, length);
205  }
206  }
207 }
208 
209 void
210 GazsimCommThread::peer_send_error(std::string address, unsigned int port, std::string err)
211 {
212  logger->log_warn(name(), "Peer send error for %s:%u: %s", address.c_str(), port, err.c_str());
213 }
fawkes::Configuration::get_bool
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
fawkes::BlockedTimingAspect
Thread aspect to use blocked timing.
Definition: blocked_timing.h:51
fawkes::Thread::name
const char * name() const
Get name of thread.
Definition: thread.h:100
fawkes::Configuration::get_uints
virtual std::vector< unsigned int > get_uints(const char *path)=0
Get list of values from configuration which is of type unsigned int.
fawkes::LoggingAspect::logger
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
protobuf_comm::frame_header_t
Network framing header.
Definition: frame_header.h:72
protobuf_comm::ProtobufBroadcastPeer
Communicate by broadcasting protobuf messages.
Definition: peer.h:57
fawkes
Fawkes library namespace.
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
fawkes::Configuration::get_strings
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
fawkes::ConfigurableAspect::config
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
GazsimCommThread::loop
virtual void loop()
Code to execute in the thread.
Definition: gazsim_comm_thread.cpp:149
fawkes::Configuration::get_float
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
fawkes::Thread
Thread class encapsulation of pthreads.
Definition: thread.h:46
GazsimCommThread::finalize
virtual void finalize()
Finalize the thread.
Definition: gazsim_comm_thread.cpp:141
GazsimCommThread::init
virtual void init()
Initialize the thread.
Definition: gazsim_comm_thread.cpp:54
fawkes::Exception
Base class for exceptions in Fawkes.
Definition: exception.h:36