00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00038 #include "blocxx/BLOCXX_config.h"
00039 #include "blocxx/ThreadPool.hpp"
00040 #include "blocxx/Array.hpp"
00041 #include "blocxx/Thread.hpp"
00042 #include "blocxx/NonRecursiveMutex.hpp"
00043 #include "blocxx/NonRecursiveMutexLock.hpp"
00044 #include "blocxx/Condition.hpp"
00045 #include "blocxx/Format.hpp"
00046 #include "blocxx/Mutex.hpp"
00047 #include "blocxx/MutexLock.hpp"
00048 #include "blocxx/NullLogger.hpp"
00049 #include "blocxx/Timeout.hpp"
00050 #include "blocxx/TimeoutTimer.hpp"
00051 #include "blocxx/GlobalString.hpp"
00052
00053 #include <deque>
00054
00055 #ifdef BLOCXX_DEBUG
00056 #include <iostream>
00057 #endif
00058
00059 namespace BLOCXX_NAMESPACE
00060 {
00061
00062 BLOCXX_DEFINE_EXCEPTION(ThreadPool);
00063
00064
// Pool-scoped logging helpers. Each macro forwards to the matching
// BLOCXX_LOG_* macro after prefixing the message with "<pool name>: ", so
// they can only be used where a variable named m_poolName is in scope.
// The message argument is parenthesized so that a compound expression
// (for example a conditional) is concatenated as a whole instead of being
// re-associated with the preceding '+'.
#define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + (arg)); } while (0)
#define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + (arg)); } while (0)
#define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + (arg)); } while (0)
#define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + (arg)); } while (0)
#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + (arg)); } while (0)
00070
00072 class ThreadPoolImpl : public IntrusiveCountableBase
00073 {
00074 public:
00075
00076 virtual bool addWork(const RunnableRef& work, const Timeout& timeout) = 0;
00077 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout) = 0;
00078 virtual void waitForEmptyQueue() = 0;
00079 virtual ~ThreadPoolImpl()
00080 {
00081 }
00082 };
00083 namespace {
00084
00085 GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx.ThreadPool");
00086
00087 class FixedSizePoolImpl;
00089 class FixedSizePoolWorkerThread : public Thread
00090 {
00091 public:
00092 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
00093 : Thread()
00094 , m_thePool(thePool)
00095 {
00096 }
00097 virtual Int32 run();
00098 private:
00099 virtual void doShutdown()
00100 {
00101 MutexLock lock(m_guard);
00102 if (m_currentRunnable)
00103 {
00104 m_currentRunnable->doShutdown();
00105 }
00106 }
00107 virtual void doCooperativeCancel()
00108 {
00109 MutexLock lock(m_guard);
00110 if (m_currentRunnable)
00111 {
00112 m_currentRunnable->doCooperativeCancel();
00113 }
00114 }
00115 virtual void doDefinitiveCancel()
00116 {
00117 MutexLock lock(m_guard);
00118 if (m_currentRunnable)
00119 {
00120 m_currentRunnable->doDefinitiveCancel();
00121 }
00122 }
00123
00124 FixedSizePoolImpl* m_thePool;
00125
00126 Mutex m_guard;
00127 RunnableRef m_currentRunnable;
00128
00129
00130 FixedSizePoolWorkerThread(const FixedSizePoolWorkerThread&);
00131 FixedSizePoolWorkerThread& operator=(const FixedSizePoolWorkerThread&);
00132 };
00134 class CommonPoolImpl : public ThreadPoolImpl
00135 {
00136 protected:
00137 CommonPoolImpl(UInt32 maxQueueSize, const Logger& logger, const String& poolName)
00138 : m_maxQueueSize(maxQueueSize)
00139 , m_queueClosed(false)
00140 , m_shutdown(false)
00141 , m_logger(logger)
00142 , m_poolName(poolName)
00143 {
00144 }
00145
00146 virtual ~CommonPoolImpl()
00147 {
00148 }
00149
00150
00151 virtual bool queueIsFull() const
00152 {
00153 return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize));
00154 }
00155
00156
00157 bool queueClosed() const
00158 {
00159 return m_shutdown || m_queueClosed;
00160 }
00161
00162 bool finishOffWorkInQueue(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout)
00163 {
00164 NonRecursiveMutexLock l(m_queueLock);
00165
00166 if (queueClosed())
00167 {
00168 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue is already closed. Why are you trying to shutdown again?");
00169 return false;
00170 }
00171 m_queueClosed = true;
00172 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue closed");
00173
00174 if (finishWorkInQueue)
00175 {
00176 TimeoutTimer timer(timeout);
00177 while (m_queue.size() != 0)
00178 {
00179 if (timer.infinite())
00180 {
00181 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting forever for queue to empty");
00182 m_queueEmpty.wait(l);
00183 }
00184 else
00185 {
00186 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting w/timout for queue to empty");
00187 if (!m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout()))
00188 {
00189 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Wait timed out. Work in queue will be discarded.");
00190 break;
00191 }
00192 }
00193 }
00194 }
00195 m_shutdown = true;
00196 return true;
00197 }
00198
00199 virtual void waitForEmptyQueue()
00200 {
00201 NonRecursiveMutexLock l(m_queueLock);
00202 while (m_queue.size() != 0)
00203 {
00204 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting for empty queue");
00205 m_queueEmpty.wait(l);
00206 }
00207 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue empty: the wait is over");
00208 }
00209
00210 void shutdownThreads(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
00211 {
00212 TimeoutTimer shutdownTimer(shutdownTimeout);
00213 TimeoutTimer dTimer(definitiveCancelTimeout);
00214 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout()))
00215 {
00216 return;
00217 }
00218
00219
00220 m_queueNotEmpty.notifyAll();
00221 m_queueNotFull.notifyAll();
00222
00223 if (!shutdownTimer.infinite())
00224 {
00225
00226 for (UInt32 i = 0; i < m_threads.size(); ++i)
00227 {
00228 BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Calling shutdown on thread %1", i));
00229 m_threads[i]->shutdown();
00230 }
00231
00232
00233 Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout());
00234 for (UInt32 i = 0; i < m_threads.size(); ++i)
00235 {
00236 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Waiting for thread %1 to exit.", i));
00237 m_threads[i]->timedWait(absoluteShutdownTimeout);
00238 }
00239
00240
00241 for (UInt32 i = 0; i < m_threads.size(); ++i)
00242 {
00243 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling cooperativeCancel on thread %1", i));
00244 m_threads[i]->cooperativeCancel();
00245 }
00246
00247 if (!dTimer.infinite())
00248 {
00249
00250 Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout());
00251 for (UInt32 i = 0; i < m_threads.size(); ++i)
00252 {
00253 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling definitiveCancel on thread %1", i));
00254 try
00255 {
00256 if (!m_threads[i]->definitiveCancel(absoluteDefinitiveTimeout))
00257 {
00258 BLOCXX_POOL_LOG_FATAL_ERROR(m_logger, Format("Thread %1 was forcibly cancelled.", i));
00259 }
00260 }
00261 catch (CancellationDeniedException& e)
00262 {
00263 BLOCXX_POOL_LOG_ERROR(m_logger, Format("Caught CanacellationDeniedException: %1 for thread %2. Pool shutdown may hang.", e, i));
00264 }
00265 }
00266 }
00267
00268 }
00269
00270
00271 for (UInt32 i = 0; i < m_threads.size(); ++i)
00272 {
00273 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("calling join() on thread %1", i));
00274 m_threads[i]->join();
00275 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("join() finished for thread %1", i));
00276 }
00277 }
00278
00279 RunnableRef getWorkFromQueue(bool waitForWork)
00280 {
00281 NonRecursiveMutexLock l(m_queueLock);
00282 while ((m_queue.size() == 0) && (!m_shutdown))
00283 {
00284 if (waitForWork)
00285 {
00286 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waiting for work");
00287 m_queueNotEmpty.wait(l);
00288 }
00289 else
00290 {
00291
00292
00293 if (!m_queueNotEmpty.timedWait(l,Timeout::relative(1)))
00294 {
00295 BLOCXX_POOL_LOG_DEBUG3(m_logger, "No work after 1 sec. I'm not waiting any longer");
00296 return RunnableRef();
00297 }
00298 }
00299 }
00300
00301 if (m_shutdown)
00302 {
00303 BLOCXX_POOL_LOG_DEBUG(m_logger, "The pool is shutdown, not getting any more work");
00304 return RunnableRef();
00305 }
00306
00307 RunnableRef work = m_queue.front();
00308 m_queue.pop_front();
00309
00310
00311 incrementWorkerCount();
00312
00313 if (!queueIsFull())
00314 {
00315 m_queueNotFull.notifyAll();
00316 }
00317
00318
00319 if (m_queue.size() == 0)
00320 {
00321 m_queueEmpty.notifyAll();
00322 }
00323 BLOCXX_POOL_LOG_DEBUG3(m_logger, "A thread got some work to do");
00324 return work;
00325 }
00326
00327
00328 virtual void incrementWorkerCount()
00329 {
00330 }
00331
00332 virtual void decrementWorkerCount()
00333 {
00334 }
00335
00336
00337 UInt32 m_maxQueueSize;
00338
00339 Array<ThreadRef> m_threads;
00340 std::deque<RunnableRef> m_queue;
00341 bool m_queueClosed;
00342 bool m_shutdown;
00343
00344 NonRecursiveMutex m_queueLock;
00345 Condition m_queueNotFull;
00346 Condition m_queueEmpty;
00347 Condition m_queueNotEmpty;
00348 Logger m_logger;
00349 String m_poolName;
00350 };
00351 class FixedSizePoolImpl : public CommonPoolImpl
00352 {
00353 public:
00354 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
00355 : CommonPoolImpl(maxQueueSize, logger, poolName)
00356 {
00357
00358 m_threads.reserve(numThreads);
00359 for (UInt32 i = 0; i < numThreads; ++i)
00360 {
00361 m_threads.push_back(ThreadRef(new FixedSizePoolWorkerThread(this)));
00362 }
00363 for (UInt32 i = 0; i < numThreads; ++i)
00364 {
00365 try
00366 {
00367 m_threads[i]->start();
00368 }
00369 catch (ThreadException& e)
00370 {
00371 BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread #%1: %2", i, e));
00372 m_threads.resize(i);
00373
00374 this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
00375 throw;
00376 }
00377 }
00378 BLOCXX_POOL_LOG_DEBUG(m_logger, "Threads are started and ready to go");
00379 }
00380
00381 virtual bool addWork(const RunnableRef& work, const Timeout& timeout)
00382 {
00383
00384 if (!work)
00385 {
00386 BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00387 return false;
00388 }
00389
00390 NonRecursiveMutexLock l(m_queueLock);
00391 TimeoutTimer timer(timeout);
00392 while ( queueIsFull() && !queueClosed() )
00393 {
00394 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00395 if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
00396 {
00397
00398 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false");
00399 return false;
00400 }
00401 }
00402
00403
00404 if (queueClosed())
00405 {
00406 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00407 return false;
00408 }
00409
00410 m_queue.push_back(work);
00411
00412
00413 if (m_queue.size() == 1)
00414 {
00415 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waking up sleepy workers");
00416 m_queueNotEmpty.notifyAll();
00417 }
00418
00419 BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00420 return true;
00421 }
00422
00423
00424 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
00425 {
00426 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
00427 }
00428 virtual ~FixedSizePoolImpl()
00429 {
00430
00431 try
00432 {
00433
00434 if (!queueClosed())
00435 {
00436
00437
00438 this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
00439 }
00440 }
00441 catch (...)
00442 {
00443 }
00444 }
00445 private:
00446 friend class FixedSizePoolWorkerThread;
00447 };
00448 void runRunnable(const RunnableRef& work)
00449 {
00450
00451 try
00452 {
00453 work->run();
00454 }
00455 catch (ThreadCancelledException&)
00456 {
00457 throw;
00458 }
00459 catch (Exception& ex)
00460 {
00461 #ifdef BLOCXX_DEBUG
00462 std::clog << "!!! Exception: " << ex.type() << " caught in ThreadPool worker: " << ex << std::endl;
00463 #endif
00464 Logger logger(COMPONENT_NAME);
00465 BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in ThreadPool worker: %1", ex));
00466 }
00467 catch(std::exception& ex)
00468 {
00469 #ifdef BLOCXX_DEBUG
00470 std::clog << "!!! std::exception what = \"" << ex.what() << "\" caught in ThreadPool worker" << std::endl;
00471 #endif
00472 Logger logger(COMPONENT_NAME);
00473 BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in ThreadPool worker: %1", ex.what()));
00474 }
00475 catch (...)
00476 {
00477 #ifdef BLOCXX_DEBUG
00478 std::clog << "!!! Unknown Exception caught in ThreadPool worker" << std::endl;
00479 #endif
00480 Logger logger(COMPONENT_NAME);
00481 BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in ThreadPool worker.");
00482 }
00483 }
00484 Int32 FixedSizePoolWorkerThread::run()
00485 {
00486 while (true)
00487 {
00488
00489 RunnableRef work = m_thePool->getWorkFromQueue(true);
00490 if (!work)
00491 {
00492 return 0;
00493 }
00494
00495 {
00496 MutexLock lock(m_guard);
00497 m_currentRunnable = work;
00498 }
00499 runRunnable(work);
00500 {
00501 MutexLock lock(m_guard);
00502 m_currentRunnable = 0;
00503 }
00504 }
00505 return 0;
00506 }
00507 class DynamicSizePoolImpl;
00509 class DynamicSizePoolWorkerThread : public Thread
00510 {
00511 public:
00512 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
00513 : Thread()
00514 , m_thePool(thePool)
00515 {
00516 }
00517 virtual Int32 run();
00518 private:
00519 virtual void doShutdown()
00520 {
00521 MutexLock lock(m_guard);
00522 if (m_currentRunnable)
00523 {
00524 m_currentRunnable->doShutdown();
00525 }
00526 }
00527 virtual void doCooperativeCancel()
00528 {
00529 MutexLock lock(m_guard);
00530 if (m_currentRunnable)
00531 {
00532 m_currentRunnable->doCooperativeCancel();
00533 }
00534 }
00535 virtual void doDefinitiveCancel()
00536 {
00537 MutexLock lock(m_guard);
00538 if (m_currentRunnable)
00539 {
00540 m_currentRunnable->doDefinitiveCancel();
00541 }
00542 }
00543
00544 DynamicSizePoolImpl* m_thePool;
00545
00546 Mutex m_guard;
00547 RunnableRef m_currentRunnable;
00548
00549
00550 DynamicSizePoolWorkerThread(const DynamicSizePoolWorkerThread&);
00551 DynamicSizePoolWorkerThread& operator=(const DynamicSizePoolWorkerThread&);
00552 };
00554 class DynamicSizePoolImpl : public CommonPoolImpl
00555 {
00556 public:
00557 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
00558 : CommonPoolImpl(maxQueueSize, logger, poolName)
00559 , m_maxThreads(maxThreads)
00560 {
00561 }
00562
00563 virtual bool addWork(const RunnableRef& work, const Timeout& timeout)
00564 {
00565
00566 if (!work)
00567 {
00568 BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00569 return false;
00570 }
00571 NonRecursiveMutexLock l(m_queueLock);
00572
00573
00574 if (queueClosed())
00575 {
00576 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00577 return false;
00578 }
00579
00580
00581
00582
00583 size_t i = 0;
00584 while (i < m_threads.size())
00585 {
00586 if (!m_threads[i]->isRunning())
00587 {
00588 BLOCXX_POOL_LOG_DEBUG3(m_logger, Format("Thread %1 is finished. Cleaning up it's remains.", i));
00589 m_threads[i]->join();
00590 m_threads.remove(i);
00591 }
00592 else
00593 {
00594 ++i;
00595 }
00596 }
00597
00598 TimeoutTimer timer(timeout);
00599 while ( queueIsFull() && !queueClosed() )
00600 {
00601 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00602 if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
00603 {
00604
00605 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false");
00606 return false;
00607 }
00608 }
00609
00610
00611 if (queueClosed())
00612 {
00613 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00614 return false;
00615 }
00616
00617 m_queue.push_back(work);
00618
00619 BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00620
00621
00622
00623
00624
00625
00626
00627 l.release();
00628 m_queueNotEmpty.notifyOne();
00629 Thread::yield();
00630 l.lock();
00631
00632
00633 if (!m_queue.empty() && m_threads.size() < m_maxThreads)
00634 {
00635 ThreadRef theThread(new DynamicSizePoolWorkerThread(this));
00636 m_threads.push_back(theThread);
00637 BLOCXX_POOL_LOG_DEBUG3(m_logger, "About to start a new thread");
00638 try
00639 {
00640 theThread->start();
00641 }
00642 catch (ThreadException& e)
00643 {
00644 BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread: %1", e));
00645 m_threads.pop_back();
00646 throw;
00647 }
00648 BLOCXX_POOL_LOG_DEBUG2(m_logger, "New thread started");
00649 }
00650 return true;
00651 }
00652
00653
00654 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
00655 {
00656 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
00657 }
00658 virtual ~DynamicSizePoolImpl()
00659 {
00660
00661 try
00662 {
00663
00664 if (!queueClosed())
00665 {
00666
00667
00668 this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
00669 }
00670 }
00671 catch (...)
00672 {
00673 }
00674 }
00675
00676 protected:
00677 UInt32 getMaxThreads() const
00678 {
00679 return m_maxThreads;
00680 }
00681
00682 private:
00683
00684 UInt32 m_maxThreads;
00685 friend class DynamicSizePoolWorkerThread;
00686 };
00687 Int32 DynamicSizePoolWorkerThread::run()
00688 {
00689 while (true)
00690 {
00691
00692 RunnableRef work = m_thePool->getWorkFromQueue(false);
00693 if (!work)
00694 {
00695 return 0;
00696 }
00697
00698 {
00699 MutexLock lock(m_guard);
00700 m_currentRunnable = work;
00701 }
00702 runRunnable(work);
00703 m_thePool->decrementWorkerCount();
00704 {
00705 MutexLock lock(m_guard);
00706 m_currentRunnable = 0;
00707 }
00708 }
00709 return 0;
00710 }
00711
00713 class DynamicSizeNoQueuePoolImpl : public DynamicSizePoolImpl
00714 {
00715 public:
00716 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads, const Logger& logger, const String& poolName)
00717 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName)
00718 , m_workingThreads(0)
00719 {
00720 }
00721
00722 virtual ~DynamicSizeNoQueuePoolImpl()
00723 {
00724 }
00725
00726 virtual void incrementWorkerCount()
00727 {
00728 ++m_workingThreads;
00729 }
00730
00731 virtual void decrementWorkerCount()
00732 {
00733 NonRecursiveMutexLock lock(m_queueLock);
00734 --m_workingThreads;
00735
00736 m_queueNotFull.notifyAll();
00737 }
00738
00739
00740 virtual bool queueIsFull() const
00741 {
00742
00743
00744 size_t freeThreads = getMaxThreads() - AtomicGet(m_workingThreads);
00745 return (freeThreads <= m_queue.size());
00746 }
00747
00748 private:
00749
00750 size_t m_workingThreads;
00751
00752 };
00753
00754 }
00756 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const String& poolName)
00757 {
00758 NullLogger logger;
00759 switch (poolType)
00760 {
00761 case FIXED_SIZE:
00762 m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00763 break;
00764 case DYNAMIC_SIZE:
00765 m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00766 break;
00767 case DYNAMIC_SIZE_NO_QUEUE:
00768 m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
00769 break;
00770 }
00771 }
00773 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
00774 {
00775 switch (poolType)
00776 {
00777 case FIXED_SIZE:
00778 m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00779 break;
00780 case DYNAMIC_SIZE:
00781 m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00782 break;
00783 case DYNAMIC_SIZE_NO_QUEUE:
00784 m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
00785 break;
00786 }
00787 }
00789 bool ThreadPool::addWork(const RunnableRef& work)
00790 {
00791 return m_impl->addWork(work, Timeout::infinite);
00792 }
00794 bool ThreadPool::tryAddWork(const RunnableRef& work)
00795 {
00796 return m_impl->addWork(work, Timeout::relative(0));
00797 }
00799 bool ThreadPool::tryAddWork(const RunnableRef& work, const Timeout& timeout)
00800 {
00801 return m_impl->addWork(work, timeout);
00802 }
00804 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00805 {
00806 m_impl->shutdown(finishWorkInQueue, Timeout::relative(shutdownSecs), Timeout::relative(shutdownSecs));
00807 }
00809 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout)
00810 {
00811 m_impl->shutdown(finishWorkInQueue, timeout, timeout);
00812 }
00814 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
00815 {
00816 m_impl->shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
00817 }
00819 void ThreadPool::waitForEmptyQueue()
00820 {
00821 m_impl->waitForEmptyQueue();
00822 }
00824 ThreadPool::~ThreadPool()
00825 {
00826 }
00828 ThreadPool::ThreadPool(const ThreadPool& x)
00829 : IntrusiveCountableBase(x)
00830 , m_impl(x.m_impl)
00831 {
00832 }
00834 ThreadPool& ThreadPool::operator=(const ThreadPool& x)
00835 {
00836 m_impl = x.m_impl;
00837 return *this;
00838 }
00839
00840 }
00841