bes  Updated for version 3.20.10
Chunk.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of the BES
4 
5 // Copyright (c) 2016 OPeNDAP, Inc.
6 // Author: Nathan Potter <ndp@opendap.org>
7 //
8 // This library is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 //
13 // This library is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public
19 // License along with this library; if not, write to the Free Software
20 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 //
22 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23 
24 #include "config.h"
25 
26 #include <sstream>
27 #include <cstring>
28 #include <cassert>
29 
30 #include <zlib.h>
31 
32 #include <BESDebug.h>
33 #include <BESLog.h>
34 #include <BESInternalError.h>
35 #include <BESSyntaxUserError.h>
36 #include <BESForbiddenError.h>
37 #include <BESContextManager.h>
38 #include <BESUtil.h>
39 
40 #define PUGIXML_NO_XPATH
41 #define PUGIXML_HEADER_ONLY
42 #include <pugixml.hpp>
43 
44 #include "Chunk.h"
45 #include "CurlUtils.h"
46 #include "CurlHandlePool.h"
47 #include "EffectiveUrlCache.h"
48 #include "DmrppRequestHandler.h"
49 #include "DmrppNames.h"
50 
51 using namespace std;
53 
54 #define prolog std::string("Chunk::").append(__func__).append("() - ")
55 
56 #define FLETCHER32_CHECKSUM 4 // Bytes in the fletcher32 checksum
57 #define ACTUALLY_USE_FLETCHER32_CHECKSUM 1 // Computing checksums takes time...
58 
59 namespace dmrpp {
60 
73 size_t chunk_header_callback(char *buffer, size_t /*size*/, size_t nitems, void *data) {
74  // received header is nitems * size long in 'buffer' NOT ZERO TERMINATED
75  // 'userdata' is set with CURLOPT_HEADERDATA
76  // 'size' is always 1
77 
78  // -2 strips of the CRLF at the end of the header
79  string header(buffer, buffer + nitems - 2);
80 
81  // Look for the content type header and store its value in the Chunk
82  if (header.find("Content-Type") != string::npos) {
83  // Header format 'Content-Type: <value>'
84  auto c_ptr = reinterpret_cast<Chunk *>(data);
85  c_ptr->set_response_content_type(header.substr(header.find_last_of(' ') + 1));
86  }
87 
88  return nitems;
89 }
90 
96 void process_s3_error_response(const shared_ptr<http::url> &data_url, const string &xml_message)
97 {
98 #if 0
99  string json_message = xml2json(xml_message.c_str());
101  d.Parse(json_message.c_str());
102  // rapidjson::Value &message = d["Error"]["Message"];
103  rapidjson::Value &code = d["Error"]["Code"];
104 #endif
105  // See https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
106  // for the low-down on this XML document.
107  pugi::xml_document error;
108  pugi::xml_parse_result result = error.load_string(xml_message.c_str());
109  if (!result)
110  throw BESInternalError("The underlying data store returned an unintelligible error message.", __FILE__, __LINE__);
111 
112  pugi::xml_node err_elmnt = error.document_element();
113  if (!err_elmnt || (strcmp(err_elmnt.name(), "Error") != 0))
114  throw BESInternalError("The underlying data store returned a bogus error message.", __FILE__, __LINE__);
115 
116  string code = err_elmnt.child_value("Code");
117  string message = err_elmnt.child_value("Message");
118 
119  // We might want to get the "Code" from the "Error" if these text messages
120  // are not good enough. But the "Code" is not really suitable for normal humans...
121  // jhrg 12/31/19
122 
123  if (code == "AccessDenied") {
124  stringstream msg;
125  msg << prolog << "ACCESS DENIED - The underlying object store has refused access to: ";
126  msg << data_url->str() << " Object Store Message: " << message;
127  BESDEBUG(MODULE, msg.str() << endl);
128  VERBOSE(msg.str() << endl);
129  throw BESForbiddenError(msg.str(), __FILE__, __LINE__);
130  }
131  else {
132  stringstream msg;
133  msg << prolog << "ERROR - The underlying object store returned an error. ";
134  msg << "(Tried: " << data_url->str() << ") Object Store Message: " << message;
135  BESDEBUG(MODULE, msg.str() << endl);
136  VERBOSE(msg.str() << endl);
137  throw BESInternalError(msg.str(), __FILE__, __LINE__);
138  }
139 }
140 
154 size_t chunk_write_data(void *buffer, size_t size, size_t nmemb, void *data) {
155  BESDEBUG(MODULE, prolog << "BEGIN " << endl);
156  size_t nbytes = size * nmemb;
157  auto chunk = reinterpret_cast<Chunk *>(data);
158 
159 
160  auto data_url = chunk->get_data_url();
161  BESDEBUG(MODULE, prolog << "chunk->get_data_url():" << data_url << endl);
162 
163  // When Content-Type is 'application/xml,' that's an error. jhrg 6/9/20
164  BESDEBUG(MODULE, prolog << "chunk->get_response_content_type():" << chunk->get_response_content_type() << endl);
165  if (chunk->get_response_content_type().find("application/xml") != string::npos) {
166  // At this point we no longer care about great performance - error msg readability
167  // is more important. jhrg 12/30/19
168  string xml_message = reinterpret_cast<const char *>(buffer);
169  xml_message.erase(xml_message.find_last_not_of("\t\n\v\f\r 0") + 1);
170  // Decode the AWS XML error message. In some cases this will fail because pub keys,
171  // which maybe in this error text, may have < or > chars in them. the XML parser
172  // will be sad if that happens. jhrg 12/30/19
173  try {
174  process_s3_error_response(data_url, xml_message); // throws a BESError
175  }
176  catch (BESError) {
177  // re-throw any BESError - added for the future if we make BESError a child
178  // of std::exception as it should be. jhrg 12/30/19
179  throw;
180  }
181  catch (std::exception &e) {
182  stringstream msg;
183  msg << prolog << "Caught std::exception when accessing object store data.";
184  msg << " (Tried: " << data_url->str() << ")" << " Message: " << e.what();
185  BESDEBUG(MODULE, msg.str() << endl);
186  throw BESSyntaxUserError(msg.str(), __FILE__, __LINE__);
187  }
188  }
189 
190  // rbuf: |******++++++++++----------------------|
191  // ^ ^ bytes_read + nbytes
192  // | bytes_read
193 
194  unsigned long long bytes_read = chunk->get_bytes_read();
195 
196  // If this fails, the code will write beyond the buffer.
197  if (bytes_read + nbytes > chunk->get_rbuf_size()) {
198  stringstream msg;
199  msg << prolog << "ERROR! The number of bytes_read: " << bytes_read << " plus the number of bytes to read: "
200  << nbytes << " is larger than the target buffer size: " << chunk->get_rbuf_size();
201  BESDEBUG(MODULE, msg.str() << endl);
202  DmrppRequestHandler::curl_handle_pool->release_all_handles();
203  throw BESInternalError(msg.str(), __FILE__, __LINE__);
204  }
205 
206  memcpy(chunk->get_rbuf() + bytes_read, buffer, nbytes);
207  chunk->set_bytes_read(bytes_read + nbytes);
208 
209  BESDEBUG(MODULE, prolog << "END" << endl);
210 
211  return nbytes;
212 }
213 
224 void inflate(char *dest, unsigned long long dest_len, char *src, unsigned long long src_len) {
225  /* Sanity check */
226  assert(src_len > 0);
227  assert(src);
228  assert(dest_len > 0);
229  assert(dest);
230 
231  /* Input; uncompress */
232  z_stream z_strm; /* zlib parameters */
233 
234  /* Set the decompression parameters */
235  memset(&z_strm, 0, sizeof(z_strm));
236  z_strm.next_in = (Bytef *) src;
237  z_strm.avail_in = src_len;
238  z_strm.next_out = (Bytef *) dest;
239  z_strm.avail_out = dest_len;
240 
241  /* Initialize the decompression routines */
242  if (Z_OK != inflateInit(&z_strm))
243  throw BESError("Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
244 
245  /* Loop to uncompress the buffer */
246  int status = Z_OK;
247  do {
248  /* Uncompress some data */
249  status = inflate(&z_strm, Z_SYNC_FLUSH);
250 
251  /* Check if we are done decompressing data */
252  if (Z_STREAM_END == status) break; /*done*/
253 
254  /* Check for error */
255  if (Z_OK != status) {
256  stringstream err_msg;
257  err_msg << "Failed to inflate data chunk.";
258  char *err_msg_cstr = z_strm.msg;
259  if(err_msg_cstr)
260  err_msg << " zlib message: " << err_msg_cstr;
261  (void) inflateEnd(&z_strm);
262  throw BESError(err_msg.str(), BES_INTERNAL_ERROR, __FILE__, __LINE__);
263  }
264  else {
265  /* If we're not done and just ran out of buffer space, it's an error.
266  * The HDF5 library code would extend the buffer as-needed, but for
267  * this handler, we should always know the size of the decompressed chunk.
268  */
269  if (0 == z_strm.avail_out) {
270  throw BESError("Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
271 #if 0
272  /* Here's how to extend the buffer if needed. This might be useful some day... */
273  void *new_outbuf; /* Pointer to new output buffer */
274 
275  /* Allocate a buffer twice as big */
276  nalloc *= 2;
277  if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
278  (void) inflateEnd(&z_strm);
279  HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0, "memory allocation failed for inflate decompression")
280  } /* end if */
281  outbuf = new_outbuf;
282 
283  /* Update pointers to buffer for next set of uncompressed data */
284  z_strm.next_out = (unsigned char*) outbuf + z_strm.total_out;
285  z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
286 #endif
287  } /* end if */
288  } /* end else */
289  } while (true /* status == Z_OK */); // Exit via the break statement after the call to inflate(). jhrg 11/8/21
290 
291  /* Finish decompressing the stream */
292  (void) inflateEnd(&z_strm);
293 }
294 
/**
 * @brief Undo a byte shuffle (see H5Zshuffle.c in the hdf5 source).
 *
 * The source holds byte 0 of every element, then byte 1 of every element, and
 * so on. The destination receives the bytes of each element side by side.
 * Bytes beyond the last whole element are copied unchanged to the end of
 * 'dest'. Inputs with 1-byte elements, or with fewer than two whole elements,
 * are copied verbatim.
 *
 * The loop is written plainly; unrolling is left to the optimizer.
 *
 * @param dest Destination buffer; must hold at least 'src_size' bytes.
 * @param src The shuffled bytes.
 * @param src_size Number of bytes in 'src'.
 * @param width Size of one element in bytes. A width of 0 is treated like 1
 * (plain copy) rather than dividing by zero.
 */
void unshuffle(char *dest, const char *src, unsigned long long src_size, unsigned long long width) {
    const unsigned long long elems = (width > 0) ? src_size / width : 0; // int division rounds down

    /* Don't do anything for 1-byte elements, or "fractional" elements */
    if (width <= 1 || elems <= 1) {
        std::memcpy(dest, src, src_size);
        return;
    }

    /* Input; unshuffle. 'in' walks the source sequentially while 'out' visits
     * byte 'byte_index' of each destination element in turn. */
    const char *in = src;
    for (unsigned long long byte_index = 0; byte_index < width; ++byte_index) {
        char *out = dest + byte_index;
        for (unsigned long long e = 0; e < elems; ++e) {
            *out = *in++;
            out += width;
        }
    }

    /* Add leftover to the end of data */
    const unsigned long long leftover = src_size % width;
    if (leftover > 0)
        std::memcpy(dest + elems * width, in, leftover);
}
388 
/**
 * @brief Split a comma-separated list of unsigned integers and append the values to a vector.
 *
 * Every token must consist of decimal digits only. std::stoull() on its own
 * skips leading whitespace, accepts a sign and stops quietly at the first
 * non-digit (so "1[2" would parse as 1); such tokens are rejected here.
 *
 * @param s The text to split, e.g. "0,512,1024".
 * @param res Receives the parsed values, appended in order.
 * @throws std::invalid_argument if a token is empty or is not purely digits.
 * @throws std::out_of_range if a value does not fit an unsigned long long.
 */
static void split_by_comma(const std::string &s, std::vector<unsigned long long> &res)
{
    std::string::size_type token_begin = 0;

    for (;;) {
        const std::string::size_type comma = s.find(',', token_begin);
        const std::string token = (comma == std::string::npos)
                                  ? s.substr(token_begin)
                                  : s.substr(token_begin, comma - token_begin);

        if (token.empty() || token[0] < '0' || token[0] > '9')
            throw std::invalid_argument("split_by_comma: '" + token + "' is not an unsigned integer");

        std::size_t consumed = 0;
        const unsigned long long value = std::stoull(token, &consumed);
        if (consumed != token.size())
            throw std::invalid_argument("split_by_comma: '" + token + "' is not an unsigned integer");

        res.push_back(value);

        if (comma == std::string::npos)
            break;
        token_begin = comma + 1;
    }
}
408 
409 void Chunk::parse_chunk_position_in_array_string(const string &pia, vector<unsigned long long> &cpia_vect)
410 {
411  if (pia.empty()) return;
412 
413  if (!cpia_vect.empty()) cpia_vect.clear();
414 
415  // Assume input is [x,y,...,z] where x, ..., are integers; modest syntax checking
416  // [1] is a minimal 'position in array' string.
417  if (pia.find('[') == string::npos || pia.find(']') == string::npos || pia.length() < 3)
418  throw BESInternalError("while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
419 
420  if (pia.find_first_not_of("[]1234567890,") != string::npos)
421  throw BESInternalError("while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
422 
423 #if 0
424  // strip off []; iss holds x,y,...,z
425  istringstream iss(pia.substr(1, pia.length() - 2));
426 
427  char c;
428  unsigned int i;
429  while (!iss.eof()) {
430  iss >> i; // read an integer
431  cpia_vect.push_back(i);
432  iss >> c; // read a separator (,)
433  }
434 #else
435  try {
436  split_by_comma(pia.substr(1, pia.length() - 2), cpia_vect);
437  }
438  catch(std::invalid_argument &e) {
439  throw BESInternalError(string("while parsing a DMR++, chunk position string illegal character(s): ").append(e.what()), __FILE__, __LINE__);
440  }
441 #endif
442 }
443 
444 
458 void Chunk::set_position_in_array(const string &pia) {
459 #if 0
460  if (pia.empty()) return;
461 
462  if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
463 
464  // Assume input is [x,y,...,z] where x, ..., are integers; modest syntax checking
465  // [1] is a minimal 'position in array' string.
466  if (pia.find('[') == string::npos || pia.find(']') == string::npos || pia.length() < 3)
467  throw BESInternalError("while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
468 
469  if (pia.find_first_not_of("[]1234567890,") != string::npos)
470  throw BESInternalError("while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
471 
472  // strip off []; iss holds x,y,...,z
473  istringstream iss(pia.substr(1, pia.length() - 2));
474 
475  char c;
476  unsigned int i;
477  while (!iss.eof()) {
478  iss >> i; // read an integer
479  d_chunk_position_in_array.push_back(i);
480  iss >> c; // read a separator (,)
481  }
482 #endif
483  parse_chunk_position_in_array_string(pia,d_chunk_position_in_array);
484 }
485 
494 void Chunk::set_position_in_array(const std::vector<unsigned long long> &pia) {
495  if (pia.empty()) return;
496 
497  if (!d_chunk_position_in_array.empty()) d_chunk_position_in_array.clear();
498 
499  d_chunk_position_in_array = pia;
500 }
501 
509 string Chunk::get_curl_range_arg_string() {
510  return curl::get_range_arg_string(d_offset, d_size);
511 }
512 
526 void Chunk::add_tracking_query_param() {
527 
528  // If there is no data url then there is nothing to add the parameter too.
529  if(d_data_url == nullptr)
530  return;
531 
532  bool found = false;
533  string cloudydap_context_value = BESContextManager::TheManager()->get_context(S3_TRACKING_CONTEXT, found);
534  if (!found)
535  return;
536 
551  bool add_tracking = false;
552 
553  // All S3 buckets, virtual host style URL
554  // Simpler regex that's likely equivalent:
555  // ^https?:\/\/[a-z0-9]([-.a-z0-9]){1,61}[a-z0-9]\.s3[-.]us-(east|west)-[12])?\.amazonaws\.com\/.*$
556  string s3_vh_regex_str = R"(^https?:\/\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\.s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/.*$)";
557 
558  BESRegex s3_vh_regex(s3_vh_regex_str.c_str());
559  int match_result = s3_vh_regex.match(d_data_url->str().c_str(), d_data_url->str().length());
560  if(match_result>=0) {
561  auto match_length = (unsigned int) match_result;
562  if (match_length == d_data_url->str().length()) {
563  BESDEBUG(MODULE,
564  prolog << "FULL MATCH. pattern: " << s3_vh_regex_str << " url: " << d_data_url->str() << endl);
565  add_tracking = true;;
566  }
567  }
568 
569  if(!add_tracking){
570  // All S3 buckets, path style URL
571  string s3_path_regex_str = R"(^https?:\/\/s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\/.*$)";
572  BESRegex s3_path_regex(s3_path_regex_str.c_str());
573  match_result = s3_path_regex.match(d_data_url->str().c_str(), d_data_url->str().length());
574  if(match_result>=0) {
575  auto match_length = (unsigned int) match_result;
576  if (match_length == d_data_url->str().length()) {
577  BESDEBUG(MODULE,
578  prolog << "FULL MATCH. pattern: " << s3_vh_regex_str << " url: " << d_data_url->str() << endl);
579  add_tracking = true;;
580  }
581  }
582  }
583 
584  if (add_tracking) {
585  // Yup, headed to S3.
586  d_query_marker.append(S3_TRACKING_CONTEXT).append("=").append(cloudydap_context_value);
587  }
588 }
589 
/**
 * @brief Compute a Fletcher32 checksum over a byte buffer.
 *
 * Bytes are consumed in pairs, each pair forming a 16-bit word with the first
 * byte in the high position. A trailing odd byte is used as the high byte of
 * a final word.
 *
 * @param _data The bytes to checksum; must not be null.
 * @param _len Number of bytes in '_data'; must be greater than zero.
 * @return The checksum, with the second running sum in the upper 16 bits and
 * the first running sum in the lower 16 bits.
 */
uint32_t
checksum_fletcher32(const void *_data, size_t _len)
{
    // Sanity check
    assert(_data);
    assert(_len > 0);

    const auto *bytes = static_cast<const uint8_t *>(_data);
    uint32_t low = 0;
    uint32_t high = 0;

    // One reduction step: fold the carries above bit 15 back into each sum.
    const auto fold = [&low, &high]() {
        low = (low & 0xffff) + (low >> 16);
        high = (high & 0xffff) + (high >> 16);
    };

    // Sum the 16-bit words in batches. 360 is the largest number of sums that
    // can be performed without numeric overflow, so fold after each batch.
    size_t words_left = _len / 2;
    while (words_left > 0) {
        const size_t batch = (words_left > 360) ? 360 : words_left;
        for (size_t k = 0; k < batch; ++k) {
            low += (static_cast<uint32_t>(bytes[0]) << 8) | bytes[1];
            high += low;
            bytes += 2;
        }
        words_left -= batch;
        fold();
    }

    // Odd number of bytes: the last byte is the high half of one more word.
    if (_len % 2 != 0) {
        low += static_cast<uint32_t>(*bytes) << 8;
        high += low;
        fold();
    }

    // Second reduction step to reduce sums to 16 bits
    fold();

    return (high << 16) | low;
}
635 
// NOTE: Everything between this '#if 0' and its matching '#endif' is compiled
// out. It is the older flag-driven implementation (separate deflate, shuffle
// and fletcher32 booleans) kept for reference only; Chunk::filter_chunk()
// applies the same three steps, driven by a list of filter names.
636 #if 0
648 void Chunk::inflate_chunk(bool deflate, bool shuffle, bool fletcher32, unsigned long long chunk_size,
649  unsigned long long elem_width) {
650  // This code is pretty naive - there are apparently a number of
651  // different ways HDF5 can compress data, and it does also use a scheme
652  // where several algorithms can be applied in sequence. For now, get
653  // simple zlib deflate working.jhrg 1/15/17
654  // Added support for shuffle. Assuming unshuffle always is applied _after_
655  // inflating the data (reversing the shuffle --> deflate process). It is
656  // possible that data could just be deflated or shuffled (because we
657  // have test data are use only shuffle). jhrg 1/20/17
658  // The file that implements the deflate filter is H5Zdeflate.c in the hdf5 source.
659  // The file that implements the shuffle filter is H5Zshuffle.c.
660 
661  if (d_is_inflated)
662  return;
663 
664  chunk_size *= elem_width;
665 
666  if (deflate) {
667  char *dest = new char[chunk_size];
668  try {
669  inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
670  // This replaces (and deletes) the original read_buffer with dest.
671 #if DMRPP_USE_SUPER_CHUNKS
672  set_read_buffer(dest, chunk_size, chunk_size, true);
673 #else
674  set_rbuf(dest, chunk_size);
675 #endif
676  }
677  catch (...) {
678  delete[] dest;
679  throw;
680  }
681  }
682 
683  if (shuffle) {
684  // The internal buffer is chunk's full size at this point.
685  char *dest = new char[get_rbuf_size()];
686  try {
687  unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
688 #if DMRPP_USE_SUPER_CHUNKS
689  set_read_buffer(dest,get_rbuf_size(),get_rbuf_size(), true);
690 #else
691  set_rbuf(dest, get_rbuf_size());
692 #endif
693  }
694  catch (...) {
695  delete[] dest;
696  throw;
697  }
698  }
699 
700  if (fletcher32) {
701  // Compute the fletcher32 checksum and compare to the value of the last four bytes of the chunk.
702 #if ACTUALLY_USE_FLETCHER32_CHECKSUM
703  // Get the last four bytes of chunk's data (which is a byte array) and treat that as the four-byte
704  // integer fletcher32 checksum. jhrg 10/15/21
705 #pragma GCC diagnostic push
706 #pragma GCC diagnostic ignored "-Wcast-align"
707  assert(get_rbuf_size() - FLETCHER32_CHECKSUM >= 0);
708  assert((get_rbuf_size() - FLETCHER32_CHECKSUM) % 4 == 0);
709  auto f_checksum = *(uint32_t *)(get_rbuf() + get_rbuf_size() - FLETCHER32_CHECKSUM);
710 #pragma GCC diagnostic pop
711 
712  // If the code should actually use the checksum (they can be expensive to compute), does it match
713  // with once computed on the data actually read? Maybe make this a bes.conf parameter?
714  // jhrg 10/15/21
715  if (f_checksum != checksum_fletcher32((const void *)get_rbuf(), get_rbuf_size() - FLETCHER32_CHECKSUM)) {
716  throw BESInternalError("Data read from the DMR++ handler did not match the Fletcher32 checksum.",
717  __FILE__, __LINE__);
718  }
719 #endif
720  if (d_read_buffer_size > FLETCHER32_CHECKSUM)
721  d_read_buffer_size -= FLETCHER32_CHECKSUM;
722  else {
723  throw BESInternalError("Data filtered with fletcher32 don't include the four-byte checksum.",
724  __FILE__, __LINE__);
725  }
726  }
727 
728  d_is_inflated = true;
729 
730 #if 0 // This was handy during development for debugging. Keep it for a while (year or two) before we drop it ndp - 01/18/17
731  if(BESDebug::IsSet(MODULE)) {
732  unsigned long long chunk_buf_size = get_rbuf_size();
733  dods_float32 *vals = (dods_float32 *) get_rbuf();
734  ostream *os = BESDebug::GetStrm();
735  (*os) << std::fixed << std::setfill('_') << std::setw(10) << std::setprecision(0);
736  (*os) << "DmrppArray::"<< __func__ <<"() - Chunk[" << i << "]: " << endl;
737  for(unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
738  (*os) << vals[k] << ", " << ((k==0)|((k+1)%10)?"":"\n");
739  }
740  }
741 #endif
742 }
743 #endif
744 
755 void Chunk::filter_chunk(const string &filters, unsigned long long chunk_size, unsigned long long elem_width) {
756 
757  if (d_is_inflated)
758  return;
759 
760  chunk_size *= elem_width;
761 
762  vector<string> filter_array = BESUtil::split(filters, ' ' );
763 
764  for (auto i = filter_array.rbegin(), e = filter_array.rend(); i != e; ++i){
765  string filter = *i;
766 
767  if (filter == "deflate"){
768  char *dest = new char[chunk_size];
769  try {
770  inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
771  // This replaces (and deletes) the original read_buffer with dest.
772 #if DMRPP_USE_SUPER_CHUNKS
773  set_read_buffer(dest, chunk_size, chunk_size, true);
774 #else
775  set_rbuf(dest, chunk_size);
776 #endif
777  }
778  catch (...) {
779  delete[] dest;
780  throw;
781  }
782  }// end if(filter == deflate)
783  else if (filter == "shuffle"){
784  // The internal buffer is chunk's full size at this point.
785  char *dest = new char[get_rbuf_size()];
786  try {
787  unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
788 #if DMRPP_USE_SUPER_CHUNKS
789  set_read_buffer(dest,get_rbuf_size(),get_rbuf_size(), true);
790 #else
791  set_rbuf(dest, get_rbuf_size());
792 #endif
793  }
794  catch (...) {
795  delete[] dest;
796  throw;
797  }
798  }//end if(filter == shuffle)
799  else if (filter == "fletcher32"){
800  // Compute the fletcher32 checksum and compare to the value of the last four bytes of the chunk.
801 #if ACTUALLY_USE_FLETCHER32_CHECKSUM
802  // Get the last four bytes of chunk's data (which is a byte array) and treat that as the four-byte
803  // integer fletcher32 checksum. jhrg 10/15/21
804 #pragma GCC diagnostic push
805 #pragma GCC diagnostic ignored "-Wcast-align"
806  assert(get_rbuf_size() > FLETCHER32_CHECKSUM);
807  //assert((get_rbuf_size() - FLETCHER32_CHECKSUM) % 4 == 0); //probably wrong
808  auto f_checksum = *(uint32_t *)(get_rbuf() + get_rbuf_size() - FLETCHER32_CHECKSUM);
809 #pragma GCC diagnostic pop
810 
811  // If the code should actually use the checksum (they can be expensive to compute), does it match
812  // with once computed on the data actually read? Maybe make this a bes.conf parameter?
813  // jhrg 10/15/21
814  uint32_t calc_checksum = checksum_fletcher32((const void *)get_rbuf(), get_rbuf_size() - FLETCHER32_CHECKSUM);
815  if (f_checksum != calc_checksum) {
816  throw BESInternalError("Data read from the DMR++ handler did not match the Fletcher32 checksum.",
817  __FILE__, __LINE__);
818  }
819 #endif
820  if (d_read_buffer_size > FLETCHER32_CHECKSUM)
821  d_read_buffer_size -= FLETCHER32_CHECKSUM;
822  else {
823  throw BESInternalError("Data filtered with fletcher32 don't include the four-byte checksum.",
824  __FILE__, __LINE__);
825  }
826  } //end if(filter == fletcher32)
827  }// end for loop
828  d_is_inflated = true;
829 }
830 
840 void Chunk::read_chunk() {
841  if (d_is_read) {
842  BESDEBUG(MODULE, prolog << "Already been read! Returning." << endl);
843  return;
844  }
845 
846  set_rbuf_to_size();
847 
848  dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(this);
849  if (!handle)
850  throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);
851 
852  try {
853  handle->read_data(); // throws if error
854  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
855  }
856  catch(...) {
857  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
858  throw;
859  }
860 
861  // If the expected byte count was not read, it's an error.
862  if (get_size() != get_bytes_read()) {
863  ostringstream oss;
864  oss << "Wrong number of bytes read for chunk; read: " << get_bytes_read() << ", expected: " << get_size();
865  throw BESInternalError(oss.str(), __FILE__, __LINE__);
866  }
867 
868  d_is_read = true;
869 }
870 
880 void Chunk::dump(ostream &oss) const {
881  oss << "Chunk";
882  oss << "[ptr='" << (void *) this << "']";
883  oss << "[data_url='" << d_data_url->str() << "']";
884  oss << "[offset=" << d_offset << "]";
885  oss << "[size=" << d_size << "]";
886  oss << "[chunk_position_in_array=(";
887  for (unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
888  if (i) oss << ",";
889  oss << d_chunk_position_in_array[i];
890  }
891  oss << ")]";
892  oss << "[is_read=" << d_is_read << "]";
893  oss << "[is_inflated=" << d_is_inflated << "]";
894 }
895 
896 string Chunk::to_string() const {
897  std::ostringstream oss;
898  dump(oss);
899  return oss.str();
900 }
901 
902 
903 std::shared_ptr<http::url> Chunk::get_data_url() const {
904 
905  std::shared_ptr<http::EffectiveUrl> effective_url = EffectiveUrlCache::TheCache()->get_effective_url(d_data_url);
906  BESDEBUG(MODULE, prolog << "Using data_url: " << effective_url->str() << endl);
907 
908  //A conditional call to void Chunk::add_tracking_query_param()
909  // here for the NASA cost model work THG's doing. jhrg 8/7/18
910  if (!d_query_marker.empty()) {
911  string url_str = effective_url->str();
912  if(url_str.find("?") != string::npos){
913  url_str += "&";
914  }
915  else {
916  url_str +="?";
917  }
918  url_str += d_query_marker;
919  shared_ptr<http::url> query_marker_url( new http::url(url_str));
920  return query_marker_url;
921  }
922 
923  return effective_url;
924 }
925 
926 } // namespace dmrpp
927 
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
static std::ostream * GetStrm()
return the debug stream
Definition: BESDebug.h:187
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Definition: BESDebug.h:168
Abstract exception class for the BES with basic string message.
Definition: BESError.h:58
error thrown if the BES is not allowed to access the resource requested
exception thrown if internal error encountered
Regular expression matching.
Definition: BESRegex.h:53
int match(const char *s, int len, int pos=0) const
Does the pattern match.
Definition: BESRegex.cc:127
error thrown if there is a user syntax error in the request or any other user error
static std::vector< std::string > split(const std::string &s, char delim='/', bool skip_empty=true)
Splits the string s into the return vector of tokens using the delimiter delim and skipping empty val...
Definition: BESUtil.cc:1159
Bundle a libcurl easy handle with other information.
void read_data()
This is the read_data() method for all transfers.
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
Definition: document.h:2189
GenericDocument< UTF8<> > Document
GenericDocument with UTF8 encoding.
Definition: document.h:2585