Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /*
3 : * This file is part of the LibreOffice project.
4 : *
5 : * This Source Code Form is subject to the terms of the Mozilla Public
6 : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : */
9 :
10 : #include <comphelper/threadpool.hxx>
11 :
12 : #include <rtl/instance.hxx>
13 : #include <boost/shared_ptr.hpp>
14 : #include <thread>
15 : #include <algorithm>
16 :
17 : namespace comphelper {
18 :
19 520 : class ThreadPool::ThreadWorker : public salhelper::Thread
20 : {
21 : ThreadPool *mpPool;
22 : osl::Condition maNewWork;
23 : bool mbWorking;
24 : public:
25 :
26 260 : ThreadWorker( ThreadPool *pPool ) :
27 : salhelper::Thread("thread-pool"),
28 : mpPool( pPool ),
29 260 : mbWorking( false )
30 : {
31 260 : }
32 :
33 260 : virtual void execute() SAL_OVERRIDE
34 : {
35 : ThreadTask *pTask;
36 780 : while ( ( pTask = waitForWork() ) )
37 : {
38 260 : pTask->doWork();
39 260 : delete pTask;
40 : }
41 260 : }
42 :
43 520 : ThreadTask *waitForWork()
44 : {
45 520 : ThreadTask *pRet = NULL;
46 :
47 520 : osl::ResettableMutexGuard aGuard( mpPool->maGuard );
48 :
49 520 : pRet = mpPool->popWork();
50 :
51 1301 : while( !pRet )
52 : {
53 521 : if (mbWorking)
54 246 : mpPool->stopWork();
55 521 : mbWorking = false;
56 521 : maNewWork.reset();
57 :
58 521 : if( mpPool->mbTerminate )
59 260 : break;
60 :
61 261 : aGuard.clear(); // unlock
62 :
63 261 : maNewWork.wait();
64 :
65 261 : aGuard.reset(); // lock
66 :
67 261 : pRet = mpPool->popWork();
68 : }
69 :
70 520 : if (pRet)
71 : {
72 260 : if (!mbWorking)
73 246 : mpPool->startWork();
74 260 : mbWorking = true;
75 : }
76 :
77 520 : return pRet;
78 : }
79 :
80 : // Why a condition per worker thread - you may ask.
81 : //
82 : // Unfortunately the Windows synchronisation API that we wrap
83 : // is horribly inadequate cf.
84 : // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
85 : // The existing osl::Condition API should only ever be used
86 : // between one producer and one consumer thread to avoid the
87 : // lost wakeup problem.
88 :
89 880 : void signalNewWork()
90 : {
91 880 : maNewWork.set();
92 880 : }
93 : };
94 :
95 128 : ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
96 : mnThreadsWorking( 0 ),
97 128 : mbTerminate( false )
98 : {
99 388 : for( sal_Int32 i = 0; i < nWorkers; i++ )
100 260 : maWorkers.push_back( new ThreadWorker( this ) );
101 :
102 128 : maTasksComplete.reset();
103 :
104 128 : osl::MutexGuard aGuard( maGuard );
105 388 : for( size_t i = 0; i < maWorkers.size(); i++ )
106 388 : maWorkers[ i ]->launch();
107 128 : }
108 :
109 256 : ThreadPool::~ThreadPool()
110 : {
111 128 : waitAndCleanupWorkers();
112 128 : }
113 :
114 : struct ThreadPoolStatic : public rtl::StaticWithInit< boost::shared_ptr< ThreadPool >,
115 : ThreadPoolStatic >
116 : {
117 0 : boost::shared_ptr< ThreadPool > operator () () {
118 0 : sal_Int32 nThreads = std::max( std::thread::hardware_concurrency(), 1U );
119 0 : return boost::shared_ptr< ThreadPool >( new ThreadPool( nThreads ) );
120 : };
121 : };
122 :
123 0 : ThreadPool& ThreadPool::getSharedOptimalPool()
124 : {
125 0 : return *ThreadPoolStatic::get().get();
126 : }
127 :
128 128 : void ThreadPool::waitAndCleanupWorkers()
129 : {
130 128 : waitUntilEmpty();
131 :
132 128 : osl::ResettableMutexGuard aGuard( maGuard );
133 128 : mbTerminate = true;
134 :
135 516 : while( !maWorkers.empty() )
136 : {
137 260 : rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
138 260 : maWorkers.pop_back();
139 : assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker)
140 : == maWorkers.end());
141 260 : xWorker->signalNewWork();
142 260 : aGuard.clear();
143 : { // unlocked
144 260 : xWorker->join();
145 260 : xWorker.clear();
146 : }
147 260 : aGuard.reset();
148 388 : }
149 128 : }
150 :
151 260 : void ThreadPool::pushTask( ThreadTask *pTask )
152 : {
153 260 : osl::MutexGuard aGuard( maGuard );
154 260 : maTasks.insert( maTasks.begin(), pTask );
155 :
156 : // horrible beyond belief:
157 880 : for( size_t i = 0; i < maWorkers.size(); i++ )
158 620 : maWorkers[ i ]->signalNewWork();
159 260 : maTasksComplete.reset();
160 260 : }
161 :
162 781 : ThreadTask *ThreadPool::popWork()
163 : {
164 781 : if( !maTasks.empty() )
165 : {
166 260 : ThreadTask *pTask = maTasks.back();
167 260 : maTasks.pop_back();
168 260 : return pTask;
169 : }
170 : else
171 521 : return NULL;
172 : }
173 :
174 246 : void ThreadPool::startWork()
175 : {
176 246 : mnThreadsWorking++;
177 246 : }
178 :
179 246 : void ThreadPool::stopWork()
180 : {
181 : assert( mnThreadsWorking > 0 );
182 246 : if ( --mnThreadsWorking == 0 )
183 128 : maTasksComplete.set();
184 246 : }
185 :
186 256 : void ThreadPool::waitUntilEmpty()
187 : {
188 256 : osl::ResettableMutexGuard aGuard( maGuard );
189 :
190 256 : if( maWorkers.empty() )
191 : { // no threads at all -> execute the work in-line
192 : ThreadTask *pTask;
193 0 : while ( ( pTask = popWork() ) )
194 : {
195 0 : pTask->doWork();
196 0 : delete pTask;
197 : }
198 : }
199 : else
200 : {
201 256 : aGuard.clear();
202 256 : maTasksComplete.wait();
203 256 : aGuard.reset();
204 : }
205 256 : assert( maTasks.empty() );
206 256 : }
207 :
208 : } // namespace comphelper
209 :
210 : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|