diff --git a/composer.json b/composer.json index 9e555d2..4f66edf 100644 --- a/composer.json +++ b/composer.json @@ -12,6 +12,7 @@ "require": { "php": "^7.4", "ext-parallel": "*", + "react-parallel/event-loop": "dev-master", "react/event-loop": "^1.1", "react/promise": "^2.7", "reactivex/rxphp": "^2.0", diff --git a/composer.lock b/composer.lock index 46ae594..52b900c 100644 --- a/composer.lock +++ b/composer.lock @@ -4,8 +4,64 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "c63dede3998d9aa49c0e1af8ce0c7b86", + "content-hash": "bc389c92bcb209302bb6600172898188", "packages": [ + { + "name": "react-parallel/event-loop", + "version": "dev-master", + "source": { + "type": "git", + "url": "https://github.com/reactphp-parallel/event-loop.git", + "reference": "afb9cd93dfe1af45fa719b32c523c4fd64052566" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp-parallel/event-loop/zipball/afb9cd93dfe1af45fa719b32c523c4fd64052566", + "reference": "afb9cd93dfe1af45fa719b32c523c4fd64052566", + "shasum": "" + }, + "require": { + "ext-parallel": "*", + "php": "^7.4", + "react/event-loop": "^1.1", + "react/promise": "^2.7", + "reactivex/rxphp": "^2.0", + "wyrihaximus/constants": "^1.4.3" + }, + "require-dev": { + "wyrihaximus/async-test-utilities": "^2.0" + }, + "type": "library", + "extra": { + "unused": [ + "ext-parallel" + ] + }, + "autoload": { + "psr-4": { + "ReactParallel\\EventLoop\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Cees-Jan Kiewiet", + "email": "ceesjank@gmail.com", + "homepage": "http://wyrihaximus.net/" + } + ], + "description": "🌀🌀 Event Loop bridge to ext-parallel Events", + "funding": [ + { + "url": "https://github.com/WyriHaximus", + "type": "github" + } + ], + "time": "2020-06-20T07:40:24+00:00" + }, { "name": "react/event-loop", "version": "v1.1.1", @@ -6491,7 +6547,9 @@ ], "aliases": [], "minimum-stability": "dev", - "stability-flags": [], + "stability-flags": { + "react-parallel/event-loop": 20 + }, "prefer-stable": true, "prefer-lowest": false, "platform": { diff --git a/phpstan.neon b/phpstan.neon index 88086c6..027cefc 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,7 +1,6 @@ parameters: checkMissingIterableValueType: false ignoreErrors: - - '#In method \"ReactParallel\\Streams\\RecvObservable::recv\", caught \"Throwable\" must be rethrown. Either catch a more specific exception or add a \"throw\" clause in the "catch" block to propagate the exception. More info: http:\/\/bit.ly\/failloud#' - '#Function sleep is unsafe to use.#' includes: diff --git a/src/Factory.php b/src/Factory.php new file mode 100644 index 0000000..b242580 --- /dev/null +++ b/src/Factory.php @@ -0,0 +1,28 @@ +loop = $loop; + } + + public function stream(Channel $channel): Observable + { + return $this->loop->observe($channel); + } + + public function single(Channel $channel): PromiseInterface + { + return $this->loop->observe($channel)->take(1)->toPromise(); + } +} diff --git a/src/RecvObservable.php b/src/RecvObservable.php deleted file mode 100644 index b9fbfa1..0000000 --- a/src/RecvObservable.php +++ /dev/null @@ -1,64 +0,0 @@ -loop = $loop; - $this->events = $events; - } - - public function recv(): Observable - { - $subject = new Subject(); - - // Call 1K times per second - $timer = $this->loop->addPeriodicTimer(0.05, function () use (&$timer, $subject): void { - try { - while ($event = $this->events->poll()) { - if ($event->type === Events\Event\Type::Read) { - $subject->onNext($event->value); - $this->events->addChannel($event->object); - - break; - } - - if ($event->type !== Events\Event\Type::Close) { - break; - } - - if ($timer instanceof TimerInterface) { - $this->loop->cancelTimer($timer); - } - - $subject->onCompleted(); - - return; - } - } catch (Events\Error\Timeout $timeout) { - return; - } catch (Throwable $throwable) { - if ($timer instanceof TimerInterface) { - $this->loop->cancelTimer($timer); - } - - $subject->onError($throwable); - } - }); - - return $subject; - } -} diff --git a/src/SingleRecv.php b/src/SingleRecv.php deleted file mode 100644 index ed5e88a..0000000 --- a/src/SingleRecv.php +++ /dev/null @@ -1,56 +0,0 @@ -loop = $loop; - $this->events = $events; - } - - public function recv(): PromiseInterface - { - return new Promise(function (callable $resolve, callable $reject): void { - // Call 1K times per second - $timer = $this->loop->addPeriodicTimer(0.001, function () use (&$timer, $resolve): void { - try { - while ($event = $this->events->poll()) { - if (! in_array($event->type, [Events\Event\Type::Read, Events\Event\Type::Close], true)) { - continue; - } - - if ($timer instanceof TimerInterface) { - $this->loop->cancelTimer($timer); - } - - $resolve($event->value); - - return; - } - } catch (Events\Error\Timeout $timeout) { - return; - } catch (Throwable $throwable) { - if ($timer instanceof TimerInterface) { - $this->loop->cancelTimer($timer); - } - - throw $throwable; - } - }); - }); - } -} diff --git a/tests/SingleRecvObservableTest.php b/tests/SingleRecvObservableTest.php index 2b61a4c..672da77 100644 --- a/tests/SingleRecvObservableTest.php +++ b/tests/SingleRecvObservableTest.php @@ -5,9 +5,12 @@ use parallel\Channel; use parallel\Events; use React\EventLoop\Factory; +use ReactParallel\EventLoop\EventLoopBridge; +use ReactParallel\Streams\Factory as StreamFactory; use ReactParallel\Streams\RecvObservable; use WyriHaximus\AsyncTestUtilities\AsyncTestCase; use function parallel\run; +use function React\Promise\all; use function sleep; /** @@ -23,24 +26,38 @@ public function recv(): void $d = bin2hex(random_bytes(13)); $loop = Factory::create(); - $channel = Channel::make($d, Channel::Infinite); - $events = new Events(); - $events->setTimeout(0); - $events->addChannel($channel); + $channels = [Channel::make($d . '_a', Channel::Infinite), Channel::make($d . '_b', Channel::Infinite)]; - $recvObservable = new RecvObservable($loop, $events); + $recvObservable = new StreamFactory(new EventLoopBridge($loop)); - run(function () use ($channel): void { + run(function () use ($channels): void { foreach (range(0, 13) as $i) { usleep(100); - $channel->send($i); + foreach (range(0, 130) as $j) { + foreach ($channels as $channel) { + $channel->send($i); + } + } } sleep(1); - $channel->close(); + foreach ($channels as $channel) { + $channel->close(); + } }); - $rd = $this->await($recvObservable->recv()->toArray()->toPromise(), $loop, 3.3); + $promises = []; + foreach ($channels as $channel) { + $promises[] = $recvObservable->stream($channel)->toArray()->toPromise(); + } + + $rd = $this->await(all($promises), $loop, 3.3); - self::assertSame(range(0, 13), $rd); + $range = []; + foreach (range(0, 13) as $i) { + foreach (range(0, 130) as $j) { + $range[] = $i; + } + } + self::assertSame([$range, $range], $rd); } } diff --git a/tests/SingleRecvTest.php b/tests/SingleRecvTest.php index 8e92354..0f66705 100644 --- a/tests/SingleRecvTest.php +++ b/tests/SingleRecvTest.php @@ -6,6 +6,8 @@ use parallel\Events; use React\EventLoop\Factory; use React\Promise\ExtendedPromiseInterface; +use ReactParallel\EventLoop\EventLoopBridge; +use ReactParallel\Streams\Factory as StreamFactory; use ReactParallel\Streams\SingleRecv; use WyriHaximus\AsyncTestUtilities\AsyncTestCase; use ReactParallel\FutureToPromiseConverter\FutureToPromiseConverter; @@ -28,17 +30,14 @@ public function recv(): void $loop = Factory::create(); $channel = Channel::make($d, Channel::Infinite); - $events = new Events(); - $events->setTimeout(0); - $events->addChannel($channel); - $singleRecv = new SingleRecv($loop, $events); + $singleRecv = new StreamFactory(new EventLoopBridge($loop)); $loop->addTimer(2, function () use ($channel, $d): void { $channel->send($d); }); - $rd = $this->await($singleRecv->recv(), $loop, 3.3); + $rd = $this->await($singleRecv->single($channel), $loop, 3.3); self::assertSame($d, $rd); } @@ -52,17 +51,14 @@ public function timedOut(): void $loop = Factory::create(); $channel = Channel::make($d, Channel::Infinite); - $events = new Events(); - $events->setTimeout(0); - $events->addChannel($channel); - $singleRecv = new SingleRecv($loop, $events); + $singleRecv = new StreamFactory(new EventLoopBridge($loop)); $loop->futureTick(static function () use ($channel): void { $channel->close(); }); - $rd = $this->await($singleRecv->recv(), $loop, 3.3); + $rd = $this->await($singleRecv->single($channel), $loop, 3.3); self::assertNull($rd); }