Fawkes API  Fawkes Development Version
robot_memory.cpp
1 /***************************************************************************
2  * robot_memory.cpp - Class for storing and querying information in the RobotMemory
3  *
4  * Created: Aug 23, 2016 1:34:32 PM 2016
5  * Copyright 2016 Frederik Zwilling
6  * 2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
#include "robot_memory.h"

#include <core/threading/mutex.h>
#include <core/threading/mutex_locker.h>
#include <interfaces/RobotMemoryInterface.h>
#include <plugins/mongodb/utils.h>
#include <utils/misc/string_conversions.h>
#include <utils/misc/string_split.h>
#include <utils/system/hostinfo.h>
#ifdef USE_TIMETRACKER
# include <utils/time/tracker.h>
#endif
#include <utils/time/tracker_macros.h>

#include <algorithm>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/types.hpp>
#include <chrono>
#include <cstdio>
#include <mongocxx/client.hpp>
#include <mongocxx/exception/operation_exception.hpp>
#include <mongocxx/read_preference.hpp>
#include <string>
#include <thread>
43 
44 using namespace fawkes;
45 using namespace mongocxx;
46 using namespace bsoncxx;
47 
/** @class RobotMemory "robot_memory.h"
 * Access to the robot memory based on mongodb.
 * Using this class, you can query/insert/remove/update information in
 * the robot memory. Furthermore, you can register triggers to get
 * notified when something matching your query changes in the robot
 * memory, and you can access computables, i.e. information that is
 * computed on demand, by registering the computables and then
 * querying as if the information were already in the database.
 * @author Frederik Zwilling
 */
58 
59 /**
60  * Robot Memory Constructor with objects of the thread
61  * @param config Fawkes config
62  * @param logger Fawkes logger
63  * @param clock Fawkes clock
64  * @param mongo_connection_manager MongoDBConnCreator to create client connections to the shared and local db
65  * @param blackboard Fawkes blackboard
66  */
68  fawkes::Logger * logger,
69  fawkes::Clock * clock,
70  fawkes::MongoDBConnCreator *mongo_connection_manager,
71  fawkes::BlackBoard * blackboard)
72 {
73  config_ = config;
74  logger_ = logger;
75  clock_ = clock;
76  mongo_connection_manager_ = mongo_connection_manager;
77  blackboard_ = blackboard;
78  mongodb_client_local_ = nullptr;
79  mongodb_client_distributed_ = nullptr;
80  debug_ = false;
81 }
82 
/** Destructor.
 * Hands both MongoDB clients back to the connection manager, frees the
 * trigger manager and closes the blackboard interface.
 */
RobotMemory::~RobotMemory()
{
	mongo_connection_manager_->delete_client(mongodb_client_local_);
	mongo_connection_manager_->delete_client(mongodb_client_distributed_);
	delete trigger_manager_;
	// NOTE(review): computables_manager_ is allocated with new in init() but is
	// not deleted here -- confirm whether it is released elsewhere or leaks.
	blackboard_->close(rm_if_);
#ifdef USE_TIMETRACKER
	delete tt_;
#endif
}
93 
94 void
95 RobotMemory::init()
96 {
97  //load config values
98  log("Started RobotMemory");
99  default_collection_ = "robmem.test";
100  try {
101  default_collection_ = config_->get_string("/plugins/robot-memory/default-collection");
102  } catch (Exception &) {
103  }
104  try {
105  debug_ = config_->get_bool("/plugins/robot-memory/more-debug-output");
106  } catch (Exception &) {
107  }
108  database_name_ = "robmem";
109  try {
110  database_name_ = config_->get_string("/plugins/robot-memory/database");
111  } catch (Exception &) {
112  }
113  distributed_dbs_ = config_->get_strings("/plugins/robot-memory/distributed-db-names");
114 
115  cfg_coord_database_ = config_->get_string("/plugins/robot-memory/coordination/database");
116  cfg_coord_mutex_collection_ =
117  config_->get_string("/plugins/robot-memory/coordination/mutex-collection");
118 
119  using namespace std::chrono_literals;
120 
121  //initiate mongodb connections:
122  log("Connect to local mongod");
123  mongodb_client_local_ = mongo_connection_manager_->create_client("robot-memory-local");
124 
125  if (config_->exists("/plugins/mongodb/clients/robot-memory-distributed/enabled")
126  && config_->get_bool("/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
127  distributed_ = true;
128  log("Connect to distributed mongod");
129  mongodb_client_distributed_ =
130  mongo_connection_manager_->create_client("robot-memory-distributed");
131  }
132 
133  //init blackboard interface
134  rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
135  config_->get_string("/plugins/robot-memory/interface-name").c_str());
136  rm_if_->set_error("");
137  rm_if_->set_result("");
138  rm_if_->write();
139 
140  //Setup event trigger and computables manager
141  trigger_manager_ = new EventTriggerManager(logger_, config_, mongo_connection_manager_);
142  computables_manager_ = new ComputablesManager(config_, this);
143 
144  log_deb("Initialized RobotMemory");
145 
146 #ifdef USE_TIMETRACKER
147  tt_ = new TimeTracker();
148  tt_loopcount_ = 0;
149  ttc_events_ = tt_->add_class("RobotMemory Events");
150  ttc_cleanup_ = tt_->add_class("RobotMemory Cleanup");
151 #endif
152 }
153 
/** Run one iteration of the robot memory.
 * Lets the trigger manager check for events and then lets the computables
 * manager clean up computed documents. If built with USE_TIMETRACKER, the
 * time tracker statistics are printed to stdout on every fifth call.
 */
