00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00038 #include "blocxx/BLOCXX_config.h"
00039 #include "blocxx/ThreadImpl.hpp"
00040 #include "blocxx/Mutex.hpp"
00041 #include "blocxx/Assertion.hpp"
00042 #include "blocxx/Thread.hpp"
00043 #include "blocxx/NonRecursiveMutexLock.hpp"
00044 #include "blocxx/NonRecursiveMutex.hpp"
00045 #include "blocxx/Condition.hpp"
00046 #include "blocxx/Timeout.hpp"
00047 #include "blocxx/Format.hpp"
00048 #include "blocxx/TimeoutTimer.hpp"
00049 #if defined(BLOCXX_WIN32)
00050 #include "blocxx/Map.hpp"
00051 #include "blocxx/MutexLock.hpp"
00052 #endif
00053 #include <cassert>
00054 #include <cstring>
00055 #include <cstddef>
00056
00057 extern "C"
00058 {
00059 #ifdef BLOCXX_HAVE_SYS_TIME_H
00060 #include <sys/time.h>
00061 #endif
00062
00063 #include <sys/types.h>
00064
00065 #ifdef BLOCXX_HAVE_UNISTD_H
00066 #include <unistd.h>
00067 #endif
00068
00069 #include <errno.h>
00070 #include <signal.h>
00071
00072 #ifdef BLOCXX_USE_PTHREAD
00073 #include <pthread.h>
00074 #endif
00075
00076 #ifdef BLOCXX_WIN32
00077 #include <process.h>
00078 #endif
00079 }
00080
00081 namespace BLOCXX_NAMESPACE
00082 {
00083
00084 namespace ThreadImpl {
00085
00087
00088 void
00089 sleep(UInt32 milliSeconds)
00090 {
00091 sleep(Timeout::relative(milliSeconds / 1000.0));
00092 }
00093
00094 void
00095 sleep(const Timeout& timeout)
00096 {
00097 NonRecursiveMutex mtx;
00098 NonRecursiveMutexLock lock(mtx);
00099 Condition cond;
00100 TimeoutTimer timer(timeout);
00101 while (!timer.expired())
00102 {
00103
00104 if (!cond.timedWait(lock, timer.asAbsoluteTimeout()))
00105 {
00106 return;
00107 }
00108 timer.loop();
00109 }
00110 }
00112
00113 void
00114 yield()
00115 {
00116 #if defined(BLOCXX_HAVE_SCHED_YIELD)
00117 sched_yield();
00118 #elif defined(BLOCXX_WIN32)
00119 ThreadImpl::testCancel();
00120 ::SwitchToThread();
00121 #else
00122 ThreadImpl::sleep(1);
00123 #endif
00124 }
00125
00126 #if defined(BLOCXX_USE_PTHREAD)
00127 namespace {
00128 struct LocalThreadParm
00129 {
00130 ThreadFunction m_func;
00131 void* m_funcParm;
00132 };
00133 extern "C" {
00134 static void*
00135 threadStarter(void* arg)
00136 {
00137
00138 #ifdef BLOCXX_NCR
00139 pthread_setcancel(CANCEL_ON);
00140 pthread_setasynccancel(CANCEL_OFF);
00141 #else
00142 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00143 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
00144 #endif
00145
00146
00147 sigset_t signalSet;
00148 int rv = sigfillset(&signalSet);
00149 BLOCXX_ASSERT(rv == 0);
00150 rv = sigdelset(&signalSet, SIGUSR1);
00151 BLOCXX_ASSERT(rv == 0);
00152 rv = pthread_sigmask(SIG_SETMASK, &signalSet, 0);
00153 BLOCXX_ASSERT(rv == 0);
00154
00155 LocalThreadParm* parg = static_cast<LocalThreadParm*>(arg);
00156 ThreadFunction func = parg->m_func;
00157 void* funcParm = parg->m_funcParm;
00158 delete parg;
00159 Int32 rval = (*func)(funcParm);
00160 void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00161 pthread_exit(prval);
00162 return prval;
00163 }
00164 }
00165
00166
00167 struct default_stack_size
00168 {
00169 #if !defined(BLOCXX_NCR)
00170 default_stack_size()
00171 {
00172
00173 val = 0;
00174 needsSetting = false;
00175
00176
00177
00178
00179 #ifdef _POSIX_THREAD_ATTR_STACKSIZE
00180 pthread_attr_t stack_size_attr;
00181 if (pthread_attr_init(&stack_size_attr) != 0)
00182 {
00183 return;
00184 }
00185 if (pthread_attr_getstacksize(&stack_size_attr, &val) != 0)
00186 {
00187 return;
00188 }
00189
00190 if (val < 1048576)
00191 {
00192 val = 1048576;
00193 needsSetting = true;
00194 }
00195 #ifdef PTHREAD_STACK_MIN
00196 if (PTHREAD_STACK_MIN > val)
00197 {
00198 val = PTHREAD_STACK_MIN;
00199 needsSetting = true;
00200 }
00201 #endif
00202
00203 #endif //#ifdef _POSIX_THREAD_ATTR_STACKSIZE
00204 }
00205
00206 #else //#if !defined(BLOCXX_NCR)
00207 default_stack_size()
00208 {
00209
00210 val = 0;
00211 needsSetting = false;
00212
00213
00214
00215
00216 #ifdef _POSIX_THREAD_ATTR_STACKSIZE
00217 pthread_attr_t stack_size_attr;
00218 if (pthread_attr_create(&stack_size_attr) != 0)
00219 {
00220 return;
00221 }
00222
00223 val = pthread_attr_getstacksize(stack_size_attr);
00224 if (static_cast<signed>(val) == -1)
00225 {
00226 return;
00227 }
00228
00229
00230
00231
00232 #if defined(PTHREAD_STACK_MIN) && defined(_SC_THREAD_STACK_MIN)
00233 if (PTHREAD_STACK_MIN > val)
00234 {
00235 val = PTHREAD_STACK_MIN;
00236 needsSetting = true;
00237 }
00238 #endif
00239
00240 #endif //#ifdef _POSIX_THREAD_ATTR_STACKSIZE
00241 }
00242 #endif //#if !defined(BLOCXX_NCR)
00243
00244 static size_t val;
00245 static bool needsSetting;
00246 };
00247
00248 size_t default_stack_size::val = 0;
00249 bool default_stack_size::needsSetting(false);
00250 default_stack_size g_theDefaultStackSize;
00252 pthread_once_t once_control = PTHREAD_ONCE_INIT;
00253 pthread_key_t theKey;
00254 extern "C" {
00255
00256 #ifdef BLOCXX_NCR
00257 static void
00258 SIGUSR1Handler()
00259 {
00260
00261 }
00262 #else
00263 static void
00264 SIGUSR1Handler(int sig)
00265 {
00266
00267 }
00268 #endif
00269
00271 static void doOneTimeThreadInitialization()
00272 {
00273 #ifdef BLOCXX_NCR
00274 pthread_keycreate(&theKey, NULL);
00275 #else
00276 pthread_key_create(&theKey, NULL);
00277 #endif
00278
00279 struct sigaction temp;
00280 memset(&temp, '\0', sizeof(temp));
00281 sigaction(SIGUSR1, 0, &temp);
00282 temp.sa_handler = SIGUSR1Handler;
00283 sigemptyset(&temp.sa_mask);
00284 temp.sa_flags = 0;
00285 sigaction(SIGUSR1, &temp, NULL);
00286 }
00287
00288 }
00289 }
00291
00292 #if !defined(BLOCXX_NCR)
00293
00294 int
00295 createThread(Thread_t& handle, ThreadFunction func,
00296 void* funcParm, UInt32 threadFlags)
00297 {
00298 int cc = 0;
00299 pthread_attr_t attr;
00300 pthread_attr_init(&attr);
00301 if (!(threadFlags & BLOCXX_THREAD_FLG_JOINABLE))
00302 {
00303 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00304 }
00305
00306 #if !defined(BLOCXX_VALGRIND_SUPPORT) // valgrind doesn't like us to set the stack size
00307
00308 if (default_stack_size::needsSetting)
00309 {
00310 pthread_attr_setstacksize(&attr, default_stack_size::val);
00311 }
00312 #endif
00313
00314 LocalThreadParm* parg = new LocalThreadParm;
00315 parg->m_func = func;
00316 parg->m_funcParm = funcParm;
00317 cc = pthread_create(&handle, &attr, threadStarter, parg);
00318 pthread_attr_destroy(&attr);
00319 return cc;
00320 }
00321
00322 #else //#if !defined(BLOCXX_NCR)
00323
00324 int
00325 createThread(Thread_t& handle, ThreadFunction func,
00326 void* funcParm, UInt32 threadFlags)
00327 {
00328 int cc = 0;
00329 pthread_attr_t attr;
00330 pthread_attr_create(&attr);
00331
00332 #if !defined(BLOCXX_VALGRIND_SUPPORT) // valgrind doesn't like us to set the stack size
00333
00334 if (default_stack_size::needsSetting)
00335 {
00336 pthread_attr_setstacksize(&attr, default_stack_size::val);
00337 }
00338 #endif
00339
00340 LocalThreadParm* parg = new LocalThreadParm;
00341 parg->m_func = func;
00342 parg->m_funcParm = funcParm;
00343 if (pthread_create(&handle, attr, threadStarter, parg) != 0)
00344 {
00345 cc = -1;
00346 }
00347
00348 if (cc != -1 && !(threadFlags & BLOCXX_THREAD_FLG_JOINABLE))
00349 {
00350 pthread_detach(&handle);
00351 }
00352
00353 pthread_attr_delete(&attr);
00354 return cc;
00355 }
00356 #endif //#if !defined(BLOCXX_NCR)
00357
00358
00359 void
00360 exitThread(Thread_t&, Int32 rval)
00361 {
00362 void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00363 pthread_exit(prval);
00364 }
00365
00366
00367 #if defined(BLOCXX_SIZEOF_PTHREAD_T)
00368 #if BLOCXX_SIZEOF_PTHREAD_T == 2
00369 #define BLOCXX_THREAD_CONVERTER UInt16
00370 #elif BLOCXX_SIZEOF_PTHREAD_T == 4
00371 #define BLOCXX_THREAD_CONVERTER UInt32
00372 #elif BLOCXX_SIZEOF_PTHREAD_T == 8
00373 #define BLOCXX_THREAD_CONVERTER UInt64
00374 #else
00375 #ifdef BLOCXX_NCR //BLOCXX_SIZEOF_PTHREAD_T=0 for this OS
00376 #define BLOCXX_THREAD_CONVERTER UInt16
00377 #else
00378 #error Unexpected size for pthread_t
00379 #endif
00380 #endif
00381 #else
00382 #error No pthread_t size was found!
00383 #endif
00384
00385 UInt64 thread_t_ToUInt64(Thread_t thr)
00386 {
00387 #ifdef BLOCXX_NCR
00388 return UInt64(BLOCXX_THREAD_CONVERTER(cma_thread_get_unique(&thr)));
00389 #else
00390 return UInt64(BLOCXX_THREAD_CONVERTER(thr));
00391 #endif
00392 }
00393 #undef BLOCXX_THREAD_CONVERTER
00394
00396
00397 void
00398 destroyThread(Thread_t& )
00399 {
00400 }
00402
00403 int
00404 setThreadDetached(Thread_t& handle)
00405 {
00406 #ifdef BLOCXX_NCR
00407 int cc = pthread_detach(&handle);
00408 #else
00409 int cc = pthread_detach(handle);
00410 #endif
00411 if (cc != 0)
00412 {
00413 if (cc != EINVAL)
00414 {
00415 cc = -1;
00416 }
00417 }
00418 return cc;
00419 }
00421
00422 int
00423 joinThread(Thread_t& handle, Int32& rval)
00424 {
00425 void* prval(0);
00426 if ((errno = pthread_join(handle, &prval)) == 0)
00427 {
00428 rval = static_cast<Int32>(reinterpret_cast<ptrdiff_t>(prval));
00429 return 0;
00430 }
00431 else
00432 {
00433 return 1;
00434 }
00435 }
00437 void
00438 testCancel()
00439 {
00440
00441 pthread_once(&once_control, &doOneTimeThreadInitialization);
00442 Thread* theThread = NULL;
00443 #ifdef BLOCXX_NCR
00444 pthread_addr_t addr_ptr = NULL;
00445 int ret = pthread_getspecific(theKey, &addr_ptr);
00446 if (ret == 0)
00447 {
00448 theThread = reinterpret_cast<Thread*>(addr_ptr);
00449 }
00450 #else
00451 theThread = reinterpret_cast<Thread*>(pthread_getspecific(theKey));
00452 #endif
00453 if (theThread == 0)
00454 {
00455 return;
00456 }
00457 if (AtomicGet(theThread->m_cancelRequested) == 1)
00458 {
00459
00460
00461
00462
00463
00464 throw ThreadCancelledException();
00465 }
00466 }
00468 void saveThreadInTLS(void* pTheThread)
00469 {
00470
00471 pthread_once(&once_control, &doOneTimeThreadInitialization);
00472 int rc;
00473 if ((rc = pthread_setspecific(theKey, pTheThread)) != 0)
00474 {
00475 BLOCXX_THROW(ThreadException, Format("pthread_setspecific failed. error = %1(%2)", rc, strerror(rc)).c_str());
00476 }
00477 }
00479 void sendSignalToThread(Thread_t threadID, int signo)
00480 {
00481 int rc;
00482 if ((rc = pthread_kill(threadID, signo)) != 0)
00483 {
00484 BLOCXX_THROW(ThreadException, Format("pthread_kill failed. error = %1(%2)", rc, strerror(rc)).c_str());
00485 }
00486 }
00488 void cancel(Thread_t threadID)
00489 {
00490 int rc;
00491 if ((rc = pthread_cancel(threadID)) != 0)
00492 {
00493 BLOCXX_THROW(ThreadException, Format("pthread_cancel failed. error = %1(%2)", rc, strerror(rc)).c_str());
00494 }
00495 }
00496 #endif // #ifdef BLOCXX_USE_PTHREAD
00497
00498 #if defined(BLOCXX_WIN32)
00499
00500 namespace {
00501
00502 struct WThreadInfo
00503 {
00504 HANDLE handle;
00505 BLOCXX_NAMESPACE::Thread* pTheThread;
00506 };
00507
00508 typedef Map<DWORD, WThreadInfo> Win32ThreadMap;
00509 Win32ThreadMap g_threads;
00510 Mutex g_threadsGuard;
00511
00512 struct LocalThreadParm
00513 {
00514 ThreadFunction m_func;
00515 void* m_funcParm;
00516 };
00517
00519 extern "C" {
00520 unsigned __stdcall threadStarter(void* arg)
00521 {
00522 LocalThreadParm* parg = reinterpret_cast<LocalThreadParm*>(arg);
00523 ThreadFunction func = parg->m_func;
00524 void* funcParm = parg->m_funcParm;
00525 delete parg;
00526 Int32 rval = (*func)(funcParm);
00527 ::_endthreadex(static_cast<unsigned>(rval));
00528 return rval;
00529 }
00530 }
00531
00533 void
00534 addThreadToMap(DWORD threadId, HANDLE threadHandle)
00535 {
00536 MutexLock ml(g_threadsGuard);
00537 WThreadInfo wi;
00538 wi.handle = threadHandle;
00539 wi.pTheThread = 0;
00540 g_threads[threadId] = wi;
00541 }
00542
00544 HANDLE
00545 getThreadHandle(DWORD threadId)
00546 {
00547 MutexLock ml(g_threadsGuard);
00548 HANDLE chdl = 0;
00549 Win32ThreadMap::iterator it = g_threads.find(threadId);
00550 if (it != g_threads.end())
00551 {
00552 chdl = it->second.handle;
00553 }
00554 return chdl;
00555 }
00556
00558 void
00559 setThreadPointer(DWORD threadId, Thread* pTheThread)
00560 {
00561 MutexLock ml(g_threadsGuard);
00562 Win32ThreadMap::iterator it = g_threads.find(threadId);
00563 if (it != g_threads.end())
00564 {
00565 it->second.pTheThread = pTheThread;
00566 }
00567 }
00568
00570 HANDLE
00571 removeThreadFromMap(DWORD threadId)
00572 {
00573 MutexLock ml(g_threadsGuard);
00574 HANDLE chdl = 0;
00575 Win32ThreadMap::iterator it = g_threads.find(threadId);
00576 if (it != g_threads.end())
00577 {
00578 chdl = it->second.handle;
00579 g_threads.erase(it);
00580 }
00581 return chdl;
00582 }
00583
00585 Thread*
00586 getThreadObject(DWORD threadId)
00587 {
00588 Thread* pTheThread = 0;
00589 MutexLock ml(g_threadsGuard);
00590 Win32ThreadMap::iterator it = g_threads.find(threadId);
00591 if (it != g_threads.end())
00592 {
00593 pTheThread = it->second.pTheThread;
00594 }
00595 return pTheThread;
00596 }
00597
00598 }
00599
00601
00602 int
00603 createThread(Thread_t& handle, ThreadFunction func,
00604 void* funcParm, UInt32 threadFlags)
00605 {
00606 int cc = -1;
00607 HANDLE hThread;
00608 unsigned threadId;
00609
00610 LocalThreadParm* parg = new LocalThreadParm;
00611 parg->m_func = func;
00612 parg->m_funcParm = funcParm;
00613 hThread = reinterpret_cast<HANDLE>(::_beginthreadex(NULL, 0, threadStarter,
00614 parg, 0, &threadId));
00615 if (hThread != 0)
00616 {
00617 addThreadToMap(threadId, hThread);
00618 handle = threadId;
00619 cc = 0;
00620 }
00621 else
00622 {
00623 cc = errno;
00624 }
00625
00626 return cc;
00627 }
00629
00630 void
00631 exitThread(Thread_t&, Int32 rval)
00632 {
00633 ::_endthreadex(static_cast<unsigned>(rval));
00634 }
00635
00637
00638 UInt64 thread_t_ToUInt64(Thread_t thr)
00639 {
00640
00641 BLOCXX_ASSERTMSG(sizeof(unsigned long) >= sizeof(Thread_t)," Thread_t truncated!");
00642 return static_cast<UInt64>(thr);
00643 }
00644
00646
00647 void
00648 destroyThread(Thread_t& threadId)
00649 {
00650 HANDLE thdl = removeThreadFromMap(threadId);
00651 if (thdl != 0)
00652 {
00653 ::CloseHandle(thdl);
00654 }
00655 }
00657
00658 int
00659 setThreadDetached(Thread_t& handle)
00660 {
00661
00662 return 0;
00663 }
00665
00666 int
00667 joinThread(Thread_t& threadId, Int32& rvalArg)
00668 {
00669 int cc = -1;
00670 DWORD rval;
00671 HANDLE thdl = getThreadHandle(threadId);
00672 if (thdl != 0)
00673 {
00674 if (::WaitForSingleObject(thdl, INFINITE) != WAIT_FAILED)
00675 {
00676 if (::GetExitCodeThread(thdl, &rval) != 0)
00677 {
00678 rvalArg = static_cast<Int32>(rval);
00679 cc = 0;
00680 }
00681 }
00682 }
00683 return cc;
00684 }
00685
00687 void
00688 testCancel()
00689 {
00690 DWORD threadId = ThreadImpl::currentThread();
00691 Thread* pTheThread = getThreadObject(threadId);
00692 if (pTheThread)
00693 {
00694 if (AtomicGet(pTheThread->m_cancelRequested) == 1)
00695 {
00696
00697
00698
00699
00700
00701 throw ThreadCancelledException();
00702 }
00703 }
00704 }
00706 void saveThreadInTLS(void* pThreadArg)
00707 {
00708 Thread* pThread = static_cast<Thread*>(pThreadArg);
00709 DWORD threadId = pThread->getId();
00710 setThreadPointer(threadId, pThread);
00711 }
00713 void sendSignalToThread(Thread_t threadID, int signo)
00714 {
00715 }
00717 void cancel(Thread_t threadId)
00718 {
00719 HANDLE thdl = getThreadHandle(threadId);
00720 if (thdl != 0)
00721 {
00722 ::TerminateThread(thdl, -1);
00723 }
00724 }
00725
00726 #endif // #ifdef BLOCXX_WIN32
00727 }
00728
00729 }
00730