Skip to content

Commit 716d09f

Browse files
authored
Merge pull request #1 from reactphp-parallel/add-recv-observable
Add RecvObservable
2 parents aa15485 + 2de798b commit 716d09f

File tree

7 files changed

+185
-10
lines changed

7 files changed

+185
-10
lines changed

.phpcs.cache

Lines changed: 0 additions & 1 deletion
This file was deleted.

composer.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,24 @@
1414
"ext-parallel": "*",
1515
"react/event-loop": "^1.1",
1616
"react/promise": "^2.7",
17+
"reactivex/rxphp": "^2.0",
1718
"wyrihaximus/constants": "^1.4.3"
1819
},
1920
"require-dev": {
2021
"wyrihaximus/async-test-utilities": "^2.0",
2122
"wyrihaximus/ticking-promise": "^1.6"
2223
},
23-
"extra": {
24-
"unused": [
25-
"ext-parallel"
26-
]
27-
},
2824
"config": {
2925
"platform": {
3026
"php": "7.4"
3127
},
3228
"sort-packages": true
3329
},
30+
"extra": {
31+
"unused": [
32+
"ext-parallel"
33+
]
34+
},
3435
"autoload": {
3536
"psr-4": {
3637
"ReactParallel\\Streams\\": "src/"

composer.lock

Lines changed: 66 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

phpstan.neon

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
parameters:
22
checkMissingIterableValueType: false
3-
ignoreErrors: []
3+
ignoreErrors:
4+
- '#In method \"ReactParallel\\Streams\\RecvObservable::recv\", caught \"Throwable\" must be rethrown. Either catch a more specific exception or add a \"throw\" clause in the "catch" block to propagate the exception. More info: http:\/\/bit.ly\/failloud#'
5+
- '#Function sleep is unsafe to use.#'
46

57
includes:
68
- vendor/wyrihaximus/async-test-utilities/rules.neon

src/RecvObservable.php

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace ReactParallel\Streams;
4+
5+
use parallel\Events;
6+
use React\EventLoop\LoopInterface;
7+
use React\EventLoop\TimerInterface;
8+
use Rx\Observable;
9+
use Rx\Subject\Subject;
10+
use Throwable;
11+
12+
final class RecvObservable
13+
{
14+
private LoopInterface $loop;
15+
16+
private Events $events;
17+
18+
public function __construct(LoopInterface $loop, Events $events)
19+
{
20+
$this->loop = $loop;
21+
$this->events = $events;
22+
}
23+
24+
public function recv(): Observable
25+
{
26+
$subject = new Subject();
27+
28+
// Call 1K times per second
29+
$timer = $this->loop->addPeriodicTimer(0.05, function () use (&$timer, $subject): void {
30+
try {
31+
while ($event = $this->events->poll()) {
32+
if ($event->type === Events\Event\Type::Read) {
33+
$subject->onNext($event->value);
34+
$this->events->addChannel($event->object);
35+
36+
break;
37+
}
38+
39+
if ($event->type !== Events\Event\Type::Close) {
40+
break;
41+
}
42+
43+
if ($timer instanceof TimerInterface) {
44+
$this->loop->cancelTimer($timer);
45+
}
46+
47+
$subject->onCompleted();
48+
49+
return;
50+
}
51+
} catch (Events\Error\Timeout $timeout) {
52+
return;
53+
} catch (Throwable $throwable) {
54+
if ($timer instanceof TimerInterface) {
55+
$this->loop->cancelTimer($timer);
56+
}
57+
58+
$subject->onError($throwable);
59+
}
60+
});
61+
62+
return $subject;
63+
}
64+
}

tests/SingleRecvObservableTest.php

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace ReactParallel\Tests\Streams;
4+
5+
use parallel\Channel;
6+
use parallel\Events;
7+
use React\EventLoop\Factory;
8+
use ReactParallel\Streams\RecvObservable;
9+
use WyriHaximus\AsyncTestUtilities\AsyncTestCase;
10+
use function parallel\run;
11+
use function sleep;
12+
13+
/**
14+
* @internal
15+
*/
16+
final class SingleRecvObservableTest extends AsyncTestCase
17+
{
18+
/**
19+
* @test
20+
*/
21+
public function recv(): void
22+
{
23+
$d = bin2hex(random_bytes(13));
24+
25+
$loop = Factory::create();
26+
$channel = Channel::make($d, Channel::Infinite);
27+
$events = new Events();
28+
$events->setTimeout(0);
29+
$events->addChannel($channel);
30+
31+
$recvObservable = new RecvObservable($loop, $events);
32+
33+
run(function () use ($channel): void {
34+
foreach (range(0, 13) as $i) {
35+
usleep(100);
36+
$channel->send($i);
37+
}
38+
sleep(1);
39+
$channel->close();
40+
});
41+
42+
$rd = $this->await($recvObservable->recv()->toArray()->toPromise(), $loop, 3.3);
43+
44+
self::assertSame(range(0, 13), $rd);
45+
}
46+
}

tests/SingleRecvTest.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public function recv(): void
3838
$channel->send($d);
3939
});
4040

41-
$loop->run();
4241
$rd = $this->await($singleRecv->recv(), $loop, 3.3);
4342

4443
self::assertSame($d, $rd);
@@ -63,7 +62,6 @@ public function timedOut(): void
6362
$channel->close();
6463
});
6564

66-
$loop->run();
6765
$rd = $this->await($singleRecv->recv(), $loop, 3.3);
6866

6967
self::assertNull($rd);

0 commit comments

Comments
 (0)