DmrppArray.cc
// -*- mode: c++; c-basic-offset:4 -*-

// This file is part of the BES

// Copyright (c) 2016 OPeNDAP, Inc.
// Author: James Gallagher <jgallagher@opendap.org>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
//
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
#include "config.h"

#include <string>
#include <sstream>
#include <vector>
#include <memory>
#include <queue>
#include <iterator>
#include <thread>
#include <future> // std::async, std::future
#include <chrono> // std::chrono::milliseconds

#include <cstring>
#include <cassert>
#include <cerrno>

#include <pthread.h>
#include <cmath>

#include <unistd.h>

#include <libdap/D4Enum.h>
#include <libdap/D4Attributes.h>
#include <libdap/D4Maps.h>
#include <libdap/D4Group.h>

#include "BESInternalError.h"
#include "BESDebug.h"
#include "BESLog.h"
#include "BESStopWatch.h"

#include "byteswap_compat.h"
#include "CurlHandlePool.h"
#include "Chunk.h"
#include "DmrppArray.h"
#include "DmrppRequestHandler.h"
#include "DmrppNames.h"
#include "Base64.h"

// Used with BESDEBUG
#define dmrpp_3 "dmrpp:3"
#define dmrpp_4 "dmrpp:4"

using namespace libdap;
using namespace std;

#define MB (1024*1024)
#define prolog std::string("DmrppArray::").append(__func__).append("() - ")

namespace dmrpp {

// Transfer thread pool state variables.
std::mutex transfer_thread_pool_mtx; // mutex for the critical section
atomic_uint transfer_thread_counter(0);
bool get_next_future(list<std::future<bool>> &futures, atomic_uint &thread_counter, unsigned long timeout, string debug_prefix) {
    bool future_finished = false;
    bool done = false;
    std::chrono::milliseconds timeout_ms(timeout);

    while (!done) {
        auto futr = futures.begin();
        auto fend = futures.end();
        bool future_is_valid = true;
        while (!future_finished && future_is_valid && futr != fend) {
            future_is_valid = (*futr).valid();
            if (future_is_valid) {
                // What happens if wait_for() always returns future_status::timeout for a stuck thread?
                // If that were to happen, this loop would run forever. However, we assume that these
                // threads are never 'stuck.' We assume that their computations always complete, either
                // with success or failure. For the transfer threads, timeouts will stop them if nothing
                // else does, and for the decompression threads the worst case is a segmentation fault.
                // jhrg 2/5/21
                if ((*futr).wait_for(timeout_ms) != std::future_status::timeout) {
                    try {
                        bool success = (*futr).get();
                        future_finished = true;
                        BESDEBUG(dmrpp_3, debug_prefix << prolog << "Called future::get() on a ready future."
                            << " success: " << (success ? "true" : "false") << endl);
                        if (!success) {
                            stringstream msg;
                            msg << debug_prefix << prolog << "The std::future has failed!";
                            msg << " thread_counter: " << thread_counter;
                            throw BESInternalError(msg.str(), __FILE__, __LINE__);
                        }
                    }
                    catch (...) {
                        // TODO This was added to make the thread counting work when there are errors,
                        //  but it is a blunt instrument because it discards all pending futures. There
                        //  is surely a way to handle the failure on a per-thread basis, and maybe even
                        //  retry the work.
                        futures.clear();
                        thread_counter = 0;
                        throw;
                    }
                }
                else {
                    futr++;
                    BESDEBUG(dmrpp_3, debug_prefix << prolog << "future::wait_for() timed out. (timeout: " <<
                        timeout << " ms) There are currently " << futures.size() << " futures in process."
                        << " thread_counter: " << thread_counter << endl);
                }
            }
            else {
                BESDEBUG(dmrpp_3, debug_prefix << prolog << "The future was not valid. Dumping... " << endl);
                future_finished = true;
            }
        }

        if (futr != fend && future_finished) {
            futures.erase(futr);
            thread_counter--;
            BESDEBUG(dmrpp_3, debug_prefix << prolog << "Erased future from futures list. (Erased future was "
                << (future_is_valid ? "" : "not ") << "valid at start.) There are currently " <<
                futures.size() << " futures in process. thread_counter: " << thread_counter << endl);
        }

        done = future_finished || futures.empty();
    }

    return future_finished;
}
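
// Typical use, as in the reader loops later in this file (illustrative sketch,
// not part of the original source): harvest a finished future whenever the
// pool may be full, then try to start more work.
//
//     list<future<bool>> futures;
//     while (!work.empty() || !futures.empty()) {
//         if (!futures.empty())
//             get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
//         // ...attempt to start more transfer threads here...
//     }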

bool one_child_chunk_thread_new(unique_ptr<one_child_chunk_args_new> args)
{
    args->child_chunk->read_chunk();

    assert(args->the_one_chunk->get_rbuf());
    assert(args->child_chunk->get_rbuf());
    assert(args->child_chunk->get_bytes_read() == args->child_chunk->get_size());

    // the_one_chunk offset \/
    // the_one_chunk:       mmmmmmmmmmmmmmmm
    // child chunks:        1111222233334444 (there are four child chunks)
    // child offsets:       ^   ^   ^   ^
    // For this example, child_1_offset - the_one_chunk_offset == 0 (that's always true),
    // child_2_offset - the_one_chunk_offset == 4, child_3_offset - the_one_chunk_offset == 8,
    // and child_4_offset - the_one_chunk_offset == 12.
    // Those are the starting locations within the data buffer of the_one_chunk
    // where each child chunk should be written.
    // Note: all the offset values start at the beginning of the file.

    unsigned long long offset_within_the_one_chunk = args->child_chunk->get_offset() - args->the_one_chunk->get_offset();

    memcpy(args->the_one_chunk->get_rbuf() + offset_within_the_one_chunk, args->child_chunk->get_rbuf(),
           args->child_chunk->get_bytes_read());

    return true;
}
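
// Worked example (illustrative, not in the original source): if the_one_chunk
// starts at file offset 1000 and a child chunk starts at file offset 1500 with
// 500 bytes read, the memcpy() above writes those 500 bytes at
// the_one_chunk->get_rbuf() + 500.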

bool one_super_chunk_transfer_thread(unique_ptr<one_super_chunk_args> args)
{
#if DMRPP_ENABLE_THREAD_TIMERS
    stringstream timer_tag;
    timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
        " parent_tid: 0x" << std::hex << args->parent_thread_id << " sc_id: " << args->super_chunk->id();
    BESStopWatch sw(TRANSFER_THREADS);
    sw.start(timer_tag.str());
#endif

    args->super_chunk->read();
    return true;
}

bool one_super_chunk_unconstrained_transfer_thread(unique_ptr<one_super_chunk_args> args)
{
#if DMRPP_ENABLE_THREAD_TIMERS
    stringstream timer_tag;
    timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
        " parent_tid: 0x" << std::hex << args->parent_thread_id << " sc_id: " << args->super_chunk->id();
    BESStopWatch sw(TRANSFER_THREADS);
    sw.start(timer_tag.str());
#endif

    args->super_chunk->read_unconstrained();
    return true;
}

bool start_one_child_chunk_thread(list<std::future<bool>> &futures, unique_ptr<one_child_chunk_args_new> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck(transfer_thread_pool_mtx);
    if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
        transfer_thread_counter++;
        // Capture the description before std::move() hands ownership of args to the new thread;
        // using args after the move would dereference a moved-from unique_ptr.
        string chunk_info = args->child_chunk->to_string();
        futures.push_back(std::async(std::launch::async, one_child_chunk_thread_new, std::move(args)));
        retval = true;
        BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<
            "' from std::async for " << chunk_info << endl);
    }
    return retval;
}

bool start_super_chunk_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck(transfer_thread_pool_mtx);
    if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
        transfer_thread_counter++;
        // As above, capture the description before std::move().
        string sc_info = args->super_chunk->to_string(false);
        futures.push_back(std::async(std::launch::async, one_super_chunk_transfer_thread, std::move(args)));
        retval = true;
        BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<
            "' from std::async for " << sc_info << endl);
    }
    return retval;
}

bool start_super_chunk_unconstrained_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
    bool retval = false;
    std::unique_lock<std::mutex> lck(transfer_thread_pool_mtx);
    if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
        transfer_thread_counter++;
        futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_transfer_thread, std::move(args)));
        retval = true;
        BESDEBUG(dmrpp_3, prolog << "Got std::future '" << futures.size() <<
            "' from std::async, transfer_thread_counter: " << transfer_thread_counter << endl);
    }
    return retval;
}
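
// Note on the pattern above (illustrative): the mutex serializes the
// test-and-increment of transfer_thread_counter, so at most
// d_max_transfer_threads transfer threads run at once. The matching
// decrement happens in get_next_future() when a finished future is erased.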

void read_super_chunks_unconstrained_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
{
    BESStopWatch sw;
    if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: " + array->name(), "");

    // Parallel version based on read_chunks_unconstrained(). There is substantial
    // duplication of the code in read_chunks_unconstrained() that should
    // eventually be removed.

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if (!futures.empty())
                future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true, then transfer_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!super_chunks.empty()) {
                // Next we try to start a new SuperChunk transfer thread if we can - there might be room.
                bool thread_started = true;
                while (thread_started && !super_chunks.empty()) {
                    auto super_chunk = super_chunks.front();
                    BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);

                    auto args = unique_ptr<one_super_chunk_args>(new one_super_chunk_args(super_chunk, array));
                    thread_started = start_super_chunk_unconstrained_transfer_thread(futures, std::move(args));

                    if (thread_started) {
                        super_chunks.pop();
                        BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
                    } else {
                        // Thread did not start; ownership of the arguments was not passed to the thread.
                        BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, SuperChunk remains in queue." <<
                            " transfer_thread_counter: " << transfer_thread_counter <<
                            " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more SuperChunks and no futures means we're done here.
                if (futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all the futures, otherwise we'll have threads out there using up resources.
        while (!futures.empty()) {
            if (futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // Re-throw the exception.
        throw;
    }
}

void read_super_chunks_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
{
    BESStopWatch sw;
    if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: " + array->name(), "");

    // Parallel version based on read_chunks_unconstrained(). There is substantial
    // duplication of that code here that should eventually be removed.

    // We maintain a list of futures to track our parallel activities.
    list<future<bool>> futures;
    try {
        bool done = false;
        bool future_finished = true;
        while (!done) {

            if (!futures.empty())
                future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

            // If future_finished is true, then transfer_thread_counter has been decremented,
            // because future::get() was called or a call to future::valid() returned false.
            BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

            if (!super_chunks.empty()) {
                // Next we try to start a new SuperChunk transfer thread if we can - there might be room.
                bool thread_started = true;
                while (thread_started && !super_chunks.empty()) {
                    auto super_chunk = super_chunks.front();
                    BESDEBUG(dmrpp_3, prolog << "Starting thread for " << super_chunk->to_string(false) << endl);

                    auto args = unique_ptr<one_super_chunk_args>(new one_super_chunk_args(super_chunk, array));
                    thread_started = start_super_chunk_transfer_thread(futures, std::move(args));

                    if (thread_started) {
                        super_chunks.pop();
                        BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << super_chunk->to_string(false) << endl);
                    } else {
                        // Thread did not start; ownership of the arguments was not passed to the thread.
                        BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, SuperChunk remains in queue." <<
                            " transfer_thread_counter: " << transfer_thread_counter <<
                            " futures.size(): " << futures.size() << endl);
                    }
                }
            }
            else {
                // No more SuperChunks and no futures means we're done here.
                if (futures.empty())
                    done = true;
            }
            future_finished = false;
        }
    }
    catch (...) {
        // Complete all the futures, otherwise we'll have threads out there using up resources.
        while (!futures.empty()) {
            if (futures.back().valid())
                futures.back().get();
            futures.pop_back();
        }
        // Re-throw the exception.
        throw;
    }
}

static unsigned long long
get_index(const vector<unsigned long long> &address_in_target, const vector<unsigned long long> &target_shape)
{
    assert(address_in_target.size() == target_shape.size()); // ranks must be equal

    auto shape_index = target_shape.rbegin();
    auto index = address_in_target.rbegin(), index_end = address_in_target.rend();

    unsigned long long multiplier_var = *shape_index++;
    unsigned long long offset = *index++;

    while (index != index_end) {
        assert(*index < *shape_index); // index < shape for each dim

        offset += multiplier_var * *index++;
        multiplier_var *= *shape_index++;
    }

    return offset;
}
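
// Worked example (illustrative): for target_shape = {4, 3, 2} and
// address_in_target = {1, 2, 1}, the row-major offset is
// ((1 * 3) + 2) * 2 + 1 = 11, which is what the loop above computes:
// offset = 1, then 1 + 2*2 = 5, then 5 + 6*1 = 11.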

static unsigned long multiplier(const vector<unsigned long long> &shape, unsigned int k)
{
    assert(shape.size() > 1);
    assert(shape.size() > k + 1);

    vector<unsigned long long>::const_iterator i = shape.begin(), e = shape.end();
    advance(i, k + 1);
    unsigned long multiplier = *i++;
    while (i != e) {
        multiplier *= *i++;
    }

    return multiplier;
}
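
// Worked example (illustrative): for shape = {10, 20, 30}, multiplier(shape, 0)
// is 20 * 30 = 600 and multiplier(shape, 1) is 30 - the number of elements
// spanned by one step along dimension k in row-major order.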
542 
543 //#####################################################################################################################
544 //
545 // DmrppArray code begins here.
546 //
547 // = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
548 
549 DmrppArray &
550 DmrppArray::operator=(const DmrppArray &rhs)
551 {
552  if (this == &rhs) return *this;
553 
554  dynamic_cast<Array &>(*this) = rhs; // run Constructor=
555 
556  dynamic_cast<DmrppCommon &>(*this) = rhs;
557  // Removed DmrppCommon::m_duplicate_common(rhs); jhrg 11/12/21
558 
559  return *this;
560 }

bool DmrppArray::is_projected()
{
    for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
        if (dimension_size(p, true) != dimension_size(p, false)) return true;

    return false;
}

unsigned long long DmrppArray::get_size(bool constrained)
{
    // Number of array elements in the (optionally constrained) array.
    unsigned long long size = 1;
    for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
        size *= dimension_size(dim, constrained);
    }
    return size;
}

vector<unsigned long long> DmrppArray::get_shape(bool constrained)
{
    auto dim = dim_begin(), edim = dim_end();
    vector<unsigned long long> shape;

    // For a 3d array, this method took 14ms without reserve(), 5ms with
    // (when called many times).
    shape.reserve(edim - dim);

    for (; dim != edim; dim++) {
        shape.push_back(dimension_size(dim, constrained));
    }

    return shape;
}

DmrppArray::dimension DmrppArray::get_dimension(unsigned int i)
{
    // The index must be strictly less than the number of dimensions.
    assert(i < (dim_end() - dim_begin()));
    return *(dim_begin() + i);
}

void DmrppArray::insert_constrained_contiguous(Dim_iter dim_iter, unsigned long *target_index,
                                               vector<unsigned long long> &subset_addr,
                                               const vector<unsigned long long> &array_shape, char /*Chunk*/ *src_buf)
{
    BESDEBUG("dmrpp", "DmrppArray::" << __func__ << "() - subsetAddress.size(): " << subset_addr.size() << endl);

    unsigned int bytes_per_elem = prototype()->width();

    char *dest_buf = get_buf();

    unsigned int start = this->dimension_start(dim_iter, true);
    unsigned int stop = this->dimension_stop(dim_iter, true);
    unsigned int stride = this->dimension_stride(dim_iter, true);

    dim_iter++;

    // The end case for the recursion is dim_iter == dim_end(); stride == 1 is an optimization.
    // See the else clause for the general case.
    if (dim_iter == dim_end() && stride == 1) {
        // For the start and stop indexes of the subset, get the matching indexes in the whole array.
        subset_addr.push_back(start);
        unsigned long long start_index = get_index(subset_addr, array_shape);
        subset_addr.pop_back();

        subset_addr.push_back(stop);
        unsigned long long stop_index = get_index(subset_addr, array_shape);
        subset_addr.pop_back();

        // Copy the data block from start_index to stop_index.
        // TODO Replace this loop with a call to std::memcpy()
        for (unsigned long source_index = start_index; source_index <= stop_index; source_index++) {
            unsigned long target_byte = *target_index * bytes_per_elem;
            unsigned long source_byte = source_index * bytes_per_elem;
            // Copy a single value.
            for (unsigned long i = 0; i < bytes_per_elem; i++) {
                dest_buf[target_byte++] = src_buf[source_byte++];
            }
            (*target_index)++;
        }
    }
    else {
        for (unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {

            // Is this the last dimension?
            if (dim_iter != dim_end()) {
                // Nope! Then we recurse to the last dimension to read stuff.
                subset_addr.push_back(myDimIndex);
                insert_constrained_contiguous(dim_iter, target_index, subset_addr, array_shape, src_buf);
                subset_addr.pop_back();
            }
            else {
                // We are at the last (innermost) dimension, so it's time to copy values.
                subset_addr.push_back(myDimIndex);
                unsigned int sourceIndex = get_index(subset_addr, array_shape);
                subset_addr.pop_back();

                // Copy a single value.
                unsigned long target_byte = *target_index * bytes_per_elem;
                unsigned long source_byte = sourceIndex * bytes_per_elem;

                for (unsigned int i = 0; i < bytes_per_elem; i++) {
                    dest_buf[target_byte++] = src_buf[source_byte++];
                }
                (*target_index)++;
            }
        }
    }
}

void DmrppArray::read_contiguous()
{
    BESStopWatch sw;
    if (BESDebug::IsSet(TIMING_LOG_KEY)) sw.start(prolog + " name: " + name(), "");

    // Get the single chunk that makes up this CONTIGUOUS variable.
    if (get_chunks_size() != 1)
        throw BESInternalError(string("Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);

    // This is the original chunk for this 'contiguous' variable.
    auto the_one_chunk = get_immutable_chunks()[0];

    unsigned long long the_one_chunk_offset = the_one_chunk->get_offset();
    unsigned long long the_one_chunk_size = the_one_chunk->get_size();

    // We only want to read the Chunk concurrently if:
    // - Concurrent transfers are enabled (DmrppRequestHandler::d_use_transfer_threads)
    // - The variable's size is above the threshold value held in DmrppRequestHandler::d_contiguous_concurrent_threshold
    if (!DmrppRequestHandler::d_use_transfer_threads || the_one_chunk_size <= DmrppRequestHandler::d_contiguous_concurrent_threshold) {
        // Read the_one_chunk as is. This is the non-parallel I/O case.
        the_one_chunk->read_chunk();
    }
    else {
        // Allocate memory for 'the_one_chunk' so the transfer threads can transfer data
        // from the child chunks to it.
        the_one_chunk->set_rbuf_to_size();

        // The number of child chunks is determined by the size of the data.
        // If the size of the_one_chunk is 3 MB, then 3 chunks will be made. We round down
        // and fold the remainder into the last chunk (3.3 MB = 3 chunks, 4.2 MB = 4 chunks, etc.).
        unsigned long long num_chunks = floor(the_one_chunk_size / MB);
        if (num_chunks >= DmrppRequestHandler::d_max_transfer_threads)
            num_chunks = DmrppRequestHandler::d_max_transfer_threads;

        // Use the original chunk's size and offset to evenly split it into smaller chunks.
        unsigned long long chunk_size = the_one_chunk_size / num_chunks;
        std::string chunk_byteorder = the_one_chunk->get_byte_order();

        // If the size of the_one_chunk is not evenly divisible by num_chunks, capture
        // the remainder here and increase the size of the last chunk by this number of bytes.
        unsigned long long chunk_remainder = the_one_chunk_size % num_chunks;

        auto chunk_url = the_one_chunk->get_data_url();

        // Set up a queue to break up the original the_one_chunk and keep track of the pieces.
        queue<shared_ptr<Chunk>> chunks_to_read;

        // Make the Chunk objects.
        unsigned long long chunk_offset = the_one_chunk_offset;
        for (unsigned int i = 0; i < num_chunks - 1; i++) {
            chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size, chunk_offset)));
            chunk_offset += chunk_size;
        }
        // Make the remainder Chunk, see above for details.
        chunks_to_read.push(shared_ptr<Chunk>(new Chunk(chunk_url, chunk_byteorder, chunk_size + chunk_remainder, chunk_offset)));
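
        // Worked example (illustrative): for the_one_chunk_size = 3,500,000 bytes,
        // num_chunks = 3, chunk_size = 1,166,666, and chunk_remainder = 2, so the
        // queue holds two 1,166,666-byte chunks and one final 1,166,668-byte chunk.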

        // We maintain a list of futures to track our parallel activities.
        list<future<bool>> futures;
        try {
            bool done = false;
            bool future_finished = true;
            while (!done) {

                if (!futures.empty())
                    future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);

                // If future_finished is true, then transfer_thread_counter has been decremented,
                // because future::get() was called or a call to future::valid() returned false.
                BESDEBUG(dmrpp_3, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);

                if (!chunks_to_read.empty()) {
                    // Next we try to start a new child Chunk transfer thread if we can - there might be room.
                    bool thread_started = true;
                    while (thread_started && !chunks_to_read.empty()) {
                        auto current_chunk = chunks_to_read.front();
                        BESDEBUG(dmrpp_3, prolog << "Starting thread for " << current_chunk->to_string() << endl);

                        auto args = unique_ptr<one_child_chunk_args_new>(new one_child_chunk_args_new(current_chunk, the_one_chunk));
                        thread_started = start_one_child_chunk_thread(futures, std::move(args));

                        if (thread_started) {
                            chunks_to_read.pop();
                            BESDEBUG(dmrpp_3, prolog << "STARTED thread for " << current_chunk->to_string() << endl);
                        } else {
                            // Thread did not start; ownership of the arguments was not passed to the thread.
                            BESDEBUG(dmrpp_3, prolog << "Thread not started. args deleted, Chunk remains in queue." <<
                                " transfer_thread_counter: " << transfer_thread_counter <<
                                " futures.size(): " << futures.size() << endl);
                        }
                    }
                } else {
                    // No more Chunks and no futures means we're done here.
                    if (futures.empty())
                        done = true;
                }
                future_finished = false;
            }
        }
        catch (...) {
            // Complete all the futures, otherwise we'll have threads out there using up resources.
            while (!futures.empty()) {
                if (futures.back().valid())
                    futures.back().get();
                futures.pop_back();
            }
            // Re-throw the exception.
            throw;
        }
    }

    // Now that the_one_chunk has been read, apply any filters (decompression, shuffle, etc.).
    if (!is_filters_empty()) {
        the_one_chunk->filter_chunk(get_filters(), get_chunk_size_in_elements(), var()->width());
    }

    // The 'the_one_chunk' now holds the data values. Transfer them to the Array.
    if (!is_projected()) { // if there is no projection constraint
        reserve_value_capacity(get_size(false));
        val2buf(the_one_chunk->get_rbuf()); // yes, it's not type-safe
    }
    else { // apply the constraint
        vector<unsigned long long> array_shape = get_shape(false);

        // Reserve space in this array for the constrained size of the data request.
        reserve_value_capacity(get_size(true));
        unsigned long target_index = 0;
        vector<unsigned long long> subset;

        insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, the_one_chunk->get_rbuf());
    }

    set_read_p(true);
}

void DmrppArray::insert_chunk_unconstrained(shared_ptr<Chunk> chunk, unsigned int dim, unsigned long long array_offset,
                                            const vector<unsigned long long> &array_shape,
                                            unsigned long long chunk_offset, const vector<unsigned long long> &chunk_shape,
                                            const vector<unsigned long long> &chunk_origin)
{
    // Now we figure out the correct last element. It's possible that a
    // chunk 'extends beyond' the Array bounds. Here 'end_element' is the
    // last element of the destination array.
    dimension thisDim = this->get_dimension(dim);
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        unsigned int elem_width = prototype()->width();

        array_offset += chunk_origin[dim];

        // Compute how much we are going to copy.
        unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = get_buf();
        memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
    }
    else {
        unsigned long mc = multiplier(chunk_shape, dim);
        unsigned long ma = multiplier(array_shape, dim);

        // Not the last dimension, so we continue to proceed down the recursion branch.
        for (unsigned int chunk_index = 0 /*chunk_start*/; chunk_index <= chunk_end; ++chunk_index) {
            unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
            unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));

            // Re-entry here:
            insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape,
                                       chunk_origin);
        }
    }
}
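
// Worked example (illustrative): for a 2D array of shape {8, 8} stored in
// {4, 4} chunks, inserting the chunk with origin {4, 4} visits chunk_index
// 0..3 at dim 0; each recursion copies one 4-element row from source offset
// mc * chunk_index (mc == 4) to array offset ma * (chunk_index + 4) + 4
// (ma == 8), i.e., rows 4-7, columns 4-7 of the destination array.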

void DmrppArray::read_chunks_unconstrained()
{
    if (get_chunks_size() < 2)
        throw BESInternalError(string("Expected two or more chunks for variable ") + name(), __FILE__, __LINE__);

    // Find all the required chunks to read. I used a queue to preserve the chunk order, which
    // made using a debugger easier. However, order does not matter, AFAIK.

    unsigned long long sc_count = 0;
    stringstream sc_id;
    sc_id << name() << "-" << sc_count++;
    queue<shared_ptr<SuperChunk>> super_chunks;
    auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(), this));
    super_chunks.push(current_super_chunk);

    // Make the SuperChunks using all the chunks.
    for (const auto &chunk : get_immutable_chunks()) {
        bool added = current_super_chunk->add_chunk(chunk);
        if (!added) {
            sc_id.str(std::string());
            sc_id << name() << "-" << sc_count++;
            current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(), this));
            super_chunks.push(current_super_chunk);
            if (!current_super_chunk->add_chunk(chunk)) {
                stringstream msg;
                msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
                throw BESInternalError(msg.str(), __FILE__, __LINE__);
            }
        }
    }

    reserve_value_capacity(get_size());
    // The size, in elements, of each of the array's dimensions.
    const vector<unsigned long long> array_shape = get_shape(true);
    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> chunk_shape = get_chunk_dimension_sizes();

    BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
    BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);

    if (!DmrppRequestHandler::d_use_transfer_threads) { // Serial transfers
#if DMRPP_ENABLE_THREAD_TIMERS
        BESStopWatch sw(dmrpp_3);
        sw.start(prolog + "Serial SuperChunk Processing.");
#endif
        while (!super_chunks.empty()) {
            auto super_chunk = super_chunks.front();
            super_chunks.pop();
            BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl);
            // FIXME Since this is read_chunks_unconstrained(), this should call SuperChunk::read_unconstrained().
            //  jhrg 11/19/21
            super_chunk->read();
        }
    }
    else { // Parallel transfers
#if DMRPP_ENABLE_THREAD_TIMERS
        stringstream timer_name;
        timer_name << prolog << "Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
        BESStopWatch sw(dmrpp_3);
        sw.start(timer_name.str());
#endif
        read_super_chunks_unconstrained_concurrent(super_chunks, this);
    }
    set_read_p(true);
}

unsigned long long DmrppArray::get_chunk_start(const dimension &thisDim, unsigned int chunk_origin)
{
    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long first_element_offset = 0; // start with 0
    if ((unsigned) (thisDim.start) < chunk_origin) {
        // If the start is behind this chunk, then it's special.
        if (thisDim.stride != 1) {
            // And if the stride isn't 1, we have to figure out where to begin in this chunk.
            first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
            // If it's zero, great!
            if (first_element_offset != 0) {
                // Otherwise, adjust to get the correct first element.
                first_element_offset = thisDim.stride - first_element_offset;
            }
        }
    }
    else {
        first_element_offset = thisDim.start - chunk_origin;
    }

    return first_element_offset;
}
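
// Worked example (illustrative): for a constraint with start = 5 and stride = 3
// (selected indexes 5, 8, 11, ...) and a chunk with chunk_origin = 7, we get
// (7 - 5) % 3 == 2, so first_element_offset = 3 - 2 = 1: the first selected
// element in this chunk is global index 8, which is offset 1 from the origin.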

shared_ptr<Chunk>
DmrppArray::find_needed_chunks(unsigned int dim, vector<unsigned long long> *target_element_address, shared_ptr<Chunk> chunk)
{
    BESDEBUG(dmrpp_3, prolog << " BEGIN, dim: " << dim << endl);

    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point, a.k.a. its "position in array".
    const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // Do we even want this chunk?
    if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) ||
        (unsigned) thisDim.stop < chunk_origin[dim]) {
        return nullptr; // No. No, we do not. Skip this chunk.
    }

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);

    // Is the next point to be sent in this chunk at all? If not, return.
    if (chunk_start > chunk_shape[dim]) {
        return nullptr;
    }

    // Now we figure out the correct last element, based on the subset expression.
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        BESDEBUG(dmrpp_3, prolog << " END, This is the last_dim. chunk: " << chunk->to_string() << endl);
        return chunk;
    }
    else {
        // Not the last dimension, so we continue to proceed down the recursion branch.
        for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

            // Re-entry here:
            auto needed = find_needed_chunks(dim + 1, target_element_address, chunk);
            if (needed) {
                BESDEBUG(dmrpp_3, prolog << " END, Found chunk: " << needed->to_string() << endl);
                return needed;
            }
        }
    }
    BESDEBUG(dmrpp_3, prolog << " END, dim: " << dim << endl);

    return nullptr;
}

void DmrppArray::insert_chunk(
    unsigned int dim,
    vector<unsigned long long> *target_element_address,
    vector<unsigned long long> *chunk_element_address,
    shared_ptr<Chunk> chunk,
    const vector<unsigned long long> &constrained_array_shape)
{
    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point, a.k.a. its "position in array".
    const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);

    // Now we figure out the correct last element, based on the subset expression.
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = get_buf();
        unsigned int elem_width = prototype()->width();

        if (thisDim.stride == 1) {
            // The start element in this array.
            unsigned long long start_element = chunk_origin[dim] + chunk_start;
            // Compute how much we are going to copy.
            unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;

            // Compute where we need to put it.
            (*target_element_address)[dim] = (start_element - thisDim.start); // / thisDim.stride;
            // Compute where we are going to read it from.
            (*chunk_element_address)[dim] = chunk_start;

            // See below re get_index().
            unsigned long long target_char_start_index =
                get_index(*target_element_address, constrained_array_shape) * elem_width;
            unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

            memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index,
                   chunk_constrained_inner_dim_bytes);
        }
        else {
            // Stride != 1
            for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
                // Compute where we need to put it.
                (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

                // Compute where we are going to read it from.
                (*chunk_element_address)[dim] = chunk_index;

                // These calls to get_index() could be removed, as in the insert...unconstrained() code.
                unsigned int target_char_start_index =
                    get_index(*target_element_address, constrained_array_shape) * elem_width;
                unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

                memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
            }
        }
    }
    else {
        // Not the last dimension, so we continue to proceed down the recursion branch.
        for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
            (*chunk_element_address)[dim] = chunk_index;

            // Re-entry here:
            insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
        }
    }
}
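
// Worked example (illustrative): for a 1D constraint start = 0, stride = 2,
// stop = 9 over {4}-element chunks, the chunk with origin {4} has
// chunk_start = 0 and chunk_end = 3, so the stride loop above copies chunk
// elements 0 and 2 (global 4 and 6) into target elements (4 - 0) / 2 = 2 and
// (6 - 0) / 2 = 3.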

void DmrppArray::read_chunks()
{
    if (get_chunks_size() < 2)
        throw BESInternalError(string("Expected two or more chunks for variable ") + name(), __FILE__, __LINE__);

    // Find all the required chunks to read. I used a queue to preserve the chunk order, which
    // made using a debugger easier. However, order does not matter, AFAIK.
    unsigned long long sc_count = 0;
    stringstream sc_id;
    sc_id << name() << "-" << sc_count++;
    queue<shared_ptr<SuperChunk>> super_chunks;
    auto current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(), this));
    super_chunks.push(current_super_chunk);

    // TODO We know that non-contiguous chunks may be forward or backward in the file from
    //  the current offset. When an add_chunk() call fails, prior to making a new SuperChunk
    //  we might want to try adding the rejected Chunk to the other existing SuperChunks to see
    //  if it's contiguous there.
    // Find the required Chunks and put them into SuperChunks.
    for (const auto &chunk : get_immutable_chunks()) {
        vector<unsigned long long> target_element_address = chunk->get_position_in_array();
        auto needed = find_needed_chunks(0 /* dimension */, &target_element_address, chunk);
        if (needed) {
            bool added = current_super_chunk->add_chunk(chunk);
            if (!added) {
                sc_id.str(std::string()); // Clears the stringstream.
                sc_id << name() << "-" << sc_count++;
                current_super_chunk = shared_ptr<SuperChunk>(new SuperChunk(sc_id.str(), this));
                super_chunks.push(current_super_chunk);
                if (!current_super_chunk->add_chunk(chunk)) {
                    stringstream msg;
                    msg << prolog << "Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
                    throw BESInternalError(msg.str(), __FILE__, __LINE__);
                }
            }
        }
    }

    reserve_value_capacity(get_size(true));

    BESDEBUG(dmrpp_3, prolog << "d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ? "true" : "false") << endl);
    BESDEBUG(dmrpp_3, prolog << "d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
    BESDEBUG(dmrpp_3, prolog << "d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ? "true" : "false") << endl);
    BESDEBUG(dmrpp_3, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
    BESDEBUG(dmrpp_3, prolog << "SuperChunks.size(): " << super_chunks.size() << endl);

    if (!DmrppRequestHandler::d_use_transfer_threads) {
        // This version is the 'serial' version of the code. It reads a chunk, inserts it,
        // reads the next one, and so on.
#if DMRPP_ENABLE_THREAD_TIMERS
        BESStopWatch sw(dmrpp_3);
        sw.start(prolog + "Serial SuperChunk Processing.");
#endif
        while (!super_chunks.empty()) {
            auto super_chunk = super_chunks.front();
            super_chunks.pop();
            BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(true) << endl);
            super_chunk->read();
        }
    }
    else {
#if DMRPP_ENABLE_THREAD_TIMERS
        stringstream timer_name;
        timer_name << prolog << "Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
        BESStopWatch sw(dmrpp_3);
        sw.start(timer_name.str());
#endif
        read_super_chunks_concurrent(super_chunks, this);
    }
    set_read_p(true);
}

#ifdef USE_READ_SERIAL
void DmrppArray::insert_chunk_serial(unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
                                     Chunk *chunk)
{
    BESDEBUG("dmrpp", __func__ << " dim: " << dim << " BEGIN " << endl);

    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point, a.k.a. its "position in array".
    const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // Do we even want this chunk?
    if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (unsigned) thisDim.stop < chunk_origin[dim]) {
        return; // No. No, we do not. Skip this.
    }

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned int first_element_offset = get_chunk_start(dim, chunk_origin);

    // Is the next point to be sent in this chunk at all? If not, return.
    if (first_element_offset > chunk_shape[dim]) {
        return;
    }

    // Now we figure out the correct last element, based on the subset expression.
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_start = first_element_offset; // start_element - chunk_origin[dim];
    unsigned long long chunk_end = end_element - chunk_origin[dim];
    vector<unsigned int> constrained_array_shape = get_shape(true);

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        // Read and process the chunk.
        chunk->read_chunk();

        chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());

        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = get_buf();
        unsigned int elem_width = prototype()->width();

        if (thisDim.stride == 1) {
            // The start element in this array.
            unsigned long long start_element = chunk_origin[dim] + first_element_offset;
            // Compute how much we are going to copy.
            unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;

            // Compute where we need to put it.
            (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
            // Compute where we are going to read it from.
            (*chunk_element_address)[dim] = first_element_offset;

            unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
            unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

            memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
        }
        else {
            // Stride != 1
            for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
                // Compute where we need to put it.
                (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

                // Compute where we are going to read it from.
                (*chunk_element_address)[dim] = chunk_index;

                unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
                unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

                memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
            }
        }
    }
    else {
        // Not the last dimension, so we continue to proceed down the recursion branch.
        for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
            (*chunk_element_address)[dim] = chunk_index;

            // Re-entry here:
            insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
        }
    }
}

void DmrppArray::read_chunks_serial()
{
    BESDEBUG("dmrpp", __func__ << " for variable '" << name() << "' - BEGIN" << endl);

    vector<Chunk> &chunk_refs = get_chunk_vec();
    if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);

    // Allocate target memory.
    reserve_value_capacity(get_size(true));

    /*
     * Find the chunks to be read, make curl_easy handles for them, and
     * stuff them into our curl_multi handle. This is a recursive activity
     * which utilizes the same code that copies the data from the chunk to
     * the variables.
     */
    for (unsigned long i = 0; i < chunk_refs.size(); i++) {
        Chunk &chunk = chunk_refs[i];

        vector<unsigned int> chunk_source_address(dimensions(), 0);
        vector<unsigned int> target_element_address = chunk.get_position_in_array();

        // Recursive insertion operation.
        insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
    }

    set_read_p(true);

    BESDEBUG("dmrpp", "DmrppArray::" << __func__ << "() for " << name() << " END" << endl);
}
#endif

void
DmrppArray::set_send_p(bool state)
{
    if (!get_attributes_loaded())
        load_attributes(this);

    Array::set_send_p(state);
}

bool DmrppArray::read()
{
    // If the chunks are not loaded, load them now. NB: load_chunks()
    // reads data for HDF5 COMPACT storage, so read_p() will be true.
    // Thus, call load_chunks() before testing read_p() to cover that
    // case. jhrg 11/15/21
    if (!get_chunks_loaded())
        load_chunks(this);

    if (read_p()) return true;

    // Single chunk and 'contiguous' are the same for this code.
    if (get_chunks_size() == 1) {
        BESDEBUG(dmrpp_4, "Calling read_contiguous() for " << name() << endl);
        read_contiguous(); // Throws on various errors
    }
    else { // Handle the more complex case where the data is chunked.
        if (!is_projected()) {
            BESDEBUG(dmrpp_4, "Calling read_chunks_unconstrained() for " << name() << endl);
            read_chunks_unconstrained();
        }
        else {
            BESDEBUG(dmrpp_4, "Calling read_chunks() for " << name() << endl);
            read_chunks();
        }
    }

    if (this->twiddle_bytes()) {
        int num = this->length();
        Type var_type = this->var()->type();
        switch (var_type) {
            case dods_int16_c:
            case dods_uint16_c: {
                dods_uint16 *local = reinterpret_cast<dods_uint16 *>(this->get_buf());
                while (num--) {
                    *local = bswap_16(*local);
                    local++;
                }
                break;
            }
            case dods_int32_c:
            case dods_uint32_c: {
                dods_uint32 *local = reinterpret_cast<dods_uint32 *>(this->get_buf());
                while (num--) {
                    *local = bswap_32(*local);
                    local++;
                }
                break;
            }
            case dods_int64_c:
            case dods_uint64_c: {
                dods_uint64 *local = reinterpret_cast<dods_uint64 *>(this->get_buf());
                while (num--) {
                    *local = bswap_64(*local);
                    local++;
                }
                break;
            }
            default: break; // Do nothing for all other types.
        }
    }

    return true;
}

class PrintD4ArrayDimXMLWriter : public unary_function<Array::dimension &, void> {
    XMLWriter &xml;
    // Was this variable constrained using local/direct slicing? i.e., is d_local_constraint set?
    // If so, don't use shared dimensions; instead emit Dim elements that are anonymous.
    bool d_constrained;
public:

    PrintD4ArrayDimXMLWriter(XMLWriter &xml, bool c) :
        xml(xml), d_constrained(c)
    {
    }

    void operator()(Array::dimension &d)
    {
        // This duplicates code in D4Dimensions (where D4Dimension::print_dap4() is defined)
        // because of the need to print the constrained size of a dimension. I think that
        // the constraint information has to be kept here and not in the dimension (since they
        // are shared dims). Could hack print_dap4() to take the constrained size, however.
        if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) "Dim") < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write Dim element");

        string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
        // If there is a name, there must be a Dimension (named dimension) in scope,
        // so write its name but not its size.
        if (!d_constrained && !name.empty()) {
            if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
                                            (const xmlChar *) name.c_str()) < 0)
                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
        }
        else if (d.use_sdim_for_slice) {
            assert(!name.empty());
            if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name",
                                            (const xmlChar *) name.c_str()) < 0)
                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
        }
        else {
            ostringstream size;
            size << (d_constrained ? d.c_size : d.size);
            if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "size",
                                            (const xmlChar *) size.str().c_str()) < 0)
                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for size");
        }

        if (xmlTextWriterEndElement(xml.get_writer()) < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not end Dim element");
    }
};

class PrintD4ConstructorVarXMLWriter : public unary_function<BaseType *, void> {
    XMLWriter &xml;
    bool d_constrained;
public:
    PrintD4ConstructorVarXMLWriter(XMLWriter &xml, bool c) :
        xml(xml), d_constrained(c)
    {
    }

    void operator()(BaseType *btp)
    {
        btp->print_dap4(xml, d_constrained);
    }
};

class PrintD4MapXMLWriter : public unary_function<D4Map *, void> {
    XMLWriter &xml;

public:
    PrintD4MapXMLWriter(XMLWriter &xml) :
        xml(xml)
    {
    }

    void operator()(D4Map *m)
    {
        m->print_dap4(xml);
    }
};

void DmrppArray::print_dap4(XMLWriter &xml, bool constrained /*false*/)
{
    if (constrained && !send_p()) return;

    if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar *) var()->type_name().c_str()) < 0)
        throw InternalErr(__FILE__, __LINE__, "Could not write " + type_name() + " element");

    if (!name().empty())
        if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "name", (const xmlChar *) name().c_str()) < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");

    // Hack job... Copied from D4Enum::print_xml_writer. jhrg 11/12/13
    if (var()->type() == dods_enum_c) {
        D4Enum *e = static_cast<D4Enum *>(var());
        string path = e->enumeration()->name();
        if (e->enumeration()->parent()) {
            // Print the FQN for the enum def; D4Group::FQN() includes the trailing '/'.
            path = static_cast<D4Group *>(e->enumeration()->parent()->parent())->FQN() + path;
        }
        if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar *) "enum", (const xmlChar *) path.c_str()) < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write attribute for enum");
    }

    if (prototype()->is_constructor_type()) {
        Constructor &c = static_cast<Constructor &>(*prototype());
        for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
        // bind2nd(mem_fun_ref(&BaseType::print_dap4), xml));
    }

    // Drop the local_constraint, which is per-array, and use a per-dimension one instead.
    for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));

    attributes()->print_dap4(xml);

    for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));

    // Only print the chunks' info if present. This is the code added to libdap::Array::print_dap4().
    // jhrg 5/10/18
    if (DmrppCommon::d_print_chunks && get_chunks_size() > 0)
        print_chunks_element(xml, DmrppCommon::d_ns_prefix);

    // If this variable uses the COMPACT layout, encode the values for
    // the array using base64. Note that strings are a special case; each
    // element of the array is a string and is encoded in its own base64
    // xml element. So, while an array of 10 int32 values will be encoded in a
    // single base64 element, an array of 10 strings will use 10 base64
    // elements. This is because the size of each string's value is different;
    // not so for an int32.
    if (DmrppCommon::d_print_chunks && is_compact_layout() && read_p()) {
        switch (var()->type()) {
            case dods_byte_c:
            case dods_char_c:
            case dods_int8_c:
            case dods_uint8_c:
            case dods_int16_c:
            case dods_uint16_c:
            case dods_int32_c:
            case dods_uint32_c:
            case dods_int64_c:
            case dods_uint64_c:

            case dods_enum_c:

            case dods_float32_c:
            case dods_float64_c: {
                u_int8_t *values = 0;
                try {
                    size_t size = buf2val(reinterpret_cast<void **>(&values));
                    string encoded = base64::Base64::encode(values, size);
                    print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
                    delete[] values;
                }
                catch (...) {
                    delete[] values;
                    throw;
                }
                break;
            }

            case dods_str_c:
            case dods_url_c: {
                string *values = 0;
                try {
                    // Discard the return value of buf2val().
                    buf2val(reinterpret_cast<void **>(&values));
                    string str;
                    for (int i = 0; i < length(); ++i) {
                        str = *(static_cast<string *>(values) + i);
                        string encoded = base64::Base64::encode(reinterpret_cast<const u_int8_t *>(str.c_str()), str.size());
                        print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
                    }
                    delete[] values;
                }
                catch (...) {
                    delete[] values;
                    throw;
                }
                break;
            }

            default:
                throw InternalErr(__FILE__, __LINE__, "DmrppArray::print_dap4: unsupported type for COMPACT layout");
        }
    }
    if (xmlTextWriterEndElement(xml.get_writer()) < 0)
        throw InternalErr(__FILE__, __LINE__, "Could not end " + type_name() + " element");
}

void DmrppArray::dump(ostream &strm) const
{
    strm << BESIndent::LMarg << "DmrppArray::" << __func__ << "(" << (void *) this << ")" << endl;
    BESIndent::Indent();
    DmrppCommon::dump(strm);
    Array::dump(strm);
    strm << BESIndent::LMarg << "value: " << "----" << /*d_buf <<*/ endl;
    BESIndent::UnIndent();
}

} // namespace dmrpp