00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00038 #include "blocxx/BLOCXX_config.h"
00039 #include "blocxx/Exec.hpp"
00040 #include "blocxx/Select.hpp"
00041 #include "blocxx/ExceptionIds.hpp"
00042 #include "blocxx/TimeoutTimer.hpp"
00043 #include "blocxx/ExecMockObject.hpp"
00044 #include "blocxx/GlobalPtr.hpp"
00045 #include "blocxx/WaitpidThreadFix.hpp"
00046
00047 #if !defined(BLOCXX_WIN32)
00048 #include "blocxx/PosixUnnamedPipe.hpp"
00049 #include "blocxx/PosixExec.hpp"
00050 #else
00051 #include "blocxx/UnnamedPipe.hpp"
00052 #include "blocxx/WinExec.hpp"
00053 #endif
00054
00055 extern "C"
00056 {
00057 #ifdef BLOCXX_HAVE_SYS_RESOURCE_H
00058 #include <sys/resource.h>
00059 #endif
00060 #ifdef BLOCXX_HAVE_SYS_TYPES_H
00061 #include <sys/types.h>
00062 #endif
00063 #ifdef BLOCXX_HAVE_UNISTD_H
00064 #include <unistd.h>
00065 #endif
00066 #ifndef BLOCXX_WIN32
00067 #include <sys/wait.h>
00068 #include <fcntl.h>
00069 #endif
00070 }
00071
00072 #ifdef BLOCXX_NCR
00073 #if defined(sigaction)
00074 #undef sigaction
00075 #endif
00076 #undef SIG_DFL
00077 #define SIG_DFL (void(*)())0
00078 #endif
00079
00080 namespace BLOCXX_NAMESPACE
00081 {
00082 BLOCXX_DEFINE_EXCEPTION_WITH_BASE_AND_ID(ExecTimeout, ExecErrorException);
00083 BLOCXX_DEFINE_EXCEPTION_WITH_BASE_AND_ID(ExecBufferFull, ExecErrorException);
00084 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ExecError);
00085
00086
00088 namespace Exec
00089 {
00090 ::BLOCXX_NAMESPACE::GlobalPtr<ExecMockObject, Impl::NullFactory> g_execMockObject = BLOCXX_GLOBAL_PTR_INIT;
00091
00093 Process::Status
00094 system(const Array<String>& command, const char* const envp[], const Timeout& timeout)
00095 {
00096
00097 #ifndef BLOCXX_WIN32
00098 PosixExec::SystemPreExec spe;
00099 #else
00100 WinExec::WinSystemPreExec spe;
00101 #endif
00102
00103 ProcessRef proc = Exec::spawn(command[0], command, envp, spe);
00104
00105 proc->waitCloseTerm(Timeout::relative(0), timeout, Timeout::relative(0));
00106 return proc->processStatus();
00107 }
00108
00109
00111 int
00112 safeSystem(const Array<String>& command, const char* const envp[])
00113 {
00114 Process::Status ps = system(command, envp);
00115 return ps.getPOSIXwaitpidStatus();
00116 }
00117
00119 ProcessRef spawnImpl(
00120 char const * exec_path,
00121 char const * const argv[], char const * const envp[],
00122 PreExec & pre_exec
00123 )
00124 {
00125 #ifdef BLOCXX_WIN32
00126 return WinExec::spawnImpl(exec_path, argv, envp, pre_exec);
00127 #else
00128 return PosixExec::spawnImpl(exec_path, argv, envp, pre_exec);
00129 #endif
00130 }
00131
00133 ProcessRef spawn(
00134 char const * exec_path,
00135 char const * const argv[], char const * const envp[],
00136 PreExec & pre_exec
00137 )
00138 {
00139 if (WaitpidThreadFix::shouldUseWaitpidThreadFix())
00140 {
00141 return WaitpidThreadFix::spawnProcess(exec_path, argv, envp, pre_exec);
00142 }
00143 return spawnImpl(exec_path, argv, envp, pre_exec);
00144 }
00145
00147 ProcessRef spawn(
00148 char const * const argv[], char const * const envp[]
00149 )
00150 {
00151
00152 #ifdef BLOCXX_WIN32
00153 WinExec::WinStandardPreExec pre_exec;
00154 #else
00155 PosixExec::StandardPreExec pre_exec;
00156 #endif
00157
00158 return spawn(argv[0], argv, envp, pre_exec);
00159 }
00160
00161 namespace Impl
00162 {
00163 void close_child_ends(UnnamedPipeRef ppipe[BLOCXX_NPIPE])
00164 {
00165
00166 if (ppipe[BLOCXX_IN])
00167 {
00168 ppipe[BLOCXX_IN]->closeInputHandle();
00169 }
00170 if (ppipe[BLOCXX_OUT])
00171 {
00172 ppipe[BLOCXX_OUT]->closeOutputHandle();
00173 }
00174 if (ppipe[BLOCXX_SERR])
00175 {
00176 ppipe[BLOCXX_SERR]->closeOutputHandle();
00177 }
00178 ppipe[BLOCXX_EXEC_ERR]->closeOutputHandle();
00179 }
00180 }
00181
00182 namespace
00183 {
00184
00185 #ifndef BLOCXX_MIN
00186 #define BLOCXX_MIN(x, y) (x) < (y) ? (x) : (y)
00187 #endif
00188
00190 class StringOutputGatherer : public OutputCallback
00191 {
00192 public:
00193 StringOutputGatherer(String& stdoutput, String& erroutput, int outputLimit)
00194 : m_output(stdoutput)
00195 , m_erroutput(erroutput)
00196 , m_outputLimit(outputLimit)
00197 {
00198 }
00199 StringOutputGatherer(String& stdoutput, int outputLimit)
00200 : m_output(stdoutput)
00201 , m_erroutput(stdoutput)
00202 , m_outputLimit(outputLimit)
00203 {
00204 }
00205 private:
00206 virtual void doHandleData(const char* data, size_t dataLen,
00207 EOutputSource outputSource, const ProcessRef& theProc,
00208 size_t streamIndex, Array<char>& inputBuffer)
00209 {
00210 String& output = (outputSource == E_STDOUT) ? m_output : m_erroutput;
00211 if (m_outputLimit >= 0 && output.length() + dataLen > static_cast<size_t>(m_outputLimit))
00212 {
00213
00214 int lentocopy = BLOCXX_MIN(m_outputLimit - output.length(), dataLen);
00215 if (lentocopy >= 0)
00216 {
00217 output += String(data, lentocopy);
00218 }
00219 BLOCXX_THROW(ExecBufferFullException,
00220 "Exec::StringOutputGatherer::doHandleData(): buffer full");
00221 }
00222
00223 output += String(data, dataLen);
00224 }
00225 String& m_output;
00226 String& m_erroutput;
00227 int m_outputLimit;
00228 };
00229
00231 class SingleStringInputCallback : public InputCallback
00232 {
00233 public:
00234 SingleStringInputCallback(const String& s)
00235 : m_s(s)
00236 {
00237 }
00238 private:
00239 virtual void doGetData(Array<char>& inputBuffer, const ProcessRef& theProc, size_t streamIndex)
00240 {
00241 if (m_s.length() > 0)
00242 {
00243 inputBuffer.insert(inputBuffer.end(), m_s.c_str(), m_s.c_str() + m_s.length());
00244 m_s.erase();
00245 }
00246 else if (theProc->in()->isOpen())
00247 {
00248 theProc->in()->close();
00249 }
00250 }
00251 String m_s;
00252 };
00253
00254 }
00255
00257 Process::Status executeProcessAndGatherOutput(
00258 char const * const command[],
00259 String& output,
00260 char const * const envVars[],
00261 const Timeout& timeout,
00262 int outputLimit,
00263 char const * input)
00264 {
00265 if (g_execMockObject.get())
00266 {
00267 return g_execMockObject.get()->executeProcessAndGatherOutput(command, output, envVars, timeout, outputLimit, input);
00268 }
00269 return feedProcessAndGatherOutput(spawn(command, envVars),
00270 output, timeout, outputLimit, input);
00271 }
00272
00274 Process::Status executeProcessAndGatherOutput(
00275 char const * const command[],
00276 String& output,
00277 String& erroutput,
00278 char const * const envVars[],
00279 const Timeout& timeout,
00280 int outputLimit,
00281 char const * input)
00282 {
00283 if (g_execMockObject.get())
00284 {
00285 return g_execMockObject.get()->executeProcessAndGatherOutput2(command, output,
00286 erroutput, envVars, timeout, outputLimit, input);
00287 }
00288
00289 return feedProcessAndGatherOutput(spawn(command, envVars),
00290 output, erroutput, timeout, outputLimit, input);
00291 }
00292
00294 BLOCXX_COMMON_API void executeProcessAndGatherOutput(
00295 const Array<String>& command,
00296 String& output, int& processstatus,
00297 int timeoutsecs, int outputlimit,
00298 const String& input)
00299 {
00300 Timeout timeout = Timeout::infinite;
00301 if (timeoutsecs != -1)
00302 {
00303 timeout = Timeout::relative(timeoutsecs);
00304 }
00305 Process::Status ps = executeProcessAndGatherOutput(command,
00306 output,
00307 timeout,
00308 outputlimit, input);
00309
00310 processstatus = ps.getPOSIXwaitpidStatus();
00311 }
00312
00314 Process::Status feedProcessAndGatherOutput(
00315 ProcessRef const & proc,
00316 String & output,
00317 Timeout const & timeout,
00318 int outputLimit,
00319 String const & input)
00320 {
00321 Array<ProcessRef> procarr(1, proc);
00322 SingleStringInputCallback singleStringInputCallback(input);
00323
00324 StringOutputGatherer gatherer(output, outputLimit);
00325 processInputOutput(gatherer, procarr, singleStringInputCallback, timeout);
00326 proc->waitCloseTerm();
00327 return proc->processStatus();
00328 }
00329
00331 Process::Status feedProcessAndGatherOutput(
00332 ProcessRef const & proc,
00333 String & output,
00334 String & erroutput,
00335 Timeout const & timeout,
00336 int outputLimit,
00337 String const & input)
00338 {
00339 Array<ProcessRef> procarr(1, proc);
00340 SingleStringInputCallback singleStringInputCallback(input);
00341
00342 StringOutputGatherer gatherer(output, erroutput, outputLimit);
00343 processInputOutput(gatherer, procarr, singleStringInputCallback, timeout);
00344 proc->waitCloseTerm();
00345 return proc->processStatus();
00346 }
00347
00349 void
00350 gatherOutput(String& output, const ProcessRef& proc, int timeoutSecs, int outputLimit)
00351 {
00352 gatherOutput(output, proc, Timeout::relativeWithReset(timeoutSecs), outputLimit);
00353 }
00355 void
00356 gatherOutput(String& output, const ProcessRef& proc, const Timeout& timeout, int outputLimit)
00357 {
00358 Array<ProcessRef> procs(1, proc);
00359
00360 StringOutputGatherer gatherer(output, outputLimit);
00361 SingleStringInputCallback singleStringInputCallback = SingleStringInputCallback(String());
00362 processInputOutput(gatherer, procs, singleStringInputCallback, timeout);
00363 }
00364
00366 OutputCallback::~OutputCallback()
00367 {
00368
00369 }
00370
00372 void
00373 OutputCallback::handleData(const char* data, size_t dataLen, EOutputSource outputSource, const ProcessRef& theProc, size_t streamIndex, Array<char>& inputBuffer)
00374 {
00375 doHandleData(data, dataLen, outputSource, theProc, streamIndex, inputBuffer);
00376 }
00377
00379 InputCallback::~InputCallback()
00380 {
00381 }
00382
00384 void
00385 InputCallback::getData(Array<char>& inputBuffer, const ProcessRef& theProc, size_t streamIndex)
00386 {
00387 doGetData(inputBuffer, theProc, streamIndex);
00388 }
00389
00390 namespace
00391 {
00392 struct ProcessOutputState
00393 {
00394 bool inIsOpen;
00395 bool outIsOpen;
00396 bool errIsOpen;
00397 size_t availableDataLen;
00398
00399 ProcessOutputState()
00400 : inIsOpen(true)
00401 , outIsOpen(true)
00402 , errIsOpen(true)
00403 , availableDataLen(0)
00404 {
00405 }
00406 };
00407
00408 }
00409
00411 void
00412 processInputOutput(OutputCallback& output, Array<ProcessRef>& procs, InputCallback& input, const Timeout& timeout)
00413 {
00414 TimeoutTimer timer(timeout);
00415
00416 Array<ProcessOutputState> processStates(procs.size());
00417 int numOpenPipes(procs.size() * 2);
00418
00419 Array<Array<char> > inputs(processStates.size());
00420 for (size_t i = 0; i < processStates.size(); ++i)
00421 {
00422 input.getData(inputs[i], procs[i], i);
00423 processStates[i].availableDataLen = inputs[i].size();
00424 if (!procs[i]->out()->isOpen())
00425 {
00426 processStates[i].outIsOpen = false;
00427 }
00428 if (!procs[i]->err()->isOpen())
00429 {
00430 processStates[i].errIsOpen = false;
00431 }
00432 if (!procs[i]->in()->isOpen())
00433 {
00434 processStates[i].inIsOpen = false;
00435 }
00436
00437 }
00438
00439 timer.start();
00440
00441 while (numOpenPipes > 0)
00442 {
00443 Select::SelectObjectArray selObjs;
00444 std::map<int, int> inputIndexProcessIndex;
00445 std::map<int, int> outputIndexProcessIndex;
00446 for (size_t i = 0; i < procs.size(); ++i)
00447 {
00448 if (processStates[i].outIsOpen)
00449 {
00450 Select::SelectObject selObj(procs[i]->out()->getReadSelectObj());
00451 selObj.waitForRead = true;
00452 selObjs.push_back(selObj);
00453 inputIndexProcessIndex[selObjs.size() - 1] = i;
00454 }
00455 if (processStates[i].errIsOpen)
00456 {
00457 Select::SelectObject selObj(procs[i]->err()->getReadSelectObj());
00458 selObj.waitForRead = true;
00459 selObjs.push_back(selObj);
00460 inputIndexProcessIndex[selObjs.size() - 1] = i;
00461 }
00462 if (processStates[i].inIsOpen && processStates[i].availableDataLen > 0)
00463 {
00464 Select::SelectObject selObj(procs[i]->in()->getWriteSelectObj());
00465 selObj.waitForWrite = true;
00466 selObjs.push_back(selObj);
00467 outputIndexProcessIndex[selObjs.size() - 1] = i;
00468 }
00469
00470 }
00471
00472 int selectrval = Select::selectRW(selObjs, timer.asRelativeTimeout());
00473 switch (selectrval)
00474 {
00475 case Select::SELECT_ERROR:
00476 {
00477 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: error selecting on stdout and stderr");
00478 }
00479 break;
00480 case Select::SELECT_TIMEOUT:
00481 {
00482 timer.loop();
00483 if (timer.expired())
00484 {
00485 BLOCXX_THROW(ExecTimeoutException, "Exec::gatherOutput: timedout");
00486 }
00487 }
00488 break;
00489 default:
00490 {
00491 int availableToFind = selectrval;
00492
00493
00494 timer.resetOnLoop();
00495
00496 for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
00497 {
00498 if (!selObjs[i].readAvailable)
00499 {
00500 continue;
00501 }
00502 else
00503 {
00504 --availableToFind;
00505 }
00506 int streamIndex = inputIndexProcessIndex[i];
00507 UnnamedPipeRef readstream;
00508 if (processStates[streamIndex].outIsOpen)
00509 {
00510 if (procs[streamIndex]->out()->getReadSelectObj() == selObjs[i].s)
00511 {
00512 readstream = procs[streamIndex]->out();
00513 }
00514 }
00515
00516 if (!readstream && processStates[streamIndex].errIsOpen)
00517 {
00518 if (procs[streamIndex]->err()->getReadSelectObj() == selObjs[i].s)
00519 {
00520 readstream = procs[streamIndex]->err();
00521 }
00522 }
00523
00524 if (!readstream)
00525 {
00526 continue;
00527 }
00528
00529 char buff[1024];
00530 int readrc = readstream->read(buff, sizeof(buff) - 1);
00531 if (readrc == 0)
00532 {
00533 if (readstream == procs[streamIndex]->out())
00534 {
00535 processStates[streamIndex].outIsOpen = false;
00536 procs[streamIndex]->out()->close();
00537 }
00538 else
00539 {
00540 processStates[streamIndex].errIsOpen = false;
00541 procs[streamIndex]->err()->close();
00542 }
00543 --numOpenPipes;
00544 }
00545 else if (readrc == -1)
00546 {
00547 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: read error");
00548 }
00549 else
00550 {
00551 buff[readrc] = '\0';
00552 output.handleData(
00553 buff,
00554 readrc,
00555 readstream == procs[streamIndex]->out() ? E_STDOUT : E_STDERR,
00556 procs[streamIndex],
00557 streamIndex, inputs[streamIndex]);
00558 }
00559 }
00560
00561
00562 for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
00563 {
00564 if (!selObjs[i].writeAvailable)
00565 {
00566 continue;
00567 }
00568 else
00569 {
00570 --availableToFind;
00571 }
00572 int streamIndex = outputIndexProcessIndex[i];
00573 UnnamedPipeRef writestream;
00574 if (processStates[streamIndex].inIsOpen)
00575 {
00576 writestream = procs[streamIndex]->in();
00577 }
00578
00579 if (!writestream)
00580 {
00581 continue;
00582 }
00583
00584 size_t offset = inputs[streamIndex].size() - processStates[streamIndex].availableDataLen;
00585 int writerc = writestream->write(&inputs[streamIndex][offset], processStates[streamIndex].availableDataLen);
00586 if (writerc == -1 && errno == EPIPE)
00587 {
00588 processStates[streamIndex].inIsOpen = false;
00589 procs[streamIndex]->in()->close();
00590 }
00591 else if (writerc == -1)
00592 {
00593 BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: write error");
00594 }
00595 else if (writerc != 0)
00596 {
00597 inputs[streamIndex].erase(inputs[streamIndex].begin(), inputs[streamIndex].begin() + writerc);
00598 input.getData(inputs[streamIndex], procs[streamIndex], streamIndex);
00599 processStates[streamIndex].availableDataLen = inputs[streamIndex].size();
00600 }
00601 }
00602 }
00603 break;
00604 }
00605 }
00606 }
00607
00608 void processInputOutput(const String& input, String& output, const ProcessRef& process,
00609 const Timeout& timeout, int outputLimit)
00610 {
00611 Array<ProcessRef> procs;
00612 procs.push_back(process);
00613
00614 StringOutputGatherer gatherer(output, outputLimit);
00615 SingleStringInputCallback singleStringInputCallback = SingleStringInputCallback(input);
00616 processInputOutput(gatherer, procs, singleStringInputCallback, timeout);
00617 }
00618
00619
00620 }
00621
00622 }
00623