Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wpjscc committed Aug 11, 2024
0 parents commit 26bae96
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Composer ignores
/vendor
composer.phar
composer.lock
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# reactphp-x-concurrent

## install

```
composer require reactphp-x/concurrent -vvv
```

## usage

```php

use ReactphpX\Concurrent\Concurrent;
use React\Promise\Deferred;
use React\EventLoop\Loop;

$concurrent = new Concurrent(10);

for ($i = 0; $i < 20; $i++) {
$concurrent->concurrent(function () use ($i) {
$deferred = new Deferred();
echo "Request $i\n";
Loop::addTimer($i, function () use ($deferred, $i) {
$deferred->resolve($i);
});
return $deferred->promise();
})->then(function ($result) {
echo "Result $result\n";
}, function ($error) {
$message = $error->getMessage();
echo "Error $message\n";
});
}
```

## License
MIT
19 changes: 19 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "reactphp-x/concurrent",
"license": "MIT",
"autoload": {
"psr-4": {
"ReactphpX\\Concurrent\\": "src/"
}
},
"authors": [
{
"name": "wpjscc",
"email": "[email protected]"
}
],
"require": {
"react/promise": "^3.2",
"react/event-loop": "^1.5"
}
}
25 changes: 25 additions & 0 deletions examples/concurrent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

require __DIR__ . '/../vendor/autoload.php';

use ReactphpX\Concurrent\Concurrent;
use React\Promise\Deferred;
use React\EventLoop\Loop;

$concurrent = new Concurrent(10);

for ($i = 0; $i < 20; $i++) {
$concurrent->concurrent(function () use ($i) {
$deferred = new Deferred();
echo "Request $i\n";
Loop::addTimer($i, function () use ($deferred, $i) {
$deferred->resolve($i);
});
return $deferred->promise();
})->then(function ($result) {
echo "Result $result\n";
}, function ($error) {
$message = $error->getMessage();
echo "Error $message\n";
});
}
113 changes: 113 additions & 0 deletions src/Concurrent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
<?php

namespace ReactphpX\Concurrent;

use React\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\EventLoop\Loop;
use Psr\Http\Message\ResponseInterface;
use React\Stream\ReadableStreamInterface;


final class Concurrent
{
private $limit;
private $pending = 0;
private $queue = [];

/**
* @param int $limit Maximum amount of concurrent requests handled.
*
* For example when $limit is set to 10, 10 requests will flow to $next
* while more incoming requests have to wait until one is done.
*/
public function __construct($limit)
{
$this->limit = $limit;
}

public function concurrent($callback)
{
// happy path: simply invoke next request handler if we're below limit
if ($this->pending < $this->limit) {
++$this->pending;
return $this->await(Promise\resolve($callback()));
}
// get next queue position
$queue =& $this->queue;
$queue[] = null;
\end($queue);
$id = \key($queue);

$deferred = new Deferred(function ($_, $reject) use (&$queue, $id) {
var_dump('Cancelled queued next handle');
unset($queue[$id]);
$reject(new \RuntimeException('Cancelled queued next handler'));
});

$queue[$id] = $deferred;

$pending = &$this->pending;
$that = $this;
return $deferred->promise()->then(function () use (&$pending, $that,$callback) {
// invoke next request handler
++$pending;
return $that->await(Promise\resolve($callback()));
});
}

/**
* @internal
* @param PromiseInterface $promise
* @return PromiseInterface
*/
public function await(PromiseInterface $promise)
{
$that = $this;
return $promise->then(function ($data) use ($that) {
if (interface_exists(ResponseInterface::class) && $data instanceof ResponseInterface) {
$body = $data->getBody();
if (interface_exists(ReadableStreamInterface::class) && $body instanceof ReadableStreamInterface && $body->isReadable()) {
$body->on('close', function () use ($that) {
$that->processQueue();
});
} else {
$that->processQueue();
}
}
else if (interface_exists(ReadableStreamInterface::class) && $data instanceof ReadableStreamInterface && $data->isReadable()) {
$data->on('close', function () use ($that) {
$that->processQueue();
});
}
else {
$that->processQueue();
}
return $data;
}, function ($error) use ($that) {
$that->processQueue();
return Promise\reject($error);
});
}

/**
* @internal
*/
public function processQueue()
{
// skip if we're still above concurrency limit or there's no queued request waiting
if (--$this->pending >= $this->limit || !$this->queue) {
return;
}

$first = \reset($this->queue);
unset($this->queue[key($this->queue)]);

Loop::futureTick(function () use ($first) {
$first->resolve(null);
});
}
}


0 comments on commit 26bae96

Please sign in to comment.