22 #include "robot_memory.h"
24 #include <core/threading/mutex.h>
25 #include <core/threading/mutex_locker.h>
26 #include <interfaces/RobotMemoryInterface.h>
27 #include <plugins/mongodb/utils.h>
28 #include <utils/misc/string_conversions.h>
29 #include <utils/misc/string_split.h>
30 #include <utils/system/hostinfo.h>
31 #ifdef USE_TIMETRACKER
32 # include <utils/time/tracker.h>
34 #include <utils/time/tracker_macros.h>
36 #include <bsoncxx/builder/basic/document.hpp>
38 #include <mongocxx/client.hpp>
39 #include <mongocxx/exception/operation_exception.hpp>
40 #include <mongocxx/read_preference.hpp>
45 using namespace mongocxx;
46 using namespace bsoncxx;
76 mongo_connection_manager_ = mongo_connection_manager;
77 blackboard_ = blackboard;
78 mongodb_client_local_ =
nullptr;
79 mongodb_client_distributed_ =
nullptr;
// Destructor: asks the connection manager to delete both MongoDB clients,
// deletes the trigger manager and closes the blackboard interface rm_if_.
// The excerpt ends at the USE_TIMETRACKER guard; what it encloses is not visible here.
83 RobotMemory::~RobotMemory()
85 mongo_connection_manager_->delete_client(mongodb_client_local_);
86 mongo_connection_manager_->delete_client(mongodb_client_distributed_);
87 delete trigger_manager_;
88 blackboard_->close(rm_if_);
89 #ifdef USE_TIMETRACKER
98 log(
"Started RobotMemory");
99 default_collection_ =
"robmem.test";
101 default_collection_ = config_->get_string(
"/plugins/robot-memory/default-collection");
105 debug_ = config_->get_bool(
"/plugins/robot-memory/more-debug-output");
108 database_name_ =
"robmem";
110 database_name_ = config_->get_string(
"/plugins/robot-memory/database");
113 distributed_dbs_ = config_->get_strings(
"/plugins/robot-memory/distributed-db-names");
115 cfg_coord_database_ = config_->get_string(
"/plugins/robot-memory/coordination/database");
116 cfg_coord_mutex_collection_ =
117 config_->get_string(
"/plugins/robot-memory/coordination/mutex-collection");
119 using namespace std::chrono_literals;
122 log(
"Connect to local mongod");
123 mongodb_client_local_ = mongo_connection_manager_->create_client(
"robot-memory-local");
125 if (config_->exists(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")
126 && config_->get_bool(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
128 log(
"Connect to distributed mongod");
129 mongodb_client_distributed_ =
130 mongo_connection_manager_->create_client(
"robot-memory-distributed");
134 rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
135 config_->get_string(
"/plugins/robot-memory/interface-name").c_str());
136 rm_if_->set_error(
"");
137 rm_if_->set_result(
"");
144 log_deb(
"Initialized RobotMemory");
146 #ifdef USE_TIMETRACKER
149 ttc_events_ = tt_->add_class(
"RobotMemory Events");
150 ttc_cleanup_ = tt_->add_class(
"RobotMemory Cleanup");
157 TIMETRACK_START(ttc_events_);
158 trigger_manager_->check_events();
159 TIMETRACK_END(ttc_events_);
160 TIMETRACK_START(ttc_cleanup_);
161 computables_manager_->cleanup_computed_docs();
162 TIMETRACK_END(ttc_cleanup_);
163 #ifdef USE_TIMETRACKER
164 if (++tt_loopcount_ % 5 == 0) {
165 tt_->print_to_stdout();
179 const std::string & collection_name,
180 mongocxx::options::find query_options)
182 collection collection = get_collection(collection_name);
183 log_deb(std::string(
"Executing Query " + to_json(
query) +
" on collection " + collection_name));
186 computables_manager_->check_and_compute(
query, collection_name);
193 return collection.find(
query, query_options);
194 }
catch (mongocxx::operation_exception &e) {
196 std::string(
"Error for query ") + to_json(
query) +
"\n Exception: " + e.what();
208 bsoncxx::document::value
210 const std::string & collection)
254 collection collection = get_collection(collection_name);
255 log_deb(std::string(
"Inserting " + to_json(doc) +
" into collection " + collection_name));
260 collection.insert_one(doc);
261 }
catch (mongocxx::operation_exception &e) {
262 std::string error =
"Error for insert " + to_json(doc) +
"\n Exception: " + e.what();
263 log_deb(error,
"error");
278 const std::string & collection_name,
281 collection collection = get_collection(collection_name);
283 log_deb(std::string(
"Creating index " + to_json(keys) +
" on collection " + collection_name));
290 using namespace bsoncxx::builder::basic;
291 collection.create_index(keys, make_document(kvp(
"unique", unique)));
292 }
catch (operation_exception &e) {
293 std::string error =
"Error when creating index " + to_json(keys) +
"\n Exception: " + e.what();
294 log_deb(error,
"error");
310 collection collection = get_collection(collection_name);
311 std::string insert_string =
"[";
312 for (
auto &&doc : docs) {
313 insert_string += to_json(doc) +
",\n";
315 insert_string +=
"]";
317 log_deb(std::string(
"Inserting vector of documents " + insert_string +
" into collection "
325 collection.insert_many(docs);
326 }
catch (operation_exception &e) {
327 std::string error =
"Error for insert " + insert_string +
"\n Exception: " + e.what();
328 log_deb(error,
"error");
344 return insert(from_json(obj_str), collection);
357 const bsoncxx::document::view &
update,
358 const std::string & collection_name,
361 collection collection = get_collection(collection_name);
362 log_deb(std::string(
"Executing Update " + to_json(
update) +
" for query " + to_json(
query)
363 +
" on collection " + collection_name));
370 collection.update_many(
query,
371 builder::basic::make_document(
372 builder::basic::kvp(
"$set", builder::concatenate(
update))),
373 options::update().upsert(upsert));
374 }
catch (operation_exception &e) {
375 log_deb(std::string(
"Error for update " + to_json(
update) +
" for query " + to_json(
query)
376 +
"\n Exception: " + e.what()),
394 const std::string & update_str,
395 const std::string & collection,
398 return update(
query, from_json(update_str), collection, upsert);
412 const document::view &
update,
413 const std::string & collection_name,
417 collection collection = get_collection(collection_name);
419 log_deb(std::string(
"Executing findOneAndUpdate " + to_json(
update) +
" for filter "
420 + to_json(filter) +
" on collection " + collection_name));
426 collection.find_one_and_update(filter,
428 options::find_one_and_update().upsert(upsert).return_document(
429 return_new ? options::return_document::k_after
430 : options::return_document::k_before));
434 std::string error =
"Error for update " + to_json(
update) +
" for query " + to_json(filter)
435 +
"FindOneAndUpdate unexpectedly did not return a document";
436 log_deb(error,
"warn");
437 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp(
"error", error));
439 }
catch (operation_exception &e) {
440 std::string error =
"Error for update " + to_json(
update) +
" for query " + to_json(filter)
441 +
"\n Exception: " + e.what();
442 log_deb(error,
"error");
443 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp(
"error", error));
458 collection collection = get_collection(collection_name);
459 log_deb(std::string(
"Executing Remove " + to_json(
query) +
" on collection " + collection_name));
462 collection.delete_many(
query);
463 }
catch (operation_exception &e) {
464 log_deb(std::string(
"Error for query " + to_json(
query) +
"\n Exception: " + e.what()),
480 bsoncxx::document::value
482 const std::string & collection,
483 const std::string & js_map_fun,
484 const std::string & js_reduce_fun)
534 collection collection = get_collection(collection_name);
535 log_deb(
"Dropping collection " + collection_name);
550 log_deb(
"Clearing whole robot memory");
551 mongodb_client_local_->database(database_name_).drop();
// Fragment of the collection restore routine: it assembles a mongorestore
// command line, runs it with popen() and scans the captured output for the
// word Failed. The start of the signature and several body lines are not
// part of this excerpt.
567 const std::string &directory,
568 std::string target_dbcollection)
// An empty target name falls back to the source db.collection name.
570 if (target_dbcollection ==
"") {
571 target_dbcollection = dbcollection;
579 auto [db, collection] = split_db_collection_string(dbcollection);
582 log_deb(std::string(
"Restore collection " + collection +
" from " + path),
"warn");
584 auto [target_db, target_collection] = split_db_collection_string(target_dbcollection);
// NOTE(review): the command is built by plain string concatenation and handed
// to popen(), so it is interpreted by a shell; path, database and collection
// names are inserted unquoted and unescaped. Confirm that callers never pass
// untrusted values here.
587 std::string command =
"/usr/bin/mongorestore --dir " + path +
" -d " + target_db +
" -c "
588 + target_collection +
" --host=" + get_hostport(dbcollection);
589 log_deb(std::string(
"Restore command: " + command),
"warn");
590 FILE *bash_output = popen(command.c_str(),
"r");
// NOTE(review): the message below has no space between the word collection
// and the collection name.
594 log(std::string(
"Unable to restore collection" + collection),
"error");
597 std::string output_string =
"";
// Accumulate the command output; each fgets() call reads at most 99 characters.
599 while (!feof(bash_output)) {
600 if (fgets(buffer, 100, bash_output) == NULL) {
603 output_string += buffer;
// Any occurrence of Failed in the captured output is reported as an error.
606 if (output_string.find(
"Failed") != std::string::npos) {
607 log(std::string(
"Unable to restore collection" + collection),
"error");
608 log_deb(output_string,
"error");
// Fragment of the collection dump routine: it assembles a mongodump command
// line, runs it with popen() and scans the captured output for the word
// Failed. The signature and several body lines are not part of this excerpt.
627 log_deb(std::string(
"Dump collection " + dbcollection +
" into " + path),
"warn");
629 auto [db, collection] = split_db_collection_string(dbcollection);
// NOTE(review): the command is built by plain string concatenation and handed
// to popen(), so it is interpreted by a shell; path, database and collection
// names are inserted unquoted and unescaped. Confirm that callers never pass
// untrusted values here.
631 std::string command =
"/usr/bin/mongodump --out=" + path +
" --db=" + db
632 +
" --collection=" + collection +
" --forceTableScan"
633 +
" --host=" + get_hostport(dbcollection);
634 log(std::string(
"Dump command: " + command),
"info");
635 FILE *bash_output = popen(command.c_str(),
"r");
// NOTE(review): the message below has no space between the word collection
// and the collection name.
638 log(std::string(
"Unable to dump collection" + collection),
"error");
641 std::string output_string =
"";
// Accumulate the command output; each fgets() call reads at most 99 characters.
643 while (!feof(bash_output)) {
644 if (fgets(buffer, 100, bash_output) == NULL) {
647 output_string += buffer;
// Any occurrence of Failed in the captured output is reported as an error.
650 if (output_string.find(
"Failed") != std::string::npos) {
651 log(std::string(
"Unable to dump collection" + collection),
"error");
652 log_deb(output_string,
"error");
659 RobotMemory::log(
const std::string &what,
const std::string &info)
661 if (!info.compare(
"error"))
662 logger_->log_error(name_,
"%s", what.c_str());
663 else if (!info.compare(
"warn"))
664 logger_->log_warn(name_,
"%s", what.c_str());
665 else if (!info.compare(
"debug"))
666 logger_->log_debug(name_,
"%s", what.c_str());
668 logger_->log_info(name_,
"%s", what.c_str());
// Logging helper taking a message and a level string.
// NOTE(review): the body is not part of this excerpt; presumably it forwards
// to log() only when debug output is enabled. Confirm against the full file.
672 RobotMemory::log_deb(
const std::string &what,
const std::string &level)
// Overload of log_deb() that takes a BSON document view in addition to the
// message and the level; the visible statement forwards all three arguments
// to log(query, what, level).
// NOTE(review): the lines between the signature and the log() call are
// missing from this excerpt; presumably a debug-flag guard. Confirm.
680 RobotMemory::log_deb(
const bsoncxx::document::view &
query,
681 const std::string & what,
682 const std::string & level)
685 log(
query, what, level);
690 RobotMemory::log(
const bsoncxx::document::view &
query,
691 const std::string & what,
692 const std::string & level)
694 log(what +
" " + to_json(
query), level);
702 RobotMemory::is_distributed_database(
const std::string &dbcollection)
704 return std::find(distributed_dbs_.begin(),
705 distributed_dbs_.end(),
706 split_db_collection_string(dbcollection).first)
707 != distributed_dbs_.end();
711 RobotMemory::get_hostport(
const std::string &dbcollection)
713 if (distributed_ && is_distributed_database(dbcollection)) {
714 return config_->get_string(
"/plugins/mongodb/clients/robot-memory-distributed-direct/hostport");
716 return config_->get_string(
"/plugins/mongodb/clients/robot-memory-local-direct/hostport");
// Picks the MongoDB client for a collection name. The visible statements
// return mongodb_client_distributed_ when is_distributed_database() accepts
// the name and mongodb_client_local_ otherwise.
// NOTE(review): the condition guarding the first return is missing from this
// excerpt; presumably it checks that distributed mode is off. Confirm against
// the full file.
726 RobotMemory::get_mongodb_client(
const std::string &collection)
729 return mongodb_client_local_;
731 if (is_distributed_database(collection)) {
732 return mongodb_client_distributed_;
734 return mongodb_client_local_;
745 RobotMemory::get_collection(
const std::string &dbcollection)
747 auto db_coll_pair = split_db_collection_string(dbcollection);
749 if (is_distributed_database(dbcollection)) {
750 client = mongodb_client_distributed_;
752 client = mongodb_client_local_;
754 return client->database(db_coll_pair.first)[db_coll_pair.second];
764 trigger_manager_->remove_trigger(trigger);
774 computables_manager_->remove_computable(computable);
788 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
789 using namespace bsoncxx::builder;
790 basic::document insert_doc{};
791 insert_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
792 subdoc.append(basic::kvp(
"lock-time",
true));
794 insert_doc.append(basic::kvp(
"_id", name));
795 insert_doc.append(basic::kvp(
"locked",
false));
798 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
799 auto write_concern = mongocxx::write_concern();
800 write_concern.majority(std::chrono::milliseconds(0));
801 collection.insert_one(insert_doc.view(), options::insert().write_concern(write_concern));
803 }
catch (operation_exception &e) {
804 logger_->log_info(name_,
"Failed to create mutex %s: %s", name.c_str(), e.what());
818 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
819 using namespace bsoncxx::builder;
820 basic::document destroy_doc;
821 destroy_doc.append(basic::kvp(
"_id", name));
824 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
825 auto write_concern = mongocxx::write_concern();
826 write_concern.majority(std::chrono::milliseconds(0));
827 collection.delete_one(destroy_doc.view(),
828 options::delete_options().write_concern(write_concern));
830 }
catch (operation_exception &e) {
831 logger_->log_info(name_,
"Failed to destroy mutex %s: %s", name.c_str(), e.what());
850 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
852 std::string locked_by{identity};
853 if (identity.empty()) {
855 locked_by = host_info.
name();
859 using namespace bsoncxx::builder;
860 basic::document filter_doc;
861 filter_doc.append(basic::kvp(
"_id", name));
863 filter_doc.append(basic::kvp(
"locked",
false));
866 basic::document update_doc;
867 update_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
868 subdoc.append(basic::kvp(
"lock-time",
true));
870 update_doc.append(basic::kvp(
"$set", [locked_by](basic::sub_document subdoc) {
871 subdoc.append(basic::kvp(
"locked",
true));
872 subdoc.append(basic::kvp(
"locked-by", locked_by));
876 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
877 auto write_concern = mongocxx::write_concern();
878 write_concern.majority(std::chrono::milliseconds(0));
880 collection.find_one_and_update(filter_doc.view(),
882 options::find_one_and_update()
884 .return_document(options::return_document::k_after)
885 .write_concern(write_concern));
890 auto new_view = new_doc->view();
891 return (new_view[
"locked-by"].get_utf8().value.to_string() == locked_by
892 && new_view[
"locked"].get_bool());
894 }
catch (operation_exception &e) {
895 logger_->log_error(name_,
"Mongo OperationException: %s", e.what());
898 basic::document check_doc;
899 check_doc.append(basic::kvp(
"_id", name));
900 check_doc.append(basic::kvp(
"locked",
true));
901 check_doc.append(basic::kvp(
"locked-by", locked_by));
903 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
904 auto res = collection.find_one(check_doc.view());
905 logger_->log_info(name_,
"Checking whether mutex was acquired succeeded");
907 logger_->log_warn(name_,
908 "Exception during try-lock for %s, "
909 "but mutex was still acquired",
912 logger_->log_info(name_,
913 "Exception during try-lock for %s, "
914 "and mutex was not acquired",
917 return static_cast<bool>(res);
918 }
catch (operation_exception &e) {
919 logger_->log_error(name_,
920 "Mongo OperationException while handling "
921 "the first exception: %s",
952 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
954 std::string locked_by{identity};
955 if (identity.empty()) {
957 locked_by = host_info.
name();
960 using namespace bsoncxx::builder;
962 basic::document filter_doc;
963 filter_doc.append(basic::kvp(
"_id", name));
964 filter_doc.append(basic::kvp(
"locked-by", locked_by));
966 basic::document update_doc;
967 update_doc.append(basic::kvp(
"$set", [](basic::sub_document subdoc) {
968 subdoc.append(basic::kvp(
"locked",
false));
970 update_doc.append(basic::kvp(
"$unset", [](basic::sub_document subdoc) {
971 subdoc.append(basic::kvp(
"locked-by",
true));
972 subdoc.append(basic::kvp(
"lock-time",
true));
977 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
978 auto write_concern = mongocxx::write_concern();
979 write_concern.majority(std::chrono::milliseconds(0));
981 collection.find_one_and_update(filter_doc.view(),
983 options::find_one_and_update()
985 .return_document(options::return_document::k_after)
986 .write_concern(write_concern));
990 return new_doc->view()[
"locked"].get_bool();
991 }
catch (operation_exception &e) {
1008 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1010 std::string locked_by{identity};
1011 if (identity.empty()) {
1013 locked_by = host_info.
name();
1016 using namespace bsoncxx::builder;
1018 basic::document filter_doc;
1019 filter_doc.append(basic::kvp(
"_id", name));
1020 filter_doc.append(basic::kvp(
"locked",
true));
1021 filter_doc.append(basic::kvp(
"locked-by", locked_by));
1025 basic::document update_doc;
1026 update_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
1027 subdoc.append(basic::kvp(
"lock-time",
true));
1029 update_doc.append(basic::kvp(
"$set", [locked_by](basic::sub_document subdoc) {
1030 subdoc.append(basic::kvp(
"locked",
true));
1031 subdoc.append(basic::kvp(
"locked-by", locked_by));
1036 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1037 auto write_concern = mongocxx::write_concern();
1038 write_concern.majority(std::chrono::milliseconds(0));
1040 collection.find_one_and_update(filter_doc.view(),
1042 options::find_one_and_update()
1044 .return_document(options::return_document::k_after)
1045 .write_concern(write_concern));
1046 return static_cast<bool>(new_doc);
1047 }
catch (operation_exception &e) {
1048 logger_->log_warn(name_,
"Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1069 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1071 auto keys = builder::basic::make_document(builder::basic::kvp(
"lock-time",
true));
1074 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1075 collection.create_index(keys.view(),
1076 builder::basic::make_document(
1077 builder::basic::kvp(
"expireAfterSeconds", max_age_sec)));
1078 }
catch (operation_exception &e) {
1079 logger_->log_warn(name_,
"Creating TTL index failed: %s", e.what());
1094 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1096 using std::chrono::high_resolution_clock;
1097 using std::chrono::milliseconds;
1098 using std::chrono::time_point;
1099 using std::chrono::time_point_cast;
1101 auto max_age_ms = milliseconds(
static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1102 time_point<high_resolution_clock, milliseconds> expire_before =
1103 time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1104 types::b_date expire_before_mdb(expire_before);
1107 using namespace bsoncxx::builder;
1108 basic::document filter_doc;
1109 filter_doc.append(basic::kvp(
"locked",
true));
1110 filter_doc.append(basic::kvp(
"lock-time", [expire_before_mdb](basic::sub_document subdoc) {
1111 subdoc.append(basic::kvp(
"$lt", expire_before_mdb));
1114 basic::document update_doc;
1115 update_doc.append(basic::kvp(
"$set", [](basic::sub_document subdoc) {
1116 subdoc.append(basic::kvp(
"locked",
false));
1118 update_doc.append(basic::kvp(
"$unset", [](basic::sub_document subdoc) {
1119 subdoc.append(basic::kvp(
"locked-by",
true));
1120 subdoc.append(basic::kvp(
"lock-time",
true));
1125 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1126 auto write_concern = mongocxx::write_concern();
1127 write_concern.majority(std::chrono::milliseconds(0));
1128 collection.update_many(filter_doc.view(),
1130 options::update().write_concern(write_concern));
1132 }
catch (operation_exception &e) {
1133 log(std::string(
"Failed to expire locks: " + std::string(e.what())),
"error");