LCOV - code coverage report
Current view: top level - cppu/source/threadpool - threadpool.cxx (source / functions) Hit Total Coverage
Test: commit 10e77ab3ff6f4314137acd6e2702a6e5c1ce1fae Lines: 194 197 98.5 %
Date: 2014-11-03 Functions: 28 28 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
       2             : /*
       3             :  * This file is part of the LibreOffice project.
       4             :  *
       5             :  * This Source Code Form is subject to the terms of the Mozilla Public
       6             :  * License, v. 2.0. If a copy of the MPL was not distributed with this
       7             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       8             :  *
       9             :  * This file incorporates work covered by the following license notice:
      10             :  *
      11             :  *   Licensed to the Apache Software Foundation (ASF) under one or more
      12             :  *   contributor license agreements. See the NOTICE file distributed
      13             :  *   with this work for additional information regarding copyright
      14             :  *   ownership. The ASF licenses this file to you under the Apache
      15             :  *   License, Version 2.0 (the "License"); you may not use this file
      16             :  *   except in compliance with the License. You may obtain a copy of
      17             :  *   the License at http://www.apache.org/licenses/LICENSE-2.0 .
      18             :  */
      19             : 
      20             : #include "sal/config.h"
      21             : 
      22             : #include <boost/unordered_map.hpp>
      23             : #include <cassert>
      24             : #include <stdio.h>
      25             : 
      26             : #include <osl/diagnose.h>
      27             : #include <osl/mutex.hxx>
      28             : #include <osl/thread.h>
      29             : #include <rtl/instance.hxx>
      30             : 
      31             : #include <uno/threadpool.h>
      32             : 
      33             : #include "threadpool.hxx"
      34             : #include "thread.hxx"
      35             : 
      36             : using namespace ::std;
      37             : using namespace ::osl;
      38             : using namespace ::rtl;
      39             : 
      40             : namespace cppu_threadpool
      41             : {
      42             :     struct theDisposedCallerAdmin :
      43             :         public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
      44             :     {
      45         100 :         DisposedCallerAdminHolder operator () () {
      46         100 :             return DisposedCallerAdminHolder(new DisposedCallerAdmin());
      47             :         }
      48             :     };
      49             : 
      50      368406 :     DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
      51             :     {
      52      368406 :         return theDisposedCallerAdmin::get();
      53             :     }
      54             : 
      55          52 :     DisposedCallerAdmin::~DisposedCallerAdmin()
      56             :     {
      57             : #if OSL_DEBUG_LEVEL > 1
      58             :         if( !m_lst.empty() )
      59             :         {
      60             :             printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
      61             :         }
      62             : #endif
      63          52 :     }
      64             : 
      65         102 :     void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
      66             :     {
      67         102 :         MutexGuard guard( m_mutex );
      68         102 :         m_lst.push_back( nDisposeId );
      69         102 :     }
      70             : 
      71         102 :     void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
      72             :     {
      73         102 :         MutexGuard guard( m_mutex );
      74         306 :         for( DisposedCallerList::iterator ii = m_lst.begin() ;
      75         204 :              ii != m_lst.end() ;
      76             :              ++ ii )
      77             :         {
      78         102 :             if( (*ii) == nDisposeId )
      79             :             {
      80         102 :                 m_lst.erase( ii );
      81         102 :                 break;
      82             :             }
      83         102 :         }
      84         102 :     }
      85             : 
      86      382368 :     bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
      87             :     {
      88      382368 :         MutexGuard guard( m_mutex );
      89     1147107 :         for( DisposedCallerList::iterator ii = m_lst.begin() ;
      90      764738 :              ii != m_lst.end() ;
      91             :              ++ ii )
      92             :         {
      93           0 :             if( (*ii) == nDisposeId )
      94             :             {
      95           0 :                 return true;
      96             :             }
      97             :         }
      98      382369 :         return false;
      99             :     }
     100             : 
     101             : 
     102             : 
     103             : 
     104         100 :     ThreadPool::ThreadPool()
     105             :     {
     106         100 :         m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
     107         100 :     }
     108             : 
     109         200 :     ThreadPool::~ThreadPool()
     110             :     {
     111             : #if OSL_DEBUG_LEVEL > 1
     112             :         if( m_mapQueue.size() )
     113             :         {
     114             :             printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
     115             :         }
     116             : #endif
     117         200 :     }
     118             : 
     119         102 :     void ThreadPool::dispose( sal_Int64 nDisposeId )
     120             :     {
     121         102 :         m_DisposedCallerAdmin->dispose( nDisposeId );
     122             : 
     123         102 :         MutexGuard guard( m_mutex );
     124         480 :         for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
     125         320 :              ii != m_mapQueue.end();
     126             :              ++ii)
     127             :         {
     128          58 :             if( (*ii).second.first )
     129             :             {
     130          58 :                 (*ii).second.first->dispose( nDisposeId );
     131             :             }
     132          58 :             if( (*ii).second.second )
     133             :             {
     134           0 :                 (*ii).second.second->dispose( nDisposeId );
     135             :             }
     136         102 :         }
     137         102 :     }
     138             : 
     139         102 :     void ThreadPool::destroy( sal_Int64 nDisposeId )
     140             :     {
     141         102 :         m_DisposedCallerAdmin->destroy( nDisposeId );
     142         102 :     }
     143             : 
     144             :     /******************
     145             :      * This method lets the thread wait a certain amount of time. If within this timespan
     146             :      * a new request comes in, this thread is reused. This is done only to improve performance;
     147             :      * it is not required for threadpool functionality.
     148             :      ******************/
     149      368127 :     void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
     150             :     {
     151      368127 :         struct WaitingThread waitingThread;
     152      368127 :         waitingThread.condition = osl_createCondition();
     153      368127 :         waitingThread.thread = pThread;
     154             :         {
     155      368127 :             MutexGuard guard( m_mutexWaitingThreadList );
     156      368127 :             m_lstThreads.push_front( &waitingThread );
     157             :         }
     158             : 
     159             :         // let the thread wait 2 seconds
     160      368127 :         TimeValue time = { 2 , 0 };
     161      368127 :         osl_waitCondition( waitingThread.condition , &time );
     162             : 
     163             :         {
     164      368124 :             MutexGuard guard ( m_mutexWaitingThreadList );
     165      368127 :             if( waitingThread.thread.is() )
     166             :             {
     167             :                 // thread wasn't reused, remove it from the list
     168             :                 WaitingThreadList::iterator ii = find(
     169        1667 :                     m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
     170             :                 OSL_ASSERT( ii != m_lstThreads.end() );
     171        1667 :                 m_lstThreads.erase( ii );
     172      368127 :             }
     173             :         }
     174             : 
     175      368123 :         osl_destroyCondition( waitingThread.condition );
     176      368127 :     }
     177             : 
     178         100 :     void ThreadPool::joinWorkers()
     179             :     {
     180             :         {
     181         100 :             MutexGuard guard( m_mutexWaitingThreadList );
     182         933 :             for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
     183         622 :                  ii != m_lstThreads.end() ;
     184             :                  ++ ii )
     185             :             {
     186             :                 // wake the threads up
     187         211 :                 osl_setCondition( (*ii)->condition );
     188         100 :             }
     189             :         }
     190         100 :         m_aThreadAdmin.join();
     191         100 :     }
     192             : 
     193      368127 :     void ThreadPool::createThread( JobQueue *pQueue ,
     194             :                                    const ByteSequence &aThreadId,
     195             :                                    bool bAsynchron )
     196             :     {
     197      368127 :         bool bCreate = true;
     198             :         {
     199             :             // Can a thread be reused?
     200      368127 :             MutexGuard guard( m_mutexWaitingThreadList );
     201      368127 :             if( ! m_lstThreads.empty() )
     202             :             {
     203             :                 // inform the thread and let it go
     204      366460 :                 struct WaitingThread *pWaitingThread = m_lstThreads.back();
     205      366460 :                 pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
     206      366460 :                 pWaitingThread->thread = 0;
     207             : 
     208             :                 // remove from list
     209      366460 :                 m_lstThreads.pop_back();
     210             : 
     211             :                 // let the thread go
     212      366460 :                 osl_setCondition( pWaitingThread->condition );
     213      366460 :                 bCreate = false;
     214      368127 :             }
     215             :         }
     216             : 
     217      368127 :         if( bCreate )
     218             :         {
     219             :             rtl::Reference< ORequestThread > pThread(
     220        1667 :                 new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
     221        1667 :             pThread->launch();
     222             :         }
     223      368127 :     }
     224             : 
     225      368340 :     bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
     226             :     {
     227      368340 :         MutexGuard guard( m_mutex );
     228             : 
     229      368340 :         ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
     230             :         OSL_ASSERT( ii != m_mapQueue.end() );
     231             : 
     232      368340 :         if( bAsynchron )
     233             :         {
     234       56378 :             if( ! (*ii).second.second->isEmpty() )
     235             :             {
     236             :                 // another thread has put something into the queue
     237          36 :                 return false;
     238             :             }
     239             : 
     240       56342 :             (*ii).second.second = 0;
     241       56342 :             if( (*ii).second.first )
     242             :             {
     243             :                 // all oneway requests have been processed, now
     244             :                 // synchronous requests may go on
     245         104 :                 (*ii).second.first->resume();
     246             :             }
     247             :         }
     248             :         else
     249             :         {
     250      311962 :             if( ! (*ii).second.first->isEmpty() )
     251             :             {
     252             :                 // another thread has put something into the queue
     253          53 :                 return false;
     254             :             }
     255      311909 :             (*ii).second.first = 0;
     256             :         }
     257             : 
     258      368251 :         if( 0 == (*ii).second.first && 0 == (*ii).second.second )
     259             :         {
     260      368141 :             m_mapQueue.erase( ii );
     261             :         }
     262             : 
     263      368251 :         return true;
     264             :     }
     265             : 
     266             : 
     267      523818 :     void ThreadPool::addJob(
     268             :         const ByteSequence &aThreadId ,
     269             :         bool bAsynchron,
     270             :         void *pThreadSpecificData,
     271             :         RequestFun * doRequest )
     272             :     {
     273      523818 :         bool bCreateThread = false;
     274      523818 :         JobQueue *pQueue = 0;
     275             :         {
     276      523818 :             MutexGuard guard( m_mutex );
     277             : 
     278      523818 :             ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
     279             : 
     280      523818 :             if( ii == m_mapQueue.end() )
     281             :             {
     282      368018 :                 m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( (JobQueue *)0 , (JobQueue*)0 );
     283      368018 :                 ii = m_mapQueue.find( aThreadId );
     284             :                 OSL_ASSERT( ii != m_mapQueue.end() );
     285             :             }
     286             : 
     287      523818 :             if( bAsynchron )
     288             :             {
     289      117749 :                 if( ! (*ii).second.second )
     290             :                 {
     291       56342 :                     (*ii).second.second = new JobQueue();
     292       56342 :                     bCreateThread = true;
     293             :                 }
     294      117749 :                 pQueue = (*ii).second.second;
     295             :             }
     296             :             else
     297             :             {
     298      406069 :                 if( ! (*ii).second.first )
     299             :                 {
     300      311785 :                     (*ii).second.first = new JobQueue();
     301      311785 :                     bCreateThread = true;
     302             :                 }
     303      406069 :                 pQueue = (*ii).second.first;
     304             : 
     305      406069 :                 if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
     306             :                 {
     307          25 :                     pQueue->suspend();
     308             :                 }
     309             :             }
     310      523818 :             pQueue->add( pThreadSpecificData , doRequest );
     311             :         }
     312             : 
     313      523818 :         if( bCreateThread )
     314             :         {
     315      368127 :             createThread( pQueue , aThreadId , bAsynchron);
     316             :         }
     317      523818 :     }
     318             : 
     319       23761 :     void ThreadPool::prepare( const ByteSequence &aThreadId )
     320             :     {
     321       23761 :         MutexGuard guard( m_mutex );
     322             : 
     323       23761 :         ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
     324             : 
     325       23761 :         if( ii == m_mapQueue.end() )
     326             :         {
     327         178 :             JobQueue *p = new JobQueue();
     328         178 :             m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , (JobQueue*)0 );
     329             :         }
     330       23583 :         else if( 0 == (*ii).second.first )
     331             :         {
     332           1 :             (*ii).second.first = new JobQueue();
     333       23761 :         }
     334       23761 :     }
     335             : 
     336       14142 :     void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
     337             :     {
     338       14142 :         JobQueue *pQueue = 0;
     339             :         {
     340       14142 :             MutexGuard guard( m_mutex );
     341             : 
     342       14142 :             ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
     343             : 
     344             :             OSL_ASSERT( ii != m_mapQueue.end() );
     345       14142 :             pQueue = (*ii).second.first;
     346             :         }
     347             : 
     348             :         OSL_ASSERT( pQueue );
     349       14142 :         void *pReturn = pQueue->enter( nDisposeId );
     350             : 
     351       14142 :         if( pQueue->isCallstackEmpty() )
     352             :         {
     353         124 :             if( revokeQueue( aThreadId , false) )
     354             :             {
     355             :                 // remove queue
     356         124 :                 delete pQueue;
     357             :             }
     358             :         }
     359       14142 :         return pReturn;
     360             :     }
     361             : }
     362             : 
     363             : // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
     364             : // spans share one ThreadPool instance.  When g_pThreadpoolHashSet becomes empty
     365             : // (within the last uno_threadpool_destroy) all worker threads spawned by that
     366             : // ThreadPool instance are joined (which implies that uno_threadpool_destroy
     367             : // must never be called from a worker thread); afterwards, the next call to
     368             : // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
     369             : 
     370             : using namespace cppu_threadpool;
     371             : 
     372             : struct uno_ThreadPool_Equal
     373             : {
     374      562027 :     bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
     375             :         {
     376      562027 :             return a == b;
     377             :         }
     378             : };
     379             : 
     380             : struct uno_ThreadPool_Hash
     381             : {
     382      562129 :     sal_Size operator () ( const uno_ThreadPool &a  )  const
     383             :         {
     384      562129 :             return reinterpret_cast<sal_Size>( a );
     385             :         }
     386             : };
     387             : 
     388             : 
     389             : 
     390             : typedef ::boost::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
     391             : 
     392             : static ThreadpoolHashSet *g_pThreadpoolHashSet;
     393             : 
     394             : struct _uno_ThreadPool
     395             : {
     396             :     sal_Int32 dummy;
     397             : };
     398             : 
     399             : namespace {
     400             : 
     401      561925 : ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
     402             : {
     403      561925 :     MutexGuard guard( Mutex::getGlobalMutex() );
     404             :     assert( g_pThreadpoolHashSet != 0 );
     405      561925 :     ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
     406             :     assert( i != g_pThreadpoolHashSet->end() );
     407      561925 :     return i->second;
     408             : }
     409             : 
     410             : }
     411             : 
     412             : extern "C" uno_ThreadPool SAL_CALL
     413         102 : uno_threadpool_create() SAL_THROW_EXTERN_C()
     414             : {
     415         102 :     MutexGuard guard( Mutex::getGlobalMutex() );
     416         204 :     ThreadPoolHolder p;
     417         102 :     if( ! g_pThreadpoolHashSet )
     418             :     {
     419         100 :         g_pThreadpoolHashSet = new ThreadpoolHashSet();
     420         100 :         p = new ThreadPool;
     421             :     }
     422             :     else
     423             :     {
     424             :         assert( !g_pThreadpoolHashSet->empty() );
     425           2 :         p = g_pThreadpoolHashSet->begin()->second;
     426             :     }
     427             : 
     428             :     // Just ensure that the handle is unique in the process (via heap)
     429         102 :     uno_ThreadPool h = new struct _uno_ThreadPool;
     430         102 :     g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
     431         204 :     return h;
     432             : }
     433             : 
     434             : extern "C" void SAL_CALL
     435       23761 : uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
     436             : {
     437       23761 :     sal_Sequence *pThreadId = 0;
     438       23761 :     uno_getIdOfCurrentThread( &pThreadId );
     439       23761 :     getThreadPool( hPool )->prepare( pThreadId );
     440       23761 :     rtl_byte_sequence_release( pThreadId );
     441       23761 :     uno_releaseIdFromCurrentThread();
     442       23761 : }
     443             : 
     444             : extern "C" void SAL_CALL
     445       14142 : uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
     446             :     SAL_THROW_EXTERN_C()
     447             : {
     448       14142 :     sal_Sequence *pThreadId = 0;
     449       14142 :     uno_getIdOfCurrentThread( &pThreadId );
     450             :     *ppJob =
     451             :         getThreadPool( hPool )->enter(
     452             :             pThreadId,
     453             :             sal::static_int_cast< sal_Int64 >(
     454       14142 :                 reinterpret_cast< sal_IntPtr >(hPool)) );
     455       14142 :     rtl_byte_sequence_release( pThreadId );
     456       14142 :     uno_releaseIdFromCurrentThread();
     457       14142 : }
     458             : 
     459             : extern "C" void SAL_CALL
     460       23761 : uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
     461             : {
     462             :     // we might do some tidying up here in case a thread called attach but never detach
     463       23761 : }
     464             : 
     465             : extern "C" void SAL_CALL
     466      523818 : uno_threadpool_putJob(
     467             :     uno_ThreadPool hPool,
     468             :     sal_Sequence *pThreadId,
     469             :     void *pJob,
     470             :     void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
     471             :     sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
     472             : {
     473      523818 :     getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest );
     474      523818 : }
     475             : 
     476             : extern "C" void SAL_CALL
     477         102 : uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
     478             : {
     479             :     getThreadPool(hPool)->dispose(
     480             :         sal::static_int_cast< sal_Int64 >(
     481         102 :             reinterpret_cast< sal_IntPtr >(hPool)) );
     482         102 : }
     483             : 
     484             : extern "C" void SAL_CALL
     485         102 : uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
     486             : {
     487         102 :     ThreadPoolHolder p( getThreadPool(hPool) );
     488             :     p->destroy(
     489             :         sal::static_int_cast< sal_Int64 >(
     490         102 :             reinterpret_cast< sal_IntPtr >(hPool)) );
     491             : 
     492             :     bool empty;
     493             :     {
     494             :         OSL_ASSERT( g_pThreadpoolHashSet );
     495             : 
     496         102 :         MutexGuard guard( Mutex::getGlobalMutex() );
     497             : 
     498         102 :         ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
     499             :         OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
     500         102 :         g_pThreadpoolHashSet->erase( ii );
     501         102 :         delete hPool;
     502             : 
     503         102 :         empty = g_pThreadpoolHashSet->empty();
     504         102 :         if( empty )
     505             :         {
     506         100 :             delete g_pThreadpoolHashSet;
     507         100 :             g_pThreadpoolHashSet = 0;
     508         102 :         }
     509             :     }
     510             : 
     511         102 :     if( empty )
     512             :     {
     513         100 :         p->joinWorkers();
     514         102 :     }
     515         102 : }
     516             : 
     517             : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */

Generated by: LCOV version 1.10