LCOV - code coverage report
Current view: top level - comphelper/source/misc - threadpool.cxx (source / functions) Hit Total Coverage
Test: commit c8344322a7af75b84dd3ca8f78b05543a976dfd5 Lines: 91 94 96.8 %
Date: 2015-06-13 12:38:46 Functions: 17 17 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
       2             : /*
       3             :  * This file is part of the LibreOffice project.
       4             :  *
       5             :  * This Source Code Form is subject to the terms of the Mozilla Public
       6             :  * License, v. 2.0. If a copy of the MPL was not distributed with this
       7             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       8             :  */
       9             : 
      10             : #include <comphelper/threadpool.hxx>
      11             : 
      12             : #include <rtl/instance.hxx>
      13             : #include <boost/shared_ptr.hpp>
      14             : #include <thread>
      15             : #include <algorithm>
      16             : 
      17             : namespace comphelper {
      18             : 
      19        3098 : class ThreadPool::ThreadWorker : public salhelper::Thread
      20             : {
      21             :     ThreadPool    *mpPool;
      22             :     osl::Condition maNewWork;
      23             :     bool           mbWorking;
      24             : public:
      25             : 
      26        1549 :     explicit ThreadWorker( ThreadPool *pPool ) :
      27             :         salhelper::Thread("thread-pool"),
      28             :         mpPool( pPool ),
      29        1549 :         mbWorking( false )
      30             :     {
      31        1549 :     }
      32             : 
      33        1549 :     virtual void execute() SAL_OVERRIDE
      34             :     {
      35             :         ThreadTask *pTask;
      36        6480 :         while ( ( pTask = waitForWork() ) )
      37             :         {
      38        3379 :             pTask->doWork();
      39        3381 :             delete pTask;
      40             :         }
      41        1549 :     }
      42             : 
      43        4931 :     ThreadTask *waitForWork()
      44             :     {
      45        4931 :         ThreadTask *pRet = NULL;
      46             : 
      47        4931 :         osl::ResettableMutexGuard aGuard( mpPool->maGuard );
      48             : 
      49        4931 :         pRet = mpPool->popWork();
      50             : 
      51       49322 :         while( !pRet )
      52             :         {
      53       41009 :             if (mbWorking)
      54        2224 :                 mpPool->stopWork();
      55       41009 :             mbWorking = false;
      56       41009 :             maNewWork.reset();
      57             : 
      58       41009 :             if( mpPool->mbTerminate )
      59        1549 :                 break;
      60             : 
      61       39460 :             aGuard.clear(); // unlock
      62             : 
      63       39455 :             maNewWork.wait();
      64             : 
      65       39442 :             aGuard.reset(); // lock
      66             : 
      67       39460 :             pRet = mpPool->popWork();
      68             :         }
      69             : 
      70        4931 :         if (pRet)
      71             :         {
      72        3382 :             if (!mbWorking)
      73        2224 :                 mpPool->startWork();
      74        3382 :             mbWorking = true;
      75             :         }
      76             : 
      77        4931 :         return pRet;
      78             :     }
      79             : 
      80             :     // Why a condition per worker thread - you may ask.
      81             :     //
      82             :     // Unfortunately the Windows synchronisation API that we wrap
      83             :     // is horribly inadequate cf.
      84             :     //    http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
      85             :     // The existing osl::Condition API should only ever be used
      86             :     // between one producer and one consumer thread to avoid the
      87             :     // lost wakeup problem.
      88             : 
      89      101792 :     void signalNewWork()
      90             :     {
      91      101792 :         maNewWork.set();
      92      101792 :     }
      93             : };
      94             : 
      95         185 : ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
      96             :     mnThreadsWorking( 0 ),
      97         185 :     mbTerminate( false )
      98             : {
      99        1734 :     for( sal_Int32 i = 0; i < nWorkers; i++ )
     100        1549 :         maWorkers.push_back( new ThreadWorker( this ) );
     101             : 
     102         185 :     maTasksComplete.set();
     103             : 
     104         185 :     osl::MutexGuard aGuard( maGuard );
     105        1734 :     for( size_t i = 0; i < maWorkers.size(); i++ )
     106        1734 :         maWorkers[ i ]->launch();
     107         185 : }
     108             : 
     109         410 : ThreadPool::~ThreadPool()
     110             : {
     111         185 :     waitAndCleanupWorkers();
     112         225 : }
     113             : 
     114             : struct ThreadPoolStatic : public rtl::StaticWithInit< boost::shared_ptr< ThreadPool >,
     115             :                                                       ThreadPoolStatic >
     116             : {
     117          40 :     boost::shared_ptr< ThreadPool > operator () () {
     118          40 :         sal_Int32 nThreads = std::max( std::thread::hardware_concurrency(), 1U );
     119          40 :         return boost::shared_ptr< ThreadPool >( new ThreadPool( nThreads ) );
     120             :     };
     121             : };
     122             : 
     123         931 : ThreadPool& ThreadPool::getSharedOptimalPool()
     124             : {
     125         931 :     return *ThreadPoolStatic::get().get();
     126             : }
     127             : 
     128         185 : void ThreadPool::waitAndCleanupWorkers()
     129             : {
     130         185 :     waitUntilEmpty();
     131             : 
     132         185 :     osl::ResettableMutexGuard aGuard( maGuard );
     133         185 :     mbTerminate = true;
     134             : 
     135        1919 :     while( !maWorkers.empty() )
     136             :     {
     137        1549 :         rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
     138        1549 :         maWorkers.pop_back();
     139             :         assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker)
     140             :                 == maWorkers.end());
     141        1549 :         xWorker->signalNewWork();
     142        1549 :         aGuard.clear();
     143             :         { // unlocked
     144        1549 :             xWorker->join();
     145        1549 :             xWorker.clear();
     146             :         }
     147        1549 :         aGuard.reset();
     148        1734 :     }
     149         185 : }
     150             : 
     151        3382 : void ThreadPool::pushTask( ThreadTask *pTask )
     152             : {
     153        3382 :     osl::MutexGuard aGuard( maGuard );
     154        3382 :     maTasks.insert( maTasks.begin(), pTask );
     155             : 
     156             :     // horrible beyond belief:
     157      103625 :     for( size_t i = 0; i < maWorkers.size(); i++ )
     158      100243 :         maWorkers[ i ]->signalNewWork();
     159        3382 :     maTasksComplete.reset();
     160        3382 : }
     161             : 
     162       44391 : ThreadTask *ThreadPool::popWork()
     163             : {
     164       44391 :     if( !maTasks.empty() )
     165             :     {
     166        3382 :         ThreadTask *pTask = maTasks.back();
     167        3382 :         maTasks.pop_back();
     168        3382 :         return pTask;
     169             :     }
     170             :     else
     171       41009 :         return NULL;
     172             : }
     173             : 
     174        2224 : void ThreadPool::startWork()
     175             : {
     176        2224 :     mnThreadsWorking++;
     177        2224 : }
     178             : 
     179        2224 : void ThreadPool::stopWork()
     180             : {
     181             :     assert( mnThreadsWorking > 0 );
     182        2224 :     if ( --mnThreadsWorking == 0 )
     183        1639 :         maTasksComplete.set();
     184        2224 : }
     185             : 
     186        1261 : void ThreadPool::waitUntilEmpty()
     187             : {
     188        1261 :     osl::ResettableMutexGuard aGuard( maGuard );
     189             : 
     190        1261 :     if( maWorkers.empty() )
     191             :     { // no threads at all -> execute the work in-line
     192             :         ThreadTask *pTask;
     193           0 :         while ( ( pTask = popWork() ) )
     194             :         {
     195           0 :             pTask->doWork();
     196           0 :             delete pTask;
     197             :         }
     198             :     }
     199             :     else
     200             :     {
     201        1261 :         aGuard.clear();
     202        1261 :         maTasksComplete.wait();
     203        1261 :         aGuard.reset();
     204             :     }
     205        1261 :     assert( maTasks.empty() );
     206        1261 : }
     207             : 
     208             : } // namespace comphelper
     209             : 
     210             : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */

Generated by: LCOV version 1.11