25 #include "gazsim_comm_thread.h"
27 #include <aspect/blocked_timing.h>
28 #include <protobuf_comm/message_register.h>
29 #include <protobuf_comm/peer.h>
35 using namespace protobuf_comm;
36 using namespace boost::placeholders;
43 GazsimCommThread::GazsimCommThread()
44 :
Thread(
"GazsimCommThread",
Thread::OPMODE_WAITFORWAKEUP),
49 GazsimCommThread::~GazsimCommThread()
67 send_ports_crypto1_ =
config->
get_uints(
"/gazsim/comm/send-ports-crypto1");
68 recv_ports_crypto1_ =
config->
get_uints(
"/gazsim/comm/recv-ports-crypto1");
69 send_ports_crypto2_ =
config->
get_uints(
"/gazsim/comm/send-ports-crypto2");
70 recv_ports_crypto2_ =
config->
get_uints(
"/gazsim/comm/recv-ports-crypto2");
71 if (addresses_.size() != send_ports_.size() || addresses_.size() != recv_ports_.size()
72 || (use_crypto1_ && addresses_.size() != send_ports_crypto1_.size())
73 || (use_crypto1_ && addresses_.size() != recv_ports_crypto1_.size())
74 || (use_crypto2_ && addresses_.size() != send_ports_crypto2_.size())
75 || (use_crypto2_ && addresses_.size() != recv_ports_crypto2_.size())) {
82 for (
size_t i = 0; i < proto_dirs_.size(); ++i) {
83 std::string::size_type pos;
84 if ((pos = proto_dirs_[i].find(
"@BASEDIR@")) != std::string::npos) {
85 proto_dirs_[i].replace(pos, 9, BASEDIR);
87 if ((pos = proto_dirs_[i].find(
"@FAWKES_BASEDIR@")) != std::string::npos) {
88 proto_dirs_[i].replace(pos, 16, FAWKES_BASEDIR);
90 if ((pos = proto_dirs_[i].find(
"@RESDIR@")) != std::string::npos) {
91 proto_dirs_[i].replace(pos, 8, RESDIR);
93 if ((pos = proto_dirs_[i].find(
"@CONFDIR@")) != std::string::npos) {
94 proto_dirs_[i].replace(pos, 9, CONFDIR);
96 if (proto_dirs_[i][proto_dirs_.size() - 1] !=
'/') {
97 proto_dirs_[i] +=
"/";
101 logger->
log_warn(
name(),
"Failed to load proto paths from config, exception follows");
106 peers_.resize(addresses_.size());
107 peers_crypto1_.resize(addresses_.size());
108 peers_crypto2_.resize(addresses_.size());
109 for (
unsigned int i = 0; i < addresses_.size(); i++) {
112 peers_[i]->signal_received_raw().connect(
113 boost::bind(&GazsimCommThread::receive_raw_msg,
this, _1, _2, _3, _4));
114 peers_[i]->signal_send_error().connect(
115 boost::bind(&GazsimCommThread::peer_send_error,
this, addresses_[i], send_ports_[i], _1));
118 send_ports_crypto1_[i],
119 recv_ports_crypto1_[i],
121 peers_crypto1_[i]->signal_received_raw().connect(
122 boost::bind(&GazsimCommThread::receive_raw_msg,
this, _1, _2, _3, _4));
123 peers_crypto1_[i]->signal_send_error().connect(boost::bind(
124 &GazsimCommThread::peer_send_error,
this, addresses_[i], send_ports_crypto1_[i], _1));
128 send_ports_crypto2_[i],
129 recv_ports_crypto2_[i],
131 peers_crypto2_[i]->signal_received_raw().connect(
132 boost::bind(&GazsimCommThread::receive_raw_msg,
this, _1, _2, _3, _4));
133 peers_crypto2_[i]->signal_send_error().connect(boost::bind(
134 &GazsimCommThread::peer_send_error,
this, addresses_[i], send_ports_crypto2_[i], _1));
143 for (
unsigned int i = 0; i < peers_.size(); i++) {
161 GazsimCommThread::receive_raw_msg(boost::asio::ip::udp::endpoint &endpoint,
167 unsigned int incoming_peer_port = endpoint.port();
174 double rnd = ((double)rand()) / ((
double)RAND_MAX);
175 if (rnd < package_loss_) {
180 std::vector<protobuf_comm::ProtobufBroadcastPeer *> peers;
181 std::vector<unsigned int> send_ports;
182 if (std::find(send_ports_.begin(), send_ports_.end(), incoming_peer_port) != send_ports_.end()) {
184 send_ports = send_ports_;
185 }
else if (use_crypto1_
186 && std::find(send_ports_crypto1_.begin(),
187 send_ports_crypto1_.end(),
189 != send_ports_crypto1_.end()) {
190 peers = peers_crypto1_;
191 send_ports = send_ports_crypto1_;
192 }
else if (use_crypto2_
193 && std::find(send_ports_crypto2_.begin(),
194 send_ports_crypto2_.end(),
196 != send_ports_crypto2_.end()) {
197 peers = peers_crypto2_;
198 send_ports = send_ports_crypto2_;
202 for (
unsigned int i = 0; i < peers.size(); i++) {
203 if (send_ports[i] != incoming_peer_port) {
204 peers[i]->send_raw(header, data, length);
210 GazsimCommThread::peer_send_error(std::string address,
unsigned int port, std::string err)
212 logger->
log_warn(
name(),
"Peer send error for %s:%u: %s", address.c_str(), port, err.c_str());