@@ -16,7 +16,7 @@ operations, but keeping thousands of jobs in memory at once may easily take up
16
16
all resources on your side.
17
17
Instead, you can use this library to stream your arbitrarily large input list
18
18
as individual records to a non-blocking (async) transformation handler. It uses
19
- [ ReactPHP] ( https://reactphp.org ) to enable you to concurrently process multiple
19
+ [ ReactPHP] ( https://reactphp.org/ ) to enable you to concurrently process multiple
20
20
records at once. You can control the concurrency limit, so that by allowing
21
21
it to process 10 operations at the same time, you can thus process this large
22
22
input list around 10 times faster and at the same time you're no longer limited
@@ -72,21 +72,25 @@ Once [installed](#install), you can use the following code to process an example
72
72
user lists by sending a (RESTful) HTTP API request for each user record:
73
73
74
74
``` php
75
+ <?php
76
+
77
+ require __DIR__ . '/vendor/autoload.php';
78
+
75
79
$browser = new React\Http\Browser();
76
80
77
81
$concurrency = isset($argv[1]) ? $argv[1] : 3;
78
82
79
83
// each job should use the browser to GET a certain URL
80
84
// limit number of concurrent jobs here
81
- $transformer = new Transformer($concurrency, function ($user) use ($browser) {
85
+ $transformer = new Clue\React\Flux\ Transformer($concurrency, function ($user) use ($browser) {
82
86
// skip users that do not have an IP address listed
83
87
if (!isset($user['ip'])) {
84
88
return React\Promise\resolve($user);
85
89
}
86
90
87
91
// look up country for this IP
88
92
return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
89
- function (ResponseInterface $response) use ($user) {
93
+ function (Psr\Http\Message\ ResponseInterface $response) use ($user) {
90
94
// response successfully received
91
95
// add country to user array and return updated user
92
96
$user['country'] = (string)$response->getBody();
@@ -114,7 +118,9 @@ $transformer->on('data', function ($user) {
114
118
$transformer->on('end', function () {
115
119
echo '[DONE]' . PHP_EOL;
116
120
});
117
- $transformer->on('error', 'printf');
121
+ $transformer->on('error', function (Exception $e) {
122
+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
123
+ });
118
124
119
125
```
120
126
@@ -241,7 +247,7 @@ $transformer = new Transformer(10, function ($url) use ($browser) {
241
247
return json_decode($response->getBody());
242
248
},
243
249
function (Exception $error) {
244
- var_dump('There was an error', $error ->getMessage()) ;
250
+ echo 'Error: ' . $e ->getMessage() . PHP_EOL ;
245
251
246
252
throw $error;
247
253
}
@@ -411,6 +417,10 @@ $transformer = new Transformer(10, function ($data) use ($http) {
411
417
});
412
418
413
419
$source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);
420
+
421
+ $transformer->on('error', function (Exception $e) {
422
+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
423
+ });
414
424
```
415
425
416
426
Keep in mind that the transformation handler may return a rejected promise.
@@ -456,6 +466,8 @@ $promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {
456
466
457
467
$promise->then(function ($count) {
458
468
echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
469
+ }, function (Exception $e) {
470
+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
459
471
});
460
472
```
461
473
@@ -561,6 +573,8 @@ $promise = Transformer::any($input, 3, function ($data) use ($browser, $url) {
561
573
562
574
$promise->then(function (ResponseInterface $response) {
563
575
echo 'First successful job: ' . $response->getBody() . PHP_EOL;
576
+ }, function (Exception $e) {
577
+ echo 'Error: ' . $e->getMessage() . PHP_EOL;
564
578
});
565
579
```
566
580
0 commit comments