00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00037 #include "blocxx/BLOCXX_config.h"
00038 #include "blocxx/Condition.hpp"
00039 #include "blocxx/NonRecursiveMutexLock.hpp"
00040 #include "blocxx/ExceptionIds.hpp"
00041 #include "blocxx/Timeout.hpp"
00042 #include "blocxx/TimeoutTimer.hpp"
00043 #include "blocxx/ThreadImpl.hpp"
00044
00045 #include <cassert>
00046 #include <cerrno>
00047 #include <limits>
00048 #ifdef BLOCXX_HAVE_SYS_TIME_H
00049 #include <sys/time.h>
00050 #endif
00051
00052 namespace BLOCXX_NAMESPACE
00053 {
00054
// Exception definitions for the errors raised by Condition.
// NOTE(review): presumably each macro expands to the out-of-line parts of
// <Name>Exception (ConditionLockException / ConditionResourceException, both of
// which are thrown in this file) -- confirm against the macro definition.
BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionLock);
BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionResource);
00057 #if defined(BLOCXX_USE_PTHREAD)
00058
00059 Condition::Condition()
00060 {
00061 int res = pthread_cond_init(&m_condition, PTHREAD_COND_ATTR_DEFAULT);
00062 if (res != 0)
00063 {
00064 BLOCXX_THROW(ConditionResourceException, "Failed initializing condition variable");
00065 }
00066 }
00068 Condition::~Condition()
00069 {
00070 int res = pthread_cond_destroy(&m_condition);
00071 assert(res == 0);
00072 }
00074 void
00075 Condition::notifyOne()
00076 {
00077 int res = pthread_cond_signal(&m_condition);
00078 assert(res == 0);
00079 }
00081 void
00082 Condition::notifyAll()
00083 {
00084 int res = pthread_cond_broadcast(&m_condition);
00085 assert(res == 0);
00086 }
00088 void
00089 Condition::doWait(NonRecursiveMutex& mutex)
00090 {
00091 ThreadImpl::testCancel();
00092 int res;
00093 NonRecursiveMutexLockState state;
00094 mutex.conditionPreWait(state);
00095 res = pthread_cond_wait(&m_condition, state.pmutex);
00096 mutex.conditionPostWait(state);
00097 assert(res == 0 || res == EINTR);
00098 if (res == EINTR)
00099 {
00100 ThreadImpl::testCancel();
00101 }
00102 }
00104 namespace
00105 {
// Strict "earlier than" ordering for timespec values.
// Compares the seconds first; the nanoseconds only decide when the seconds
// are equal.
//
// @return true if x denotes a time strictly before y.
inline
bool timespec_less(struct timespec const & x, struct timespec const & y)
{
	if (x.tv_sec != y.tv_sec)
	{
		return x.tv_sec < y.tv_sec;
	}
	return x.tv_nsec < y.tv_nsec;
}
00112
00113 int check_timedwait(
00114 int rc, pthread_cond_t * cond, pthread_mutex_t * mtx,
00115 struct timespec const * abstime
00116 )
00117 {
00118 #ifdef BLOCXX_NCR
00119 if (rc == -1 && errno == EAGAIN)
00120 {
00121 return ETIMEDOUT;
00122 }
00123 #endif
00124 if (rc != EINVAL)
00125 {
00126 return rc;
00127 }
00128
00129 time_t const max_future = 99999999;
00130 time_t const max_time = std::numeric_limits<time_t>::max();
00131 time_t now_sec = DateTime::getCurrent().get();
00132 struct timespec new_abstime;
00133 new_abstime.tv_sec = (
00134 now_sec <= max_time - max_future
00135 ? now_sec + max_future
00136 : max_time
00137 );
00138 new_abstime.tv_nsec = 0;
00139 bool early = timespec_less(new_abstime, *abstime);
00140 if (!early)
00141 {
00142 new_abstime = *abstime;
00143 }
00144 int newrc = pthread_cond_timedwait(cond, mtx, &new_abstime);
00145 return (newrc == ETIMEDOUT && early ? EINTR : newrc);
00146 }
00147 }
00148
00149 bool
00150 Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout)
00151 {
00152 ThreadImpl::testCancel();
00153 int res;
00154 NonRecursiveMutexLockState state;
00155 mutex.conditionPreWait(state);
00156 bool ret = false;
00157
00158 timespec ts;
00159 TimeoutTimer timer(timeout);
00160
00161 res = pthread_cond_timedwait(&m_condition, state.pmutex, timer.asTimespec(ts));
00162 res = check_timedwait(res, &m_condition, state.pmutex, &ts);
00163 mutex.conditionPostWait(state);
00164 assert(res == 0 || res == ETIMEDOUT || res == EINTR);
00165 if (res == EINTR)
00166 {
00167 ThreadImpl::testCancel();
00168 }
00169 ret = res != ETIMEDOUT;
00170 return ret;
00171 }
00172 #elif defined (BLOCXX_WIN32)
00173
00174 Condition::Condition()
00175 : m_condition(new ConditionInfo_t)
00176 {
00177 m_condition->waitersCount = 0;
00178 m_condition->wasBroadcast = false;
00179 m_condition->queue = ::CreateSemaphore(
00180 NULL,
00181 0,
00182 0x7fffffff,
00183 NULL);
00184 ::InitializeCriticalSection(&m_condition->waitersCountLock);
00185 m_condition->waitersDone = ::CreateEvent(
00186 NULL,
00187 false,
00188 false,
00189 NULL);
00190 }
00192 Condition::~Condition()
00193 {
00194 ::CloseHandle(m_condition->queue);
00195 ::DeleteCriticalSection(&m_condition->waitersCountLock);
00196 ::CloseHandle(m_condition->waitersDone);
00197 delete m_condition;
00198 }
00200 void
00201 Condition::notifyOne()
00202 {
00203 ::EnterCriticalSection(&m_condition->waitersCountLock);
00204 bool haveWaiters = m_condition->waitersCount > 0;
00205 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00206
00207
00208 if (haveWaiters)
00209 {
00210 ::ReleaseSemaphore(m_condition->queue, 1, 0);
00211 }
00212 }
00214 void
00215 Condition::notifyAll()
00216 {
00217 ::EnterCriticalSection(&m_condition->waitersCountLock);
00218 bool haveWaiters = false;
00219 if (m_condition->waitersCount > 0)
00220 {
00221
00222 haveWaiters = m_condition->wasBroadcast = true;
00223 }
00224
00225 if (haveWaiters)
00226 {
00227
00228 ::ReleaseSemaphore(m_condition->queue, m_condition->waitersCount, 0);
00229 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00230
00231
00232 ::WaitForSingleObject(m_condition->waitersDone, INFINITE);
00233 m_condition->wasBroadcast = false;
00234 }
00235 else
00236 {
00237 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00238 }
00239 }
00241 void
00242 Condition::doWait(NonRecursiveMutex& mutex)
00243 {
00244 doTimedWait(mutex, Timeout::infinite);
00245 }
00247 bool
00248
00249 Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout)
00250 {
00251 ThreadImpl::testCancel();
00252 bool cc = true;
00253 NonRecursiveMutexLockState state;
00254 mutex.conditionPreWait(state);
00255
00256 ::EnterCriticalSection(&m_condition->waitersCountLock);
00257 m_condition->waitersCount++;
00258 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00259
00260 TimeoutTimer timer(timeout);
00261
00262
00263 if (::SignalObjectAndWait(mutex.m_mutex, m_condition->queue, timer.asDWORDMs(),
00264 false) == WAIT_TIMEOUT)
00265 {
00266 cc = false;
00267 }
00268
00269 ::EnterCriticalSection(&m_condition->waitersCountLock);
00270 m_condition->waitersCount--;
00271
00272
00273 bool isLastWaiter = (m_condition->wasBroadcast && m_condition->waitersCount == 0
00274 && cc == true);
00275
00276 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00277
00278
00279
00280 if (isLastWaiter)
00281 {
00282
00283
00284 ::SignalObjectAndWait(m_condition->waitersDone, mutex.m_mutex,
00285 INFINITE, false);
00286 }
00287 else
00288 {
00289
00290 ::WaitForSingleObject(mutex.m_mutex, INFINITE);
00291 }
00292 mutex.conditionPostWait(state);
00293 return cc;
00294 }
00295 #else
00296 #error "port me!"
00297 #endif
00298
00299 void
00300 Condition::wait(NonRecursiveMutexLock& lock)
00301 {
00302 if (!lock.isLocked())
00303 {
00304 BLOCXX_THROW(ConditionLockException, "Lock must be locked");
00305 }
00306 doWait(*(lock.m_mutex));
00307 }
00309 bool
00310 Condition::timedWait(NonRecursiveMutexLock& lock, UInt32 sTimeout, UInt32 usTimeout)
00311 {
00312 return timedWait(lock, Timeout::relative(sTimeout + static_cast<float>(usTimeout) / 1000000.0));
00313 }
00314
00316 bool
00317 Condition::timedWait(NonRecursiveMutexLock& lock, const Timeout& timeout)
00318 {
00319 if (!lock.isLocked())
00320 {
00321 BLOCXX_THROW(ConditionLockException, "Lock must be locked");
00322 }
00323 return doTimedWait(*(lock.m_mutex), timeout);
00324 }
00325
00326 }
00327