Skip to content

Commit 5f7bd5b

Browse files
authored
Merge pull request #85 from clue-labs/duplex
Add DuplexResourceStream and deprecate Stream
2 parents 6b90cb2 + 065c432 commit 5f7bd5b

File tree

6 files changed

+393
-267
lines changed

6 files changed

+393
-267
lines changed

README.md

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ descriptor based implementation with an in-memory write buffer.
3636
* [DuplexStreamInterface](#duplexstreaminterface)
3737
* [ReadableResourceStream](#readableresourcestream)
3838
* [WritableResourceStream](#writableresourcestream)
39+
* [DuplexResourceStream](#duplexresourcestream)
3940
* [Usage](#usage)
4041
* [Install](#install)
4142
* [Tests](#tests)
@@ -724,6 +725,9 @@ Otherwise, it will throw an `InvalidArgumentException`:
724725
$stream = new ReadableResourceStream(false, $loop);
725726
```
726727

728+
See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write
729+
stream resources otherwise.
730+
727731
Internally, this class tries to enable non-blocking mode on the stream resource
728732
which may not be supported for all stream resources.
729733
Most notably, this is not supported by pipes on Windows (STDIN etc.).
@@ -787,6 +791,9 @@ Otherwise, it will throw an `InvalidArgumentException`:
787791
$stream = new WritableResourceStream(false, $loop);
788792
```
789793

794+
See also the [`DuplexResourceStream`](#readableresourcestream) for read-and-write
795+
stream resources otherwise.
796+
790797
Internally, this class tries to enable non-blocking mode on the stream resource
791798
which may not be supported for all stream resources.
792799
Most notably, this is not supported by pipes on Windows (STDOUT, STDERR etc.).
@@ -823,6 +830,94 @@ $stream->softLimit = 8192;
823830

824831
See also [`write()`](#write) for more details.
825832

833+
### DuplexResourceStream
834+
835+
The `DuplexResourceStream` is a concrete implementation of the
836+
[`DuplexStreamInterface`](#duplexstreaminterface) for PHP's stream resources.
837+
838+
This can be used to represent a read-and-write resource like a file stream opened
839+
in read and write mode mode or a stream such as a TCP/IP connection:
840+
841+
```php
842+
$conn = stream_socket_client('tcp://google.com:80');
843+
$stream = new DuplexResourceStream($conn, $loop);
844+
$stream->write('hello!');
845+
$stream->end();
846+
```
847+
848+
See also [`DuplexStreamInterface`](#duplexstreaminterface) for more details.
849+
850+
The first parameter given to the constructor MUST be a valid stream resource
851+
that is opened for reading *and* writing.
852+
Otherwise, it will throw an `InvalidArgumentException`:
853+
854+
```php
855+
// throws InvalidArgumentException
856+
$stream = new DuplexResourceStream(false, $loop);
857+
```
858+
859+
See also the [`ReadableResourceStream`](#readableresourcestream) for read-only
860+
and the [`WritableResourceStream`](#writableresourcestream) for write-only
861+
stream resources otherwise.
862+
863+
Internally, this class tries to enable non-blocking mode on the stream resource
864+
which may not be supported for all stream resources.
865+
Most notably, this is not supported by pipes on Windows (STDOUT, STDERR etc.).
866+
If this fails, it will throw a `RuntimeException`:
867+
868+
```php
869+
// throws RuntimeException on Windows
870+
$stream = new DuplexResourceStream(STDOUT, $loop);
871+
```
872+
873+
Once the constructor is called with a valid stream resource, this class will
874+
take care of the underlying stream resource.
875+
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
876+
stream resource manually.
877+
Should you need to access the underlying stream resource, you can use the public
878+
`$stream` property like this:
879+
880+
```php
881+
var_dump(stream_get_meta_data($stream->stream));
882+
```
883+
884+
The `$bufferSize` property controls the maximum buffer size in bytes to read
885+
at once from the stream.
886+
This value SHOULD NOT be changed unless you know what you're doing.
887+
This can be a positive number which means that up to X bytes will be read
888+
at once from the underlying stream resource. Note that the actual number
889+
of bytes read may be lower if the stream resource has less than X bytes
890+
currently available.
891+
This can be `null` which means "read everything available" from the
892+
underlying stream resource.
893+
This should read until the stream resource is not readable anymore
894+
(i.e. underlying buffer drained), note that this does not neccessarily
895+
mean it reached EOF.
896+
897+
```php
898+
$stream->bufferSize = 8192;
899+
```
900+
901+
Any `write()` calls to this class will not be performaned instantly, but will
902+
be performaned asynchronously, once the EventLoop reports the stream resource is
903+
ready to accept data.
904+
For this, it uses an in-memory buffer string to collect all outstanding writes.
905+
This buffer has a soft-limit applied which defines how much data it is willing
906+
to accept before the caller SHOULD stop sending further data.
907+
It currently defaults to 64 KiB and can be controlled through the public
908+
`$softLimit` property like this:
909+
910+
```php
911+
$buffer = $stream->getBuffer();
912+
$buffer->softLimit = 8192;
913+
```
914+
915+
See also [`write()`](#write) for more details.
916+
917+
> BC note: This class was previously called `Stream`.
918+
The `Stream` class still exists for BC reasons and will be removed in future
919+
versions of this package.
920+
826921
## Usage
827922
```php
828923
$loop = React\EventLoop\Factory::create();

examples/benchmark-throughput.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414
$loop = new React\EventLoop\StreamSelectLoop();
1515

1616
// setup information stream
17-
$info = new React\Stream\Stream(STDERR, $loop);
18-
$info->pause();
17+
$info = new React\Stream\WritableResourceStream(STDERR, $loop);
1918
if (extension_loaded('xdebug')) {
2019
$info->write('NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL);
2120
}

src/DuplexResourceStream.php

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
<?php
2+
3+
namespace React\Stream;
4+
5+
use Evenement\EventEmitter;
6+
use React\EventLoop\LoopInterface;
7+
use InvalidArgumentException;
8+
9+
class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
10+
{
11+
/**
12+
* Controls the maximum buffer size in bytes to read at once from the stream.
13+
*
14+
* This can be a positive number which means that up to X bytes will be read
15+
* at once from the underlying stream resource. Note that the actual number
16+
* of bytes read may be lower if the stream resource has less than X bytes
17+
* currently available.
18+
*
19+
* This can be `null` which means read everything available from the
20+
* underlying stream resource.
21+
* This should read until the stream resource is not readable anymore
22+
* (i.e. underlying buffer drained), note that this does not neccessarily
23+
* mean it reached EOF.
24+
*
25+
* @var int|null
26+
*/
27+
public $bufferSize = 65536;
28+
29+
public $stream;
30+
protected $readable = true;
31+
protected $writable = true;
32+
protected $closing = false;
33+
protected $loop;
34+
protected $buffer;
35+
36+
public function __construct($stream, LoopInterface $loop, WritableStreamInterface $buffer = null)
37+
{
38+
if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
39+
throw new InvalidArgumentException('First parameter must be a valid stream resource');
40+
}
41+
42+
// ensure resource is opened for reading and wrting (fopen mode must contain "+")
43+
$meta = stream_get_meta_data($stream);
44+
if (isset($meta['mode']) && $meta['mode'] !== '' && strpos($meta['mode'], '+') === false) {
45+
throw new InvalidArgumentException('Given stream resource is not opened in read and write mode');
46+
}
47+
48+
// this class relies on non-blocking I/O in order to not interrupt the event loop
49+
// e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
50+
if (stream_set_blocking($stream, 0) !== true) {
51+
throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
52+
}
53+
54+
// Use unbuffered read operations on the underlying stream resource.
55+
// Reading chunks from the stream may otherwise leave unread bytes in
56+
// PHP's stream buffers which some event loop implementations do not
57+
// trigger events on (edge triggered).
58+
// This does not affect the default event loop implementation (level
59+
// triggered), so we can ignore platforms not supporting this (HHVM).
60+
// Pipe streams (such as STDIN) do not seem to require this and legacy
61+
// PHP < 5.4 causes SEGFAULTs on unbuffered pipe streams, so skip this.
62+
if (function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
63+
stream_set_read_buffer($stream, 0);
64+
}
65+
66+
if ($buffer === null) {
67+
$buffer = new WritableResourceStream($stream, $loop);
68+
}
69+
70+
$this->stream = $stream;
71+
$this->loop = $loop;
72+
$this->buffer = $buffer;
73+
74+
$that = $this;
75+
76+
$this->buffer->on('error', function ($error) use ($that) {
77+
$that->emit('error', array($error));
78+
});
79+
80+
$this->buffer->on('close', array($this, 'close'));
81+
82+
$this->buffer->on('drain', function () use ($that) {
83+
$that->emit('drain');
84+
});
85+
86+
$this->resume();
87+
}
88+
89+
public function isReadable()
90+
{
91+
return $this->readable;
92+
}
93+
94+
public function isWritable()
95+
{
96+
return $this->writable;
97+
}
98+
99+
public function pause()
100+
{
101+
$this->loop->removeReadStream($this->stream);
102+
}
103+
104+
public function resume()
105+
{
106+
if ($this->readable) {
107+
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
108+
}
109+
}
110+
111+
public function write($data)
112+
{
113+
if (!$this->writable) {
114+
return false;
115+
}
116+
117+
return $this->buffer->write($data);
118+
}
119+
120+
public function close()
121+
{
122+
if (!$this->writable && !$this->closing) {
123+
return;
124+
}
125+
126+
$this->closing = false;
127+
128+
$this->readable = false;
129+
$this->writable = false;
130+
131+
$this->emit('close');
132+
$this->loop->removeStream($this->stream);
133+
$this->buffer->close();
134+
$this->removeAllListeners();
135+
136+
$this->handleClose();
137+
}
138+
139+
public function end($data = null)
140+
{
141+
if (!$this->writable) {
142+
return;
143+
}
144+
145+
$this->closing = true;
146+
147+
$this->readable = false;
148+
$this->writable = false;
149+
150+
$this->buffer->end($data);
151+
}
152+
153+
public function pipe(WritableStreamInterface $dest, array $options = array())
154+
{
155+
return Util::pipe($this, $dest, $options);
156+
}
157+
158+
public function handleData($stream)
159+
{
160+
$error = null;
161+
set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
162+
$error = new \ErrorException(
163+
$errstr,
164+
0,
165+
$errno,
166+
$errfile,
167+
$errline
168+
);
169+
});
170+
171+
$data = stream_get_contents($stream, $this->bufferSize === null ? -1 : $this->bufferSize);
172+
173+
restore_error_handler();
174+
175+
if ($error !== null) {
176+
$this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
177+
$this->close();
178+
return;
179+
}
180+
181+
if ($data !== '') {
182+
$this->emit('data', array($data));
183+
} else{
184+
// no data read => we reached the end and close the stream
185+
$this->emit('end');
186+
$this->close();
187+
}
188+
}
189+
190+
public function handleClose()
191+
{
192+
if (is_resource($this->stream)) {
193+
fclose($this->stream);
194+
}
195+
}
196+
197+
/**
198+
* @return WritableStreamInterface
199+
*/
200+
public function getBuffer()
201+
{
202+
return $this->buffer;
203+
}
204+
205+
/**
206+
* Returns whether this is a pipe resource in a legacy environment
207+
*
208+
* @param resource $resource
209+
* @return bool
210+
*
211+
* @codeCoverageIgnore
212+
*/
213+
private function isLegacyPipe($resource)
214+
{
215+
if (PHP_VERSION_ID < 50400) {
216+
$meta = stream_get_meta_data($resource);
217+
218+
if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
219+
return true;
220+
}
221+
}
222+
return false;
223+
}
224+
}

0 commit comments

Comments
 (0)