Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /*
3 : * This file is part of the LibreOffice project.
4 : *
5 : * This Source Code Form is subject to the terms of the Mozilla Public
6 : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : *
9 : * This file incorporates work covered by the following license notice:
10 : *
11 : * Licensed to the Apache Software Foundation (ASF) under one or more
12 : * contributor license agreements. See the NOTICE file distributed
13 : * with this work for additional information regarding copyright
14 : * ownership. The ASF licenses this file to you under the Apache
15 : * License, Version 2.0 (the "License"); you may not use this file
16 : * except in compliance with the License. You may obtain a copy of
17 : * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 : */
19 :
20 :
21 : // streams
22 : #include <com/sun/star/io/XPipe.hpp>
23 : #include <com/sun/star/io/XInputStream.hpp>
24 : #include <com/sun/star/io/XOutputStream.hpp>
25 : #include <com/sun/star/io/XConnectable.hpp>
26 :
27 : #include <com/sun/star/lang/XServiceInfo.hpp>
28 :
29 : #include <cppuhelper/factory.hxx>
30 : #include <cppuhelper/implbase3.hxx>
31 : #include <cppuhelper/supportsservice.hxx>
32 :
33 : #include <osl/conditn.hxx>
34 : #include <osl/mutex.hxx>
35 :
36 : #include <limits>
37 : #include <string.h>
38 :
39 : using namespace ::osl;
40 : using namespace ::cppu;
41 : using namespace ::com::sun::star::uno;
42 : using namespace ::com::sun::star::io;
43 : using namespace ::com::sun::star::lang;
44 :
45 : #include "services.hxx"
46 : #include "streamhelper.hxx"
47 :
48 : // Implementation and service names
49 : #define IMPLEMENTATION_NAME "com.sun.star.comp.io.stm.Pipe"
50 : #define SERVICE_NAME "com.sun.star.io.Pipe"
51 :
52 : namespace io_stm{
53 :
54 : class OPipeImpl :
55 : public WeakImplHelper3< XPipe , XConnectable , XServiceInfo >
56 : {
57 : public:
58 : OPipeImpl( );
59 : virtual ~OPipeImpl();
60 :
61 : public: // XInputStream
62 : virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
63 : throw( NotConnectedException,
64 : BufferSizeExceededException,
65 : RuntimeException, std::exception ) SAL_OVERRIDE;
66 : virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
67 : throw( NotConnectedException,
68 : BufferSizeExceededException,
69 : RuntimeException, std::exception ) SAL_OVERRIDE;
70 : virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip)
71 : throw( NotConnectedException,
72 : BufferSizeExceededException,
73 : RuntimeException, std::exception ) SAL_OVERRIDE;
74 : virtual sal_Int32 SAL_CALL available(void)
75 : throw( NotConnectedException,
76 : RuntimeException, std::exception ) SAL_OVERRIDE;
77 : virtual void SAL_CALL closeInput(void)
78 : throw( NotConnectedException,
79 : RuntimeException, std::exception ) SAL_OVERRIDE;
80 :
81 : public: // XOutputStream
82 :
83 : virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData)
84 : throw( NotConnectedException,
85 : BufferSizeExceededException,
86 : RuntimeException, std::exception ) SAL_OVERRIDE;
87 : virtual void SAL_CALL flush(void)
88 : throw( NotConnectedException,
89 : BufferSizeExceededException,
90 : RuntimeException, std::exception ) SAL_OVERRIDE;
91 : virtual void SAL_CALL closeOutput(void)
92 : throw( NotConnectedException,
93 : BufferSizeExceededException,
94 : RuntimeException, std::exception ) SAL_OVERRIDE;
95 :
96 : public: // XConnectable
97 : virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor)
98 : throw( RuntimeException, std::exception ) SAL_OVERRIDE;
99 : virtual Reference< XConnectable > SAL_CALL getPredecessor(void) throw( RuntimeException, std::exception ) SAL_OVERRIDE;
100 : virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor)
101 : throw( RuntimeException, std::exception ) SAL_OVERRIDE;
102 : virtual Reference < XConnectable > SAL_CALL getSuccessor(void) throw( RuntimeException, std::exception ) SAL_OVERRIDE ;
103 :
104 :
105 : public: // XServiceInfo
106 : OUString SAL_CALL getImplementationName() throw(std::exception ) SAL_OVERRIDE;
107 : Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw(std::exception ) SAL_OVERRIDE;
108 : sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw(std::exception ) SAL_OVERRIDE;
109 :
110 : private:
111 :
112 : Reference < XConnectable > m_succ;
113 : Reference < XConnectable > m_pred;
114 :
115 : sal_Int32 m_nBytesToSkip;
116 :
117 : bool m_bOutputStreamClosed;
118 : bool m_bInputStreamClosed;
119 :
120 : oslCondition m_conditionBytesAvail;
121 : Mutex m_mutexAccess;
122 : I_FIFO *m_pFIFO;
123 : };
124 :
125 :
126 :
127 88 : OPipeImpl::OPipeImpl()
128 : {
129 88 : m_nBytesToSkip = 0;
130 :
131 88 : m_bOutputStreamClosed = false;
132 88 : m_bInputStreamClosed = false;
133 :
134 88 : m_pFIFO = new MemFIFO;
135 88 : m_conditionBytesAvail = osl_createCondition();
136 88 : }
137 :
138 18 : OPipeImpl::~OPipeImpl()
139 : {
140 6 : osl_destroyCondition( m_conditionBytesAvail );
141 6 : delete m_pFIFO;
142 12 : }
143 :
144 :
145 3614 : sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
146 : throw( NotConnectedException, BufferSizeExceededException,RuntimeException, std::exception )
147 : {
148 : while( true )
149 : {
150 : { // start guarded section
151 3614 : MutexGuard guard( m_mutexAccess );
152 3614 : if( m_bInputStreamClosed )
153 : {
154 : throw NotConnectedException(
155 : "Pipe::readBytes NotConnectedException",
156 0 : *this );
157 : }
158 3614 : sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize();
159 :
160 3614 : if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen )
161 : {
162 0 : nBytesToRead = nOccupiedBufferLen;
163 : }
164 :
165 3614 : if( nOccupiedBufferLen < nBytesToRead )
166 : {
167 : // wait outside guarded section
168 0 : osl_resetCondition( m_conditionBytesAvail );
169 : }
170 : else {
171 : // necessary bytes are available
172 3614 : m_pFIFO->read( aData , nBytesToRead );
173 7228 : return nBytesToRead;
174 0 : }
175 : } // end guarded section
176 :
177 : // wait for new data outside guarded section!
178 0 : osl_waitCondition( m_conditionBytesAvail , 0 );
179 0 : }
180 : }
181 :
182 :
183 8 : sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
184 : throw( NotConnectedException,
185 : BufferSizeExceededException,
186 : RuntimeException, std::exception )
187 : {
188 : while( true ) {
189 : {
190 8 : MutexGuard guard( m_mutexAccess );
191 8 : if( m_bInputStreamClosed )
192 : {
193 : throw NotConnectedException(
194 : "Pipe::readSomeBytes NotConnectedException",
195 0 : *this );
196 : }
197 8 : if( m_pFIFO->getSize() )
198 : {
199 4 : sal_Int32 nSize = Min( nMaxBytesToRead , m_pFIFO->getSize() );
200 4 : aData.realloc( nSize );
201 4 : m_pFIFO->read( aData , nSize );
202 4 : return nSize;
203 : }
204 :
205 4 : if( m_bOutputStreamClosed )
206 : {
207 : // no bytes in buffer anymore
208 4 : return 0;
209 0 : }
210 : }
211 :
212 0 : osl_waitCondition( m_conditionBytesAvail , 0 );
213 0 : }
214 : }
215 :
216 :
217 0 : void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip)
218 : throw( NotConnectedException,
219 : BufferSizeExceededException,
220 : RuntimeException, std::exception )
221 : {
222 0 : MutexGuard guard( m_mutexAccess );
223 0 : if( m_bInputStreamClosed )
224 : {
225 : throw NotConnectedException(
226 : "Pipe::skipBytes NotConnectedException",
227 0 : *this );
228 : }
229 :
230 0 : if( nBytesToSkip < 0
231 0 : || (nBytesToSkip
232 0 : > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) )
233 : {
234 : throw BufferSizeExceededException(
235 : "Pipe::skipBytes BufferSizeExceededException",
236 0 : *this );
237 : }
238 0 : m_nBytesToSkip += nBytesToSkip;
239 :
240 0 : nBytesToSkip = Min( m_pFIFO->getSize() , m_nBytesToSkip );
241 0 : m_pFIFO->skip( nBytesToSkip );
242 0 : m_nBytesToSkip -= nBytesToSkip;
243 0 : }
244 :
245 :
246 4 : sal_Int32 OPipeImpl::available(void)
247 : throw( NotConnectedException,
248 : RuntimeException, std::exception )
249 : {
250 4 : MutexGuard guard( m_mutexAccess );
251 4 : if( m_bInputStreamClosed )
252 : {
253 : throw NotConnectedException(
254 : "Pipe::available NotConnectedException",
255 0 : *this );
256 : }
257 4 : return m_pFIFO->getSize();
258 : }
259 :
260 0 : void OPipeImpl::closeInput(void)
261 : throw( NotConnectedException,
262 : RuntimeException, std::exception)
263 : {
264 0 : MutexGuard guard( m_mutexAccess );
265 :
266 0 : m_bInputStreamClosed = true;
267 :
268 0 : delete m_pFIFO;
269 0 : m_pFIFO = 0;
270 :
271 : // readBytes may throw an exception
272 0 : osl_setCondition( m_conditionBytesAvail );
273 :
274 0 : setSuccessor( Reference< XConnectable > () );
275 0 : return;
276 : }
277 :
278 :
279 1510 : void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData)
280 : throw( NotConnectedException,
281 : BufferSizeExceededException,
282 : RuntimeException, std::exception)
283 : {
284 1510 : MutexGuard guard( m_mutexAccess );
285 :
286 1510 : if( m_bOutputStreamClosed )
287 : {
288 : throw NotConnectedException(
289 : "Pipe::writeBytes NotConnectedException (outputstream)",
290 0 : *this );
291 : }
292 :
293 1510 : if( m_bInputStreamClosed )
294 : {
295 : throw NotConnectedException(
296 : "Pipe::writeBytes NotConnectedException (inputstream)",
297 0 : *this );
298 : }
299 :
300 : // check skipping
301 1510 : sal_Int32 nLen = aData.getLength();
302 1510 : if( m_nBytesToSkip && m_nBytesToSkip >= nLen ) {
303 : // all must be skipped - forget whole call
304 0 : m_nBytesToSkip -= nLen;
305 1510 : return;
306 : }
307 :
308 : // adjust buffersize if necessary
309 :
310 : try
311 : {
312 1510 : if( m_nBytesToSkip )
313 : {
314 0 : Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip );
315 0 : memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip );
316 0 : m_pFIFO->write( seqCopy );
317 : }
318 : else
319 : {
320 1510 : m_pFIFO->write( aData );
321 : }
322 1510 : m_nBytesToSkip = 0;
323 : }
324 0 : catch ( I_FIFO_OutOfBoundsException & )
325 : {
326 : throw BufferSizeExceededException(
327 : "Pipe::writeBytes BufferSizeExceededException",
328 0 : *this );
329 : }
330 0 : catch ( I_FIFO_OutOfMemoryException & )
331 : {
332 : throw BufferSizeExceededException(
333 : "Pipe::writeBytes BufferSizeExceededException",
334 0 : *this );
335 : }
336 :
337 : // readBytes may check again if enough bytes are available
338 1510 : osl_setCondition( m_conditionBytesAvail );
339 : }
340 :
341 :
342 4 : void OPipeImpl::flush(void)
343 : throw( NotConnectedException,
344 : BufferSizeExceededException,
345 : RuntimeException, std::exception)
346 : {
347 : // nothing to do for a pipe
348 4 : return;
349 : }
350 :
351 4 : void OPipeImpl::closeOutput(void)
352 : throw( NotConnectedException,
353 : BufferSizeExceededException,
354 : RuntimeException, std::exception)
355 : {
356 4 : MutexGuard guard( m_mutexAccess );
357 :
358 4 : m_bOutputStreamClosed = true;
359 4 : osl_setCondition( m_conditionBytesAvail );
360 4 : setPredecessor( Reference < XConnectable > () );
361 4 : return;
362 : }
363 :
364 :
365 82 : void OPipeImpl::setSuccessor( const Reference < XConnectable > &r )
366 : throw( RuntimeException, std::exception )
367 : {
368 : /// if the references match, nothing needs to be done
369 82 : if( m_succ != r ) {
370 : /// store the reference for later use
371 82 : m_succ = r;
372 :
373 82 : if( m_succ.is() )
374 : {
375 82 : m_succ->setPredecessor(
376 82 : Reference< XConnectable > ( (static_cast< XConnectable * >(this)) ) );
377 : }
378 : }
379 82 : }
380 :
381 0 : Reference < XConnectable > OPipeImpl::getSuccessor() throw( RuntimeException, std::exception )
382 : {
383 0 : return m_succ;
384 : }
385 :
386 :
387 : // XDataSource
388 86 : void OPipeImpl::setPredecessor( const Reference < XConnectable > &r )
389 : throw( RuntimeException, std::exception )
390 : {
391 86 : if( r != m_pred ) {
392 82 : m_pred = r;
393 82 : if( m_pred.is() ) {
394 82 : m_pred->setSuccessor(
395 82 : Reference < XConnectable > ( (static_cast< XConnectable * >(this)) ) );
396 : }
397 : }
398 86 : }
399 :
400 0 : Reference < XConnectable > OPipeImpl::getPredecessor() throw( RuntimeException, std::exception )
401 : {
402 0 : return m_pred;
403 : }
404 :
405 :
406 :
407 :
408 : // XServiceInfo
409 0 : OUString OPipeImpl::getImplementationName() throw(std::exception )
410 : {
411 0 : return OPipeImpl_getImplementationName();
412 : }
413 :
414 : // XServiceInfo
415 0 : sal_Bool OPipeImpl::supportsService(const OUString& ServiceName) throw(std::exception )
416 : {
417 0 : return cppu::supportsService(this, ServiceName);
418 : }
419 :
420 : // XServiceInfo
421 0 : Sequence< OUString > OPipeImpl::getSupportedServiceNames(void) throw(std::exception )
422 : {
423 0 : return OPipeImpl_getSupportedServiceNames();
424 : }
425 :
426 : /* implementation functions
427 : *
428 : *
429 : */
430 :
431 :
432 88 : Reference < XInterface > SAL_CALL OPipeImpl_CreateInstance(
433 : SAL_UNUSED_PARAMETER const Reference < XComponentContext > & )
434 : throw(Exception)
435 : {
436 88 : OPipeImpl *p = new OPipeImpl;
437 :
438 88 : return Reference < XInterface > ( (static_cast< OWeakObject * >(p)) );
439 : }
440 :
441 :
442 208 : OUString OPipeImpl_getImplementationName()
443 : {
444 208 : return OUString( IMPLEMENTATION_NAME );
445 : }
446 :
447 8 : Sequence<OUString> OPipeImpl_getSupportedServiceNames(void)
448 : {
449 8 : Sequence<OUString> aRet(1);
450 8 : aRet.getArray()[0] = OUString( SERVICE_NAME );
451 8 : return aRet;
452 : }
453 : }
454 :
455 :
456 : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|