Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /*
3 : * This file is part of the LibreOffice project.
4 : *
5 : * This Source Code Form is subject to the terms of the Mozilla Public
6 : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : *
9 : * This file incorporates work covered by the following license notice:
10 : *
11 : * Licensed to the Apache Software Foundation (ASF) under one or more
12 : * contributor license agreements. See the NOTICE file distributed
13 : * with this work for additional information regarding copyright
14 : * ownership. The ASF licenses this file to you under the Apache
15 : * License, Version 2.0 (the "License"); you may not use this file
16 : * except in compliance with the License. You may obtain a copy of
17 : * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 : */
19 :
20 : #include "sal/config.h"
21 :
22 : #include <boost/unordered_map.hpp>
23 : #include <cassert>
24 : #include <stdio.h>
25 :
26 : #include <osl/diagnose.h>
27 : #include <osl/mutex.hxx>
28 : #include <osl/thread.h>
29 : #include <rtl/instance.hxx>
30 :
31 : #include <uno/threadpool.h>
32 :
33 : #include "threadpool.hxx"
34 : #include "thread.hxx"
35 :
36 : using namespace ::std;
37 : using namespace ::osl;
38 :
39 : namespace cppu_threadpool
40 : {
41 : struct theDisposedCallerAdmin :
42 : public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
43 : {
44 50 : DisposedCallerAdminHolder operator () () {
45 50 : return DisposedCallerAdminHolder(new DisposedCallerAdmin());
46 : }
47 : };
48 :
49 180074 : DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
50 : {
51 180074 : return theDisposedCallerAdmin::get();
52 : }
53 :
54 26 : DisposedCallerAdmin::~DisposedCallerAdmin()
55 : {
56 : #if OSL_DEBUG_LEVEL > 1
57 : if( !m_lst.empty() )
58 : {
59 : printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
60 : }
61 : #endif
62 26 : }
63 :
64 50 : void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
65 : {
66 50 : MutexGuard guard( m_mutex );
67 50 : m_lst.push_back( nDisposeId );
68 50 : }
69 :
70 50 : void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
71 : {
72 50 : MutexGuard guard( m_mutex );
73 150 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
74 100 : ii != m_lst.end() ;
75 : ++ ii )
76 : {
77 50 : if( (*ii) == nDisposeId )
78 : {
79 50 : m_lst.erase( ii );
80 50 : break;
81 : }
82 50 : }
83 50 : }
84 :
85 187093 : bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
86 : {
87 187093 : MutexGuard guard( m_mutex );
88 561321 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
89 374214 : ii != m_lst.end() ;
90 : ++ ii )
91 : {
92 0 : if( (*ii) == nDisposeId )
93 : {
94 0 : return true;
95 : }
96 : }
97 187107 : return false;
98 : }
99 :
100 :
101 :
102 :
103 50 : ThreadPool::ThreadPool()
104 : {
105 50 : m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
106 50 : }
107 :
108 98 : ThreadPool::~ThreadPool()
109 : {
110 : #if OSL_DEBUG_LEVEL > 1
111 : if( m_mapQueue.size() )
112 : {
113 : printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
114 : }
115 : #endif
116 98 : }
117 :
118 50 : void ThreadPool::dispose( sal_Int64 nDisposeId )
119 : {
120 50 : m_DisposedCallerAdmin->dispose( nDisposeId );
121 :
122 50 : MutexGuard guard( m_mutex );
123 240 : for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
124 160 : ii != m_mapQueue.end();
125 : ++ii)
126 : {
127 30 : if( (*ii).second.first )
128 : {
129 30 : (*ii).second.first->dispose( nDisposeId );
130 : }
131 30 : if( (*ii).second.second )
132 : {
133 0 : (*ii).second.second->dispose( nDisposeId );
134 : }
135 50 : }
136 50 : }
137 :
138 50 : void ThreadPool::destroy( sal_Int64 nDisposeId )
139 : {
140 50 : m_DisposedCallerAdmin->destroy( nDisposeId );
141 50 : }
142 :
143 : /******************
144 : * This methods lets the thread wait a certain amount of time. If within this timespan
145 : * a new request comes in, this thread is reused. This is done only to improve performance,
146 : * it is not required for threadpool functionality.
147 : ******************/
148 179940 : void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
149 : {
150 179940 : struct WaitingThread waitingThread;
151 179939 : waitingThread.condition = osl_createCondition();
152 179940 : waitingThread.thread = pThread;
153 : {
154 179940 : MutexGuard guard( m_mutexWaitingThreadList );
155 179940 : m_lstThreads.push_front( &waitingThread );
156 : }
157 :
158 : // let the thread wait 2 seconds
159 179940 : TimeValue time = { 2 , 0 };
160 179940 : osl_waitCondition( waitingThread.condition , &time );
161 :
162 : {
163 179930 : MutexGuard guard ( m_mutexWaitingThreadList );
164 179940 : if( waitingThread.thread.is() )
165 : {
166 : // thread wasn't reused, remove it from the list
167 : WaitingThreadList::iterator ii = find(
168 916 : m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
169 : OSL_ASSERT( ii != m_lstThreads.end() );
170 916 : m_lstThreads.erase( ii );
171 179940 : }
172 : }
173 :
174 179940 : osl_destroyCondition( waitingThread.condition );
175 179940 : }
176 :
177 49 : void ThreadPool::joinWorkers()
178 : {
179 : {
180 49 : MutexGuard guard( m_mutexWaitingThreadList );
181 411 : for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
182 274 : ii != m_lstThreads.end() ;
183 : ++ ii )
184 : {
185 : // wake the threads up
186 88 : osl_setCondition( (*ii)->condition );
187 49 : }
188 : }
189 49 : m_aThreadAdmin.join();
190 49 : }
191 :
192 179940 : void ThreadPool::createThread( JobQueue *pQueue ,
193 : const ByteSequence &aThreadId,
194 : bool bAsynchron )
195 : {
196 179940 : bool bCreate = true;
197 : {
198 : // Can a thread be reused ?
199 179940 : MutexGuard guard( m_mutexWaitingThreadList );
200 179940 : if( ! m_lstThreads.empty() )
201 : {
202 : // inform the thread and let it go
203 179024 : struct WaitingThread *pWaitingThread = m_lstThreads.back();
204 179024 : pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
205 179024 : pWaitingThread->thread = 0;
206 :
207 : // remove from list
208 179024 : m_lstThreads.pop_back();
209 :
210 : // let the thread go
211 179024 : osl_setCondition( pWaitingThread->condition );
212 179024 : bCreate = false;
213 179940 : }
214 : }
215 :
216 179940 : if( bCreate )
217 : {
218 : rtl::Reference< ORequestThread > pThread(
219 916 : new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
220 916 : pThread->launch();
221 : }
222 179940 : }
223 :
224 180070 : bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
225 : {
226 180070 : MutexGuard guard( m_mutex );
227 :
228 180070 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
229 : OSL_ASSERT( ii != m_mapQueue.end() );
230 :
231 180070 : if( bAsynchron )
232 : {
233 27460 : if( ! (*ii).second.second->isEmpty() )
234 : {
235 : // another thread has put something into the queue
236 38 : return false;
237 : }
238 :
239 27422 : (*ii).second.second = 0;
240 27422 : if( (*ii).second.first )
241 : {
242 : // all oneway request have been processed, now
243 : // synchronus requests may go on
244 67 : (*ii).second.first->resume();
245 : }
246 : }
247 : else
248 : {
249 152610 : if( ! (*ii).second.first->isEmpty() )
250 : {
251 : // another thread has put something into the queue
252 34 : return false;
253 : }
254 152576 : (*ii).second.first = 0;
255 : }
256 :
257 179998 : if( 0 == (*ii).second.first && 0 == (*ii).second.second )
258 : {
259 179931 : m_mapQueue.erase( ii );
260 : }
261 :
262 179998 : return true;
263 : }
264 :
265 :
266 257121 : void ThreadPool::addJob(
267 : const ByteSequence &aThreadId ,
268 : bool bAsynchron,
269 : void *pThreadSpecificData,
270 : RequestFun * doRequest )
271 : {
272 257121 : bool bCreateThread = false;
273 257121 : JobQueue *pQueue = 0;
274 : {
275 257121 : MutexGuard guard( m_mutex );
276 :
277 257121 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
278 :
279 257121 : if( ii == m_mapQueue.end() )
280 : {
281 179873 : m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( (JobQueue *)0 , (JobQueue*)0 );
282 179873 : ii = m_mapQueue.find( aThreadId );
283 : OSL_ASSERT( ii != m_mapQueue.end() );
284 : }
285 :
286 257121 : if( bAsynchron )
287 : {
288 54456 : if( ! (*ii).second.second )
289 : {
290 27422 : (*ii).second.second = new JobQueue();
291 27422 : bCreateThread = true;
292 : }
293 54456 : pQueue = (*ii).second.second;
294 : }
295 : else
296 : {
297 202665 : if( ! (*ii).second.first )
298 : {
299 152518 : (*ii).second.first = new JobQueue();
300 152518 : bCreateThread = true;
301 : }
302 202665 : pQueue = (*ii).second.first;
303 :
304 202665 : if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
305 : {
306 6 : pQueue->suspend();
307 : }
308 : }
309 257121 : pQueue->add( pThreadSpecificData , doRequest );
310 : }
311 :
312 257121 : if( bCreateThread )
313 : {
314 179940 : createThread( pQueue , aThreadId , bAsynchron);
315 : }
316 257121 : }
317 :
318 11855 : void ThreadPool::prepare( const ByteSequence &aThreadId )
319 : {
320 11855 : MutexGuard guard( m_mutex );
321 :
322 11855 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
323 :
324 11855 : if( ii == m_mapQueue.end() )
325 : {
326 84 : JobQueue *p = new JobQueue();
327 84 : m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , (JobQueue*)0 );
328 : }
329 11771 : else if( 0 == (*ii).second.first )
330 : {
331 0 : (*ii).second.first = new JobQueue();
332 11855 : }
333 11855 : }
334 :
335 7066 : void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
336 : {
337 7066 : JobQueue *pQueue = 0;
338 : {
339 7066 : MutexGuard guard( m_mutex );
340 :
341 7066 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
342 :
343 : OSL_ASSERT( ii != m_mapQueue.end() );
344 7066 : pQueue = (*ii).second.first;
345 : }
346 :
347 : OSL_ASSERT( pQueue );
348 7066 : void *pReturn = pQueue->enter( nDisposeId );
349 :
350 7066 : if( pQueue->isCallstackEmpty() )
351 : {
352 58 : if( revokeQueue( aThreadId , false) )
353 : {
354 : // remove queue
355 58 : delete pQueue;
356 : }
357 : }
358 7066 : return pReturn;
359 : }
360 : }
361 :
362 : // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
363 : // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
364 : // (within the last uno_threadpool_destroy) all worker threads spawned by that
365 : // ThreadPool instance are joined (which implies that uno_threadpool_destroy
366 : // must never be called from a worker thread); afterwards, the next call to
367 : // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
368 :
369 : using namespace cppu_threadpool;
370 :
371 : struct uno_ThreadPool_Equal
372 : {
373 276192 : bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
374 : {
375 276192 : return a == b;
376 : }
377 : };
378 :
379 : struct uno_ThreadPool_Hash
380 : {
381 276243 : sal_Size operator () ( const uno_ThreadPool &a ) const
382 : {
383 276243 : return (sal_Size) a;
384 : }
385 : };
386 :
387 :
388 :
389 : typedef ::boost::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
390 :
391 : static ThreadpoolHashSet *g_pThreadpoolHashSet;
392 :
393 : struct _uno_ThreadPool
394 : {
395 : sal_Int32 dummy;
396 : };
397 :
398 : namespace {
399 :
400 276142 : ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
401 : {
402 276142 : MutexGuard guard( Mutex::getGlobalMutex() );
403 : assert( g_pThreadpoolHashSet != 0 );
404 276142 : ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
405 : assert( i != g_pThreadpoolHashSet->end() );
406 276142 : return i->second;
407 : }
408 :
409 : }
410 :
411 : extern "C" uno_ThreadPool SAL_CALL
412 51 : uno_threadpool_create() SAL_THROW_EXTERN_C()
413 : {
414 51 : MutexGuard guard( Mutex::getGlobalMutex() );
415 102 : ThreadPoolHolder p;
416 51 : if( ! g_pThreadpoolHashSet )
417 : {
418 50 : g_pThreadpoolHashSet = new ThreadpoolHashSet();
419 50 : p = new ThreadPool;
420 : }
421 : else
422 : {
423 : assert( !g_pThreadpoolHashSet->empty() );
424 1 : p = g_pThreadpoolHashSet->begin()->second;
425 : }
426 :
427 : // Just ensure that the handle is unique in the process (via heap)
428 51 : uno_ThreadPool h = new struct _uno_ThreadPool;
429 51 : g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
430 102 : return h;
431 : }
432 :
433 : extern "C" void SAL_CALL
434 11855 : uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
435 : {
436 11855 : sal_Sequence *pThreadId = 0;
437 11855 : uno_getIdOfCurrentThread( &pThreadId );
438 11855 : getThreadPool( hPool )->prepare( pThreadId );
439 11855 : rtl_byte_sequence_release( pThreadId );
440 11855 : uno_releaseIdFromCurrentThread();
441 11855 : }
442 :
443 : extern "C" void SAL_CALL
444 7066 : uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
445 : SAL_THROW_EXTERN_C()
446 : {
447 7066 : sal_Sequence *pThreadId = 0;
448 7066 : uno_getIdOfCurrentThread( &pThreadId );
449 : *ppJob =
450 : getThreadPool( hPool )->enter(
451 : pThreadId,
452 : sal::static_int_cast< sal_Int64 >(
453 7066 : reinterpret_cast< sal_IntPtr >(hPool)) );
454 7066 : rtl_byte_sequence_release( pThreadId );
455 7066 : uno_releaseIdFromCurrentThread();
456 7066 : }
457 :
458 : extern "C" void SAL_CALL
459 11855 : uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
460 : {
461 : // we might do here some tiding up in case a thread called attach but never detach
462 11855 : }
463 :
464 : extern "C" void SAL_CALL
465 257121 : uno_threadpool_putJob(
466 : uno_ThreadPool hPool,
467 : sal_Sequence *pThreadId,
468 : void *pJob,
469 : void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
470 : sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
471 : {
472 257121 : getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest );
473 257121 : }
474 :
475 : extern "C" void SAL_CALL
476 50 : uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
477 : {
478 : getThreadPool(hPool)->dispose(
479 : sal::static_int_cast< sal_Int64 >(
480 50 : reinterpret_cast< sal_IntPtr >(hPool)) );
481 50 : }
482 :
483 : extern "C" void SAL_CALL
484 50 : uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
485 : {
486 50 : ThreadPoolHolder p( getThreadPool(hPool) );
487 : p->destroy(
488 : sal::static_int_cast< sal_Int64 >(
489 50 : reinterpret_cast< sal_IntPtr >(hPool)) );
490 :
491 : bool empty;
492 : {
493 : OSL_ASSERT( g_pThreadpoolHashSet );
494 :
495 50 : MutexGuard guard( Mutex::getGlobalMutex() );
496 :
497 50 : ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
498 : OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
499 50 : g_pThreadpoolHashSet->erase( ii );
500 50 : delete hPool;
501 :
502 50 : empty = g_pThreadpoolHashSet->empty();
503 50 : if( empty )
504 : {
505 49 : delete g_pThreadpoolHashSet;
506 49 : g_pThreadpoolHashSet = 0;
507 50 : }
508 : }
509 :
510 50 : if( empty )
511 : {
512 49 : p->joinWorkers();
513 50 : }
514 50 : }
515 :
516 : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|