diff --git a/Coroutine/Coroutine.php b/Coroutine/Coroutine.php index 9d6b3d0..cc46f0f 100644 --- a/Coroutine/Coroutine.php +++ b/Coroutine/Coroutine.php @@ -237,7 +237,6 @@ public function __construct() \spawn_setup($this->uv); \file_operation(true); - // @codeCoverageIgnoreStart $this->onEvent = function ($event, $status, $events, $stream) { if ($status !== 0) { $this->pollEvent($stream); @@ -254,7 +253,6 @@ public function __construct() $this->updateScheduler('write', $stream); } }; - // @codeCoverageIgnoreEnd $this->onTimer = function ($timer) { $taskTimer = $this->timers[(int) $timer]; @@ -275,19 +273,15 @@ protected function timestamp() return (float) ($this->isHighTimer ? \hrtime(true) / 1e+9 : \microtime(true)); } - /** - * @codeCoverageIgnore - */ protected function addEvent($stream) { if (!isset($this->events[(int) $stream])) { $meta = \stream_get_meta_data($stream); switch ($meta['stream_type'] ?? '') { case 'STDIO': - if ($meta['wrapper_type'] == 'plainfile') { - - break; - } + case 'TEMP': + $this->events[(int) $stream] = false; + break; case 'tcp_socket/ssl': $this->events[(int) $stream] = \uv_poll_init($this->uv, $stream); break; @@ -301,9 +295,6 @@ protected function addEvent($stream) } } - /** - * @codeCoverageIgnore - */ protected function removeReadEvent($stream) { if (!isset($this->events[(int) $stream])) { @@ -311,8 +302,12 @@ protected function removeReadEvent($stream) } if (isset($this->waitingForRead[(int) $stream])) { - \uv_poll_stop($this->events[(int) $stream]); - \uv_close($this->events[(int) $stream]); + $event = $this->events[(int) $stream]; + if ($event instanceof \UVPoll) { + \uv_poll_stop($event); + \uv_close($event); + } + unset($this->events[(int) $stream]); return; } @@ -320,9 +315,6 @@ protected function removeReadEvent($stream) $this->pollEvent($stream); } - /** - * @codeCoverageIgnore - */ protected function removeWriteEvent($stream) { if (!isset($this->events[(int) $stream])) { @@ -330,8 +322,12 @@ protected function removeWriteEvent($stream) } if (isset($this->waitingForWrite[(int) $stream])) { - \uv_poll_stop($this->events[(int) $stream]); - \uv_close($this->events[(int) $stream]); + $event = $this->events[(int) $stream]; + if ($event instanceof \UVPoll) { + \uv_poll_stop($event); + \uv_close($event); + } + unset($this->events[(int) $stream]); return; } @@ -339,9 +335,6 @@ protected function removeWriteEvent($stream) $this->pollEvent($stream); } - /** - * @codeCoverageIgnore - */ protected function pollEvent($stream) { if (!isset($this->events[(int) $stream])) { @@ -439,7 +432,7 @@ public function isUv(): bool public function isUvActive(): bool { - return ($this->uv instanceof \UVLoop) && $this->useUv; + return ($this->isUv() && \IS_LINUX); } public function isPcntl(): bool @@ -629,9 +622,14 @@ public function execute($isReturn = false) if ($task->isFinished()) { $this->cancelProgress($task); - $task->setState('completed'); $id = $task->taskId(); - $this->completedMap[$id] = $task; + if ($task->isNetwork()) { + $task->close(); + } else { + $task->setState('completed'); + $this->completedMap[$id] = $task; + } + unset($this->taskMap[$id]); } else { $task->setState('rescheduled'); @@ -677,7 +675,7 @@ protected function runTimers() } /** - * Check and return `true` for any pending I/O events, signals, subprocess, + * Check and return `true` for `no` pending I/O events, signals, subprocess, * streams/sockets/fd activity, timers or tasks. */ protected function hasCoroutines(): bool @@ -719,9 +717,14 @@ protected function ioWaiting() $this->process->processing(); $nextTimeout = $this->runTimers(); $streamWait = $this->waitTime($nextTimeout); - $this->ioSocketStream($streamWait); - if ($this->isUv()) { - \uv_run($this->uv, ($this->waitTime($nextTimeout) ? \UV::RUN_ONCE : \UV::RUN_NOWAIT)); + if ($this->isUvActive()) { + \uv_run($this->uv, ($streamWait ? \UV::RUN_ONCE : \UV::RUN_NOWAIT)); + $this->ioSocketStream($this->waitTime($nextTimeout)); + } else { + $this->ioSocketStream($streamWait); + if ($this->isUv()) { + \uv_run($this->uv, ($this->waitTime($nextTimeout) ? \UV::RUN_ONCE : \UV::RUN_NOWAIT)); + } } yield; @@ -837,7 +840,7 @@ public function addSignal($signal, $listener) } elseif ($this->isUvActive() || $this->isUvSignal) { if (!isset($this->signals[$signal])) { $signals = $this->signaler; - $this->signals[$signal] = \uv_signal_init($this->isUvActive() ? $this->uv : \uv_default_loop()); + $this->signals[$signal] = \uv_signal_init($this->uv); \uv_signal_start($this->signals[$signal], function ($signal, $signalInt) use ($signals) { $signals->execute($signalInt); }, $signal); @@ -856,7 +859,8 @@ public function removeSignal($signal, $listener) \pcntl_signal($signal, \SIG_DFL); } elseif ($this->isUvActive() || $this->isUvSignal) { if (isset($this->signals[$signal]) && $this->signaler->count($signal) === 0) { - //\uv_signal_stop($this->signals[$signal]); + if (\uv_is_active($this->signals[$signal])) + @\uv_signal_stop($this->signals[$signal]); unset($this->signals[$signal]); } } diff --git a/Coroutine/CoroutineInterface.php b/Coroutine/CoroutineInterface.php index c41d4bd..c8b7642 100644 --- a/Coroutine/CoroutineInterface.php +++ b/Coroutine/CoroutineInterface.php @@ -265,7 +265,9 @@ public function getProcess( public function getParallel(): ParallelInterface; /** - * Check if **UV** event loop `libuv` engine is available, and turned `on` for native asynchronous handling. + * Is `libuv` features available and the system is **Linux**. + * + * `Note:` Network related 'libuv` features are currently broken on **Windows**. * * @return bool */ diff --git a/Coroutine/Task.php b/Coroutine/Task.php index 396db08..452ca1f 100644 --- a/Coroutine/Task.php +++ b/Coroutine/Task.php @@ -101,7 +101,7 @@ final class Task implements TaskInterface /** * Task type indicator. * - * Currently using types of either `paralleled`, `awaited`, or `monitored`. + * Currently using types of either `paralleled`, `awaited`, `networked`, or `monitored`. * * @var string */ @@ -210,6 +210,11 @@ public function isParallel(): bool return ($this->taskType == 'paralleled'); } + public function isNetwork(): bool + { + return ($this->taskType == 'networked'); + } + public function isProcess(): bool { return ($this->state == 'process'); @@ -291,7 +296,9 @@ public function run() ? $this->coroutine->throw($this->exception) : $this->exception; - $this->error = $this->exception; + if (!$this->isNetwork()) + $this->error = $this->exception; + $this->exception = null; return $value; } else { @@ -299,9 +306,8 @@ public function run() ? $this->coroutine->send($this->sendValue) : $this->sendValue; - if (!empty($value)) { + if (!empty($value) && !$this->isNetwork()) $this->result = $value; - } $this->sendValue = null; return $value; diff --git a/Coroutine/TaskInterface.php b/Coroutine/TaskInterface.php index 53a24b8..6b8f6cd 100644 --- a/Coroutine/TaskInterface.php +++ b/Coroutine/TaskInterface.php @@ -10,14 +10,50 @@ interface TaskInterface { public function taskId(): ?int; + /** + * Task type indicator, currently either `paralleled`, `awaited`, + * `networked`, or `monitored`. + * + * @param string $type + * + * @return void + * + * @internal + */ public function taskType(string $type); + /** + * @param mixed $sendValue + * + * @return void + * + * @internal + */ public function sendValue($sendValue); + /** + * @param string $status + * + * @return void + * + * @internal + */ public function setState(string $status); + /** + * Return task current status state. + * + * @return string + * + * @internal + */ public function getState(): string; + /** + * Start the execution of the callers code, passing any `value` or `exception` back in forth. + * + * @return mixed + */ public function run(); /** @@ -59,6 +95,14 @@ public function isCustomState($state): bool; */ public function isParallel(): bool; + /** + * A flag that indicates the task is socket/stream related and nothing will be stored. + * - All memory is freed, not in completed task list, and no results retained. + * + * @return bool + */ + public function isNetwork(): bool; + /** * A flag that indicates whether or not the sub process task has started. * @@ -122,7 +166,11 @@ public function result(); /** * Mark the task as done and set an exception. * - * @param \Exception + * @param \Exception $exception + * + * @return void + * + * @internal */ public function setException($exception);