23 #include "mongodb_log_bb_thread.h"
25 #include <core/threading/mutex_locker.h>
26 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
30 #include <mongocxx/client.hpp>
31 #include <mongocxx/exception/operation_exception.hpp>
33 using namespace mongocxx;
// Fragment of a larger routine: its signature and several intermediate lines
// are not visible in this excerpt. The visible part prepares a list of include
// patterns, walks the interfaces obtained for each pattern, tests each
// interface id against the exclude patterns, and stores a newly allocated
// InterfaceListener in listeners_ under the interface uid.
66 std::vector<std::string> includes;
// With no include pattern present, fall back to a single catch-all glob.
76 if (includes.empty()) {
77 includes.push_back(
"*");
80 std::vector<std::string>::iterator i;
81 std::vector<std::string>::iterator e;
82 for (i = includes.begin(); i != includes.end(); ++i) {
85 std::list<Interface *> current_interfaces =
// NOTE: this declaration shadows the outer iterator i over includes. From
// here on, inside the inner scope, i walks current_interfaces and *i is an
// Interface pointer.
88 std::list<Interface *>::iterator i;
89 for (i = current_interfaces.begin(); i != current_interfaces.end(); ++i) {
// Compare the interface id with every exclude glob.
91 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
// fnmatch returns 0 on a match and FNM_NOMATCH when the pattern does not
// match, so this condition is also true when fnmatch reports an error.
// NOTE(review): the body of this branch is not visible here.
92 if (fnmatch(e->c_str(), (*i)->id(), 0) != FNM_NOMATCH) {
// The listener is allocated with new and kept as a raw pointer in listeners_,
// keyed by the interface uid. NOTE(review): confirm where it is deleted.
105 listeners_[(*i)->uid()] =
new InterfaceListener(
// Fragment: iterates over every entry of listeners_ and fetches the client
// pointer returned by the mongodb_client() accessor of each listener.
// NOTE(review): the lines that use mc are not visible in this excerpt;
// presumably the listener and its client are released here -- confirm.
118 std::map<std::string, InterfaceListener *>::iterator i;
119 for (i = listeners_.begin(); i != listeners_.end(); ++i) {
120 client *mc = i->second->mongodb_client();
// Fragment of a routine that receives an interface type and id (signature
// not visible in this excerpt). It checks the id against the exclude globs,
// opens the interface for reading, and attaches a new InterfaceListener
// unless one is already registered under the same uid.
138 std::vector<std::string>::iterator e;
139 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
// True when the id matches the exclude glob (fnmatch returned 0) and also
// when fnmatch returned an error code other than FNM_NOMATCH.
140 if (fnmatch(e->c_str(),
id, 0) != FNM_NOMATCH) {
141 logger->
log_debug(name(),
"Ignoring excluded interface '%s::%s'", type,
id);
// Open a reading instance of the interface on the blackboard.
147 Interface *
interface = blackboard->open_for_reading(type, id);
// Only create a listener when none is stored for this uid yet.
148 if (listeners_.find(interface->uid()) == listeners_.end()) {
149 logger->
log_debug(name(),
"Opening new %s", interface->uid());
// A fresh client is requested from the connection manager for this listener.
150 client * mc = mongodb_connmgr->create_client();
152 listeners_[interface->uid()] =
new InterfaceListener(
153 blackboard, interface, mc, database_, collections_, agent_name, logger, now_);
// NOTE(review): the branch keyword between these statements is not visible;
// presumably this is the else path for an already tracked uid, which warns
// and closes the interface instance opened above.
155 logger->
log_warn(name(),
"Interface %s already opened", interface->uid());
156 blackboard->
close(interface);
// Warning emitted when opening fails. NOTE(review): the surrounding handler
// and the line that logs the exception itself are not visible here.
159 logger->
log_warn(name(),
"Failed to open interface %s::%s, exception follows", type,
id);
// Constructor. Derives the collection name from the interface type and id,
// refuses a name that is already present in collections_, and registers this
// object as a data listener for the interface on the blackboard.
// Several parameters and initializers are not visible in this excerpt.
// Throws Exception when the derived collection name is already in use.
174 MongoLogBlackboardThread::InterfaceListener::InterfaceListener(
BlackBoard * blackboard,
177 std::string & database,
179 const std::string & agent_name,
185 agent_name_(agent_name)
188 interface_ = interface;
// Work on a copy of the interface id so it can be sanitized.
194 std::string
id = interface->
id();
// Replace every space and every dash in the id by an underscore, one
// character at a time. NOTE(review): the declaration and advancement of pos
// are on lines not visible here.
196 while ((pos =
id.find_first_of(
" -", pos)) != std::string::npos) {
197 id.replace(pos, 1,
"_");
// Collection name is the interface type, a dot, then the sanitized id.
200 collection_ = std::string(interface->
type()) +
"." + id;
// Two interfaces must not log into the same collection.
201 if (collections_.find(collection_) != collections_.end()) {
202 throw Exception(
"Collection named %s already used, cannot log %s",
// Subscribe to data events of this interface and register with the blackboard.
207 bbil_add_data_interface(interface);
208 blackboard_->register_listener(
this, BlackBoard::BBIL_FLAG_DATA);
// Destructor. Removes this listener from the blackboard so that no further
// callbacks are delivered to it.
// NOTE(review): any further cleanup lines are not visible in this excerpt.
212 MongoLogBlackboardThread::InterfaceListener::~InterfaceListener()
214 blackboard_->unregister_listener(
this);
// Blackboard data-change callback. Builds one BSON document with a timestamp,
// one entry per interface field and the agent name, then inserts it into
// collection_ of database_ through the MongoDB client.
// The visible catch handlers turn operation_exception and std::exception
// into log warnings.
// NOTE(review): throw() is a deprecated dynamic exception specification that
// was removed in C++20; noexcept is the replacement, but it has to be changed
// together with the declaration in the header.
// NOTE(review): the field iteration, the per-type dispatch and the array or
// scalar branch conditions are on lines not visible in this excerpt, and the
// excerpt ends in the middle of the last log statement.
218 MongoLogBlackboardThread::InterfaceListener::bb_interface_data_changed(
Interface *interface)
throw()
225 using namespace bsoncxx::builder;
226 basic::document document;
// Timestamp comes from now_->in_msec() and is stored as a 64 bit integer.
227 document.append(basic::kvp(
"timestamp",
static_cast<int64_t
>(now_->in_msec())));
// A field with more than one element is flagged as an array. In the pairs
// below, the first append writes all elements through a sub_array callback
// and the second append writes a single scalar value.
231 bool is_array = (length > 1);
// bool field
238 document.append(basic::kvp(key, [bools, length](basic::sub_array subarray) {
239 for (
size_t l = 0; l < length; ++l) {
240 subarray.append(bools[l]);
244 document.append(basic::kvp(key, i.
get_bool()));
// int8 field
251 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
252 for (
size_t l = 0; l < length; ++l) {
253 subarray.append(ints[l]);
257 document.append(basic::kvp(key, i.
get_int8()));
// uint8 field
264 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
265 for (
size_t l = 0; l < length; ++l) {
266 subarray.append(ints[l]);
270 document.append(basic::kvp(key, i.
get_uint8()));
// int16 field
277 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
278 for (
size_t l = 0; l < length; ++l) {
279 subarray.append(ints[l]);
283 document.append(basic::kvp(key, i.
get_int16()));
// uint16 field
290 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
291 for (
size_t l = 0; l < length; ++l) {
292 subarray.append(ints[l]);
296 document.append(basic::kvp(key, i.
get_uint16()));
// int32 field
303 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
304 for (
size_t l = 0; l < length; ++l) {
305 subarray.append(ints[l]);
309 document.append(basic::kvp(key, i.
get_int32()));
// uint32 field: BSON has no unsigned integer type, so each value is widened
// to int64_t, which can hold the full uint32 range.
316 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
317 for (
size_t l = 0; l < length; ++l) {
318 subarray.append(
static_cast<int64_t
>(ints[l]));
322 document.append(basic::kvp(key,
static_cast<int64_t
>(i.
get_uint32())));
// int64 field
329 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
330 for (
size_t l = 0; l < length; ++l) {
331 subarray.append(ints[l]);
335 document.append(basic::kvp(key, i.
get_int64()));
// uint64 field: cast to int64_t. NOTE(review): values above the int64_t
// maximum cannot be represented and will not round-trip through this cast.
342 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
343 for (
size_t l = 0; l < length; ++l) {
344 subarray.append(
static_cast<int64_t
>(ints[l]));
348 document.append(basic::kvp(key,
static_cast<int64_t
>(i.
get_uint64())));
// float field
355 document.append(basic::kvp(key, [floats, length](basic::sub_array subarray) {
356 for (
size_t l = 0; l < length; ++l) {
357 subarray.append(floats[l]);
361 document.append(basic::kvp(key, i.
get_float()));
// double field
368 document.append(basic::kvp(key, [doubles, length](basic::sub_array subarray) {
369 for (
size_t l = 0; l < length; ++l) {
370 subarray.append(doubles[l]);
374 document.append(basic::kvp(key, i.
get_double()));
// byte field
383 document.append(basic::kvp(key, [bytes, length](basic::sub_array subarray) {
384 for (
size_t l = 0; l < length; ++l) {
385 subarray.append(bytes[l]);
389 document.append(basic::kvp(key, i.
get_byte()));
// enum field
396 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
397 for (
size_t l = 0; l < length; ++l) {
398 subarray.append(ints[l]);
402 document.append(basic::kvp(key, i.
get_enum()));
// Tag the record with the agent name, then write it to the collection.
408 document.append(basic::kvp(
"agent-name", agent_name_));
409 mongodb_->database(database_)[collection_].insert_one(document.view());
410 }
// Driver-level failures of the insert are reported as a warning.
catch (operation_exception &e) {
412 bbil_name(),
"Failed to log to %s.%s: %s", database_.c_str(), collection_.c_str(), e.what());
413 }
// Any other standard exception is logged with a trailing marker in the
// message so the two handlers can be told apart in the log.
catch (std::exception &e) {
414 logger_->log_warn(bbil_name(),
415 "Failed to log to %s.%s: %s (*)",