23 #include "protobuf_thread.h"
25 #include "blackboard_manager.h"
27 #include <core/threading/mutex_locker.h>
28 #include <google/protobuf/descriptor.h>
29 #include <protobuf_comm/client.h>
30 #include <protobuf_comm/peer.h>
32 using namespace google::protobuf;
33 using namespace protobuf_comm;
34 using namespace boost::placeholders;
36 namespace protoboard {
38 ProtobufThead::ProtobufThead()
39 :
Thread(
"ProtoboardMessageHandler",
Thread::OPMODE_CONTINUOUS),
40 message_register_(nullptr),
48 delete message_register_;
58 "BUG: %s's reference to blackboard manager thread hasn't been initialized",
name());
65 return !pb_queue_.empty();
89 const std::string &crypto_key,
90 const std::string &cipher)
92 if (recv_on_port <= 0)
93 recv_on_port = send_to_port;
95 if (send_to_port > 0) {
97 address, send_to_port, recv_on_port, message_register_, crypto_key, cipher);
102 peer_id = ++next_client_id_;
103 peers_[peer_id] = peer;
107 boost::bind(&ProtobufThead::handle_peer_msg,
this, peer_id, _1, _2, _3, _4));
109 boost::bind(&ProtobufThead::handle_peer_recv_error,
this, peer_id, _1, _2));
111 boost::bind(&ProtobufThead::handle_peer_send_error,
this, peer_id, _1));
129 const std::string &crypto_key,
130 const std::string &cipher)
164 if (peers_.find(peer_id) != peers_.end()) {
165 delete peers_[peer_id];
166 peers_.erase(peer_id);
176 ProtobufThead::peer_setup_crypto(
long int peer_id,
177 const std::string &crypto_key,
178 const std::string &cipher)
180 if (peers_.find(peer_id) != peers_.end()) {
181 peers_[peer_id]->setup_crypto(crypto_key, cipher);
196 if (peers_.find(peer_id) == peers_.end())
201 peers_[peer_id]->send(m);
202 }
catch (google::protobuf::FatalException &e) {
205 "Failed to broadcast message of type %s: %s",
206 m->GetTypeName().c_str(),
212 "Failed to broadcast message of type %s: %s",
213 m->GetTypeName().c_str(),
216 }
catch (std::runtime_error &e) {
219 "Failed to broadcast message of type %s: %s",
220 m->GetTypeName().c_str(),
233 ProtobufThead::handle_peer_msg(
long int peer_id,
234 boost::asio::ip::udp::endpoint &endpoint,
235 uint16_t component_id,
237 std::shared_ptr<Message> msg)
240 pb_queue_.push({peer_id, endpoint, component_id, msg_type, std::move(msg)});
249 ProtobufThead::handle_peer_recv_error(
long int peer_id,
250 boost::asio::ip::udp::endpoint &endpoint,
255 "Failed to receive peer message from %s:%u: %s",
256 endpoint.address().to_string().c_str(),
266 ProtobufThead::handle_peer_send_error(
long int peer_id, std::string msg)