11 #ifndef __INPUTPARSER_H__ 12 #define __INPUTPARSER_H__ 14 #include <shogun/lib/config.h> 20 #include <condition_variable> 25 #define PARSER_DEFAULT_BUFFSIZE 100 292 static void* parse_loop_entry_point(
void* params);
368 read_vector=func_ptr;
375 read_vector_and_label=func_ptr;
381 examples_ring =
nullptr;
384 keep_running.store(
false, std::memory_order_release);
396 input_source = input_file;
398 if (is_labelled ==
true)
404 examples_ring =
new CParseBuffer<T>(size);
407 parsing_done =
false;
408 reading_done =
false;
409 number_of_vectors_parsed = 0;
410 number_of_vectors_read = 0;
414 current_feature_vector = NULL;
416 free_after_release=
true;
423 free_after_release=free_vec;
429 examples_ring->set_free_vectors_on_destruct(destroy);
435 SG_SDEBUG(
"entering CInputParser::start_parser()\n")
438 SG_SERROR(
"Parser thread is already running! Multiple parse threads not supported.\n")
443 examples_ring->init_vector();
444 keep_running.store(
true, std::memory_order_release);
445 parse_thread = std::thread(&parse_loop_entry_point,
this);
447 SG_SDEBUG(
"leaving CInputParser::start_parser()\n")
461 SG_SDEBUG(
"entering CInputParser::is_running()\n")
463 std::lock_guard<std::mutex> lock(examples_state_lock);
473 SG_SDEBUG(
"leaving CInputParser::is_running(), returning %d\n", ret)
482 (input_source->*read_vector_and_label)(feature_vector, length, label);
497 (input_source->*read_vector)(feature_vector, length);
511 examples_ring->copy_example(ex);
521 while (keep_running.load(std::memory_order_acquire))
523 std::unique_lock<std::mutex> lock(examples_state_lock);
531 current_example = examples_ring->get_free_example();
532 current_feature_vector = current_example->fv;
533 current_len = current_example->length;
534 current_label = current_example->label;
537 get_vector_and_label(current_feature_vector, current_len, current_label);
539 get_vector_only(current_feature_vector, current_len);
545 examples_state_changed.notify_one();
549 current_example->label = current_label;
550 current_example->fv = current_feature_vector;
551 current_example->length = current_len;
553 examples_ring->copy_example(current_example);
555 number_of_vectors_parsed++;
556 examples_state_changed.notify_one();
568 if (number_of_vectors_read == number_of_vectors_parsed)
572 examples_state_changed.notify_one();
577 if (number_of_vectors_parsed <= 0)
580 if (number_of_vectors_read == number_of_vectors_parsed)
585 ex = examples_ring->get_unused_example();
586 number_of_vectors_read++;
601 while (keep_running.load(std::memory_order_acquire))
606 std::unique_lock<std::mutex> lock(examples_state_lock);
607 ex = retrieve_example();
619 examples_state_changed.wait(lock);
642 return get_next_example(fv, length, label_dummy);
648 examples_ring->finalize_example(free_after_release);
653 SG_SDEBUG(
"entering CInputParser::end_parser\n")
655 if (parse_thread.joinable())
657 SG_SDEBUG(
"leaving CInputParser::end_parser\n")
663 keep_running.store(
false, std::memory_order_release);
664 examples_state_changed.notify_one();
665 if (parse_thread.joinable())
670 #endif // __INPUTPARSER_H__
A Streaming File access class.
all of classes and functions are contained in the shogun namespace
constexpr size_t CPU_CACHE_LINE_SIZE