void
RobotMemory::loop()
{
	// time spent on checking triggers
	TIMETRACK_START(ttc_events_);
	trigger_manager_->check_events();
	TIMETRACK_END(ttc_events_);
	// time spent on cleaning up computed documents
	TIMETRACK_START(ttc_cleanup_);
	computables_manager_->cleanup_computed_docs();
	TIMETRACK_END(ttc_cleanup_);
#ifdef USE_TIMETRACKER
	if (++tt_loopcount_ % 5 == 0) {
		tt_->print_to_stdout();
	}
#endif
}
169 
170 /**
171  * Query information from the robot memory.
172  * @param query The query returned documents have to match (essentially a BSONObj)
173  * @param collection_name The database and collection to query as string (e.g. robmem.worldmodel)
174  * @param query_options Optional options to use to query the database
175  * @return Cursor to get the documents from, NULL for invalid query
176  */
177 cursor
178 RobotMemory::query(document::view query,
179  const std::string & collection_name,
180  mongocxx::options::find query_options)
181 {
182  collection collection = get_collection(collection_name);
183  log_deb(std::string("Executing Query " + to_json(query) + " on collection " + collection_name));
184 
185  //check if computation on demand is necessary and execute Computables
186  computables_manager_->check_and_compute(query, collection_name);
187 
188  //lock (mongo_client not thread safe)
189  MutexLocker lock(mutex_);
190 
191  //actually execute query
192  try {
193  return collection.find(query, query_options);
194  } catch (mongocxx::operation_exception &e) {
195  std::string error =
196  std::string("Error for query ") + to_json(query) + "\n Exception: " + e.what();
197  log(error, "error");
198  throw;
199  }
200 }
201 
/**
 * Aggregation call on the robot memory.
 * Currently not implemented: the function always throws.
 * @param pipeline Series of commands defining the aggregation
 * @param collection The database and collection to query as string (e.g. robmem.worldmodel)
 * @return nothing at the moment, an Exception("Not implemented") is thrown instead
 */
