00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00034 #include "blocxx/Thread.hpp"
00035 #include "blocxx/WaitpidThreadFix.hpp"
00036 #include "blocxx/Exec.hpp"
00037 #include "blocxx/WaitpidThreadFixFwd.hpp"
00038 #include "blocxx/ThreadOnce.hpp"
00039 #include "blocxx/NonRecursiveMutex.hpp"
00040 #include "blocxx/NonRecursiveMutexLock.hpp"
00041 #include "blocxx/Condition.hpp"
00042 #include "blocxx/Reference.hpp"
00043 #include "blocxx/IntrusiveReference.hpp"
00044 #include <queue>
00045 #include <sys/types.h>
00046 #ifndef BLOCXX_WIN32
00047 #include <sys/wait.h>
00048 #endif
00049
00050 using namespace blocxx;
00051
00052 namespace BLOCXX_NAMESPACE
00053 {
00054
00055 namespace
00056 {
00057 bool g_useWaitpidThreadFix =
00058 #ifdef BLOCXX_WAITPID_THREADING_PROBLEM
00059 true;
00060 #else
00061 false;
00062 #endif
00063
00064 class ProcessThread;
00065
00066 OnceFlag g_initThreadGuard = BLOCXX_ONCE_INIT;
00067 ProcessThread* g_processThread = 0;
00068
00069 void initThread();
00070
00071 Thread_t getWorkerThreadId();
00072
00073 }
00074
00075 bool WaitpidThreadFix::setWaitpidThreadFixEnabled(bool enabled)
00076 {
00077 bool rv = g_useWaitpidThreadFix;
00078 g_useWaitpidThreadFix = enabled;
00079 return rv;
00080 }
00081
00082 bool WaitpidThreadFix::shouldUseWaitpidThreadFix()
00083 {
00084 if (!g_useWaitpidThreadFix)
00085 {
00086 return false;
00087 }
00088 Thread_t currThread = ThreadImpl::currentThread();
00089 Thread_t workerThread = getWorkerThreadId();
00090
00091
00092
00093 if (ThreadImpl::sameThreads(currThread, workerThread))
00094 {
00095 return false;
00096 }
00097 return true;
00098 }
00099
00100 namespace
00101 {
00102 typedef Reference<Exception> ExceptionPtr;
00103
00104
00105 class WorkSignal
00106 {
00107 public:
00108 WorkSignal()
00109 : m_signal(false)
00110 {
00111 }
00112
00113 ~WorkSignal()
00114 {
00115 }
00116
00117 void signal()
00118 {
00119 NonRecursiveMutexLock lock(m_mutex);
00120 m_signal = true;
00121 m_cond.notifyAll();
00122 }
00123
00124 void waitForSignal()
00125 {
00126 NonRecursiveMutexLock lock(m_mutex);
00127
00128 while(!m_signal)
00129 {
00130 m_cond.wait(lock);
00131 }
00132 }
00133
00134 private:
00135 bool m_signal;
00136 Condition m_cond;
00137 NonRecursiveMutex m_mutex;
00138 };
00139
00140
00141
00142
00143 class WorkItem : public IntrusiveCountableBase
00144 {
00145 public:
00146 virtual ~WorkItem()
00147 {
00148 }
00149
00150 virtual void doWork() = 0;
00151
00152 void signalDone()
00153 {
00154 m_doneSig.signal();
00155 }
00156
00157 void saveException(Exception* err)
00158 {
00159 NonRecursiveMutexLock lock(m_errMutex);
00160 m_err = err;
00161 }
00162
00163 Exception* getException()
00164 {
00165 NonRecursiveMutexLock lock(m_errMutex);
00166 return m_err.getPtr();
00167 }
00168
00169 protected:
00170 ExceptionPtr m_err;
00171 NonRecursiveMutex m_errMutex;
00172 WorkSignal m_doneSig;
00173 };
00174
00175
00176
00177 class SpawnWorkItem : public WorkItem
00178 {
00179 public:
00180 SpawnWorkItem(char const * execPath, char const * const argv[],
00181 char const * const envp[], Exec::PreExec & preExec)
00182 : m_execPath(execPath)
00183 , m_argv(argv)
00184 , m_envp(envp)
00185 , m_preExec(preExec)
00186 {
00187 }
00188
00189 virtual ~SpawnWorkItem()
00190 {
00191 }
00192
00193 virtual void doWork()
00194 {
00195 NonRecursiveMutexLock lock(m_resultMutex);
00196 m_result = Exec::spawnImpl(m_execPath, m_argv, m_envp, m_preExec);
00197 }
00198
00199 ProcessRef waitTillDone()
00200 {
00201 m_doneSig.waitForSignal();
00202
00203 NonRecursiveMutexLock lock(m_resultMutex);
00204 return m_result;
00205 }
00206
00207 protected:
00208 ProcessRef m_result;
00209 NonRecursiveMutex m_resultMutex;
00210
00211 const char * m_execPath;
00212 const char * const * m_argv;
00213 const char * const * m_envp;
00214 Exec::PreExec& m_preExec;
00215 };
00216
00217
00218
00219 class WaitpidWorkItem : public WorkItem
00220 {
00221 public:
00222 WaitpidWorkItem(const ::pid_t& pid)
00223 : m_pid(pid)
00224 {
00225 }
00226
00227 virtual ~WaitpidWorkItem()
00228 {
00229 }
00230
00231 virtual void doWork()
00232 {
00233 NonRecursiveMutexLock lock(m_resultMutex);
00234 m_result = pollStatusImpl(m_pid);
00235 }
00236
00237 Process::Status waitTillDone()
00238 {
00239 m_doneSig.waitForSignal();
00240
00241 NonRecursiveMutexLock lock(m_resultMutex);
00242 return m_result;
00243 }
00244
00245
00246 protected:
00247 Process::Status m_result;
00248 NonRecursiveMutex m_resultMutex;
00249
00250 const ::pid_t& m_pid;
00251 };
00252
00253 typedef IntrusiveReference<SpawnWorkItem> SpawnWorkItemPtr;
00254 typedef IntrusiveReference<WaitpidWorkItem> WaitpidWorkItemPtr;
00255
00256 class WorkQueue
00257 {
00258 public:
00259 WorkQueue() {}
00260 virtual ~WorkQueue() {}
00261
00262 WorkItem* getWork()
00263 {
00264 NonRecursiveMutexLock lock(m_workMutex);
00265
00266
00267
00268 while(m_work.empty())
00269 {
00270 m_workNotEmpty.wait(lock);
00271 }
00272
00273 WorkItem* newWork = m_work.front();
00274 m_work.pop();
00275
00276 return newWork;
00277 }
00278
00279 void addWork(WorkItem* newWork)
00280 {
00281 NonRecursiveMutexLock lock(m_workMutex);
00282 m_work.push(newWork);
00283 m_workNotEmpty.notifyAll();
00284 }
00285
00286 private:
00287 std::queue<WorkItem*> m_work;
00288 Condition m_workNotEmpty;
00289 NonRecursiveMutex m_workMutex;
00290 };
00291
00292
00293
00294
00295
00296 class ProcessThread : public Thread
00297 {
00298 public:
00299 ProcessThread();
00300 virtual ~ProcessThread();
00301
00302 virtual Int32 run();
00303
00304 ProcessRef spawn(
00305 char const * exec_path,
00306 char const * const argv[],
00307 char const * const envp[],
00308 Exec::PreExec & pre_exec
00309 );
00310
00311 Process::Status waitPid(const ProcId& pid);
00312
00313 protected:
00314 WorkQueue m_workQueue;
00315
00316 NonRecursiveMutex m_idMutex;
00317 };
00318
00319 ProcessThread::ProcessThread()
00320 {
00321 }
00322
00323 ProcessThread::~ProcessThread()
00324 {
00325 }
00326
00327
00328 Int32 ProcessThread::run()
00329 {
00330
00331 while(true)
00332 {
00333 WorkItem* newWork;
00334 newWork = m_workQueue.getWork();
00335
00336 try
00337 {
00338 newWork->doWork();
00339 }
00340 catch(Exception& e)
00341 {
00342 newWork->saveException(e.clone());
00343 }
00344 newWork->signalDone();
00345 }
00346
00347
00348 return 0;
00349 }
00350
00351 ProcessRef ProcessThread::spawn(char const * exec_path, char const * const argv[],
00352 char const * const envp[], Exec::PreExec & pre_exec)
00353 {
00354 SpawnWorkItemPtr newWork(new SpawnWorkItem(exec_path, argv, envp, pre_exec));
00355 m_workQueue.addWork(newWork.getPtr());
00356
00357 ProcessRef result = newWork->waitTillDone();
00358
00359 Exception* err = newWork->getException();
00360 if(err != 0)
00361 {
00362 err->rethrow();
00363 }
00364
00365 return result;
00366 }
00367
00368 Process::Status ProcessThread::waitPid(const ProcId& pid)
00369 {
00370 WaitpidWorkItemPtr newWork(new WaitpidWorkItem(pid));
00371 m_workQueue.addWork(newWork.getPtr());
00372
00373 Process::Status result = newWork->waitTillDone();
00374
00375 Exception* err = newWork->getException();
00376 if(err != 0)
00377 {
00378 err->rethrow();
00379 }
00380
00381 return result;
00382 }
00383
00384
00385 void initThread()
00386 {
00387
00388 g_processThread = new ProcessThread();
00389 g_processThread->start();
00390 }
00391
00392 Thread_t getWorkerThreadId()
00393 {
00394 callOnce(g_initThreadGuard, initThread);
00395 return g_processThread->getId();
00396 }
00397
00398 }
00399
00400
00401 ProcessRef WaitpidThreadFix::spawnProcess(char const * exec_path,
00402 char const * const argv[], char const * const envp[], Exec::PreExec & pre_exec)
00403 {
00404 callOnce(g_initThreadGuard, initThread);
00405 return g_processThread->spawn(exec_path, argv, envp, pre_exec);
00406 }
00407
00408 Process::Status WaitpidThreadFix::waitPid(const ProcId& pid)
00409 {
00410 callOnce(g_initThreadGuard, initThread);
00411 return g_processThread->waitPid(pid);
00412 }
00413
00414 }
00415