48 #include "BESInterface.h" 50 #include "TheBESKeys.h" 51 #include "BESResponseHandler.h" 52 #include "BESAggFactory.h" 53 #include "BESAggregationServer.h" 54 #include "BESReporterList.h" 55 #include "BESContextManager.h" 57 #include "BESExceptionManager.h" 59 #include "BESDataNames.h" 62 #include "BESStopWatch.h" 63 #include "BESTimeoutError.h" 64 #include "BESInternalError.h" 65 #include "BESInternalFatalError.h" 71 list<p_bes_init> BESInterface::_init_list;
72 list<p_bes_end> BESInterface::_end_list;
74 static jmp_buf timeout_jump;
75 static bool timeout_jump_valid =
false;
86 static volatile int timeout = 0;
88 #define BES_TIMEOUT_KEY "BES.TimeOutInSeconds" 90 static void catch_sig_alarm(
int sig)
93 LOG(
"Child listener timeout after " << timeout <<
" seconds, exiting." << endl);
98 if (timeout_jump_valid)
99 longjmp(timeout_jump, 1);
104 signal(SIGTERM, SIG_DFL);
110 static void register_signal_handler()
112 struct sigaction act;
113 sigemptyset(&act.sa_mask);
114 sigaddset(&act.sa_mask, SIGALRM);
120 act.sa_handler = catch_sig_alarm;
121 if (sigaction(SIGALRM, &act, 0))
122 throw BESInternalFatalError(
"Could not register a handler to catch alarm/timeout.", __FILE__, __LINE__);
153 static pthread_t alarm_thread;
155 static void* alarm_wait(
void * )
157 BESDEBUG(
"bes",
"Starting: " << __PRETTY_FUNCTION__ << endl);
161 sigemptyset(&sigset);
162 sigaddset(&sigset, SIGALRM);
163 sigprocmask(SIG_BLOCK, &sigset, NULL);
168 int result = sigwait(&sigset, &sig);
170 BESDEBUG(
"bes",
"Fatal error establishing timeout: " << strerror(result) << endl);
171 throw BESInternalFatalError(
string(
"Fatal error establishing timeout: ") + strerror(result), __FILE__, __LINE__);
173 else if (result == 0 && sig == SIGALRM) {
174 BESDEBUG(
"bes",
"Timeout found in " << __PRETTY_FUNCTION__ << endl);
179 oss <<
"While waiting for a timeout, found signal '" << result <<
"' in " << __PRETTY_FUNCTION__ << ends;
180 BESDEBUG(
"bes", oss.str() << endl);
185 static void wait_for_timeout()
187 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ << endl);
189 pthread_attr_t thread_attr;
191 if (pthread_attr_init(&thread_attr) != 0)
193 if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED ) != 0)
194 throw BESInternalFatalError(
"Failed to complete pthread attribute initialization.", __FILE__, __LINE__);
196 int status = pthread_create(&alarm_thread, &thread_attr, alarm_wait, NULL);
202 BESInterface::BESInterface(ostream *output_stream) :
203 _strm(output_stream), _timeout_from_keys(0), _dhi(0), _transmitter(0)
205 if (!output_stream) {
206 throw BESInternalError(
"output stream must be set in order to output responses", __FILE__, __LINE__);
214 string timeout_key_value;
217 istringstream iss(timeout_key_value);
218 iss >> _timeout_from_keys;
222 register_signal_handler();
229 BESInterface::~BESInterface()
271 extern BESStopWatch *bes_timing::elapsedTimeToReadStart;
272 extern BESStopWatch *bes_timing::elapsedTimeToTransmitStart;
274 int BESInterface::execute_request(
const string &from)
276 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ << endl);
279 throw BESInternalError(
"DataHandlerInterface can not be null", __FILE__, __LINE__);
283 if (BESISDEBUG(TIMING_LOG)) {
284 sw.
start(
"BESInterface::execute_request", _dhi->data[REQUEST_ID]);
286 bes_timing::elapsedTimeToReadStart =
new BESStopWatch();
287 bes_timing::elapsedTimeToReadStart->
start(
"TIME_TO_READ_START", _dhi->data[REQUEST_ID]);
289 bes_timing::elapsedTimeToTransmitStart =
new BESStopWatch();
290 bes_timing::elapsedTimeToTransmitStart->
start(
"TIME_TO_TRANSMIT_START", _dhi->data[REQUEST_ID]);
294 throw BESInternalError(
"DataHandlerInterface can not be null", __FILE__, __LINE__);
297 _dhi->set_output_stream(_strm);
298 _dhi->data[REQUEST_FROM] = from;
300 pid_t thepid = getpid();
303 _dhi->data[SERVER_PID] = ss.str();
315 *(BESLog::TheLog()) << _dhi->data[SERVER_PID] <<
" from " << _dhi->data[REQUEST_FROM] <<
" request received" 320 validate_data_request();
322 build_data_request_plan();
325 throw BESInternalError(
"Unable to transmit the response, no transmitter", __FILE__, __LINE__);
334 if (setjmp(timeout_jump) == 0) {
335 timeout_jump_valid =
true;
336 execute_data_request_plan();
338 timeout_jump_valid =
false;
342 oss <<
"BES listener timeout after " << timeout <<
" seconds." << ends;
346 _dhi->executed =
true;
349 timeout_jump_valid =
false;
350 return exception_manager(ex);
352 catch (bad_alloc &e) {
353 timeout_jump_valid =
false;
355 return exception_manager(ex);
357 catch (exception &e) {
358 timeout_jump_valid =
false;
360 return exception_manager(ex);
363 timeout_jump_valid =
false;
364 BESInternalError ex(
"An undefined exception has been thrown", __FILE__, __LINE__);
365 return exception_manager(ex);
368 delete bes_timing::elapsedTimeToReadStart;
369 bes_timing::elapsedTimeToReadStart = 0;
371 delete bes_timing::elapsedTimeToTransmitStart;
372 bes_timing::elapsedTimeToTransmitStart = 0;
380 int BESInterface::finish(
int )
382 BESDEBUG(
"bes",
"Entering: " << __PRETTY_FUNCTION__ <<
" ***" << endl);
391 if (_dhi->error_info) {
393 delete _dhi->error_info;
394 _dhi->error_info = 0;
398 status = exception_manager(ex);
400 catch (bad_alloc &) {
401 string serr =
"BES out of memory";
403 status = exception_manager(ex);
406 string serr =
"An undefined exception has been thrown";
408 status = exception_manager(ex);
415 if (_dhi->error_info) {
416 _dhi->error_info->print(cout);
417 delete _dhi->error_info;
418 _dhi->error_info = 0;
428 (*BESLog::TheLog()) <<
"Problem logging status: " << ex.
get_message() << endl;
431 (*BESLog::TheLog()) <<
"Unknown problem logging status" << endl;
438 (*BESLog::TheLog()) <<
"Problem reporting request: " << ex.
get_message() << endl;
441 (*BESLog::TheLog()) <<
"Unknown problem reporting request" << endl;
448 (*BESLog::TheLog()) <<
"Problem ending request: " << ex.
get_message() << endl;
451 (*BESLog::TheLog()) <<
"Unknown problem ending request" << endl;
457 int BESInterface::finish_with_error(
int status)
459 if (!_dhi->error_info) {
461 string serr =
"Finish_with_error called with no error object";
463 status = exception_manager(ex);
466 return finish(status);
469 void BESInterface::add_init_callback(p_bes_init init)
471 _init_list.push_back(init);
482 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::initialize", _dhi->data[REQUEST_ID]);
484 BESDEBUG(
"bes",
"Initializing request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
485 bool do_continue =
true;
486 init_iter i = _init_list.begin();
488 for (; i != _init_list.end() && do_continue ==
true; i++) {
490 do_continue = p(*_dhi);
494 BESDEBUG(
"bes",
"FAILED" << endl);
495 string se =
"Initialization callback failed, exiting";
499 BESDEBUG(
"bes",
"OK" << endl);
528 if (BESISDEBUG(TIMING_LOG))
529 sw.
start(
"BESInterface::execute_data_request_plan(\"" + _dhi->data[DATA_REQUEST] +
"\")",
530 _dhi->data[REQUEST_ID]);
535 string context = BESContextManager::TheManager()->
get_context(
"bes_timeout", found);
537 timeout = strtol(context.c_str(), NULL, 10);
538 VERBOSE(
"Set request timeout to " << timeout <<
" seconds (from context)." << endl);
541 else if (_timeout_from_keys != 0) {
542 timeout = _timeout_from_keys;
543 VERBOSE(
"Set request timeout to " << timeout <<
" seconds (from keys)." << endl);
548 BESDEBUG(
"bes",
"Executing request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
554 BESDEBUG(
"bes",
"FAILED" << endl);
555 string se =
"The response handler \"" + _dhi->action +
"\" does not exist";
558 BESDEBUG(
"bes",
"OK" << endl);
561 invoke_aggregation();
578 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::invoke_aggregation", _dhi->data[REQUEST_ID]);
580 if (_dhi->data[AGG_CMD] !=
"") {
581 BESDEBUG(
"bes",
"aggregating with: " << _dhi->data[AGG_CMD] <<
" ... "<< endl);
587 BESDEBUG(
"bes",
"FAILED" << endl);
588 string se =
"The aggregation handler " + _dhi->data[AGG_HANDLER] +
"does not exist";
591 BESDEBUG(
"bes",
"OK" << endl);
611 if (BESISDEBUG(TIMING_LOG)) sw.
start(
"BESInterface::transmit_data", _dhi->data[REQUEST_ID]);
613 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Transmitting request: " << _dhi->data[DATA_REQUEST] << endl);
617 if (_dhi->error_info) {
619 _dhi->error_info->print(strm);
620 (*BESLog::TheLog()) << strm.str() << endl;
621 BESDEBUG(
"bes",
" transmitting error info using transmitter ... " << endl << strm.str() << endl);
623 _dhi->error_info->transmit(_transmitter, *_dhi);
625 else if (_dhi->response_handler) {
626 BESDEBUG(
"bes",
" BESInterface::transmit_data() - Response handler " << _dhi->response_handler->get_name() << endl);
628 _dhi->response_handler->transmit(_transmitter, *_dhi);
634 if (_dhi->error_info) {
635 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Transmitting error info using cout ... " << endl);
636 _dhi->error_info->print(cout);
637 delete _dhi->error_info;
638 _dhi->error_info = 0;
641 BESDEBUG(
"bes",
"BESInterface::transmit_data() - Unable to transmit the response ... FAILED " << endl);
643 throw BESInternalError(
"Unable to transmit the response, no transmitter", __FILE__, __LINE__);
648 BESDEBUG(
"bes",
"BESInterface::transmit_data() - OK" << endl);
670 BESDEBUG(
"bes",
"Reporting on request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
672 BESReporterList::TheList()->report(*_dhi);
674 BESDEBUG(
"bes",
"OK" << endl);
677 void BESInterface::add_end_callback(p_bes_end end)
679 _end_list.push_back(end);
689 BESDEBUG(
"bes",
"Ending request: " << _dhi->data[DATA_REQUEST] <<
" ... " << endl);
690 end_iter i = _end_list.begin();
691 for (; i != _end_list.end(); i++) {
698 _dhi->first_container();
699 while (_dhi->container) {
700 BESDEBUG(
"bes",
"Calling BESContainer::release()" << endl);
701 _dhi->container->release();
702 _dhi->next_container();
705 BESDEBUG(
"bes",
"OK" << endl);
712 if (_dhi) _dhi->clean();
742 strm << BESIndent::LMarg <<
"BESInterface::dump - (" << (
void *)
this <<
")" << endl;
745 if (_init_list.size()) {
746 strm << BESIndent::LMarg <<
"termination functions:" << endl;
748 init_iter i = _init_list.begin();
749 for (; i != _init_list.end(); i++) {
752 strm << BESIndent::LMarg << (
void *) (*i) << endl;
754 BESIndent::UnIndent();
757 strm << BESIndent::LMarg <<
"termination functions: none" << endl;
760 if (_end_list.size()) {
761 strm << BESIndent::LMarg <<
"termination functions:" << endl;
763 end_iter i = _end_list.begin();
764 for (; i != _end_list.end(); i++) {
765 strm << BESIndent::LMarg << (
void *) (*i) << endl;
767 BESIndent::UnIndent();
770 strm << BESIndent::LMarg <<
"termination functions: none" << endl;
773 strm << BESIndent::LMarg <<
"data handler interface:" << endl;
776 BESIndent::UnIndent();
779 strm << BESIndent::LMarg <<
"transmitter:" << endl;
781 _transmitter->dump(strm);
782 BESIndent::UnIndent();
785 strm << BESIndent::LMarg <<
"transmitter: not set" << endl;
787 BESIndent::UnIndent();
error thrown if there is a user syntax error in the request or any other user error ...
exception thrown if an internal error is found and is fatal to the BES
exception thrown if an internal error is encountered
virtual void initialize()
Initialize the BES object.
virtual std::string get_message()
get the error message for this exception
virtual void execute(BESDataHandlerInterface &dhi)=0
knows how to build a requested response object
virtual int exception_manager(BESError &e)
Manage any exceptions thrown during the whole process.
virtual void aggregate(BESDataHandlerInterface &dhi)=0
aggregate the response object
virtual string get_context(const string &name, bool &found)
retrieve the value of the specified context from the BES
virtual bool start(string name)
virtual void transmit_data()
Transmit the resulting response object.
handler object that knows how to create a specific response object
Abstract exception class for the BES with basic string message.
virtual void report_request()
Report the request and status of the request to BESReporterList::TheList()
virtual void validate_data_request()
Validate the incoming request information.
virtual void clean()
Clean up after the request.
void get_value(const string &s, string &val, bool &found)
Retrieve the value of a given key, if set.
virtual void invoke_aggregation()
Aggregate the resulting response object.
virtual void end_request()
End the BES request.
virtual void dump(ostream &strm) const
dumps information about this object
virtual BESAggregationServer * find_handler(const string &handler_name)
returns the aggregation handler with the given name in the list
virtual void log_status()
Log the status of the request.
virtual int handle_exception(BESError &e, BESDataHandlerInterface &dhi)
Manage any exceptions thrown during the handling of a request.
Abstraction representing mechanism for aggregating data.
virtual void execute_data_request_plan()
Execute the data request plan.
static BESKeys * TheKeys()