bsoncxx::document::value
RobotMemory::aggregate(const std::vector<bsoncxx::document::view> &pipeline,
                       const std::string &                         collection)
{
	/* Disabled implementation, kept for reference. It uses identifiers such as
	 * BSONObj, fromjson and DBException that do not appear anywhere else in
	 * this file.

	client *mongodb_client = get_mongodb_client(collection);
	log_deb(std::string("Executing Aggregation on collection " + collection));

	//TODO: check if computation on demand is necessary and execute Computables
	// that might be complicated because you need to build a query to check against from the fields mentioned in the different parts of the pipeline
	// A possible solution might be forcing the user to define the $match object separately and using it as query to check computables

	//lock (mongo_client not thread safe)
	MutexLocker lock(mutex_);

	//actually execute aggregation as command (in more modern mongo-cxx versions there should be an easier way with a proper aggregate function)
	BSONObj res;
	//get db and collection name
	size_t point_pos = collection.find(".");
	if (point_pos == std::string::npos) {
	logger_->log_error(name_, "Collection %s needs to start with 'dbname.'", collection.c_str());
	return fromjson("{}");
	}
	std::string db = collection.substr(0, point_pos);
	std::string col = collection.substr(point_pos + 1);
	try {
	mongodb_client->runCommand(db, BSON("aggregate" << col << "pipeline" << pipeline), res);
	} catch (DBException &e) {
	std::string error = std::string("Error for aggregation ") + "\n Exception: " + e.toString();
	log(error, "error");
	return fromjson("{}");
	}
	return res;
	*/
	throw Exception("Not implemented");
}
244 
245 /**
246  * Inserts a document into the robot memory
247  * @param doc A view of the document to insert
248  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
249  * @return 1: Success 0: Error
250  */
251 int
252 RobotMemory::insert(bsoncxx::document::view doc, const std::string &collection_name)
253 {
254  collection collection = get_collection(collection_name);
255  log_deb(std::string("Inserting " + to_json(doc) + " into collection " + collection_name));
256  //lock (mongo_client not thread safe)
257  MutexLocker lock(mutex_);
258  //actually execute insert
259  try {
260  collection.insert_one(doc);
261  } catch (mongocxx::operation_exception &e) {
262  std::string error = "Error for insert " + to_json(doc) + "\n Exception: " + e.what();
263  log_deb(error, "error");
264  return 0;
265  }
266  //return success
267  return 1;
268 }
269 
270 /** Create an index on a collection.
271  * @param keys The keys document
272  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
273  * @param unique true to create unique index
274  * @return 1: Success 0: Error
275  */
276 int
277 RobotMemory::create_index(bsoncxx::document::view keys,
278  const std::string & collection_name,
279  bool unique)
280 {
281  collection collection = get_collection(collection_name);
282 
283  log_deb(std::string("Creating index " + to_json(keys) + " on collection " + collection_name));
284 
285  //lock (mongo_client not thread safe)
286  MutexLocker lock(mutex_);
287 
288  //actually execute insert
289  try {
290  using namespace bsoncxx::builder::basic;
291  collection.create_index(keys, make_document(kvp("unique", unique)));
292  } catch (operation_exception &e) {
293  std::string error = "Error when creating index " + to_json(keys) + "\n Exception: " + e.what();
294  log_deb(error, "error");
295  return 0;
296  }
297  //return success
298  return 1;
299 }
300 
301 /**
302  * Inserts all document of a vector into the robot memory
303  * @param docs The vector of BSON documents as views
304  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
305  * @return 1: Success 0: Error
306  */
307 int
308 RobotMemory::insert(std::vector<bsoncxx::document::view> docs, const std::string &collection_name)
309 {
310  collection collection = get_collection(collection_name);
311  std::string insert_string = "[";
312  for (auto &&doc : docs) {
313  insert_string += to_json(doc) + ",\n";
314  }
315  insert_string += "]";
316 
317  log_deb(std::string("Inserting vector of documents " + insert_string + " into collection "
318  + collection_name));
319 
320  //lock (mongo_client not thread safe)
321  MutexLocker lock(mutex_);
322 
323  //actually execute insert
324  try {
325  collection.insert_many(docs);
326  } catch (operation_exception &e) {
327  std::string error = "Error for insert " + insert_string + "\n Exception: " + e.what();
328  log_deb(error, "error");
329  return 0;
330  }
331  //return success
332  return 1;
333 }
334 
335 /**
336  * Inserts a document into the robot memory
337  * @param obj_str The document as json string
338  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
339  * @return 1: Success 0: Error
340  */
341 int
342 RobotMemory::insert(const std::string &obj_str, const std::string &collection)
343 {
344  return insert(from_json(obj_str), collection);
345 }
346 
347 /**
348  * Updates documents in the robot memory
349  * @param query The query defining which documents to update
350  * @param update What to change in these documents
351  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
352  * @param upsert Should the update document be inserted if the query returns no documents?
353  * @return 1: Success 0: Error
354  */
355 int
356 RobotMemory::update(const bsoncxx::document::view &query,
357  const bsoncxx::document::view &update,
358  const std::string & collection_name,
359  bool upsert)
360 {
361  collection collection = get_collection(collection_name);
362  log_deb(std::string("Executing Update " + to_json(update) + " for query " + to_json(query)
363  + " on collection " + collection_name));
364 
365  //lock (mongo_client not thread safe)
366  MutexLocker lock(mutex_);
367 
368  //actually execute update
369  try {
370  collection.update_many(query,
371  builder::basic::make_document(
372  builder::basic::kvp("$set", builder::concatenate(update))),
373  options::update().upsert(upsert));
374  } catch (operation_exception &e) {
375  log_deb(std::string("Error for update " + to_json(update) + " for query " + to_json(query)
376  + "\n Exception: " + e.what()),
377  "error");
378  return 0;
379  }
380  //return success
381  return 1;
382 }
383 
384 /**
385  * Updates documents in the robot memory
386  * @param query The query defining which documents to update
387  * @param update_str What to change in these documents as json string
388  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
389  * @param upsert Should the update document be inserted if the query returns no documents?
390  * @return 1: Success 0: Error
391  */
392 int
393 RobotMemory::update(const bsoncxx::document::view &query,
394  const std::string & update_str,
395  const std::string & collection,
396  bool upsert)
397 {
398  return update(query, from_json(update_str), collection, upsert);
399 }
400 
401 /** Atomically update and retrieve document.
402  * @param filter The filter defining the document to update.
403  * If multiple match takes the first one.
404  * @param update Update statement. May only contain update operators!
405  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
406  * @param upsert Should the update document be inserted if the query returns no documents?
407  * @param return_new return the document before (false) or after (true) the update has been applied?
408  * @return document, depending on @p return_new either before or after the udpate has been applied.
409  */
410 document::value
411 RobotMemory::find_one_and_update(const document::view &filter,
412  const document::view &update,
413  const std::string & collection_name,
414  bool upsert,
415  bool return_new)
416 {
417  collection collection = get_collection(collection_name);
418 
419  log_deb(std::string("Executing findOneAndUpdate " + to_json(update) + " for filter "
420  + to_json(filter) + " on collection " + collection_name));
421 
422  MutexLocker lock(mutex_);
423 
424  try {
425  auto res =
426  collection.find_one_and_update(filter,
427  update,
428  options::find_one_and_update().upsert(upsert).return_document(
429  return_new ? options::return_document::k_after
430  : options::return_document::k_before));
431  if (res) {
432  return *res;
433  } else {
434  std::string error = "Error for update " + to_json(update) + " for query " + to_json(filter)
435  + "FindOneAndUpdate unexpectedly did not return a document";
436  log_deb(error, "warn");
437  return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("error", error));
438  }
439  } catch (operation_exception &e) {
440  std::string error = "Error for update " + to_json(update) + " for query " + to_json(filter)
441  + "\n Exception: " + e.what();
442  log_deb(error, "error");
443  return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp("error", error));
444  }
445 }
446 
447 /**
448  * Remove documents from the robot memory
449  * @param query Which documents to remove
450  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
451  * @return 1: Success 0: Error
452  */
453 int
454 RobotMemory::remove(const bsoncxx::document::view &query, const std::string &collection_name)
455 {
456  //lock (mongo_client not thread safe)
457  MutexLocker lock(mutex_);
458  collection collection = get_collection(collection_name);
459  log_deb(std::string("Executing Remove " + to_json(query) + " on collection " + collection_name));
460  //actually execute remove
461  try {
462  collection.delete_many(query);
463  } catch (operation_exception &e) {
464  log_deb(std::string("Error for query " + to_json(query) + "\n Exception: " + e.what()),
465  "error");
466  return 0;
467  }
468  //return success
469  return 1;
470 }
471 
/**
 * Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
 * Currently not implemented: the function always throws.
 * @param query Which documents to use for the map step
 * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
 * @param js_map_fun Map function in JavaScript as string
 * @param js_reduce_fun Reduce function in JavaScript as string
 * @return nothing at the moment, an Exception("Not implemented") is thrown instead
 */
