bes  Updated for version 3.20.10
SuperChunk.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of the BES
4 
5 // Copyright (c) 2018 OPeNDAP, Inc.
6 // Author: Nathan Potter<ndp@opendap.org>
7 //
8 // This library is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 //
13 // This library is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public
19 // License along with this library; if not, write to the Free Software
20 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 //
22 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23 
24 #include "config.h"
25 
26 #include <sstream>
27 #include <vector>
28 #include <string>
29 
30 #include "BESInternalError.h"
31 #include "BESDebug.h"
32 
33 #include "DmrppRequestHandler.h"
34 #include "CurlHandlePool.h"
35 #include "DmrppArray.h"
36 #include "DmrppNames.h"
37 #include "Chunk.h"
38 #include "SuperChunk.h"
39 
40 #define prolog std::string("SuperChunk::").append(__func__).append("() - ")
41 
42 #define SUPER_CHUNK_MODULE "dmrpp:3"
43 
44 using std::stringstream;
45 using std::string;
46 using std::vector;
47 
48 namespace dmrpp {
49 
// Shared state used to throttle the chunk compute threads started by every SuperChunk.
// chunk_processing_thread_pool_mtx serializes the "check capacity / launch thread" step,
// and chunk_processing_thread_counter counts compute threads that have been launched but
// whose futures have not yet been reaped.
std::mutex chunk_processing_thread_pool_mtx;
std::atomic_uint chunk_processing_thread_counter{0};
#define COMPUTE_THREADS "compute_threads"
54 
73 void process_one_chunk(shared_ptr<Chunk> chunk, DmrppArray *array, const vector<unsigned long long> &constrained_array_shape)
74 {
75  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );
76 
77  chunk->read_chunk();
78 
79  if(array) {
80  if (!array->is_filters_empty())
81  chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());
82 
83  vector<unsigned long long> target_element_address = chunk->get_position_in_array();
84  vector<unsigned long long> chunk_source_address(array->dimensions(), 0);
85 
86  array->insert_chunk(0 /* dimension */, &target_element_address, &chunk_source_address, chunk,
87  constrained_array_shape);
88  }
89  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
90 }
91 
111 void process_one_chunk_unconstrained(shared_ptr<Chunk> chunk, const vector<unsigned long long> &chunk_shape,
112  DmrppArray *array, const vector<unsigned long long> &array_shape)
113 {
114  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );
115 
116  chunk->read_chunk();
117 
118  if(array){
119  if (!array->is_filters_empty())
120  chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());
121 
122  array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
123  }
124  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
125 }
126 
127 
133 bool one_chunk_compute_thread(unique_ptr<one_chunk_args> args)
134 {
135 
136 #if DMRPP_ENABLE_THREAD_TIMERS
137  stringstream timer_tag;
138  timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
139  " parent_tid: 0x" << std::hex << args->parent_thread_id << " parent_sc: " << args->parent_super_chunk_id;
140  BESStopWatch sw(COMPUTE_THREADS);
141  sw.start(timer_tag.str());
142 #endif
143 
144  process_one_chunk(args->chunk, args->array, args->array_shape);
145  return true;
146 }
147 
153 bool one_chunk_unconstrained_compute_thread(unique_ptr<one_chunk_unconstrained_args> args)
154 {
155 
156 #if DMRPP_ENABLE_THREAD_TIMERS
157  stringstream timer_tag;
158  timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
159  " parent_tid: 0x" << std::hex << args->parent_thread_id << " parent_sc: " << args->parent_super_chunk_id ;
160  BESStopWatch sw(COMPUTE_THREADS);
161  sw.start(timer_tag.str());
162 #endif
163  process_one_chunk_unconstrained(args->chunk, args->chunk_shape, args->array, args->array_shape);
164  return true;
165 }
166 
177 bool start_one_chunk_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_args> args) {
178  bool retval = false;
179  std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
180  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << " chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
181  if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
182  chunk_processing_thread_counter++;
183  futures.push_back(std::async(std::launch::async, one_chunk_compute_thread, std::move(args)));
184  retval = true;
185  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Got std::future '" << futures.size() <<
186  "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
187  }
188  return retval;
189 }
190 
201 bool start_one_chunk_unconstrained_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_unconstrained_args> args) {
202  bool retval = false;
203  std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
204  if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
205  futures.push_back(std::async(std::launch::async, one_chunk_unconstrained_compute_thread, std::move(args)));
206  chunk_processing_thread_counter++;
207  retval = true;
208  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Got std::future '" << futures.size() <<
209  "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
210  }
211  return retval;
212 }
213 
214 
236 void process_chunks_concurrent(
237  const string &super_chunk_id,
238  queue<shared_ptr<Chunk>> &chunks,
239  DmrppArray *array,
240  const vector<unsigned long long> &constrained_array_shape ){
241 
242  // We maintain a list of futures to track our parallel activities.
243  list<future<bool>> futures;
244  try {
245  bool done = false;
246  bool future_finished = true;
247  while (!done) {
248 
249  if(!futures.empty())
250  future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
251 
252  // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
253  // because future::get() was called or a call to future::valid() returned false.
254  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);
255 
256  if (!chunks.empty()){
257  // Next we try to add a new Chunk compute thread if we can - there might be room.
258  bool thread_started = true;
259  while(thread_started && !chunks.empty()) {
260  auto chunk = chunks.front();
261  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Starting thread for " << chunk->to_string() << endl);
262 
263  auto args = unique_ptr<one_chunk_args>(new one_chunk_args(super_chunk_id, chunk, array, constrained_array_shape));
264  thread_started = start_one_chunk_compute_thread(futures, std::move(args));
265 
266  if (thread_started) {
267  chunks.pop();
268  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "STARTED thread for " << chunk->to_string() << endl);
269  } else {
270  // Thread did not start, ownership of the arguments was not passed to the thread.
271  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Thread not started. args deleted, Chunk remains in queue.) " <<
272  "chunk_processing_thread_counter: " << chunk_processing_thread_counter << " futures.size(): " << futures.size() << endl);
273  }
274  }
275  }
276  else {
277  // No more Chunks and no futures means we're done here.
278  if(futures.empty())
279  done = true;
280  }
281  future_finished = false;
282  }
283  }
284  catch (...) {
285  // Complete all of the futures, otherwise we'll have threads out there using up resources
286  while(!futures.empty()){
287  if(futures.back().valid())
288  futures.back().get();
289  futures.pop_back();
290  }
291  // re-throw the exception
292  throw;
293  }
294 }
295 
296 
297 
298 
321 void process_chunks_unconstrained_concurrent(
322  const string &super_chunk_id,
323  queue<shared_ptr<Chunk>> &chunks,
324  const vector<unsigned long long> &chunk_shape,
325  DmrppArray *array,
326  const vector<unsigned long long> &array_shape){
327 
328  // We maintain a list of futures to track our parallel activities.
329  list<future<bool>> futures;
330  try {
331  bool done = false;
332  bool future_finished = true;
333  while (!done) {
334 
335  if(!futures.empty())
336  future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
337 
338  // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
339  // because future::get() was called or a call to future::valid() returned false.
340  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);
341 
342  if (!chunks.empty()){
343  // Next we try to add a new Chunk compute thread if we can - there might be room.
344  bool thread_started = true;
345  while(thread_started && !chunks.empty()) {
346  auto chunk = chunks.front();
347  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Starting thread for " << chunk->to_string() << endl);
348 
349  auto args = unique_ptr<one_chunk_unconstrained_args>(
350  new one_chunk_unconstrained_args(super_chunk_id, chunk, array, array_shape, chunk_shape) );
351  thread_started = start_one_chunk_unconstrained_compute_thread(futures, std::move(args));
352 
353  if (thread_started) {
354  chunks.pop();
355  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "STARTED thread for " << chunk->to_string() << endl);
356  } else {
357  // Thread did not start, ownership of the arguments was not passed to the thread.
358  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Thread not started. args deleted, Chunk remains in queue.)" <<
359  " chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
360  " futures.size(): " << futures.size() << endl);
361  }
362  }
363  }
364  else {
365  // No more Chunks and no futures means we're done here.
366  if(futures.empty())
367  done = true;
368  }
369  future_finished = false;
370  }
371  }
372  catch (...) {
373  // Complete all of the futures, otherwise we'll have threads out there using up resources
374  while(!futures.empty()){
375  if(futures.back().valid())
376  futures.back().get();
377  futures.pop_back();
378  }
379  // re-throw the exception
380  throw;
381  }
382 }
383 //#####################################################################################################################
384 //#####################################################################################################################
385 //#####################################################################################################################
386 //
387 // SuperChunk Code Begins Here
388 //
389 // = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
390 
391 
400 bool SuperChunk::add_chunk(const std::shared_ptr<Chunk> candidate_chunk) {
401  bool chunk_was_added = false;
402  if(d_chunks.empty()){
403  d_chunks.push_back(candidate_chunk);
404  d_offset = candidate_chunk->get_offset();
405  d_size = candidate_chunk->get_size();
406  d_data_url = candidate_chunk->get_data_url();
407  chunk_was_added = true;
408  }
409  else if(is_contiguous(candidate_chunk) ){
410  this->d_chunks.push_back(candidate_chunk);
411  d_size += candidate_chunk->get_size();
412  chunk_was_added = true;
413  }
414  return chunk_was_added;
415 }
416 
417 
430 bool SuperChunk::is_contiguous(const std::shared_ptr<Chunk> candidate_chunk) {
431  // Are the URLs the same?
432  bool contiguous = candidate_chunk->get_data_url()->str() == d_data_url->str();
433  if(contiguous){
434  // If the URLs match then see if the locations are matching
435  contiguous = (d_offset + d_size) == candidate_chunk->get_offset();
436  }
437  return contiguous;
438 }
439 
448 void SuperChunk::map_chunks_to_buffer()
449 {
450  unsigned long long bindex = 0;
451  for(const auto &chunk : d_chunks){
452  chunk->set_read_buffer(d_read_buffer + bindex, chunk->get_size(),0, false);
453  bindex += chunk->get_size();
454  if(bindex>d_size){
455  stringstream msg;
456  msg << "ERROR The computed buffer index, " << bindex << " is larger than expected size of the SuperChunk. ";
457  msg << "d_size: " << d_size;
458  throw BESInternalError(msg.str(), __FILE__, __LINE__);
459 
460  }
461  }
462 }
463 
468 void SuperChunk::read_aggregate_bytes()
469 {
470  // Since we already have a good infrastructure for reading Chunks, we just make a big-ol-Chunk to
471  // use for grabbing bytes. Then, once read, we'll use the child Chunks to do the dirty work of inflating
472  // and moving the results into the DmrppCommon object.
473  Chunk chunk(d_data_url, "NOT_USED", d_size, d_offset);
474 
475  chunk.set_read_buffer(d_read_buffer, d_size,0,false);
476 
477  dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(&chunk);
478  if (!handle)
479  throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);
480 
481  try {
482  handle->read_data(); // throws if error
483  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
484  }
485  catch(...) {
486  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
487  throw;
488  }
489 
490  // If the expected byte count was not read, it's an error.
491  if (d_size != chunk.get_bytes_read()) {
492  ostringstream oss;
493  oss << "Wrong number of bytes read for chunk; read: " << chunk.get_bytes_read() << ", expected: " << d_size;
494  throw BESInternalError(oss.str(), __FILE__, __LINE__);
495  }
496  d_is_read = true;
497 }
498 
499 
// Reads this SuperChunk's bytes (at most once) and marks every child Chunk as read.
// Returns immediately if d_is_read is already set; read_aggregate_bytes() sets it on success.
504  if (d_is_read) {
505  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "SuperChunk (" << (void **) this << ") has already been read! Returning." << endl);
506  return;
507  }
508 
509  if(!d_read_buffer){
510  // Allocate the d_size-byte receive buffer for the whole SuperChunk.
511  // release memory in destructor.
512  d_read_buffer = new char[d_size];
513  }
514 
515  // Massage the chunks so that their read/receive/internal data buffer
516  // points to the correct section of the d_read_buffer memory.
517  // "Slice it up!"
518  map_chunks_to_buffer();
519 
520  // Fetch the whole byte range [d_offset, d_offset + d_size) from the data URL into
521  // d_read_buffer. read_aggregate_bytes() currently does this with a single libcurl
522  // handle, and throws if the transfer fails or returns the wrong number of bytes.
523  read_aggregate_bytes();
524 
525  // Set each Chunk's read state to true.
526  // Set each chunk's byte count to the expected
527  // size for the chunk - read_aggregate_bytes() has already
528  // verified that exactly d_size bytes were read.
529  for(auto chunk : d_chunks){
530  chunk->set_is_read(true);
531  chunk->set_bytes_read(chunk->get_size());
532  }
533 }
534 
535 
// Reads the SuperChunk via retrieve_data(), then filters each child Chunk and inserts it into
// d_parent_array using the array's constrained shape (get_shape(true)). Chunks are processed
// serially unless DmrppRequestHandler::d_use_compute_threads is set. In that case they are handed
// to process_chunks_concurrent(), which throttles against d_max_compute_threads.
541  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );
542  retrieve_data();
543 
544  vector<unsigned long long> constrained_array_shape = d_parent_array->get_shape(true);
545  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ? "true" : "false") << endl);
546  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
547 
// Serial path: process the Chunks one after another on the calling thread.
548  if(!DmrppRequestHandler::d_use_compute_threads){
549 #if DMRPP_ENABLE_THREAD_TIMERS
550  BESStopWatch sw(SUPER_CHUNK_MODULE);
551  sw.start(prolog+"Serial Chunk Processing. id: " + d_id);
552 #endif
553  for(const auto &chunk :get_chunks()){
554  process_one_chunk(chunk,d_parent_array,constrained_array_shape);
555  }
556  }
// Concurrent path: queue every Chunk and let process_chunks_concurrent() drain the queue.
557  else {
558 #if DMRPP_ENABLE_THREAD_TIMERS
559  stringstream timer_name;
560  timer_name << prolog << "Concurrent Chunk Processing. id: " << d_id;
561  BESStopWatch sw(SUPER_CHUNK_MODULE);
562  sw.start(timer_name.str());
563 #endif
564  queue<shared_ptr<Chunk>> chunks_to_process;
565  for(const auto &chunk:get_chunks())
566  chunks_to_process.push(chunk);
567 
568  process_chunks_concurrent(d_id, chunks_to_process, d_parent_array, constrained_array_shape);
569  }
570  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
571 }
572 
573 
579 
// Reads the SuperChunk via retrieve_data(), then filters each child Chunk and copies it into
// d_parent_array with insert_chunk_unconstrained(). Processing is serial unless
// DmrppRequestHandler::d_use_compute_threads is set, in which case process_chunks_unconstrained_concurrent()
// is used.
// NOTE(review): unlike process_child_chunks(), no "END" debug message is emitted here.
580  BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );
581  retrieve_data();
582 
583  // The size, in elements, of each of the array's dimensions. get_shape(true) asks for the
     // constrained shape; presumably equal to the full shape on this unconstrained path - verify.
