Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /*
3 : * This file is part of the LibreOffice project.
4 : *
5 : * This Source Code Form is subject to the terms of the Mozilla Public
6 : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : *
9 : * This file incorporates work covered by the following license notice:
10 : *
11 : * Licensed to the Apache Software Foundation (ASF) under one or more
12 : * contributor license agreements. See the NOTICE file distributed
13 : * with this work for additional information regarding copyright
14 : * ownership. The ASF licenses this file to you under the Apache
15 : * License, Version 2.0 (the "License"); you may not use this file
16 : * except in compliance with the License. You may obtain a copy of
17 : * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 : */
19 :
20 : #include "sal/config.h"
21 :
22 : #include <cassert>
23 : #include <unordered_map>
24 :
25 : #include <osl/diagnose.h>
26 : #include <osl/mutex.hxx>
27 : #include <osl/thread.h>
28 : #include <rtl/instance.hxx>
29 : #include <sal/log.hxx>
30 :
31 : #include <uno/threadpool.h>
32 :
33 : #include "threadpool.hxx"
34 : #include "thread.hxx"
35 :
36 : using namespace ::std;
37 : using namespace ::osl;
38 : using namespace ::rtl;
39 :
40 : namespace cppu_threadpool
41 : {
42 : struct theDisposedCallerAdmin :
43 : public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
44 : {
45 65 : DisposedCallerAdminHolder operator () () {
46 65 : return DisposedCallerAdminHolder(new DisposedCallerAdmin());
47 : }
48 : };
49 :
50 232210 : DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
51 : {
52 232210 : return theDisposedCallerAdmin::get();
53 : }
54 :
55 30 : DisposedCallerAdmin::~DisposedCallerAdmin()
56 : {
57 : SAL_WARN_IF( !m_lst.empty(), "cppu.threadpool", "DisposedCallerList : " << m_lst.size() << " left\n");
58 30 : }
59 :
60 66 : void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
61 : {
62 66 : MutexGuard guard( m_mutex );
63 66 : m_lst.push_back( nDisposeId );
64 66 : }
65 :
66 66 : void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
67 : {
68 66 : MutexGuard guard( m_mutex );
69 198 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
70 132 : ii != m_lst.end() ;
71 : ++ ii )
72 : {
73 66 : if( (*ii) == nDisposeId )
74 : {
75 66 : m_lst.erase( ii );
76 66 : break;
77 : }
78 66 : }
79 66 : }
80 :
81 239311 : bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
82 : {
83 239311 : MutexGuard guard( m_mutex );
84 718023 : for( DisposedCallerList::iterator ii = m_lst.begin() ;
85 478682 : ii != m_lst.end() ;
86 : ++ ii )
87 : {
88 0 : if( (*ii) == nDisposeId )
89 : {
90 0 : return true;
91 : }
92 : }
93 239341 : return false;
94 : }
95 :
96 :
97 :
98 :
99 65 : ThreadPool::ThreadPool()
100 : {
101 65 : m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
102 65 : }
103 :
104 130 : ThreadPool::~ThreadPool()
105 : {
106 : SAL_WARN_IF( m_mapQueue.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue.size() << " left\n");
107 130 : }
108 :
109 66 : void ThreadPool::dispose( sal_Int64 nDisposeId )
110 : {
111 66 : m_DisposedCallerAdmin->dispose( nDisposeId );
112 :
113 66 : MutexGuard guard( m_mutex );
114 327 : for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
115 218 : ii != m_mapQueue.end();
116 : ++ii)
117 : {
118 43 : if( (*ii).second.first )
119 : {
120 42 : (*ii).second.first->dispose( nDisposeId );
121 : }
122 43 : if( (*ii).second.second )
123 : {
124 1 : (*ii).second.second->dispose( nDisposeId );
125 : }
126 66 : }
127 66 : }
128 :
129 66 : void ThreadPool::destroy( sal_Int64 nDisposeId )
130 : {
131 66 : m_DisposedCallerAdmin->destroy( nDisposeId );
132 66 : }
133 :
134 : /******************
135 : * This methods lets the thread wait a certain amount of time. If within this timespan
136 : * a new request comes in, this thread is reused. This is done only to improve performance,
137 : * it is not required for threadpool functionality.
138 : ******************/
139 231989 : void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
140 : {
141 231989 : struct WaitingThread waitingThread;
142 231989 : waitingThread.condition = osl_createCondition();
143 231989 : waitingThread.thread = pThread;
144 : {
145 231989 : MutexGuard guard( m_mutexWaitingThreadList );
146 231989 : m_lstThreads.push_front( &waitingThread );
147 : }
148 :
149 : // let the thread wait 2 seconds
150 231989 : TimeValue time = { 2 , 0 };
151 231989 : osl_waitCondition( waitingThread.condition , &time );
152 :
153 : {
154 231932 : MutexGuard guard ( m_mutexWaitingThreadList );
155 231989 : if( waitingThread.thread.is() )
156 : {
157 : // thread wasn't reused, remove it from the list
158 : WaitingThreadList::iterator ii = find(
159 907 : m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
160 : OSL_ASSERT( ii != m_lstThreads.end() );
161 907 : m_lstThreads.erase( ii );
162 231989 : }
163 : }
164 :
165 231989 : osl_destroyCondition( waitingThread.condition );
166 231989 : }
167 :
168 65 : void ThreadPool::joinWorkers()
169 : {
170 : {
171 65 : MutexGuard guard( m_mutexWaitingThreadList );
172 576 : for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
173 384 : ii != m_lstThreads.end() ;
174 : ++ ii )
175 : {
176 : // wake the threads up
177 127 : osl_setCondition( (*ii)->condition );
178 65 : }
179 : }
180 65 : m_aThreadAdmin.join();
181 65 : }
182 :
183 231989 : bool ThreadPool::createThread( JobQueue *pQueue ,
184 : const ByteSequence &aThreadId,
185 : bool bAsynchron )
186 : {
187 : {
188 : // Can a thread be reused ?
189 231989 : MutexGuard guard( m_mutexWaitingThreadList );
190 231989 : if( ! m_lstThreads.empty() )
191 : {
192 : // inform the thread and let it go
193 231082 : struct WaitingThread *pWaitingThread = m_lstThreads.back();
194 231082 : pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
195 231082 : pWaitingThread->thread = 0;
196 :
197 : // remove from list
198 231082 : m_lstThreads.pop_back();
199 :
200 : // let the thread go
201 231082 : osl_setCondition( pWaitingThread->condition );
202 231082 : return true;
203 907 : }
204 : }
205 :
206 : rtl::Reference< ORequestThread > pThread(
207 907 : new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
208 907 : return pThread->launch();
209 : }
210 :
211 232239 : bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
212 : {
213 232239 : MutexGuard guard( m_mutex );
214 :
215 232239 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
216 : OSL_ASSERT( ii != m_mapQueue.end() );
217 :
218 232239 : if( bAsynchron )
219 : {
220 32081 : if( ! (*ii).second.second->isEmpty() )
221 : {
222 : // another thread has put something into the queue
223 125 : return false;
224 : }
225 :
226 31956 : (*ii).second.second = 0;
227 31956 : if( (*ii).second.first )
228 : {
229 : // all oneway request have been processed, now
230 : // synchronus requests may go on
231 57 : (*ii).second.first->resume();
232 : }
233 : }
234 : else
235 : {
236 200158 : if( ! (*ii).second.first->isEmpty() )
237 : {
238 : // another thread has put something into the queue
239 7 : return false;
240 : }
241 200151 : (*ii).second.first = 0;
242 : }
243 :
244 232107 : if( 0 == (*ii).second.first && 0 == (*ii).second.second )
245 : {
246 232050 : m_mapQueue.erase( ii );
247 : }
248 :
249 232107 : return true;
250 : }
251 :
252 :
253 266747 : bool ThreadPool::addJob(
254 : const ByteSequence &aThreadId ,
255 : bool bAsynchron,
256 : void *pThreadSpecificData,
257 : RequestFun * doRequest )
258 : {
259 266747 : bool bCreateThread = false;
260 266747 : JobQueue *pQueue = 0;
261 : {
262 266747 : MutexGuard guard( m_mutex );
263 :
264 266747 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
265 :
266 266747 : if( ii == m_mapQueue.end() )
267 : {
268 231933 : m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( nullptr , nullptr );
269 231933 : ii = m_mapQueue.find( aThreadId );
270 : OSL_ASSERT( ii != m_mapQueue.end() );
271 : }
272 :
273 266747 : if( bAsynchron )
274 : {
275 52266 : if( ! (*ii).second.second )
276 : {
277 31956 : (*ii).second.second = new JobQueue();
278 31956 : bCreateThread = true;
279 : }
280 52266 : pQueue = (*ii).second.second;
281 : }
282 : else
283 : {
284 214481 : if( ! (*ii).second.first )
285 : {
286 200033 : (*ii).second.first = new JobQueue();
287 200033 : bCreateThread = true;
288 : }
289 214481 : pQueue = (*ii).second.first;
290 :
291 214481 : if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
292 : {
293 13 : pQueue->suspend();
294 : }
295 : }
296 266747 : pQueue->add( pThreadSpecificData , doRequest );
297 : }
298 :
299 266747 : return !bCreateThread || createThread( pQueue , aThreadId , bAsynchron);
300 : }
301 :
302 11879 : void ThreadPool::prepare( const ByteSequence &aThreadId )
303 : {
304 11879 : MutexGuard guard( m_mutex );
305 :
306 11879 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
307 :
308 11879 : if( ii == m_mapQueue.end() )
309 : {
310 155 : JobQueue *p = new JobQueue();
311 155 : m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , nullptr );
312 : }
313 11724 : else if( 0 == (*ii).second.first )
314 : {
315 1 : (*ii).second.first = new JobQueue();
316 11879 : }
317 11879 : }
318 :
319 7140 : void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
320 : {
321 7140 : JobQueue *pQueue = 0;
322 : {
323 7140 : MutexGuard guard( m_mutex );
324 :
325 7140 : ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
326 :
327 : OSL_ASSERT( ii != m_mapQueue.end() );
328 7140 : pQueue = (*ii).second.first;
329 : }
330 :
331 : OSL_ASSERT( pQueue );
332 7140 : void *pReturn = pQueue->enter( nDisposeId );
333 :
334 7140 : if( pQueue->isCallstackEmpty() )
335 : {
336 118 : if( revokeQueue( aThreadId , false) )
337 : {
338 : // remove queue
339 118 : delete pQueue;
340 : }
341 : }
342 7140 : return pReturn;
343 : }
344 : }
345 :
346 : // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
347 : // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
348 : // (within the last uno_threadpool_destroy) all worker threads spawned by that
349 : // ThreadPool instance are joined (which implies that uno_threadpool_destroy
350 : // must never be called from a worker thread); afterwards, the next call to
351 : // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
352 :
353 : using namespace cppu_threadpool;
354 :
355 : struct uno_ThreadPool_Equal
356 : {
357 285964 : bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
358 : {
359 285964 : return a == b;
360 : }
361 : };
362 :
363 : struct uno_ThreadPool_Hash
364 : {
365 286030 : sal_Size operator () ( const uno_ThreadPool &a ) const
366 : {
367 286030 : return reinterpret_cast<sal_Size>( a );
368 : }
369 : };
370 :
371 :
372 :
373 : typedef std::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
374 :
375 : static ThreadpoolHashSet *g_pThreadpoolHashSet;
376 :
377 : struct _uno_ThreadPool
378 : {
379 : sal_Int32 dummy;
380 : };
381 :
382 : namespace {
383 :
384 285898 : ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
385 : {
386 285898 : MutexGuard guard( Mutex::getGlobalMutex() );
387 : assert( g_pThreadpoolHashSet != 0 );
388 285898 : ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
389 : assert( i != g_pThreadpoolHashSet->end() );
390 285898 : return i->second;
391 : }
392 :
393 : }
394 :
395 : extern "C" uno_ThreadPool SAL_CALL
396 66 : uno_threadpool_create() SAL_THROW_EXTERN_C()
397 : {
398 66 : MutexGuard guard( Mutex::getGlobalMutex() );
399 132 : ThreadPoolHolder p;
400 66 : if( ! g_pThreadpoolHashSet )
401 : {
402 65 : g_pThreadpoolHashSet = new ThreadpoolHashSet();
403 65 : p = new ThreadPool;
404 : }
405 : else
406 : {
407 : assert( !g_pThreadpoolHashSet->empty() );
408 1 : p = g_pThreadpoolHashSet->begin()->second;
409 : }
410 :
411 : // Just ensure that the handle is unique in the process (via heap)
412 66 : uno_ThreadPool h = new struct _uno_ThreadPool;
413 66 : g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
414 132 : return h;
415 : }
416 :
417 : extern "C" void SAL_CALL
418 11879 : uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
419 : {
420 11879 : sal_Sequence *pThreadId = 0;
421 11879 : uno_getIdOfCurrentThread( &pThreadId );
422 11879 : getThreadPool( hPool )->prepare( pThreadId );
423 11879 : rtl_byte_sequence_release( pThreadId );
424 11879 : uno_releaseIdFromCurrentThread();
425 11879 : }
426 :
427 : extern "C" void SAL_CALL
428 7140 : uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
429 : SAL_THROW_EXTERN_C()
430 : {
431 7140 : sal_Sequence *pThreadId = 0;
432 7140 : uno_getIdOfCurrentThread( &pThreadId );
433 : *ppJob =
434 : getThreadPool( hPool )->enter(
435 : pThreadId,
436 : sal::static_int_cast< sal_Int64 >(
437 7140 : reinterpret_cast< sal_IntPtr >(hPool)) );
438 7140 : rtl_byte_sequence_release( pThreadId );
439 7140 : uno_releaseIdFromCurrentThread();
440 7140 : }
441 :
442 : extern "C" void SAL_CALL
443 11879 : uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
444 : {
445 : // we might do here some tiding up in case a thread called attach but never detach
446 11879 : }
447 :
448 : extern "C" void SAL_CALL
449 266747 : uno_threadpool_putJob(
450 : uno_ThreadPool hPool,
451 : sal_Sequence *pThreadId,
452 : void *pJob,
453 : void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
454 : sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
455 : {
456 266747 : if (!getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest ))
457 : {
458 : SAL_WARN(
459 : "cppu",
460 : "uno_threadpool_putJob in parallel with uno_threadpool_destroy");
461 : }
462 266747 : }
463 :
464 : extern "C" void SAL_CALL
465 66 : uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
466 : {
467 : getThreadPool(hPool)->dispose(
468 : sal::static_int_cast< sal_Int64 >(
469 66 : reinterpret_cast< sal_IntPtr >(hPool)) );
470 66 : }
471 :
472 : extern "C" void SAL_CALL
473 66 : uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
474 : {
475 66 : ThreadPoolHolder p( getThreadPool(hPool) );
476 : p->destroy(
477 : sal::static_int_cast< sal_Int64 >(
478 66 : reinterpret_cast< sal_IntPtr >(hPool)) );
479 :
480 : bool empty;
481 : {
482 : OSL_ASSERT( g_pThreadpoolHashSet );
483 :
484 66 : MutexGuard guard( Mutex::getGlobalMutex() );
485 :
486 66 : ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
487 : OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
488 66 : g_pThreadpoolHashSet->erase( ii );
489 66 : delete hPool;
490 :
491 66 : empty = g_pThreadpoolHashSet->empty();
492 66 : if( empty )
493 : {
494 65 : delete g_pThreadpoolHashSet;
495 65 : g_pThreadpoolHashSet = 0;
496 66 : }
497 : }
498 :
499 66 : if( empty )
500 : {
501 65 : p->joinWorkers();
502 66 : }
503 66 : }
504 :
505 : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|