22 #include "event_trigger_manager.h"
24 #ifdef USE_TIMETRACKER
25 # include <utils/time/tracker.h>
27 #include <plugins/mongodb/utils.h>
28 #include <utils/time/tracker_macros.h>
30 #include <boost/bind.hpp>
31 #include <bsoncxx/json.hpp>
32 #include <mongocxx/exception/operation_exception.hpp>
33 #include <mongocxx/exception/query_exception.hpp>
36 using namespace mongocxx;
56 mongo_connection_manager_ = mongo_connection_manager;
58 con_local_ = mongo_connection_manager_->
create_client(
"robot-memory-local");
59 if (config_->
exists(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")
60 && config_->
get_bool(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
61 con_replica_ = mongo_connection_manager_->
create_client(
"robot-memory-distributed");
65 std::string local_db = config_->
get_string(
"/plugins/robot-memory/database");
66 dbnames_local_.push_back(local_db);
67 dbnames_distributed_ = config_->
get_strings(
"/plugins/robot-memory/distributed-db-names");
72 cfg_debug_ = config->
get_bool(
"/plugins/robot-memory/more-debug-output");
75 #ifdef USE_TIMETRACKER
77 ttc_trigger_loop_ = tt_->add_class(
"RM Trigger Trigger Loop");
78 ttc_callback_loop_ = tt_->add_class(
"RM Trigger Callback Loop");
79 ttc_callback_ = tt_->add_class(
"RM Trigger Single Callback");
80 ttc_reinit_ = tt_->add_class(
"RM Trigger Reinit");
84 EventTriggerManager::~EventTriggerManager()
92 #ifdef USE_TIMETRACKER
98 EventTriggerManager::check_events()
103 TIMETRACK_START(ttc_trigger_loop_);
107 auto next = trigger->change_stream.begin();
108 TIMETRACK_START(ttc_callback_loop_);
109 while (next != trigger->change_stream.end()) {
112 TIMETRACK_START(ttc_callback_);
113 trigger->callback(*next);
115 TIMETRACK_END(ttc_callback_);
117 TIMETRACK_END(ttc_callback_loop_);
118 }
catch (operation_exception &e) {
119 logger_->
log_error(name.c_str(),
"Error while reading the change stream");
125 TIMETRACK_START(ttc_reinit_);
127 logger_->
log_debug(name.c_str(),
"Tailable Cursor is dead, requerying");
130 if (std::find(dbnames_distributed_.begin(),
131 dbnames_distributed_.end(),
133 != dbnames_distributed_.end()) {
138 auto db_coll_pair = split_db_collection_string(trigger->ns);
139 auto collection = con->database(db_coll_pair.first)[db_coll_pair.second];
141 trigger->change_stream = create_change_stream(collection, trigger->filter_query.view());
142 }
catch (mongocxx::query_exception &e) {
144 "Failed to create change stream, broken trigger for collection %s: %s",
148 TIMETRACK_END(ttc_reinit_);
151 TIMETRACK_END(ttc_trigger_loop_);
152 #ifdef USE_TIMETRACKER
153 if (++tt_loopcount_ % 5 == 0) {
154 tt_->print_to_stdout();
166 triggers.remove(trigger);
171 EventTriggerManager::create_change_stream(mongocxx::collection &coll, bsoncxx::document::view query)
177 if (!query.empty()) {
180 mongocxx::options::change_stream opts;
181 opts.full_document(
"updateLookup");
182 opts.max_await_time(std::chrono::milliseconds(1));
183 auto res = coll.watch(opts);
185 auto it = res.begin();
186 while (std::next(it) != res.end()) {}
198 std::string::size_type dot_pos = ns.find(
".");
199 if (dot_pos == std::string::npos) {
202 return ns.substr(0, dot_pos);