diff --git a/.phpcs.cache b/.phpcs.cache deleted file mode 100644 index 8dd0c66..0000000 --- a/.phpcs.cache +++ /dev/null @@ -1 +0,0 @@ -{"config":{"phpVersion":70406,"tabWidth":4,"encoding":"utf-8","recordErrors":true,"annotations":true,"configData":{"installed_paths":"..\/..\/slevomat\/coding-standard,..\/..\/wyrihaximus\/coding-standard\/src,..\/..\/doctrine\/coding-standard\/lib"},"codeHash":"965e46655295cb0b6e4da9a0b6ae4b50","rulesetHash":"6437dc4eda23e7c26f5f667da4f4ba90"},"\/home\/wyrihaximus\/Projects\/ReactPHPParallel\/streams\/src\/SingleRecv.php":{"hash":"7ccd997df158bcc969d1e64f11b745d633204","errors":{"15":{"27":[{"message":"Private member variable \"loop\" must contain a leading underscore","source":"Squiz.NamingConventions.ValidVariableName.PrivateNoUnderscore","listener":"PHP_CodeSniffer\\Standards\\Squiz\\Sniffs\\NamingConventions\\ValidVariableNameSniff","severity":0,"fixable":false}]},"17":{"20":[{"message":"Private member variable \"events\" must contain a leading underscore","source":"Squiz.NamingConventions.ValidVariableName.PrivateNoUnderscore","listener":"PHP_CodeSniffer\\Standards\\Squiz\\Sniffs\\NamingConventions\\ValidVariableNameSniff","severity":0,"fixable":false}]},"19":{"12":[{"message":"Missing doc comment for function __construct()","source":"Squiz.Commenting.FunctionComment.Missing","listener":"PHP_CodeSniffer\\Standards\\Squiz\\Sniffs\\Commenting\\FunctionCommentSniff","severity":0,"fixable":false}]},"25":{"12":[{"message":"Missing doc comment for function recv()","source":"Squiz.Commenting.FunctionComment.Missing","listener":"PHP_CodeSniffer\\Standards\\Squiz\\Sniffs\\Commenting\\FunctionCommentSniff","severity":0,"fixable":false}]}},"warnings":[],"metrics":{"Number of newlines at EOF":{"values":{"1":1}},"PHP closing tag at end of PHP-only file":{"values":{"no":1}},"Declarations and side effects mixed":{"values":{"no":1}},"PHP short open tag used":{"values":{"no":1}},"EOL char":{"values":{"\\n":1}},"Line length":{"values":{"80 or less":43,"81-120":3}},"Line indent":{"values":{"spaces":34}},"PHP keyword case":{"values":{"lower":37}},"Multiple statements on same line":{"values":{"no":22}},"Class opening brace placement":{"values":{"new line":1}},"One class per file":{"values":{"yes":1}},"Class defined in namespace":{"values":{"yes":1}},"PascalCase class name":{"values":{"yes":1}},"Function opening brace placement":{"values":{"new line":2}},"Function has doc comment":{"values":{"no":2}},"Function spacing after":{"values":{"1":1}},"Function spacing before":{"values":{"1":1}},"Spacing before object operator":{"values":[14]},"Spacing after object operator":{"values":[14]},"Adjacent assignments aligned":{"values":{"yes":1}},"CamelCase method name":{"values":{"yes":1}},"Function spacing after last":{"values":[1]},"Closure opening brace placement":{"values":{"same line":2}},"PHP type case":{"values":{"lower":2}},"Inline comment style":{"values":{"\/\/ ...":1}},"Blank lines at start of control structure":{"values":[7]},"Blank lines at end of control structure":{"values":[7]},"Spaces after control structure open parenthesis":{"values":[6]},"Spaces before control structure close parenthesis":{"values":[6]},"Control structure defined inline":{"values":{"no":4}},"Short array syntax used":{"values":{"yes":1}},"Array end comma":{"values":{"no":1}},"PHP constant case":{"values":{"lower":1}}},"errorCount":4,"warningCount":0,"fixableCount":0,"numTokens":422}} \ No newline at end of file diff --git a/composer.json b/composer.json index f027e17..9e555d2 100644 --- a/composer.json +++ b/composer.json @@ -14,23 +14,24 @@ "ext-parallel": "*", "react/event-loop": "^1.1", "react/promise": "^2.7", + "reactivex/rxphp": "^2.0", "wyrihaximus/constants": "^1.4.3" }, "require-dev": { "wyrihaximus/async-test-utilities": "^2.0", "wyrihaximus/ticking-promise": "^1.6" }, - "extra": { - "unused": [ - "ext-parallel" - ] - }, "config": { "platform": { "php": "7.4" }, "sort-packages": true }, + "extra": { + "unused": [ + "ext-parallel" + ] + }, "autoload": { "psr-4": { "ReactParallel\\Streams\\": "src/" diff --git a/composer.lock b/composer.lock index 23520af..46ae594 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "912cd3656752319e02286d98f7ffc97c", + "content-hash": "c63dede3998d9aa49c0e1af8ce0c7b86", "packages": [ { "name": "react/event-loop", @@ -94,6 +94,71 @@ ], "time": "2020-05-12T15:16:56+00:00" }, + { + "name": "reactivex/rxphp", + "version": "2.0.7", + "source": { + "type": "git", + "url": "https://github.com/ReactiveX/RxPHP.git", + "reference": "fd74a0cd2b5edd4a48e4ff12aaa44cc6cdc4a9b4" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/ReactiveX/RxPHP/zipball/fd74a0cd2b5edd4a48e4ff12aaa44cc6cdc4a9b4", + "reference": "fd74a0cd2b5edd4a48e4ff12aaa44cc6cdc4a9b4", + "shasum": "" + }, + "require": { + "php": "~7.0", + "react/promise": "~2.2" + }, + "require-dev": { + "phpunit/phpcov": "^3.1", + "phpunit/phpunit": "^5.7", + "react/event-loop": "^1.0 || ^0.5 || ^0.4.2", + "satooshi/php-coveralls": "~1.0" + }, + "suggest": { + "react/event-loop": "Used for scheduling async operations" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.0-dev" + } + }, + "autoload": { + "psr-4": { + "Rx\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Alexander", + "email": "iam.asm89@gmail.com" + }, + { + "name": "David Dan", + "email": "davidwdan@gmail.com" + }, + { + "name": "Matt Bonneau", + "email": "matt@bonneau.net" + } + ], + "description": "Reactive extensions for php.", + "homepage": "https://github.com/ReactiveX/RxPHP", + "keywords": [ + "extensions", + "reactive", + "rx" + ], + "time": "2018-04-18T01:34:36+00:00" + }, { "name": "wyrihaximus/constants", "version": "1.5.0", diff --git a/phpstan.neon b/phpstan.neon index ada5313..88086c6 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,6 +1,8 @@ parameters: checkMissingIterableValueType: false - ignoreErrors: [] + ignoreErrors: + - '#In method \"ReactParallel\\Streams\\RecvObservable::recv\", caught \"Throwable\" must be rethrown. Either catch a more specific exception or add a \"throw\" clause in the "catch" block to propagate the exception. More info: http:\/\/bit.ly\/failloud#' + - '#Function sleep is unsafe to use.#' includes: - vendor/wyrihaximus/async-test-utilities/rules.neon \ No newline at end of file diff --git a/src/RecvObservable.php b/src/RecvObservable.php new file mode 100644 index 0000000..b9fbfa1 --- /dev/null +++ b/src/RecvObservable.php @@ -0,0 +1,64 @@ +loop = $loop; + $this->events = $events; + } + + public function recv(): Observable + { + $subject = new Subject(); + + // Call 1K times per second + $timer = $this->loop->addPeriodicTimer(0.05, function () use (&$timer, $subject): void { + try { + while ($event = $this->events->poll()) { + if ($event->type === Events\Event\Type::Read) { + $subject->onNext($event->value); + $this->events->addChannel($event->object); + + break; + } + + if ($event->type !== Events\Event\Type::Close) { + break; + } + + if ($timer instanceof TimerInterface) { + $this->loop->cancelTimer($timer); + } + + $subject->onCompleted(); + + return; + } + } catch (Events\Error\Timeout $timeout) { + return; + } catch (Throwable $throwable) { + if ($timer instanceof TimerInterface) { + $this->loop->cancelTimer($timer); + } + + $subject->onError($throwable); + } + }); + + return $subject; + } +} diff --git a/tests/SingleRecvObservableTest.php b/tests/SingleRecvObservableTest.php new file mode 100644 index 0000000..2b61a4c --- /dev/null +++ b/tests/SingleRecvObservableTest.php @@ -0,0 +1,46 @@ +setTimeout(0); + $events->addChannel($channel); + + $recvObservable = new RecvObservable($loop, $events); + + run(function () use ($channel): void { + foreach (range(0, 13) as $i) { + usleep(100); + $channel->send($i); + } + sleep(1); + $channel->close(); + }); + + $rd = $this->await($recvObservable->recv()->toArray()->toPromise(), $loop, 3.3); + + self::assertSame(range(0, 13), $rd); + } +} diff --git a/tests/SingleRecvTest.php b/tests/SingleRecvTest.php index e9c19db..8e92354 100644 --- a/tests/SingleRecvTest.php +++ b/tests/SingleRecvTest.php @@ -38,7 +38,6 @@ public function recv(): void $channel->send($d); }); - $loop->run(); $rd = $this->await($singleRecv->recv(), $loop, 3.3); self::assertSame($d, $rd); @@ -63,7 +62,6 @@ public function timedOut(): void $channel->close(); }); - $loop->run(); $rd = $this->await($singleRecv->recv(), $loop, 3.3); self::assertNull($rd);