584  const vector<unsigned long long> array_shape = d_parent_array->get_shape(true);
585  // The size, in elements, of each of the chunk's dimensions
586  const vector<unsigned long long> chunk_shape = d_parent_array->get_chunk_dimension_sizes();
587 
// Serial path: process the Chunks one after another on the calling thread.
588  if(!DmrppRequestHandler::d_use_compute_threads){
589 #if DMRPP_ENABLE_THREAD_TIMERS
590  BESStopWatch sw(SUPER_CHUNK_MODULE);
591  sw.start(prolog + "Serial Chunk Processing. sc_id: " + d_id );
592 #endif
593  for(auto &chunk :get_chunks()){
594  process_one_chunk_unconstrained(chunk, chunk_shape, d_parent_array, array_shape);
595  }
596  }
// Concurrent path: queue every Chunk and let process_chunks_unconstrained_concurrent() drain the queue.
597  else {
598 #if DMRPP_ENABLE_THREAD_TIMERS
599  stringstream timer_name;
600  timer_name << prolog << "Concurrent Chunk Processing. sc_id: " << d_id;
601  BESStopWatch sw(SUPER_CHUNK_MODULE);
602  sw.start(timer_name.str());
603 #endif
604  queue<shared_ptr<Chunk>> chunks_to_process;
605  for (auto &chunk:get_chunks())
606  chunks_to_process.push(chunk);
607 
608  process_chunks_unconstrained_concurrent(d_id,chunks_to_process, chunk_shape, d_parent_array, array_shape);
609  }
610 
611 }
612 
613 
619 string SuperChunk::to_string(bool verbose=false) const {
620  stringstream msg;
621  msg << "[SuperChunk: " << (void **)this;
622  msg << " offset: " << d_offset;
623  msg << " size: " << d_size ;
624  msg << " chunk_count: " << d_chunks.size();
625  //msg << " parent: " << d_parent->name();
626  msg << "]";
627  if (verbose) {
628  msg << endl;
629  for (auto chunk: d_chunks) {
630  msg << chunk->to_string() << endl;
631  }
632  }
633  return msg.str();
634 }
635 
640 void SuperChunk::dump(ostream & strm) const {
641  strm << to_string(false) ;
642 }
643 
644 } // namespace dmrpp
exception thrown if internal error encountered
virtual bool start(std::string name)
Definition: BESStopWatch.cc:67
virtual std::vector< unsigned long long > get_shape(bool constrained)
Get the array shape.
Definition: DmrppArray.cc:596
virtual const std::vector< unsigned long long > & get_chunk_dimension_sizes() const
The chunk dimension sizes held in a const vector.
Definition: DmrppCommon.h:179
virtual void retrieve_data()
Cause the SuperChunk and all of its subordinate Chunks to be read.
Definition: SuperChunk.cc:503
virtual bool add_chunk(std::shared_ptr< Chunk > candidate_chunk)
Attempts to add a new Chunk to this SuperChunk.
Definition: SuperChunk.cc:400
std::string to_string(bool verbose) const
Makes a string representation of the SuperChunk.
Definition: SuperChunk.cc:619
virtual void dump(std::ostream &strm) const
Writes the to_string() output to the stream strm.
Definition: SuperChunk.cc:640
virtual void process_child_chunks()
Reads the SuperChunk, inflates/deshuffles the subordinate chunks as required and copies the values in...
Definition: SuperChunk.cc:540
virtual void process_child_chunks_unconstrained()
Reads the SuperChunk, inflates/deshuffles the subordinate chunks as required and copies the values in...
Definition: SuperChunk.cc:578