Fawkes API  Fawkes Development Version
pcl_db_store_thread.cpp
1 
2 /***************************************************************************
3  * pcl_db_store_thread.cpp - Store point clouds to MongoDB
4  *
5  * Created: Mon May 05 14:26:22 2014
6  * Copyright 2012-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "pcl_db_store_thread.h"
23 
24 #include <blackboard/utils/on_message_waker.h>
25 #include <interfaces/PclDatabaseStoreInterface.h>
26 #include <pcl_utils/pcl_adapter.h>
27 
28 // from MongoDB
29 #include <bsoncxx/builder/basic/document.hpp>
30 #include <mongocxx/client.hpp>
31 #include <mongocxx/exception/operation_exception.hpp>
32 #include <mongocxx/gridfs/bucket.hpp>
33 #include <mongocxx/gridfs/uploader.hpp>
34 
35 #define CFG_PREFIX "/perception/pcl-db/"
36 
37 using namespace fawkes;
38 using namespace mongocxx;
39 
40 /** @class PointCloudDBStoreThread "pcl_db_store_thread.h"
41  * Thread to store point clouds from database on request.
42  * @author Tim Niemueller
43  */
44 
45 /** Constructor. */
47 : Thread("PointCloudDBStoreThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
48 {
49 }
50 
51 /** Destructor. */
53 {
54 }
55 
56 void
58 {
59  store_if_ = NULL;
60  adapter_ = NULL;
61  msg_waker_ = NULL;
62 
63  cfg_database_ = config->get_string(CFG_PREFIX "database-name");
64 
65  adapter_ = new PointCloudAdapter(pcl_manager, logger);
66 
67  try {
68  store_if_ = blackboard->open_for_writing<PclDatabaseStoreInterface>("PCL Database Store");
69 
70  msg_waker_ = new BlackBoardOnMessageWaker(blackboard, store_if_, this);
71  } catch (Exception &e) {
72  finalize();
73  throw;
74  }
75 }
76 
77 void
79 {
80  delete msg_waker_;
81  blackboard->close(store_if_);
82  delete adapter_;
83 }
84 
85 void
87 {
88  if (store_if_->msgq_empty())
89  return;
90 
91  if (PclDatabaseStoreInterface::StoreMessage *msg = store_if_->msgq_first_safe(msg)) {
92  store_if_->set_final(false);
93  store_if_->set_msgid(msg->id());
94  store_if_->set_error("");
95  store_if_->write();
96 
97  std::string msg_database = msg->database();
98  std::string msg_collection = msg->collection();
99 
100  bool store = true;
101  std::string errmsg;
102  std::string database = (msg_database != "") ? msg_database : cfg_database_;
103  std::string collection = database + ".";
104  if (msg_collection == "") {
105  collection += "pcls";
106  } else if (msg_collection.compare(0, 3, "fs.") == 0) {
107  errmsg = "Passed in collection uses GridFS namespace";
108  store = false;
109  } else {
110  collection += msg->collection();
111  }
112 
113  if (store)
114  store_pointcloud(msg->pcl_id(), database, collection, errmsg);
115 
116  store_if_->set_error(errmsg.c_str());
117  store_if_->set_final(true);
118  store_if_->write();
119  } else {
120  logger->log_warn(name(), "Unhandled message received");
121  }
122  store_if_->msgq_pop();
123 }
124 
125 bool
126 PointCloudDBStoreThread::store_pointcloud(std::string pcl_id,
127  std::string database,
128  std::string collection,
129  std::string &errmsg)
130 {
131  if (!pcl_manager->exists_pointcloud(pcl_id.c_str())) {
132  errmsg = "PointCloud does not exist";
133  return false;
134  }
135 
136  std::string frame_id;
137  unsigned int width, height;
138  bool is_dense;
139  void * point_data;
140  size_t point_size, num_points;
141  fawkes::Time time;
143 
144  adapter_->get_data_and_info(pcl_id,
145  frame_id,
146  is_dense,
147  width,
148  height,
149  time,
150  fieldinfo,
151  &point_data,
152  point_size,
153  num_points);
154 
155  size_t data_size = point_size * num_points;
156 
157  auto gridfs = mongodb_client->database(database).gridfs_bucket();
158 
159  std::stringstream name;
160  name << "pcl_" << time.in_msec();
161  auto uploader = gridfs.open_upload_stream(name.str());
162  uploader.write(static_cast<uint8_t *>(point_data), data_size);
163  auto result = uploader.close();
164  using namespace bsoncxx::builder;
165  basic::document document;
166  document.append(basic::kvp("timestamp", static_cast<int64_t>(time.in_msec())));
167  document.append(basic::kvp("pointcloud", [&](basic::sub_document subdoc) {
168  subdoc.append(basic::kvp("frame_id", frame_id));
169  subdoc.append(basic::kvp("is_dense", is_dense));
170  subdoc.append(basic::kvp("width", static_cast<int64_t>(width)));
171  subdoc.append(basic::kvp("height", static_cast<int64_t>(height)));
172  subdoc.append(basic::kvp("point_size", static_cast<int64_t>(point_size)));
173  subdoc.append(basic::kvp("num_points", static_cast<int64_t>(num_points)));
174  subdoc.append(basic::kvp("data", [&](basic::sub_document datadoc) {
175  datadoc.append(basic::kvp("id", result.id()));
176  datadoc.append(basic::kvp("filename", name.str()));
177  }));
178  subdoc.append(basic::kvp("field_info", [fieldinfo](basic::sub_array fi_array) {
179  for (auto fi : fieldinfo) {
180  basic::document fi_doc;
181  fi_doc.append(basic::kvp("name", fi.name));
182  fi_doc.append(basic::kvp("offset", static_cast<int64_t>(fi.offset)));
183  fi_doc.append(basic::kvp("datatype", fi.datatype));
184  fi_doc.append(basic::kvp("count", static_cast<int64_t>(fi.count)));
185  }
186  }));
187  }));
188 
189  try {
190  mongodb_client->database(database)[collection].insert_one(document.view());
191  } catch (mongocxx::operation_exception &e) {
192  logger->log_warn(this->name(), "Failed to insert into %s: %s", collection.c_str(), e.what());
193  errmsg = e.what();
194  return false;
195  }
196 
197  return true;
198 }
fawkes::Time::in_msec
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:235
fawkes::MongoDBAspect
Definition: mongodb.h:43
fawkes::PointCloudManager::exists_pointcloud
bool exists_pointcloud(const char *id)
Check if point cloud exists.
Definition: pointcloud_manager.cpp:86
PointCloudDBStoreThread::~PointCloudDBStoreThread
virtual ~PointCloudDBStoreThread()
Destructor.
Definition: pcl_db_store_thread.cpp:52
PointCloudDBStoreThread::PointCloudDBStoreThread
PointCloudDBStoreThread()
Constructor.
Definition: pcl_db_store_thread.cpp:46
PointCloudDBStoreThread::init
virtual void init()
Initialize the thread.
Definition: pcl_db_store_thread.cpp:57
PointCloudAdapter::V_PointFieldInfo
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
Definition: pcl_adapter.h:66
fawkes::MongoDBAspect::mongodb_client
mongocxx::client * mongodb_client
Definition: mongodb.h:59
PointCloudDBStoreThread::loop
virtual void loop()
Code to execute in the thread.
Definition: pcl_db_store_thread.cpp:86
PointCloudDBStoreThread::finalize
virtual void finalize()
Finalize the thread.
Definition: pcl_db_store_thread.cpp:78
fawkes::Thread::name
const char * name() const
Definition: thread.h:100
fawkes::LoggingAspect::logger
Logger * logger
Definition: logging.h:53
fawkes::BlackBoard::close
virtual void close(Interface *interface)=0
fawkes
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
PointCloudAdapter::get_data_and_info
void get_data_and_info(const std::string &id, std::string &frame_id, bool &is_dense, unsigned int &width, unsigned int &height, fawkes::Time &time, V_PointFieldInfo &pfi, void **data_ptr, size_t &point_size, size_t &num_points)
Get data and info of point cloud.
Definition: pcl_adapter.cpp:219
fawkes::ConfigurableAspect::config
Configuration * config
Definition: configurable.h:53
fawkes::Time
Definition: time.h:98
PointCloudAdapter
Definition: pcl_adapter.h:38
fawkes::Thread
Definition: thread.h:45
fawkes::BlackBoardAspect::blackboard
BlackBoard * blackboard
Definition: blackboard.h:49
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
fawkes::BlackBoardOnMessageWaker
Definition: on_message_waker.h:41
fawkes::PointCloudAspect::pcl_manager
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
fawkes::BlackBoard::open_for_writing
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
fawkes::Exception
Definition: exception.h:41