24 #include <core/exceptions/system.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/mutex_locker.h>
27 #include <core/threading/thread.h>
28 #include <core/threading/wait_condition.h>
29 #include <netcomm/fawkes/client.h>
30 #include <netcomm/fawkes/client_handler.h>
31 #include <netcomm/fawkes/message_queue.h>
32 #include <netcomm/fawkes/transceiver.h>
33 #include <netcomm/socket/stream.h>
34 #include <netcomm/utils/exceptions.h>
// Base-class initializer: constructs the Exception base with a fixed message
// stating that a handler for this component has already been registered.
53 :
Exception(
"A handler for this component has already been registered")
// Allocate the mutex that is locked around pushes to, and swaps of, the
// outbound queues.
75 outbound_mutex_ =
new Mutex();
// Queue slot 0 is the first queue to receive pushed messages.
79 outbound_msgq_ = outbound_msgqs_[0];
// Nothing is pending initially; the flag is set to true after a push.
80 outbound_havemore_ =
false;
// Empty both outbound queue slots, then free the two queues and the mutex.
// NOTE(review): how each queued message is released before pop() is not
// visible in these lines — confirm.
86 for (
unsigned int i = 0; i < 2; ++i) {
87 while (!outbound_msgqs_[i]->empty()) {
90 outbound_msgqs_[i]->pop();
93 delete outbound_msgqs_[0];
94 delete outbound_msgqs_[1];
95 delete outbound_mutex_;
// Tell the parent that the send side is alive.
101 parent_->set_send_slave_alive();
// Keep swapping the outbound queue slots for as long as the pending flag is
// set. Flag reset and slot switch happen while outbound_mutex_ is held.
110 while (outbound_havemore_) {
111 outbound_mutex_->
lock();
// Cleared under the lock; the enqueue path sets it again after each push.
112 outbound_havemore_ =
false;
// Toggle the active index (assuming it is always 0 or 1 — TODO confirm the
// initial value) and direct subsequent pushes to that slot.
114 outbound_active_ = 1 - outbound_active_;
115 outbound_msgq_ = outbound_msgqs_[outbound_active_];
116 outbound_mutex_->
unlock();
// Report the connection as dead to the parent.
// NOTE(review): the condition guarding this call is not visible here — confirm.
122 parent_->connection_died();
// Push the message onto the current outbound queue and raise the pending
// flag, both while holding outbound_mutex_.
155 outbound_mutex_->
lock();
156 outbound_msgq_->push(message);
157 outbound_havemore_ =
true;
158 outbound_mutex_->
unlock();
// Locked around pushes to the outbound queue and around the queue slot swap.
173 Mutex * outbound_mutex_;
// Index into outbound_msgqs_ selecting the slot that outbound_msgq_ points to.
174 unsigned int outbound_active_;
// Set to true after a push; reset to false when the slots are swapped.
175 bool outbound_havemore_;
// Names the thread "FawkesNetworkClientRecvThread" through the Thread base
// and keeps the supplied recv_mutex.
// NOTE(review): presumably recv_mutex is a constructor argument that is not
// owned by this object — confirm.
195 :
Thread(
"FawkesNetworkClientRecvThread")
200 recv_mutex_ = recv_mutex;
// Empty the inbound queue, then free it.
// NOTE(review): how each queued message is released before pop() is not
// visible in these lines — confirm.
206 while (!inbound_msgq_->empty()) {
209 inbound_msgq_->pop();
211 delete inbound_msgq_;
// Component IDs seen in this batch; each is passed to wake_handlers() below.
218 std::list<unsigned int> wakeup_list;
// Lock the inbound queue before dispatching its messages.
225 inbound_msgq_->
lock();
226 while (!inbound_msgq_->empty()) {
// Record the message's component ID, hand the message to the parent, then
// remove it from the queue.
228 wakeup_list.push_back(m->
cid());
229 parent_->dispatch_message(m);
231 inbound_msgq_->pop();
// std::list::unique() removes only consecutive duplicates, so an ID whose
// messages were interleaved with other components' can still appear (and be
// woken) more than once.
238 wakeup_list.unique();
239 for (std::list<unsigned int>::iterator i = wakeup_list.begin(); i != wakeup_list.end(); ++i) {
240 parent_->wake_handlers(*i);
// Tell the parent that the receive side is alive.
250 parent_->set_recv_slave_alive();
// Two separate places report a dead connection to the parent.
// NOTE(review): the conditions leading to each call are not visible here — confirm.
268 parent_->connection_died();
275 parent_->connection_died();
// Keep a private heap copy of the host name (strdup allocates with malloc).
// NOTE(review): the matching free() is not visible in these lines — confirm.
310 host_ = strdup(host);
// Initial state: connection not marked dead, neither worker reported alive.
319 connection_died_recently =
false;
320 send_slave_alive_ =
false;
321 recv_slave_alive_ =
false;
// Mutex locked by set_send_slave_alive()/set_recv_slave_alive().
323 slave_status_mutex =
new Mutex();
328 recv_mutex_ =
new Mutex();
// Mutex locked around the connection-established wait and the interrupt flag.
330 connest_mutex_ =
new Mutex();
333 connest_interrupted_ =
false;
// Same initial state as the other constructors; no host_ assignment is
// visible in this group of lines.
351 connection_died_recently =
false;
352 send_slave_alive_ =
false;
353 recv_slave_alive_ =
false;
// Mutex locked by set_send_slave_alive()/set_recv_slave_alive().
355 slave_status_mutex =
new Mutex();
360 recv_mutex_ =
new Mutex();
// Mutex locked around the connection-established wait and the interrupt flag.
362 connest_mutex_ =
new Mutex();
365 connest_interrupted_ =
false;
// Keep a private heap copy of the host name (strdup allocates with malloc).
// NOTE(review): the matching free() is not visible in these lines — confirm.
375 host_ = strdup(host);
// Initial state: connection not marked dead, neither worker reported alive.
384 connection_died_recently =
false;
385 send_slave_alive_ =
false;
386 recv_slave_alive_ =
false;
// Mutex locked by set_send_slave_alive()/set_recv_slave_alive().
388 slave_status_mutex =
new Mutex();
393 recv_mutex_ =
new Mutex();
// Mutex locked around the connection-established wait and the interrupt flag.
395 connest_mutex_ =
new Mutex();
398 connest_interrupted_ =
false;
// Free the synchronization objects.
// NOTE(review): no release of recv_mutex_, host_ (strdup) or addr_ (malloc)
// is visible in these lines — confirm they are released elsewhere.
411 delete slave_status_mutex;
413 delete connest_waitcond_;
414 delete connest_mutex_;
415 delete recv_waitcond_;
// Neither a host name nor a socket address has been set.
426 if (host_ == NULL && addr_ == NULL) {
434 connection_died_recently =
false;
// Start both worker threads.
446 send_slave_->
start();
448 recv_slave_->
start();
// NOTE(review): presumably on a failure path (the surrounding control flow is
// not visible) — marks the connection as dead and clears both alive flags.
450 connection_died_recently =
true;
463 send_slave_alive_ =
false;
464 recv_slave_alive_ =
false;
// Block until the connection is flagged as established or the wait has been
// interrupted; the condition is re-checked after every wake-up.
470 connest_mutex_->
lock();
471 while (!connest_ && !connest_interrupted_) {
472 connest_waitcond_->
wait();
// Remember whether the wait was interrupted and reset the flag.
474 bool interrupted = connest_interrupted_;
475 connest_interrupted_ =
false;
481 notify_of_connection_established();
// Store a private heap copy of the host name.
// NOTE(review): whether a previously stored host_ is freed first is not
// visible here — confirm.
495 host_ = strdup(host);
// Copy the caller's socket address (addr_len bytes) into a freshly allocated
// buffer and remember its length, then copy the host name.
// NOTE(review): no null check of the malloc result is visible here — confirm.
512 addr_ = (
struct sockaddr *)malloc(addr_len);
513 addr_len_ = addr_len;
514 memcpy(addr_, addr, addr_len);
515 host_ = strdup(hostname);
// Copy sizeof(sockaddr_storage) bytes starting at &addr into a freshly
// allocated buffer of the same size, then copy the host name.
// NOTE(review): no null check of the malloc result is visible here — confirm.
530 addr_ = (
struct sockaddr *)malloc(
sizeof(sockaddr_storage));
531 addr_len_ =
sizeof(sockaddr_storage);
532 memcpy(addr_, &addr, addr_len_);
533 host_ = strdup(hostname);
// Handle each worker according to whether it reported itself alive and
// whether the connection has already been marked dead.
// NOTE(review): the bodies of these branches are not visible here — confirm.
544 if (send_slave_alive_) {
545 if (!connection_died_recently) {
555 if (recv_slave_alive_) {
// Both workers are considered stopped from here on.
561 send_slave_alive_ =
false;
562 recv_slave_alive_ =
false;
566 if (!connection_died_recently) {
// Under connest_mutex_, raise the interrupt flag that the connection wait
// loop checks.
578 connest_mutex_->
lock();
579 connest_interrupted_ =
true;
// Only proceed when both worker threads exist.
616 if (send_slave_ && recv_slave_) {
// An existing entry for this component ID means another thread is already
// waiting on it; that is rejected with an Exception.
618 if (recv_received_.find(message->
cid()) != recv_received_.end()) {
620 unsigned int cid = message->
cid();
621 throw Exception(
"There is already a thread waiting for messages of "
626 unsigned int cid = message->
cid();
// Register the waiter (false = nothing received yet) and loop until the flag
// is set or the connection is marked dead.
627 recv_received_[cid] =
false;
628 while (!recv_received_[cid] && !connection_died_recently) {
// The waiter entry is erased before the timeout exception is thrown.
630 recv_received_.erase(cid);
632 throw TimeoutException(
"Timeout reached while waiting for incoming message "
633 "(outgoing was %u:%u)",
// The waiter entry is also erased once the loop has ended.
638 recv_received_.erase(cid);
// Reached when the worker-thread check above fails.
// NOTE(review): presumably the else branch of that check — confirm.
641 unsigned int cid = message->
cid();
642 unsigned int msgid = message->
msgid();
643 throw Exception(
"Cannot enqueue given message %u:%u, sender or "
// component_id: component ID the handler is stored under in `handlers`.
659 unsigned int component_id)
// A handler already stored for this ID is treated as a special case.
// NOTE(review): the body of this branch is not visible here — confirm.
662 if (handlers.find(component_id) != handlers.end()) {
666 handlers[component_id] = handler;
// If a handler is stored for this component ID, notify it that it has been
// deregistered and remove it from the map.
679 if (handlers.find(component_id) != handlers.end()) {
680 handlers[component_id]->deregistered(_id);
681 handlers.erase(component_id);
// If a waiter entry exists for this component ID, mark it as received so the
// wait loop's flag check ends.
685 if (recv_received_.find(component_id) != recv_received_.end()) {
686 recv_received_[component_id] =
true;
// Forward the message to the handler stored for its component ID, if any,
// passing this client's _id along.
695 unsigned int cid = m->
cid();
697 if (handlers.find(cid) != handlers.end()) {
698 handlers[cid]->inbound_received(m, _id);
// Marks the waiter entry for the given component ID as received, if such an
// entry exists; IDs without an entry are left untouched.
// cid: component ID whose waiter entry is looked up.
704 FawkesNetworkClient::wake_handlers(
unsigned int cid)
707 if (recv_received_.find(cid) != recv_received_.end()) {
708 recv_received_[cid] =
true;
// Takes connest_mutex_ and calls connection_died(_id) on every registered
// handler.
// NOTE(review): what happens between the lock and the loop, and where the
// mutex is released, is not visible here — confirm.
715 FawkesNetworkClient::notify_of_connection_dead()
717 connest_mutex_->
lock();
722 for (HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i) {
723 i->second->connection_died(_id);
// Calls connection_established(_id) on every registered handler.
733 FawkesNetworkClient::notify_of_connection_established()
736 for (HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i) {
737 i->second->connection_established(_id);
// Marks the connection as dead, then informs the registered handlers via
// notify_of_connection_dead().
743 FawkesNetworkClient::connection_died()
745 connection_died_recently =
true;
746 notify_of_connection_dead();
// Records, under slave_status_mutex, that the send worker is alive. Once both
// workers are alive, connest_mutex_ is taken as well.
// NOTE(review): what is done while connest_mutex_ is held is not visible
// here — confirm.
750 FawkesNetworkClient::set_send_slave_alive()
752 slave_status_mutex->
lock();
753 send_slave_alive_ =
true;
754 if (send_slave_alive_ && recv_slave_alive_) {
755 connest_mutex_->
lock();
760 slave_status_mutex->
unlock();
// Records, under slave_status_mutex, that the receive worker is alive. Once
// both workers are alive, connest_mutex_ is taken as well.
// NOTE(review): what is done while connest_mutex_ is held is not visible
// here — confirm.
764 FawkesNetworkClient::set_recv_slave_alive()
766 slave_status_mutex->
lock();
767 recv_slave_alive_ =
true;
768 if (send_slave_alive_ && recv_slave_alive_) {
769 connest_mutex_->
lock();
774 slave_status_mutex->
unlock();
// An existing entry for this component ID means another thread is already
// waiting on it; that is rejected with an Exception.
788 if (recv_received_.find(component_id) != recv_received_.end()) {
790 throw Exception(
"There is already a thread waiting for messages of "
// Register the waiter (false = nothing received yet) and loop until the flag
// is set or the connection is marked dead.
794 recv_received_[component_id] =
false;
795 while (!recv_received_[component_id] && !connection_died_recently) {
// The waiter entry is erased before the timeout exception is thrown.
797 recv_received_.erase(component_id);
799 throw TimeoutException(
"Timeout reached while waiting for incoming message "
// The waiter entry is also erased once the loop has ended.
804 recv_received_.erase(component_id);
// If a waiter entry exists for this component ID, mark it as received;
// otherwise do nothing.
817 if (recv_received_.find(component_id) != recv_received_.end()) {
818 recv_received_[component_id] =
true;
// True only when the connection has not been marked dead and `s` is non-null.
// NOTE(review): presumably `s` is the client's socket — confirm.
830 return (!connection_died_recently && (s != NULL));
// Asking for the ID of a client that has none is reported as an Exception.
// NOTE(review): the check that leads to this throw is not visible here — confirm.
849 throw Exception(
"Trying to get the ID of a client that has no ID");