Skip to content

Turn Suspension into an interface #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/EventLoop/Internal/AbstractDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ public function unreference(string $callbackId): string

public function createSuspension(\Fiber $scheduler): Suspension
{
return new Suspension($this, $scheduler, $this->interruptCallback);
return new DriverSuspension($this, $scheduler, $this->interruptCallback);
}

/**
Expand Down
106 changes: 106 additions & 0 deletions src/EventLoop/Internal/DriverSuspension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

namespace Revolt\EventLoop\Internal;

use Revolt\EventLoop\Driver;
use Revolt\EventLoop\Suspension;

/**
* @internal
*/
final class DriverSuspension implements Suspension
{
private ?\Fiber $fiber;
private \Fiber $scheduler;
private Driver $driver;
private bool $pending = false;
private ?\FiberError $error = null;
/** @var callable */
private $interrupt;

/**
* @param Driver $driver
* @param \Fiber $scheduler
* @param callable $interrupt
*
* @internal
*/
public function __construct(Driver $driver, \Fiber $scheduler, callable $interrupt)
{
$this->driver = $driver;
$this->scheduler = $scheduler;
$this->interrupt = $interrupt;
$this->fiber = \Fiber::getCurrent();

// User callbacks are always executed outside the event loop fiber, so this should always be false.
\assert($this->fiber !== $this->scheduler);
}

public function throw(\Throwable $throwable): void
{
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling throw()');
}

$this->pending = false;

if ($this->fiber) {
$this->driver->queue([$this->fiber, 'throw'], $throwable);
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => throw $throwable);
}
}

public function resume(mixed $value = null): void
{
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling resume()');
}

$this->pending = false;

if ($this->fiber) {
$this->driver->queue([$this->fiber, 'resume'], $value);
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => $value);
}
}

public function suspend(): mixed
{
if ($this->pending) {
throw new \Error('Must call resume() or throw() before calling suspend() again');
}

if ($this->fiber !== \Fiber::getCurrent()) {
throw new \Error('Must not call suspend() from another fiber');
}

$this->pending = true;

// Awaiting from within a fiber.
if ($this->fiber) {
try {
return \Fiber::suspend();
} catch (\FiberError $exception) {
$this->pending = false;
$this->error = $exception;

throw $exception;
}
}

// Awaiting from {main}.
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();

/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
if ($this->pending) {
// Should only be true if the event loop exited without resolving the promise.
throw new \Error('Scheduler suspended or exited unexpectedly');
}

return $lambda();
}
}
100 changes: 8 additions & 92 deletions src/EventLoop/Suspension.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,103 +10,19 @@
* ```php
* $suspension = EventLoop::createSuspension();
*
* $promise->then(fn ($value) => $suspension->resume($value), fn ($throwable) => $suspension->throw($throwable));
* $promise->then(
* fn (mixed $value) => $suspension->resume($value),
* fn (Throwable $error) => $suspension->throw($error)
* );
*
* $suspension->suspend();
* ```
*/
final class Suspension
interface Suspension
{
private ?\Fiber $fiber;
private \Fiber $scheduler;
private Driver $driver;
private bool $pending = false;
private ?\FiberError $error = null;
/** @var callable */
private $interrupt;
public function resume(mixed $value = null): void;

/**
* @param Driver $driver
* @param \Fiber $scheduler
* @param callable $interrupt
*
* @internal
*/
public function __construct(Driver $driver, \Fiber $scheduler, callable $interrupt)
{
$this->driver = $driver;
$this->scheduler = $scheduler;
$this->interrupt = $interrupt;
$this->fiber = \Fiber::getCurrent();
public function suspend(): mixed;

// User callbacks are always executed outside the event loop fiber, so this should always be false.
\assert($this->fiber !== $this->scheduler);
}

public function throw(\Throwable $throwable): void
{
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling throw()');
}

$this->pending = false;

if ($this->fiber) {
$this->driver->queue([$this->fiber, 'throw'], $throwable);
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => throw $throwable);
}
}

public function resume(mixed $value = null): void
{
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling resume()');
}

$this->pending = false;

if ($this->fiber) {
$this->driver->queue([$this->fiber, 'resume'], $value);
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => $value);
}
}

public function suspend(): mixed
{
if ($this->pending) {
throw new \Error('Must call resume() or throw() before calling suspend() again');
}

if ($this->fiber !== \Fiber::getCurrent()) {
throw new \Error('Must not call suspend() from another fiber');
}

$this->pending = true;

// Awaiting from within a fiber.
if ($this->fiber) {
try {
return \Fiber::suspend();
} catch (\FiberError $exception) {
$this->pending = false;
$this->error = $exception;
throw $exception;
}
}

// Awaiting from {main}.
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();

/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
if ($this->pending) {
// Should only be true if the event loop exited without resolving the promise.
throw new \Error('Scheduler suspended or exited unexpectedly');
}

return $lambda();
}
public function throw(\Throwable $throwable): void;
}