Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /*
3 : * This file is part of the LibreOffice project.
4 : *
5 : * This Source Code Form is subject to the terms of the Mozilla Public
6 : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : */
9 :
10 : #include <comphelper/threadpool.hxx>
11 :
12 : #include <rtl/instance.hxx>
13 : #include <boost/shared_ptr.hpp>
14 : #include <thread>
15 : #include <algorithm>
16 :
17 : namespace comphelper {
18 :
19 3098 : class ThreadPool::ThreadWorker : public salhelper::Thread
20 : {
21 : ThreadPool *mpPool;
22 : osl::Condition maNewWork;
23 : bool mbWorking;
24 : public:
25 :
26 1549 : explicit ThreadWorker( ThreadPool *pPool ) :
27 : salhelper::Thread("thread-pool"),
28 : mpPool( pPool ),
29 1549 : mbWorking( false )
30 : {
31 1549 : }
32 :
33 1549 : virtual void execute() SAL_OVERRIDE
34 : {
35 : ThreadTask *pTask;
36 6480 : while ( ( pTask = waitForWork() ) )
37 : {
38 3379 : pTask->doWork();
39 3381 : delete pTask;
40 : }
41 1549 : }
42 :
43 4931 : ThreadTask *waitForWork()
44 : {
45 4931 : ThreadTask *pRet = NULL;
46 :
47 4931 : osl::ResettableMutexGuard aGuard( mpPool->maGuard );
48 :
49 4931 : pRet = mpPool->popWork();
50 :
51 49322 : while( !pRet )
52 : {
53 41009 : if (mbWorking)
54 2224 : mpPool->stopWork();
55 41009 : mbWorking = false;
56 41009 : maNewWork.reset();
57 :
58 41009 : if( mpPool->mbTerminate )
59 1549 : break;
60 :
61 39460 : aGuard.clear(); // unlock
62 :
63 39455 : maNewWork.wait();
64 :
65 39442 : aGuard.reset(); // lock
66 :
67 39460 : pRet = mpPool->popWork();
68 : }
69 :
70 4931 : if (pRet)
71 : {
72 3382 : if (!mbWorking)
73 2224 : mpPool->startWork();
74 3382 : mbWorking = true;
75 : }
76 :
77 4931 : return pRet;
78 : }
79 :
80 : // Why a condition per worker thread - you may ask.
81 : //
82 : // Unfortunately the Windows synchronisation API that we wrap
83 : // is horribly inadequate cf.
84 : // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
85 : // The existing osl::Condition API should only ever be used
86 : // between one producer and one consumer thread to avoid the
87 : // lost wakeup problem.
88 :
89 101792 : void signalNewWork()
90 : {
91 101792 : maNewWork.set();
92 101792 : }
93 : };
94 :
95 185 : ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
96 : mnThreadsWorking( 0 ),
97 185 : mbTerminate( false )
98 : {
99 1734 : for( sal_Int32 i = 0; i < nWorkers; i++ )
100 1549 : maWorkers.push_back( new ThreadWorker( this ) );
101 :
102 185 : maTasksComplete.set();
103 :
104 185 : osl::MutexGuard aGuard( maGuard );
105 1734 : for( size_t i = 0; i < maWorkers.size(); i++ )
106 1734 : maWorkers[ i ]->launch();
107 185 : }
108 :
109 410 : ThreadPool::~ThreadPool()
110 : {
111 185 : waitAndCleanupWorkers();
112 225 : }
113 :
114 : struct ThreadPoolStatic : public rtl::StaticWithInit< boost::shared_ptr< ThreadPool >,
115 : ThreadPoolStatic >
116 : {
117 40 : boost::shared_ptr< ThreadPool > operator () () {
118 40 : sal_Int32 nThreads = std::max( std::thread::hardware_concurrency(), 1U );
119 40 : return boost::shared_ptr< ThreadPool >( new ThreadPool( nThreads ) );
120 : };
121 : };
122 :
123 931 : ThreadPool& ThreadPool::getSharedOptimalPool()
124 : {
125 931 : return *ThreadPoolStatic::get().get();
126 : }
127 :
128 185 : void ThreadPool::waitAndCleanupWorkers()
129 : {
130 185 : waitUntilEmpty();
131 :
132 185 : osl::ResettableMutexGuard aGuard( maGuard );
133 185 : mbTerminate = true;
134 :
135 1919 : while( !maWorkers.empty() )
136 : {
137 1549 : rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
138 1549 : maWorkers.pop_back();
139 : assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker)
140 : == maWorkers.end());
141 1549 : xWorker->signalNewWork();
142 1549 : aGuard.clear();
143 : { // unlocked
144 1549 : xWorker->join();
145 1549 : xWorker.clear();
146 : }
147 1549 : aGuard.reset();
148 1734 : }
149 185 : }
150 :
151 3382 : void ThreadPool::pushTask( ThreadTask *pTask )
152 : {
153 3382 : osl::MutexGuard aGuard( maGuard );
154 3382 : maTasks.insert( maTasks.begin(), pTask );
155 :
156 : // horrible beyond belief:
157 103625 : for( size_t i = 0; i < maWorkers.size(); i++ )
158 100243 : maWorkers[ i ]->signalNewWork();
159 3382 : maTasksComplete.reset();
160 3382 : }
161 :
162 44391 : ThreadTask *ThreadPool::popWork()
163 : {
164 44391 : if( !maTasks.empty() )
165 : {
166 3382 : ThreadTask *pTask = maTasks.back();
167 3382 : maTasks.pop_back();
168 3382 : return pTask;
169 : }
170 : else
171 41009 : return NULL;
172 : }
173 :
174 2224 : void ThreadPool::startWork()
175 : {
176 2224 : mnThreadsWorking++;
177 2224 : }
178 :
179 2224 : void ThreadPool::stopWork()
180 : {
181 : assert( mnThreadsWorking > 0 );
182 2224 : if ( --mnThreadsWorking == 0 )
183 1639 : maTasksComplete.set();
184 2224 : }
185 :
186 1261 : void ThreadPool::waitUntilEmpty()
187 : {
188 1261 : osl::ResettableMutexGuard aGuard( maGuard );
189 :
190 1261 : if( maWorkers.empty() )
191 : { // no threads at all -> execute the work in-line
192 : ThreadTask *pTask;
193 0 : while ( ( pTask = popWork() ) )
194 : {
195 0 : pTask->doWork();
196 0 : delete pTask;
197 : }
198 : }
199 : else
200 : {
201 1261 : aGuard.clear();
202 1261 : maTasksComplete.wait();
203 1261 : aGuard.reset();
204 : }
205 1261 : assert( maTasks.empty() );
206 1261 : }
207 :
208 : } // namespace comphelper
209 :
210 : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|