Fawkes API  Fawkes Development Version
protobuf_thread.cpp
1 
2 /***************************************************************************
3  * Protoboard plugin template
4  * - Implementation of the ProtoBuf thread: protobuf_comm to actually send
5  * messages.
6  *
7  * Copyright 2019 Victor Mataré
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "protobuf_thread.h"
24 
25 #include "blackboard_manager.h"
26 
27 #include <core/threading/mutex_locker.h>
28 #include <google/protobuf/descriptor.h>
29 #include <protobuf_comm/client.h>
30 #include <protobuf_comm/peer.h>
31 
32 using namespace google::protobuf;
33 using namespace protobuf_comm;
34 using namespace boost::placeholders;
35 
36 namespace protoboard {
37 
38 ProtobufThead::ProtobufThead()
39 : Thread("ProtoboardMessageHandler", Thread::OPMODE_CONTINUOUS),
40  message_register_(nullptr),
41  next_client_id_(0),
42  bb_manager_(nullptr)
43 {
44 }
45 
47 {
48  delete message_register_;
49 }
50 
51 void
53 {
54  message_register_ = new MessageRegister(proto_dirs());
55 
56  if (!bb_manager_)
57  throw fawkes::Exception(
58  "BUG: %s's reference to blackboard manager thread hasn't been initialized", name());
59 }
60 
61 bool
63 {
64  fawkes::MutexLocker lock(&msgq_mutex_);
65  return !pb_queue_.empty();
66 }
67 
70 {
71  fawkes::MutexLocker lock(&msgq_mutex_);
72  incoming_message msg = pb_queue_.front();
73  pb_queue_.pop();
74  return msg;
75 }
76 
77 /** Enable protobuf peer.
78  * @param address IP address to send messages to
79  * @param send_to_port UDP port to send messages to
80  * @param recv_on_port UDP port to receive messages on, 0 to use the same as @p send_to_port
81  * @param crypto_key encryption key
82  * @param cipher cipher suite, see BufferEncryptor for supported types
83  * @return peer identifier
84  */
85 long int
86 ProtobufThead::peer_create_local_crypto(const std::string &address,
87  int send_to_port,
88  int recv_on_port,
89  const std::string &crypto_key,
90  const std::string &cipher)
91 {
92  if (recv_on_port <= 0)
93  recv_on_port = send_to_port;
94 
95  if (send_to_port > 0) {
97  address, send_to_port, recv_on_port, message_register_, crypto_key, cipher);
98 
99  long int peer_id;
100  {
101  fawkes::MutexLocker lock(&map_mutex_);
102  peer_id = ++next_client_id_;
103  peers_[peer_id] = peer;
104  }
105 
106  peer->signal_received().connect(
107  boost::bind(&ProtobufThead::handle_peer_msg, this, peer_id, _1, _2, _3, _4));
108  peer->signal_recv_error().connect(
109  boost::bind(&ProtobufThead::handle_peer_recv_error, this, peer_id, _1, _2));
110  peer->signal_send_error().connect(
111  boost::bind(&ProtobufThead::handle_peer_send_error, this, peer_id, _1));
112 
113  return peer_id;
114  } else {
115  return 0;
116  }
117 }
118 
119 /** Enable protobuf peer.
120  * @param address IP address to send messages to
121  * @param port UDP port to send and receive messages
122  * @param crypto_key encryption key
123  * @param cipher cipher suite, see BufferEncryptor for supported types
124  * @return peer identifier
125  */
126 long int
127 ProtobufThead::peer_create_crypto(const std::string &address,
128  int port,
129  const std::string &crypto_key,
130  const std::string &cipher)
131 {
132  return peer_create_local_crypto(address, port, port, crypto_key, cipher);
133 }
134 
135 /** Enable protobuf peer.
136  * @param address IP address to send messages to
137  * @param port UDP port to send and receive messages
138  * @return peer identifier
139  */
140 long int
141 ProtobufThead::peer_create(const std::string &address, int port)
142 {
143  return peer_create_local_crypto(address, port, port);
144 }
145 
146 /** Enable protobuf peer.
147  * @param address IP address to send messages to
148  * @param send_to_port UDP port to send messages to
149  * @param recv_on_port UDP port to receive messages on, 0 to use the same as the @p send_port
150  * @return peer identifier
151  */
152 long int
153 ProtobufThead::peer_create_local(const std::string &address, int send_to_port, int recv_on_port)
154 {
155  return peer_create_local_crypto(address, send_to_port, recv_on_port);
156 }
157 
158 /** Disable peer.
159  * @param peer_id ID of the peer to destroy
160  */
161 void
163 {
164  if (peers_.find(peer_id) != peers_.end()) {
165  delete peers_[peer_id];
166  peers_.erase(peer_id);
167  }
168 }
169 
170 /** Setup crypto for peer.
171  * @param peer_id ID of the peer to destroy
172  * @param crypto_key encryption key
173  * @param cipher cipher suite, see BufferEncryptor for supported types
174  */
175 void
176 ProtobufThead::peer_setup_crypto(long int peer_id,
177  const std::string &crypto_key,
178  const std::string &cipher)
179 {
180  if (peers_.find(peer_id) != peers_.end()) {
181  peers_[peer_id]->setup_crypto(crypto_key, cipher);
182  }
183 }
184 
185 void
186 ProtobufThead::send(long int peer_id, std::shared_ptr<google::protobuf::Message> m)
187 {
188  if (!m) {
189  if (logger) {
190  logger->log_warn(name(), "Cannot send broadcast: invalid message");
191  }
192  return;
193  }
194 
195  fawkes::MutexLocker lock(&map_mutex_);
196  if (peers_.find(peer_id) == peers_.end())
197  return;
198 
199  //logger->log_info(name(), "Broadcasting %s", (*m)->GetTypeName().c_str());
200  try {
201  peers_[peer_id]->send(m);
202  } catch (google::protobuf::FatalException &e) {
203  if (logger) {
204  logger->log_warn(name(),
205  "Failed to broadcast message of type %s: %s",
206  m->GetTypeName().c_str(),
207  e.what());
208  }
209  } catch (fawkes::Exception &e) {
210  if (logger) {
211  logger->log_warn(name(),
212  "Failed to broadcast message of type %s: %s",
213  m->GetTypeName().c_str(),
214  e.what_no_backtrace());
215  }
216  } catch (std::runtime_error &e) {
217  if (logger) {
218  logger->log_warn(name(),
219  "Failed to broadcast message of type %s: %s",
220  m->GetTypeName().c_str(),
221  e.what());
222  }
223  }
224 }
225 
226 /** Handle message that came from a peer/robot
227  * @param endpoint the endpoint from which the message was received
228  * @param component_id component the message was addressed to
229  * @param msg_type type of the message
230  * @param msg the message
231  */
232 void
233 ProtobufThead::handle_peer_msg(long int peer_id,
234  boost::asio::ip::udp::endpoint &endpoint,
235  uint16_t component_id,
236  uint16_t msg_type,
237  std::shared_ptr<Message> msg)
238 {
239  fawkes::MutexLocker lock(&msgq_mutex_);
240  pb_queue_.push({peer_id, endpoint, component_id, msg_type, std::move(msg)});
241  bb_manager_->wakeup();
242 }
243 
244 /** Handle error during peer message processing.
245  * @param endpoint endpoint of incoming message
246  * @param msg error message
247  */
248 void
249 ProtobufThead::handle_peer_recv_error(long int peer_id,
250  boost::asio::ip::udp::endpoint &endpoint,
251  std::string msg)
252 {
253  if (logger) {
254  logger->log_warn(name(),
255  "Failed to receive peer message from %s:%u: %s",
256  endpoint.address().to_string().c_str(),
257  endpoint.port(),
258  msg.c_str());
259  }
260 }
261 
262 /** Handle error during peer message processing.
263  * @param msg error message
264  */
265 void
266 ProtobufThead::handle_peer_send_error(long int peer_id, std::string msg)
267 {
268  if (logger) {
269  logger->log_warn(name(), "Failed to send peer message: %s", msg.c_str());
270  }
271 }
272 
273 } // namespace protoboard
protoboard::ProtobufThead::pb_queue_incoming
bool pb_queue_incoming()
Definition: protobuf_thread.cpp:62
protoboard::ProtobufThead::peer_create_local
long int peer_create_local(const std::string &host, int send_to_port, int recv_on_port)
Enable protobuf peer.
Definition: protobuf_thread.cpp:153
protobuf_comm::MessageRegister
Register to map msg type numbers to Protobuf messages.
Definition: message_register.h:66
protobuf_comm::ProtobufBroadcastPeer::signal_recv_error
signal_recv_error_type & signal_recv_error()
Signal that is invoked when receiving a message failed.
Definition: peer.h:164
fawkes::Thread::wakeup
void wakeup()
Wake up thread.
Definition: thread.cpp:995
protoboard::ProtobufThead::~ProtobufThead
virtual ~ProtobufThead() override
Destructor.
Definition: protobuf_thread.cpp:46
protoboard::ProtobufThead::peer_create
long int peer_create(const std::string &host, int port)
Enable protobuf peer.
Definition: protobuf_thread.cpp:141
fawkes::MutexLocker
Mutex locking helper.
Definition: mutex_locker.h:34
protoboard::ProtobufThead::peer_create_local_crypto
long int peer_create_local_crypto(const std::string &host, int send_to_port, int recv_on_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Definition: protobuf_thread.cpp:86
fawkes::Thread::name
const char * name() const
Get name of thread.
Definition: thread.h:100
protobuf_comm::ProtobufBroadcastPeer::signal_received
signal_received_type & signal_received()
Signal that is invoked when a message has been received.
Definition: peer.h:144
fawkes::LoggingAspect::logger
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
protobuf_comm::ProtobufBroadcastPeer
Communicate by broadcasting protobuf messages.
Definition: peer.h:57
protoboard::ProtobufThead::incoming_message
Wrapper for a ProtoBuf message and its metadata.
Definition: protobuf_thread.h:67
protoboard::ProtobufThead::peer_destroy
void peer_destroy(long int peer_id)
Disable peer.
Definition: protobuf_thread.cpp:162
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
protoboard::ProtobufThead::init
virtual void init() override
Initialize the thread.
Definition: protobuf_thread.cpp:52
fawkes::Exception::what_no_backtrace
virtual const char * what_no_backtrace() const
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:663
protobuf_comm::ProtobufBroadcastPeer::signal_send_error
signal_send_error_type & signal_send_error()
Signal that is invoked when sending a message failed.
Definition: peer.h:173
fawkes::Thread
Thread class encapsulation of pthreads.
Definition: thread.h:46
protoboard::ProtobufThead::pb_queue_pop
incoming_message pb_queue_pop()
Definition: protobuf_thread.cpp:69
protoboard::ProtobufThead::peer_create_crypto
long int peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Definition: protobuf_thread.cpp:127
protoboard::ProtobufThead::send
void send(long int peer_id, std::shared_ptr< google::protobuf::Message > msg)
Send a ProtoBuf message to the given peer.
Definition: protobuf_thread.cpp:186
fawkes::Exception
Base class for exceptions in Fawkes.
Definition: exception.h:36