00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00038 #include "blocxx/BLOCXX_config.h"
00039 #include "blocxx/ThreadBarrier.hpp"
00040 #include "blocxx/Assertion.hpp"
00041 #include "blocxx/Format.hpp"
00042 #include "blocxx/ExceptionIds.hpp"
00043 #include "blocxx/IntrusiveCountableBase.hpp"
00044
00045 #if defined(BLOCXX_USE_PTHREAD) && defined(BLOCXX_HAVE_PTHREAD_BARRIER) && !defined(BLOCXX_VALGRIND_SUPPORT)
00046 #include <pthread.h>
00047 #include <cstring>
00048 #else
00049
00050 #include "blocxx/Condition.hpp"
00051 #include "blocxx/NonRecursiveMutex.hpp"
00052 #include "blocxx/NonRecursiveMutexLock.hpp"
00053 #endif
00054
00056 namespace BLOCXX_NAMESPACE
00057 {
00058
// Project macro invocation for the "ThreadBarrier" exception; the
// ThreadBarrierException type thrown via BLOCXX_THROW further down in this
// file relies on it.
// NOTE(review): the exact expansion (and the id it assigns) comes from the
// blocxx exception headers - confirm there.
00059 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ThreadBarrier);
00060
00061 #if defined(BLOCXX_USE_PTHREAD) && defined(BLOCXX_HAVE_PTHREAD_BARRIER) && !defined(BLOCXX_VALGRIND_SUPPORT) // valgrind doesn't support pthread_barrier_*()
00062 class ThreadBarrierImpl : public IntrusiveCountableBase
00063 {
00064 public:
00065 ThreadBarrierImpl(UInt32 threshold)
00066 {
00067 BLOCXX_ASSERT(threshold != 0);
00068 memset(&barrier, 0, sizeof(barrier));
00069 int res = pthread_barrier_init(&barrier, NULL, threshold);
00070 if (res != 0)
00071 {
00072 BLOCXX_THROW(ThreadBarrierException, Format("pthread_barrier_init failed: %1(%2)", res, strerror(res)).c_str());
00073 }
00074 }
00075 ~ThreadBarrierImpl()
00076 {
00077 int res = pthread_barrier_destroy(&barrier);
00078 if (res != 0)
00079 {
00080
00081 }
00082 }
00083
00084 void wait()
00085 {
00086 int res = pthread_barrier_wait(&barrier);
00087 if (res != 0 && res != PTHREAD_BARRIER_SERIAL_THREAD)
00088 {
00089 BLOCXX_THROW(ThreadBarrierException, Format("pthread_barrier_wait failed: %1(%2)", res, strerror(res)).c_str());
00090 }
00091 }
00092 private:
00093 pthread_barrier_t barrier;
00094 };
00095
00096 #else
00097
00098
00099
00100 class ThreadBarrierImpl : public IntrusiveCountableBase
00101 {
00102 public:
00113 struct SubBarrier
00114 {
00115 SubBarrier() : m_waitingCount(0) {}
00117 UInt32 m_waitingCount;
00119 Condition m_cond;
00120 };
00121 ThreadBarrierImpl(UInt32 threshold)
00122 : m_threshold(threshold)
00123 , m_curSubBarrier(0)
00124 {
00125 }
00126 void wait()
00127 {
00128 NonRecursiveMutexLock l(m_mutex);
00129
00130 SubBarrier& curBarrier = m_curSubBarrier?m_subBarrier0:m_subBarrier1;
00131 ++curBarrier.m_waitingCount;
00132 if (curBarrier.m_waitingCount == m_threshold)
00133 {
00134
00135 curBarrier.m_waitingCount = 0;
00136
00137 m_curSubBarrier = 1 - m_curSubBarrier;
00138
00139 curBarrier.m_cond.notifyAll();
00140 }
00141 else
00142 {
00143
00144
00145
00146 while (curBarrier.m_waitingCount != 0)
00147 {
00148 curBarrier.m_cond.wait(l);
00149 }
00150 }
00151 }
00152 private:
00154 UInt32 m_threshold;
00157 int m_curSubBarrier;
00158 NonRecursiveMutex m_mutex;
00159 SubBarrier m_subBarrier0;
00160 SubBarrier m_subBarrier1;
00161 };
00162
00163 #endif
00164
00166 ThreadBarrier::ThreadBarrier(UInt32 threshold)
00167 : m_impl(new ThreadBarrierImpl(threshold))
00168 {
00169 BLOCXX_ASSERT(threshold != 0);
00170 }
00172 void ThreadBarrier::wait()
00173 {
00174 m_impl->wait();
00175 }
00177 ThreadBarrier::~ThreadBarrier()
00178 {
00179 }
00181 ThreadBarrier::ThreadBarrier(const ThreadBarrier& x)
00182 : m_impl(x.m_impl)
00183 {
00184 }
00186 ThreadBarrier& ThreadBarrier::operator=(const ThreadBarrier& x)
00187 {
00188 m_impl = x.m_impl;
00189 return *this;
00190 }
00191
00192 }
00193