00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00039 #include "blocxx/BLOCXX_config.h"
00040 #include "blocxx/Thread.hpp"
00041 #include "blocxx/Assertion.hpp"
00042 #include "blocxx/Format.hpp"
00043 #include "blocxx/ThreadBarrier.hpp"
00044 #include "blocxx/NonRecursiveMutexLock.hpp"
00045 #include "blocxx/ExceptionIds.hpp"
00046 #include "blocxx/Timeout.hpp"
00047 #include "blocxx/TimeoutTimer.hpp"
00048 #include "blocxx/DateTime.hpp"
00049 #include "blocxx/Logger.hpp"
00050
00051 #include <cstring>
00052 #include <cstdio>
00053 #include <cerrno>
00054 #include <iostream>
00055 #include <csignal>
00056 #include <cassert>
00057
00058 #ifdef BLOCXX_HAVE_OPENSSL
00059 #include <openssl/err.h>
00060 #endif
00061
00062
00063 namespace BLOCXX_NAMESPACE
00064 {
00065
00067 BLOCXX_DEFINE_EXCEPTION_WITH_ID(Thread);
00068 BLOCXX_DEFINE_EXCEPTION_WITH_ID(CancellationDenied);
00069
00070 namespace
00071 {
00072 const GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx");
00073
00075
00076 struct ThreadParam
00077 {
00078 ThreadParam(Thread* t, const ThreadDoneCallbackRef& c, const ThreadBarrier& b)
00079 : thread(t)
00080 , cb(c)
00081 , thread_barrier(b)
00082 {}
00083 Thread* thread;
00084 ThreadDoneCallbackRef cb;
00085 ThreadBarrier thread_barrier;
00086 };
00087
00089 static Thread_t
00090 zeroThread()
00091 {
00092 Thread_t zthr;
00093 ::memset(&zthr, 0, sizeof(zthr));
00094 return zthr;
00095 }
00096
00097 static Thread_t NULLTHREAD = zeroThread();
00099 static inline bool
00100 sameId(const Thread_t& t1, const Thread_t& t2)
00101 {
00102 return ThreadImpl::sameThreads(t1, t2);
00103 }
00104 }
00105
00107
00108 Thread::Thread()
00109 : m_id(NULLTHREAD)
00110 , m_isRunning(false)
00111 , m_joined(false)
00112 , m_cancelRequested(0)
00113 , m_cancelled(false)
00114 {
00115 }
00117
00118 Thread::~Thread()
00119 {
00120 try
00121 {
00122 if (!sameId(m_id, NULLTHREAD))
00123 {
00124 if (!m_joined)
00125 {
00126 join();
00127 }
00128 assert(m_isRunning == false);
00129 ThreadImpl::destroyThread(m_id);
00130 }
00131 }
00132 catch (...)
00133 {
00134
00135 }
00136 }
00138
00139 void
00140 Thread::start(const ThreadDoneCallbackRef& cb)
00141 {
00142 if (isRunning())
00143 {
00144 BLOCXX_THROW(ThreadException,
00145 "Thread::start - thread is already running");
00146 }
00147 if (!sameId(m_id, NULLTHREAD))
00148 {
00149 BLOCXX_THROW(ThreadException,
00150 "Thread::start - cannot start previously run thread");
00151 }
00152 UInt32 flgs = BLOCXX_THREAD_FLG_JOINABLE;
00153 ThreadBarrier thread_barrier(2);
00154
00155 ThreadParam* p = new ThreadParam(this, cb, thread_barrier);
00156 int result = ThreadImpl::createThread(m_id, threadRunner, p, flgs);
00157 if (result != 0)
00158 {
00159 BLOCXX_THROW_ERRNO_MSG1(ThreadException, "ThreadImpl::createThread() failed", result);
00160 }
00161 thread_barrier.wait();
00162 }
00164
00165 Int32
00166 Thread::join()
00167 {
00168 BLOCXX_ASSERT(!sameId(m_id, NULLTHREAD));
00169 Int32 rval;
00170 if (ThreadImpl::joinThread(m_id, rval) != 0)
00171 {
00172 BLOCXX_THROW(ThreadException,
00173 Format("Thread::join - ThreadImpl::joinThread: %1(%2)",
00174 errno, strerror(errno)).c_str());
00175 }
00176 m_joined = true;
00177 return rval;
00178 }
00180
00181
00182 Int32
00183 Thread::threadRunner(void* paramPtr)
00184 {
00185 Thread_t theThreadID;
00186 Int32 rval = -1;
00187 try
00188 {
00189
00190 BLOCXX_ASSERT(paramPtr != NULL);
00191 ThreadParam* pParam = static_cast<ThreadParam*>(paramPtr);
00192 Thread* pTheThread = pParam->thread;
00193 ThreadImpl::saveThreadInTLS(pTheThread);
00194 theThreadID = pTheThread->m_id;
00195 ThreadDoneCallbackRef cb = pParam->cb;
00196 ThreadBarrier thread_barrier = pParam->thread_barrier;
00197 delete pParam;
00198 pTheThread->m_isRunning = true;
00199 thread_barrier.wait();
00200
00201 try
00202 {
00203 rval = pTheThread->run();
00204 }
00205
00206 catch (ThreadCancelledException&)
00207 {
00208 }
00209 catch (Exception& ex)
00210 {
00211 #ifdef BLOCXX_DEBUG
00212 std::clog << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00213 std::clog << ex << std::endl;
00214 #endif
00215 Logger logger(COMPONENT_NAME);
00216 BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in Thread class: %1", ex));
00217 pTheThread->doneRunning(cb);
00218
00219
00220 throw;
00221 }
00222 catch (std::exception& ex)
00223 {
00224 #ifdef BLOCXX_DEBUG
00225 std::clog << "!!! std::exception: " << ex.what() << " caught in Thread class" << std::endl;
00226 #endif
00227 Logger logger(COMPONENT_NAME);
00228 BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in Thread class: %1", ex.what()));
00229 pTheThread->doneRunning(cb);
00230
00231
00232 throw;
00233 }
00234 catch (...)
00235 {
00236 #ifdef BLOCXX_DEBUG
00237 std::clog << "!!! Unknown Exception caught in Thread class" << std::endl;
00238 #endif
00239 Logger logger(COMPONENT_NAME);
00240 BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in Thread class.");
00241
00242 pTheThread->doneRunning(cb);
00243
00244
00245 throw;
00246 }
00247
00248 pTheThread->doneRunning(cb);
00249
00250 }
00251 catch (Exception& ex)
00252 {
00253 #ifdef BLOCXX_DEBUG
00254 std::clog << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00255 std::clog << ex << std::endl;
00256 #endif
00257 Logger logger(COMPONENT_NAME);
00258 BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in Thread class: %1", ex));
00259
00260 ThreadImpl::exitThread(theThreadID, rval);
00261 }
00262 catch (std::exception& ex)
00263 {
00264 #ifdef BLOCXX_DEBUG
00265 std::clog << "!!! std::exception: " << ex.what() << " caught in Thread class" << std::endl;
00266 #endif
00267 Logger logger(COMPONENT_NAME);
00268 BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in Thread class: %1", ex.what()));
00269
00270 ThreadImpl::exitThread(theThreadID, rval);
00271 }
00272 catch (...)
00273 {
00274 #ifdef BLOCXX_DEBUG
00275 std::clog << "!!! Unknown Exception caught in Thread class" << std::endl;
00276 #endif
00277 Logger logger(COMPONENT_NAME);
00278 BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in Thread class.");
00279
00280 ThreadImpl::exitThread(theThreadID, rval);
00281 }
00282
00283 ThreadImpl::exitThread(theThreadID, rval);
00284 return rval;
00285 }
00286
00288 void
00289 Thread::doneRunning(const ThreadDoneCallbackRef& cb)
00290 {
00291 {
00292 NonRecursiveMutexLock lock(m_stateGuard);
00293 m_isRunning = false;
00294 m_stateCond.notifyAll();
00295 }
00296
00297 if (cb)
00298 {
00299 cb->notifyThreadDone(this);
00300 }
00301 #ifdef BLOCXX_HAVE_OPENSSL
00302
00303 ERR_remove_state(0);
00304 #endif
00305 }
00306
00308 void
00309 Thread::shutdown()
00310 {
00311 doShutdown();
00312 }
00314 bool
00315 Thread::shutdown(const Timeout& timeout)
00316 {
00317 doShutdown();
00318 return timedWait(timeout);
00319 }
00321 void
00322 Thread::cooperativeCancel()
00323 {
00324 if (!isRunning())
00325 {
00326 return;
00327 }
00328
00329
00330 doCooperativeCancel();
00331 m_cancelRequested = Atomic_t(1);
00332
00333 #if !defined(BLOCXX_WIN32)
00334
00335
00336
00337
00338 try
00339 {
00340 ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00341 }
00342 catch (ThreadException&)
00343 {
00344 }
00345 #endif
00346 }
00348 bool
00349 Thread::definitiveCancel(UInt32 waitForCooperativeSecs)
00350 {
00351 return definitiveCancel(Timeout::relative(waitForCooperativeSecs));
00352 }
00354 bool
00355 Thread::definitiveCancel(const Timeout& timeout)
00356 {
00357 if (!isRunning())
00358 {
00359 return true;
00360 }
00361
00362
00363 doCooperativeCancel();
00364 m_cancelRequested = Atomic_t(1);
00365
00366 #if !defined(BLOCXX_WIN32)
00367
00368
00369
00370
00371 try
00372 {
00373 ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00374 }
00375 catch (ThreadException&)
00376 {
00377 }
00378 #endif
00379
00380 Logger logger(COMPONENT_NAME);
00381 TimeoutTimer timer(timeout);
00382 NonRecursiveMutexLock l(m_stateGuard);
00383 while (!m_cancelled && isRunning())
00384 {
00385 BLOCXX_LOG_DEBUG3(logger, "Thread::definitiveCancel waiting for thread to exit.");
00386 if (!m_stateCond.timedWait(l, timer.asAbsoluteTimeout()))
00387 {
00388
00389 doDefinitiveCancel();
00390
00391 if (!m_cancelled && isRunning())
00392 {
00393 BLOCXX_LOG_ERROR(logger, "Thread::definitiveCancel cancelling thread because it did not exit!");
00394 this->cancel_internal(true);
00395 }
00396 return false;
00397 }
00398 }
00399 return true;
00400 }
00401
00403 void
00404 Thread::cancel()
00405 {
00406 this->cancel_internal(false);
00407 }
00408
00410 void
00411 Thread::cancel_internal(bool is_locked)
00412 {
00413
00414
00415 try
00416 {
00417 ThreadImpl::cancel(m_id);
00418 }
00419 catch (ThreadException&)
00420 {
00421 }
00422 {
00423 NonRecursiveMutex mtx;
00424 NonRecursiveMutexLock l(is_locked ? mtx : m_stateGuard);
00425 m_cancelled = true;
00426 m_isRunning = false;
00427 m_stateCond.notifyAll();
00428 }
00429 }
00431 void
00432 Thread::testCancel()
00433 {
00434 ThreadImpl::testCancel();
00435 }
00436
00438 void
00439 Thread::doShutdown()
00440 {
00441 }
00443 void
00444 Thread::doCooperativeCancel()
00445 {
00446 }
00448 void
00449 Thread::doDefinitiveCancel()
00450 {
00451 }
00452
00454 bool
00455 Thread::timedWait(const Timeout& timeout)
00456 {
00457 TimeoutTimer tt(timeout);
00458 NonRecursiveMutexLock lock(m_stateGuard);
00459 while (m_isRunning == true)
00460 {
00461 if (!m_stateCond.timedWait(lock, tt.asAbsoluteTimeout()))
00462 {
00463 return false;
00464 }
00465 }
00466 return true;
00467 }
00468
00469 }
00470