Fawkes API  Fawkes Development Version
mongodb_instance_config.cpp
1 
2 /***************************************************************************
3  * mongodb_instance_config.cpp - MongoDB instance configuration
4  *
5  * Created: Wed Jul 12 14:33:02 2017
6  * Copyright 2006-2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mongodb_instance_config.h"
23 
24 #include "utils.h"
25 
26 #include <config/config.h>
27 #include <core/exceptions/system.h>
28 #include <utils/sub_process/proc.h>
29 #include <utils/time/wait.h>
30 
31 #include <boost/filesystem.hpp>
32 #include <bsoncxx/builder/basic/document.hpp>
33 #include <bsoncxx/json.hpp>
34 #include <chrono>
35 #include <mongocxx/client.hpp>
36 #include <mongocxx/exception/exception.hpp>
37 #include <numeric>
38 #include <sstream>
39 #include <wordexp.h>
40 
41 using namespace fawkes;
42 using namespace std::chrono_literals;
43 
44 /** @class MongoDBInstanceConfig "mongodb_client_config.h"
45  * MongoDB Instances Configuration.
46  * Configure a single mongod instance that can be run as a sub-process.
47  *
48  * @author Tim Niemueller
49  */
50 
51 /** Constructor.
52  * This will read the given configuration.
53  * @param config configuration to query
54  * @param cfgname configuration name
55  * @param prefix configuration path prefix
56  */
58  std::string cfgname,
59  std::string prefix)
60 : Thread("MongoDBInstance", Thread::OPMODE_CONTINUOUS)
61 {
62  set_name("MongoDBInstance|%s", cfgname.c_str());
63  config_name_ = cfgname;
64 
65  running_ = false;
66 
67  enabled_ = false;
68  try {
69  enabled_ = config->get_bool(prefix + "enabled");
70  } catch (Exception &e) {
71  }
72 
73  if (enabled_) {
74  startup_grace_period_ = 10;
75  try {
76  startup_grace_period_ = config->get_uint(prefix + "startup-grace-period");
77  } catch (Exception &e) {
78  } // ignored, use default
79  loop_interval_ = 5.0;
80  try {
81  loop_interval_ = config->get_float(prefix + "loop-interval");
82  } catch (Exception &e) {
83  } // ignored, use default
84  termination_grace_period_ = config->get_uint(prefix + "termination-grace-period");
85  clear_data_on_termination_ = config->get_bool(prefix + "clear-data-on-termination");
86  port_ = config->get_uint(prefix + "port");
87  bind_ip_ = config->get_string_or_default(std::string(prefix + "bind_ip").c_str(), "0.0.0.0");
88  data_path_ = config->get_string(prefix + "data-path");
89  log_path_ = config->get_string(prefix + "log/path");
90  log_append_ = config->get_bool(prefix + "log/append");
91  try {
92  replica_set_ = config->get_string(prefix + "replica-set");
93  ;
94  } catch (Exception &e) {
95  } // ignored, no replica set
96  if (!replica_set_.empty()) {
97  oplog_size_ = 0;
98  try {
99  oplog_size_ = config->get_uint(prefix + "oplog-size");
100  } catch (Exception &e) {
101  } // ignored, use default
102  }
103  }
104 
105  argv_ = {
106  "mongod", "--bind_ip", bind_ip_, "--port", std::to_string(port_), "--dbpath", data_path_};
107 
108  if (!log_path_.empty()) {
109  if (log_append_) {
110  argv_.push_back("--logappend");
111  }
112  argv_.push_back("--logpath");
113  argv_.push_back(log_path_);
114  }
115 
116  if (!replica_set_.empty()) {
117  argv_.push_back("--replSet");
118  argv_.push_back(replica_set_);
119  if (oplog_size_ > 0) {
120  argv_.push_back("--oplogSize");
121  argv_.push_back(std::to_string(oplog_size_));
122  }
123  }
124 
125  if (enabled_) {
126  std::string extra_args = config->get_string_or_default((prefix + "args").c_str(), "");
127  if (!extra_args.empty()) {
128  wordexp_t p;
129  int wrv = wordexp(extra_args.c_str(), &p, WRDE_NOCMD | WRDE_UNDEF);
130  switch (wrv) {
131  case 0: break; // all good
132  case WRDE_BADCHAR: throw Exception("%s: invalid character in args", name());
133  case WRDE_BADVAL: throw Exception("%s: undefined variable referenced in args", name());
134  case WRDE_CMDSUB:
135  throw Exception("%s: running sub-commands has been disabled for args", name());
136  case WRDE_NOSPACE: throw OutOfMemoryException("Cannot parse args");
137  case WRDE_SYNTAX: throw Exception("%s: shell syntax error in args", name());
138  default: throw Exception("Unexpected wordexp error %d when parsing args", wrv);
139  }
140 
141  // These arguments may not be passed, they are either configured through
142  // config values and could interfere, or they mess with our rs handling.
143  std::vector<std::string> invalid_args = {"--port",
144  "--dbpath",
145  "--fork",
146  "--logappend",
147  "--logpath",
148  "--replSet",
149  "--oplogSize",
150  "--master",
151  "--slave",
152  "--source",
153  "--only"};
154 
155  // pass and verify arguments to be added to command line
156  for (size_t i = 0; i < p.we_wordc; ++i) {
157  for (size_t j = 0; j < invalid_args.size(); ++j) {
158  if (invalid_args[j] == p.we_wordv[i]) {
159  wordfree(&p);
160  throw Exception("%s: %s may not be passed in args", name(), invalid_args[j].c_str());
161  }
162  }
163  argv_.push_back(p.we_wordv[i]);
164  }
165  wordfree(&p);
166  }
167  }
168 
169  command_line_ = std::accumulate(std::next(argv_.begin()),
170  argv_.end(),
171  argv_.front(),
172  [](std::string &s, const std::string &a) { return s + " " + a; });
173 }
174 
175 void
177 {
178  if (enabled_) {
179  logger->log_debug(name(), "enabled: true");
180  logger->log_debug(name(), "TCP port: %u", port_);
181  logger->log_debug(name(), "Termination grace period: %u", termination_grace_period_);
182  logger->log_debug(name(),
183  "clear data on termination: %s",
184  clear_data_on_termination_ ? "yes" : "no");
185  logger->log_debug(name(), "data path: %s", data_path_.c_str());
186  logger->log_debug(name(), "log path: %s", log_path_.c_str());
187  logger->log_debug(name(), "log append: %s", log_append_ ? "yes" : "no");
188  logger->log_debug(name(),
189  "replica set: %s",
190  replica_set_.empty() ? "DISABLED" : replica_set_.c_str());
191  if (!replica_set_.empty()) {
192  logger->log_debug(name(), "Op Log Size: %u MB", oplog_size_);
193  }
194 
195  start_mongod();
196  } else {
197  throw Exception("Instance '%s' cannot be started while disabled", name());
198  }
199 
200  timewait_ = new TimeWait(clock, (int)(loop_interval_ * 1000000.));
201 }
202 
203 void
205 {
206  timewait_->mark_start();
207  if (!running_ || !check_alive()) {
208  logger->log_error(name(), "MongoDB dead, restarting");
209  // on a crash, clean to make sure
210  try {
211  kill_mongod(true);
212  start_mongod();
213  } catch (Exception &e) {
214  logger->log_error(name(), "Failed to start MongoDB: %s", e.what_no_backtrace());
215  }
216  }
217  timewait_->wait_systime();
218 }
219 
220 void
222 {
223  kill_mongod(clear_data_on_termination_);
224  delete timewait_;
225 }
226 
227 /** Get command line used to execute program.
228  * @return command line to run mongod
229  */
230 std::string
232 {
233  return command_line_;
234 }
235 
236 /** Get termination grace period.
237  * @return termination grace period
238  */
239 unsigned int
241 {
242  return termination_grace_period_;
243 }
244 
245 bool
246 MongoDBInstanceConfig::check_alive()
247 {
248  try {
249  mongocxx::client client{mongocxx::uri("mongodb://localhost:" + std::to_string(port_))};
250 
251  using namespace bsoncxx::builder;
252  auto cmd{basic::document{}};
253  cmd.append(basic::kvp("isMaster", 1));
254 
255  auto reply = client["admin"].run_command(cmd.view());
256  bool ok = check_mongodb_ok(reply.view());
257  if (!ok) {
258  logger->log_warn(name(), "Failed to connect: %s", bsoncxx::to_json(reply.view()).c_str());
259  }
260  return ok;
261  } catch (mongocxx::exception &e) {
262  logger->log_warn(name(), "Fail: %s", e.what());
263  return false;
264  }
265 }
266 
267 /** Start mongod. */
268 void
270 {
271  if (running_)
272  return;
273 
274  if (check_alive()) {
275  logger->log_warn(name(), "MongoDB already running, not starting");
276  running_ = true;
277  return;
278  }
279 
280  try {
281  boost::filesystem::create_directories(data_path_);
282  } catch (boost::filesystem::filesystem_error &e) {
283  throw Exception("Failed to create data path '%s' for mongod(%s): %s",
284  data_path_.c_str(),
285  config_name_.c_str(),
286  e.what());
287  }
288 
289  if (!log_path_.empty()) {
290  boost::filesystem::path p(log_path_);
291  try {
292  boost::filesystem::create_directories(p.parent_path());
293  } catch (boost::filesystem::filesystem_error &e) {
294  throw Exception("Failed to create log path '%s' for mongod(%s): %s",
295  p.parent_path().string().c_str(),
296  config_name_.c_str(),
297  e.what());
298  }
299  }
300 
301  std::string progname = "mongod(" + config_name_ + ")";
302  proc_ =
303  std::make_shared<SubProcess>(progname, "mongod", argv_, std::vector<std::string>{}, logger);
304 
305  for (unsigned i = 0; i < startup_grace_period_ * 4; ++i) {
306  if (check_alive()) {
307  running_ = true;
308  return;
309  }
310  std::this_thread::sleep_for(250ms);
311  }
312  if (!running_) {
313  proc_.reset();
314  throw Exception("%s: instance did not start in time", name());
315  }
316 }
317 
318 /** Stop mongod.
319  * This send a SIGINT and then wait for the configured grace period
320  * before sending the TERM signal.
321  * @param clear_data true to clear data, false otherwise
322  */
323 void
325 {
326  if (proc_) {
327  proc_->kill(SIGINT);
328  for (unsigned i = 0; i < termination_grace_period_; ++i) {
329  if (!proc_->alive())
330  break;
331  std::this_thread::sleep_for(1s);
332  }
333  // This will send the term signal
334  proc_.reset();
335  running_ = false;
336  if (clear_data) {
337  try {
338  boost::filesystem::remove_all(data_path_);
339  } catch (boost::filesystem::filesystem_error &e) {
340  throw Exception("Failed to create data path '%s' for mongod(%s): %s",
341  data_path_.c_str(),
342  config_name_.c_str(),
343  e.what());
344  }
345  }
346  }
347 }
MongoDBInstanceConfig::kill_mongod
void kill_mongod(bool clear_data)
Stop mongod.
Definition: mongodb_instance_config.cpp:324
fawkes::Configuration::get_bool
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
fawkes::Thread::name
const char * name() const
Get name of thread.
Definition: thread.h:100
fawkes::ClockAspect::clock
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
MongoDBInstanceConfig::init
virtual void init()
Initialize the thread.
Definition: mongodb_instance_config.cpp:176
fawkes::Configuration
Interface for configuration handling.
Definition: config.h:65
fawkes::LoggingAspect::logger
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
fawkes::Logger::log_error
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
fawkes
Fawkes library namespace.
MongoDBInstanceConfig::start_mongod
void start_mongod()
Start mongod.
Definition: mongodb_instance_config.cpp:269
fawkes::Logger::log_warn
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
MongoDBInstanceConfig::loop
virtual void loop()
Code to execute in the thread.
Definition: mongodb_instance_config.cpp:204
MongoDBInstanceConfig::finalize
virtual void finalize()
Finalize the thread.
Definition: mongodb_instance_config.cpp:221
fawkes::TimeWait::mark_start
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
fawkes::Exception::what_no_backtrace
virtual const char * what_no_backtrace() const
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:663
fawkes::Configuration::get_float
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
fawkes::Thread
Thread class encapsulation of pthreads.
Definition: thread.h:46
fawkes::TimeWait
Time wait utility.
Definition: wait.h:33
fawkes::Configuration::get_uint
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
fawkes::TimeWait::wait_systime
void wait_systime()
Wait until minimum loop time has been reached in real time.
Definition: wait.cpp:96
fawkes::Configuration::get_string
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
fawkes::Configuration::get_string_or_default
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist...
Definition: config.cpp:736
fawkes::OutOfMemoryException
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
fawkes::Logger::log_debug
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
MongoDBInstanceConfig::termination_grace_period
unsigned int termination_grace_period() const
Get termination grace period.
Definition: mongodb_instance_config.cpp:240
fawkes::Thread::set_name
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
MongoDBInstanceConfig::MongoDBInstanceConfig
MongoDBInstanceConfig(fawkes::Configuration *config, std::string cfgname, std::string prefix)
Constructor.
Definition: mongodb_instance_config.cpp:57
MongoDBInstanceConfig::command_line
std::string command_line() const
Get command line used to execute program.
Definition: mongodb_instance_config.cpp:231
fawkes::Exception
Base class for exceptions in Fawkes.
Definition: exception.h:36