Skip to content

Commit c87b9fa

Browse files
committed
Use separate event listeners for readable and writable side
1 parent 5e95b14 commit c87b9fa

File tree

1 file changed

+35
-87
lines changed

1 file changed

+35
-87
lines changed

src/ExtEventLoop.php

Lines changed: 35 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ final class ExtEventLoop implements LoopInterface
2727
private $timerCallback;
2828
private $timerEvents;
2929
private $streamCallback;
30-
private $streamEvents = [];
31-
private $streamFlags = [];
32-
private $streamRefs = [];
30+
private $readEvents = [];
31+
private $writeEvents = [];
3332
private $readListeners = [];
3433
private $writeListeners = [];
34+
private $readRefs = [];
35+
private $writeRefs = [];
3536
private $running;
3637
private $signals;
3738
private $signalEvents = [];
@@ -70,56 +71,61 @@ function ($signal) {
7071
public function addReadStream($stream, callable $listener)
7172
{
7273
$key = (int) $stream;
74+
if (isset($this->readListeners[$key])) {
75+
return;
76+
}
77+
78+
$this->readEvents[$key] = new Event($this->eventBase, $stream, Event::PERSIST | Event::READ, $this->streamCallback);
79+
$this->readListeners[$key] = $listener;
7380

74-
if (!isset($this->readListeners[$key])) {
75-
$this->readListeners[$key] = $listener;
76-
$this->subscribeStreamEvent($stream, Event::READ);
81+
// ext-event does not increase refcount on stream resources for PHP 7+
82+
// manually keep track of stream resource to prevent premature garbage collection
83+
if (PHP_VERSION_ID >= 70000) {
84+
$this->readRefs[$key] = $stream;
7785
}
7886
}
7987

8088
public function addWriteStream($stream, callable $listener)
8189
{
8290
$key = (int) $stream;
83-
84-
if (!isset($this->writeListeners[$key])) {
85-
$this->writeListeners[$key] = $listener;
86-
$this->subscribeStreamEvent($stream, Event::WRITE);
91+
if (isset($this->writeListeners[$key])) {
92+
return;
8793
}
88-
}
8994

90-
public function removeReadStream($stream)
91-
{
92-
$key = (int) $stream;
95+
$this->writeEvents[$key] = new Event($this->eventBase, $stream, Event::PERSIST | Event::WRITE, $this->streamCallback);
96+
$this->writeListeners[$key] = $listener;
9397

94-
if (isset($this->readListeners[$key])) {
95-
unset($this->readListeners[$key]);
96-
$this->unsubscribeStreamEvent($stream, Event::READ);
98+
// ext-event does not increase refcount on stream resources for PHP 7+
99+
// manually keep track of stream resource to prevent premature garbage collection
100+
if (PHP_VERSION_ID >= 70000) {
101+
$this->writeRefs[$key] = $stream;
97102
}
98103
}
99104

100-
public function removeWriteStream($stream)
105+
public function removeReadStream($stream)
101106
{
102107
$key = (int) $stream;
103108

104-
if (isset($this->writeListeners[$key])) {
105-
unset($this->writeListeners[$key]);
106-
$this->unsubscribeStreamEvent($stream, Event::WRITE);
109+
if (isset($this->readEvents[$key])) {
110+
$this->readEvents[$key]->del();
111+
unset(
112+
$this->readEvents[$key],
113+
$this->readListeners[$key],
114+
$this->readRefs[$key]
115+
);
107116
}
108117
}
109118

110-
private function removeStream($stream)
119+
public function removeWriteStream($stream)
111120
{
112121
$key = (int) $stream;
113122

114-
if (isset($this->streamEvents[$key])) {
115-
$this->streamEvents[$key]->free();
116-
123+
if (isset($this->writeEvents[$key])) {
124+
$this->writeEvents[$key]->del();
117125
unset(
118-
$this->streamFlags[$key],
119-
$this->streamEvents[$key],
120-
$this->readListeners[$key],
126+
$this->writeEvents[$key],
121127
$this->writeListeners[$key],
122-
$this->streamRefs[$key]
128+
$this->writeRefs[$key]
123129
);
124130
}
125131
}
@@ -207,64 +213,6 @@ private function scheduleTimer(TimerInterface $timer)
207213
$event->add($timer->getInterval());
208214
}
209215

210-
/**
211-
* Create a new ext-event Event object, or update the existing one.
212-
*
213-
* @param resource $stream
214-
* @param integer $flag Event::READ or Event::WRITE
215-
*/
216-
private function subscribeStreamEvent($stream, $flag)
217-
{
218-
$key = (int) $stream;
219-
220-
if (isset($this->streamEvents[$key])) {
221-
$event = $this->streamEvents[$key];
222-
$flags = ($this->streamFlags[$key] |= $flag);
223-
224-
$event->del();
225-
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
226-
} else {
227-
$event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback);
228-
229-
$this->streamEvents[$key] = $event;
230-
$this->streamFlags[$key] = $flag;
231-
232-
// ext-event does not increase refcount on stream resources for PHP 7+
233-
// manually keep track of stream resource to prevent premature garbage collection
234-
if (PHP_VERSION_ID >= 70000) {
235-
$this->streamRefs[$key] = $stream;
236-
}
237-
}
238-
239-
$event->add();
240-
}
241-
242-
/**
243-
* Update the ext-event Event object for this stream to stop listening to
244-
* the given event type, or remove it entirely if it's no longer needed.
245-
*
246-
* @param resource $stream
247-
* @param integer $flag Event::READ or Event::WRITE
248-
*/
249-
private function unsubscribeStreamEvent($stream, $flag)
250-
{
251-
$key = (int) $stream;
252-
253-
$flags = $this->streamFlags[$key] &= ~$flag;
254-
255-
if (0 === $flags) {
256-
$this->removeStream($stream);
257-
258-
return;
259-
}
260-
261-
$event = $this->streamEvents[$key];
262-
263-
$event->del();
264-
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
265-
$event->add();
266-
}
267-
268216
/**
269217
* Create a callback used as the target of timer events.
270218
*

0 commit comments

Comments
 (0)