Branch data Line data Source code
1 : : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : : /*
3 : : * This file is part of the LibreOffice project.
4 : : *
5 : : * This Source Code Form is subject to the terms of the Mozilla Public
6 : : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : : *
9 : : * This file incorporates work covered by the following license notice:
10 : : *
11 : : * Licensed to the Apache Software Foundation (ASF) under one or more
12 : : * contributor license agreements. See the NOTICE file distributed
13 : : * with this work for additional information regarding copyright
14 : : * ownership. The ASF licenses this file to you under the Apache
15 : : * License, Version 2.0 (the "License"); you may not use this file
16 : : * except in compliance with the License. You may obtain a copy of
17 : : * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 : : */
19 : :
20 : : #include "sal/config.h"
21 : :
22 : : #include <boost/unordered_map.hpp>
23 : : #include <cassert>
24 : : #include <stdio.h>
25 : :
26 : : #include <osl/diagnose.h>
27 : : #include <osl/mutex.hxx>
28 : : #include <osl/thread.h>
29 : : #include <rtl/instance.hxx>
30 : :
31 : : #include <uno/threadpool.h>
32 : :
33 : : #include "threadpool.hxx"
34 : : #include "thread.hxx"
35 : :
36 : : using namespace ::std;
37 : : using namespace ::osl;
38 : :
39 : : namespace cppu_threadpool
40 : : {
41 : : struct theDisposedCallerAdmin :
42 : : public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
43 : : {
44 : 100 : DisposedCallerAdminHolder operator () () {
45 [ + - ]: 100 : return DisposedCallerAdminHolder(new DisposedCallerAdmin());
46 : : }
47 : : };
48 : :
49 : 308267 : DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
50 : : {
51 : 308267 : return theDisposedCallerAdmin::get();
52 : : }
53 : :
54 : 54 : DisposedCallerAdmin::~DisposedCallerAdmin()
55 : : {
56 : : #if OSL_DEBUG_LEVEL > 1
57 : : if( !m_lst.empty() )
58 : : {
59 : : printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
60 : : }
61 : : #endif
62 : 54 : }
63 : :
64 : 100 : void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
65 : : {
66 [ + - ]: 100 : MutexGuard guard( m_mutex );
67 [ + - ][ + - ]: 100 : m_lst.push_back( nDisposeId );
68 : 100 : }
69 : :
70 : 100 : void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
71 : : {
72 [ + - ]: 100 : MutexGuard guard( m_mutex );
73 [ # # + - ]: 200 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
[ + - ]
74 : 100 : ii != m_lst.end() ;
75 : : ++ ii )
76 : : {
77 [ + - ][ + - ]: 100 : if( (*ii) == nDisposeId )
78 : : {
79 [ + - ]: 100 : m_lst.erase( ii );
80 : 100 : break;
81 : : }
82 [ + - ]: 100 : }
83 : 100 : }
84 : :
85 : 321702 : sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
86 : : {
87 [ + - ]: 321702 : MutexGuard guard( m_mutex );
88 [ # # + - ]: 643440 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
[ - + ]
89 : 321720 : ii != m_lst.end() ;
90 : : ++ ii )
91 : : {
92 [ # # ][ # # ]: 0 : if( (*ii) == nDisposeId )
93 : : {
94 : 0 : return sal_True;
95 : : }
96 : : }
97 [ + - ]: 321720 : return sal_False;
98 : : }
99 : :
100 : :
101 : : //-------------------------------------------------------------------------------
102 : :
103 [ + - ][ + - ]: 100 : ThreadPool::ThreadPool()
[ + - ][ + - ]
[ + - ][ + - ]
104 : : {
105 [ + - ][ + - ]: 100 : m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
[ + - ]
106 : 100 : }
107 : :
108 [ + - ][ + - ]: 98 : ThreadPool::~ThreadPool()
[ + - ][ + - ]
[ + - ]
109 : : {
110 : : #if OSL_DEBUG_LEVEL > 1
111 : : if( m_mapQueue.size() )
112 : : {
113 : : printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
114 : : }
115 : : #endif
116 [ - + ]: 196 : }
117 : :
118 : 100 : void ThreadPool::dispose( sal_Int64 nDisposeId )
119 : : {
120 [ + - ]: 100 : m_DisposedCallerAdmin->dispose( nDisposeId );
121 : :
122 [ + - ]: 100 : MutexGuard guard( m_mutex );
123 [ + - ][ + + ]: 322 : for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
124 [ + - ]: 161 : ii != m_mapQueue.end();
125 : : ++ii)
126 : : {
127 [ + - ][ + + ]: 61 : if( (*ii).second.first )
128 : : {
129 [ + - ][ + - ]: 60 : (*ii).second.first->dispose( nDisposeId );
130 : : }
131 [ + - ][ + + ]: 61 : if( (*ii).second.second )
132 : : {
133 [ + - ][ + - ]: 1 : (*ii).second.second->dispose( nDisposeId );
134 : : }
135 [ + - ]: 100 : }
136 : 100 : }
137 : :
138 : 100 : void ThreadPool::destroy( sal_Int64 nDisposeId )
139 : : {
140 : 100 : m_DisposedCallerAdmin->destroy( nDisposeId );
141 : 100 : }
142 : :
143 : : /******************
144 : : * This methods lets the thread wait a certain amount of time. If within this timespan
145 : : * a new request comes in, this thread is reused. This is done only to improve performance,
146 : : * it is not required for threadpool functionality.
147 : : ******************/
148 : 307914 : void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
149 : : {
150 [ + - ]: 307914 : struct WaitingThread waitingThread;
151 [ + - ]: 307915 : waitingThread.condition = osl_createCondition();
152 [ + - ]: 307915 : waitingThread.thread = pThread;
153 : : {
154 [ + - ]: 307915 : MutexGuard guard( m_mutexWaitingThreadList );
155 [ + - ][ + - ]: 307915 : m_lstThreads.push_front( &waitingThread );
156 : : }
157 : :
158 : : // let the thread wait 2 seconds
159 : 307915 : TimeValue time = { 2 , 0 };
160 [ + - ]: 307915 : osl_waitCondition( waitingThread.condition , &time );
161 : :
162 : : {
163 [ + - ]: 307909 : MutexGuard guard ( m_mutexWaitingThreadList );
164 [ + + ]: 307915 : if( waitingThread.thread.is() )
165 : : {
166 : : // thread wasn't reused, remove it from the list
167 : : WaitingThreadList::iterator ii = find(
168 [ + - ]: 1638 : m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
169 : : OSL_ASSERT( ii != m_lstThreads.end() );
170 [ + - ]: 1638 : m_lstThreads.erase( ii );
171 [ + - ]: 307915 : }
172 : : }
173 : :
174 [ + - ][ + - ]: 307915 : osl_destroyCondition( waitingThread.condition );
175 : 307915 : }
176 : :
177 : 98 : void ThreadPool::joinWorkers()
178 : : {
179 : : {
180 [ + - ]: 98 : MutexGuard guard( m_mutexWaitingThreadList );
181 [ + + ]: 560 : for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
182 : 280 : ii != m_lstThreads.end() ;
183 : : ++ ii )
184 : : {
185 : : // wake the threads up
186 [ + - ]: 182 : osl_setCondition( (*ii)->condition );
187 [ + - ]: 98 : }
188 : : }
189 : 98 : m_aThreadAdmin.join();
190 : 98 : }
191 : :
192 : 307915 : void ThreadPool::createThread( JobQueue *pQueue ,
193 : : const ByteSequence &aThreadId,
194 : : sal_Bool bAsynchron )
195 : : {
196 : 307915 : sal_Bool bCreate = sal_True;
197 : : {
198 : : // Can a thread be reused ?
199 [ + - ]: 307915 : MutexGuard guard( m_mutexWaitingThreadList );
200 [ + + ]: 307915 : if( ! m_lstThreads.empty() )
201 : : {
202 : : // inform the thread and let it go
203 [ + - ]: 306277 : struct WaitingThread *pWaitingThread = m_lstThreads.back();
204 [ + - ]: 306277 : pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
205 [ + - ]: 306277 : pWaitingThread->thread = 0;
206 : :
207 : : // remove from list
208 [ + - ]: 306277 : m_lstThreads.pop_back();
209 : :
210 : : // let the thread go
211 [ + - ]: 306277 : osl_setCondition( pWaitingThread->condition );
212 : 306277 : bCreate = sal_False;
213 [ + - ]: 307915 : }
214 : : }
215 : :
216 [ + + ]: 307915 : if( bCreate )
217 : : {
218 : : rtl::Reference< ORequestThread > pThread(
219 [ + - ][ + - ]: 1638 : new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
[ + - ][ + - ]
[ + - ]
220 [ + - ][ + - ]: 1638 : pThread->launch();
221 : : }
222 : 307915 : }
223 : :
224 : 308219 : sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron )
225 : : {
226 [ + - ]: 308219 : MutexGuard guard( m_mutex );
227 : :
228 [ + - ]: 308219 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
229 : : OSL_ASSERT( ii != m_mapQueue.end() );
230 : :
231 [ + + ]: 308219 : if( bAsynchron )
232 : : {
233 [ + - ][ + - ]: 51213 : if( ! (*ii).second.second->isEmpty() )
[ + + ]
234 : : {
235 : : // another thread has put something into the queue
236 : 53 : return sal_False;
237 : : }
238 : :
239 [ + - ]: 51160 : (*ii).second.second = 0;
240 [ + - ][ + + ]: 51160 : if( (*ii).second.first )
241 : : {
242 : : // all oneway request have been processed, now
243 : : // synchronus requests may go on
244 [ + - ][ + - ]: 124 : (*ii).second.first->resume();
245 : : }
246 : : }
247 : : else
248 : : {
249 [ + - ][ + - ]: 257006 : if( ! (*ii).second.first->isEmpty() )
[ + + ]
250 : : {
251 : : // another thread has put something into the queue
252 : 52 : return sal_False;
253 : : }
254 [ + - ]: 256954 : (*ii).second.first = 0;
255 : : }
256 : :
257 [ + - ][ + + ]: 308114 : if( 0 == (*ii).second.first && 0 == (*ii).second.second )
[ + - ][ + + ]
[ + + ]
258 : : {
259 [ + - ]: 307988 : m_mapQueue.erase( ii );
260 : : }
261 : :
262 [ + - ]: 308219 : return sal_True;
263 : : }
264 : :
265 : :
266 : 476763 : void ThreadPool::addJob(
267 : : const ByteSequence &aThreadId ,
268 : : sal_Bool bAsynchron,
269 : : void *pThreadSpecificData,
270 : : RequestFun * doRequest )
271 : : {
272 : 476763 : sal_Bool bCreateThread = sal_False;
273 : 476763 : JobQueue *pQueue = 0;
274 : : {
275 [ + - ]: 476763 : MutexGuard guard( m_mutex );
276 : :
277 [ + - ]: 476763 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
278 : :
279 [ + - ][ + + ]: 476763 : if( ii == m_mapQueue.end() )
280 : : {
281 [ + - ]: 307790 : m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( (JobQueue *)0 , (JobQueue*)0 );
282 [ + - ]: 307790 : ii = m_mapQueue.find( aThreadId );
283 : : OSL_ASSERT( ii != m_mapQueue.end() );
284 : : }
285 : :
286 [ + + ]: 476763 : if( bAsynchron )
287 : : {
288 [ + - ][ + + ]: 96434 : if( ! (*ii).second.second )
289 : : {
290 [ + - ][ + - ]: 51160 : (*ii).second.second = new JobQueue();
[ + - ]
291 : 51160 : bCreateThread = sal_True;
292 : : }
293 [ + - ]: 96434 : pQueue = (*ii).second.second;
294 : : }
295 : : else
296 : : {
297 [ + - ][ + + ]: 380329 : if( ! (*ii).second.first )
298 : : {
299 [ + - ][ + - ]: 256755 : (*ii).second.first = new JobQueue();
[ + - ]
300 : 256755 : bCreateThread = sal_True;
301 : : }
302 [ + - ]: 380329 : pQueue = (*ii).second.first;
303 : :
304 [ + - ][ + + ]: 380329 : if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
[ + - ][ + - ]
[ + + ][ + + ]
305 : : {
306 [ + - ]: 13 : pQueue->suspend();
307 : : }
308 : : }
309 [ + - ][ + - ]: 476763 : pQueue->add( pThreadSpecificData , doRequest );
310 : : }
311 : :
312 [ + + ]: 476763 : if( bCreateThread )
313 : : {
314 : 307915 : createThread( pQueue , aThreadId , bAsynchron);
315 : : }
316 : 476763 : }
317 : :
318 : 22782 : void ThreadPool::prepare( const ByteSequence &aThreadId )
319 : : {
320 [ + - ]: 22782 : MutexGuard guard( m_mutex );
321 : :
322 [ + - ]: 22782 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
323 : :
324 [ + - ][ + + ]: 22782 : if( ii == m_mapQueue.end() )
325 : : {
326 [ + - ][ + - ]: 251 : JobQueue *p = new JobQueue();
327 [ + - ]: 251 : m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , (JobQueue*)0 );
328 : : }
329 [ + - ][ + + ]: 22531 : else if( 0 == (*ii).second.first )
330 : : {
331 [ + - ][ + - ]: 1 : (*ii).second.first = new JobQueue();
[ + - ]
332 [ + - ]: 22782 : }
333 : 22782 : }
334 : :
335 : 13606 : void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
336 : : {
337 : 13606 : JobQueue *pQueue = 0;
338 : : {
339 [ + - ]: 13606 : MutexGuard guard( m_mutex );
340 : :
341 [ + - ]: 13606 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
342 : :
343 : : OSL_ASSERT( ii != m_mapQueue.end() );
344 [ + - ][ + - ]: 13606 : pQueue = (*ii).second.first;
345 : : }
346 : :
347 : : OSL_ASSERT( pQueue );
348 : 13606 : void *pReturn = pQueue->enter( nDisposeId );
349 : :
350 [ + + ]: 13606 : if( pQueue->isCallstackEmpty() )
351 : : {
352 [ + - ]: 199 : if( revokeQueue( aThreadId , sal_False) )
353 : : {
354 : : // remove queue
355 [ + - ]: 199 : delete pQueue;
356 : : }
357 : : }
358 : 13606 : return pReturn;
359 : : }
360 : : }
361 : :
362 : : // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
363 : : // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
364 : : // (within the last uno_threadpool_destroy) all worker threads spawned by that
365 : : // ThreadPool instance are joined (which implies that uno_threadpool_destroy
366 : : // must never be called from a worker thread); afterwards, the next call to
367 : : // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
368 : :
369 : : using namespace cppu_threadpool;
370 : :
371 : : struct uno_ThreadPool_Equal
372 : : {
373 : 513451 : sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
374 : : {
375 : 513451 : return a == b;
376 : : }
377 : : };
378 : :
379 : : struct uno_ThreadPool_Hash
380 : : {
381 : 513553 : sal_Size operator () ( const uno_ThreadPool &a ) const
382 : : {
383 : 513553 : return (sal_Size) a;
384 : : }
385 : : };
386 : :
387 : :
388 : :
389 : : typedef ::boost::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
390 : :
391 : : static ThreadpoolHashSet *g_pThreadpoolHashSet;
392 : :
393 : : struct _uno_ThreadPool
394 : : {
395 : : sal_Int32 dummy;
396 : : };
397 : :
398 : : namespace {
399 : :
400 : 513351 : ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
401 : : {
402 [ + - ][ + - ]: 513351 : MutexGuard guard( Mutex::getGlobalMutex() );
403 : : assert( g_pThreadpoolHashSet != 0 );
404 [ + - ]: 513351 : ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
405 : : assert( i != g_pThreadpoolHashSet->end() );
406 [ + - ][ + - ]: 513351 : return i->second;
[ + - ]
407 : : }
408 : :
409 : : }
410 : :
411 : : extern "C" uno_ThreadPool SAL_CALL
412 : 102 : uno_threadpool_create() SAL_THROW_EXTERN_C()
413 : : {
414 [ + - ][ + - ]: 102 : MutexGuard guard( Mutex::getGlobalMutex() );
415 : 102 : ThreadPoolHolder p;
416 [ + + ]: 102 : if( ! g_pThreadpoolHashSet )
417 : : {
418 [ + - ][ + - ]: 100 : g_pThreadpoolHashSet = new ThreadpoolHashSet();
419 [ + - ][ + - ]: 100 : p = new ThreadPool;
[ + - ]
420 : : }
421 : : else
422 : : {
423 : : assert( !g_pThreadpoolHashSet->empty() );
424 [ + - ][ + - ]: 2 : p = g_pThreadpoolHashSet->begin()->second;
[ + - ]
425 : : }
426 : :
427 : : // Just ensure that the handle is unique in the process (via heap)
428 [ + - ]: 102 : uno_ThreadPool h = new struct _uno_ThreadPool;
429 [ + - ][ + - ]: 102 : g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
[ + - ]
430 [ + - ][ + - ]: 102 : return h;
431 : : }
432 : :
433 : : extern "C" void SAL_CALL
434 : 22782 : uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
435 : : {
436 : 22782 : sal_Sequence *pThreadId = 0;
437 : 22782 : uno_getIdOfCurrentThread( &pThreadId );
438 [ + - ][ + - ]: 22782 : getThreadPool( hPool )->prepare( pThreadId );
[ + - ]
439 : 22782 : rtl_byte_sequence_release( pThreadId );
440 : 22782 : uno_releaseIdFromCurrentThread();
441 : 22782 : }
442 : :
443 : : extern "C" void SAL_CALL
444 : 13606 : uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
445 : : SAL_THROW_EXTERN_C()
446 : : {
447 : 13606 : sal_Sequence *pThreadId = 0;
448 : 13606 : uno_getIdOfCurrentThread( &pThreadId );
449 : : *ppJob =
450 : : getThreadPool( hPool )->enter(
451 : : pThreadId,
452 : : sal::static_int_cast< sal_Int64 >(
453 [ + - ][ + - ]: 13606 : reinterpret_cast< sal_IntPtr >(hPool)) );
[ + - ]
454 : 13606 : rtl_byte_sequence_release( pThreadId );
455 : 13606 : uno_releaseIdFromCurrentThread();
456 : 13606 : }
457 : :
458 : : extern "C" void SAL_CALL
459 : 22782 : uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
460 : : {
461 : : // we might do here some tiding up in case a thread called attach but never detach
462 : 22782 : }
463 : :
464 : : extern "C" void SAL_CALL
465 : 476763 : uno_threadpool_putJob(
466 : : uno_ThreadPool hPool,
467 : : sal_Sequence *pThreadId,
468 : : void *pJob,
469 : : void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
470 : : sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
471 : : {
472 [ + - ][ + - ]: 476763 : getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest );
[ + - ]
473 : 476763 : }
474 : :
475 : : extern "C" void SAL_CALL
476 : 100 : uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
477 : : {
478 : : getThreadPool(hPool)->dispose(
479 : : sal::static_int_cast< sal_Int64 >(
480 [ + - ]: 100 : reinterpret_cast< sal_IntPtr >(hPool)) );
481 : 100 : }
482 : :
483 : : extern "C" void SAL_CALL
484 : 100 : uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
485 : : {
486 [ + - ]: 100 : ThreadPoolHolder p( getThreadPool(hPool) );
487 : : p->destroy(
488 : : sal::static_int_cast< sal_Int64 >(
489 [ + - ]: 100 : reinterpret_cast< sal_IntPtr >(hPool)) );
490 : :
491 : : bool empty;
492 : : {
493 : : OSL_ASSERT( g_pThreadpoolHashSet );
494 : :
495 [ + - ][ + - ]: 100 : MutexGuard guard( Mutex::getGlobalMutex() );
496 : :
497 [ + - ]: 100 : ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
498 : : OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
499 [ + - ]: 100 : g_pThreadpoolHashSet->erase( ii );
500 : 100 : delete hPool;
501 : :
502 : 100 : empty = g_pThreadpoolHashSet->empty();
503 [ + + ]: 100 : if( empty )
504 : : {
505 [ + - ][ + - ]: 98 : delete g_pThreadpoolHashSet;
506 : 98 : g_pThreadpoolHashSet = 0;
507 [ + - ]: 100 : }
508 : : }
509 : :
510 [ + + ]: 100 : if( empty )
511 : : {
512 [ + - ]: 98 : p->joinWorkers();
513 [ + - ]: 100 : }
514 : 100 : }
515 : :
516 : : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|