Skip to content

Commit ffad099

Browse files
committed
Add suspension listeners
Dr
1 parent fc50add commit ffad099

File tree

3 files changed

+282
-12
lines changed

3 files changed

+282
-12
lines changed

src/EventLoop/Listener.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
namespace Revolt\EventLoop;
4+
5+
interface Listener
6+
{
7+
/**
8+
* Called when a Suspension is suspended.
9+
*
10+
* @param int $id The object ID of the Suspension.
11+
*/
12+
public function onSuspend(int $id): void;
13+
14+
/**
15+
* Called when a Suspension is resumed.
16+
*
17+
* @param int $id The object ID of the Suspension.
18+
*/
19+
public function onResume(int $id): void;
20+
}

src/EventLoop/Suspension.php

Lines changed: 79 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* **Example**
1111
*
1212
* ```php
13-
* $suspension = Scheduler::createSuspension();
13+
* $suspension = EventLoop::createSuspension();
1414
*
1515
* $promise->then(fn ($value) => $suspension->resume($value), fn ($throwable) => $suspension->throw($throwable));
1616
*
@@ -19,13 +19,21 @@
1919
*/
2020
final class Suspension
2121
{
22+
/** @var string Next listener ID. */
23+
private static string $nextId = 'a';
24+
25+
/** @var Listener[] */
26+
private static array $listeners = [];
27+
28+
private static bool $invokingListeners = false;
29+
2230
private ?\Fiber $fiber;
2331
private \Fiber $scheduler;
2432
private Driver $driver;
2533
private bool $pending = false;
2634

2735
/**
28-
* Suspension constructor.
36+
* Use {@see EventLoop::createSuspension()} to create Suspensions.
2937
*
3038
* @param Driver $driver
3139
* @param \Fiber $scheduler
@@ -54,6 +62,10 @@ public function throw(\Throwable $throwable): void
5462
throw new \Error('Must call throw() before calling resume()');
5563
}
5664

65+
if (self::$invokingListeners) {
66+
throw new \Error('Cannot call throw() within a suspension listener');
67+
}
68+
5769
$this->pending = false;
5870

5971
if ($this->fiber) {
@@ -70,6 +82,10 @@ public function resume(mixed $value): void
7082
throw new \Error('Must call suspend() before calling resume()');
7183
}
7284

85+
if (self::$invokingListeners) {
86+
throw new \Error('Cannot call throw() within a suspension listener');
87+
}
88+
7389
$this->pending = false;
7490

7591
if ($this->fiber) {
@@ -90,22 +106,73 @@ public function suspend(): mixed
90106
throw new \Error('Must not call suspend() from another fiber');
91107
}
92108

109+
if (self::$invokingListeners) {
110+
throw new \Error('Cannot call suspend() within a suspension listener');
111+
}
112+
93113
$this->pending = true;
94114

95-
// Awaiting from within a fiber.
96-
if ($this->fiber) {
97-
return \Fiber::suspend();
115+
if (!empty(self::$listeners)) {
116+
$this->invokeListeners('onSuspend');
98117
}
99118

100-
// Awaiting from {main}.
101-
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();
119+
try {
120+
// Awaiting from within a fiber.
121+
if ($this->fiber) {
122+
return \Fiber::suspend();
123+
}
124+
125+
// Awaiting from {main}.
126+
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();
127+
128+
/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
129+
if ($this->pending) {
130+
// Should only be true if the event loop exited without resolving the promise.
131+
throw new \Error('Event loop suspended or exited unexpectedly');
132+
}
133+
134+
return $lambda();
135+
} finally {
136+
if (!empty(self::$listeners)) {
137+
$this->invokeListeners('onResume');
138+
}
139+
}
140+
}
102141

103-
/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
104-
if ($this->pending) {
105-
// Should only be true if the event loop exited without resolving the promise.
106-
throw new \Error('Scheduler suspended or exited unexpectedly');
142+
private function invokeListeners(string $method): void
143+
{
144+
$id = \spl_object_id($this);
145+
self::$invokingListeners = true;
146+
foreach (self::$listeners as $listener) {
147+
try {
148+
$listener->{$method}($id);
149+
} catch (\Throwable $exception) {
150+
$this->driver->queue(static fn () => throw $exception);
151+
}
107152
}
153+
self::$invokingListeners = false;
154+
}
155+
156+
/**
157+
* Add a listener that is invoked when any Suspension is suspended, resumed, or destroyed.
158+
*
159+
* @param Listener $listener
160+
* @return string ID that can be used to remove the listener using {@see unlisten()}.
161+
*/
162+
public static function listen(Listener $listener): string
163+
{
164+
$id = self::$nextId++;
165+
self::$listeners[$id] = $listener;
166+
return $id;
167+
}
108168

109-
return $lambda();
169+
/**
170+
* Remove the suspension listener.
171+
*
172+
* @param string $id
173+
*/
174+
public static function unlisten(string $id): void
175+
{
176+
unset(self::$listeners[$id]);
110177
}
111178
}

test/SuspensionTest.php

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
<?php
2+
3+
namespace Revolt\EventLoop;
4+
5+
use PHPUnit\Framework\TestCase;
6+
use Revolt\EventLoop;
7+
8+
class SuspensionTest extends TestCase
9+
{
10+
public function testListen(): void
11+
{
12+
$listener = new class () implements Listener {
13+
public int $suspended = 0;
14+
public int $resumed = 0;
15+
16+
public function onSuspend(int $id): void
17+
{
18+
++$this->suspended;
19+
}
20+
21+
public function onResume(int $id): void
22+
{
23+
++$this->resumed;
24+
}
25+
};
26+
27+
$id = Suspension::listen($listener);
28+
29+
$suspension = EventLoop::createSuspension();
30+
EventLoop::defer(fn () => $suspension->resume(null));
31+
32+
$suspension->suspend();
33+
34+
self::assertSame(1, $listener->suspended);
35+
self::assertSame(1, $listener->resumed);
36+
37+
Suspension::listen($listener);
38+
39+
$suspension = EventLoop::createSuspension();
40+
EventLoop::defer(fn () => $suspension->throw(new \Exception()));
41+
42+
try {
43+
$suspension->suspend();
44+
self::fail('Exception was expected to be thrown from suspend');
45+
} catch (\Exception $e) {
46+
// Expected, ignore.
47+
}
48+
49+
self::assertSame(3, $listener->suspended);
50+
self::assertSame(3, $listener->resumed);
51+
52+
Suspension::unlisten($id);
53+
54+
$suspension = EventLoop::createSuspension();
55+
EventLoop::defer(fn () => $suspension->resume(null));
56+
57+
$suspension->suspend();
58+
59+
self::assertSame(4, $listener->suspended);
60+
self::assertSame(4, $listener->resumed);
61+
}
62+
63+
public function provideListenerMethods(): iterable
64+
{
65+
$reflectionClass = new \ReflectionClass(Listener::class);
66+
$methods = $reflectionClass->getMethods();
67+
return \array_map(static fn (\ReflectionMethod $reflectionMethod) => [$reflectionMethod->getName()], $methods);
68+
}
69+
70+
/**
71+
* @dataProvider provideListenerMethods
72+
*/
73+
public function testSuspendDuringListenerInvocation(string $functionName): void
74+
{
75+
$suspension = EventLoop::createSuspension();
76+
77+
$listener = new class ($functionName, $suspension) implements Listener {
78+
public function __construct(
79+
private string $functionName,
80+
private Suspension $suspension,
81+
) {
82+
}
83+
84+
public function onSuspend(int $id): void
85+
{
86+
if ($this->functionName === __FUNCTION__) {
87+
$this->suspension->suspend();
88+
}
89+
}
90+
91+
public function onResume(int $id): void
92+
{
93+
if ($this->functionName === __FUNCTION__) {
94+
$this->suspension->suspend();
95+
}
96+
}
97+
};
98+
99+
Suspension::listen($listener);
100+
101+
$suspension = EventLoop::createSuspension();
102+
EventLoop::defer(fn () => $suspension->resume(null));
103+
104+
self::expectException(\Error::class);
105+
self::expectExceptionMessage('within a suspension listener');
106+
107+
$suspension->suspend();
108+
}
109+
110+
/**
111+
* @dataProvider provideListenerMethods
112+
*/
113+
public function testResumeDuringListenerInvocation(string $functionName): void
114+
{
115+
$suspension = EventLoop::createSuspension();
116+
117+
$listener = new class ($functionName, $suspension) implements Listener {
118+
public function __construct(
119+
private string $functionName,
120+
private Suspension $suspension,
121+
) {
122+
}
123+
124+
public function onSuspend(int $id): void
125+
{
126+
if ($this->functionName === __FUNCTION__) {
127+
$this->suspension->resume(null);
128+
}
129+
}
130+
131+
public function onResume(int $id): void
132+
{
133+
if ($this->functionName === __FUNCTION__) {
134+
$this->suspension->resume(null);
135+
}
136+
}
137+
};
138+
139+
Suspension::listen($listener);
140+
141+
self::expectException(\Error::class);
142+
self::expectExceptionMessage('within a suspension listener');
143+
144+
$suspension->suspend();
145+
}
146+
147+
/**
148+
* @dataProvider provideListenerMethods
149+
*/
150+
public function testThrowDuringListenerInvocation(string $functionName): void
151+
{
152+
$suspension = EventLoop::createSuspension();
153+
154+
$listener = new class ($functionName, $suspension) implements Listener {
155+
public function __construct(
156+
private string $functionName,
157+
private Suspension $suspension,
158+
) {
159+
}
160+
161+
public function onSuspend(int $id): void
162+
{
163+
if ($this->functionName === __FUNCTION__) {
164+
$this->suspension->throw(new \Exception());
165+
}
166+
}
167+
168+
public function onResume(int $id): void
169+
{
170+
if ($this->functionName === __FUNCTION__) {
171+
$this->suspension->throw(new \Exception());
172+
}
173+
}
174+
};
175+
176+
Suspension::listen($listener);
177+
178+
self::expectException(\Error::class);
179+
self::expectExceptionMessage('within a suspension listener');
180+
181+
$suspension->suspend();
182+
}
183+
}

0 commit comments

Comments
 (0)