45 #include <D4ParserSax2.h>
46 #include <XMLWriter.h>
47 #include <BaseTypeFactory.h>
48 #include <D4BaseTypeFactory.h>
50 #include "PicoSHA2/picosha2.h"
53 #include "TheBESKeys.h"
56 #include "BESContextManager.h"
58 #include "BESRequestHandler.h"
59 #include "BESRequestHandlerList.h"
60 #include "BESNotFoundError.h"
62 #include "BESInternalError.h"
63 #include "BESInternalFatalError.h"
65 #include "GlobalMetadataStore.h"
67 #define DEBUG_KEY "metadata_store"
68 #define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0
71 #define AT_EXIT(x) atexit((x))
85 #undef SYMETRIC_ADD_RESPONSES
87 #define prolog std::string("GlobalMetadataStore::").append(__func__).append("() - ")
93 static const unsigned int default_cache_size = 20;
94 static const string default_cache_prefix =
"mds";
95 static const string default_cache_dir =
"";
96 static const string default_ledger_name =
"mds_ledger.txt";
98 static const string PATH_KEY =
"DAP.GlobalMetadataStore.path";
99 static const string PREFIX_KEY =
"DAP.GlobalMetadataStore.prefix";
100 static const string SIZE_KEY =
"DAP.GlobalMetadataStore.size";
101 static const string LEDGER_KEY =
"DAP.GlobalMetadataStore.ledger";
102 static const string LOCAL_TIME_KEY =
"BES.LogTimeLocal";
105 bool GlobalMetadataStore::d_enabled =
true;
121 void GlobalMetadataStore::transfer_bytes(
int fd, ostream &os)
123 static const int BUFFER_SIZE = 16*1024;
125 #if _POSIX_C_SOURCE >= 200112L
127 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
129 ERROR_LOG(prolog <<
"Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
132 char buf[BUFFER_SIZE + 1];
134 while(
int bytes_read = read(fd, buf, BUFFER_SIZE))
137 throw BESInternalError(
"Could not read dds from the metadata store.", __FILE__, __LINE__);
141 os.write(buf, bytes_read);
157 void GlobalMetadataStore::insert_xml_base(
int fd, ostream &os,
const string &xml_base)
159 static const int BUFFER_SIZE = 1024;
161 #if _POSIX_C_SOURCE >= 200112L
163 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
165 ERROR_LOG(prolog <<
"Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
168 char buf[BUFFER_SIZE + 1];
169 size_t bytes_read = read(fd, buf, BUFFER_SIZE);
171 if(bytes_read == (
size_t)-1)
172 throw BESInternalError(
"Could not read dds from the metadata store.", __FILE__, __LINE__);
189 while (buf[i++] !=
'>')
196 char xml_base_literal[] =
"xml:base";
197 while (i < bytes_read) {
199 os.write(buf + s, i - s);
200 os <<
" xml:base=\"" << xml_base <<
"\"";
203 else if (j ==
sizeof(xml_base_literal) - 1) {
204 os.write(buf + s, i - s);
205 while (buf[i++] !=
'=')
207 while (buf[i++] !=
'"')
209 while (buf[i++] !=
'"')
211 os <<
"=\"" << xml_base <<
"\"";
214 else if (buf[i] == xml_base_literal[j]) {
225 os.write(buf + i, bytes_read - i);
228 transfer_bytes(fd, os);
231 unsigned long GlobalMetadataStore::get_cache_size_from_config()
235 unsigned long size_in_megabytes = default_cache_size;
239 "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY <<
"=" << size << endl);
240 istringstream iss(size);
241 iss >> size_in_megabytes;
244 return size_in_megabytes;
247 string GlobalMetadataStore::get_cache_prefix_from_config()
250 string prefix = default_cache_prefix;
254 "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY <<
"=" << prefix << endl);
262 string GlobalMetadataStore::get_cache_dir_from_config()
266 string cacheDir = default_cache_dir;
270 "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<<
"=" << cacheDir << endl);
308 GlobalMetadataStore::get_instance(
const string &cache_dir,
const string &prefix,
unsigned long long size)
310 if (d_enabled && d_instance == 0) {
312 d_enabled = d_instance->cache_enabled();
317 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is DISABLED"<< endl);
320 AT_EXIT(delete_instance);
322 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is ENABLED"<< endl);
326 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
338 GlobalMetadataStore::get_instance()
340 if (d_enabled && d_instance == 0) {
341 d_instance =
new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
342 get_cache_size_from_config());
343 d_enabled = d_instance->cache_enabled();
347 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is DISABLED"<< endl);
350 AT_EXIT(delete_instance);
352 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is ENABLED"<< endl);
356 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::get_instance() - d_instance: " << (
void *) d_instance << endl);
366 GlobalMetadataStore::initialize()
372 BESDEBUG(DEBUG_KEY,
"Located BES key " << LEDGER_KEY <<
"=" << d_ledger_name << endl);
375 d_ledger_name = default_ledger_name;
378 ofstream of(d_ledger_name.c_str(), ios::app);
381 string local_time =
"no";
383 d_use_local_time = (local_time ==
"YES" || local_time ==
"Yes" || local_time ==
"yes");
401 GlobalMetadataStore::GlobalMetadataStore()
402 :
BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
418 static void dump_time(ostream &os,
bool use_local_time)
422 char buf[
sizeof "YYYY-MM-DDTHH:MM:SSzone"];
432 status = strftime(buf,
sizeof buf,
"%FT%T%Z", gmtime(&now));
434 status = strftime(buf,
sizeof buf,
"%FT%T%Z", localtime(&now));
437 ERROR_LOG(prolog <<
"Error getting time for Metadata Store ledger.");
451 if (get_exclusive_lock(d_ledger_name, fd)) {
452 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Ledger " << d_ledger_name <<
" write locked." << endl);
455 dump_time(of, d_use_local_time);
456 of <<
" " << d_ledger_entry << endl;
457 VERBOSE(
"MDS Ledger name: '" << d_ledger_name <<
"', entry: '" << d_ledger_entry +
"'.");
466 ERROR_LOG(prolog <<
"Warning: Metadata store could not write to its ledger file.");
471 throw BESInternalError(
"Could not write lock '" + d_ledger_name, __FILE__, __LINE__);
485 throw BESInternalError(
"Empty name passed to the Metadata Store.", __FILE__, __LINE__);
487 return picosha2::hash256_hex_string(name[0] ==
'/' ? name :
"/" + name);
509 D4BaseTypeFactory factory;
510 DMR dmr(&factory, *d_dds);
517 d_dmr->print_dap4(xml);
530 d_dmr->getDDS()->print(os);
538 d_dds->print_das(os);
540 d_dmr->getDDS()->print_das(os);
561 const string &response_name)
563 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" BEGIN " << key << endl);
569 BESDEBUG(DEBUG_KEY,__FUNCTION__ <<
" Storing " << item_name << endl);
572 ofstream response(item_name.c_str(), ios::out|ios::app);
573 if (!response.is_open())
574 throw BESInternalError(
"Could not open '" + key +
"' to write the response.", __FILE__, __LINE__);
583 if (!
is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
601 VERBOSE(
"Metadata store: Wrote " << response_name <<
" response for '" << name <<
"'." << endl);
602 d_ledger_entry.append(
" ").append(key);
608 BESDEBUG(DEBUG_KEY,__FUNCTION__ <<
" Found " << item_name <<
" in the store already." << endl);
611 ERROR_LOG(prolog <<
"Metadata store: unable to store the " << response_name <<
" response for '" << name <<
"'." << endl);
616 throw BESInternalError(
"Could neither create or open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
650 d_ledger_entry = string(
"add DDS ").append(name);
663 #if SYMETRIC_ADD_RESPONSES
670 #if SYMETRIC_ADD_RESPONSES
671 return (stored_dds && stored_das && stored_dmr);
673 return (stored_dds && stored_das);
692 d_ledger_entry = string(
"add DMR ").append(name);
699 #if SYMETRIC_ADD_RESPONSES
712 #if SYMETRIC_ADD_RESPONSES
713 return (stored_dds && stored_das && stored_dmr);
735 BESDEBUG(DEBUG_KEY, __func__ <<
"() MDS hashing name '" << name <<
"', '" << suffix <<
"'"<< endl);
739 "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
744 BESDEBUG(DEBUG_KEY, __func__ <<
"() MDS lock for " << item_name <<
": " << lock() << endl);
747 INFO_LOG(prolog <<
"MDS Cache hit for '" << name <<
"' and response " << object_name << endl);
749 INFO_LOG(prolog <<
"MDS Cache miss for '" << name <<
"' and response " << object_name << endl);
977 time_t file_time = besRH->
get_lmt(realName);
983 if (file_time > cache_time){
1002 struct stat statbuf;
1004 if (stat(item_name.c_str(), &statbuf) == -1){
1008 return statbuf.st_mtime;
1027 VERBOSE(
"Metadata store: Cache hit: read " << object_name <<
" response for '" << name <<
"'." << endl);
1028 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
1039 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
1053 const string &object_name)
1058 VERBOSE(
"Metadata store: Cache hit: read " << object_name <<
" response for '" << name <<
"'." << endl);
1059 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
1072 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
1111 string xml_base = BESContextManager::TheManager()->
get_context(
"xml:base", found);
1113 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1116 throw BESInternalError(
"Could not read the value of xml:base.", __FILE__, __LINE__);
1134 string xml_base = BESContextManager::TheManager()->
get_context(
"xml:base", found);
1136 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1139 throw BESInternalError(
"Could not read the value of xml:base.", __FILE__, __LINE__);
1157 string hash =
get_hash(name + suffix);
1159 VERBOSE(
"Metadata store: Removed " << object_name <<
" response for '" << hash <<
"'." << endl);
1160 d_ledger_entry.append(
" ").append(hash);
1164 ERROR_LOG(prolog <<
"Metadata store: unable to remove the " << object_name <<
" response for '" << name <<
"' (" << strerror(errno) <<
")."<< endl);
1180 d_ledger_entry = string(
"remove ").append(name);
1192 #if SYMETRIC_ADD_RESPONSES
1193 return (removed_dds && removed_das && removed_dmr);
1195 return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1217 D4BaseTypeFactory d4_btf;
1218 auto_ptr<DMR> dmr(
new DMR(&d4_btf,
"mds"));
1220 D4ParserSax2 parser;
1221 parser.intern(oss.str(), dmr.get());
1223 dmr->set_factory(0);
1225 return dmr.release();
1254 fstream dds_fs(dds_tmp.
get_name().c_str(), std::fstream::out);
1264 BaseTypeFactory btf;
1265 auto_ptr<DDS> dds(
new DDS(&btf));
1269 fstream das_fs(das_tmp.
get_name().c_str(), std::fstream::out);
1279 auto_ptr<DAS> das(
new DAS());
1282 dds->transfer_attributes(das.get());
1283 dds->set_factory(0);
1285 return dds.release();
1290 GlobalMetadataStore::parse_das_from_mds(libdap::DAS* das,
const std::string &name) {
1291 string suffix =
"das_r";
1295 VERBOSE(
"Metadata store: Cache hit: read " <<
" response for '" << name <<
"'." << endl);
1296 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
1299 das->parse(item_name);
1308 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
A container is something that holds data. E.G., a netcdf file or a database entry.
std::string get_relative_name() const
Get the relative name of the object in this container.
std::string get_container_type() const
retrieve the type of data this container holds, such as cedar or netcdf.
std::string get_real_name() const
retrieve the real name for this container, such as a file name.
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
Implementation of a caching mechanism for compressed data.
virtual void unlock_and_close(const std::string &target)
const std::string get_cache_directory()
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big.
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
exception thrown if internal error encountered
exception thrown if an internal error is found and is fatal to the BES
error thrown if the resource requested cannot be found
virtual BESRequestHandler * find_handler(const std::string &handler_name)
find and return the specified request handler
Represents a specific data type request handler.
virtual time_t get_lmt(const std::string &name)
Get the Last modified time for.
static std::string lowercase(const std::string &s)
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
static TheBESKeys * TheKeys()
Get a new temporary file.
std::string get_name() const