Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /*
3 : * This file is part of the LibreOffice project.
4 : *
5 : * This Source Code Form is subject to the terms of the Mozilla Public
6 : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : *
9 : * This file incorporates work covered by the following license notice:
10 : *
11 : * Licensed to the Apache Software Foundation (ASF) under one or more
12 : * contributor license agreements. See the NOTICE file distributed
13 : * with this work for additional information regarding copyright
14 : * ownership. The ASF licenses this file to you under the Apache
15 : * License, Version 2.0 (the "License"); you may not use this file
16 : * except in compliance with the License. You may obtain a copy of
17 : * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 : */
19 :
20 : #include "sal/config.h"
21 :
22 : #include <boost/unordered_map.hpp>
23 : #include <cassert>
24 : #include <stdio.h>
25 :
26 : #include <osl/diagnose.h>
27 : #include <osl/mutex.hxx>
28 : #include <osl/thread.h>
29 : #include <rtl/instance.hxx>
30 :
31 : #include <uno/threadpool.h>
32 :
33 : #include "threadpool.hxx"
34 : #include "thread.hxx"
35 :
36 : using namespace ::std;
37 : using namespace ::osl;
38 :
39 : namespace cppu_threadpool
40 : {
41 : struct theDisposedCallerAdmin :
42 : public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
43 : {
44 0 : DisposedCallerAdminHolder operator () () {
45 0 : return DisposedCallerAdminHolder(new DisposedCallerAdmin());
46 : }
47 : };
48 :
49 0 : DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
50 : {
51 0 : return theDisposedCallerAdmin::get();
52 : }
53 :
54 0 : DisposedCallerAdmin::~DisposedCallerAdmin()
55 : {
56 : #if OSL_DEBUG_LEVEL > 1
57 : if( !m_lst.empty() )
58 : {
59 : printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
60 : }
61 : #endif
62 0 : }
63 :
64 0 : void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
65 : {
66 0 : MutexGuard guard( m_mutex );
67 0 : m_lst.push_back( nDisposeId );
68 0 : }
69 :
70 0 : void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
71 : {
72 0 : MutexGuard guard( m_mutex );
73 0 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
74 0 : ii != m_lst.end() ;
75 : ++ ii )
76 : {
77 0 : if( (*ii) == nDisposeId )
78 : {
79 0 : m_lst.erase( ii );
80 0 : break;
81 : }
82 0 : }
83 0 : }
84 :
85 0 : bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
86 : {
87 0 : MutexGuard guard( m_mutex );
88 0 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
89 0 : ii != m_lst.end() ;
90 : ++ ii )
91 : {
92 0 : if( (*ii) == nDisposeId )
93 : {
94 0 : return true;
95 : }
96 : }
97 0 : return false;
98 : }
99 :
100 :
101 :
102 :
103 0 : ThreadPool::ThreadPool()
104 : {
105 0 : m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
106 0 : }
107 :
108 0 : ThreadPool::~ThreadPool()
109 : {
110 : #if OSL_DEBUG_LEVEL > 1
111 : if( m_mapQueue.size() )
112 : {
113 : printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
114 : }
115 : #endif
116 0 : }
117 :
118 0 : void ThreadPool::dispose( sal_Int64 nDisposeId )
119 : {
120 0 : m_DisposedCallerAdmin->dispose( nDisposeId );
121 :
122 0 : MutexGuard guard( m_mutex );
123 0 : for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
124 0 : ii != m_mapQueue.end();
125 : ++ii)
126 : {
127 0 : if( (*ii).second.first )
128 : {
129 0 : (*ii).second.first->dispose( nDisposeId );
130 : }
131 0 : if( (*ii).second.second )
132 : {
133 0 : (*ii).second.second->dispose( nDisposeId );
134 : }
135 0 : }
136 0 : }
137 :
138 0 : void ThreadPool::destroy( sal_Int64 nDisposeId )
139 : {
140 0 : m_DisposedCallerAdmin->destroy( nDisposeId );
141 0 : }
142 :
143 : /******************
144 : * This methods lets the thread wait a certain amount of time. If within this timespan
145 : * a new request comes in, this thread is reused. This is done only to improve performance,
146 : * it is not required for threadpool functionality.
147 : ******************/
148 0 : void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
149 : {
150 0 : struct WaitingThread waitingThread;
151 0 : waitingThread.condition = osl_createCondition();
152 0 : waitingThread.thread = pThread;
153 : {
154 0 : MutexGuard guard( m_mutexWaitingThreadList );
155 0 : m_lstThreads.push_front( &waitingThread );
156 : }
157 :
158 : // let the thread wait 2 seconds
159 0 : TimeValue time = { 2 , 0 };
160 0 : osl_waitCondition( waitingThread.condition , &time );
161 :
162 : {
163 0 : MutexGuard guard ( m_mutexWaitingThreadList );
164 0 : if( waitingThread.thread.is() )
165 : {
166 : // thread wasn't reused, remove it from the list
167 : WaitingThreadList::iterator ii = find(
168 0 : m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
169 : OSL_ASSERT( ii != m_lstThreads.end() );
170 0 : m_lstThreads.erase( ii );
171 0 : }
172 : }
173 :
174 0 : osl_destroyCondition( waitingThread.condition );
175 0 : }
176 :
177 0 : void ThreadPool::joinWorkers()
178 : {
179 : {
180 0 : MutexGuard guard( m_mutexWaitingThreadList );
181 0 : for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
182 0 : ii != m_lstThreads.end() ;
183 : ++ ii )
184 : {
185 : // wake the threads up
186 0 : osl_setCondition( (*ii)->condition );
187 0 : }
188 : }
189 0 : m_aThreadAdmin.join();
190 0 : }
191 :
192 0 : void ThreadPool::createThread( JobQueue *pQueue ,
193 : const ByteSequence &aThreadId,
194 : bool bAsynchron )
195 : {
196 0 : bool bCreate = true;
197 : {
198 : // Can a thread be reused ?
199 0 : MutexGuard guard( m_mutexWaitingThreadList );
200 0 : if( ! m_lstThreads.empty() )
201 : {
202 : // inform the thread and let it go
203 0 : struct WaitingThread *pWaitingThread = m_lstThreads.back();
204 0 : pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
205 0 : pWaitingThread->thread = 0;
206 :
207 : // remove from list
208 0 : m_lstThreads.pop_back();
209 :
210 : // let the thread go
211 0 : osl_setCondition( pWaitingThread->condition );
212 0 : bCreate = false;
213 0 : }
214 : }
215 :
216 0 : if( bCreate )
217 : {
218 : rtl::Reference< ORequestThread > pThread(
219 0 : new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
220 0 : pThread->launch();
221 : }
222 0 : }
223 :
224 0 : bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
225 : {
226 0 : MutexGuard guard( m_mutex );
227 :
228 0 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
229 : OSL_ASSERT( ii != m_mapQueue.end() );
230 :
231 0 : if( bAsynchron )
232 : {
233 0 : if( ! (*ii).second.second->isEmpty() )
234 : {
235 : // another thread has put something into the queue
236 0 : return false;
237 : }
238 :
239 0 : (*ii).second.second = 0;
240 0 : if( (*ii).second.first )
241 : {
242 : // all oneway request have been processed, now
243 : // synchronus requests may go on
244 0 : (*ii).second.first->resume();
245 : }
246 : }
247 : else
248 : {
249 0 : if( ! (*ii).second.first->isEmpty() )
250 : {
251 : // another thread has put something into the queue
252 0 : return false;
253 : }
254 0 : (*ii).second.first = 0;
255 : }
256 :
257 0 : if( 0 == (*ii).second.first && 0 == (*ii).second.second )
258 : {
259 0 : m_mapQueue.erase( ii );
260 : }
261 :
262 0 : return true;
263 : }
264 :
265 :
266 0 : void ThreadPool::addJob(
267 : const ByteSequence &aThreadId ,
268 : bool bAsynchron,
269 : void *pThreadSpecificData,
270 : RequestFun * doRequest )
271 : {
272 0 : bool bCreateThread = false;
273 0 : JobQueue *pQueue = 0;
274 : {
275 0 : MutexGuard guard( m_mutex );
276 :
277 0 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
278 :
279 0 : if( ii == m_mapQueue.end() )
280 : {
281 0 : m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( (JobQueue *)0 , (JobQueue*)0 );
282 0 : ii = m_mapQueue.find( aThreadId );
283 : OSL_ASSERT( ii != m_mapQueue.end() );
284 : }
285 :
286 0 : if( bAsynchron )
287 : {
288 0 : if( ! (*ii).second.second )
289 : {
290 0 : (*ii).second.second = new JobQueue();
291 0 : bCreateThread = true;
292 : }
293 0 : pQueue = (*ii).second.second;
294 : }
295 : else
296 : {
297 0 : if( ! (*ii).second.first )
298 : {
299 0 : (*ii).second.first = new JobQueue();
300 0 : bCreateThread = true;
301 : }
302 0 : pQueue = (*ii).second.first;
303 :
304 0 : if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
305 : {
306 0 : pQueue->suspend();
307 : }
308 : }
309 0 : pQueue->add( pThreadSpecificData , doRequest );
310 : }
311 :
312 0 : if( bCreateThread )
313 : {
314 0 : createThread( pQueue , aThreadId , bAsynchron);
315 : }
316 0 : }
317 :
318 0 : void ThreadPool::prepare( const ByteSequence &aThreadId )
319 : {
320 0 : MutexGuard guard( m_mutex );
321 :
322 0 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
323 :
324 0 : if( ii == m_mapQueue.end() )
325 : {
326 0 : JobQueue *p = new JobQueue();
327 0 : m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , (JobQueue*)0 );
328 : }
329 0 : else if( 0 == (*ii).second.first )
330 : {
331 0 : (*ii).second.first = new JobQueue();
332 0 : }
333 0 : }
334 :
335 0 : void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
336 : {
337 0 : JobQueue *pQueue = 0;
338 : {
339 0 : MutexGuard guard( m_mutex );
340 :
341 0 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
342 :
343 : OSL_ASSERT( ii != m_mapQueue.end() );
344 0 : pQueue = (*ii).second.first;
345 : }
346 :
347 : OSL_ASSERT( pQueue );
348 0 : void *pReturn = pQueue->enter( nDisposeId );
349 :
350 0 : if( pQueue->isCallstackEmpty() )
351 : {
352 0 : if( revokeQueue( aThreadId , false) )
353 : {
354 : // remove queue
355 0 : delete pQueue;
356 : }
357 : }
358 0 : return pReturn;
359 : }
360 : }
361 :
362 : // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
363 : // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
364 : // (within the last uno_threadpool_destroy) all worker threads spawned by that
365 : // ThreadPool instance are joined (which implies that uno_threadpool_destroy
366 : // must never be called from a worker thread); afterwards, the next call to
367 : // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
368 :
369 : using namespace cppu_threadpool;
370 :
371 : struct uno_ThreadPool_Equal
372 : {
373 0 : bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
374 : {
375 0 : return a == b;
376 : }
377 : };
378 :
379 : struct uno_ThreadPool_Hash
380 : {
381 0 : sal_Size operator () ( const uno_ThreadPool &a ) const
382 : {
383 0 : return (sal_Size) a;
384 : }
385 : };
386 :
387 :
388 :
389 : typedef ::boost::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
390 :
391 : static ThreadpoolHashSet *g_pThreadpoolHashSet;
392 :
393 : struct _uno_ThreadPool
394 : {
395 : sal_Int32 dummy;
396 : };
397 :
398 : namespace {
399 :
400 0 : ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
401 : {
402 0 : MutexGuard guard( Mutex::getGlobalMutex() );
403 : assert( g_pThreadpoolHashSet != 0 );
404 0 : ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
405 : assert( i != g_pThreadpoolHashSet->end() );
406 0 : return i->second;
407 : }
408 :
409 : }
410 :
411 : extern "C" uno_ThreadPool SAL_CALL
412 0 : uno_threadpool_create() SAL_THROW_EXTERN_C()
413 : {
414 0 : MutexGuard guard( Mutex::getGlobalMutex() );
415 0 : ThreadPoolHolder p;
416 0 : if( ! g_pThreadpoolHashSet )
417 : {
418 0 : g_pThreadpoolHashSet = new ThreadpoolHashSet();
419 0 : p = new ThreadPool;
420 : }
421 : else
422 : {
423 : assert( !g_pThreadpoolHashSet->empty() );
424 0 : p = g_pThreadpoolHashSet->begin()->second;
425 : }
426 :
427 : // Just ensure that the handle is unique in the process (via heap)
428 0 : uno_ThreadPool h = new struct _uno_ThreadPool;
429 0 : g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
430 0 : return h;
431 : }
432 :
433 : extern "C" void SAL_CALL
434 0 : uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
435 : {
436 0 : sal_Sequence *pThreadId = 0;
437 0 : uno_getIdOfCurrentThread( &pThreadId );
438 0 : getThreadPool( hPool )->prepare( pThreadId );
439 0 : rtl_byte_sequence_release( pThreadId );
440 0 : uno_releaseIdFromCurrentThread();
441 0 : }
442 :
443 : extern "C" void SAL_CALL
444 0 : uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
445 : SAL_THROW_EXTERN_C()
446 : {
447 0 : sal_Sequence *pThreadId = 0;
448 0 : uno_getIdOfCurrentThread( &pThreadId );
449 : *ppJob =
450 : getThreadPool( hPool )->enter(
451 : pThreadId,
452 : sal::static_int_cast< sal_Int64 >(
453 0 : reinterpret_cast< sal_IntPtr >(hPool)) );
454 0 : rtl_byte_sequence_release( pThreadId );
455 0 : uno_releaseIdFromCurrentThread();
456 0 : }
457 :
458 : extern "C" void SAL_CALL
459 0 : uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
460 : {
461 : // we might do here some tiding up in case a thread called attach but never detach
462 0 : }
463 :
464 : extern "C" void SAL_CALL
465 0 : uno_threadpool_putJob(
466 : uno_ThreadPool hPool,
467 : sal_Sequence *pThreadId,
468 : void *pJob,
469 : void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
470 : sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
471 : {
472 0 : getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest );
473 0 : }
474 :
475 : extern "C" void SAL_CALL
476 0 : uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
477 : {
478 : getThreadPool(hPool)->dispose(
479 : sal::static_int_cast< sal_Int64 >(
480 0 : reinterpret_cast< sal_IntPtr >(hPool)) );
481 0 : }
482 :
483 : extern "C" void SAL_CALL
484 0 : uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
485 : {
486 0 : ThreadPoolHolder p( getThreadPool(hPool) );
487 : p->destroy(
488 : sal::static_int_cast< sal_Int64 >(
489 0 : reinterpret_cast< sal_IntPtr >(hPool)) );
490 :
491 : bool empty;
492 : {
493 : OSL_ASSERT( g_pThreadpoolHashSet );
494 :
495 0 : MutexGuard guard( Mutex::getGlobalMutex() );
496 :
497 0 : ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
498 : OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
499 0 : g_pThreadpoolHashSet->erase( ii );
500 0 : delete hPool;
501 :
502 0 : empty = g_pThreadpoolHashSet->empty();
503 0 : if( empty )
504 : {
505 0 : delete g_pThreadpoolHashSet;
506 0 : g_pThreadpoolHashSet = 0;
507 0 : }
508 : }
509 :
510 0 : if( empty )
511 : {
512 0 : p->joinWorkers();
513 0 : }
514 0 : }
515 :
516 : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|