Fawkes API  Fawkes Development Version
pcl_db_roscomm_thread.cpp
1 
2 /***************************************************************************
3  * pcl_db_merge_roscomm_thread.cpp - ROS communication for pcl-db-merge
4  *
5  * Created: Thu Dec 06 13:54:45 2012
6  * Copyright 2012-2018 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "pcl_db_roscomm_thread.h"
23 
24 #include <blackboard/utils/on_update_waker.h>
25 #include <core/threading/wait_condition.h>
26 #include <interfaces/PclDatabaseMergeInterface.h>
27 #include <interfaces/PclDatabaseRetrieveInterface.h>
28 #include <interfaces/PclDatabaseStoreInterface.h>
29 #include <pcl/pcl_config.h>
30 #include <ros/ros.h>
31 #if PCL_VERSION_COMPARE(>=, 1, 7, 0)
32 # include <pcl/PCLPointCloud2.h>
33 # include <pcl_conversions/pcl_conversions.h>
34 #endif
35 
36 using namespace fawkes;
37 
38 #define CFG_PREFIX "/perception/pcl-db-merge/ros/"
39 
40 /** @class PointCloudDBROSCommThread "pcl_db_roscomm_thread.h"
41  * Thread to merge point clouds from database on request.
42  * @author Tim Niemueller
43  */
44 
45 /** Constructor. */
47 : Thread("PointCloudDBROSCommThread", Thread::OPMODE_WAITFORWAKEUP),
48  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_WORLDSTATE)
49 {
50 }
51 
52 /** Destructor. */
54 {
55 }
56 
57 void
59 {
60  cfg_store_pcl_id_ = config->get_string("/perception/pcl-db-roscomm/store-pcl-id");
61 
62  merge_if_ = blackboard->open_for_reading<PclDatabaseMergeInterface>("PCL Database Merge");
63 
64  merge_waitcond_ = new WaitCondition();
65  merge_update_waker_ = new BlackBoardOnUpdateWaker(blackboard, merge_if_, this);
66 
67  retrieve_if_ =
68  blackboard->open_for_reading<PclDatabaseRetrieveInterface>("PCL Database Retrieve");
69 
70  retrieve_waitcond_ = new WaitCondition();
71  retrieve_update_waker_ = new BlackBoardOnUpdateWaker(blackboard, retrieve_if_, this);
72 
73  store_if_ = blackboard->open_for_reading<PclDatabaseStoreInterface>("PCL Database Store");
74 
75  store_waitcond_ = new WaitCondition();
76  store_update_waker_ = new BlackBoardOnUpdateWaker(blackboard, store_if_, this);
77 
78  srv_merge_ = new ros::ServiceServer();
79  srv_retrieve_ = new ros::ServiceServer();
80  srv_record_ = new ros::ServiceServer();
81  srv_store_ = new ros::ServiceServer();
82 
83  *srv_merge_ =
84  rosnode->advertiseService("/pcl_db/merge", &PointCloudDBROSCommThread::merge_cb, this);
85 
86  *srv_retrieve_ =
87  rosnode->advertiseService("/pcl_db/retrieve", &PointCloudDBROSCommThread::retrieve_cb, this);
88 
89  *srv_store_ =
90  rosnode->advertiseService("/pcl_db/store", &PointCloudDBROSCommThread::store_cb, this);
91 }
92 
93 void
95 {
96  srv_merge_->shutdown();
97  srv_retrieve_->shutdown();
98  srv_store_->shutdown();
99  srv_record_->shutdown();
100  delete srv_merge_;
101  delete srv_retrieve_;
102  delete srv_store_;
103  delete srv_record_;
104 
105  delete merge_update_waker_;
106  delete retrieve_update_waker_;
107  delete store_update_waker_;
108  delete merge_waitcond_;
109  delete retrieve_waitcond_;
110  delete store_waitcond_;
111 
112  blackboard->close(merge_if_);
113  blackboard->close(retrieve_if_);
114  blackboard->close(store_if_);
115 }
116 
117 void
119 {
120  merge_if_->read();
121  if (merge_if_->changed()) {
122  logger->log_info(name(), "Merge interface has changed");
123 
124  logger->log_info(name(),
125  "%u vs. %u final: %s",
126  merge_if_->msgid(),
127  merge_msg_id_,
128  merge_if_->is_final() ? "yes" : "no");
129 
130  if ((merge_if_->msgid() == merge_msg_id_) && merge_if_->is_final()) {
131  logger->log_info(name(), "Merge final");
132  merge_waitcond_->wake_all();
133  }
134  }
135 
136  retrieve_if_->read();
137  if (retrieve_if_->changed()) {
138  logger->log_info(name(), "Retrieve interface has changed");
139 
140  logger->log_info(name(),
141  "%u vs. %u final: %s",
142  retrieve_if_->msgid(),
143  retrieve_msg_id_,
144  retrieve_if_->is_final() ? "yes" : "no");
145 
146  if ((retrieve_if_->msgid() == retrieve_msg_id_) && retrieve_if_->is_final()) {
147  logger->log_info(name(), "Retrieve final");
148  retrieve_waitcond_->wake_all();
149  }
150  }
151 
152  store_if_->read();
153  if (store_if_->changed()) {
154  logger->log_info(name(), "Store interface has changed");
155 
156  logger->log_info(name(),
157  "%u vs. %u final: %s",
158  store_if_->msgid(),
159  store_msg_id_,
160  store_if_->is_final() ? "yes" : "no");
161 
162  if ((store_if_->msgid() == store_msg_id_) && store_if_->is_final()) {
163  logger->log_info(name(), "Store final");
164  store_waitcond_->wake_all();
165  }
166  }
167 }
168 
169 bool
170 PointCloudDBROSCommThread::merge_cb(fawkes_msgs::MergePointClouds::Request & req,
171  fawkes_msgs::MergePointClouds::Response &resp)
172 {
173  PclDatabaseMergeInterface::MergeMessage *mm = new PclDatabaseMergeInterface::MergeMessage();
174 
175  if (req.timestamps.size() > mm->maxlenof_timestamps()) {
176  logger->log_warn(name(),
177  "Number of requested timestamps (%zu) "
178  "exceeds maximum number allowed (%zu)",
179  req.timestamps.size(),
180  mm->maxlenof_timestamps());
181  resp.ok = false;
182  resp.error = "Number of requested timestamps exceeds maximum number allowed";
183  delete mm;
184  return true;
185  }
186  if (req.timestamps.empty()) {
187  logger->log_warn(name(), "No times given in request");
188  resp.ok = false;
189  resp.error = "No times given in request";
190  delete mm;
191  return true;
192  }
193 
194  size_t num_timestamps = mm->maxlenof_timestamps();
195  std::vector<int64_t> timestamps(num_timestamps, 0);
196  for (size_t i = 0; i < req.timestamps.size(); ++i) {
197  timestamps[i] =
198  (int64_t)req.timestamps[i].data.sec * 1000L + (int64_t)req.timestamps[i].data.nsec / 1000000L;
199  }
200  sort(timestamps.begin(), timestamps.begin() + req.timestamps.size());
201  mm->set_timestamps(&timestamps[0]);
202  mm->set_database(req.database.c_str());
203  mm->set_collection(req.collection.c_str());
204 
205  mm->ref();
206  merge_if_->msgq_enqueue(mm);
207  merge_msg_id_ = mm->id();
208  mm->unref();
209 
210  // wait for result
211  merge_waitcond_->wait();
212 
213  // Check result
214  merge_if_->read();
215  if (merge_if_->is_final() && (std::string("") == merge_if_->error())) {
216  resp.ok = true;
217  } else {
218  resp.ok = false;
219  resp.error = merge_if_->error();
220  }
221  return true;
222 }
223 
224 bool
225 PointCloudDBROSCommThread::retrieve_cb(fawkes_msgs::RetrievePointCloud::Request & req,
226  fawkes_msgs::RetrievePointCloud::Response &resp)
227 {
228  PclDatabaseRetrieveInterface::RetrieveMessage *mm =
229  new PclDatabaseRetrieveInterface::RetrieveMessage();
230 
231  int64_t timestamp = (int64_t)req.timestamp.sec * 1000L + (int64_t)req.timestamp.nsec / 1000000L;
232 
233  logger->log_info(
234  name(), "Restoring %li from %s.%s", timestamp, req.database.c_str(), req.collection.c_str());
235 
236  mm->set_timestamp(timestamp);
237  mm->set_database(req.database.c_str());
238  mm->set_collection(req.collection.c_str());
239  mm->set_target_frame(req.target_frame.c_str());
240  mm->set_original_timestamp(req.original_timestamp);
241 
242  mm->ref();
243  retrieve_if_->msgq_enqueue(mm);
244  retrieve_msg_id_ = mm->id();
245  mm->unref();
246 
247  // wait for result
248  retrieve_waitcond_->wait();
249 
250  // Check result
251  retrieve_if_->read();
252  if (retrieve_if_->is_final() && (std::string("") == retrieve_if_->error())) {
253  resp.ok = true;
254  } else {
255  resp.ok = false;
256  resp.error = retrieve_if_->error();
257  }
258  return true;
259 }
260 
261 bool
262 PointCloudDBROSCommThread::store_cb(fawkes_msgs::StorePointCloud::Request & req,
263  fawkes_msgs::StorePointCloud::Response &resp)
264 {
265 #if PCL_VERSION_COMPARE(>=, 1, 7, 0)
266  PclDatabaseStoreInterface::StoreMessage *mm = new PclDatabaseStoreInterface::StoreMessage();
267 
268  logger->log_info(name(),
269  "Storing to %s.%s",
270  (req.database == "") ? "<default>" : req.database.c_str(),
271  (req.collection == "") ? "<default>" : req.collection.c_str());
272 
273  pcl::PCLPointCloud2 pcl_in;
274  pcl_conversions::moveToPCL(req.pointcloud, pcl_in);
275 
278 
279  std::string fields_in = pcl::getFieldsList(pcl_in);
280  std::string fields_xyz = pcl::getFieldsList(**pcl_xyz);
281  std::string fields_xyzrgb = pcl::getFieldsList(**pcl_xyzrgb);
282 
283  if (fields_in == fields_xyz) {
284  pcl::fromPCLPointCloud2(pcl_in, **pcl_xyz);
285  pcl_manager->add_pointcloud(cfg_store_pcl_id_.c_str(), pcl_xyz);
286 
287  } else if (fields_in == fields_xyzrgb) {
288  pcl::fromPCLPointCloud2(pcl_in, **pcl_xyzrgb);
289  pcl_manager->add_pointcloud(cfg_store_pcl_id_.c_str(), pcl_xyzrgb);
290 
291  } else {
292  resp.ok = false;
293  resp.error = "Unsupported point cloud type";
294  }
295 
296  mm->set_pcl_id(cfg_store_pcl_id_.c_str());
297  mm->set_database(req.database.c_str());
298  mm->set_collection(req.collection.c_str());
299 
300  mm->ref();
301  store_if_->msgq_enqueue(mm);
302  store_msg_id_ = mm->id();
303  mm->unref();
304 
305  // wait for result
306  store_waitcond_->wait();
307 
308  pcl_manager->remove_pointcloud(cfg_store_pcl_id_.c_str());
309 
310  // Check result
311  store_if_->read();
312  if (store_if_->is_final() && (std::string("") == store_if_->error())) {
313  resp.ok = true;
314  } else {
315  resp.ok = false;
316  resp.error = store_if_->error();
317  }
318  return true;
319 #else
320  logger->log_warn(name(), "Cannot store point clouds, PCL < 1.7.0");
321  resp.ok = false;
322  resp.error = "Storing only supported with PCL >= 1.7.0";
323  return true;
324 #endif
325 }
326 
327 bool
328 PointCloudDBROSCommThread::record_cb(fawkes_msgs::RecordData::Request & req,
329  fawkes_msgs::RecordData::Response &resp)
330 {
331  logger->log_info(name(), "Recording ordered for %f sec", req.range.toSec());
332  ros::Time begin = ros::Time::now();
333  ros::Time end = begin + req.range;
334  ros::Time::sleepUntil(end);
335  resp.begin = begin;
336  resp.end = end;
337  resp.ok = true;
338  logger->log_info(name(), "Recording done");
339  return true;
340 }
PointCloudDBROSCommThread::PointCloudDBROSCommThread
PointCloudDBROSCommThread()
Constructor.
Definition: pcl_db_roscomm_thread.cpp:46
PointCloudDBROSCommThread::init
virtual void init()
Initialize the thread.
Definition: pcl_db_roscomm_thread.cpp:58
PointCloudDBROSCommThread::loop
virtual void loop()
Code to execute in the thread.
Definition: pcl_db_roscomm_thread.cpp:118
fawkes::WaitCondition
Wait until a given condition holds.
Definition: wait_condition.h:37
fawkes::RefPtr
RefPtr<> is a reference-counting shared smartpointer.
Definition: refptr.h:50
fawkes::PointCloudManager::remove_pointcloud
void remove_pointcloud(const char *id)
Remove the point cloud.
Definition: pointcloud_manager.cpp:66
fawkes::ROSAspect::rosnode
LockPtr< ros::NodeHandle > rosnode
Central ROS node handle.
Definition: ros.h:47
fawkes::Logger::log_info
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
fawkes::BlockedTimingAspect
Thread aspect to use blocked timing.
Definition: blocked_timing.h:51
fawkes::Thread::name
const char * name() const
Get name of thread.
Definition: thread.h:100
fawkes::WaitCondition::wait
void wait()
Wait for the condition forever.
Definition: wait_condition.cpp:139
fawkes::BlackBoardOnUpdateWaker
Wake threads when a blackboard interface is updated.
Definition: on_update_waker.h:35
fawkes::LoggingAspect::logger
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
PointCloudDBROSCommThread::finalize
virtual void finalize()
Finalize the thread.
Definition: pcl_db_roscomm_thread.cpp:94
fawkes::BlackBoard::close
virtual void close(Interface *interface)=0
Close interface.
fawkes
Fawkes library namespace.
pcl::PointCloud< pcl::PointXYZ >
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
fawkes::ConfigurableAspect::config
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
fawkes::WaitCondition::wake_all
void wake_all()
Wake up all waiting threads.
Definition: wait_condition.cpp:287
fawkes::Thread
Thread class encapsulation of pthreads.
Definition: thread.h:46
fawkes::BlackBoardAspect::blackboard
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
fawkes::PointCloudManager::add_pointcloud
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT >> cloud)
Add point cloud.
Definition: pointcloud_manager.h:78
PointCloudDBROSCommThread::~PointCloudDBROSCommThread
virtual ~PointCloudDBROSCommThread()
Destructor.
Definition: pcl_db_roscomm_thread.cpp:53
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
fawkes::BlackBoard::open_for_reading
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
fawkes::PointCloudAspect::pcl_manager
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47