34 #include <TheBESKeys.h>
35 #include <BESInternalError.h>
36 #include <BESSyntaxUserError.h>
37 #include <BESForbiddenError.h>
38 #include <BESContextManager.h>
41 #include "xml2json/include/xml2json.hpp"
44 #include "CurlUtils.h"
45 #include "CurlHandlePool.h"
46 #include "EffectiveUrlCache.h"
47 #include "DmrppRequestHandler.h"
48 #include "DmrppNames.h"
53 #define prolog std::string("Chunk::").append(__func__).append("() - ")
// libcurl header callback used when reading a Chunk. Each call delivers one
// response header line; when that line contains "Content-Type", the text after
// its last space character is stored with Chunk::set_response_content_type().
//
// @param buffer header bytes for one line (libcurl does not NUL-terminate them)
// @param (unnamed) size - unused here; libcurl documents it as always 1
// @param nitems number of bytes in buffer
// @param data user pointer, cast to Chunk* (presumably installed via
//        CURLOPT_HEADERDATA - TODO confirm at the handle setup site)
//
// NOTE(review): this excerpt is incomplete - the return statement and closing
// braces are not shown. libcurl requires the callback to return nitems * size
// for the transfer to continue.
70 size_t chunk_header_callback(
char *buffer,
size_t ,
size_t nitems,
void *data) {
// Drop the last two bytes (assumed to be the CRLF line terminator).
// NOTE(review): nitems is unsigned, so when nitems < 2 the end pointer wraps
// around and constructing the string is undefined behavior; guard the length.
76 string header(buffer, buffer + nitems - 2);
79 string::size_type pos;
// NOTE(review): the match is case-sensitive and succeeds anywhere in the line,
// whereas HTTP header names are case-insensitive (HTTP/2 servers send
// "content-type"). 'pos' is assigned but not otherwise used in the visible code.
80 if ((pos = header.find(
"Content-Type")) != string::npos) {
82 auto c_ptr =
reinterpret_cast<Chunk *
>(data);
// Keep everything after the last ' ' ("Content-Type: application/xml" yields
// "application/xml"). NOTE(review): a value that itself contains a space, e.g.
// "text/xml; charset=UTF-8", yields only "charset=UTF-8".
83 c_ptr->set_response_content_type(header.substr(header.find_last_of(
' ') + 1));
// libcurl write callback for Chunk data transfers. 'data' is cast to Chunk*
// (presumably installed via CURLOPT_WRITEDATA - TODO confirm).
//
// When the chunk's recorded response content type contains "application/xml",
// the payload is treated as an object-store error document:
//   - it is converted to JSON with xml2json();
//   - it is logged with BESDEBUG and VERBOSE;
//   - it is parsed with d.Parse();
//   - an error message is built from the parsed 'message', with a separate
//     branch when 'code' is "AccessDenied".
// Otherwise the nbytes received are appended to the chunk's read buffer at
// offset get_bytes_read(), after checking that they fit within get_rbuf_size().
//
// @param buffer received bytes
// @param size, nmemb the callback receives size * nmemb bytes
// @param data user pointer, cast to Chunk*
//
// NOTE(review): this excerpt is incomplete. The following are not shown:
// the try block; the declarations of d/code/message/msg; the throw
// statements; the return value; and the closing braces.
// NOTE(review): libcurl may deliver one response in several write callbacks.
// Each call is examined on its own here, so an XML error body split across
// calls would be converted and parsed piecemeal - confirm.
102 size_t chunk_write_data(
void *buffer,
size_t size,
size_t nmemb,
void *data) {
103 size_t nbytes = size * nmemb;
104 auto chunk =
reinterpret_cast<Chunk *
>(data);
106 BESDEBUG(MODULE, prolog <<
"BEGIN chunk->get_response_content_type():" << chunk->get_response_content_type()
107 <<
" chunk->get_data_url(): " << chunk->get_data_url() << endl);
// Error-document path: the server answered with XML instead of chunk bytes.
110 if (chunk->get_response_content_type().find(
"application/xml") != string::npos) {
// NOTE(review): libcurl does not NUL-terminate the write buffer. Constructing
// a std::string from the bare pointer may therefore read past nbytes.
// string(static_cast<const char *>(buffer), nbytes) bounds the copy.
113 string xml_message =
reinterpret_cast<const char *
>(buffer);
// Trim trailing whitespace. NOTE(review): the character set also contains
// the digit '0', so trailing '0' characters are stripped too - confirm intent.
114 xml_message.erase(xml_message.find_last_not_of(
"\t\n\v\f\r 0") + 1);
119 string json_message = xml2json(xml_message.c_str());
120 stringstream aws_msg;
121 aws_msg << prolog <<
"AWS S3 Access Error:" << json_message;
122 BESDEBUG(MODULE, aws_msg.str() << endl);
123 VERBOSE(aws_msg.str() << endl);
126 d.Parse(json_message.c_str());
134 msg << prolog <<
"Error accessing object store data. (Tried: " << chunk->get_data_url() <<
")" <<
135 " Message " << message.GetString();
136 BESDEBUG(MODULE, msg.str() << endl);
// Access-denied errors get their own branch (body not shown; presumably it
// raises BESForbiddenError, which this file includes - TODO confirm).
137 if (
string(code.GetString()) ==
"AccessDenied") {
// Failures while converting/parsing the XML are caught here; whether this
// path rethrows is not shown in this excerpt.
149 catch (std::exception &e) {
151 msg << prolog <<
"Error accessing object store data. (Tried: " << chunk->get_data_url() <<
")" <<
152 " Message " << e.what();
153 BESDEBUG(MODULE, msg.str() << endl);
// Normal data path: append the received bytes to the chunk's read buffer.
162 unsigned long long bytes_read = chunk->get_bytes_read();
// Refuse to write past the end of the preallocated read buffer.
165 if (bytes_read + nbytes > chunk->get_rbuf_size()) {
167 msg << prolog <<
"ERROR! The number of bytes_read: " << bytes_read <<
" plus the number of bytes to read: "
168 << nbytes <<
" is larger than the target buffer size: " << chunk->get_rbuf_size();
169 BESDEBUG(MODULE, msg.str() << endl);
// NOTE(review): every pooled curl handle is released on overflow; the error
// that presumably follows is not shown in this excerpt.
170 DmrppRequestHandler::curl_handle_pool->release_all_handles();
174 memcpy(chunk->get_rbuf() + bytes_read, buffer, nbytes);
175 chunk->set_bytes_read(bytes_read + nbytes);
177 BESDEBUG(MODULE, prolog <<
"END" << endl);
// Decompress a zlib (deflate) stream from src into the caller-supplied buffer dest.
//
// @param dest output buffer of dest_len bytes (asserted to be non-zero)
// @param dest_len size of dest in bytes
// @param src compressed input
// @param src_len number of bytes in src
// @throws BESError (BES_INTERNAL_ERROR) in any of these cases:
//         - inflateInit() fails;
//         - zlib's inflate() returns anything other than Z_OK or Z_STREAM_END;
//         - the output buffer is completely filled.
//
// NOTE(review): this excerpt is incomplete. The declarations of z_strm and
// status, the head of the do/while loop, and closing braces are not shown.
192 void inflate(
char *dest,
unsigned int dest_len,
char *src,
unsigned int src_len) {
196 assert(dest_len > 0);
203 memset(&z_strm, 0,
sizeof(z_strm));
204 z_strm.next_in = (Bytef *) src;
205 z_strm.avail_in = src_len;
206 z_strm.next_out = (Bytef *) dest;
207 z_strm.avail_out = dest_len;
210 if (Z_OK != inflateInit(&z_strm))
211 throw BESError(
"Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// Two-argument call: resolves to zlib's inflate(z_streamp, int), not this function.
217 status = inflate(&z_strm, Z_SYNC_FLUSH);
220 if (Z_STREAM_END == status)
break;
223 if (Z_OK != status) {
224 (void) inflateEnd(&z_strm);
225 throw BESError(
"Failed to inflate data chunk.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// NOTE(review): unlike the Z_OK != status path above, this throw does not call
// (void) inflateEnd(&z_strm) first, so zlib's internal state is leaked.
232 if (0 == z_strm.avail_out) {
233 throw BESError(
"Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__,
// NOTE(review): the lines below use HDF5-internal names (H5MM_realloc,
// HGOTO_ERROR, outbuf, nalloc) that are not declared anywhere visible here.
// They look like leftover code from an HDF5 deflate filter, presumably inside
// a disabled preprocessor region that this excerpt does not show - confirm and
// remove if dead.
241 if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
242 (void) inflateEnd(&z_strm);
243 HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0,
"memory allocation failed for inflate decompression")
248 z_strm.next_out = (
unsigned char*) outbuf + z_strm.total_out;
249 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
253 }
while (status == Z_OK);
// Release zlib's internal state after a successful decompression.
256 (void) inflateEnd(&z_strm);
// Undo a byte-shuffle. For each byte position i in [0, width), consecutive
// bytes of src are written into dest at a stride of width (DUFF_GUTS). This
// presumably starts at dest + i - the setup is not shown. The effect is that
// each element's bytes become adjacent again.
//
// @param dest output buffer (presumably at least src_size bytes)
// @param src shuffled input
// @param src_size number of bytes in src
// @param width element size in bytes
//
// If width <= 1 or there are fewer than two whole elements, src is copied to
// dest unchanged. Any trailing src_size % width bytes are copied verbatim.
//
// NOTE(review): width == 0 divides by zero on the first statement below.
// Parts of the loop body and the closing braces are not shown in this excerpt.
286 void unshuffle(
char *dest,
const char *src,
unsigned int src_size,
unsigned int width) {
287 unsigned int elems = src_size / width;
// Nothing to reorder: plain copy.
290 if (!(width > 1 && elems > 1)) {
291 memcpy(dest,
const_cast<char *
>(src), src_size);
295 char *_src =
const_cast<char *
>(src);
// One pass per byte position within an element.
299 for (
unsigned int i = 0; i < width; i++) {
// Loop unrolled by eight (Duff's device); duffs_index counts groups of eight.
311 size_t duffs_index = (elems + 7) / 8;
// Assertion for a case that should be unreachable (the surrounding control
// flow is not shown in this excerpt).
314 assert(0 &&
"This Should never be executed!");
// One unrolled step: copy the next shuffled byte, then advance dest by one element.
319 #define DUFF_GUTS *_dest = *_src++; _dest += width;
336 }
while (--duffs_index > 0);
// Bytes past the last whole element were not shuffled; copy them as-is.
344 size_t leftover = src_size % width;
// Step _dest back to where the leftover tail belongs (depends on loop state
// not shown here).
349 _dest -= (width - 1);
350 memcpy((
void *) _dest, (
void *) _src, leftover);
// Parse a chunk position string of the form "[x,y,...]" (unsigned integers
// separated by commas) into cpia_vect.
//
// @param pia the position string. If it is empty, the function returns
//        immediately and cpia_vect is left unchanged (not cleared).
// @param cpia_vect receives the parsed values; any previous contents are cleared.
// @throws BESInternalError in either of these cases:
//         - pia lacks '[' or ']', or is shorter than three characters;
//         - pia contains characters other than "[]1234567890,".
//
// NOTE(review): the checks only require '[' and ']' to appear somewhere.
// The parse below, however, assumes they are the first and last characters
// (substr(1, length - 2)). Input such as "1[2]" passes validation but is
// parsed incorrectly. The read loop and closing braces are not shown here.
356 void Chunk::parse_chunk_position_in_array_string(
const string &pia, vector<unsigned int> &cpia_vect){
357 if (pia.empty())
return;
359 if (!cpia_vect.empty()) cpia_vect.clear();
363 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
364 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
366 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
367 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
// Strip the surrounding brackets and read the comma-separated values.
370 istringstream iss(pia.substr(1, pia.length() - 2));
376 cpia_vect.push_back(i);
// Set this chunk's position-in-array from a "[x,y,...]" string.
// When pia is empty, the function returns without changing anything.
//
// NOTE(review): the visible lines do the same work twice:
//   - they parse pia inline (validation plus push_back into
//     d_chunk_position_in_array);
//   - they then call parse_chunk_position_in_array_string(pia,
//     d_chunk_position_in_array), which repeats the validation and
//     clears/refills the vector.
// Presumably one of the two paths sits in a disabled preprocessor region that
// this excerpt does not show - confirm, and delete the dead copy.
395 void Chunk::set_position_in_array(
const string &pia) {
397 if (pia.empty())
return;
399 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
403 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
404 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
406 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
407 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
410 istringstream iss(pia.substr(1, pia.length() - 2));
416 d_chunk_position_in_array.push_back(i);
// Shared parser: validates pia and refills d_chunk_position_in_array.
420 parse_chunk_position_in_array_string(pia,d_chunk_position_in_array);
// Set this chunk's position-in-array from an already-parsed vector.
// An empty pia is ignored, and the existing position is kept unchanged.
// NOTE(review): the clear() is redundant because the assignment below replaces
// the contents anyway. The closing brace is not shown in this excerpt.
431 void Chunk::set_position_in_array(
const std::vector<unsigned int> &pia) {
432 if (pia.empty())
return;
434 if (!d_chunk_position_in_array.empty()) d_chunk_position_in_array.clear();
436 d_chunk_position_in_array = pia;
// Return the byte-range argument for this chunk's transfer. It is built by
// curl::get_range_arg_string() from d_offset and d_size - presumably the
// "first-last" form expected by CURLOPT_RANGE; see CurlUtils to confirm.
// (The closing brace is not shown in this excerpt.)
446 string Chunk::get_curl_range_arg_string() {
447 return curl::get_range_arg_string(d_offset, d_size);
// Add an S3 tracking query to this chunk's URL, in the visible code.
// If d_data_url starts with "https://s3.amazonaws.com/" or
// "http://s3.amazonaws.com/", this appends
// "?<S3_TRACKING_CONTEXT>=<value>" to d_query_marker.
// <value> is the BES context named S3_TRACKING_CONTEXT.
//
// NOTE(review): the declaration of 'found' is not shown, and neither is any
// test of it between the lookup and the append (presumably 'if (found)').
// NOTE(review):
//   - find(x) == 0 scans the whole URL when x is not a prefix; rfind(x, 0) == 0
//     or compare() is the usual prefix test.
//   - Virtual-hosted-style S3 URLs (https://<bucket>.s3.amazonaws.com/...) do
//     not match these prefixes.
//   - d_query_marker is appended to, not reset, so calling this twice appends
//     the marker twice.
461 void Chunk::add_tracking_query_param() {
476 string aws_s3_url_https(
"https://s3.amazonaws.com/");
477 string aws_s3_url_http(
"http://s3.amazonaws.com/");
480 if (d_data_url.find(aws_s3_url_https) == 0 || d_data_url.find(aws_s3_url_http) == 0) {
483 string cloudydap_context_value = BESContextManager::TheManager()->
get_context(S3_TRACKING_CONTEXT, found);
485 d_query_marker.append(
"?").append(S3_TRACKING_CONTEXT).append(
"=").append(cloudydap_context_value);
// Unpack an inflate_chunk_args from arg_list and call Chunk::inflate_chunk()
// with its deflate, shuffle, chunk_size and elem_width fields.
// The signature matches a pthread-style start routine (void *(void *)), so
// this is presumably used as a thread entry point - confirm.
//
// NOTE(review): the opening brace, the return statement and any exception
// handling are not shown in this excerpt. Chunk::inflate_chunk() can throw
// (e.g. BESError from inflate()). An exception escaping a thread entry point
// terminates the process, so confirm that the missing lines catch it.
502 void *inflate_chunk(
void *arg_list)
504 inflate_chunk_args *args =
reinterpret_cast<inflate_chunk_args*
>(arg_list);
507 args->chunk->inflate_chunk(args->deflate, args->shuffle, args->chunk_size, args->elem_width);
// Decompress and/or unshuffle this chunk's data in place, then set
// d_is_inflated to true.
//
// @param deflate presumably selects the zlib inflate step shown below
// @param shuffle presumably selects the unshuffle step shown below
// @param chunk_size multiplied by elem_width to give the inflated size in bytes
//        (so presumably a count of elements - confirm)
// @param elem_width element size in bytes, also passed to unshuffle()
//
// NOTE(review): this excerpt is incomplete. The following are not shown:
// the deflate/shuffle conditionals, any early return when the chunk is
// already inflated, and the closing braces.
530 void Chunk::inflate_chunk(
bool deflate,
bool shuffle,
unsigned int chunk_size,
unsigned int elem_width) {
// NOTE(review): unsigned int multiplication wraps silently for very large chunks.
545 chunk_size *= elem_width;
// NOTE(review): raw new[] - if inflate() throws, dest is never freed. Holding it
// in a std::unique_ptr<char[]> until ownership passes to the chunk would fix this.
548 char *dest =
new char[chunk_size];
550 inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
// Hand the inflated buffer to the chunk. NOTE(review): both set_read_buffer()
// and set_rbuf() are called with dest here and again below. Presumably one of
// each pair is in a preprocessor branch not shown - confirm.
553 set_read_buffer(dest, chunk_size, chunk_size,
true);
555 set_rbuf(dest, chunk_size);
// Unshuffle the current read buffer into a fresh buffer of the same size.
566 char *dest =
new char[get_rbuf_size()];
568 unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
570 set_read_buffer(dest,get_rbuf_size(),get_rbuf_size(),
true);
572 set_rbuf(dest, get_rbuf_size());
581 d_is_inflated =
true;
// NOTE(review): the lines below reference os, i and prototype(), none of which
// are shown as Chunk members, and they print "DmrppArray::". They look like
// debugging code copied from DmrppArray, presumably disabled by a
// preprocessor guard not shown. (k==0)|((k+1)%10) uses bitwise '|'; '||'
// gives the same result and states the intent.
585 unsigned long long chunk_buf_size = get_rbuf_size();
586 dods_float32 *vals = (dods_float32 *) get_rbuf();
588 (*os) << std::fixed << std::setfill(
'_') << std::setw(10) << std::setprecision(0);
589 (*os) <<
"DmrppArray::"<< __func__ <<
"() - Chunk[" << i <<
"]: " << endl;
590 for(
unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
591 (*os) << vals[k] <<
", " << ((k==0)|((k+1)%10)?
"":
"\n");
// Read this chunk's bytes from its data URL, unless it has already been read.
// The visible steps are:
//   1. Obtain a libcurl easy handle from DmrppRequestHandler::curl_handle_pool.
//      BESInternalError "No more libcurl handles." is thrown when none is
//      available (the null check itself is not shown).
//   2. Perform the transfer (presumably via dmrpp_easy_handle::read_data()).
//   3. Return the handle to the pool.
//   4. If get_bytes_read() differs from get_size(), build an error message
//      (the throw is not shown).
//
// NOTE(review): release_handle() appears twice, presumably once on an exception
// path and once on the normal path. An RAII guard that releases the handle in
// its destructor would remove the duplication. The transfer call, the
// d_is_read update and the closing braces are not shown in this excerpt.
606 void Chunk::read_chunk() {
608 BESDEBUG(MODULE, prolog <<
"Already been read! Returning." << endl);
614 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(
this);
616 throw BESInternalError(prolog +
"No more libcurl handles.", __FILE__, __LINE__);
620 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
623 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
// A short or long read means the transfer did not deliver exactly this chunk.
628 if (get_size() != get_bytes_read()) {
630 oss <<
"Wrong number of bytes read for chunk; read: " << get_bytes_read() <<
", expected: " << get_size();
// Write a bracketed, single-line description of this chunk to oss, covering:
// its address, its raw (not effective) data URL, offset, size, position in
// array, and the is_read/is_inflated flags.
// NOTE(review): the separator between position values, the closing ")]" of the
// position field, and the closing braces are not shown in this excerpt.
646 void Chunk::dump(ostream &oss)
const {
648 oss <<
"[ptr='" << (
void *)
this <<
"']";
649 oss <<
"[data_url='" << d_data_url <<
"']";
650 oss <<
"[offset=" << d_offset <<
"]";
651 oss <<
"[size=" << d_size <<
"]";
652 oss <<
"[chunk_position_in_array=(";
653 for (
unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
655 oss << d_chunk_position_in_array[i];
658 oss <<
"[is_read=" << d_is_read <<
"]";
659 oss <<
"[is_inflated=" << d_is_inflated <<
"]";
// Return a string form of this chunk. The body presumably continues with
// dump(oss) and return oss.str(), but the remaining lines are not shown in
// this excerpt.
662 string Chunk::to_string()
const {
663 std::ostringstream oss;
// Return the URL to use when reading this chunk. This is the effective URL
// that EffectiveUrlCache maps d_data_url to, with d_query_marker appended when
// that marker is non-empty. The return for the empty-marker case and the
// closing braces are not shown in this excerpt.
//
// NOTE(review): d_query_marker (built in add_tracking_query_param) starts with
// '?'. If the effective URL already carries a query string (e.g. a signed
// redirect target), the result would contain a second '?'. Confirm, and use
// '&' in that case.
669 std::string Chunk::get_data_url()
const {
671 string data_url = EffectiveUrlCache::TheCache()->get_effective_url(d_data_url);
672 BESDEBUG(MODULE, prolog <<
"Using data_url: " << data_url << endl);
676 if (!d_query_marker.empty()) {
677 return data_url + d_query_marker;
virtual std::string get_context(const std::string &name, bool &found)
Retrieve the value of the specified context from the BES.
static std::ostream * GetStrm()
Return the debug stream.
static bool IsSet(const std::string &flagName)
Check whether the debug context flagName is set to true.
Abstract exception class for the BES with a basic string message.
Error thrown if the BES is not allowed to access the requested resource.
Exception thrown if an internal error is encountered.
Error thrown if there is a user syntax error in the request or any other user error.
Bundle a libcurl easy handle with other information.
void read_data()
This is the read_data() method for all transfers.
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
GenericDocument< UTF8<> > Document
GenericDocument with UTF8 encoding.