36 #include <protobuf_comm/crypto.h>
37 #include <protobuf_comm/peer.h>
39 #include <boost/lexical_cast.hpp>
42 using namespace boost::asio;
43 using namespace boost::system;
44 using namespace boost::placeholders;
46 namespace protobuf_comm {
59 ProtobufBroadcastPeer::ProtobufBroadcastPeer(
const std::string address,
unsigned short port)
61 resolver_(io_service_),
62 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
63 resolve_retry_timer_(io_service_)
66 own_message_register_ =
true;
79 unsigned short send_to_port,
80 unsigned short recv_on_port)
82 resolver_(io_service_),
83 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
84 resolve_retry_timer_(io_service_)
87 own_message_register_ =
true;
88 ctor(address, send_to_port);
98 std::vector<std::string> &proto_path)
100 resolver_(io_service_),
101 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
102 resolve_retry_timer_(io_service_)
105 own_message_register_ =
true;
119 unsigned short send_to_port,
120 unsigned short recv_on_port,
121 std::vector<std::string> &proto_path)
123 resolver_(io_service_),
124 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
125 resolve_retry_timer_(io_service_)
128 own_message_register_ =
true;
129 ctor(address, send_to_port);
141 resolver_(io_service_),
142 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
143 resolve_retry_timer_(io_service_),
144 message_register_(mr),
145 own_message_register_(false)
158 unsigned short send_to_port,
159 unsigned short recv_on_port,
160 const std::string crypto_key,
161 const std::string cipher)
163 resolver_(io_service_),
164 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
165 resolve_retry_timer_(io_service_)
167 ctor(address, send_to_port, crypto_key, cipher);
169 own_message_register_ =
true;
181 unsigned short send_to_port,
182 unsigned short recv_on_port,
184 const std::string crypto_key,
185 const std::string cipher)
187 resolver_(io_service_),
188 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
189 resolve_retry_timer_(io_service_),
190 message_register_(mr),
191 own_message_register_(false)
193 ctor(address, send_to_port, crypto_key, cipher);
204 const std::string crypto_key,
205 const std::string cipher)
207 resolver_(io_service_),
208 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
209 resolve_retry_timer_(io_service_)
211 ctor(address, port, crypto_key, cipher);
213 own_message_register_ =
true;
226 const std::string crypto_key,
227 const std::string cipher)
229 resolver_(io_service_),
230 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), port)),
231 resolve_retry_timer_(io_service_),
232 message_register_(mr),
233 own_message_register_(false)
235 ctor(address, port, crypto_key, cipher);
249 unsigned short send_to_port,
250 unsigned short recv_on_port,
252 frame_header_version_t header_version)
254 resolver_(io_service_),
255 socket_(io_service_, ip::udp::endpoint(ip::udp::v4(), recv_on_port)),
256 resolve_retry_timer_(io_service_),
257 message_register_(mr),
258 own_message_register_(false)
260 ctor(address, send_to_port,
"",
"", header_version);
271 ProtobufBroadcastPeer::ctor(
const std::string & address,
272 unsigned int send_to_port,
273 const std::string crypto_key,
274 const std::string cipher,
275 frame_header_version_t header_version)
281 frame_header_version_ = header_version;
283 send_to_address_ = address;
284 send_to_port_ = send_to_port;
287 in_data_ = malloc(in_data_size_);
290 socket_.set_option(socket_base::broadcast(
true));
291 socket_.set_option(socket_base::reuse_address(
true));
292 determine_local_endpoints();
294 outbound_ready_ = outbound_active_ =
false;
297 if (!crypto_key.empty())
301 asio_thread_ = std::thread(&ProtobufBroadcastPeer::run_asio,
this);
307 resolve_retry_timer_.cancel();
308 if (asio_thread_.joinable()) {
315 if (own_message_register_) {
316 delete message_register_;
335 if (frame_header_version_ == PB_FRAME_V1) {
336 throw std::runtime_error(
"Crypto support only available with V2+ frame header");
346 if (key !=
"" && cipher !=
"") {
351 enc_in_data_size_ = 2 * in_data_size_;
352 enc_in_data_ = malloc(enc_in_data_size_);
362 ProtobufBroadcastPeer::determine_local_endpoints()
364 struct ifaddrs *ifap;
365 if (getifaddrs(&ifap) == 0) {
366 for (
struct ifaddrs *iter = ifap; iter != NULL; iter = iter->ifa_next) {
367 if (iter->ifa_addr == NULL)
369 if (iter->ifa_addr->sa_family == AF_INET) {
370 boost::asio::ip::address_v4 addr(
371 ntohl(
reinterpret_cast<sockaddr_in *
>(iter->ifa_addr)->sin_addr.s_addr));
373 local_endpoints_.push_back(
374 boost::asio::ip::udp::endpoint(addr, socket_.local_endpoint().port()));
379 local_endpoints_.sort();
388 filter_self_ = filter;
393 ProtobufBroadcastPeer::run_asio()
395 #if BOOST_ASIO_VERSION > 100409
396 while (!io_service_.stopped()) {
401 #if BOOST_ASIO_VERSION > 100409
407 ProtobufBroadcastPeer::handle_resolve(
const boost::system::error_code &err,
408 ip::udp::resolver::iterator endpoint_iterator)
411 std::lock_guard<std::mutex> lock(outbound_mutex_);
412 outbound_ready_ =
true;
413 outbound_endpoint_ = endpoint_iterator->endpoint();
415 sig_send_error_(
"Resolving endpoint failed, retrying");
416 resolve_retry_timer_.expires_from_now(boost::posix_time::seconds(2));
417 resolve_retry_timer_.async_wait(boost::bind(&ProtobufBroadcastPeer::retry_resolve,
this, _1));
423 ProtobufBroadcastPeer::retry_resolve(
const boost::system::error_code &ec)
430 ProtobufBroadcastPeer::start_resolve()
432 ip::udp::resolver::query query(send_to_address_, boost::lexical_cast<std::string>(send_to_port_));
433 resolver_.async_resolve(query,
434 boost::bind(&ProtobufBroadcastPeer::handle_resolve,
436 boost::asio::placeholders::error,
437 boost::asio::placeholders::iterator));
441 ProtobufBroadcastPeer::handle_recv(
const boost::system::error_code &error,
size_t bytes_rcvd)
443 const size_t expected_min_size = (frame_header_version_ == PB_FRAME_V1)
444 ?
sizeof(frame_header_v1_t)
445 : (
sizeof(frame_header_t) +
sizeof(message_header_t));
447 if (!error && bytes_rcvd >= expected_min_size) {
448 frame_header_t frame_header;
450 if (frame_header_version_ == PB_FRAME_V1) {
451 frame_header_v1_t *frame_header_v1 =
static_cast<frame_header_v1_t *
>(in_data_);
452 frame_header.header_version = PB_FRAME_V1;
453 frame_header.cipher = PB_ENCRYPTION_NONE;
454 frame_header.payload_size = frame_header_v1->payload_size;
455 header_size =
sizeof(frame_header_v1_t);
457 memcpy(&frame_header, crypto_buf_ ? enc_in_data_ : in_data_,
sizeof(frame_header_t));
458 header_size =
sizeof(frame_header_t);
461 sig_rcvd_raw_(in_endpoint_,
463 (
unsigned char *)enc_in_data_ +
sizeof(frame_header_t),
464 bytes_rcvd -
sizeof(frame_header_t));
466 sig_rcvd_raw_(in_endpoint_,
468 (
unsigned char *)in_data_ +
sizeof(frame_header_t),
469 bytes_rcvd -
sizeof(frame_header_t));
472 if (sig_rcvd_.num_slots() > 0) {
473 if (!crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
474 sig_recv_error_(in_endpoint_,
"Received encrypted message but encryption is disabled");
475 }
else if (crypto_buf_ && (frame_header.cipher == PB_ENCRYPTION_NONE)) {
476 sig_recv_error_(in_endpoint_,
"Received plain text message but encryption is enabled");
478 if (crypto_buf_ && (frame_header.cipher != PB_ENCRYPTION_NONE)) {
481 memcpy(in_data_, enc_in_data_,
sizeof(frame_header_t));
482 size_t to_decrypt = bytes_rcvd -
sizeof(frame_header_t);
484 crypto_dec_->
decrypt(frame_header.cipher,
485 (
unsigned char *)enc_in_data_ +
sizeof(frame_header_t),
487 (
unsigned char *)in_data_ +
sizeof(frame_header_t),
489 frame_header.payload_size = htonl(bytes_rcvd);
490 bytes_rcvd +=
sizeof(frame_header_t);
491 }
catch (std::runtime_error &e) {
492 sig_recv_error_(in_endpoint_, std::string(
"Decryption fail: ") + e.what());
500 size_t payload_size = ntohl(frame_header.payload_size);
502 if (sig_rcvd_.num_slots() > 0) {
503 if (bytes_rcvd == (header_size + payload_size)) {
505 || !std::binary_search(local_endpoints_.begin(),
506 local_endpoints_.end(),
509 message_header_t message_header;
511 if (frame_header_version_ == PB_FRAME_V1) {
512 frame_header_v1_t *frame_header_v1 =
static_cast<frame_header_v1_t *
>(in_data_);
513 message_header.component_id = frame_header_v1->component_id;
514 message_header.msg_type = frame_header_v1->msg_type;
515 data = (
char *)in_data_ +
sizeof(frame_header_v1_t);
517 frame_header.payload_size =
518 htonl(ntohl(frame_header.payload_size) +
sizeof(message_header_t));
520 message_header_t *msg_header =
521 static_cast<message_header_t *
>((
void *)((
char *)in_data_ +
sizeof(frame_header_t)));
522 message_header.component_id = msg_header->component_id;
523 message_header.msg_type = msg_header->msg_type;
524 data = (
char *)in_data_ +
sizeof(frame_header_t) +
sizeof(message_header_t);
527 uint16_t comp_id = ntohs(message_header.component_id);
528 uint16_t msg_type = ntohs(message_header.msg_type);
531 std::shared_ptr<google::protobuf::Message> m =
532 message_register_->
deserialize(frame_header, message_header, data);
534 sig_rcvd_(in_endpoint_, comp_id, msg_type, m);
535 }
catch (std::runtime_error &e) {
536 sig_recv_error_(in_endpoint_, std::string(
"Deserialization fail: ") + e.what());
540 sig_recv_error_(in_endpoint_,
"Invalid number of bytes received");
545 sig_recv_error_(in_endpoint_,
"General receiving error or truncated message");
552 ProtobufBroadcastPeer::handle_sent(
const boost::system::error_code &error,
553 size_t bytes_transferred,
559 std::lock_guard<std::mutex> lock(outbound_mutex_);
560 outbound_active_ =
false;
564 sig_send_error_(
"Sending message failed");
579 message_register_->
serialize(component_id,
587 throw std::runtime_error(
"Serialized message too big");
590 if (frame_header_version_ == PB_FRAME_V1) {
596 entry->
buffers[1] = boost::asio::const_buffer();
604 std::lock_guard<std::mutex> lock(outbound_mutex_);
605 outbound_queue_.push(entry);
624 entry->
serialized_message = std::string(
reinterpret_cast<const char *
>(data), data_size);
627 entry->
buffers[1] = boost::asio::const_buffer();
631 std::lock_guard<std::mutex> lock(outbound_mutex_);
632 outbound_queue_.push(entry);
645 std::shared_ptr<google::protobuf::Message> m)
647 send(component_id, msg_type, *m);
667 const google::protobuf::Descriptor * desc = m.GetDescriptor();
668 const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName(
"CompType");
670 throw std::logic_error(
"Message does not have CompType enum");
672 const google::protobuf::EnumValueDescriptor *compdesc = enumdesc->FindValueByName(
"COMP_ID");
673 const google::protobuf::EnumValueDescriptor *msgtdesc = enumdesc->FindValueByName(
"MSG_TYPE");
674 if (!compdesc || !msgtdesc) {
675 throw std::logic_error(
"Message CompType enum hs no COMP_ID or MSG_TYPE value");
677 int comp_id = compdesc->number();
678 int msg_type = msgtdesc->number();
679 if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
680 throw std::logic_error(
"Message has invalid COMP_ID");
682 if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
683 throw std::logic_error(
"Message has invalid MSG_TYPE");
686 send(comp_id, msg_type, m);
690 ProtobufBroadcastPeer::start_recv()
692 crypto_buf_ = crypto_;
693 socket_.async_receive_from(boost::asio::buffer(crypto_ ? enc_in_data_ : in_data_, in_data_size_),
695 boost::bind(&ProtobufBroadcastPeer::handle_recv,
697 boost::asio::placeholders::error,
698 boost::asio::placeholders::bytes_transferred));
702 ProtobufBroadcastPeer::start_send()
704 std::lock_guard<std::mutex> lock(outbound_mutex_);
705 if (outbound_queue_.empty() || outbound_active_ || !outbound_ready_)
708 outbound_active_ =
true;
710 QueueEntry *entry = outbound_queue_.front();
711 outbound_queue_.pop();
715 boost::asio::buffer_size(entry->buffers[1]) + boost::asio::buffer_size(entry->buffers[2]);
718 std::string plain_buf = std::string(plain_size,
'\0');
721 boost::asio::buffer_size(entry->buffers[1]),
722 boost::asio::buffer_cast<const char *>(entry->buffers[1]),
723 boost::asio::buffer_size(entry->buffers[1]));
725 plain_buf.replace(boost::asio::buffer_size(entry->buffers[1]),
726 boost::asio::buffer_size(entry->buffers[2]),
727 boost::asio::buffer_cast<const char *>(entry->buffers[2]),
728 boost::asio::buffer_size(entry->buffers[2]));
730 entry->encrypted_message.resize(enc_size);
731 crypto_enc_->
encrypt(plain_buf, entry->encrypted_message);
733 entry->frame_header.payload_size = htonl(entry->encrypted_message.size());
734 entry->frame_header.cipher = crypto_enc_->
cipher_id();
735 entry->buffers[1] = boost::asio::buffer(entry->encrypted_message);
736 entry->buffers[2] = boost::asio::const_buffer();
739 socket_.async_send_to(entry->buffers,
741 boost::bind(&ProtobufBroadcastPeer::handle_sent,
743 boost::asio::placeholders::error,
744 boost::asio::placeholders::bytes_transferred,