Skip to content

Commit 19e37a2

Browse files
authored
Merge pull request #2 from reactphp-parallel/refactor0recv-observable-to-use-one-event-loop-for-all-channels
Refactor RecvObservable and SingleRecv into Factory which is syntactic sugar around the event loop bridge
2 parents 716d09f + 8aafc08 commit 19e37a2

File tree

8 files changed

+122
-143
lines changed

8 files changed

+122
-143
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"require": {
1313
"php": "^7.4",
1414
"ext-parallel": "*",
15+
"react-parallel/event-loop": "dev-master",
1516
"react/event-loop": "^1.1",
1617
"react/promise": "^2.7",
1718
"reactivex/rxphp": "^2.0",

composer.lock

Lines changed: 60 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

phpstan.neon

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
parameters:
22
checkMissingIterableValueType: false
33
ignoreErrors:
4-
- '#In method \"ReactParallel\\Streams\\RecvObservable::recv\", caught \"Throwable\" must be rethrown. Either catch a more specific exception or add a \"throw\" clause in the "catch" block to propagate the exception. More info: http:\/\/bit.ly\/failloud#'
54
- '#Function sleep is unsafe to use.#'
65

76
includes:

src/Factory.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace ReactParallel\Streams;
4+
5+
use parallel\Channel;
6+
use React\Promise\PromiseInterface;
7+
use ReactParallel\EventLoop\EventLoopBridge;
8+
use Rx\Observable;
9+
10+
final class Factory
11+
{
12+
private EventLoopBridge $loop;
13+
14+
public function __construct(EventLoopBridge $loop)
15+
{
16+
$this->loop = $loop;
17+
}
18+
19+
public function stream(Channel $channel): Observable
20+
{
21+
return $this->loop->observe($channel);
22+
}
23+
24+
public function single(Channel $channel): PromiseInterface
25+
{
26+
return $this->loop->observe($channel)->take(1)->toPromise();
27+
}
28+
}

src/RecvObservable.php

Lines changed: 0 additions & 64 deletions
This file was deleted.

src/SingleRecv.php

Lines changed: 0 additions & 56 deletions
This file was deleted.

tests/SingleRecvObservableTest.php

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
use parallel\Channel;
66
use parallel\Events;
77
use React\EventLoop\Factory;
8+
use ReactParallel\EventLoop\EventLoopBridge;
9+
use ReactParallel\Streams\Factory as StreamFactory;
810
use ReactParallel\Streams\RecvObservable;
911
use WyriHaximus\AsyncTestUtilities\AsyncTestCase;
1012
use function parallel\run;
13+
use function React\Promise\all;
1114
use function sleep;
1215

1316
/**
@@ -23,24 +26,38 @@ public function recv(): void
2326
$d = bin2hex(random_bytes(13));
2427

2528
$loop = Factory::create();
26-
$channel = Channel::make($d, Channel::Infinite);
27-
$events = new Events();
28-
$events->setTimeout(0);
29-
$events->addChannel($channel);
29+
$channels = [Channel::make($d . '_a', Channel::Infinite), Channel::make($d . '_b', Channel::Infinite)];
3030

31-
$recvObservable = new RecvObservable($loop, $events);
31+
$recvObservable = new StreamFactory(new EventLoopBridge($loop));
3232

33-
run(function () use ($channel): void {
33+
run(function () use ($channels): void {
3434
foreach (range(0, 13) as $i) {
3535
usleep(100);
36-
$channel->send($i);
36+
foreach (range(0, 130) as $j) {
37+
foreach ($channels as $channel) {
38+
$channel->send($i);
39+
}
40+
}
3741
}
3842
sleep(1);
39-
$channel->close();
43+
foreach ($channels as $channel) {
44+
$channel->close();
45+
}
4046
});
4147

42-
$rd = $this->await($recvObservable->recv()->toArray()->toPromise(), $loop, 3.3);
48+
$promises = [];
49+
foreach ($channels as $channel) {
50+
$promises[] = $recvObservable->stream($channel)->toArray()->toPromise();
51+
}
52+
53+
$rd = $this->await(all($promises), $loop, 3.3);
4354

44-
self::assertSame(range(0, 13), $rd);
55+
$range = [];
56+
foreach (range(0, 13) as $i) {
57+
foreach (range(0, 130) as $j) {
58+
$range[] = $i;
59+
}
60+
}
61+
self::assertSame([$range, $range], $rd);
4562
}
4663
}

tests/SingleRecvTest.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
use parallel\Events;
77
use React\EventLoop\Factory;
88
use React\Promise\ExtendedPromiseInterface;
9+
use ReactParallel\EventLoop\EventLoopBridge;
10+
use ReactParallel\Streams\Factory as StreamFactory;
911
use ReactParallel\Streams\SingleRecv;
1012
use WyriHaximus\AsyncTestUtilities\AsyncTestCase;
1113
use ReactParallel\FutureToPromiseConverter\FutureToPromiseConverter;
@@ -28,17 +30,14 @@ public function recv(): void
2830

2931
$loop = Factory::create();
3032
$channel = Channel::make($d, Channel::Infinite);
31-
$events = new Events();
32-
$events->setTimeout(0);
33-
$events->addChannel($channel);
3433

35-
$singleRecv = new SingleRecv($loop, $events);
34+
$singleRecv = new StreamFactory(new EventLoopBridge($loop));
3635

3736
$loop->addTimer(2, function () use ($channel, $d): void {
3837
$channel->send($d);
3938
});
4039

41-
$rd = $this->await($singleRecv->recv(), $loop, 3.3);
40+
$rd = $this->await($singleRecv->single($channel), $loop, 3.3);
4241

4342
self::assertSame($d, $rd);
4443
}
@@ -52,17 +51,14 @@ public function timedOut(): void
5251

5352
$loop = Factory::create();
5453
$channel = Channel::make($d, Channel::Infinite);
55-
$events = new Events();
56-
$events->setTimeout(0);
57-
$events->addChannel($channel);
5854

59-
$singleRecv = new SingleRecv($loop, $events);
55+
$singleRecv = new StreamFactory(new EventLoopBridge($loop));
6056

6157
$loop->futureTick(static function () use ($channel): void {
6258
$channel->close();
6359
});
6460

65-
$rd = $this->await($singleRecv->recv(), $loop, 3.3);
61+
$rd = $this->await($singleRecv->single($channel), $loop, 3.3);
6662

6763
self::assertNull($rd);
6864
}

0 commit comments

Comments
 (0)