Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /*
3 : * This file is part of the LibreOffice project.
4 : *
5 : * This Source Code Form is subject to the terms of the Mozilla Public
6 : * License, v. 2.0. If a copy of the MPL was not distributed with this
7 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 : *
9 : * This file incorporates work covered by the following license notice:
10 : *
11 : * Licensed to the Apache Software Foundation (ASF) under one or more
12 : * contributor license agreements. See the NOTICE file distributed
13 : * with this work for additional information regarding copyright
14 : * ownership. The ASF licenses this file to you under the Apache
15 : * License, Version 2.0 (the "License"); you may not use this file
16 : * except in compliance with the License. You may obtain a copy of
17 : * the License at http://www.apache.org/licenses/LICENSE-2.0 .
18 : */
19 :
20 :
21 : // streams
22 : #include <com/sun/star/io/XPipe.hpp>
23 : #include <com/sun/star/io/XInputStream.hpp>
24 : #include <com/sun/star/io/XOutputStream.hpp>
25 : #include <com/sun/star/io/XConnectable.hpp>
26 :
27 : #include <com/sun/star/lang/XServiceInfo.hpp>
28 :
29 : #include <cppuhelper/factory.hxx>
30 : #include <cppuhelper/implbase3.hxx>
31 : #include <cppuhelper/supportsservice.hxx>
32 :
33 : #include <osl/conditn.hxx>
34 : #include <osl/mutex.hxx>
35 :
36 : #include <limits>
37 : #include <string.h>
38 :
39 : using namespace ::osl;
40 : using namespace ::cppu;
41 : using namespace ::com::sun::star::uno;
42 : using namespace ::com::sun::star::io;
43 : using namespace ::com::sun::star::lang;
44 :
45 : #include "services.hxx"
46 : #include "streamhelper.hxx"
47 :
48 : // Implementation and service names
49 : #define IMPLEMENTATION_NAME "com.sun.star.comp.io.stm.Pipe"
50 :
51 : namespace io_stm{
52 :
53 : class OPipeImpl :
54 : public WeakImplHelper3< XPipe , XConnectable , XServiceInfo >
55 : {
56 : public:
57 : OPipeImpl( );
58 : virtual ~OPipeImpl();
59 :
60 : public: // XInputStream
61 : virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
62 : throw( NotConnectedException,
63 : BufferSizeExceededException,
64 : RuntimeException, std::exception ) SAL_OVERRIDE;
65 : virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
66 : throw( NotConnectedException,
67 : BufferSizeExceededException,
68 : RuntimeException, std::exception ) SAL_OVERRIDE;
69 : virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip)
70 : throw( NotConnectedException,
71 : BufferSizeExceededException,
72 : RuntimeException, std::exception ) SAL_OVERRIDE;
73 : virtual sal_Int32 SAL_CALL available()
74 : throw( NotConnectedException,
75 : RuntimeException, std::exception ) SAL_OVERRIDE;
76 : virtual void SAL_CALL closeInput()
77 : throw( NotConnectedException,
78 : RuntimeException, std::exception ) SAL_OVERRIDE;
79 :
80 : public: // XOutputStream
81 :
82 : virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData)
83 : throw( NotConnectedException,
84 : BufferSizeExceededException,
85 : RuntimeException, std::exception ) SAL_OVERRIDE;
86 : virtual void SAL_CALL flush()
87 : throw( NotConnectedException,
88 : BufferSizeExceededException,
89 : RuntimeException, std::exception ) SAL_OVERRIDE;
90 : virtual void SAL_CALL closeOutput()
91 : throw( NotConnectedException,
92 : BufferSizeExceededException,
93 : RuntimeException, std::exception ) SAL_OVERRIDE;
94 :
95 : public: // XConnectable
96 : virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor)
97 : throw( RuntimeException, std::exception ) SAL_OVERRIDE;
98 : virtual Reference< XConnectable > SAL_CALL getPredecessor() throw( RuntimeException, std::exception ) SAL_OVERRIDE;
99 : virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor)
100 : throw( RuntimeException, std::exception ) SAL_OVERRIDE;
101 : virtual Reference < XConnectable > SAL_CALL getSuccessor() throw( RuntimeException, std::exception ) SAL_OVERRIDE ;
102 :
103 :
104 : public: // XServiceInfo
105 : OUString SAL_CALL getImplementationName() throw(std::exception ) SAL_OVERRIDE;
106 : Sequence< OUString > SAL_CALL getSupportedServiceNames() throw(std::exception ) SAL_OVERRIDE;
107 : sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw(std::exception ) SAL_OVERRIDE;
108 :
109 : private:
110 :
111 : Reference < XConnectable > m_succ;
112 : Reference < XConnectable > m_pred;
113 :
114 : sal_Int32 m_nBytesToSkip;
115 :
116 : bool m_bOutputStreamClosed;
117 : bool m_bInputStreamClosed;
118 :
119 : oslCondition m_conditionBytesAvail;
120 : Mutex m_mutexAccess;
121 : I_FIFO *m_pFIFO;
122 : };
123 :
124 :
125 :
126 45 : OPipeImpl::OPipeImpl()
127 : {
128 45 : m_nBytesToSkip = 0;
129 :
130 45 : m_bOutputStreamClosed = false;
131 45 : m_bInputStreamClosed = false;
132 :
133 45 : m_pFIFO = new MemFIFO;
134 45 : m_conditionBytesAvail = osl_createCondition();
135 45 : }
136 :
137 9 : OPipeImpl::~OPipeImpl()
138 : {
139 3 : osl_destroyCondition( m_conditionBytesAvail );
140 3 : delete m_pFIFO;
141 6 : }
142 :
143 :
144 2011 : sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
145 : throw( NotConnectedException, BufferSizeExceededException,RuntimeException, std::exception )
146 : {
147 : while( true )
148 : {
149 : { // start guarded section
150 2011 : MutexGuard guard( m_mutexAccess );
151 2011 : if( m_bInputStreamClosed )
152 : {
153 : throw NotConnectedException(
154 : "Pipe::readBytes NotConnectedException",
155 0 : *this );
156 : }
157 2011 : sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize();
158 :
159 2011 : if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen )
160 : {
161 4 : nBytesToRead = nOccupiedBufferLen;
162 : }
163 :
164 2011 : if( nOccupiedBufferLen < nBytesToRead )
165 : {
166 : // wait outside guarded section
167 201 : osl_resetCondition( m_conditionBytesAvail );
168 : }
169 : else {
170 : // necessary bytes are available
171 1810 : m_pFIFO->read( aData , nBytesToRead );
172 3620 : return nBytesToRead;
173 201 : }
174 : } // end guarded section
175 :
176 : // wait for new data outside guarded section!
177 201 : osl_waitCondition( m_conditionBytesAvail , 0 );
178 201 : }
179 : }
180 :
181 :
182 5 : sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
183 : throw( NotConnectedException,
184 : BufferSizeExceededException,
185 : RuntimeException, std::exception )
186 : {
187 : while( true ) {
188 : {
189 5 : MutexGuard guard( m_mutexAccess );
190 5 : if( m_bInputStreamClosed )
191 : {
192 : throw NotConnectedException(
193 : "Pipe::readSomeBytes NotConnectedException",
194 0 : *this );
195 : }
196 5 : if( m_pFIFO->getSize() )
197 : {
198 2 : sal_Int32 nSize = Min( nMaxBytesToRead , m_pFIFO->getSize() );
199 2 : aData.realloc( nSize );
200 2 : m_pFIFO->read( aData , nSize );
201 2 : return nSize;
202 : }
203 :
204 3 : if( m_bOutputStreamClosed )
205 : {
206 : // no bytes in buffer anymore
207 3 : return 0;
208 0 : }
209 : }
210 :
211 0 : osl_waitCondition( m_conditionBytesAvail , 0 );
212 0 : }
213 : }
214 :
215 :
216 0 : void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip)
217 : throw( NotConnectedException,
218 : BufferSizeExceededException,
219 : RuntimeException, std::exception )
220 : {
221 0 : MutexGuard guard( m_mutexAccess );
222 0 : if( m_bInputStreamClosed )
223 : {
224 : throw NotConnectedException(
225 : "Pipe::skipBytes NotConnectedException",
226 0 : *this );
227 : }
228 :
229 0 : if( nBytesToSkip < 0
230 0 : || (nBytesToSkip
231 0 : > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) )
232 : {
233 : throw BufferSizeExceededException(
234 : "Pipe::skipBytes BufferSizeExceededException",
235 0 : *this );
236 : }
237 0 : m_nBytesToSkip += nBytesToSkip;
238 :
239 0 : nBytesToSkip = Min( m_pFIFO->getSize() , m_nBytesToSkip );
240 0 : m_pFIFO->skip( nBytesToSkip );
241 0 : m_nBytesToSkip -= nBytesToSkip;
242 0 : }
243 :
244 :
245 2 : sal_Int32 OPipeImpl::available()
246 : throw( NotConnectedException,
247 : RuntimeException, std::exception )
248 : {
249 2 : MutexGuard guard( m_mutexAccess );
250 2 : if( m_bInputStreamClosed )
251 : {
252 : throw NotConnectedException(
253 : "Pipe::available NotConnectedException",
254 0 : *this );
255 : }
256 2 : return m_pFIFO->getSize();
257 : }
258 :
259 0 : void OPipeImpl::closeInput()
260 : throw( NotConnectedException,
261 : RuntimeException, std::exception)
262 : {
263 0 : MutexGuard guard( m_mutexAccess );
264 :
265 0 : m_bInputStreamClosed = true;
266 :
267 0 : delete m_pFIFO;
268 0 : m_pFIFO = 0;
269 :
270 : // readBytes may throw an exception
271 0 : osl_setCondition( m_conditionBytesAvail );
272 :
273 0 : setSuccessor( Reference< XConnectable > () );
274 0 : return;
275 : }
276 :
277 :
278 964 : void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData)
279 : throw( NotConnectedException,
280 : BufferSizeExceededException,
281 : RuntimeException, std::exception)
282 : {
283 964 : MutexGuard guard( m_mutexAccess );
284 :
285 964 : if( m_bOutputStreamClosed )
286 : {
287 : throw NotConnectedException(
288 : "Pipe::writeBytes NotConnectedException (outputstream)",
289 0 : *this );
290 : }
291 :
292 964 : if( m_bInputStreamClosed )
293 : {
294 : throw NotConnectedException(
295 : "Pipe::writeBytes NotConnectedException (inputstream)",
296 0 : *this );
297 : }
298 :
299 : // check skipping
300 964 : sal_Int32 nLen = aData.getLength();
301 964 : if( m_nBytesToSkip && m_nBytesToSkip >= nLen ) {
302 : // all must be skipped - forget whole call
303 0 : m_nBytesToSkip -= nLen;
304 964 : return;
305 : }
306 :
307 : // adjust buffersize if necessary
308 :
309 : try
310 : {
311 964 : if( m_nBytesToSkip )
312 : {
313 0 : Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip );
314 0 : memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip );
315 0 : m_pFIFO->write( seqCopy );
316 : }
317 : else
318 : {
319 964 : m_pFIFO->write( aData );
320 : }
321 964 : m_nBytesToSkip = 0;
322 : }
323 0 : catch ( I_FIFO_OutOfBoundsException & )
324 : {
325 : throw BufferSizeExceededException(
326 : "Pipe::writeBytes BufferSizeExceededException",
327 0 : *this );
328 : }
329 0 : catch ( I_FIFO_OutOfMemoryException & )
330 : {
331 : throw BufferSizeExceededException(
332 : "Pipe::writeBytes BufferSizeExceededException",
333 0 : *this );
334 : }
335 :
336 : // readBytes may check again if enough bytes are available
337 964 : osl_setCondition( m_conditionBytesAvail );
338 : }
339 :
340 :
341 2 : void OPipeImpl::flush()
342 : throw( NotConnectedException,
343 : BufferSizeExceededException,
344 : RuntimeException, std::exception)
345 : {
346 : // nothing to do for a pipe
347 2 : return;
348 : }
349 :
350 4 : void OPipeImpl::closeOutput()
351 : throw( NotConnectedException,
352 : BufferSizeExceededException,
353 : RuntimeException, std::exception)
354 : {
355 4 : MutexGuard guard( m_mutexAccess );
356 :
357 4 : m_bOutputStreamClosed = true;
358 4 : osl_setCondition( m_conditionBytesAvail );
359 4 : setPredecessor( Reference < XConnectable > () );
360 4 : return;
361 : }
362 :
363 :
364 40 : void OPipeImpl::setSuccessor( const Reference < XConnectable > &r )
365 : throw( RuntimeException, std::exception )
366 : {
367 : /// if the references match, nothing needs to be done
368 40 : if( m_succ != r ) {
369 : /// store the reference for later use
370 40 : m_succ = r;
371 :
372 40 : if( m_succ.is() )
373 : {
374 40 : m_succ->setPredecessor(
375 40 : Reference< XConnectable > ( (static_cast< XConnectable * >(this)) ) );
376 : }
377 : }
378 40 : }
379 :
380 0 : Reference < XConnectable > OPipeImpl::getSuccessor() throw( RuntimeException, std::exception )
381 : {
382 0 : return m_succ;
383 : }
384 :
385 :
386 : // XDataSource
387 44 : void OPipeImpl::setPredecessor( const Reference < XConnectable > &r )
388 : throw( RuntimeException, std::exception )
389 : {
390 44 : if( r != m_pred ) {
391 40 : m_pred = r;
392 40 : if( m_pred.is() ) {
393 40 : m_pred->setSuccessor(
394 40 : Reference < XConnectable > ( (static_cast< XConnectable * >(this)) ) );
395 : }
396 : }
397 44 : }
398 :
399 0 : Reference < XConnectable > OPipeImpl::getPredecessor() throw( RuntimeException, std::exception )
400 : {
401 0 : return m_pred;
402 : }
403 :
404 :
405 :
406 :
407 : // XServiceInfo
408 1 : OUString OPipeImpl::getImplementationName() throw(std::exception )
409 : {
410 1 : return OPipeImpl_getImplementationName();
411 : }
412 :
413 : // XServiceInfo
414 0 : sal_Bool OPipeImpl::supportsService(const OUString& ServiceName) throw(std::exception )
415 : {
416 0 : return cppu::supportsService(this, ServiceName);
417 : }
418 :
419 : // XServiceInfo
420 1 : Sequence< OUString > OPipeImpl::getSupportedServiceNames() throw(std::exception )
421 : {
422 1 : return OPipeImpl_getSupportedServiceNames();
423 : }
424 :
425 : /* implementation functions
426 : *
427 : *
428 : */
429 :
430 :
431 45 : Reference < XInterface > SAL_CALL OPipeImpl_CreateInstance(
432 : SAL_UNUSED_PARAMETER const Reference < XComponentContext > & )
433 : throw(Exception)
434 : {
435 45 : OPipeImpl *p = new OPipeImpl;
436 :
437 45 : return Reference < XInterface > ( (static_cast< OWeakObject * >(p)) );
438 : }
439 :
440 :
441 174 : OUString OPipeImpl_getImplementationName()
442 : {
443 174 : return OUString( IMPLEMENTATION_NAME );
444 : }
445 :
446 11 : Sequence<OUString> OPipeImpl_getSupportedServiceNames()
447 : {
448 11 : Sequence<OUString> aRet(1);
449 11 : aRet.getArray()[0] = "com.sun.star.io.Pipe";
450 11 : return aRet;
451 : }
452 : }
453 :
454 :
455 : /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|