Fawkes API  Fawkes Development Version
peer.cpp
1 /***************************************************************************
2  * peer.cpp - Protobuf stream protocol - broadcast peer
3  *
4  * Created: Mon Feb 04 17:19:17 2013
5  * Copyright 2013 Tim Niemueller [www.niemueller.de]
6  ****************************************************************************/
7 
8 /* Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * - Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  * - Redistributions in binary form must reproduce the above copyright
15  * notice, this list of conditions and the following disclaimer in
16  * the documentation and/or other materials provided with the
17  * distribution.
18  * - Neither the name of the authors nor the names of its contributors
19  * may be used to endorse or promote products derived from this
20  * software without specific prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
27  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
28  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
29  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
31  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
33  * OF THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <protobuf_comm/crypto.h>
37 #include <protobuf_comm/peer.h>
38 
39 #include <boost/lexical_cast.hpp>
40 #include <ifaddrs.h>
41 
42 using namespace boost::asio;
43 using namespace boost::system;
44 using namespace boost::placeholders;
45 
46 namespace protobuf_comm {
47 
48 /** @class ProtobufBroadcastPeer <protobuf_comm/peer.h>
49  * Communicate by broadcasting protobuf messages.
50  * This class allows to communicate via UDP by broadcasting messages to the
51  * network.
52  * @author Tim Niemueller
53  */
54 
55 /** Constructor.
56  * @param address IPv4 broadcast address to send to
57  * @param port IPv4 UDP port to listen on and to send to
58  */
59 ProtobufBroadcastPeer::ProtobufBroadcastPeer(const std::string address, unsigned short port)
60 : io_service_(),
61  resolver_(io_service_),
62  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
63  resolve_retry_timer_(io_service_)
64 {
65  message_register_ = new MessageRegister();
66  own_message_register_ = true;
67  ctor(address, port);
68 }
69 
70 /** Testing constructor.
71  * This constructor listens and sends to different ports. It can be used to
72  * send and receive on the same host or even from within the same process.
73  * It is most useful for communication tests.
74  * @param address IPv4 address to send to
75  * @param send_to_port IPv4 UDP port to send data to
76  * @param recv_on_port IPv4 UDP port to receive data on
77  */
79  unsigned short send_to_port,
80  unsigned short recv_on_port)
81 : io_service_(),
82  resolver_(io_service_),
83  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
84  resolve_retry_timer_(io_service_)
85 {
86  message_register_ = new MessageRegister();
87  own_message_register_ = true;
88  ctor(address, send_to_port);
89 }
90 
91 /** Constructor.
92  * @param address IPv4 broadcast address to send to
93  * @param port IPv4 UDP port to listen on and to send to
94  * @param proto_path list of file system paths where to look for proto files
95  */
97  unsigned short port,
98  std::vector<std::string> &proto_path)
99 : io_service_(),
100  resolver_(io_service_),
101  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
102  resolve_retry_timer_(io_service_)
103 {
104  message_register_ = new MessageRegister(proto_path);
105  own_message_register_ = true;
106  ctor(address, port);
107 }
108 
109 /** Testing constructor.
110  * This constructor listens and sends to different ports. It can be used to
111  * send and receive on the same host or even from within the same process.
112  * It is most useful for communication tests.
113  * @param address IPv4 address to send to
114  * @param send_to_port IPv4 UDP port to send data to
115  * @param recv_on_port IPv4 UDP port to receive data on
116  * @param proto_path list of file system paths where to look for proto files
117  */
119  unsigned short send_to_port,
120  unsigned short recv_on_port,
121  std::vector<std::string> &proto_path)
122 : io_service_(),
123  resolver_(io_service_),
124  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
125  resolve_retry_timer_(io_service_)
126 {
127  message_register_ = new MessageRegister(proto_path);
128  own_message_register_ = true;
129  ctor(address, send_to_port);
130 }
131 
132 /** Constructor.
133  * @param address IPv4 broadcast address to send to
134  * @param port IPv4 UDP port to listen on and to send to
135  * @param mr message register to query for message types
136  */
138  unsigned short port,
139  MessageRegister * mr)
140 : io_service_(),
141  resolver_(io_service_),
142  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
143  resolve_retry_timer_(io_service_),
144  message_register_(mr),
145  own_message_register_(false)
146 {
147  ctor(address, port);
148 }
149 
150 /** Constructor with encryption.
151  * @param address IPv4 broadcast address to send to
152  * @param send_to_port IPv4 UDP port to send data to
153  * @param recv_on_port IPv4 UDP port to receive data on
154  * @param crypto_key encryption key for messages
155  * @param cipher cipher to use for encryption
156  */
158  unsigned short send_to_port,
159  unsigned short recv_on_port,
160  const std::string crypto_key,
161  const std::string cipher)
162 : io_service_(),
163  resolver_(io_service_),
164  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
165  resolve_retry_timer_(io_service_)
166 {
167  ctor(address, send_to_port, crypto_key, cipher);
168  message_register_ = new MessageRegister();
169  own_message_register_ = true;
170 }
171 
172 /** Constructor with encryption.
173  * @param address IPv4 broadcast address to send to
174  * @param send_to_port IPv4 UDP port to send data to
175  * @param recv_on_port IPv4 UDP port to receive data on
176  * @param mr message register to query for message types
177  * @param crypto_key encryption key for messages
178  * @param cipher cipher to use for encryption
179  */
181  unsigned short send_to_port,
182  unsigned short recv_on_port,
183  MessageRegister * mr,
184  const std::string crypto_key,
185  const std::string cipher)
186 : io_service_(),
187  resolver_(io_service_),
188  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
189  resolve_retry_timer_(io_service_),
190  message_register_(mr),
191  own_message_register_(false)
192 {
193  ctor(address, send_to_port, crypto_key, cipher);
194 }
195 
196 /** Constructor with encryption.
197  * @param address IPv4 broadcast address to send to
198  * @param port IPv4 UDP port to listen on and to send to
199  * @param crypto_key encryption key for messages
200  * @param cipher cipher to use for encryption
201  */
203  unsigned short port,
204  const std::string crypto_key,
205  const std::string cipher)
206 : io_service_(),
207  resolver_(io_service_),
208  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
209  resolve_retry_timer_(io_service_)
210 {
211  ctor(address, port, crypto_key, cipher);
212  message_register_ = new MessageRegister();
213  own_message_register_ = true;
214 }
215 
216 /** Constructor with encryption.
217  * @param address IPv4 broadcast address to send to
218  * @param port IPv4 UDP port to listen on and to send to
219  * @param mr message register to query for message types
220  * @param crypto_key encryption key for messages
221  * @param cipher cipher to use for encryption
222  */
224  unsigned short port,
225  MessageRegister * mr,
226  const std::string crypto_key,
227  const std::string cipher)
228 : io_service_(),
229  resolver_(io_service_),
230  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
231  resolve_retry_timer_(io_service_),
232  message_register_(mr),
233  own_message_register_(false)
234 {
235  ctor(address, port, crypto_key, cipher);
236 }
237 
238 /** Testing constructor.
239  * This constructor listens and sends to different ports. It can be used to
240  * send and receive on the same host or even from within the same process.
241  * It is most useful for communication tests.
242  * @param address IPv4 address to send to
243  * @param send_to_port IPv4 UDP port to send data to
244  * @param recv_on_port IPv4 UDP port to receive data on
245  * @param mr message register to query for message types
246  * @param header_version which frame header version to send, use with caution
247  */
249  unsigned short send_to_port,
250  unsigned short recv_on_port,
251  MessageRegister * mr,
252  frame_header_version_t header_version)
253 : io_service_(),
254  resolver_(io_service_),
255  socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
256  resolve_retry_timer_(io_service_),
257  message_register_(mr),
258  own_message_register_(false)
259 {
260  ctor(address, send_to_port, "", "", header_version);
261 }
262 
263 /** Constructor helper.
264  * @param address hostname/address to send to
265  * @param send_to_port UDP port to send messages to
266  * @param crypto_key encryption key for messages
267  * @param cipher cipher to use for encryption
268  * @þaram header_version which frame header version to send, use with caution
269  */
270 void
271 ProtobufBroadcastPeer::ctor(const std::string & address,
272  unsigned int send_to_port,
273  const std::string crypto_key,
274  const std::string cipher,
275  frame_header_version_t header_version)
276 {
277  filter_self_ = true;
278  crypto_ = false;
279  crypto_enc_ = NULL;
280  crypto_dec_ = NULL;
281  frame_header_version_ = header_version;
282 
283  send_to_address_ = address;
284  send_to_port_ = send_to_port;
285 
286  in_data_size_ = max_packet_length;
287  in_data_ = malloc(in_data_size_);
288  enc_in_data_ = NULL;
289 
290  socket_.set_option(socket_base::broadcast(true));
291  socket_.set_option(socket_base::reuse_address(true));
292  determine_local_endpoints();
293 
294  outbound_ready_ = outbound_active_ = false;
295  start_resolve();
296 
297  if (!crypto_key.empty())
298  setup_crypto(crypto_key, cipher);
299 
300  start_recv();
301  asio_thread_ = std::thread(&ProtobufBroadcastPeer::run_asio, this);
302 }
303 
304 /** Destructor. */
306 {
307  resolve_retry_timer_.cancel();
308  if (asio_thread_.joinable()) {
309  io_service_.stop();
310  asio_thread_.join();
311  }
312  free(in_data_);
313  if (enc_in_data_)
314  free(enc_in_data_);
315  if (own_message_register_) {
316  delete message_register_;
317  }
318 
319  delete crypto_enc_;
320  delete crypto_dec_;
321 }
322 
323 /** Setup encryption.
324  * After this call communication will be encrypted. Note that the first
325  * received message might be considered invalid because we are still
326  * listening for plain text messages. To avoid this use the constructor
327  * which takes the encryption key as parameter.
328  * @param key encryption key
329  * @param cipher cipher to use for encryption
330  * @see BufferEncryptor for supported ciphers
331  */
332 void
333 ProtobufBroadcastPeer::setup_crypto(const std::string &key, const std::string &cipher)
334 {
335  if (frame_header_version_ == PB_FRAME_V1) {
336  throw std::runtime_error("Crypto support only available with V2+ frame header");
337  }
338 
339  delete crypto_enc_;
340  delete crypto_dec_;
341  crypto_enc_ = NULL;
342  crypto_dec_ = NULL;
343  crypto_ = false;
344  crypto_buf_ = false;
345 
346  if (key != "" && cipher != "") {
347  crypto_enc_ = new BufferEncryptor(key, cipher);
348 
349  if (!enc_in_data_) {
350  // this depends on the cipher, but nothing is two times the incoming buffer...
351  enc_in_data_size_ = 2 * in_data_size_;
352  enc_in_data_ = malloc(enc_in_data_size_);
353  }
354 
355  crypto_dec_ = new BufferDecryptor(key);
356  crypto_ = true;
357  crypto_buf_ = false;
358  }
359 }
360 
361 void
362 ProtobufBroadcastPeer::determine_local_endpoints()
363 {
364  struct ifaddrs *ifap;
365  if (getifaddrs(&ifap) == 0) {
366  for (struct ifaddrs *iter = ifap; iter != NULL; iter = iter->ifa_next) {
367  if (iter->ifa_addr == NULL)
368  continue;
369  if (iter->ifa_addr->sa_family == AF_INET) {
370  boost::asio::ip::address_v4 addr(
371  ntohl(reinterpret_cast<sockaddr_in *>(iter->ifa_addr)->sin_addr.s_addr));
372 
373  local_endpoints_.push_back(
374  boost::asio::ip::udp::endpoint(addr, socket_.local_endpoint().port()));
375  }
376  }
377  freeifaddrs(ifap);
378  }
379  local_endpoints_.sort();
380 }
381 
382 /** Set if to filter out own messages.
383  * @param filter true to filter out own messages, false to receive them
384  */
385 void
387 {
388  filter_self_ = filter;
389 }
390 
391 /** ASIO thread runnable. */
392 void
393 ProtobufBroadcastPeer::run_asio()
394 {
395 #if BOOST_ASIO_VERSION > 100409
396  while (!io_service_.stopped()) {
397 #endif
398  usleep(0);
399  io_service_.reset();
400  io_service_.run();
401 #if BOOST_ASIO_VERSION > 100409
402  }
403 #endif
404 }
405 
406 void
407 ProtobufBroadcastPeer::handle_resolve(const boost::system::error_code &err,
408  ip::udp::resolver::iterator endpoint_iterator)
409 {
410  if (!err) {
411  std::lock_guard<std::mutex> lock(outbound_mutex_);
412  outbound_ready_ = true;
413  outbound_endpoint_ = endpoint_iterator->endpoint();
414  } else {
415  sig_send_error_("Resolving endpoint failed, retrying");
416  resolve_retry_timer_.expires_from_now(boost::posix_time::seconds(2));
417  resolve_retry_timer_.async_wait(boost::bind(&ProtobufBroadcastPeer::retry_resolve, this, _1));
418  }
419  start_send();
420 }
421 
422 void
423 ProtobufBroadcastPeer::retry_resolve(const boost::system::error_code &ec)
424 {
425  if (!ec)
426  start_resolve();
427 }
428 
429 void
430 ProtobufBroadcastPeer::start_resolve()
431 {
432  ip::udp::resolver::query query(send_to_address_, boost::lexical_cast<std::string>(send_to_port_));
433  resolver_.async_resolve(query,
434  boost::bind(&ProtobufBroadcastPeer::handle_resolve,
435  this,
436  boost::asio::placeholders::error,
437  boost::asio::placeholders::iterator));
438 }
439 
440 void
441 ProtobufBroadcastPeer::handle_recv(const boost::system::error_code &error, size_t bytes_rcvd)
442 {
443  const size_t expected_min_size = (frame_header_version_ == PB_FRAME_V1)
444  ? sizeof(frame_header_v1_t)
445  : (sizeof(frame_header_t) + sizeof(message_header_t));
446 
447  if (!error && bytes_rcvd >= expected_min_size) {
448  frame_header_t frame_header;
449  size_t header_size;
450  if (frame_header_version_ == PB_FRAME_V1) {
451  frame_header_v1_t *frame_header_v1 = static_cast<frame_header_v1_t *>(in_data_);
452  frame_header.header_version = PB_FRAME_V1;
453  frame_header.cipher = PB_ENCRYPTION_NONE;
454  frame_header.payload_size = frame_header_v1->payload_size;
455  header_size = sizeof(frame_header_v1_t);
456  } else {
457  memcpy(&frame_header, crypto_buf_ ? enc_in_data_ : in_data_, sizeof(frame_header_t));
458  header_size = sizeof(frame_header_t);
459 
460  if (crypto_buf_) {
461  sig_rcvd_raw_(in_endpoint_,
462  frame_header,
463  (unsigned char *)enc_in_data_ + sizeof(frame_header_t),
464  bytes_rcvd - sizeof(frame_header_t));
465  } else {
466  sig_rcvd_raw_(in_endpoint_,
467  frame_header,
468  (unsigned char *)in_data_ + sizeof(frame_header_t),
469  bytes_rcvd - sizeof(frame_header_t));
470  }
471 
472  if (sig_rcvd_.num_slots() > 0) {
473  if (!crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
474  sig_recv_error_(in_endpoint_, "Received encrypted message but encryption is disabled");
475  } else if (crypto_buf_ && (frame_header.cipher == PB_ENCRYPTION_NONE)) {
476  sig_recv_error_(in_endpoint_, "Received plain text message but encryption is enabled");
477  } else {
478  if (crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
479  // we need to decrypt first
480  try {
481  memcpy(in_data_, enc_in_data_, sizeof(frame_header_t));
482  size_t to_decrypt = bytes_rcvd - sizeof(frame_header_t);
483  bytes_rcvd =
484  crypto_dec_->decrypt(frame_header.cipher,
485  (unsigned char *)enc_in_data_ + sizeof(frame_header_t),
486  to_decrypt,
487  (unsigned char *)in_data_ + sizeof(frame_header_t),
488  in_data_size_);
489  frame_header.payload_size = htonl(bytes_rcvd);
490  bytes_rcvd += sizeof(frame_header_t);
491  } catch (std::runtime_error &e) {
492  sig_recv_error_(in_endpoint_, std::string("Decryption fail: ") + e.what());
493  bytes_rcvd = 0;
494  }
495  }
496  }
497  } // else nobody cares about deserialized message
498  }
499 
500  size_t payload_size = ntohl(frame_header.payload_size);
501 
502  if (sig_rcvd_.num_slots() > 0) {
503  if (bytes_rcvd == (header_size + payload_size)) {
504  if (!filter_self_
505  || !std::binary_search(local_endpoints_.begin(),
506  local_endpoints_.end(),
507  in_endpoint_)) {
508  void * data;
509  message_header_t message_header;
510 
511  if (frame_header_version_ == PB_FRAME_V1) {
512  frame_header_v1_t *frame_header_v1 = static_cast<frame_header_v1_t *>(in_data_);
513  message_header.component_id = frame_header_v1->component_id;
514  message_header.msg_type = frame_header_v1->msg_type;
515  data = (char *)in_data_ + sizeof(frame_header_v1_t);
516  // message register expects payload size to include message header
517  frame_header.payload_size =
518  htonl(ntohl(frame_header.payload_size) + sizeof(message_header_t));
519  } else {
520  message_header_t *msg_header =
521  static_cast<message_header_t *>((void *)((char *)in_data_ + sizeof(frame_header_t)));
522  message_header.component_id = msg_header->component_id;
523  message_header.msg_type = msg_header->msg_type;
524  data = (char *)in_data_ + sizeof(frame_header_t) + sizeof(message_header_t);
525  }
526 
527  uint16_t comp_id = ntohs(message_header.component_id);
528  uint16_t msg_type = ntohs(message_header.msg_type);
529 
530  try {
531  std::shared_ptr<google::protobuf::Message> m =
532  message_register_->deserialize(frame_header, message_header, data);
533 
534  sig_rcvd_(in_endpoint_, comp_id, msg_type, m);
535  } catch (std::runtime_error &e) {
536  sig_recv_error_(in_endpoint_, std::string("Deserialization fail: ") + e.what());
537  }
538  }
539  } else {
540  sig_recv_error_(in_endpoint_, "Invalid number of bytes received");
541  }
542  } // else nobody cares (no one registered to signal)
543 
544  } else {
545  sig_recv_error_(in_endpoint_, "General receiving error or truncated message");
546  }
547 
548  start_recv();
549 }
550 
551 void
552 ProtobufBroadcastPeer::handle_sent(const boost::system::error_code &error,
553  size_t bytes_transferred,
554  QueueEntry * entry)
555 {
556  delete entry;
557 
558  {
559  std::lock_guard<std::mutex> lock(outbound_mutex_);
560  outbound_active_ = false;
561  }
562 
563  if (error) {
564  sig_send_error_("Sending message failed");
565  }
566 
567  start_send();
568 }
569 
570 /** Send a message to other peers.
571  * @param component_id ID of the component to address
572  * @param msg_type numeric message type
573  * @param m message to send
574  */
575 void
576 ProtobufBroadcastPeer::send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
577 {
578  QueueEntry *entry = new QueueEntry();
579  message_register_->serialize(component_id,
580  msg_type,
581  m,
582  entry->frame_header,
583  entry->message_header,
584  entry->serialized_message);
585 
586  if (entry->serialized_message.size() > max_packet_length) {
587  throw std::runtime_error("Serialized message too big");
588  }
589 
590  if (frame_header_version_ == PB_FRAME_V1) {
594 
595  entry->buffers[0] = boost::asio::buffer(&entry->frame_header_v1, sizeof(frame_header_v1_t));
596  entry->buffers[1] = boost::asio::const_buffer();
597  } else {
598  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
599  entry->buffers[1] = boost::asio::buffer(&entry->message_header, sizeof(message_header_t));
600  }
601  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
602 
603  {
604  std::lock_guard<std::mutex> lock(outbound_mutex_);
605  outbound_queue_.push(entry);
606  }
607  start_send();
608 }
609 
610 /** Send a raw message.
611  * The message is sent as-is (frame_header appended by message data) over the wire.
612  * @param frame_header frame header to prepend, must be completely and properly
613  * setup.
614  * @param data data buffer, maybe encrypted (if indicated in frame header)
615  * @param data_size size in bytes of @p data
616  */
617 void
619  const void * data,
620  size_t data_size)
621 {
622  QueueEntry *entry = new QueueEntry();
623  entry->frame_header = frame_header;
624  entry->serialized_message = std::string(reinterpret_cast<const char *>(data), data_size);
625 
626  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
627  entry->buffers[1] = boost::asio::const_buffer();
628  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
629 
630  {
631  std::lock_guard<std::mutex> lock(outbound_mutex_);
632  outbound_queue_.push(entry);
633  }
634  start_send();
635 }
636 
637 /** Send a message to other peers.
638  * @param component_id ID of the component to address
639  * @param msg_type numeric message type
640  * @param m message to send
641  */
642 void
643 ProtobufBroadcastPeer::send(uint16_t component_id,
644  uint16_t msg_type,
645  std::shared_ptr<google::protobuf::Message> m)
646 {
647  send(component_id, msg_type, *m);
648 }
649 
650 /** Send a message to other peers.
651  * @param m Message to send, the message must have an CompType enum type to
652  * specify component ID and message type.
653  */
654 void
655 ProtobufBroadcastPeer::send(std::shared_ptr<google::protobuf::Message> m)
656 {
657  send(*m);
658 }
659 
660 /** Send a message to other peers.
661  * @param m Message to send, the message must have an CompType enum type to
662  * specify component ID and message type.
663  */
664 void
665 ProtobufBroadcastPeer::send(google::protobuf::Message &m)
666 {
667  const google::protobuf::Descriptor * desc = m.GetDescriptor();
668  const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName("CompType");
669  if (!enumdesc) {
670  throw std::logic_error("Message does not have CompType enum");
671  }
672  const google::protobuf::EnumValueDescriptor *compdesc = enumdesc->FindValueByName("COMP_ID");
673  const google::protobuf::EnumValueDescriptor *msgtdesc = enumdesc->FindValueByName("MSG_TYPE");
674  if (!compdesc || !msgtdesc) {
675  throw std::logic_error("Message CompType enum hs no COMP_ID or MSG_TYPE value");
676  }
677  int comp_id = compdesc->number();
678  int msg_type = msgtdesc->number();
679  if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
680  throw std::logic_error("Message has invalid COMP_ID");
681  }
682  if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
683  throw std::logic_error("Message has invalid MSG_TYPE");
684  }
685 
686  send(comp_id, msg_type, m);
687 }
688 
689 void
690 ProtobufBroadcastPeer::start_recv()
691 {
692  crypto_buf_ = crypto_;
693  socket_.async_receive_from(boost::asio::buffer(crypto_ ? enc_in_data_ : in_data_, in_data_size_),
694  in_endpoint_,
695  boost::bind(&ProtobufBroadcastPeer::handle_recv,
696  this,
697  boost::asio::placeholders::error,
698  boost::asio::placeholders::bytes_transferred));
699 }
700 
701 void
702 ProtobufBroadcastPeer::start_send()
703 {
704  std::lock_guard<std::mutex> lock(outbound_mutex_);
705  if (outbound_queue_.empty() || outbound_active_ || !outbound_ready_)
706  return;
707 
708  outbound_active_ = true;
709 
710  QueueEntry *entry = outbound_queue_.front();
711  outbound_queue_.pop();
712 
713  if (crypto_) {
714  size_t plain_size =
715  boost::asio::buffer_size(entry->buffers[1]) + boost::asio::buffer_size(entry->buffers[2]);
716  size_t enc_size = crypto_enc_->encrypted_buffer_size(plain_size);
717 
718  std::string plain_buf = std::string(plain_size, '\0');
719 
720  plain_buf.replace(0,
721  boost::asio::buffer_size(entry->buffers[1]),
722  boost::asio::buffer_cast<const char *>(entry->buffers[1]),
723  boost::asio::buffer_size(entry->buffers[1]));
724 
725  plain_buf.replace(boost::asio::buffer_size(entry->buffers[1]),
726  boost::asio::buffer_size(entry->buffers[2]),
727  boost::asio::buffer_cast<const char *>(entry->buffers[2]),
728  boost::asio::buffer_size(entry->buffers[2]));
729 
730  entry->encrypted_message.resize(enc_size);
731  crypto_enc_->encrypt(plain_buf, entry->encrypted_message);
732 
733  entry->frame_header.payload_size = htonl(entry->encrypted_message.size());
734  entry->frame_header.cipher = crypto_enc_->cipher_id();
735  entry->buffers[1] = boost::asio::buffer(entry->encrypted_message);
736  entry->buffers[2] = boost::asio::const_buffer();
737  }
738 
739  socket_.async_send_to(entry->buffers,
740  outbound_endpoint_,
741  boost::bind(&ProtobufBroadcastPeer::handle_sent,
742  this,
743  boost::asio::placeholders::error,
744  boost::asio::placeholders::bytes_transferred,
745  entry));
746 }
747 
748 } // end namespace protobuf_comm
protobuf_comm::MessageRegister::serialize
void serialize(uint16_t component_id, uint16_t msg_type, const google::protobuf::Message &msg, frame_header_t &frame_header, message_header_t &message_header, std::string &data)
Serialize a message.
Definition: message_register.cpp:271
protobuf_comm::QueueEntry::frame_header_v1
frame_header_v1_t frame_header_v1
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:57
protobuf_comm::frame_header_v1_t::payload_size
uint32_t payload_size
payload size in bytes
Definition: frame_header.h:122
protobuf_comm::QueueEntry::message_header
message_header_t message_header
Frame header (network byte order)
Definition: queue_entry.h:58
protobuf_comm::ProtobufBroadcastPeer::~ProtobufBroadcastPeer
~ProtobufBroadcastPeer()
Destructor.
Definition: peer.cpp:305
protobuf_comm::MessageRegister
Register to map msg type numbers to Protobuf messages.
Definition: message_register.h:66
protobuf_comm::frame_header_v1_t::msg_type
uint16_t msg_type
message type
Definition: frame_header.h:120
protobuf_comm::BufferEncryptor::encrypted_buffer_size
size_t encrypted_buffer_size(size_t plain_length)
Get required size for an encrypted buffer of the given plain text length.
Definition: crypto.cpp:148
protobuf_comm::QueueEntry
Outgoing queue entry.
Definition: queue_entry.h:47
protobuf_comm::ProtobufBroadcastPeer::max_packet_length
@ max_packet_length
maximum packet length in bytes
Definition: peer.h:60
protobuf_comm::ProtobufBroadcastPeer::ProtobufBroadcastPeer
ProtobufBroadcastPeer(const std::string address, unsigned short port)
Constructor.
Definition: peer.cpp:59
protobuf_comm::QueueEntry::buffers
std::array< boost::asio::const_buffer, 3 > buffers
outgoing buffers
Definition: queue_entry.h:59
protobuf_comm::message_header_t
Network message header.
Definition: frame_header.h:98
protobuf_comm::ProtobufBroadcastPeer::send_raw
void send_raw(const frame_header_t &frame_header, const void *data, size_t data_size)
Send a raw message.
Definition: peer.cpp:618
protobuf_comm::frame_header_t
Network framing header.
Definition: frame_header.h:72
protobuf_comm::MessageRegister::deserialize
std::shared_ptr< google::protobuf::Message > deserialize(frame_header_t &frame_header, message_header_t &message_header, void *data)
Deserialize message.
Definition: message_register.cpp:311
protobuf_comm::ProtobufBroadcastPeer::set_filter_self
void set_filter_self(bool filter)
Set if to filter out own messages.
Definition: peer.cpp:386
protobuf_comm::message_header_t::component_id
uint16_t component_id
component id
Definition: frame_header.h:100
protobuf_comm::ProtobufBroadcastPeer::setup_crypto
void setup_crypto(const std::string &key, const std::string &cipher)
Setup encryption.
Definition: peer.cpp:333
protobuf_comm::BufferDecryptor
Decrypt buffers encrypted with BufferEncryptor.
Definition: crypto.h:79
protobuf_comm::message_header_t::msg_type
uint16_t msg_type
message type
Definition: frame_header.h:102
protobuf_comm::frame_header_v1_t
Old network message framing header.
Definition: frame_header.h:116
protobuf_comm::QueueEntry::frame_header
frame_header_t frame_header
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:56
protobuf_comm::BufferDecryptor::decrypt
size_t decrypt(int cipher, const void *enc, size_t enc_size, void *plain, size_t plain_size)
Decrypt a buffer.
Definition: crypto.cpp:221
protobuf_comm::frame_header_t::payload_size
uint32_t payload_size
payload size in bytes includes message and header, not IV
Definition: frame_header.h:84
protobuf_comm::ProtobufBroadcastPeer::send
void send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to other peers.
Definition: peer.cpp:576
protobuf_comm::BufferEncryptor::encrypt
void encrypt(const std::string &plain, std::string &enc)
Encrypt a buffer.
Definition: crypto.cpp:97
protobuf_comm::QueueEntry::serialized_message
std::string serialized_message
serialized protobuf message
Definition: queue_entry.h:54
protobuf_comm::frame_header_v1_t::component_id
uint16_t component_id
component id
Definition: frame_header.h:118
protobuf_comm::BufferEncryptor::cipher_id
int cipher_id() const
Get cipher ID.
Definition: crypto.h:60
protobuf_comm::BufferEncryptor
Encrypt buffers using AES128 in ECB mode.
Definition: crypto.h:50