diff --git a/composer.json b/composer.json index 2b00a36a..9d80c039 100644 --- a/composer.json +++ b/composer.json @@ -39,6 +39,7 @@ "src/constants.php", "src/core/Coroutine/functions.php", "src/core/Coroutine/Http/functions.php", + "src/core/Future/functions.php", "src/std/exec.php", "src/ext/curl.php", "src/ext/sockets.php", diff --git a/examples/futures/docs.php b/examples/futures/docs.php new file mode 100644 index 00000000..90867833 --- /dev/null +++ b/examples/futures/docs.php @@ -0,0 +1,44 @@ +await(); + + $future = Future\async(static function (): string { + return Http\get('https://httpbin.org/get')->getBody(); + }); + + echo $future->await(); + + $future = Future\async(static function (): string { + throw new RuntimeException('Futures propagates exceptions'); + }); + + try { + $future->await(); + } catch (Throwable $e) { + echo $e->getMessage(); + } +}); diff --git a/examples/futures/functions.php b/examples/futures/functions.php new file mode 100644 index 00000000..b79ce585 --- /dev/null +++ b/examples/futures/functions.php @@ -0,0 +1,18 @@ +getBody(); + })->await(); +}); diff --git a/examples/futures/future.php b/examples/futures/future.php new file mode 100644 index 00000000..bfb019b1 --- /dev/null +++ b/examples/futures/future.php @@ -0,0 +1,20 @@ +getBody(); + }); + + echo $future->await(); +}); diff --git a/examples/futures/join.php b/examples/futures/join.php new file mode 100644 index 00000000..2ea92665 --- /dev/null +++ b/examples/futures/join.php @@ -0,0 +1,28 @@ +getBody(); + }); + + $future2 = \Swoole\Future::create(static function () { + return \Swoole\Coroutine\Http\get('https://httpbin.org/delay/2')->getBody(); + }); + + echo implode(PHP_EOL, \Swoole\Future::join([$future1, $future2])); + + printf("Elapsed %f\n", microtime(true) - $start); +}); diff --git a/examples/futures/throw.php b/examples/futures/throw.php new file mode 100644 index 00000000..a1512aa2 --- /dev/null +++ b/examples/futures/throw.php @@ -0,0 +1,18 @@ +await(); +}); diff --git a/src/core/Future.php b/src/core/Future.php new file mode 100644 index 00000000..796ae12c --- /dev/null +++ b/src/core/Future.php @@ -0,0 +1,82 @@ +func = $func; + } + + public static function create(callable $func): self + { + return new self($func); + } + + public static function join(array $futures, float $timeout = -1): array + { + $len = count($futures); + $ch = new Channel($len); + $rets = []; + + Coroutine::join(array_map(static function (Future $future) use ($ch): int { + return $future->run($ch); + }, $futures)); + + while ($len--) { + $ret = $ch->pop($timeout); + + if ($ret instanceof Throwable) { + throw $ret; + } + + $rets[] = $ret; + } + + return $rets; + } + + public function run(Channel $channel): int + { + return Coroutine::create(function () use ($channel) { + try { + $channel->push(($this->func)()); + } catch (Throwable $throwable) { + $channel->push($throwable); + } + }); + } + + public function await(float $timeout = -1) + { + $ch = new Channel(1); + + $cid = $this->run($ch); + + $ret = $ch->pop($timeout); + + if ($ret instanceof Throwable) { + throw $ret; + } + + return $ret; + } +} diff --git a/src/core/Future/functions.php b/src/core/Future/functions.php new file mode 100644 index 00000000..11140b7f --- /dev/null +++ b/src/core/Future/functions.php @@ -0,0 +1,24 @@ +assertSame('test', $future->await()); + }); + } + + public function testJoin(): void + { + run(function (): void { + $future1 = async(static function (): string { + return 'foo'; + }); + + $future2 = async(static function (): string { + return 'bar'; + }); + + $strings = join($future1, $future2); + + $this->assertContains('foo', $strings); + $this->assertContains('bar', $strings); + }); + } +}