Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /*
3 : * This file is part of the LibreOffice project.
4 : *
5 : * This Source Code Form is subject to the terms of the Mozilla Public
6 : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : *
9 : * This file incorporates work covered by the following license notice:
10 : *
11 : * Licensed to the Apache Software Foundation (ASF) under one or more
12 : * contributor license agreements. See the NOTICE file distributed
13 : * with this work for additional information regarding copyright
14 : * ownership. The ASF licenses this file to you under the Apache
15 : * License, Version 2.0 (the "License"); you may not use this file
16 : * except in compliance with the License. You may obtain a copy of
17 : * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 : */
19 :
20 : #include "sal/config.h"
21 :
22 : #include <boost/unordered_map.hpp>
23 : #include <cassert>
24 : #include <stdio.h>
25 :
26 : #include <osl/diagnose.h>
27 : #include <osl/mutex.hxx>
28 : #include <osl/thread.h>
29 : #include <rtl/instance.hxx>
30 :
31 : #include <uno/threadpool.h>
32 :
33 : #include "threadpool.hxx"
34 : #include "thread.hxx"
35 :
36 : using namespace ::std;
37 : using namespace ::osl;
38 : using namespace ::rtl;
39 :
40 : namespace cppu_threadpool
41 : {
42 : struct theDisposedCallerAdmin :
43 : public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
44 : {
45 100 : DisposedCallerAdminHolder operator () () {
46 100 : return DisposedCallerAdminHolder(new DisposedCallerAdmin());
47 : }
48 : };
49 :
50 368406 : DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
51 : {
52 368406 : return theDisposedCallerAdmin::get();
53 : }
54 :
55 52 : DisposedCallerAdmin::~DisposedCallerAdmin()
56 : {
57 : #if OSL_DEBUG_LEVEL > 1
58 : if( !m_lst.empty() )
59 : {
60 : printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
61 : }
62 : #endif
63 52 : }
64 :
65 102 : void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
66 : {
67 102 : MutexGuard guard( m_mutex );
68 102 : m_lst.push_back( nDisposeId );
69 102 : }
70 :
71 102 : void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
72 : {
73 102 : MutexGuard guard( m_mutex );
74 306 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
75 204 : ii != m_lst.end() ;
76 : ++ ii )
77 : {
78 102 : if( (*ii) == nDisposeId )
79 : {
80 102 : m_lst.erase( ii );
81 102 : break;
82 : }
83 102 : }
84 102 : }
85 :
86 382368 : bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
87 : {
88 382368 : MutexGuard guard( m_mutex );
89 1147107 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
90 764738 : ii != m_lst.end() ;
91 : ++ ii )
92 : {
93 0 : if( (*ii) == nDisposeId )
94 : {
95 0 : return true;
96 : }
97 : }
98 382369 : return false;
99 : }
100 :
101 :
102 :
103 :
104 100 : ThreadPool::ThreadPool()
105 : {
106 100 : m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
107 100 : }
108 :
109 200 : ThreadPool::~ThreadPool()
110 : {
111 : #if OSL_DEBUG_LEVEL > 1
112 : if( m_mapQueue.size() )
113 : {
114 : printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
115 : }
116 : #endif
117 200 : }
118 :
119 102 : void ThreadPool::dispose( sal_Int64 nDisposeId )
120 : {
121 102 : m_DisposedCallerAdmin->dispose( nDisposeId );
122 :
123 102 : MutexGuard guard( m_mutex );
124 480 : for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
125 320 : ii != m_mapQueue.end();
126 : ++ii)
127 : {
128 58 : if( (*ii).second.first )
129 : {
130 58 : (*ii).second.first->dispose( nDisposeId );
131 : }
132 58 : if( (*ii).second.second )
133 : {
134 0 : (*ii).second.second->dispose( nDisposeId );
135 : }
136 102 : }
137 102 : }
138 :
139 102 : void ThreadPool::destroy( sal_Int64 nDisposeId )
140 : {
141 102 : m_DisposedCallerAdmin->destroy( nDisposeId );
142 102 : }
143 :
144 : /******************
145 : * This methods lets the thread wait a certain amount of time. If within this timespan
146 : * a new request comes in, this thread is reused. This is done only to improve performance,
147 : * it is not required for threadpool functionality.
148 : ******************/
149 368127 : void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
150 : {
151 368127 : struct WaitingThread waitingThread;
152 368127 : waitingThread.condition = osl_createCondition();
153 368127 : waitingThread.thread = pThread;
154 : {
155 368127 : MutexGuard guard( m_mutexWaitingThreadList );
156 368127 : m_lstThreads.push_front( &waitingThread );
157 : }
158 :
159 : // let the thread wait 2 seconds
160 368127 : TimeValue time = { 2 , 0 };
161 368127 : osl_waitCondition( waitingThread.condition , &time );
162 :
163 : {
164 368124 : MutexGuard guard ( m_mutexWaitingThreadList );
165 368127 : if( waitingThread.thread.is() )
166 : {
167 : // thread wasn't reused, remove it from the list
168 : WaitingThreadList::iterator ii = find(
169 1667 : m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
170 : OSL_ASSERT( ii != m_lstThreads.end() );
171 1667 : m_lstThreads.erase( ii );
172 368127 : }
173 : }
174 :
175 368123 : osl_destroyCondition( waitingThread.condition );
176 368127 : }
177 :
178 100 : void ThreadPool::joinWorkers()
179 : {
180 : {
181 100 : MutexGuard guard( m_mutexWaitingThreadList );
182 933 : for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
183 622 : ii != m_lstThreads.end() ;
184 : ++ ii )
185 : {
186 : // wake the threads up
187 211 : osl_setCondition( (*ii)->condition );
188 100 : }
189 : }
190 100 : m_aThreadAdmin.join();
191 100 : }
192 :
193 368127 : void ThreadPool::createThread( JobQueue *pQueue ,
194 : const ByteSequence &aThreadId,
195 : bool bAsynchron )
196 : {
197 368127 : bool bCreate = true;
198 : {
199 : // Can a thread be reused ?
200 368127 : MutexGuard guard( m_mutexWaitingThreadList );
201 368127 : if( ! m_lstThreads.empty() )
202 : {
203 : // inform the thread and let it go
204 366460 : struct WaitingThread *pWaitingThread = m_lstThreads.back();
205 366460 : pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
206 366460 : pWaitingThread->thread = 0;
207 :
208 : // remove from list
209 366460 : m_lstThreads.pop_back();
210 :
211 : // let the thread go
212 366460 : osl_setCondition( pWaitingThread->condition );
213 366460 : bCreate = false;
214 368127 : }
215 : }
216 :
217 368127 : if( bCreate )
218 : {
219 : rtl::Reference< ORequestThread > pThread(
220 1667 : new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
221 1667 : pThread->launch();
222 : }
223 368127 : }
224 :
225 368340 : bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
226 : {
227 368340 : MutexGuard guard( m_mutex );
228 :
229 368340 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
230 : OSL_ASSERT( ii != m_mapQueue.end() );
231 :
232 368340 : if( bAsynchron )
233 : {
234 56378 : if( ! (*ii).second.second->isEmpty() )
235 : {
236 : // another thread has put something into the queue
237 36 : return false;
238 : }
239 :
240 56342 : (*ii).second.second = 0;
241 56342 : if( (*ii).second.first )
242 : {
243 : // all oneway request have been processed, now
244 : // synchronus requests may go on
245 104 : (*ii).second.first->resume();
246 : }
247 : }
248 : else
249 : {
250 311962 : if( ! (*ii).second.first->isEmpty() )
251 : {
252 : // another thread has put something into the queue
253 53 : return false;
254 : }
255 311909 : (*ii).second.first = 0;
256 : }
257 :
258 368251 : if( 0 == (*ii).second.first && 0 == (*ii).second.second )
259 : {
260 368141 : m_mapQueue.erase( ii );
261 : }
262 :
263 368251 : return true;
264 : }
265 :
266 :
267 523818 : void ThreadPool::addJob(
268 : const ByteSequence &aThreadId ,
269 : bool bAsynchron,
270 : void *pThreadSpecificData,
271 : RequestFun * doRequest )
272 : {
273 523818 : bool bCreateThread = false;
274 523818 : JobQueue *pQueue = 0;
275 : {
276 523818 : MutexGuard guard( m_mutex );
277 :
278 523818 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
279 :
280 523818 : if( ii == m_mapQueue.end() )
281 : {
282 368018 : m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( (JobQueue *)0 , (JobQueue*)0 );
283 368018 : ii = m_mapQueue.find( aThreadId );
284 : OSL_ASSERT( ii != m_mapQueue.end() );
285 : }
286 :
287 523818 : if( bAsynchron )
288 : {
289 117749 : if( ! (*ii).second.second )
290 : {
291 56342 : (*ii).second.second = new JobQueue();
292 56342 : bCreateThread = true;
293 : }
294 117749 : pQueue = (*ii).second.second;
295 : }
296 : else
297 : {
298 406069 : if( ! (*ii).second.first )
299 : {
300 311785 : (*ii).second.first = new JobQueue();
301 311785 : bCreateThread = true;
302 : }
303 406069 : pQueue = (*ii).second.first;
304 :
305 406069 : if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
306 : {
307 25 : pQueue->suspend();
308 : }
309 : }
310 523818 : pQueue->add( pThreadSpecificData , doRequest );
311 : }
312 :
313 523818 : if( bCreateThread )
314 : {
315 368127 : createThread( pQueue , aThreadId , bAsynchron);
316 : }
317 523818 : }
318 :
319 23761 : void ThreadPool::prepare( const ByteSequence &aThreadId )
320 : {
321 23761 : MutexGuard guard( m_mutex );
322 :
323 23761 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
324 :
325 23761 : if( ii == m_mapQueue.end() )
326 : {
327 178 : JobQueue *p = new JobQueue();
328 178 : m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , (JobQueue*)0 );
329 : }
330 23583 : else if( 0 == (*ii).second.first )
331 : {
332 1 : (*ii).second.first = new JobQueue();
333 23761 : }
334 23761 : }
335 :
336 14142 : void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
337 : {
338 14142 : JobQueue *pQueue = 0;
339 : {
340 14142 : MutexGuard guard( m_mutex );
341 :
342 14142 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
343 :
344 : OSL_ASSERT( ii != m_mapQueue.end() );
345 14142 : pQueue = (*ii).second.first;
346 : }
347 :
348 : OSL_ASSERT( pQueue );
349 14142 : void *pReturn = pQueue->enter( nDisposeId );
350 :
351 14142 : if( pQueue->isCallstackEmpty() )
352 : {
353 124 : if( revokeQueue( aThreadId , false) )
354 : {
355 : // remove queue
356 124 : delete pQueue;
357 : }
358 : }
359 14142 : return pReturn;
360 : }
361 : }
362 :
363 : // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
364 : // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
365 : // (within the last uno_threadpool_destroy) all worker threads spawned by that
366 : // ThreadPool instance are joined (which implies that uno_threadpool_destroy
367 : // must never be called from a worker thread); afterwards, the next call to
368 : // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
369 :
370 : using namespace cppu_threadpool;
371 :
372 : struct uno_ThreadPool_Equal
373 : {
374 562027 : bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
375 : {
376 562027 : return a == b;
377 : }
378 : };
379 :
380 : struct uno_ThreadPool_Hash
381 : {
382 562129 : sal_Size operator () ( const uno_ThreadPool &a ) const
383 : {
384 562129 : return reinterpret_cast<sal_Size>( a );
385 : }
386 : };
387 :
388 :
389 :
390 : typedef ::boost::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
391 :
392 : static ThreadpoolHashSet *g_pThreadpoolHashSet;
393 :
394 : struct _uno_ThreadPool
395 : {
396 : sal_Int32 dummy;
397 : };
398 :
399 : namespace {
400 :
401 561925 : ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
402 : {
403 561925 : MutexGuard guard( Mutex::getGlobalMutex() );
404 : assert( g_pThreadpoolHashSet != 0 );
405 561925 : ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
406 : assert( i != g_pThreadpoolHashSet->end() );
407 561925 : return i->second;
408 : }
409 :
410 : }
411 :
412 : extern "C" uno_ThreadPool SAL_CALL
413 102 : uno_threadpool_create() SAL_THROW_EXTERN_C()
414 : {
415 102 : MutexGuard guard( Mutex::getGlobalMutex() );
416 204 : ThreadPoolHolder p;
417 102 : if( ! g_pThreadpoolHashSet )
418 : {
419 100 : g_pThreadpoolHashSet = new ThreadpoolHashSet();
420 100 : p = new ThreadPool;
421 : }
422 : else
423 : {
424 : assert( !g_pThreadpoolHashSet->empty() );
425 2 : p = g_pThreadpoolHashSet->begin()->second;
426 : }
427 :
428 : // Just ensure that the handle is unique in the process (via heap)
429 102 : uno_ThreadPool h = new struct _uno_ThreadPool;
430 102 : g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
431 204 : return h;
432 : }
433 :
434 : extern "C" void SAL_CALL
435 23761 : uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
436 : {
437 23761 : sal_Sequence *pThreadId = 0;
438 23761 : uno_getIdOfCurrentThread( &pThreadId );
439 23761 : getThreadPool( hPool )->prepare( pThreadId );
440 23761 : rtl_byte_sequence_release( pThreadId );
441 23761 : uno_releaseIdFromCurrentThread();
442 23761 : }
443 :
444 : extern "C" void SAL_CALL
445 14142 : uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
446 : SAL_THROW_EXTERN_C()
447 : {
448 14142 : sal_Sequence *pThreadId = 0;
449 14142 : uno_getIdOfCurrentThread( &pThreadId );
450 : *ppJob =
451 : getThreadPool( hPool )->enter(
452 : pThreadId,
453 : sal::static_int_cast< sal_Int64 >(
454 14142 : reinterpret_cast< sal_IntPtr >(hPool)) );
455 14142 : rtl_byte_sequence_release( pThreadId );
456 14142 : uno_releaseIdFromCurrentThread();
457 14142 : }
458 :
459 : extern "C" void SAL_CALL
460 23761 : uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
461 : {
462 : // we might do here some tiding up in case a thread called attach but never detach
463 23761 : }
464 :
465 : extern "C" void SAL_CALL
466 523818 : uno_threadpool_putJob(
467 : uno_ThreadPool hPool,
468 : sal_Sequence *pThreadId,
469 : void *pJob,
470 : void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
471 : sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
472 : {
473 523818 : getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest );
474 523818 : }
475 :
476 : extern "C" void SAL_CALL
477 102 : uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
478 : {
479 : getThreadPool(hPool)->dispose(
480 : sal::static_int_cast< sal_Int64 >(
481 102 : reinterpret_cast< sal_IntPtr >(hPool)) );
482 102 : }
483 :
484 : extern "C" void SAL_CALL
485 102 : uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
486 : {
487 102 : ThreadPoolHolder p( getThreadPool(hPool) );
488 : p->destroy(
489 : sal::static_int_cast< sal_Int64 >(
490 102 : reinterpret_cast< sal_IntPtr >(hPool)) );
491 :
492 : bool empty;
493 : {
494 : OSL_ASSERT( g_pThreadpoolHashSet );
495 :
496 102 : MutexGuard guard( Mutex::getGlobalMutex() );
497 :
498 102 : ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
499 : OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
500 102 : g_pThreadpoolHashSet->erase( ii );
501 102 : delete hPool;
502 :
503 102 : empty = g_pThreadpoolHashSet->empty();
504 102 : if( empty )
505 : {
506 100 : delete g_pThreadpoolHashSet;
507 100 : g_pThreadpoolHashSet = 0;
508 102 : }
509 : }
510 :
511 102 : if( empty )
512 : {
513 100 : p->joinWorkers();
514 102 : }
515 102 : }
516 :
517 : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|