LCOV - code coverage report
Current view: top level - comphelper/source/misc - threadpool.cxx (source / functions) Hit Total Coverage
Test: commit 10e77ab3ff6f4314137acd6e2702a6e5c1ce1fae Lines: 86 94 91.5 %
Date: 2014-11-03 Functions: 14 17 82.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
       2             : /*
       3             :  * This file is part of the LibreOffice project.
       4             :  *
       5             :  * This Source Code Form is subject to the terms of the Mozilla Public
       6             :  * License, v. 2.0. If a copy of the MPL was not distributed with this
       7             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       8             :  */
       9             : 
      10             : #include <comphelper/threadpool.hxx>
      11             : 
      12             : #include <rtl/instance.hxx>
      13             : #include <boost/shared_ptr.hpp>
      14             : #include <thread>
      15             : #include <algorithm>
      16             : 
      17             : namespace comphelper {
      18             : 
      19         520 : class ThreadPool::ThreadWorker : public salhelper::Thread
      20             : {
      21             :     ThreadPool    *mpPool;
      22             :     osl::Condition maNewWork;
      23             :     bool           mbWorking;
      24             : public:
      25             : 
      26         260 :     ThreadWorker( ThreadPool *pPool ) :
      27             :         salhelper::Thread("thread-pool"),
      28             :         mpPool( pPool ),
      29         260 :         mbWorking( false )
      30             :     {
      31         260 :     }
      32             : 
      33         260 :     virtual void execute() SAL_OVERRIDE
      34             :     {
      35             :         ThreadTask *pTask;
      36         780 :         while ( ( pTask = waitForWork() ) )
      37             :         {
      38         260 :             pTask->doWork();
      39         260 :             delete pTask;
      40             :         }
      41         260 :     }
      42             : 
      43         520 :     ThreadTask *waitForWork()
      44             :     {
      45         520 :         ThreadTask *pRet = NULL;
      46             : 
      47         520 :         osl::ResettableMutexGuard aGuard( mpPool->maGuard );
      48             : 
      49         520 :         pRet = mpPool->popWork();
      50             : 
      51        1301 :         while( !pRet )
      52             :         {
      53         521 :             if (mbWorking)
      54         246 :                 mpPool->stopWork();
      55         521 :             mbWorking = false;
      56         521 :             maNewWork.reset();
      57             : 
      58         521 :             if( mpPool->mbTerminate )
      59         260 :                 break;
      60             : 
      61         261 :             aGuard.clear(); // unlock
      62             : 
      63         261 :             maNewWork.wait();
      64             : 
      65         261 :             aGuard.reset(); // lock
      66             : 
      67         261 :             pRet = mpPool->popWork();
      68             :         }
      69             : 
      70         520 :         if (pRet)
      71             :         {
      72         260 :             if (!mbWorking)
      73         246 :                 mpPool->startWork();
      74         260 :             mbWorking = true;
      75             :         }
      76             : 
      77         520 :         return pRet;
      78             :     }
      79             : 
      80             :     // Why a condition per worker thread - you may ask.
      81             :     //
      82             :     // Unfortunately the Windows synchronisation API that we wrap
      83             :     // is horribly inadequate cf.
      84             :     //    http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
      85             :     // The existing osl::Condition API should only ever be used
      86             :     // between one producer and one consumer thread to avoid the
      87             :     // lost wakeup problem.
      88             : 
      89         880 :     void signalNewWork()
      90             :     {
      91         880 :         maNewWork.set();
      92         880 :     }
      93             : };
      94             : 
      95         128 : ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
      96             :     mnThreadsWorking( 0 ),
      97         128 :     mbTerminate( false )
      98             : {
      99         388 :     for( sal_Int32 i = 0; i < nWorkers; i++ )
     100         260 :         maWorkers.push_back( new ThreadWorker( this ) );
     101             : 
     102         128 :     maTasksComplete.reset();
     103             : 
     104         128 :     osl::MutexGuard aGuard( maGuard );
     105         388 :     for( size_t i = 0; i < maWorkers.size(); i++ )
     106         388 :         maWorkers[ i ]->launch();
     107         128 : }
     108             : 
     109         256 : ThreadPool::~ThreadPool()
     110             : {
     111         128 :     waitAndCleanupWorkers();
     112         128 : }
     113             : 
     114             : struct ThreadPoolStatic : public rtl::StaticWithInit< boost::shared_ptr< ThreadPool >,
     115             :                                                       ThreadPoolStatic >
     116             : {
     117           0 :     boost::shared_ptr< ThreadPool > operator () () {
     118           0 :         sal_Int32 nThreads = std::max( std::thread::hardware_concurrency(), 1U );
     119           0 :         return boost::shared_ptr< ThreadPool >( new ThreadPool( nThreads ) );
     120             :     };
     121             : };
     122             : 
     123           0 : ThreadPool& ThreadPool::getSharedOptimalPool()
     124             : {
     125           0 :     return *ThreadPoolStatic::get().get();
     126             : }
     127             : 
     128         128 : void ThreadPool::waitAndCleanupWorkers()
     129             : {
     130         128 :     waitUntilEmpty();
     131             : 
     132         128 :     osl::ResettableMutexGuard aGuard( maGuard );
     133         128 :     mbTerminate = true;
     134             : 
     135         516 :     while( !maWorkers.empty() )
     136             :     {
     137         260 :         rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
     138         260 :         maWorkers.pop_back();
     139             :         assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker)
     140             :                 == maWorkers.end());
     141         260 :         xWorker->signalNewWork();
     142         260 :         aGuard.clear();
     143             :         { // unlocked
     144         260 :             xWorker->join();
     145         260 :             xWorker.clear();
     146             :         }
     147         260 :         aGuard.reset();
     148         388 :     }
     149         128 : }
     150             : 
     151         260 : void ThreadPool::pushTask( ThreadTask *pTask )
     152             : {
     153         260 :     osl::MutexGuard aGuard( maGuard );
     154         260 :     maTasks.insert( maTasks.begin(), pTask );
     155             : 
     156             :     // horrible beyond belief:
     157         880 :     for( size_t i = 0; i < maWorkers.size(); i++ )
     158         620 :         maWorkers[ i ]->signalNewWork();
     159         260 :     maTasksComplete.reset();
     160         260 : }
     161             : 
     162         781 : ThreadTask *ThreadPool::popWork()
     163             : {
     164         781 :     if( !maTasks.empty() )
     165             :     {
     166         260 :         ThreadTask *pTask = maTasks.back();
     167         260 :         maTasks.pop_back();
     168         260 :         return pTask;
     169             :     }
     170             :     else
     171         521 :         return NULL;
     172             : }
     173             : 
     174         246 : void ThreadPool::startWork()
     175             : {
     176         246 :     mnThreadsWorking++;
     177         246 : }
     178             : 
     179         246 : void ThreadPool::stopWork()
     180             : {
     181             :     assert( mnThreadsWorking > 0 );
     182         246 :     if ( --mnThreadsWorking == 0 )
     183         128 :         maTasksComplete.set();
     184         246 : }
     185             : 
     186         256 : void ThreadPool::waitUntilEmpty()
     187             : {
     188         256 :     osl::ResettableMutexGuard aGuard( maGuard );
     189             : 
     190         256 :     if( maWorkers.empty() )
     191             :     { // no threads at all -> execute the work in-line
     192             :         ThreadTask *pTask;
     193           0 :         while ( ( pTask = popWork() ) )
     194             :         {
     195           0 :             pTask->doWork();
     196           0 :             delete pTask;
     197             :         }
     198             :     }
     199             :     else
     200             :     {
     201         256 :         aGuard.clear();
     202         256 :         maTasksComplete.wait();
     203         256 :         aGuard.reset();
     204             :     }
     205         256 :     assert( maTasks.empty() );
     206         256 : }
     207             : 
     208             : } // namespace comphelper
     209             : 
     210             : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */

Generated by: LCOV version 1.10