LCOV - code coverage report
Current view: top level - cppu/source/threadpool - threadpool.cxx (source / functions) Hit Total Coverage
Test: commit c8344322a7af75b84dd3ca8f78b05543a976dfd5 Lines: 190 192 99.0 %
Date: 2015-06-13 12:38:46 Functions: 28 28 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
       2             : /*
       3             :  * This file is part of the LibreOffice project.
       4             :  *
       5             :  * This Source Code Form is subject to the terms of the Mozilla Public
       6             :  * License, v. 2.0. If a copy of the MPL was not distributed with this
       7             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       8             :  *
       9             :  * This file incorporates work covered by the following license notice:
      10             :  *
      11             :  *   Licensed to the Apache Software Foundation (ASF) under one or more
      12             :  *   contributor license agreements. See the NOTICE file distributed
      13             :  *   with this work for additional information regarding copyright
      14             :  *   ownership. The ASF licenses this file to you under the Apache
      15             :  *   License, Version 2.0 (the "License"); you may not use this file
      16             :  *   except in compliance with the License. You may obtain a copy of
      17             :  *   the License at http://www.apache.org/licenses/LICENSE-2.0 .
      18             :  */
      19             : 
      20             : #include "sal/config.h"
      21             : 
      22             : #include <cassert>
      23             : #include <unordered_map>
      24             : 
      25             : #include <osl/diagnose.h>
      26             : #include <osl/mutex.hxx>
      27             : #include <osl/thread.h>
      28             : #include <rtl/instance.hxx>
      29             : #include <sal/log.hxx>
      30             : 
      31             : #include <uno/threadpool.h>
      32             : 
      33             : #include "threadpool.hxx"
      34             : #include "thread.hxx"
      35             : 
      36             : using namespace ::std;
      37             : using namespace ::osl;
      38             : using namespace ::rtl;
      39             : 
      40             : namespace cppu_threadpool
      41             : {
      42             :     struct theDisposedCallerAdmin :
      43             :         public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
      44             :     {
                               // Factory call operator: wraps a newly allocated DisposedCallerAdmin
                               // in a DisposedCallerAdminHolder. Presumably rtl::StaticWithInit calls
                               // this to build the one shared instance -- confirm in rtl/instance.hxx.
      45          65 :         DisposedCallerAdminHolder operator () () {
      46          65 :             return DisposedCallerAdminHolder(new DisposedCallerAdmin());
      47             :         }
      48             :     };
      49             : 
      50      232210 :     DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
      51             :     {
                               // Returns the holder obtained from theDisposedCallerAdmin::get().
      52      232210 :         return theDisposedCallerAdmin::get();
      53             :     }
      54             : 
      55          30 :     DisposedCallerAdmin::~DisposedCallerAdmin()
      56             :     {
                               // Diagnostic only: logs a warning in the cppu.threadpool area when
                               // m_lst still holds dispose ids at destruction time.
      57             :         SAL_WARN_IF( !m_lst.empty(), "cppu.threadpool", "DisposedCallerList :  " << m_lst.size() << " left\n");
      58          30 :     }
      59             : 
      60          66 :     void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
      61             :     {
                               // Appends nDisposeId to m_lst while holding m_mutex. No check for
                               // duplicates is made here.
      62          66 :         MutexGuard guard( m_mutex );
      63          66 :         m_lst.push_back( nDisposeId );
      64          66 :     }
      65             : 
      66          66 :     void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
      67             :     {
                               // Removes at most one entry equal to nDisposeId from m_lst while
                               // holding m_mutex; does nothing when the id is not in the list.
      68          66 :         MutexGuard guard( m_mutex );
      69         198 :         for( DisposedCallerList::iterator ii = m_lst.begin() ;
      70         132 :              ii != m_lst.end() ;
      71             :              ++ ii )
      72             :         {
      73          66 :             if( (*ii) == nDisposeId )
      74             :             {
                                       // Leave the loop immediately: ii is not used again
                                       // after the erase.
      75          66 :                 m_lst.erase( ii );
      76          66 :                 break;
      77             :             }
      78          66 :         }
      79          66 :     }
      80             : 
      81      239311 :     bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
      82             :     {
                               // Linear search of m_lst under m_mutex.
                               // Returns true if nDisposeId is currently in the list, false otherwise.
      83      239311 :         MutexGuard guard( m_mutex );
      84      718023 :         for( DisposedCallerList::iterator ii = m_lst.begin() ;
      85      478682 :              ii != m_lst.end() ;
      86             :              ++ ii )
      87             :         {
      88           0 :             if( (*ii) == nDisposeId )
      89             :             {
      90           0 :                 return true;
      91             :             }
      92             :         }
      93      239341 :         return false;
      94             :     }
      95             : 
      96             : 
      97             : 
      98             : 
      99          65 :     ThreadPool::ThreadPool()
     100             :     {
                               // Keep a holder to the DisposedCallerAdmin instance in a member so
                               // that dispose() and destroy() can forward to it.
     101          65 :         m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
     102          65 :     }
     103             : 
     104         130 :     ThreadPool::~ThreadPool()
     105             :     {
                               // Diagnostic only: logs a warning in the cppu.threadpool area when
                               // m_mapQueue still has entries at destruction time.
     106             :         SAL_WARN_IF( m_mapQueue.size(), "cppu.threadpool", "ThreadIdHashMap:  " << m_mapQueue.size() << " left\n");
     107         130 :     }
     108             : 
     109          66 :     void ThreadPool::dispose( sal_Int64 nDisposeId )
     110             :     {
                               // Registers nDisposeId with the DisposedCallerAdmin first, then
                               // forwards dispose( nDisposeId ) to every queue currently stored in
                               // m_mapQueue. The map is walked while holding m_mutex.
     111          66 :         m_DisposedCallerAdmin->dispose( nDisposeId );
     112             : 
     113          66 :         MutexGuard guard( m_mutex );
     114         327 :         for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
     115         218 :              ii != m_mapQueue.end();
     116             :              ++ii)
     117             :         {
                                   // Each map value is a pair of JobQueue pointers: addJob uses
                                   // .first for synchronous and .second for asynchronous jobs.
                                   // Either pointer may be null, hence the checks.
     118          43 :             if( (*ii).second.first )
     119             :             {
     120          42 :                 (*ii).second.first->dispose( nDisposeId );
     121             :             }
     122          43 :             if( (*ii).second.second )
     123             :             {
     124           1 :                 (*ii).second.second->dispose( nDisposeId );
     125             :             }
     126          66 :         }
     127          66 :     }
     128             : 
     129          66 :     void ThreadPool::destroy( sal_Int64 nDisposeId )
     130             :     {
                               // Pure forwarder: removes nDisposeId from the DisposedCallerAdmin.
                               // The queues in m_mapQueue are not touched here.
     131          66 :         m_DisposedCallerAdmin->destroy( nDisposeId );
     132          66 :     }
     133             : 
     134             :     /******************
     135             :      * This method lets the thread wait a certain amount of time. If within this timespan
     136             :      * a new request comes in, this thread is reused. This is done only to improve performance,
     137             :      * it is not required for threadpool functionality.
     138             :      ******************/
     139      231989 :     void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
     140             :     {
                               // waitingThread is a local object; only its address is stored in
                               // m_lstThreads, so it has to be out of the list again before this
                               // function returns.
     141      231989 :         struct WaitingThread waitingThread;
     142      231989 :         waitingThread.condition = osl_createCondition();
     143      231989 :         waitingThread.thread = pThread;
     144             :         {
     145      231989 :             MutexGuard guard( m_mutexWaitingThreadList );
     146      231989 :             m_lstThreads.push_front( &waitingThread );
     147             :         }
     148             : 
     149             :         // let the thread wait 2 seconds
     150      231989 :         TimeValue time = { 2 , 0 };
                               // The result of the wait is not inspected; whether the thread was
                               // reused is decided below from waitingThread.thread.
     151      231989 :         osl_waitCondition( waitingThread.condition , &time );
     152             : 
     153             :         {
     154      231932 :             MutexGuard guard ( m_mutexWaitingThreadList );
                                   // createThread clears waitingThread.thread and pops the entry
                                   // when it hands a task to this thread, so a reference that is
                                   // still set means nobody picked this thread up.
     155      231989 :             if( waitingThread.thread.is() )
     156             :             {
     157             :                 // thread wasn't reused, remove it from the list
     158             :                 WaitingThreadList::iterator ii = find(
     159         907 :                     m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
     160             :                 OSL_ASSERT( ii != m_lstThreads.end() );
     161         907 :                 m_lstThreads.erase( ii );
     162      231989 :             }
     163             :         }
     164             : 
     165      231989 :         osl_destroyCondition( waitingThread.condition );
     166      231989 :     }
     167             : 
     168          65 :     void ThreadPool::joinWorkers()
     169             :     {
                               // Sets the condition of every entry in m_lstThreads, which ends the
                               // timed wait in waitInPool for those threads, and then joins via
                               // m_aThreadAdmin. The list mutex is released (inner scope) before
                               // join() is called.
     170             :         {
     171          65 :             MutexGuard guard( m_mutexWaitingThreadList );
     172         576 :             for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
     173         384 :                  ii != m_lstThreads.end() ;
     174             :                  ++ ii )
     175             :             {
     176             :                 // wake the threads up
     177         127 :                 osl_setCondition( (*ii)->condition );
     178          65 :             }
     179             :         }
     180          65 :         m_aThreadAdmin.join();
     181          65 :     }
     182             : 
     183      231989 :     bool ThreadPool::createThread( JobQueue *pQueue ,
     184             :                                    const ByteSequence &aThreadId,
     185             :                                    bool bAsynchron )
     186             :     {
                               // Gives the job queue to a thread: a thread parked in waitInPool is
                               // reused if there is one, otherwise a new ORequestThread is started.
                               // Returns true when a parked thread took the task, else the result
                               // of launch() on the new thread.
     187             :         {
     188             :             // Can a thread be reused ?
     189      231989 :             MutexGuard guard( m_mutexWaitingThreadList );
     190      231989 :             if( ! m_lstThreads.empty() )
     191             :             {
     192             :                 // inform the thread and let it go
                                       // waitInPool inserts at the front, so back() is the
                                       // entry that has been in the list the longest.
     193      231082 :                 struct WaitingThread *pWaitingThread = m_lstThreads.back();
     194      231082 :                 pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
                                       // Clearing the reference is the signal waitInPool checks
                                       // to see that the thread was reused.
     195      231082 :                 pWaitingThread->thread = 0;
     196             : 
     197             :                 // remove from list
     198      231082 :                 m_lstThreads.pop_back();
     199             : 
     200             :                 // let the thread go
     201      231082 :                 osl_setCondition( pWaitingThread->condition );
     202      231082 :                 return true;
     203         907 :             }
     204             :         }
     205             : 
     206             :         rtl::Reference< ORequestThread > pThread(
     207         907 :             new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
     208         907 :         return pThread->launch();
     209             :     }
     210             : 
     211      232239 :     bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
     212             :     {
                               // Detaches the synchronous (.first) or asynchronous (.second) queue
                               // of aThreadId from m_mapQueue, provided that queue is empty.
                               // Returns false and leaves the map unchanged when the queue still
                               // holds jobs; returns true once the slot has been set to null.
                               // The queue object itself is not deleted here.
     213      232239 :         MutexGuard guard( m_mutex );
     214             : 
     215      232239 :         ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
                               // NOTE(review): the lookup result is only checked by OSL_ASSERT and
                               // is dereferenced unconditionally below -- confirm that callers
                               // guarantee an entry exists for aThreadId.
     216             :         OSL_ASSERT( ii != m_mapQueue.end() );
     217             : 
     218      232239 :         if( bAsynchron )
     219             :         {
     220       32081 :             if( ! (*ii).second.second->isEmpty() )
     221             :             {
     222             :                 // another thread has put something into the queue
     223         125 :                 return false;
     224             :             }
     225             : 
     226       31956 :             (*ii).second.second = 0;
     227       31956 :             if( (*ii).second.first )
     228             :             {
     229             :                 // all oneway requests have been processed, now
     230             :                 // synchronous requests may go on
     231          57 :                 (*ii).second.first->resume();
     232             :             }
     233             :         }
     234             :         else
     235             :         {
     236      200158 :             if( ! (*ii).second.first->isEmpty() )
     237             :             {
     238             :                 // another thread has put something into the queue
     239           7 :                 return false;
     240             :             }
     241      200151 :             (*ii).second.first = 0;
     242             :         }
     243             : 
                               // Drop the map entry once neither queue slot is in use any more.
     244      232107 :         if( 0 == (*ii).second.first && 0 == (*ii).second.second )
     245             :         {
     246      232050 :             m_mapQueue.erase( ii );
     247             :         }
     248             : 
     249      232107 :         return true;
     250             :     }
     251             : 
     252             : 
     253      266747 :     bool ThreadPool::addJob(
     254             :         const ByteSequence &aThreadId ,
     255             :         bool bAsynchron,
     256             :         void *pThreadSpecificData,
     257             :         RequestFun * doRequest )
     258             :     {
                               // Queues a job for aThreadId. The asynchronous queue (.second) is
                               // used when bAsynchron is set, the synchronous one (.first)
                               // otherwise; a missing map entry or queue is created on demand.
                               // Returns true if no thread had to be started, otherwise the result
                               // of createThread().
     259      266747 :         bool bCreateThread = false;
     260      266747 :         JobQueue *pQueue = 0;
     261             :         {
     262      266747 :             MutexGuard guard( m_mutex );
     263             : 
     264      266747 :             ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
     265             : 
     266      266747 :             if( ii == m_mapQueue.end() )
     267             :             {
     268      231933 :                 m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( nullptr , nullptr );
     269      231933 :                 ii = m_mapQueue.find( aThreadId );
     270             :                 OSL_ASSERT( ii != m_mapQueue.end() );
     271             :             }
     272             : 
     273      266747 :             if( bAsynchron )
     274             :             {
     275       52266 :                 if( ! (*ii).second.second )
     276             :                 {
                                           // A thread is requested only when this call had to
                                           // create the queue.
     277       31956 :                     (*ii).second.second = new JobQueue();
     278       31956 :                     bCreateThread = true;
     279             :                 }
     280       52266 :                 pQueue = (*ii).second.second;
     281             :             }
     282             :             else
     283             :             {
     284      214481 :                 if( ! (*ii).second.first )
     285             :                 {
     286      200033 :                     (*ii).second.first = new JobQueue();
     287      200033 :                     bCreateThread = true;
     288             :                 }
     289      214481 :                 pQueue = (*ii).second.first;
     290             : 
                                       // While the asynchronous queue of the same thread id is
                                       // busy, the synchronous queue is suspended; revokeQueue
                                       // resumes it when the asynchronous queue is revoked.
     291      214481 :                 if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
     292             :                 {
     293          13 :                     pQueue->suspend();
     294             :                 }
     295             :             }
     296      266747 :             pQueue->add( pThreadSpecificData , doRequest );
     297             :         }
     298             : 
                               // createThread is called after the guard scope above has ended,
                               // i.e. without m_mutex held.
     299      266747 :         return !bCreateThread || createThread( pQueue , aThreadId , bAsynchron);
     300             :     }
     301             : 
     302       11879 :     void ThreadPool::prepare( const ByteSequence &aThreadId )
     303             :     {
                               // Makes sure aThreadId has a synchronous queue (.first) in
                               // m_mapQueue: inserts a new entry, or fills an empty .first slot of
                               // an existing one. An existing queue is left as it is. No thread is
                               // started here.
     304       11879 :         MutexGuard guard( m_mutex );
     305             : 
     306       11879 :         ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
     307             : 
     308       11879 :         if( ii == m_mapQueue.end() )
     309             :         {
     310         155 :             JobQueue *p = new JobQueue();
     311         155 :             m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , nullptr );
     312             :         }
     313       11724 :         else if( 0 == (*ii).second.first )
     314             :         {
     315           1 :             (*ii).second.first = new JobQueue();
     316       11879 :         }
     317       11879 :     }
     318             : 
     319        7140 :     void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
     320             :     {
                               // Looks up the synchronous queue (.first) of aThreadId and returns
                               // whatever its enter( nDisposeId ) call yields. Afterwards the queue
                               // is revoked and deleted if its call stack is empty.
     321        7140 :         JobQueue *pQueue = 0;
     322             :         {
     323        7140 :             MutexGuard guard( m_mutex );
     324             : 
     325        7140 :             ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
     326             : 
                                   // NOTE(review): the iterator and pQueue are only checked by
                                   // OSL_ASSERT before being dereferenced -- presumably callers
                                   // must have called prepare() for this thread first; confirm.
     327             :             OSL_ASSERT( ii != m_mapQueue.end() );
     328        7140 :             pQueue = (*ii).second.first;
     329             :         }
     330             : 
     331             :         OSL_ASSERT( pQueue );
                               // pQueue->enter() runs after the guard scope above has ended,
                               // i.e. without m_mutex held.
     332        7140 :         void *pReturn = pQueue->enter( nDisposeId );
     333             : 
     334        7140 :         if( pQueue->isCallstackEmpty() )
     335             :         {
                                   // revokeQueue returns false if jobs were added in the
                                   // meantime; the queue is then kept.
     336         118 :             if( revokeQueue( aThreadId , false) )
     337             :             {
     338             :                 // remove queue
     339         118 :                 delete pQueue;
     340             :             }
     341             :         }
     342        7140 :         return pReturn;
     343             :     }
     344             : }
     345             : 
     346             : // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
     347             : // spans share one ThreadPool instance.  When g_pThreadpoolHashSet becomes empty
     348             : // (within the last uno_threadpool_destroy) all worker threads spawned by that
     349             : // ThreadPool instance are joined (which implies that uno_threadpool_destroy
     350             : // must never be called from a worker thread); afterwards, the next call to
     351             : // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
     352             : 
     353             : using namespace cppu_threadpool;
     354             : 
     355             : struct uno_ThreadPool_Equal
     356             : {
                           // Key equality for ThreadpoolHashSet: two handles are equal when the
                           // handle values themselves compare equal.
     357      285964 :     bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
     358             :         {
     359      285964 :             return a == b;
     360             :         }
     361             : };
     362             : 
     363             : struct uno_ThreadPool_Hash
     364             : {
                           // Hash for ThreadpoolHashSet: the handle value reinterpreted as sal_Size.
     365      286030 :     sal_Size operator () ( const uno_ThreadPool &a  )  const
     366             :         {
     367      286030 :             return reinterpret_cast<sal_Size>( a );
     368             :         }
     369             : };
     370             : 
     371             : 
     372             : 
     373             : typedef std::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
     374             : 
                       // Maps every live uno_ThreadPool handle to its ThreadPoolHolder (a map,
                       // despite the Set in the type name). Null while no handle exists: it is
                       // allocated in uno_threadpool_create and deleted in uno_threadpool_destroy
                       // once it becomes empty. Those functions and getThreadPool access it under
                       // Mutex::getGlobalMutex().
     375             : static ThreadpoolHashSet *g_pThreadpoolHashSet;
     376             : 
                       // Target type of the handles allocated in uno_threadpool_create, where an
                       // instance is created only to obtain a unique address. The dummy member is
                       // not accessed anywhere in the code shown here.
     377             : struct _uno_ThreadPool
     378             : {
     379             :     sal_Int32 dummy;
     380             : };
     381             : 
     382             : namespace {
     383             : 
     384      285898 : ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
     385             : {
                           // Returns the ThreadPoolHolder registered for hPool, looked up under
                           // the global mutex. A missing map or an unknown handle is only caught
                           // by the assert() calls below.
     386      285898 :     MutexGuard guard( Mutex::getGlobalMutex() );
     387             :     assert( g_pThreadpoolHashSet != 0 );
     388      285898 :     ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
     389             :     assert( i != g_pThreadpoolHashSet->end() );
     390      285898 :     return i->second;
     391             : }
     392             : 
     393             : }
     394             : 
     395             : extern "C" uno_ThreadPool SAL_CALL
     396          66 : uno_threadpool_create() SAL_THROW_EXTERN_C()
     397             : {
                           // Creates and registers a new handle. The first handle allocates the
                           // hash map and a new ThreadPool; while the map exists, further handles
                           // take the ThreadPool of an already registered entry. Everything runs
                           // under the global mutex.
     398          66 :     MutexGuard guard( Mutex::getGlobalMutex() );
     399         132 :     ThreadPoolHolder p;
     400          66 :     if( ! g_pThreadpoolHashSet )
     401             :     {
     402          65 :         g_pThreadpoolHashSet = new ThreadpoolHashSet();
     403          65 :         p = new ThreadPool;
     404             :     }
     405             :     else
     406             :     {
     407             :         assert( !g_pThreadpoolHashSet->empty() );
     408           1 :         p = g_pThreadpoolHashSet->begin()->second;
     409             :     }
     410             : 
     411             :     // Just ensure that the handle is unique in the process (via heap)
     412          66 :     uno_ThreadPool h = new struct _uno_ThreadPool;
     413          66 :     g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
     414         132 :     return h;
     415             : }
     416             : 
     417             : extern "C" void SAL_CALL
     418       11879 : uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
     419             : {
                           // Fetches the id of the calling thread and has the pool behind hPool
                           // prepare a queue for it (ThreadPool::prepare).
     420       11879 :     sal_Sequence *pThreadId = 0;
     421       11879 :     uno_getIdOfCurrentThread( &pThreadId );
     422       11879 :     getThreadPool( hPool )->prepare( pThreadId );
                           // Give back what uno_getIdOfCurrentThread handed out: first the
                           // sequence, then the id registration of the current thread.
     423       11879 :     rtl_byte_sequence_release( pThreadId );
     424       11879 :     uno_releaseIdFromCurrentThread();
     425       11879 : }
     426             : 
     427             : extern "C" void SAL_CALL
     428        7140 : uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
     429             :     SAL_THROW_EXTERN_C()
     430             : {
                           // Calls ThreadPool::enter for the calling thread and stores its result
                           // in *ppJob. The dispose id passed along is the numeric value of the
                           // handle pointer, the same value that uno_threadpool_dispose and
                           // uno_threadpool_destroy use.
     431        7140 :     sal_Sequence *pThreadId = 0;
     432        7140 :     uno_getIdOfCurrentThread( &pThreadId );
     433             :     *ppJob =
     434             :         getThreadPool( hPool )->enter(
     435             :             pThreadId,
     436             :             sal::static_int_cast< sal_Int64 >(
     437        7140 :                 reinterpret_cast< sal_IntPtr >(hPool)) );
     438        7140 :     rtl_byte_sequence_release( pThreadId );
     439        7140 :     uno_releaseIdFromCurrentThread();
     440        7140 : }
     441             : 
     442             : extern "C" void SAL_CALL
     443       11879 : uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
     444             : {
                           // Intentionally empty: the handle parameter is unused.
     445             :     // we might do here some tidying up in case a thread called attach but never detach
     446       11879 : }
     447             : 
     448             : extern "C" void SAL_CALL
     449      266747 : uno_threadpool_putJob(
     450             :     uno_ThreadPool hPool,
     451             :     sal_Sequence *pThreadId,
     452             :     void *pJob,
     453             :     void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
     454             :     sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
     455             : {
                           // Queues pJob for the thread identified by pThreadId; bIsOneway is
                           // passed as the bAsynchron flag of ThreadPool::addJob.
                           // addJob returns false only when ThreadPool::createThread does. The
                           // failure is reported through the log and nothing is returned to the
                           // caller.
     456      266747 :     if (!getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest ))
     457             :     {
     458             :         SAL_WARN(
     459             :             "cppu",
     460             :             "uno_threadpool_putJob in parallel with uno_threadpool_destroy");
     461             :     }
     462      266747 : }
     463             : 
     464             : extern "C" void SAL_CALL
     465          66 : uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
     466             : {
                           // Forwards to ThreadPool::dispose, using the numeric value of the
                           // handle pointer as the dispose id.
     467             :     getThreadPool(hPool)->dispose(
     468             :         sal::static_int_cast< sal_Int64 >(
     469          66 :             reinterpret_cast< sal_IntPtr >(hPool)) );
     470          66 : }
     471             : 
     472             : extern "C" void SAL_CALL
     473          66 : uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
     474             : {
                           // Unregisters and frees the handle. When it was the last handle, the
                           // hash map is deleted as well and the worker threads are joined.
                           // p is a local holder to the pool taken before the handle is erased
                           // from the map; it is still used for joinWorkers() below.
     475          66 :     ThreadPoolHolder p( getThreadPool(hPool) );
     476             :     p->destroy(
     477             :         sal::static_int_cast< sal_Int64 >(
     478          66 :             reinterpret_cast< sal_IntPtr >(hPool)) );
     479             : 
     480             :     bool empty;
     481             :     {
                               // NOTE(review): this assertion reads g_pThreadpoolHashSet before
                               // the global mutex is taken on the next statement.
     482             :         OSL_ASSERT( g_pThreadpoolHashSet );
     483             : 
     484          66 :         MutexGuard guard( Mutex::getGlobalMutex() );
     485             : 
     486          66 :         ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
     487             :         OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
     488          66 :         g_pThreadpoolHashSet->erase( ii );
     489          66 :         delete hPool;
     490             : 
     491          66 :         empty = g_pThreadpoolHashSet->empty();
     492          66 :         if( empty )
     493             :         {
     494          65 :             delete g_pThreadpoolHashSet;
     495          65 :             g_pThreadpoolHashSet = 0;
     496          66 :         }
     497             :     }
     498             : 
                           // joinWorkers() is called after the guard scope above has ended, so
                           // the global mutex is not held while joining.
     499          66 :     if( empty )
     500             :     {
     501          65 :         p->joinWorkers();
     502          66 :     }
     503          66 : }
     504             : 
     505             : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */

Generated by: LCOV version 1.11