37 #include <core/threading/mutex_locker.h>
38 #include <google/protobuf/descriptor.h>
39 #include <logging/logger.h>
40 #include <protobuf_clips/communicator.h>
41 #include <protobuf_comm/client.h>
42 #include <protobuf_comm/peer.h>
43 #include <protobuf_comm/server.h>
45 #include <boost/format.hpp>
47 using namespace google::protobuf;
48 using namespace protobuf_comm;
50 namespace protobuf_clips {
// Constructor fragment (the remaining parameter lines and the body are not
// part of this view). The initializer list stores the CLIPS environment, the
// environment mutex and the logger, starts without a server, and sets the
// counter from which client and peer ids are later drawn to 0.
66 ClipsProtobufCommunicator::ClipsProtobufCommunicator(CLIPS::Environment *env,
69 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
83 std::vector<std::string> &proto_path,
85 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
97 for (
auto f : functions_) {
98 clips_->remove_function(f);
103 for (
auto c : clients_) {
108 delete message_register_;
/// Registers a function with the CLIPS environment and records its name in
/// functions_ so that the registration can be undone later.
///
/// @param n name under which the function is exposed to CLIPS (evaluated twice)
/// @param s slot that CLIPS invokes for that name
///
/// The two statements are wrapped in do { } while (0) so that the macro
/// expands to exactly one statement. This keeps it safe inside an unbraced
/// if / else; call sites still terminate the invocation with a semicolon.
#define ADD_FUNCTION(n, s)      \
  do {                          \
    clips_->add_function(n, s); \
    functions_.push_back(n);    \
  } while (0)
// Registers the pb-* functions with the CLIPS environment.
//
// Every ADD_FUNCTION call pairs a CLIPS function name with the member function
// of this object that implements it. The member function is wrapped in a
// sigc::slot whose template arguments list the return type followed by the
// argument types.
//
// Partial view: the opening lines of this function and the slot bindings for
// pb-server-enable and pb-server-disable are not shown here.
118 ClipsProtobufCommunicator::setup_clips()
// Type registration and message introspection.
122 ADD_FUNCTION(
"pb-register-type",
123 (sigc::slot<CLIPS::Value, std::string>(
124 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_register_type))));
125 ADD_FUNCTION(
"pb-field-names",
126 (sigc::slot<CLIPS::Values, void *>(
127 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_names))));
128 ADD_FUNCTION(
"pb-field-type",
129 (sigc::slot<CLIPS::Value, void *, std::string>(
130 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_type))));
131 ADD_FUNCTION(
"pb-has-field",
132 (sigc::slot<CLIPS::Value, void *, std::string>(
133 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_has_field))));
134 ADD_FUNCTION(
"pb-field-label",
135 (sigc::slot<CLIPS::Value, void *, std::string>(
136 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_label))));
137 ADD_FUNCTION(
"pb-field-value",
138 (sigc::slot<CLIPS::Value, void *, std::string>(
139 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_value))));
140 ADD_FUNCTION(
"pb-field-list",
141 (sigc::slot<CLIPS::Values, void *, std::string>(
142 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_list))));
143 ADD_FUNCTION(
"pb-field-is-list",
144 (sigc::slot<CLIPS::Value, void *, std::string>(
145 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_field_is_list))));
// Message life cycle and modification.
146 ADD_FUNCTION(
"pb-create",
147 (sigc::slot<CLIPS::Value, std::string>(
148 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_create))));
149 ADD_FUNCTION(
"pb-destroy",
150 (sigc::slot<void, void *>(
151 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_destroy))));
152 ADD_FUNCTION(
"pb-ref",
153 (sigc::slot<CLIPS::Value, void *>(
154 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_ref))));
155 ADD_FUNCTION(
"pb-set-field",
156 (sigc::slot<void, void *, std::string, CLIPS::Value>(
157 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_set_field))));
158 ADD_FUNCTION(
"pb-add-list",
159 (sigc::slot<void, void *, std::string, CLIPS::Value>(
160 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_add_list))));
161 ADD_FUNCTION(
"pb-send",
162 (sigc::slot<void, long int, void *>(
163 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_send))));
164 ADD_FUNCTION(
"pb-tostring",
165 (sigc::slot<std::string, void *>(
166 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_tostring))));
// Server control (bindings only partially visible in this view).
167 ADD_FUNCTION(
"pb-server-enable",
168 (sigc::slot<void, int>(
170 ADD_FUNCTION(
"pb-server-disable",
// Peer management.
173 ADD_FUNCTION(
"pb-peer-create",
174 (sigc::slot<long int, std::string, int>(
175 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create))));
176 ADD_FUNCTION(
"pb-peer-create-local",
177 (sigc::slot<long int, std::string, int, int>(
178 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create_local))));
179 ADD_FUNCTION(
"pb-peer-create-crypto",
180 (sigc::slot<long int, std::string, int, std::string, std::string>(
181 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_create_crypto))));
182 ADD_FUNCTION(
"pb-peer-create-local-crypto",
183 (sigc::slot<long int, std::string, int, int, std::string, std::string>(sigc::mem_fun(
184 *
this, &ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto))));
185 ADD_FUNCTION(
"pb-peer-destroy",
186 (sigc::slot<void, long int>(
187 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_destroy))));
188 ADD_FUNCTION(
"pb-peer-setup-crypto",
189 (sigc::slot<void, long int, std::string, std::string>(
190 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_peer_setup_crypto))));
191 ADD_FUNCTION(
"pb-broadcast",
192 (sigc::slot<void, long int, void *>(
193 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_broadcast))));
// Outgoing stream client connections.
194 ADD_FUNCTION(
"pb-connect",
195 (sigc::slot<long int, std::string, int>(
196 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_client_connect))));
197 ADD_FUNCTION(
"pb-disconnect",
198 (sigc::slot<void, long int>(
199 sigc::mem_fun(*
this, &ClipsProtobufCommunicator::clips_pb_disconnect))));
208 if ((port > 0) && !server_) {
212 boost::bind(&ClipsProtobufCommunicator::handle_server_client_connected,
this, _1, _2));
214 boost::bind(&ClipsProtobufCommunicator::handle_server_client_disconnected,
this, _1, _2));
216 boost::bind(&ClipsProtobufCommunicator::handle_server_client_msg,
this, _1, _2, _3, _4));
218 boost::bind(&ClipsProtobufCommunicator::handle_server_client_fail,
this, _1, _2, _3, _4));
// Implements pb-peer-create-local-crypto; the other pb-peer-create variants
// forward to this function.
//
// Visible behaviour: a peer is stored in peers_ under an id taken from
// ++next_client_id_, and handle_peer_msg, handle_peer_recv_error and
// handle_peer_send_error are bound with that id as their first argument.
//
// Partial view: the remaining parameters, the condition guarding
// recv_port = send_port, the first line of the call that receives
// (address, send_port, recv_port, message_register_, crypto_key, cipher)
// (presumably the peer construction, confirm), the signal connections and the
// return statement are not shown.
239 ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto(std::string address,
242 std::string crypto_key,
246 recv_port = send_port;
250 address, send_port, recv_port, message_register_, crypto_key, cipher);
// Peer ids are drawn from the same counter as client ids.
255 peer_id = ++next_client_id_;
256 peers_[peer_id] = peer;
260 boost::bind(&ClipsProtobufCommunicator::handle_peer_msg,
this, peer_id, _1, _2, _3, _4));
262 boost::bind(&ClipsProtobufCommunicator::handle_peer_recv_error,
this, peer_id, _1, _2));
264 boost::bind(&ClipsProtobufCommunicator::handle_peer_send_error,
this, peer_id, _1));
// Implements pb-peer-create-crypto: forwards to
// clips_pb_peer_create_local_crypto using the same port for sending and
// receiving. Some parameter lines are not shown in this view.
280 ClipsProtobufCommunicator::clips_pb_peer_create_crypto(std::string address,
282 std::string crypto_key,
285 return clips_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
// Implements pb-peer-create: forwards to clips_pb_peer_create_local_crypto
// with the same port for sending and receiving. No key or cipher is passed, so
// this relies on default arguments declared outside this view.
294 ClipsProtobufCommunicator::clips_pb_peer_create(std::string address,
int port)
296 return clips_pb_peer_create_local_crypto(address, port, port);
// Implements pb-peer-create-local: forwards to
// clips_pb_peer_create_local_crypto with separate send and receive ports and
// without key or cipher (default arguments declared outside this view).
// The port parameter lines are not shown here.
306 ClipsProtobufCommunicator::clips_pb_peer_create_local(std::string address,
310 return clips_pb_peer_create_local_crypto(address, send_port, recv_port);
// Implements pb-peer-destroy: if peer_id is known, deletes the peer object and
// removes its entry from peers_. No handling of unknown ids is visible in this
// view.
317 ClipsProtobufCommunicator::clips_pb_peer_destroy(
long int peer_id)
319 if (peers_.find(peer_id) != peers_.end()) {
320 delete peers_[peer_id];
321 peers_.erase(peer_id);
// Implements pb-peer-setup-crypto: if peer_id is known, calls setup_crypto on
// that peer with the given key and cipher. The cipher parameter line is not
// shown in this view.
331 ClipsProtobufCommunicator::clips_pb_peer_setup_crypto(
long int peer_id,
332 std::string crypto_key,
335 if (peers_.find(peer_id) != peers_.end()) {
336 peers_[peer_id]->setup_crypto(crypto_key, cipher);
// Implements pb-register-type.
//
// Returns the symbol TRUE on success. If the guarded code throws
// std::runtime_error, a log line is formatted and the symbol FALSE is
// returned. The guarded call itself is not shown in this view (presumably the
// registration of full_name with the message register, confirm).
345 ClipsProtobufCommunicator::clips_pb_register_type(std::string full_name)
349 return CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
350 }
catch (std::runtime_error &e) {
353 "Registering type %s failed: %s",
357 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
// Implements pb-create.
//
// Asks message_register_ for a new message of type full_name and returns a
// CLIPS value holding the address of a heap-allocated shared_ptr to it. If
// std::runtime_error is thrown, a log line is formatted and the returned value
// instead holds the address of a heap-allocated empty shared_ptr.
//
// The heap-allocated shared_ptr is not released here; presumably the caller
// releases it through pb-destroy (confirm, that body is not in this view).
362 ClipsProtobufCommunicator::clips_pb_create(std::string full_name)
365 std::shared_ptr<google::protobuf::Message> m = message_register_->
new_message_for(full_name);
366 return CLIPS::Value(
new std::shared_ptr<google::protobuf::Message>(m));
367 }
catch (std::runtime_error &e) {
370 "Cannot create message of type %s: %s",
374 return CLIPS::Value(
new std::shared_ptr<google::protobuf::Message>());
// Implements pb-ref.
//
// Interprets msgptr as a pointer to a shared_ptr<Message>. The normal path
// returns a CLIPS value holding a new heap-allocated copy of that shared_ptr,
// which therefore shares ownership of the same message. Another path returns a
// new empty shared_ptr; its guard is not shown in this view (presumably the
// invalid-message check, confirm).
379 ClipsProtobufCommunicator::clips_pb_ref(
void *msgptr)
381 std::shared_ptr<google::protobuf::Message> *m =
382 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
// NOTE(review): this path returns the raw pointer while the other path wraps
// it in CLIPS::Value explicitly; this relies on an implicit conversion.
384 return new std::shared_ptr<google::protobuf::Message>();
386 return CLIPS::Value(
new std::shared_ptr<google::protobuf::Message>(*m));
// Implements pb-destroy. Only the cast of msgptr to a pointer to
// shared_ptr<Message> is visible here; the remainder of the body is not part
// of this view.
390 ClipsProtobufCommunicator::clips_pb_destroy(
void *msgptr)
392 std::shared_ptr<google::protobuf::Message> *m =
393 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
// Implements pb-field-names.
//
// Interprets msgptr as a pointer to a shared_ptr<Message> and builds one CLIPS
// value per field of the message descriptor, each set to the field name. An
// empty list is returned on a path whose guard is not shown in this view
// (presumably the invalid-message check, confirm). The final return is also
// not shown.
401 ClipsProtobufCommunicator::clips_pb_field_names(
void *msgptr)
403 std::shared_ptr<google::protobuf::Message> *m =
404 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
406 return CLIPS::Values();
408 const Descriptor *desc = (*m)->GetDescriptor();
409 const int field_count = desc->field_count();
410 CLIPS::Values field_names(field_count);
411 for (
int i = 0; i < field_count; ++i) {
// Second argument: presumably marks the value as a symbol (confirm against
// the clipsmm Value::set overloads).
412 field_names[i].set(desc->field(i)->name(),
true);
// Implements pb-field-type.
//
// Looks up field_name in the descriptor of the message behind msgptr and
// returns a symbol naming its protobuf field type (DOUBLE, FLOAT, INT64, ...).
// Any type without its own case, TYPE_GROUP among them, yields UNKNOWN.
//
// The symbols INVALID-MESSAGE and DOES-NOT-EXIST are returned on paths whose
// guards are not shown in this view (presumably a null or empty message and a
// failed field lookup respectively, confirm).
418 ClipsProtobufCommunicator::clips_pb_field_type(
void *msgptr, std::string field_name)
420 std::shared_ptr<google::protobuf::Message> *m =
421 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
423 return CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
425 const Descriptor * desc = (*m)->GetDescriptor();
426 const FieldDescriptor *field = desc->FindFieldByName(field_name);
428 return CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
430 switch (field->type()) {
431 case FieldDescriptor::TYPE_DOUBLE:
return CLIPS::Value(
"DOUBLE", CLIPS::TYPE_SYMBOL);
432 case FieldDescriptor::TYPE_FLOAT:
return CLIPS::Value(
"FLOAT", CLIPS::TYPE_SYMBOL);
433 case FieldDescriptor::TYPE_INT64:
return CLIPS::Value(
"INT64", CLIPS::TYPE_SYMBOL);
434 case FieldDescriptor::TYPE_UINT64:
return CLIPS::Value(
"UINT64", CLIPS::TYPE_SYMBOL);
435 case FieldDescriptor::TYPE_INT32:
return CLIPS::Value(
"INT32", CLIPS::TYPE_SYMBOL);
436 case FieldDescriptor::TYPE_FIXED64:
return CLIPS::Value(
"FIXED64", CLIPS::TYPE_SYMBOL);
437 case FieldDescriptor::TYPE_FIXED32:
return CLIPS::Value(
"FIXED32", CLIPS::TYPE_SYMBOL);
438 case FieldDescriptor::TYPE_BOOL:
return CLIPS::Value(
"BOOL", CLIPS::TYPE_SYMBOL);
439 case FieldDescriptor::TYPE_STRING:
return CLIPS::Value(
"STRING", CLIPS::TYPE_SYMBOL);
440 case FieldDescriptor::TYPE_MESSAGE:
return CLIPS::Value(
"MESSAGE", CLIPS::TYPE_SYMBOL);
441 case FieldDescriptor::TYPE_BYTES:
return CLIPS::Value(
"BYTES", CLIPS::TYPE_SYMBOL);
442 case FieldDescriptor::TYPE_UINT32:
return CLIPS::Value(
"UINT32", CLIPS::TYPE_SYMBOL);
443 case FieldDescriptor::TYPE_ENUM:
return CLIPS::Value(
"ENUM", CLIPS::TYPE_SYMBOL);
444 case FieldDescriptor::TYPE_SFIXED32:
return CLIPS::Value(
"SFIXED32", CLIPS::TYPE_SYMBOL);
445 case FieldDescriptor::TYPE_SFIXED64:
return CLIPS::Value(
"SFIXED64", CLIPS::TYPE_SYMBOL);
446 case FieldDescriptor::TYPE_SINT32:
return CLIPS::Value(
"SINT32", CLIPS::TYPE_SYMBOL);
447 case FieldDescriptor::TYPE_SINT64:
return CLIPS::Value(
"SINT64", CLIPS::TYPE_SYMBOL);
448 default:
return CLIPS::Value(
"UNKNOWN", CLIPS::TYPE_SYMBOL);
// Implements pb-has-field. Returns the symbol TRUE or FALSE.
//
//  - repeated field: TRUE when it holds at least one element
//  - optional field: TRUE when the reflection interface reports it as set
//  - remaining case: TRUE (presumably the else branch for required fields,
//    the branch line itself is not shown in this view)
//
// The checks for an invalid message and for an unknown field are not shown.
453 ClipsProtobufCommunicator::clips_pb_has_field(
void *msgptr, std::string field_name)
455 std::shared_ptr<google::protobuf::Message> *m =
456 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
460 const Descriptor * desc = (*m)->GetDescriptor();
461 const FieldDescriptor *field = desc->FindFieldByName(field_name);
465 const Reflection *refl = (*m)->GetReflection();
467 if (field->is_repeated()) {
468 return CLIPS::Value((refl->FieldSize(**m, field) > 0) ?
"TRUE" :
"FALSE", CLIPS::TYPE_SYMBOL);
469 }
else if (field->is_optional()) {
470 return CLIPS::Value(refl->HasField(**m, field) ?
"TRUE" :
"FALSE", CLIPS::TYPE_SYMBOL);
472 return CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
// Implements pb-field-label.
//
// Looks up field_name in the descriptor of the message behind msgptr and
// returns its label as a symbol: OPTIONAL, REQUIRED, REPEATED, or UNKNOWN for
// any other label value.
//
// The symbols INVALID-MESSAGE and DOES-NOT-EXIST are returned on paths whose
// guards are not shown in this view.
477 ClipsProtobufCommunicator::clips_pb_field_label(
void *msgptr, std::string field_name)
479 std::shared_ptr<google::protobuf::Message> *m =
480 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
482 return CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
484 const Descriptor * desc = (*m)->GetDescriptor();
485 const FieldDescriptor *field = desc->FindFieldByName(field_name);
487 return CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
489 switch (field->label()) {
490 case FieldDescriptor::LABEL_OPTIONAL:
return CLIPS::Value(
"OPTIONAL", CLIPS::TYPE_SYMBOL);
491 case FieldDescriptor::LABEL_REQUIRED:
return CLIPS::Value(
"REQUIRED", CLIPS::TYPE_SYMBOL);
492 case FieldDescriptor::LABEL_REPEATED:
return CLIPS::Value(
"REPEATED", CLIPS::TYPE_SYMBOL);
493 default:
return CLIPS::Value(
"UNKNOWN", CLIPS::TYPE_SYMBOL);
// Implements pb-field-value: reads one field of the message behind msgptr and
// converts it to a CLIPS value.
//
// Special results (symbols): INVALID-MESSAGE, DOES-NOT-EXIST and NOT-SET. The
// guards for the first two are not shown in this view. NOT-SET is returned for
// any non-message field that the reflection interface reports as unset.
//
// Conversions visible here:
//  - 64 bit unsigned values are cast to long int
//  - booleans become the symbols TRUE or FALSE
//  - enum values become a symbol carrying the enum value name
//  - message fields are copied into a freshly created message; the result
//    holds the address of a heap-allocated shared_ptr owning that copy
//  - bytes fields yield a fixed placeholder string, not the field content
//
// Throws std::logic_error for a field type without a case.
//
// NOTE(review): HasField is reached for every non-message field, including
// repeated ones. The protobuf reflection documentation describes HasField as
// valid for non-repeated fields only, so this function presumably must not be
// called on repeated fields (confirm).
498 ClipsProtobufCommunicator::clips_pb_field_value(
void *msgptr, std::string field_name)
500 std::shared_ptr<google::protobuf::Message> *m =
501 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
// NOTE(review): the text below says setting although this function reads the
// field; the wording looks copied from the setter.
504 logger_->
log_warn(
"CLIPS-Protobuf",
"Invalid message when setting %s", field_name.c_str());
506 return CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
509 const Descriptor * desc = (*m)->GetDescriptor();
510 const FieldDescriptor *field = desc->FindFieldByName(field_name);
514 "Field %s of %s does not exist",
516 (*m)->GetTypeName().c_str());
518 return CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
520 const Reflection *refl = (*m)->GetReflection();
// Message fields skip the set check and are always copied out below.
521 if (field->type() != FieldDescriptor::TYPE_MESSAGE && !refl->HasField(**m, field)) {
524 "Field %s of %s not set",
526 (*m)->GetTypeName().c_str());
528 return CLIPS::Value(
"NOT-SET", CLIPS::TYPE_SYMBOL);
530 switch (field->type()) {
531 case FieldDescriptor::TYPE_DOUBLE:
return CLIPS::Value(refl->GetDouble(**m, field));
532 case FieldDescriptor::TYPE_FLOAT:
return CLIPS::Value(refl->GetFloat(**m, field));
533 case FieldDescriptor::TYPE_INT64:
return CLIPS::Value(refl->GetInt64(**m, field));
534 case FieldDescriptor::TYPE_UINT64:
return CLIPS::Value((
long int)refl->GetUInt64(**m, field));
535 case FieldDescriptor::TYPE_INT32:
return CLIPS::Value(refl->GetInt32(**m, field));
536 case FieldDescriptor::TYPE_FIXED64:
return CLIPS::Value((
long int)refl->GetUInt64(**m, field));
537 case FieldDescriptor::TYPE_FIXED32:
return CLIPS::Value(refl->GetUInt32(**m, field));
538 case FieldDescriptor::TYPE_BOOL:
540 if (refl->GetBool(**m, field)) {
541 return CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
543 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
545 case FieldDescriptor::TYPE_STRING:
return CLIPS::Value(refl->GetString(**m, field));
546 case FieldDescriptor::TYPE_MESSAGE: {
// Deep copy: the returned message is independent of the parent message.
547 const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
548 google::protobuf::Message * mcopy = mfield.New();
549 mcopy->CopyFrom(mfield);
550 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
551 return CLIPS::Value(ptr);
// Placeholder only: the actual byte content is not returned.
553 case FieldDescriptor::TYPE_BYTES:
return CLIPS::Value((
char *)
"bytes");
554 case FieldDescriptor::TYPE_UINT32:
return CLIPS::Value(refl->GetUInt32(**m, field));
555 case FieldDescriptor::TYPE_ENUM:
556 return CLIPS::Value(refl->GetEnum(**m, field)->name(), CLIPS::TYPE_SYMBOL);
557 case FieldDescriptor::TYPE_SFIXED32:
return CLIPS::Value(refl->GetInt32(**m, field));
558 case FieldDescriptor::TYPE_SFIXED64:
return CLIPS::Value(refl->GetInt64(**m, field));
559 case FieldDescriptor::TYPE_SINT32:
return CLIPS::Value(refl->GetInt32(**m, field));
560 case FieldDescriptor::TYPE_SINT64:
return CLIPS::Value(refl->GetInt64(**m, field));
561 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
// Implements pb-set-field: writes value into the named non-repeated field of
// the message behind msgptr, converting according to the protobuf field type.
//
// Conversions visible here:
//  - floating point types use value.as_float()
//  - integer types use value.as_integer()
//  - bool is true exactly when value compares equal to TRUE
//  - message fields are filled by CopyFrom out of the message whose
//    shared_ptr address is stored in value
//  - bytes fields are left untouched (the case only breaks)
//  - enum fields are looked up by name; a log line about an invalid enum
//    value is formatted on the failure path
//
// A std::logic_error raised inside the guarded region is caught and a log line
// is formatted. Partial view: the value parameter line, the invalid-message
// and unknown-field guards, and several closing lines are not shown.
566 ClipsProtobufCommunicator::clips_pb_set_field(
void * msgptr,
567 std::string field_name,
570 std::shared_ptr<google::protobuf::Message> *m =
571 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
575 const Descriptor * desc = (*m)->GetDescriptor();
576 const FieldDescriptor *field = desc->FindFieldByName(field_name);
579 logger_->
log_warn(
"CLIPS-Protobuf",
"Could not find field %s", field_name.c_str());
583 const Reflection *refl = (*m)->GetReflection();
586 switch (field->type()) {
587 case FieldDescriptor::TYPE_DOUBLE: refl->SetDouble(m->get(), field, value.as_float());
break;
588 case FieldDescriptor::TYPE_FLOAT: refl->SetFloat(m->get(), field, value.as_float());
break;
589 case FieldDescriptor::TYPE_SFIXED64:
590 case FieldDescriptor::TYPE_SINT64:
591 case FieldDescriptor::TYPE_INT64: refl->SetInt64(m->get(), field, value.as_integer());
break;
592 case FieldDescriptor::TYPE_FIXED64:
593 case FieldDescriptor::TYPE_UINT64: refl->SetUInt64(m->get(), field, value.as_integer());
break;
594 case FieldDescriptor::TYPE_SFIXED32:
595 case FieldDescriptor::TYPE_SINT32:
596 case FieldDescriptor::TYPE_INT32: refl->SetInt32(m->get(), field, value.as_integer());
break;
597 case FieldDescriptor::TYPE_BOOL: refl->SetBool(m->get(), field, (value ==
"TRUE"));
break;
598 case FieldDescriptor::TYPE_STRING: refl->SetString(m->get(), field, value.as_string());
break;
599 case FieldDescriptor::TYPE_MESSAGE: {
// NOTE(review): no check that mfrom or the message it holds is non-null is
// visible before the dereference below.
600 std::shared_ptr<google::protobuf::Message> *mfrom =
601 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value.as_address());
602 Message *mut_msg = refl->MutableMessage(m->get(), field);
603 mut_msg->CopyFrom(**mfrom);
// Bytes fields are not supported by the setter.
606 case FieldDescriptor::TYPE_BYTES:
break;
607 case FieldDescriptor::TYPE_FIXED32:
608 case FieldDescriptor::TYPE_UINT32: refl->SetUInt32(m->get(), field, value.as_integer());
break;
609 case FieldDescriptor::TYPE_ENUM: {
610 const EnumDescriptor * enumdesc = field->enum_type();
611 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
613 refl->SetEnum(m->get(), field, enumval);
617 "%s: cannot set invalid "
618 "enum value '%s' on '%s'",
619 (*m)->GetTypeName().c_str(),
620 value.as_string().c_str(),
625 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
627 }
catch (std::logic_error &e) {
630 "Failed to set field %s of %s: %s "
631 "(type %d, as string %s)",
633 (*m)->GetTypeName().c_str(),
636 to_string(value).c_str());
// Implements pb-add-list: appends value to the named repeated field of the
// message behind msgptr, converting according to the protobuf field type.
//
// Unlike the single-field setter, most cases pass value directly and rely on
// its implicit conversions. Unsigned 64 bit fields go through a cast to long
// int. Message elements are appended with AddMessage and then filled by
// CopyFrom. Bytes fields are left untouched (the case only breaks). Enum
// values are looked up by name.
//
// A std::logic_error raised inside the guarded region is caught and a log line
// is formatted. Partial view: the value parameter line, the invalid-message
// and unknown-field guards, the check on the enum lookup result and several
// closing lines are not shown.
642 ClipsProtobufCommunicator::clips_pb_add_list(
void * msgptr,
643 std::string field_name,
646 std::shared_ptr<google::protobuf::Message> *m =
647 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
651 const Descriptor * desc = (*m)->GetDescriptor();
652 const FieldDescriptor *field = desc->FindFieldByName(field_name);
655 logger_->
log_warn(
"CLIPS-Protobuf",
"Could not find field %s", field_name.c_str());
659 const Reflection *refl = (*m)->GetReflection();
662 switch (field->type()) {
663 case FieldDescriptor::TYPE_DOUBLE: refl->AddDouble(m->get(), field, value);
break;
664 case FieldDescriptor::TYPE_FLOAT: refl->AddFloat(m->get(), field, value);
break;
665 case FieldDescriptor::TYPE_SFIXED64:
666 case FieldDescriptor::TYPE_SINT64:
667 case FieldDescriptor::TYPE_INT64: refl->AddInt64(m->get(), field, value);
break;
668 case FieldDescriptor::TYPE_FIXED64:
669 case FieldDescriptor::TYPE_UINT64: refl->AddUInt64(m->get(), field, (
long int)value);
break;
670 case FieldDescriptor::TYPE_SFIXED32:
671 case FieldDescriptor::TYPE_SINT32:
672 case FieldDescriptor::TYPE_INT32: refl->AddInt32(m->get(), field, value);
break;
673 case FieldDescriptor::TYPE_BOOL: refl->AddBool(m->get(), field, (value ==
"TRUE"));
break;
674 case FieldDescriptor::TYPE_STRING: refl->AddString(m->get(), field, value);
break;
675 case FieldDescriptor::TYPE_MESSAGE: {
// NOTE(review): no check that mfrom or the message it holds is non-null is
// visible before the dereference below.
676 std::shared_ptr<google::protobuf::Message> *mfrom =
677 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value.as_address());
678 Message *new_msg = refl->AddMessage(m->get(), field);
679 new_msg->CopyFrom(**mfrom);
// Bytes fields are not supported here.
682 case FieldDescriptor::TYPE_BYTES:
break;
683 case FieldDescriptor::TYPE_FIXED32:
684 case FieldDescriptor::TYPE_UINT32: refl->AddUInt32(m->get(), field, value);
break;
685 case FieldDescriptor::TYPE_ENUM: {
686 const EnumDescriptor * enumdesc = field->enum_type();
687 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
689 refl->AddEnum(m->get(), field, enumval);
691 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
693 }
catch (std::logic_error &e) {
696 "Failed to add field %s of %s: %s",
698 (*m)->GetTypeName().c_str(),
// Implements pb-connect.
//
// Visible behaviour: a client object is stored in clients_ under an id taken
// from ++next_client_id_, the handlers for connected, disconnected, message
// and receive-failure events are bound with that id as first argument, and the
// id is returned as a CLIPS value.
//
// Partial view: creation of the client, the signal connections that receive
// the bound handlers, and the connect call itself are not shown.
705 ClipsProtobufCommunicator::clips_pb_client_connect(std::string host,
int port)
715 client_id = ++next_client_id_;
716 clients_[client_id] = client;
720 boost::bind(&ClipsProtobufCommunicator::handle_client_connected,
this, client_id));
722 boost::bind(&ClipsProtobufCommunicator::handle_client_disconnected,
725 boost::asio::placeholders::error));
727 boost::bind(&ClipsProtobufCommunicator::handle_client_msg,
this, client_id, _1, _2, _3));
729 &ClipsProtobufCommunicator::handle_client_receive_fail,
this, client_id, _1, _2, _3));
732 return CLIPS::Value(client_id);
// Implements pb-send: sends the message behind msgptr over whichever
// connection owns client_id.
//
// Lookup order in the visible code:
//  1. clients connected to the local server (only when a server exists)
//  2. outgoing client connections
//  3. peers
// After each send the matching sig_*_sent_ signal is emitted.
//
// A warning is logged for an invalid message (guard not shown in this view).
// google::protobuf::FatalException and std::runtime_error are caught; three
// failure log lines are formatted, one of them in a handler whose catch line
// is not shown. Handling of an unknown client_id is not visible here.
736 ClipsProtobufCommunicator::clips_pb_send(
long int client_id,
void *msgptr)
738 std::shared_ptr<google::protobuf::Message> *m =
739 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
742 logger_->
log_warn(
"CLIPS-Protobuf",
"Cannot send to %li: invalid message", client_id);
750 if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
752 server_->
send(server_clients_[client_id], *m);
753 sig_server_sent_(server_clients_[client_id], *m);
754 }
else if (clients_.find(client_id) != clients_.end()) {
756 clients_[client_id]->send(*m);
757 std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
758 sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
759 }
else if (peers_.find(client_id) != peers_.end()) {
761 peers_[client_id]->send(*m);
762 sig_peer_sent_(client_id, *m);
767 }
catch (google::protobuf::FatalException &e) {
770 "Failed to send message of type %s: %s",
771 (*m)->GetTypeName().c_str(),
777 "Failed to send message of type %s: %s",
778 (*m)->GetTypeName().c_str(),
781 }
catch (std::runtime_error &e) {
784 "Failed to send message of type %s: %s",
785 (*m)->GetTypeName().c_str(),
// Implements pb-tostring: returns the protobuf DebugString rendering of the
// message behind msgptr. A warning is logged for an invalid message; the guard
// and the value returned on that path are not shown in this view.
792 ClipsProtobufCommunicator::clips_pb_tostring(
void *msgptr)
794 std::shared_ptr<google::protobuf::Message> *m =
795 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
798 logger_->
log_warn(
"CLIPS-Protobuf",
"Cannot convert message to string: invalid message");
803 return (*m)->DebugString();
// Implements pb-broadcast: sends the message behind msgptr through the peer
// registered under peer_id and then emits sig_peer_sent_.
//
// A warning is logged for an invalid message (guard not shown in this view).
// The action taken when peer_id is unknown is not shown either (presumably an
// early return, confirm). google::protobuf::FatalException and
// std::runtime_error are caught; three failure log lines are formatted, one of
// them in a handler whose catch line is not shown.
807 ClipsProtobufCommunicator::clips_pb_broadcast(
long int peer_id,
void *msgptr)
809 std::shared_ptr<google::protobuf::Message> *m =
810 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
813 logger_->
log_warn(
"CLIPS-Protobuf",
"Cannot send broadcast: invalid message");
819 if (peers_.find(peer_id) == peers_.end())
824 peers_[peer_id]->send(*m);
825 }
catch (google::protobuf::FatalException &e) {
828 "Failed to broadcast message of type %s: %s",
829 (*m)->GetTypeName().c_str(),
835 "Failed to broadcast message of type %s: %s",
836 (*m)->GetTypeName().c_str(),
839 }
catch (std::runtime_error &e) {
842 "Failed to broadcast message of type %s: %s",
843 (*m)->GetTypeName().c_str(),
// NOTE(review): as far as this view shows, the signal is emitted after the
// handlers above; whether a failed send returns early is not visible.
848 sig_peer_sent_(peer_id, *m);
// Implements pb-disconnect.
//
//  - If client_id belongs to a client of the local server, its entries are
//    removed from server_clients_ and rev_server_clients_. The lines that
//    obtain srv_client and act on the server are not shown in this view.
//  - Otherwise, if it is an outgoing client connection, the client object is
//    deleted and removed from clients_.
//
// std::runtime_error is caught and a failure log line is formatted.
852 ClipsProtobufCommunicator::clips_pb_disconnect(
long int client_id)
859 if (server_clients_.find(client_id) != server_clients_.end()) {
862 server_clients_.erase(client_id);
863 rev_server_clients_.erase(srv_client);
864 }
else if (clients_.find(client_id) != clients_.end()) {
865 delete clients_[client_id];
866 clients_.erase(client_id);
868 }
catch (std::runtime_error &e) {
871 "Failed to disconnect from client %li: %s",
// Implements pb-field-list: returns the content of the named field as a list
// of CLIPS values.
//
//  - Required and optional fields produce a one-element list obtained from
//    clips_pb_field_value.
//  - Repeated fields produce one element per entry, converted per field type
//    in the same way as the single-value getter: booleans as the symbols TRUE
//    or FALSE, enums as symbols, messages as addresses of heap-allocated
//    shared_ptr objects owning a copy, unsigned 64 bit values cast to long
//    int, and bytes as the placeholder symbol BYTES.
//
// One-element lists holding INVALID-MESSAGE or DOES-NOT-EXIST are returned on
// paths whose guards are not shown in this view. Throws std::logic_error for a
// field type without a case. The statements ending each case and the final
// return are not shown.
879 ClipsProtobufCommunicator::clips_pb_field_list(
void *msgptr, std::string field_name)
881 std::shared_ptr<google::protobuf::Message> *m =
882 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
884 return CLIPS::Values(1, CLIPS::Value(
"INVALID-MESSAGE", CLIPS::TYPE_SYMBOL));
886 const Descriptor * desc = (*m)->GetDescriptor();
887 const FieldDescriptor *field = desc->FindFieldByName(field_name);
889 return CLIPS::Values(1, CLIPS::Value(
"DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL));
// Non-repeated fields are delegated to the single-value getter.
891 if (field->label() == FieldDescriptor::LABEL_REQUIRED
892 || field->label() == FieldDescriptor::LABEL_OPTIONAL) {
893 CLIPS::Values rv(1, clips_pb_field_value(msgptr, field_name));
897 const Reflection *refl = (*m)->GetReflection();
898 int field_size = refl->FieldSize(**m, field);
899 CLIPS::Values rv(field_size);
900 for (
int i = 0; i < field_size; ++i) {
901 switch (field->type()) {
902 case FieldDescriptor::TYPE_DOUBLE:
903 rv[i] = CLIPS::Value(refl->GetRepeatedDouble(**m, field, i));
905 case FieldDescriptor::TYPE_FLOAT:
906 rv[i] = CLIPS::Value(refl->GetRepeatedFloat(**m, field, i));
909 case FieldDescriptor::TYPE_UINT64:
910 case FieldDescriptor::TYPE_FIXED64:
911 rv[i] = CLIPS::Value((
long int)refl->GetRepeatedUInt64(**m, field, i));
913 case FieldDescriptor::TYPE_UINT32:
914 case FieldDescriptor::TYPE_FIXED32:
915 rv[i] = CLIPS::Value(refl->GetRepeatedUInt32(**m, field, i));
917 case FieldDescriptor::TYPE_BOOL:
919 if (refl->GetRepeatedBool(**m, field, i)) {
920 rv[i] = CLIPS::Value(
"TRUE", CLIPS::TYPE_SYMBOL);
922 rv[i] = CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
925 case FieldDescriptor::TYPE_STRING:
926 rv[i] = CLIPS::Value(refl->GetRepeatedString(**m, field, i));
928 case FieldDescriptor::TYPE_MESSAGE: {
// Deep copy of the element, owned by a new heap-allocated shared_ptr.
929 const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
930 google::protobuf::Message * mcopy = msg.New();
931 mcopy->CopyFrom(msg);
932 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
933 rv[i] = CLIPS::Value(ptr);
// Placeholder only: the actual byte content is not returned.
935 case FieldDescriptor::TYPE_BYTES:
936 rv[i] = CLIPS::Value((
char *)
"BYTES", CLIPS::TYPE_SYMBOL);
938 case FieldDescriptor::TYPE_ENUM:
939 rv[i] = CLIPS::Value(refl->GetRepeatedEnum(**m, field, i)->name(), CLIPS::TYPE_SYMBOL);
941 case FieldDescriptor::TYPE_SFIXED32:
942 case FieldDescriptor::TYPE_INT32:
943 case FieldDescriptor::TYPE_SINT32:
944 rv[i] = CLIPS::Value(refl->GetRepeatedInt32(**m, field, i));
946 case FieldDescriptor::TYPE_SFIXED64:
947 case FieldDescriptor::TYPE_SINT64:
948 case FieldDescriptor::TYPE_INT64:
949 rv[i] = CLIPS::Value(refl->GetRepeatedInt64(**m, field, i));
951 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
// Implements pb-field-is-list: returns the symbol TRUE when the named field of
// the message behind msgptr is a repeated field, otherwise FALSE. FALSE is
// also returned on two earlier paths whose guards are not shown in this view
// (presumably invalid message and unknown field, confirm).
959 ClipsProtobufCommunicator::clips_pb_field_is_list(
void *msgptr, std::string field_name)
961 std::shared_ptr<google::protobuf::Message> *m =
962 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
964 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
966 const Descriptor * desc = (*m)->GetDescriptor();
967 const FieldDescriptor *field = desc->FindFieldByName(field_name);
969 return CLIPS::Value(
"FALSE", CLIPS::TYPE_SYMBOL);
970 return CLIPS::Value(field->is_repeated() ?
"TRUE" :
"FALSE", CLIPS::TYPE_SYMBOL);
// Asserts a protobuf-msg fact describing a received message into the CLIPS
// environment.
//
// Slots filled by the visible code:
//  - type:        protobuf type name of the message
//  - comp-id:     component id
//  - msg-type:    message type
//  - rcvd-via:    BROADCAST for peer messages, STREAM otherwise
//  - rcvd-at:     seconds and microseconds from gettimeofday
//  - rcvd-from:   host string and port of the sender endpoint
//  - client-type: CLIENT, SERVER or PEER
//  - client-id:   id of the connection the message arrived on
//  - ptr:         address of a heap-allocated shared_ptr sharing ownership of
//                 the message
//
// If asserting fails, a warning is logged and the heap-allocated shared_ptr is
// deleted again. If the template cannot be obtained, a warning pointing at
// protobuf.clp is logged. The guards for both paths and the comp_id and
// msg_type parameter lines are not shown in this view.
//
// NOTE(review): client_id is unsigned int here while the ids passed in by the
// visible callers are long int.
974 ClipsProtobufCommunicator::clips_assert_message(std::pair<std::string, unsigned short> &endpoint,
977 std::shared_ptr<google::protobuf::Message> &msg,
978 ClipsProtobufCommunicator::ClientType ct,
979 unsigned int client_id)
981 CLIPS::Template::pointer temp = clips_->get_template(
"protobuf-msg");
984 gettimeofday(&tv, 0);
985 void * ptr =
new std::shared_ptr<google::protobuf::Message>(msg);
986 CLIPS::Fact::pointer fact = CLIPS::Fact::create(*clips_, temp);
987 fact->set_slot(
"type", msg->GetTypeName());
988 fact->set_slot(
"comp-id", comp_id);
989 fact->set_slot(
"msg-type", msg_type);
990 fact->set_slot(
"rcvd-via",
991 CLIPS::Value((ct == CT_PEER) ?
"BROADCAST" :
"STREAM", CLIPS::TYPE_SYMBOL));
992 CLIPS::Values rcvd_at(2, CLIPS::Value(CLIPS::TYPE_INTEGER));
993 rcvd_at[0] = tv.tv_sec;
994 rcvd_at[1] = tv.tv_usec;
995 fact->set_slot(
"rcvd-at", rcvd_at);
996 CLIPS::Values host_port(2, CLIPS::Value(CLIPS::TYPE_STRING));
997 host_port[0] = endpoint.first;
998 host_port[1] = CLIPS::Value(endpoint.second);
999 fact->set_slot(
"rcvd-from", host_port);
1000 fact->set_slot(
"client-type",
1001 CLIPS::Value(ct == CT_CLIENT ?
"CLIENT" : (ct == CT_SERVER ?
"SERVER" :
"PEER"),
1002 CLIPS::TYPE_SYMBOL));
1003 fact->set_slot(
"client-id", client_id);
1004 fact->set_slot(
"ptr", CLIPS::Value(ptr));
1005 CLIPS::Fact::pointer new_fact = clips_->assert_fact(fact);
1009 logger_->
log_warn(
"CLIPS-Protobuf",
"Asserting protobuf-msg fact failed");
// Nothing in CLIPS refers to the pointer on this path, so release it here.
1011 delete static_cast<std::shared_ptr<google::protobuf::Message> *
>(ptr);
1015 logger_->
log_warn(
"CLIPS-Protobuf",
"Did not get template, did you load protobuf.clp?");
1022 boost::asio::ip::tcp::endpoint &endpoint)
1024 long int client_id = -1;
1027 client_id = ++next_client_id_;
1028 client_endpoints_[client_id] = std::make_pair(endpoint.address().to_string(), endpoint.port());
1029 server_clients_[client_id] = client;
1030 rev_server_clients_[client] = client_id;
1034 clips_->assert_fact_f(
"(protobuf-server-client-connected %li %s %u)",
1036 endpoint.address().to_string().c_str(),
1042 const boost::system::error_code &error)
1044 long int client_id = -1;
1047 RevServerClientMap::iterator c;
1048 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1049 client_id = c->second;
1050 rev_server_clients_.erase(c);
1051 server_clients_.erase(client_id);
1055 if (client_id >= 0) {
1057 clips_->assert_fact_f(
"(protobuf-server-client-disconnected %li)", client_id);
1069 uint16_t component_id,
1071 std::shared_ptr<google::protobuf::Message> msg)
1075 RevServerClientMap::iterator c;
1076 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1077 clips_assert_message(
1078 client_endpoints_[c->second], component_id, msg_type, msg, CT_SERVER, c->second);
1090 uint16_t component_id,
1095 RevServerClientMap::iterator c;
1096 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1098 clips_->assert_fact_f(
"(protobuf-server-receive-failed (comp-id %u) (msg-type %u) "
1099 "(rcvd-via STREAM) (client-id %li) (message \"%s\") "
1100 "(rcvd-from (\"%s\" %u)))",
1105 client_endpoints_[c->second].first.c_str(),
1106 client_endpoints_[c->second].second);
// Handler bound for messages arriving from a peer. Converts the UDP endpoint
// into a (host string, port) pair and asserts the message into CLIPS as a
// CT_PEER message with peer_id as client id. The msg_type parameter line and
// any locking are not shown in this view.
1117 ClipsProtobufCommunicator::handle_peer_msg(
long int peer_id,
1118 boost::asio::ip::udp::endpoint & endpoint,
1119 uint16_t component_id,
1121 std::shared_ptr<google::protobuf::Message> msg)
1124 std::pair<std::string, unsigned short> endpp =
1125 std::make_pair(endpoint.address().to_string(), endpoint.port());
1126 clips_assert_message(endpp, component_id, msg_type, msg, CT_PEER, peer_id);
// Handler bound for receive errors on a peer. Logs a warning that includes the
// sender address; the remaining format arguments and the message parameter
// line are not shown in this view.
1134 ClipsProtobufCommunicator::handle_peer_recv_error(
long int peer_id,
1135 boost::asio::ip::udp::endpoint &endpoint,
1139 logger_->
log_warn(
"CLIPS-Protobuf",
1140 "Failed to receive peer message from %s:%u: %s",
1141 endpoint.address().to_string().c_str(),
// Handler bound for send errors on a peer. Logs a warning containing the error
// text; peer_id is not used by the visible lines.
1151 ClipsProtobufCommunicator::handle_peer_send_error(
long int peer_id, std::string msg)
1154 logger_->
log_warn(
"CLIPS-Protobuf",
"Failed to send peer message: %s", msg.c_str());
// Handler bound for the connected event of an outgoing client connection.
// Asserts a protobuf-client-connected fact carrying the client id. Any locking
// around the assert is not shown in this view.
1159 ClipsProtobufCommunicator::handle_client_connected(
long int client_id)
1162 clips_->assert_fact_f(
"(protobuf-client-connected %li)", client_id);
// Handler bound for the disconnected event of an outgoing client connection.
// Asserts a protobuf-client-disconnected fact carrying the client id. The
// error argument is not used by the visible lines.
1166 ClipsProtobufCommunicator::handle_client_disconnected(
long int client_id,
1167 const boost::system::error_code &error)
1170 clips_->assert_fact_f(
"(protobuf-client-disconnected %li)", client_id);
// Handler bound for messages arriving on an outgoing client connection.
// Asserts the message into CLIPS as a CT_CLIENT message; the endpoint passed
// along is an empty host string with port 0. The comp_id and msg_type
// parameter lines are not shown in this view.
1174 ClipsProtobufCommunicator::handle_client_msg(
long int client_id,
1177 std::shared_ptr<google::protobuf::Message> msg)
1180 std::pair<std::string, unsigned short> endpp = std::make_pair(std::string(), 0);
1181 clips_assert_message(endpp, comp_id, msg_type, msg, CT_CLIENT, client_id);
// Handler bound for receive failures on an outgoing client connection.
// Asserts a protobuf-receive-failed fact with client id, component id, message
// type and error text. The remaining parameters and the format arguments are
// not shown in this view.
//
// NOTE(review): the error text is placed inside a quoted string in the fact
// pattern; whether quotes inside it are escaped is not visible here.
1185 ClipsProtobufCommunicator::handle_client_receive_fail(
long int client_id,
1191 clips_->assert_fact_f(
"(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) "
1192 "(comp-id %u) (msg-type %u) (message \"%s\"))",
// Renders a CLIPS value as text for log output.
//
//  - unknown type: a fixed marker text
//  - float and integer: std::to_string of the numeric value
//  - symbol, instance name and string: the string content as is
//  - instance and external addresses: the address formatted with %p
//
// Any type without a case falls through to the final return. The switch
// header is not shown in this view (presumably it switches on the type of v).
1200 ClipsProtobufCommunicator::to_string(
const CLIPS::Value &v)
1203 case CLIPS::TYPE_UNKNOWN:
return "Unknown Type";
1204 case CLIPS::TYPE_FLOAT:
return std::to_string(v.as_float());
1205 case CLIPS::TYPE_INTEGER:
return std::to_string(v.as_integer());
1206 case CLIPS::TYPE_SYMBOL:
1207 case CLIPS::TYPE_INSTANCE_NAME:
1208 case CLIPS::TYPE_STRING:
return v.as_string();
1209 case CLIPS::TYPE_INSTANCE_ADDRESS:
1210 case CLIPS::TYPE_EXTERNAL_ADDRESS:
return boost::str(boost::format(
"%p") % v.as_address());
1212 return "Implicit unknown type";