Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /*
3 : * This file is part of the LibreOffice project.
4 : *
5 : * This Source Code Form is subject to the terms of the Mozilla Public
6 : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : *
9 : * This file incorporates work covered by the following license notice:
10 : *
11 : * Licensed to the Apache Software Foundation (ASF) under one or more
12 : * contributor license agreements. See the NOTICE file distributed
13 : * with this work for additional information regarding copyright
14 : * ownership. The ASF licenses this file to you under the Apache
15 : * License, Version 2.0 (the "License"); you may not use this file
16 : * except in compliance with the License. You may obtain a copy of
17 : * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 : */
19 :
20 : #include "sal/config.h"
21 :
22 : #include <boost/unordered_map.hpp>
23 : #include <cassert>
24 : #include <stdio.h>
25 :
26 : #include <osl/diagnose.h>
27 : #include <osl/mutex.hxx>
28 : #include <osl/thread.h>
29 : #include <rtl/instance.hxx>
30 :
31 : #include <uno/threadpool.h>
32 :
33 : #include "threadpool.hxx"
34 : #include "thread.hxx"
35 :
36 : using namespace ::std;
37 : using namespace ::osl;
38 :
39 : namespace cppu_threadpool
40 : {
41 : struct theDisposedCallerAdmin :
42 : public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
43 : {
44 8 : DisposedCallerAdminHolder operator () () {
45 8 : return DisposedCallerAdminHolder(new DisposedCallerAdmin());
46 : }
47 : };
48 :
49 1670 : DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
50 : {
51 1670 : return theDisposedCallerAdmin::get();
52 : }
53 :
54 8 : DisposedCallerAdmin::~DisposedCallerAdmin()
55 : {
56 : #if OSL_DEBUG_LEVEL > 1
57 : if( !m_lst.empty() )
58 : {
59 : printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
60 : }
61 : #endif
62 8 : }
63 :
64 8 : void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
65 : {
66 8 : MutexGuard guard( m_mutex );
67 8 : m_lst.push_back( nDisposeId );
68 8 : }
69 :
70 8 : void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
71 : {
72 8 : MutexGuard guard( m_mutex );
73 24 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
74 16 : ii != m_lst.end() ;
75 : ++ ii )
76 : {
77 8 : if( (*ii) == nDisposeId )
78 : {
79 8 : m_lst.erase( ii );
80 8 : break;
81 : }
82 8 : }
83 8 : }
84 :
85 1662 : sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
86 : {
87 1662 : MutexGuard guard( m_mutex );
88 4986 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
89 3324 : ii != m_lst.end() ;
90 : ++ ii )
91 : {
92 0 : if( (*ii) == nDisposeId )
93 : {
94 0 : return sal_True;
95 : }
96 : }
97 1662 : return sal_False;
98 : }
99 :
100 :
101 : //-------------------------------------------------------------------------------
102 :
103 8 : ThreadPool::ThreadPool()
104 : {
105 8 : m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
106 8 : }
107 :
108 16 : ThreadPool::~ThreadPool()
109 : {
110 : #if OSL_DEBUG_LEVEL > 1
111 : if( m_mapQueue.size() )
112 : {
113 : printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
114 : }
115 : #endif
116 16 : }
117 :
118 8 : void ThreadPool::dispose( sal_Int64 nDisposeId )
119 : {
120 8 : m_DisposedCallerAdmin->dispose( nDisposeId );
121 :
122 8 : MutexGuard guard( m_mutex );
123 27 : for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
124 18 : ii != m_mapQueue.end();
125 : ++ii)
126 : {
127 1 : if( (*ii).second.first )
128 : {
129 1 : (*ii).second.first->dispose( nDisposeId );
130 : }
131 1 : if( (*ii).second.second )
132 : {
133 0 : (*ii).second.second->dispose( nDisposeId );
134 : }
135 8 : }
136 8 : }
137 :
138 8 : void ThreadPool::destroy( sal_Int64 nDisposeId )
139 : {
140 8 : m_DisposedCallerAdmin->destroy( nDisposeId );
141 8 : }
142 :
143 : /******************
144 : * This methods lets the thread wait a certain amount of time. If within this timespan
145 : * a new request comes in, this thread is reused. This is done only to improve performance,
146 : * it is not required for threadpool functionality.
147 : ******************/
148 1662 : void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
149 : {
150 1662 : struct WaitingThread waitingThread;
151 1662 : waitingThread.condition = osl_createCondition();
152 1662 : waitingThread.thread = pThread;
153 : {
154 1662 : MutexGuard guard( m_mutexWaitingThreadList );
155 1662 : m_lstThreads.push_front( &waitingThread );
156 : }
157 :
158 : // let the thread wait 2 seconds
159 1662 : TimeValue time = { 2 , 0 };
160 1662 : osl_waitCondition( waitingThread.condition , &time );
161 :
162 : {
163 1662 : MutexGuard guard ( m_mutexWaitingThreadList );
164 1662 : if( waitingThread.thread.is() )
165 : {
166 : // thread wasn't reused, remove it from the list
167 : WaitingThreadList::iterator ii = find(
168 17 : m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
169 : OSL_ASSERT( ii != m_lstThreads.end() );
170 17 : m_lstThreads.erase( ii );
171 1662 : }
172 : }
173 :
174 1662 : osl_destroyCondition( waitingThread.condition );
175 1662 : }
176 :
177 8 : void ThreadPool::joinWorkers()
178 : {
179 : {
180 8 : MutexGuard guard( m_mutexWaitingThreadList );
181 69 : for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
182 46 : ii != m_lstThreads.end() ;
183 : ++ ii )
184 : {
185 : // wake the threads up
186 15 : osl_setCondition( (*ii)->condition );
187 8 : }
188 : }
189 8 : m_aThreadAdmin.join();
190 8 : }
191 :
192 1662 : void ThreadPool::createThread( JobQueue *pQueue ,
193 : const ByteSequence &aThreadId,
194 : sal_Bool bAsynchron )
195 : {
196 1662 : sal_Bool bCreate = sal_True;
197 : {
198 : // Can a thread be reused ?
199 1662 : MutexGuard guard( m_mutexWaitingThreadList );
200 1662 : if( ! m_lstThreads.empty() )
201 : {
202 : // inform the thread and let it go
203 1645 : struct WaitingThread *pWaitingThread = m_lstThreads.back();
204 1645 : pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
205 1645 : pWaitingThread->thread = 0;
206 :
207 : // remove from list
208 1645 : m_lstThreads.pop_back();
209 :
210 : // let the thread go
211 1645 : osl_setCondition( pWaitingThread->condition );
212 1645 : bCreate = sal_False;
213 1662 : }
214 : }
215 :
216 1662 : if( bCreate )
217 : {
218 : rtl::Reference< ORequestThread > pThread(
219 17 : new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
220 17 : pThread->launch();
221 : }
222 1662 : }
223 :
224 1662 : sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron )
225 : {
226 1662 : MutexGuard guard( m_mutex );
227 :
228 1662 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
229 : OSL_ASSERT( ii != m_mapQueue.end() );
230 :
231 1662 : if( bAsynchron )
232 : {
233 118 : if( ! (*ii).second.second->isEmpty() )
234 : {
235 : // another thread has put something into the queue
236 0 : return sal_False;
237 : }
238 :
239 118 : (*ii).second.second = 0;
240 118 : if( (*ii).second.first )
241 : {
242 : // all oneway request have been processed, now
243 : // synchronus requests may go on
244 0 : (*ii).second.first->resume();
245 : }
246 : }
247 : else
248 : {
249 1544 : if( ! (*ii).second.first->isEmpty() )
250 : {
251 : // another thread has put something into the queue
252 0 : return sal_False;
253 : }
254 1544 : (*ii).second.first = 0;
255 : }
256 :
257 1662 : if( 0 == (*ii).second.first && 0 == (*ii).second.second )
258 : {
259 1662 : m_mapQueue.erase( ii );
260 : }
261 :
262 1662 : return sal_True;
263 : }
264 :
265 :
266 1977 : void ThreadPool::addJob(
267 : const ByteSequence &aThreadId ,
268 : sal_Bool bAsynchron,
269 : void *pThreadSpecificData,
270 : RequestFun * doRequest )
271 : {
272 1977 : sal_Bool bCreateThread = sal_False;
273 1977 : JobQueue *pQueue = 0;
274 : {
275 1977 : MutexGuard guard( m_mutex );
276 :
277 1977 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
278 :
279 1977 : if( ii == m_mapQueue.end() )
280 : {
281 1662 : m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( (JobQueue *)0 , (JobQueue*)0 );
282 1662 : ii = m_mapQueue.find( aThreadId );
283 : OSL_ASSERT( ii != m_mapQueue.end() );
284 : }
285 :
286 1977 : if( bAsynchron )
287 : {
288 118 : if( ! (*ii).second.second )
289 : {
290 118 : (*ii).second.second = new JobQueue();
291 118 : bCreateThread = sal_True;
292 : }
293 118 : pQueue = (*ii).second.second;
294 : }
295 : else
296 : {
297 1859 : if( ! (*ii).second.first )
298 : {
299 1544 : (*ii).second.first = new JobQueue();
300 1544 : bCreateThread = sal_True;
301 : }
302 1859 : pQueue = (*ii).second.first;
303 :
304 1859 : if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
305 : {
306 0 : pQueue->suspend();
307 : }
308 : }
309 1977 : pQueue->add( pThreadSpecificData , doRequest );
310 : }
311 :
312 1977 : if( bCreateThread )
313 : {
314 1662 : createThread( pQueue , aThreadId , bAsynchron);
315 : }
316 1977 : }
317 :
318 0 : void ThreadPool::prepare( const ByteSequence &aThreadId )
319 : {
320 0 : MutexGuard guard( m_mutex );
321 :
322 0 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
323 :
324 0 : if( ii == m_mapQueue.end() )
325 : {
326 0 : JobQueue *p = new JobQueue();
327 0 : m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , (JobQueue*)0 );
328 : }
329 0 : else if( 0 == (*ii).second.first )
330 : {
331 0 : (*ii).second.first = new JobQueue();
332 0 : }
333 0 : }
334 :
335 0 : void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
336 : {
337 0 : JobQueue *pQueue = 0;
338 : {
339 0 : MutexGuard guard( m_mutex );
340 :
341 0 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
342 :
343 : OSL_ASSERT( ii != m_mapQueue.end() );
344 0 : pQueue = (*ii).second.first;
345 : }
346 :
347 : OSL_ASSERT( pQueue );
348 0 : void *pReturn = pQueue->enter( nDisposeId );
349 :
350 0 : if( pQueue->isCallstackEmpty() )
351 : {
352 0 : if( revokeQueue( aThreadId , sal_False) )
353 : {
354 : // remove queue
355 0 : delete pQueue;
356 : }
357 : }
358 0 : return pReturn;
359 : }
360 : }
361 :
362 : // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
363 : // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
364 : // (within the last uno_threadpool_destroy) all worker threads spawned by that
365 : // ThreadPool instance are joined (which implies that uno_threadpool_destroy
366 : // must never be called from a worker thread); afterwards, the next call to
367 : // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
368 :
369 : using namespace cppu_threadpool;
370 :
371 : struct uno_ThreadPool_Equal
372 : {
373 2001 : sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
374 : {
375 2001 : return a == b;
376 : }
377 : };
378 :
379 : struct uno_ThreadPool_Hash
380 : {
381 2009 : sal_Size operator () ( const uno_ThreadPool &a ) const
382 : {
383 2009 : return (sal_Size) a;
384 : }
385 : };
386 :
387 :
388 :
389 : typedef ::boost::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
390 :
391 : static ThreadpoolHashSet *g_pThreadpoolHashSet;
392 :
393 : struct _uno_ThreadPool
394 : {
395 : sal_Int32 dummy;
396 : };
397 :
398 : namespace {
399 :
400 1993 : ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
401 : {
402 1993 : MutexGuard guard( Mutex::getGlobalMutex() );
403 : assert( g_pThreadpoolHashSet != 0 );
404 1993 : ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
405 : assert( i != g_pThreadpoolHashSet->end() );
406 1993 : return i->second;
407 : }
408 :
409 : }
410 :
411 : extern "C" uno_ThreadPool SAL_CALL
412 8 : uno_threadpool_create() SAL_THROW_EXTERN_C()
413 : {
414 8 : MutexGuard guard( Mutex::getGlobalMutex() );
415 8 : ThreadPoolHolder p;
416 8 : if( ! g_pThreadpoolHashSet )
417 : {
418 8 : g_pThreadpoolHashSet = new ThreadpoolHashSet();
419 8 : p = new ThreadPool;
420 : }
421 : else
422 : {
423 : assert( !g_pThreadpoolHashSet->empty() );
424 0 : p = g_pThreadpoolHashSet->begin()->second;
425 : }
426 :
427 : // Just ensure that the handle is unique in the process (via heap)
428 8 : uno_ThreadPool h = new struct _uno_ThreadPool;
429 8 : g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
430 8 : return h;
431 : }
432 :
433 : extern "C" void SAL_CALL
434 0 : uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
435 : {
436 0 : sal_Sequence *pThreadId = 0;
437 0 : uno_getIdOfCurrentThread( &pThreadId );
438 0 : getThreadPool( hPool )->prepare( pThreadId );
439 0 : rtl_byte_sequence_release( pThreadId );
440 0 : uno_releaseIdFromCurrentThread();
441 0 : }
442 :
443 : extern "C" void SAL_CALL
444 0 : uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
445 : SAL_THROW_EXTERN_C()
446 : {
447 0 : sal_Sequence *pThreadId = 0;
448 0 : uno_getIdOfCurrentThread( &pThreadId );
449 : *ppJob =
450 : getThreadPool( hPool )->enter(
451 : pThreadId,
452 : sal::static_int_cast< sal_Int64 >(
453 0 : reinterpret_cast< sal_IntPtr >(hPool)) );
454 0 : rtl_byte_sequence_release( pThreadId );
455 0 : uno_releaseIdFromCurrentThread();
456 0 : }
457 :
458 : extern "C" void SAL_CALL
459 0 : uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
460 : {
461 : // we might do here some tiding up in case a thread called attach but never detach
462 0 : }
463 :
464 : extern "C" void SAL_CALL
465 1977 : uno_threadpool_putJob(
466 : uno_ThreadPool hPool,
467 : sal_Sequence *pThreadId,
468 : void *pJob,
469 : void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
470 : sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
471 : {
472 1977 : getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest );
473 1977 : }
474 :
475 : extern "C" void SAL_CALL
476 8 : uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
477 : {
478 : getThreadPool(hPool)->dispose(
479 : sal::static_int_cast< sal_Int64 >(
480 8 : reinterpret_cast< sal_IntPtr >(hPool)) );
481 8 : }
482 :
483 : extern "C" void SAL_CALL
484 8 : uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
485 : {
486 8 : ThreadPoolHolder p( getThreadPool(hPool) );
487 : p->destroy(
488 : sal::static_int_cast< sal_Int64 >(
489 8 : reinterpret_cast< sal_IntPtr >(hPool)) );
490 :
491 : bool empty;
492 : {
493 : OSL_ASSERT( g_pThreadpoolHashSet );
494 :
495 8 : MutexGuard guard( Mutex::getGlobalMutex() );
496 :
497 8 : ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
498 : OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
499 8 : g_pThreadpoolHashSet->erase( ii );
500 8 : delete hPool;
501 :
502 8 : empty = g_pThreadpoolHashSet->empty();
503 8 : if( empty )
504 : {
505 8 : delete g_pThreadpoolHashSet;
506 8 : g_pThreadpoolHashSet = 0;
507 8 : }
508 : }
509 :
510 8 : if( empty )
511 : {
512 8 : p->joinWorkers();
513 8 : }
514 8 : }
515 :
516 : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|