bsoncxx::document::value
RobotMemory::mapreduce(const bsoncxx::document::view &query,
                       const std::string &            collection,
                       const std::string &            js_map_fun,
                       const std::string &            js_reduce_fun)
{
	throw Exception("Not implemented");
	/* Disabled implementation, kept for reference:
	mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
	MutexLocker lock(mutex_);
	log_deb(std::string("Executing MapReduce " + query.toString() + " on collection " + collection
	+ " map: " + js_map_fun + " reduce: " + js_reduce_fun));
	return mongodb_client->mapreduce(collection, js_map_fun, js_reduce_fun, query);
	*/
}
495 
/**
 * Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3.2/reference/method/db.collection.aggregate/)
 * Currently not implemented: the function always throws.
 * @param pipeline A sequence of data aggregation operations or stages. See the https://docs.mongodb.com/v3.2/reference/operator/aggregation-pipeline/ for details
 * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
 * @return nothing at the moment, an Exception("Not implemented") is thrown instead
 */
cursor
RobotMemory::aggregate(bsoncxx::document::view pipeline, const std::string &collection)
{
	throw Exception("Not implemented");
	/* Disabled implementation, kept for reference:
	mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
	MutexLocker lock(mutex_);
	log_deb(std::string("Executing Aggregation pipeline: " + pipeline.toString() + " on collection "
	+ collection));

	QResCursor cursor;
	try {
	cursor = mongodb_client->aggregate(collection, pipeline);
	} catch (DBException &e) {
	std::string error =
	std::string("Error for query ") + pipeline.toString() + "\n Exception: " + e.toString();
	log(error, "error");
	return NULL;
	}
	return cursor;
	*/
}
524 
525 /**
526  * Drop (= remove) a whole collection and all documents inside it
527  * @param collection_name The database and collection to use as string (e.g. robmem.worldmodel)
528  * @return 1: Success 0: Error
529  */
530 int
531 RobotMemory::drop_collection(const std::string &collection_name)
532 {
533  MutexLocker lock(mutex_);
534  collection collection = get_collection(collection_name);
535  log_deb("Dropping collection " + collection_name);
536  collection.drop();
537  return 1;
538 }
539 
540 /**
541  * Remove the whole database of the robot memory and all documents inside
542  * @return 1: Success 0: Error
543  */
544 int
546 {
547  //lock (mongo_client not thread safe)
548  MutexLocker lock(mutex_);
549 
550  log_deb("Clearing whole robot memory");
551  mongodb_client_local_->database(database_name_).drop();
552  return 1;
553 }
554 
555 /**
556  * Restore a previously dumped collection from a directory
557  * @param dbcollection The database and collection to use as string (e.g.
558  * robmem.worldmodel)
559  * @param directory Directory of the dump
560  * @param target_dbcollection Optional different database and collection where
561  * the dump is restored to. If not set, the dump will be restored in the
562  * previous place
563  * @return 1: Success 0: Error
564  */
565 int
566 RobotMemory::restore_collection(const std::string &dbcollection,
567  const std::string &directory,
568  std::string target_dbcollection)
569 {
570  if (target_dbcollection == "") {
571  target_dbcollection = dbcollection;
572  }
573 
574  drop_collection(target_dbcollection);
575 
576  //lock (mongo_client not thread safe)
577  MutexLocker lock(mutex_);
578 
579  auto [db, collection] = split_db_collection_string(dbcollection);
580  std::string path =
581  StringConversions::resolve_path(directory) + "/" + db + "/" + collection + ".bson";
582  log_deb(std::string("Restore collection " + collection + " from " + path), "warn");
583 
584  auto [target_db, target_collection] = split_db_collection_string(target_dbcollection);
585 
586  //call mongorestore from folder with initial restores
587  std::string command = "/usr/bin/mongorestore --dir " + path + " -d " + target_db + " -c "
588  + target_collection + " --host=" + get_hostport(dbcollection);
589  log_deb(std::string("Restore command: " + command), "warn");
590  FILE *bash_output = popen(command.c_str(), "r");
591 
592  //check if output is ok
593  if (!bash_output) {
594  log(std::string("Unable to restore collection" + collection), "error");
595  return 0;
596  }
597  std::string output_string = "";
598  char buffer[100];
599  while (!feof(bash_output)) {
600  if (fgets(buffer, 100, bash_output) == NULL) {
601  break;
602  }
603  output_string += buffer;
604  }
605  pclose(bash_output);
606  if (output_string.find("Failed") != std::string::npos) {
607  log(std::string("Unable to restore collection" + collection), "error");
608  log_deb(output_string, "error");
609  return 0;
610  }
611  return 1;
612 }
613 
614 /**
615  * Dump (= save) a collection to the filesystem to restore it later
616  * @param dbcollection The database and collection to use as string (e.g. robmem.worldmodel)
617  * @param directory Directory to dump the collection to
618  * @return 1: Success 0: Error
619  */
620 int
621 RobotMemory::dump_collection(const std::string &dbcollection, const std::string &directory)
622 {
623  //lock (mongo_client not thread safe)
624  MutexLocker lock(mutex_);
625 
626  std::string path = StringConversions::resolve_path(directory);
627  log_deb(std::string("Dump collection " + dbcollection + " into " + path), "warn");
628 
629  auto [db, collection] = split_db_collection_string(dbcollection);
630 
631  std::string command = "/usr/bin/mongodump --out=" + path + " --db=" + db
632  + " --collection=" + collection + " --forceTableScan"
633  + " --host=" + get_hostport(dbcollection);
634  log(std::string("Dump command: " + command), "info");
635  FILE *bash_output = popen(command.c_str(), "r");
636  //check if output is ok
637  if (!bash_output) {
638  log(std::string("Unable to dump collection" + collection), "error");
639  return 0;
640  }
641  std::string output_string = "";
642  char buffer[100];
643  while (!feof(bash_output)) {
644  if (fgets(buffer, 100, bash_output) == NULL) {
645  break;
646  }
647  output_string += buffer;
648  }
649  pclose(bash_output);
650  if (output_string.find("Failed") != std::string::npos) {
651  log(std::string("Unable to dump collection" + collection), "error");
652  log_deb(output_string, "error");
653  return 0;
654  }
655  return 1;
656 }
657 
658 void
659 RobotMemory::log(const std::string &what, const std::string &info)
660 {
661  if (!info.compare("error"))
662  logger_->log_error(name_, "%s", what.c_str());
663  else if (!info.compare("warn"))
664  logger_->log_warn(name_, "%s", what.c_str());
665  else if (!info.compare("debug"))
666  logger_->log_debug(name_, "%s", what.c_str());
667  else
668  logger_->log_info(name_, "%s", what.c_str());
669 }
670 
671 void
672 RobotMemory::log_deb(const std::string &what, const std::string &level)
673 {
674  if (debug_) {
675  log(what, level);
676  }
677 }
678 
679 void
680 RobotMemory::log_deb(const bsoncxx::document::view &query,
681  const std::string & what,
682  const std::string & level)
683 {
684  if (debug_) {
685  log(query, what, level);
686  }
687 }
688 
689 void
690 RobotMemory::log(const bsoncxx::document::view &query,
691  const std::string & what,
692  const std::string & level)
693 {
694  log(what + " " + to_json(query), level);
695 }
696 
697 /** Check if the given database is a distributed database
698  * @param dbcollection A database collection name pair of the form <dbname>.<collname>
699  * @return true iff the database is distributed database
700  */
701 bool
702 RobotMemory::is_distributed_database(const std::string &dbcollection)
703 {
704  return std::find(distributed_dbs_.begin(),
705  distributed_dbs_.end(),
706  split_db_collection_string(dbcollection).first)
707  != distributed_dbs_.end();
708 }
709 
710 std::string
711 RobotMemory::get_hostport(const std::string &dbcollection)
712 {
713  if (distributed_ && is_distributed_database(dbcollection)) {
714  return config_->get_string("/plugins/mongodb/clients/robot-memory-distributed-direct/hostport");
715  } else {
716  return config_->get_string("/plugins/mongodb/clients/robot-memory-local-direct/hostport");
717  }
718 }
719 
720 /**
721  * Get the mongodb client associated with the collection (eighter the local or distributed one)
722  * @param collection The collection name in the form "<dbname>.<collname>"
723  * @return A pointer to the client for the database with name <dbname>
724  */
725 client *
726 RobotMemory::get_mongodb_client(const std::string &collection)
727 {
728  if (!distributed_) {
729  return mongodb_client_local_;
730  }
731  if (is_distributed_database(collection)) {
732  return mongodb_client_distributed_;
733  } else {
734  return mongodb_client_local_;
735  }
736 }
737 
738 /**
739  * Get the collection object referred to by the given string.
740  * @param dbcollection The name of the collection in the form <dbname>.<collname>
741  * @return The respective collection object
742  */
743 
744 collection
745 RobotMemory::get_collection(const std::string &dbcollection)
746 {
747  auto db_coll_pair = split_db_collection_string(dbcollection);
748  client *client;
749  if (is_distributed_database(dbcollection)) {
750  client = mongodb_client_distributed_;
751  } else {
752  client = mongodb_client_local_;
753  }
754  return client->database(db_coll_pair.first)[db_coll_pair.second];
755 }
756 
757 /**
758  * Remove a previously registered trigger
759  * @param trigger Pointer to the trigger to remove
760  */
761 void
763 {
764  trigger_manager_->remove_trigger(trigger);
765 }
766 
767 /**
768  * Remove previously registered computable
769  * @param computable The computable to remove
770  */
771 void
773 {
774  computables_manager_->remove_computable(computable);
775 }
776 
777 /** Explicitly create a mutex.
778  * This is an optional step, a mutex is also created automatically when trying
779  * to acquire the lock for the first time. Adding it explicitly may increase
780  * visibility, e.g., in the database. Use it for mutexes which are locked
781  * only very infrequently.
782  * @param name mutex name
783  * @return true if operation was successful, false on failure
784  */
785 bool
786 RobotMemory::mutex_create(const std::string &name)
787 {
788  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
789  using namespace bsoncxx::builder;
790  basic::document insert_doc{};
791  insert_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
792  subdoc.append(basic::kvp("lock-time", true));
793  }));
794  insert_doc.append(basic::kvp("_id", name));
795  insert_doc.append(basic::kvp("locked", false));
796  try {
797  MutexLocker lock(mutex_);
798  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
799  auto write_concern = mongocxx::write_concern();
800  write_concern.majority(std::chrono::milliseconds(0));
801  collection.insert_one(insert_doc.view(), options::insert().write_concern(write_concern));
802  return true;
803  } catch (operation_exception &e) {
804  logger_->log_info(name_, "Failed to create mutex %s: %s", name.c_str(), e.what());
805  return false;
806  }
807 }
808 
809 /** Destroy a mutex.
810  * The mutex is erased from the database. This is done disregarding it's current
811  * lock state.
812  * @param name mutex name
813  * @return true if operation was successful, false on failure
814  */
815 bool
816 RobotMemory::mutex_destroy(const std::string &name)
817 {
818  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
819  using namespace bsoncxx::builder;
820  basic::document destroy_doc;
821  destroy_doc.append(basic::kvp("_id", name));
822  try {
823  MutexLocker lock(mutex_);
824  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
825  auto write_concern = mongocxx::write_concern();
826  write_concern.majority(std::chrono::milliseconds(0));
827  collection.delete_one(destroy_doc.view(),
828  options::delete_options().write_concern(write_concern));
829  return true;
830  } catch (operation_exception &e) {
831  logger_->log_info(name_, "Failed to destroy mutex %s: %s", name.c_str(), e.what());
832  return false;
833  }
834 }
835 
836 /** Try to acquire a lock for a mutex.
837  * This will access the database and atomically find and update (or
838  * insert) a mutex lock. If the mutex has not been created it is added
839  * automatically. If the lock cannot be acquired the function also
840  * returns immediately. There is no blocked waiting for the lock.
841  * @param name mutex name
842  * @param identity string to set as lock-holder
843  * @param force true to force acquisition of the lock, i.e., even if
844  * the lock has already been acquired take ownership (steal the lock).
845  * @return true if operation was successful, false on failure
846  */
bool
RobotMemory::mutex_try_lock(const std::string &name, const std::string &identity, bool force)
{
	// mutexes live on the distributed client if distribution is enabled
	client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;

	// without an explicit identity the host name is used as lock holder
	std::string locked_by{identity};
	if (identity.empty()) {
		HostInfo host_info;
		locked_by = host_info.name();
	}

	// here we can add an $or to implement lock timeouts
	using namespace bsoncxx::builder;
	basic::document filter_doc;
	filter_doc.append(basic::kvp("_id", name));
	if (!force) {
		// only match the mutex while it is unlocked; with force the lock state
		// is ignored and the lock is taken over
		filter_doc.append(basic::kvp("locked", false));
	}

	// update: stamp the lock time and mark the mutex as locked by us
	basic::document update_doc;
	update_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
		subdoc.append(basic::kvp("lock-time", true));
	}));
	update_doc.append(basic::kvp("$set", [locked_by](basic::sub_document subdoc) {
		subdoc.append(basic::kvp("locked", true));
		subdoc.append(basic::kvp("locked-by", locked_by));
	}));
	try {
		MutexLocker lock(mutex_);
		collection  collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
		auto        write_concern = mongocxx::write_concern();
		write_concern.majority(std::chrono::milliseconds(0));
		// upsert creates the mutex document if the filter matches nothing;
		// k_after makes the call return the document as it is after the update
		auto new_doc =
		  collection.find_one_and_update(filter_doc.view(),
		                                 update_doc.view(),
		                                 options::find_one_and_update()
		                                   .upsert(true)
		                                   .return_document(options::return_document::k_after)
		                                   .write_concern(write_concern));

		if (!new_doc) {
			return false;
		}
		// the lock is ours only if the returned document names us as holder
		auto new_view = new_doc->view();
		return (new_view["locked-by"].get_utf8().value.to_string() == locked_by
		        && new_view["locked"].get_bool());

	} catch (operation_exception &e) {
		// NOTE(review): presumably the upsert fails with a duplicate key error
		// when the mutex exists but is locked (filter does not match, insert
		// collides on _id) -- confirm; if so this path is the regular
		// "lock is taken" case and not only an error path.
		logger_->log_error(name_, "Mongo OperationException: %s", e.what());
		try {
			// TODO is this extra check still needed?
			// look for a mutex document that is locked by us
			basic::document check_doc;
			check_doc.append(basic::kvp("_id", name));
			check_doc.append(basic::kvp("locked", true));
			check_doc.append(basic::kvp("locked-by", locked_by));
			MutexLocker lock(mutex_);
			collection  collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
			auto        res        = collection.find_one(check_doc.view());
			logger_->log_info(name_, "Checking whether mutex was acquired succeeded");
			if (res) {
				logger_->log_warn(name_,
				                  "Exception during try-lock for %s, "
				                  "but mutex was still acquired",
				                  name.c_str());
			} else {
				logger_->log_info(name_,
				                  "Exception during try-lock for %s, "
				                  "and mutex was not acquired",
				                  name.c_str());
			}
			// true iff a document locked by us was found
			return static_cast<bool>(res);
		} catch (operation_exception &e) {
			logger_->log_error(name_,
			                   "Mongo OperationException while handling "
			                   "the first exception: %s",
			                   e.what());
			return false;
		}
	}
}
927 
928 /** Try to acquire a lock for a mutex.
929  * This will access the database and atomically find and update (or
930  * insert) a mutex lock. If the mutex has not been created it is added
931  * automatically. If the lock cannot be acquired the function also
932  * returns immediately. There is no blocked waiting for the lock.
933  * @param name mutex name
934  * @param force true to force acquisition of the lock, i.e., even if
935  * the lock has already been acquired take ownership (steal the lock).
936  * @return true if operation was successful, false on failure
937  */
938 bool
939 RobotMemory::mutex_try_lock(const std::string &name, bool force)
940 {
941  return mutex_try_lock(name, "", force);
942 }
943 
944 /** Release lock on mutex.
945  * @param name mutex name
946  * @param identity string to set as lock-holder
947  * @return true if operation was successful, false on failure
948  */
949 bool
950 RobotMemory::mutex_unlock(const std::string &name, const std::string &identity)
951 {
952  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
953 
954  std::string locked_by{identity};
955  if (identity.empty()) {
956  HostInfo host_info;
957  locked_by = host_info.name();
958  }
959 
960  using namespace bsoncxx::builder;
961  // here we can add an $or to implement lock timeouts
962  basic::document filter_doc;
963  filter_doc.append(basic::kvp("_id", name));
964  filter_doc.append(basic::kvp("locked-by", locked_by));
965 
966  basic::document update_doc;
967  update_doc.append(basic::kvp("$set", [](basic::sub_document subdoc) {
968  subdoc.append(basic::kvp("locked", false));
969  }));
970  update_doc.append(basic::kvp("$unset", [](basic::sub_document subdoc) {
971  subdoc.append(basic::kvp("locked-by", true));
972  subdoc.append(basic::kvp("lock-time", true));
973  }));
974 
975  try {
976  MutexLocker lock(mutex_);
977  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
978  auto write_concern = mongocxx::write_concern();
979  write_concern.majority(std::chrono::milliseconds(0));
980  auto new_doc =
981  collection.find_one_and_update(filter_doc.view(),
982  update_doc.view(),
983  options::find_one_and_update()
984  .upsert(true)
985  .return_document(options::return_document::k_after)
986  .write_concern(write_concern));
987  if (!new_doc) {
988  return false;
989  }
990  return new_doc->view()["locked"].get_bool();
991  } catch (operation_exception &e) {
992  return false;
993  }
994 }
995 
996 /** Renew a mutex.
997  * Renewing means updating the lock timestamp to the current time to
998  * avoid expiration. Note that the lock must currently be held by
999  * the given identity.
1000  * @param name mutex name
1001  * @param identity string to set as lock-holder (defaults to hostname
1002  * if empty)
1003  * @return true if operation was successful, false on failure
1004  */
1005 bool
1006 RobotMemory::mutex_renew_lock(const std::string &name, const std::string &identity)
1007 {
1008  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1009 
1010  std::string locked_by{identity};
1011  if (identity.empty()) {
1012  HostInfo host_info;
1013  locked_by = host_info.name();
1014  }
1015 
1016  using namespace bsoncxx::builder;
1017  // here we can add an $or to implement lock timeouts
1018  basic::document filter_doc;
1019  filter_doc.append(basic::kvp("_id", name));
1020  filter_doc.append(basic::kvp("locked", true));
1021  filter_doc.append(basic::kvp("locked-by", locked_by));
1022 
1023  // we set all data, even the data which is not actually modified, to
1024  // make it easier to process the update in triggers.
1025  basic::document update_doc;
1026  update_doc.append(basic::kvp("$currentDate", [](basic::sub_document subdoc) {
1027  subdoc.append(basic::kvp("lock-time", true));
1028  }));
1029  update_doc.append(basic::kvp("$set", [locked_by](basic::sub_document subdoc) {
1030  subdoc.append(basic::kvp("locked", true));
1031  subdoc.append(basic::kvp("locked-by", locked_by));
1032  }));
1033 
1034  try {
1035  MutexLocker lock(mutex_);
1036  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1037  auto write_concern = mongocxx::write_concern();
1038  write_concern.majority(std::chrono::milliseconds(0));
1039  auto new_doc =
1040  collection.find_one_and_update(filter_doc.view(),
1041  update_doc.view(),
1042  options::find_one_and_update()
1043  .upsert(false)
1044  .return_document(options::return_document::k_after)
1045  .write_concern(write_concern));
1046  return static_cast<bool>(new_doc);
1047  } catch (operation_exception &e) {
1048  logger_->log_warn(name_, "Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1049  return false;
1050  }
1051 }
1052 
1053 /** Setup time-to-live index for mutexes.
1054  * Setting up a time-to-live index for mutexes enables automatic
1055  * expiration through the database. Note, however, that the documents
1056  * are expired only every 60 seconds. This has two consequences:
1057  * - max_age_sec lower than 60 seconds cannot be achieved
1058  * - locks may be held for up to just below 60 seconds longer than
1059  * configured, i.e., if the mutex had not yet expired when the
1060  * background tasks runs.
1061  * @param max_age_sec maximum age of locks in seconds
1062  * @return true if operation was successful, false on failure
1063  */
1064 bool
1066 {
1067  MutexLocker lock(mutex_);
1068 
1069  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1070 
1071  auto keys = builder::basic::make_document(builder::basic::kvp("lock-time", true));
1072 
1073  try {
1074  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1075  collection.create_index(keys.view(),
1076  builder::basic::make_document(
1077  builder::basic::kvp("expireAfterSeconds", max_age_sec)));
1078  } catch (operation_exception &e) {
1079  logger_->log_warn(name_, "Creating TTL index failed: %s", e.what());
1080  return false;
1081  }
1082  return true;
1083 }
1084 
1085 /** Expire old locks on mutexes.
1086  * This will update the database and set all mutexes to unlocked for
1087  * which the lock-time is older than the given maximum age.
1088  * @param max_age_sec maximum age of locks in seconds
1089  * @return true if operation was successful, false on failure
1090  */
1091 bool
1093 {
1094  client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1095 
1096  using std::chrono::high_resolution_clock;
1097  using std::chrono::milliseconds;
1098  using std::chrono::time_point;
1099  using std::chrono::time_point_cast;
1100 
1101  auto max_age_ms = milliseconds(static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1102  time_point<high_resolution_clock, milliseconds> expire_before =
1103  time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1104  types::b_date expire_before_mdb(expire_before);
1105 
1106  // here we can add an $or to implement lock timeouts
1107  using namespace bsoncxx::builder;
1108  basic::document filter_doc;
1109  filter_doc.append(basic::kvp("locked", true));
1110  filter_doc.append(basic::kvp("lock-time", [expire_before_mdb](basic::sub_document subdoc) {
1111  subdoc.append(basic::kvp("$lt", expire_before_mdb));
1112  }));
1113 
1114  basic::document update_doc;
1115  update_doc.append(basic::kvp("$set", [](basic::sub_document subdoc) {
1116  subdoc.append(basic::kvp("locked", false));
1117  }));
1118  update_doc.append(basic::kvp("$unset", [](basic::sub_document subdoc) {
1119  subdoc.append(basic::kvp("locked-by", true));
1120  subdoc.append(basic::kvp("lock-time", true));
1121  }));
1122 
1123  try {
1124  MutexLocker lock(mutex_);
1125  collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1126  auto write_concern = mongocxx::write_concern();
1127  write_concern.majority(std::chrono::milliseconds(0));
1128  collection.update_many(filter_doc.view(),
1129  update_doc.view(),
1130  options::update().write_concern(write_concern));
1131  return true;
1132  } catch (operation_exception &e) {
1133  log(std::string("Failed to expire locks: " + std::string(e.what())), "error");
1134  return false;
1135  }
1136 }
RobotMemory::create_index
int create_index(bsoncxx::document::view keys, const std::string &collection="", bool unique=false)
Create an index on a collection.
Definition: robot_memory.cpp:277
RobotMemory::mutex_expire_locks
bool mutex_expire_locks(float max_age_sec)
Expire old locks on mutexes.
Definition: robot_memory.cpp:1092
RobotMemory::remove_trigger
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
Definition: robot_memory.cpp:762
RobotMemory::insert
int insert(bsoncxx::document::view, const std::string &collection="")
Inserts a document into the robot memory.
Definition: robot_memory.cpp:252
ComputablesManager
This class manages registering computables and can check if any computables are invoked by a query.
Definition: computables_manager.h:49
fawkes::MongoDBConnCreator
Interface for a MongoDB connection creator.
Definition: mongodb_conncreator.h:38
fawkes::MutexLocker
Mutex locking helper.
Definition: mutex_locker.h:34
fawkes::BlackBoard
The BlackBoard abstract class.
Definition: blackboard.h:46
EventTrigger
Class holding all information about an EventTrigger.
Definition: event_trigger.h:32
RobotMemory::mutex_renew_lock
bool mutex_renew_lock(const std::string &name, const std::string &identity)
Renew a mutex.
Definition: robot_memory.cpp:1006
RobotMemory::aggregate
bsoncxx::document::value aggregate(const std::vector< bsoncxx::document::view > &pipeline, const std::string &collection="")
Aggregation call on the robot memory.
Definition: robot_memory.cpp:209
fawkes::Configuration
Interface for configuration handling.
Definition: config.h:65
RobotMemory::restore_collection
int restore_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory", std::string target_dbcollection="")
Restore a previously dumped collection from a directory.
Definition: robot_memory.cpp:566
RobotMemory::RobotMemory
RobotMemory(fawkes::Configuration *config, fawkes::Logger *logger, fawkes::Clock *clock, fawkes::MongoDBConnCreator *mongo_connection_manager, fawkes::BlackBoard *blackboard)
Robot Memory Constructor with objects of the thread.
Definition: robot_memory.cpp:67
fawkes::StringConversions::resolve_path
static std::string resolve_path(std::string s)
Resolves path-string with @...@ tags.
Definition: string_conversions.cpp:265
RobotMemory::remove_computable
void remove_computable(Computable *computable)
Remove previously registered computable.
Definition: robot_memory.cpp:772
fawkes::HostInfo::name
const char * name()
Get full hostname.
Definition: hostinfo.cpp:100
RobotMemory::mapreduce
bsoncxx::document::value mapreduce(const bsoncxx::document::view &query, const std::string &collection, const std::string &js_map_fun, const std::string &js_reduce_fun)
Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
Definition: robot_memory.cpp:481
RobotMemory::mutex_create
bool mutex_create(const std::string &name)
Explicitly create a mutex.
Definition: robot_memory.cpp:786
fawkes::Logger
Interface for logging.
Definition: logger.h:42
fawkes
Fawkes library namespace.
RobotMemory::mutex_try_lock
bool mutex_try_lock(const std::string &name, bool force=false)
Try to acquire a lock for a mutex.
Definition: robot_memory.cpp:939
EventTriggerManager
Manager to realize triggers on events in the robot memory.
Definition: event_trigger_manager.h:48
RobotMemory::remove
int remove(const bsoncxx::document::view &query, const std::string &collection="")
Remove documents from the robot memory.
Definition: robot_memory.cpp:454
RobotMemory::drop_collection
int drop_collection(const std::string &collection)
Drop (= remove) a whole collection and all documents inside it.
Definition: robot_memory.cpp:531
RobotMemory::clear_memory
int clear_memory()
Remove the whole database of the robot memory and all documents inside.
Definition: robot_memory.cpp:545
RobotMemory::mutex_setup_ttl
bool mutex_setup_ttl(float max_age_sec)
Setup time-to-live index for mutexes.
Definition: robot_memory.cpp:1065
RobotMemory::mutex_destroy
bool mutex_destroy(const std::string &name)
Destroy a mutex.
Definition: robot_memory.cpp:816
RobotMemory::find_one_and_update
bsoncxx::document::value find_one_and_update(const bsoncxx::document::view &filter, const bsoncxx::document::view &update, const std::string &collection, bool upsert=false, bool return_new=true)
Atomically update and retrieve document.
Definition: robot_memory.cpp:411
RobotMemory::dump_collection
int dump_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory")
Dump (= save) a collection to the filesystem to restore it later.
Definition: robot_memory.cpp:621
fawkes::TimeTracker
Time tracking utility.
Definition: tracker.h:37
fawkes::HostInfo
Host information.
Definition: hostinfo.h:32
RobotMemory::query
mongocxx::cursor query(bsoncxx::document::view query, const std::string &collection_name="", mongocxx::options::find query_options=mongocxx::options::find())
Query information from the robot memory.
Definition: robot_memory.cpp:178
Computable
Class holding information for a single computable this class also enhances computed documents by addi...
Definition: computable.h:32
RobotMemory::update
int update(const bsoncxx::document::view &query, const bsoncxx::document::view &update, const std::string &collection="", bool upsert=false)
Updates documents in the robot memory.
Definition: robot_memory.cpp:356
RobotMemory::mutex_unlock
bool mutex_unlock(const std::string &name, const std::string &identity)
Release lock on mutex.
Definition: robot_memory.cpp:950
fawkes::Clock
This is supposed to be the central clock in Fawkes.
Definition: clock.h:35
fawkes::Exception
Base class for exceptions in Fawkes.
Definition: exception.h:36