diff --git a/README.md b/README.md index a213e28..963d818 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ composer require hds-solutions/parallel-sdk ## Usage Firstly, you need to set the bootstrap file for parallel. Setting the composer's autoloader is enough. See reference [#1](#references) for more info. ```php -// check if extension is loaded to allow deploying even in envorinments where parallel isn't installed +// check if extension is loaded to allow deploying even in environments where parallel isn't installed if (extension_loaded('parallel')) { // set the path to composer's autoloader parallel\bootstrap(__DIR__.'/vendor/autoload.php'); @@ -80,8 +80,26 @@ final class ExampleWorker extends ParallelWorker { ```php use HDSSolutions\Console\Parallel\Scheduler; -$worker = new ExampleWorker(); -Scheduler::using($worker); +Scheduler::using(ExampleWorker::class); +``` + +You can also send parameters to the Worker's constructor. +```php +use HDSSolutions\Console\Parallel\ParallelWorker; + +final class ExampleWorker extends ParallelWorker { + + public function __construct( + private array $multipliers, + ) {} + +} +``` + +```php +use HDSSolutions\Console\Parallel\Scheduler; + +Scheduler::using(ExampleWorker::class, [ 2, 4, 8 ]); ``` ### Schedule tasks @@ -89,10 +107,10 @@ After defining a Worker, you can schedule tasks that will run in parallel. ```php use HDSSolutions\Console\Parallel\Scheduler; -foreach (range(1, 100) as $task) { +foreach (range(1, 100) as $task_data) { try { // tasks will start as soon as a thread is available - Scheduler::runTask($task); + Scheduler::runTask($task_data); } catch (Throwable) { // if no Worker was defined, a RuntimeException will be thrown @@ -101,17 +119,52 @@ foreach (range(1, 100) as $task) { } ``` +### Check Tasks state +Every task has an state. There is also helper functions to check current Task state: +```php +use HDSSolutions\Console\Parallel\Scheduler; +use HDSSolutions\Console\Parallel\Task; + +do { + $all_processed = true; + foreach (Scheduler::getTasks() as $task) { + switch (true) { + case $task->isPending(): + $all_processed = false; + break; + + case $task->isBeingProcessed(): + $all_processed = false; + break; + + case $task->wasProcessed(): + $result = $task->getResult(); + break; + } + } +} while ($all_processed == false); +``` + +### Wait for tasks completion +Instead of checking every task state, you can wait for all tasks to be processed before continue your code execution. +```php +use HDSSolutions\Console\Parallel\Scheduler; + +// This will pause execution until all tasks are processed +Scheduler::awaitTasksCompletion(); +``` + ### Get processed tasks result ```php use HDSSolutions\Console\Parallel\Scheduler; -use HDSSolutions\Console\Parallel\ProcessedTask; +use HDSSolutions\Console\Parallel\Task; -foreach (Scheduler::getProcessedTasks() as $processed_task) { +foreach (Scheduler::getTasks() as $task) { // you have access to the Worker class that was used to processed the task - $worker = $processed_task->getWorkerClass(); + $worker = $task->getWorkerClass(); // and the result of the task processed - $result = $processed_task->getResult(); + $result = $task->getResult(); } ``` @@ -126,9 +179,8 @@ use HDSSolutions\Console\Parallel\Scheduler; $tasks = range(1, 10); -$worker = new ExampleWorker(); -Scheduler::using($worker) - ->withProgress(steps: count($tasks); +Scheduler::using(ExampleWorker::class) + ->withProgress(steps: count($tasks)); ``` #### Usage from Worker @@ -166,14 +218,6 @@ final class ExampleWorker extends ParallelWorker { memory: 562 KiB, threads: 12x ~474 KiB, Σ 5,6 MiB ↑ 5,6 MiB ``` -## Graceful close all resources -This method will close all resources used internally by the `Scheduler` instance. -```php -use HDSSolutions\Console\Parallel\Scheduler; - -Scheduler::disconnect(); -``` - ### References 1. [parallel\bootstrap()](https://www.php.net/manual/en/parallel.bootstrap.php) 2. [Parallel\Runtime::run() Task Characteristics](https://www.php.net/manual/en/parallel-runtime.run.php#refsect1-parallel-runtime.run-closure-characteristics) @@ -186,3 +230,4 @@ If you encounter any security related issue, feel free to raise a ticket on the # Licence GPL-3.0 Please see [License File](LICENSE) for more information. + diff --git a/composer.json b/composer.json index 1abd786..cfdc349 100644 --- a/composer.json +++ b/composer.json @@ -21,7 +21,7 @@ "src/helpers.php" ], "psr-4": { - "HDSSolutions\\Console\\": "src/" + "HDSSolutions\\Console\\Parallel\\": "src/" } }, "require-dev": { diff --git a/src/Parallel/Contracts/ParallelWorker.php b/src/Contracts/ParallelWorker.php similarity index 92% rename from src/Parallel/Contracts/ParallelWorker.php rename to src/Contracts/ParallelWorker.php index 69cc8ca..7f20fbc 100644 --- a/src/Parallel/Contracts/ParallelWorker.php +++ b/src/Contracts/ParallelWorker.php @@ -2,8 +2,6 @@ namespace HDSSolutions\Console\Parallel\Contracts; -use HDSSolutions\Console\Parallel\ProcessedTask; - interface ParallelWorker { /** @@ -92,10 +90,8 @@ public function getStartedAt(): ?float; public function getFinishedAt(): ?float; /** - * Returns the processed task - * - * @return ProcessedTask Processed task + * @return mixed Processed Task result */ - public function getProcessedTask(): ProcessedTask; + public function getResult(): mixed; } diff --git a/src/Contracts/Task.php b/src/Contracts/Task.php new file mode 100644 index 0000000..94d6350 --- /dev/null +++ b/src/Contracts/Task.php @@ -0,0 +1,86 @@ +message = $e->getMessage(); + $this->file = $e->getFile(); + $this->line = $e->getLine(); + } + + public function getMessage(): string { + return $this->message; + } + + public function getFile(): string { + return $this->file; + } + + public function getLine(): int { + return $this->line; + } + +} diff --git a/src/Internals/Commands/ParallelCommandMessage.php b/src/Internals/Commands/ParallelCommandMessage.php new file mode 100644 index 0000000..98f4de4 --- /dev/null +++ b/src/Internals/Commands/ParallelCommandMessage.php @@ -0,0 +1,26 @@ +$name; + } + +} diff --git a/src/Internals/Commands/ProgressBar/EnableProgressBarMessage.php b/src/Internals/Commands/ProgressBar/EnableProgressBarMessage.php new file mode 100644 index 0000000..60c57b3 --- /dev/null +++ b/src/Internals/Commands/ProgressBar/EnableProgressBarMessage.php @@ -0,0 +1,21 @@ +release(); + + // read messages + try { while (Event\Type::Close !== $message = $this->recv()) { + try { + // check if we got a valid message + if ( !($message instanceof ParallelCommandMessage)) { + throw new RuntimeException('Invalid message received!'); + } + + // process message + $this->processMessage($message); + + } catch (Throwable $e) { + // redirect exception to caller using output channel + $this->send(new ParallelException($e)); + } + + }} catch (Channel\Error\Closed) { + // TODO channel must not be closed + $debug = true; + } + + $this->afterListening(); + } + + abstract protected function afterListening(): void; + + /** + * @param ParallelCommandMessage $message + * + * @return mixed + * @throws RuntimeException If the requested action isn't implemented + */ + final public function processMessage(ParallelCommandMessage $message): mixed { + // check if action is implemented + if ( !method_exists($this, $method = lcfirst(implode('', array_map('ucfirst', explode('_', $message->action)))))) { + throw new RuntimeException(sprintf('Action "%s" not yet implemented', $message->action)); + } + + // execute action and return the result + return $this->{$method}(...$message->args); + } + +} diff --git a/src/Internals/Communication/TwoWayChannel.php b/src/Internals/Communication/TwoWayChannel.php new file mode 100644 index 0000000..dbb4eb6 --- /dev/null +++ b/src/Internals/Communication/TwoWayChannel.php @@ -0,0 +1,96 @@ + + * Shall make a buffered two-way channel with the given name and capacity + * + * @param string $name The name of the channel + * + * @return self + * @throws Channel\Error\Existence if channel already exists + */ + public static function make(string $name): self { + $instance = new self($name, true); + // create channels + $instance->input = Channel::make("$name@input"); + $instance->output = Channel::make("$name@output"); + + return $instance; + } + + /** + * Shall open the two-way channel with the given name + * + * @param string $name The name of the channel + * + * @return self + * @throws Channel\Error\Existence if channel does not exist + */ + public static function open(string $name): self { + $instance = new self($name, false); + // create channels + $instance->input = Channel::open("$name@output"); + $instance->output = Channel::open("$name@input"); + + return $instance; + } + + /** + * @throws Channel\Error\Closed if output channel is closed + * @throws Channel\Error\IllegalValue if value is illegal + */ + public function send(mixed $value): mixed { + if (PARALLEL_EXT_LOADED) { + $this->output->send($value); + } + + return $value; + } + + /** + * @throws Channel\Error\Closed if input channel is closed. + */ + public function receive(): mixed { + return $this->input->recv(); + } + + /** + * Shall close this two-way channel + * + * @throws Channel\Error\Closed if channel is closed + */ + public function close(): void { + $this->input->close(); + $this->output->close(); + } + + public function release(): bool { + return $this->send(true); + } + +} diff --git a/src/Internals/ProgressBarWorker.php b/src/Internals/ProgressBarWorker.php new file mode 100644 index 0000000..38489fb --- /dev/null +++ b/src/Internals/ProgressBarWorker.php @@ -0,0 +1,112 @@ +openChannels(); + $this->createProgressBar(); + + // threads memory usage and peak + $this->threads_memory = [ + 'current' => [ memory_get_usage() ], + 'peak' => [ memory_get_usage() ], + ]; + } + + public function afterListening(): void { + $this->closeChannels(); + } + + protected function registerWorker(string $worker, int $steps = 0): void { + // check if ProgressBar isn't already started + if ( !$this->progressBarStarted) { + // start Worker ProgressBar + $this->progressBar->start($steps); + $this->progressBarStarted = true; + + } else { + // update steps + $this->progressBar->setMaxSteps($steps); + } + } + + protected function progressBarAction(string $action, array $args): void { + // redirect action to ProgressBar instance + $this->progressBar->$action(...$args); + + if ($action === 'advance') { + // count processed item + $this->items[ time() ] = ($this->items[ time() ] ?? 0) + 1; + // update ProgressBar items per second report + $this->progressBar->setMessage($this->getItemsPerSecond(), 'items_per_second'); + } + } + + protected function statsReport(string $worker_id, int $memory_usage): void { + // update memory usage for this thread + $this->threads_memory['current'][0] = memory_get_usage(); + // update peak memory usage + if ($this->threads_memory['current'][0] > $this->threads_memory['peak'][0]) { + $this->threads_memory['peak'][0] = $this->threads_memory['current'][0]; + } + + // save memory usage of thread + $this->threads_memory['current'][$worker_id] = $memory_usage; + // update peak memory usage + if ($this->threads_memory['current'][$worker_id] > ($this->threads_memory['peak'][$worker_id] ?? 0)) { + $this->threads_memory['peak'][$worker_id] = $this->threads_memory['current'][$worker_id]; + } + + // update ProgressBar memory report + $this->progressBar->setMessage($this->getMemoryUsage(), 'threads_memory'); + } + + private function getMemoryUsage(): string { + // main memory used + $main = Helper::formatMemory($this->threads_memory['current'][0]); + // total memory used (sum of all threads) + $total = Helper::formatMemory($total_raw = array_sum($this->threads_memory['current'])); + // average of each thread + $average = Helper::formatMemory((int) ($total_raw / (($count = count($this->threads_memory['current']) - 1) > 0 ? $count : 1))); + // peak memory usage + $peak = Helper::formatMemory(array_sum($this->threads_memory['peak'])); + + return "$main, threads: {$count}x ~$average, Σ $total ↑ $peak"; + } + + private function getItemsPerSecond(): string { + // check for empty list + if (empty($this->items)) return '0'; + + // keep only last 15s for average + $this->items = array_slice($this->items, -15, preserve_keys: true); + + // return the average of items processed per second + return '~'.number_format(floor(array_sum($this->items) / count($this->items) * 100) / 100, 2); + } + +} diff --git a/src/Internals/ProgressBarWorker/HasChannels.php b/src/Internals/ProgressBarWorker/HasChannels.php new file mode 100644 index 0000000..06d0470 --- /dev/null +++ b/src/Internals/ProgressBarWorker/HasChannels.php @@ -0,0 +1,40 @@ +progressbar_channel = TwoWayChannel::make(self::class.'@'.$this->uuid); + } + + protected function recv(): mixed { + return $this->progressbar_channel->receive(); + } + + protected function send(mixed $value): mixed { + return $this->progressbar_channel->send($value); + } + + protected function release(): void { + $this->progressbar_channel->release(); + } + + private function closeChannels(): void { + // gracefully join + $this->progressbar_channel->send(false); + // close channel + $this->progressbar_channel->close(); + } + +} diff --git a/src/Internals/ProgressBarWorker/HasProgressBar.php b/src/Internals/ProgressBarWorker/HasProgressBar.php new file mode 100644 index 0000000..299a9d9 --- /dev/null +++ b/src/Internals/ProgressBarWorker/HasProgressBar.php @@ -0,0 +1,38 @@ +progressBar = new ProgressBar(new ConsoleOutput()); + + // configure ProgressBar settings + $this->progressBar->setBarWidth( 80 ); + $this->progressBar->setRedrawFrequency( 100 ); + $this->progressBar->minSecondsBetweenRedraws( 0.1 ); + $this->progressBar->maxSecondsBetweenRedraws( 0.2 ); + $this->progressBar->setFormat(" %current% of %max%: %message%\n". + " [%bar%] %percent:3s%%\n". + " elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s".(PARALLEL_EXT_LOADED ? "\n" : ','). + " memory: %threads_memory%\n"); + // set initial values + $this->progressBar->setMessage('Starting...'); + $this->progressBar->setMessage('??', 'items_per_second'); + $this->progressBar->setMessage('??', 'threads_memory'); + } + +} diff --git a/src/Internals/Runner.php b/src/Internals/Runner.php new file mode 100644 index 0000000..0534594 --- /dev/null +++ b/src/Internals/Runner.php @@ -0,0 +1,168 @@ +openChannels(); + $this->startEater(); + } + + protected function afterListening(): void { + $this->stopEater(); + $this->stopRunningTasks(); + $this->closeChannels(); + } + + private function getMaxCpuUsage(): int { + // return configured max CPU usage + return $this->max_cpu_count ??= (isset($_SERVER['PARALLEL_MAX_COUNT']) ? (int) $_SERVER['PARALLEL_MAX_COUNT'] : cpu_count( (float) ($_SERVER['PARALLEL_MAX_PERCENT'] ?? 1.0) )); + } + + protected function getRegisteredWorker(string $worker): RegisteredWorker | false { + if ( !array_key_exists($worker, $this->workers_hashmap)) { + return $this->send(false); + } + + // set worker as the currently selected one + return $this->selectWorker($this->workers_hashmap[$worker]) + ->send($this->getSelectedWorker()); + } + + protected function registerWorker(string | Closure $worker, array $args = []): RegisteredWorker { + // check if worker is already registered + if (is_string($worker) && array_key_exists($worker, $this->workers_hashmap)) { + throw new RuntimeException(sprintf('Worker class "%s" is already registered', $worker)); + } + + // register worker + $this->workers[] = $registered_worker = new RegisteredWorker( + uuid: $this->uuid, + identifier: $idx = count($this->workers), + worker_class: is_string($worker) ? $worker : Worker::class, + closure: $worker instanceof Closure ? $worker : null, + args: $worker instanceof Closure ? [] : $args, + ); + // and put index on the hashmap + $this->workers_hashmap[$registered_worker->getIdentifier()] = $idx; + + return $this->selectWorker($idx) + ->send($registered_worker); + } + + protected function queueTask(array $data): int { + if (null === $worker = $this->getSelectedWorker()) { + // reject task scheduling, no worker is defined + throw new RuntimeException('No worker is defined'); + } + + // register task + $this->tasks[] = $task = new Task( + identifier: count($this->tasks), + worker_class: $worker->getWorkerClass(), + worker_id: $this->selected_worker, + data: $data, + ); + // and put identifier on the pending tasks list + $this->pending_tasks[] = $task->getIdentifier(); + + // if we are on a non-threaded environment, + if ( !PARALLEL_EXT_LOADED) { + // just process the Task + $this->startNextPendingTask(); + // clean finished Task + $this->cleanFinishedTasks(); + } + + return $this->send($task->getIdentifier()); + } + + protected function getTasks(): array | false { + if ( !PARALLEL_EXT_LOADED) { + return $this->tasks; + } + + foreach ($this->tasks as $task) { + $this->tasks_channel->send($task); + } + $this->tasks_channel->send(false); + + return false; + } + + protected function removeTasks(): bool { + $this->stopRunningTasks(); + + $this->tasks = []; + $this->pending_tasks = []; + + return $this->send(true); + } + + protected function enableProgressBar(string $worker_id, int $steps): bool { + if ( !array_key_exists($worker_id, $this->workers_hashmap)) { + throw new RuntimeException('Worker is not defined'); + } + + // get registered Worker + $worker = $this->workers[$this->workers_hashmap[$worker_id]]; + // enable progress with specified steps + $worker->withProgress(steps: $steps); + + $this->initProgressBar(); + + $this->progressbar_channel->send(new Commands\ProgressBar\ProgressBarRegistrationMessage( + worker: $worker->getWorkerClass(), + steps: $steps, + )); + + return $this->send(true); + } + + protected function update(): void { + $this->cleanFinishedTasks(); + while ($this->hasCpuAvailable() && $this->hasPendingTasks()) { + $this->startNextPendingTask(); + } + + $this->send($this->hasPendingTasks(), eater: true); + } + + protected function await(): bool { + if (PARALLEL_EXT_LOADED) { + return $this->send($this->hasPendingTasks() || $this->hasRunningTasks()); + } + + return true; + } + + private function stopRunningTasks(): void { + // TODO stop running tasks + $todo = true; + } + + public function __destruct() { + $this->stopProgressBar(); + } + +} diff --git a/src/Internals/Runner/HasChannels.php b/src/Internals/Runner/HasChannels.php new file mode 100644 index 0000000..d4afa5c --- /dev/null +++ b/src/Internals/Runner/HasChannels.php @@ -0,0 +1,70 @@ +channel = TwoWayChannel::make(self::class.'@'.$this->uuid); + // channel to output tasks + $this->tasks_channel = Channel::make(self::class.'@'.$this->uuid.':tasks'); + } + + protected function getEaterChannel(): TwoWayChannel { + // open channel if not already opened + while ($this->eater_channel === null) { + // open channel to communicate with the Eater instance + try { $this->eater_channel = TwoWayChannel::open(self::class.'@'.$this->uuid.':eater'); + // wait 25ms if channel does not exist yet and retry + } catch (Channel\Error\Existence) { usleep(25_000); } + } + + return $this->eater_channel; + } + + protected function recv(): mixed { + return $this->channel->receive(); + } + + protected function send(mixed $value, bool $eater = false): mixed { + if (PARALLEL_EXT_LOADED) { + if ($eater) $this->getEaterChannel()->send($value); + else $this->channel->send($value); + } + + return $value; + } + + protected function release(bool $eater = false): void { + $this->send(true, $eater); + } + + private function closeChannels(): void { + // gracefully join + $this->channel->send(false); + // close channel + $this->channel->close(); + } + +} diff --git a/src/Internals/Runner/HasEater.php b/src/Internals/Runner/HasEater.php new file mode 100644 index 0000000..e372d1c --- /dev/null +++ b/src/Internals/Runner/HasEater.php @@ -0,0 +1,51 @@ +eater = (new Runtime(PARALLEL_AUTOLOADER))->run(static function(string $uuid): void { + // create communication channel + $channel = TwoWayChannel::make(Runner::class.'@'.$uuid.':eater'); + // open communication channel with the Runner + $runner_listener = TwoWayChannel::open(Runner::class.'@'.$uuid); + + // notify successful start + $channel->release(); + + // every 25ms + do { usleep(25_000); + // send an Update message to the Runner instance + $runner_listener->send(new Commands\Runner\UpdateMessage()); + // until we receive a stop signal on our channel + } while (Event\Type::Close !== $channel->receive()); + + // close communication channel + $channel->close(); + }, [ $this->uuid ]); + + // wait until Eater starts + $this->getEaterChannel()->receive(); + } + + private function stopEater(): void { + // eater will send a final request + $this->recv(); + // stop Eater instance + $this->getEaterChannel()->send(Event\Type::Close); + } + +} diff --git a/src/Internals/Runner/HasSharedProgressBar.php b/src/Internals/Runner/HasSharedProgressBar.php new file mode 100644 index 0000000..8c0d92c --- /dev/null +++ b/src/Internals/Runner/HasSharedProgressBar.php @@ -0,0 +1,67 @@ +progressBar ??= PARALLEL_EXT_LOADED + // create a ProgressBarWorker instance inside a thread + ? parallel\run(static function($uuid): void { + // create ProgressBarWorker instance + $progressBar = new Internals\ProgressBarWorker($uuid); + // listen for events + $progressBar->listen(); + }, [ $this->uuid ]) + + // create a ProgressBar instance for non-threaded environment + : new Internals\ProgressBarWorker($this->uuid); + + // check if progressbar is already started, or we are on a non-threaded environment + if ($this->progressbar_started || !PARALLEL_EXT_LOADED) return; + + // open communication channel with the ProgressBar worker + do { try { $this->progressbar_channel = TwoWayChannel::open(Internals\ProgressBarWorker::class.'@'.$this->uuid); + // wait 25ms if channel does not exist yet and retry + } catch (Channel\Error\Existence) { usleep(25_000); } + // try until channel is opened + } while (($this->progressbar_channel ?? null) === null); + + // wait until ProgressBar worker starts + $this->progressbar_channel->receive(); + $this->progressbar_started = true; + } + + private function stopProgressBar(): void { + if ( !PARALLEL_EXT_LOADED || !$this->progressbar_started) return; + + // stop ProgressBar worker instance + $this->progressbar_channel->send(Event\Type::Close); + // wait until ProgressBar instance shutdowns + $this->progressbar_channel->receive(); + } + +} diff --git a/src/Internals/Runner/ManagesTasks.php b/src/Internals/Runner/ManagesTasks.php new file mode 100644 index 0000000..531a679 --- /dev/null +++ b/src/Internals/Runner/ManagesTasks.php @@ -0,0 +1,167 @@ +running_tasks) < $this->getMaxCpuUsage(); + } + + private function hasPendingTasks(): bool { + return !empty($this->pending_tasks); + } + + private function hasRunningTasks(): bool { + return !empty($this->running_tasks); + } + + private function cleanFinishedTasks(): void { + $finished_tasks = []; + foreach ($this->running_tasks as $idx => $future) { + // check if future is already done working + if ( !PARALLEL_EXT_LOADED || $future->done()) { + // store the ProcessedTask + try { + // get the result of the process + [ $task_id, $result ] = PARALLEL_EXT_LOADED ? $future->value() : $future; + // ignore result if Task was removed, probably through Scheduler::removeTasks() + if (!array_key_exists($task_id, $this->tasks)) continue; + // store result and update state of the Task + $this->tasks[$task_id] + ->setResult($result) + ->setState(Task::STATE_Processed); + } catch (Throwable) {} + + // add future idx to finished tasks list + $finished_tasks[] = $idx; + } + } + + // remove finished tasks from futures + foreach ($finished_tasks as $idx) unset($this->running_tasks[$idx]); + } + + private function startNextPendingTask(): void { + // get next available pending task + $task = $this->tasks[$task_id = array_shift($this->pending_tasks)]; + $task->setState(Task::STATE_Starting); + + // process task inside a thread (if parallel extension is available) + if (PARALLEL_EXT_LOADED) { + // create starter channel to wait threads start event + $this->starter ??= Channel::make(sprintf('starter@%s', $this->uuid)); + + // parallel available, process task inside a thread + $this->running_tasks[$task_id] = parallel\run(static function(string $uuid, int $task_id, RegisteredWorker $registered_worker, Task $task): array { + // get Worker class to instantiate + $worker_class = $registered_worker->getWorkerClass(); + + /** @var ParallelWorker $worker Instance of the Worker */ + $worker = new $worker_class(...$registered_worker->getArgs()); + // build task params + $params = $worker instanceof Worker + // process task using local Worker + ? [ $registered_worker->getClosure(), ...$task->getData() ] + // process task using user Worker + : [ ...$task->getData() ]; + + // check if worker has ProgressBar enabled + if ($registered_worker->hasProgressEnabled()) { + // connect worker to ProgressBar + $worker->connectProgressBar($uuid, $GLOBALS['worker_thread_id'] ??= sprintf('%s@%s', $uuid, substr(md5(uniqid($worker_class, true)), 0, 16))); + } + + // notify that thread started + Channel::open(sprintf('starter@%s', $uuid))->send(true); + + // process task + $worker->start(...$params); + + // return task identifier and result + return [ $task_id, $worker->getResult() ]; + }, [ + // send UUID for starter channel + $this->uuid, + // send task id + $task_id, + // send task assigned worker + $this->workers[$task->getWorkerId()], + // send task to process + $task, + ]); + + // wait for thread to start + if (($this->starter?->recv() ?? true) !== true) { + throw new RuntimeException('Failed to start Task'); + } + + // update Task state + $task->setState(Task::STATE_Processing); + + } else { + // get registered worker + $registered_worker = $this->workers[$task->getWorkerId()]; + // get Worker class to instantiate + $worker_class = $registered_worker->getWorkerClass(); + + /** @var ParallelWorker $worker Instance of the Worker */ + $worker = new $worker_class(...$registered_worker->getArgs()); + // build task params + $params = $worker instanceof Worker + // process task using local Worker + ? [ $registered_worker->getClosure(), ...$task->getData() ] + // process task using user Worker + : [ ...$task->getData() ]; + + // check if worker has ProgressBar enabled + if ($registered_worker->hasProgressEnabled()) { + // init progressbar + $this->initProgressBar(); + // register worker + $this->progressBar->processMessage(new Commands\ProgressBar\ProgressBarRegistrationMessage( + worker: $worker_class, + steps: $registered_worker->getSteps(), + )); + // connect worker to ProgressBar + $worker->connectProgressBar(fn(Commands\ProgressBar\ProgressBarActionMessage $message) => $this->progressBar->processMessage($message)); + } + + $task->setState(Task::STATE_Processing); + + // process task using worker + $worker->start(...$params); + + // store Worker result + $this->running_tasks[] = [ $task_id, $worker->getResult() ]; + } + } + +} diff --git a/src/Internals/Runner/ManagesWorkers.php b/src/Internals/Runner/ManagesWorkers.php new file mode 100644 index 0000000..039b43a --- /dev/null +++ b/src/Internals/Runner/ManagesWorkers.php @@ -0,0 +1,38 @@ +selected_worker = $idx; + + return $this; + } + + private function getSelectedWorker(): ?RegisteredWorker { + if ($this->selected_worker === false) { + return null; + } + + return $this->workers[ $this->selected_worker ]; + } + +} diff --git a/src/Internals/Scheduler/HasChannels.php b/src/Internals/Scheduler/HasChannels.php new file mode 100644 index 0000000..f627f9a --- /dev/null +++ b/src/Internals/Scheduler/HasChannels.php @@ -0,0 +1,52 @@ +runner_channel === null) { + // open channel to communicate with the Runner instance + try { $this->runner_channel = TwoWayChannel::open(Runner::class.'@'.$this->uuid); + // wait 25ms if channel does not exist yet and retry + } catch (Channel\Error\Existence) { usleep(25_000); } + } + + $this->runner_channel->send($value); + } + + private function recv(bool $from_tasks_channel = false): mixed { + // open channel if not already opened + while ($this->runner_channel === null) { + // open channel to communicate with the Runner instance + try { $this->runner_channel = TwoWayChannel::open(Runner::class.'@'.$this->uuid); + // wait 25ms if channel does not exist yet and retry + } catch (Channel\Error\Existence) { usleep(25_000); } + } + // open channel if not already opened + while ($this->tasks_channel === null) { + // open channel to receive the tasks list + try { $this->tasks_channel = Channel::open(Runner::class.'@'.$this->uuid.':tasks'); + // wait 25ms if channel does not exist yet and retry + } catch (Channel\Error\Existence) { usleep(25_000); } + } + + return $from_tasks_channel ? $this->tasks_channel->recv() : $this->runner_channel->receive(); + } + +} diff --git a/src/Internals/Scheduler/HasRunner.php b/src/Internals/Scheduler/HasRunner.php new file mode 100644 index 0000000..58579d4 --- /dev/null +++ b/src/Internals/Scheduler/HasRunner.php @@ -0,0 +1,42 @@ +send($message); + + return $this->recv(); + } + + return $this->runner->processMessage($message); + } + + private function registerWorker(string | Closure $worker, array $args): RegisteredWorker { + $message = new Commands\Runner\RegisterWorkerMessage($worker, $args); + + if (PARALLEL_EXT_LOADED) { + $this->send($message); + + return $this->recv(); + } + + return $this->runner->processMessage($message); + } + +} diff --git a/src/Parallel/Internals/Worker.php b/src/Internals/Worker.php similarity index 100% rename from src/Parallel/Internals/Worker.php rename to src/Internals/Worker.php diff --git a/src/Internals/Worker/CommunicatesWithProgressBarWorker.php b/src/Internals/Worker/CommunicatesWithProgressBarWorker.php new file mode 100644 index 0000000..7667196 --- /dev/null +++ b/src/Internals/Worker/CommunicatesWithProgressBarWorker.php @@ -0,0 +1,81 @@ +progressbar_channel = $uuid; + + return true; + } + + // store worker identifier + $this->identifier = $identifier; + + // open communication channel with the ProgressBar worker + do { try { $this->progressbar_channel = TwoWayChannel::open(ProgressBarWorker::class.'@'.$uuid); + // wait 25ms if channel does not exist yet and retry + } catch (Channel\Error\Existence) { usleep(25_000); } + // try until channel is opened + } while (($this->progressbar_channel ?? null) === null); + + return true; + } + + final public function setMessage(string $message, string $name = 'message'): void { + $this->newProgressBarAction(__FUNCTION__, $message, $name); + } + + final public function advance(int $steps = 1): void { + $this->newProgressBarAction(__FUNCTION__, $steps); + } + + final public function setProgress(int $step): void { + $this->newProgressBarAction(__FUNCTION__, $step); + } + + final public function display(): void { + $this->newProgressBarAction(__FUNCTION__); + } + + final public function clear(): void { + $this->newProgressBarAction(__FUNCTION__); + } + + private function newProgressBarAction(string $action, ...$args): void { + $message = new Commands\ProgressBar\ProgressBarActionMessage( + action: $action, + args: $args, + ); + + // check if parallel is available + if (PARALLEL_EXT_LOADED) { + // report memory usage + $this->progressbar_channel->send(new Commands\ProgressBar\StatsReportMessage( + worker_id: $this->identifier, + memory_usage: memory_get_usage(), + )); + // request ProgressBar action + $this->progressbar_channel->send($message); + + return; + } + + // redirect action to ProgressBar executor + ($this->progressbar_channel)($message); + } + +} diff --git a/src/Internals/Worker/CommunicatesWithRunner.php b/src/Internals/Worker/CommunicatesWithRunner.php new file mode 100644 index 0000000..5f9cb84 --- /dev/null +++ b/src/Internals/Worker/CommunicatesWithRunner.php @@ -0,0 +1,28 @@ +runner_channel === null) { + // open channel to communicate with the Runner instance + try { $this->runner_channel = TwoWayChannel::open(Runner::class.'@'.$this->uuid); + // wait 25ms if channel does not exist yet and retry + } catch (Channel\Error\Existence) { usleep(25_000); } + } + + return $this->runner_channel; + } + +} diff --git a/src/Parallel/Internals/Messages/ProgressBarActionMessage.php b/src/Parallel/Internals/Messages/ProgressBarActionMessage.php deleted file mode 100644 index cc4ebf5..0000000 --- a/src/Parallel/Internals/Messages/ProgressBarActionMessage.php +++ /dev/null @@ -1,16 +0,0 @@ -registered_worker; - } - - /** - * @return mixed Data of the Task - */ - public function getData(): mixed { - return $this->data; - } - -} diff --git a/src/Parallel/Internals/ProgressBarWorker.php b/src/Parallel/Internals/ProgressBarWorker.php deleted file mode 100644 index 440bdf7..0000000 --- a/src/Parallel/Internals/ProgressBarWorker.php +++ /dev/null @@ -1,149 +0,0 @@ -uuid)); - // notify successful start - $progresBarChannel->send(true); - - // create ProgressBar instance - $this->progressBar = $createProgressBarInstance(); - - // threads memory usage and peak - $this->threads_memory = [ - 'current' => [ $main_memory_usage() ], - 'peak' => [ memory_get_usage() ], - ]; - - // get next message - while (Type::Close !== $message = $progresBarChannel->recv()) { - // check for close event and exit loop - if ($message === Type::Close) break; - - switch ($message_class = get_class($message)) { - case ProgressBarRegistrationMessage::class: - $this->registerWorker($message->steps); - break; - - case StatsReportMessage::class: - // update memory usage for this thread - $this->threads_memory['current'][0] = $main_memory_usage(); - // update peak memory usage - if ($this->threads_memory['current'][0] > $this->threads_memory['peak'][0]) { - $this->threads_memory['peak'][0] = $this->threads_memory['current'][0]; - } - - // save memory usage of thread - $this->threads_memory['current'][$message->worker_id] = $message->memory_usage; - // update peak memory usage - if ($this->threads_memory['current'][$message->worker_id] > ($this->threads_memory['peak'][$message->worker_id] ?? 0)) { - $this->threads_memory['peak'][$message->worker_id] = $this->threads_memory['current'][$message->worker_id]; - } - - // update ProgressBar memory report - $this->progressBar->setMessage($this->getMemoryUsage(), 'threads_memory'); - break; - - case ProgressBarActionMessage::class: - // redirect action to ProgressBar instance - $this->progressBar->{$message->action}(...$message->args); - if ($message->action === 'advance') { - // count processed item - $this->items[ time() ] = ($this->items[ time() ] ?? 0) + 1; - // update ProgressBar items per second report - $this->progressBar->setMessage($this->getItemsPerSecond(), 'items_per_second'); - } - break; - - default: - throw new RuntimeException(sprintf('Unsupported message type: %s', $message_class)); - } - - } - - // end progress bar - $this->progressBar->finish(); - - return true; - } - - private function registerWorker(int $steps = 0): void { - // check if ProgressBar isn't already started - if ( !$this->progressBarStarted) { - // start Worker ProgressBar - $this->progressBar->start($steps); - $this->progressBarStarted = true; - - } else { - // update steps - $this->progressBar->setMaxSteps($steps); - } - } - - private function getMemoryUsage(): string { - // main memory used - $main = Helper::formatMemory($this->threads_memory['current'][0]); - // total memory used (sum of all threads) - $total = Helper::formatMemory($total_raw = array_sum($this->threads_memory['current'])); - // average of each thread - $average = Helper::formatMemory((int) ($total_raw / (($count = count($this->threads_memory['current']) - 1) > 0 ? $count : 1))); - // peak memory usage - $peak = Helper::formatMemory(array_sum($this->threads_memory['peak'])); - - return "$main, threads: {$count}x ~$average, Σ $total ↑ $peak"; - } - - private function getItemsPerSecond(): string { - // check for empty list - if (empty($this->items)) return '0'; - - // keep only last 15s for average - $this->items = array_slice($this->items, -15, preserve_keys: true); - - // return the average of items processed per second - return '~'.number_format(floor(array_sum($this->items) / count($this->items) * 100) / 100, 2); - } - -} diff --git a/src/Parallel/Internals/RegisteredWorker.php b/src/Parallel/Internals/RegisteredWorker.php deleted file mode 100644 index 72a2d3f..0000000 --- a/src/Parallel/Internals/RegisteredWorker.php +++ /dev/null @@ -1,54 +0,0 @@ -identifier; - } - - /** - * Enables a ProgressBar for the worker - * - * @param bool $with_progress Flag to enable/disable the ProgressBar - */ - public function withProgress(bool $with_progress = true, int $steps = 0): void { - if (false === $this->with_progress = $with_progress) return; - - // enable ProgressBar thread - Scheduler::registerWorkerWithProgressBar($this, $steps); - } - - public function hasProgressEnabled(): bool { - return $this->with_progress; - } - - public function getWorkerClass(): string { - return $this->worker_class; - } - - public function getClosure(): ?Closure { - return $this->closure; - } - - public function getArgs(): array { - return $this->args; - } - -} diff --git a/src/Parallel/ParallelWorker.php b/src/Parallel/ParallelWorker.php deleted file mode 100644 index 272b022..0000000 --- a/src/Parallel/ParallelWorker.php +++ /dev/null @@ -1,143 +0,0 @@ -state; - } - - final public function connectProgressBar(string | Closure $uuid, string $identifier = null): bool { - if ( !extension_loaded('parallel')) { - $this->progressBarChannel = $uuid; - - return true; - } - - // store worker identifier - $this->identifier = $identifier; - // connect to channel - $this->progressBarChannel = Channel::open(sprintf('progress-bar@%s', $uuid)); - - return true; - } - - final public function start(...$args): void { - if ($this->state !== self::STATE_New) { - throw new RuntimeException('This Worker has been already started'); - } - - $this->state = self::STATE_Running; - $this->started_at = microtime(true); - - try { $this->result = $this->process(...$args); - } catch (Throwable) {} - - $this->finished_at = microtime(true); - $this->state = self::STATE_Finished; - } - - /** - * Processes task data and returns the result - * - * @return mixed Task processing result - */ - abstract protected function process(): mixed; - - final public function setMessage(string $message, string $name = 'message'): void { - $this->newProgressBarAction(__FUNCTION__, $message, $name); - } - - final public function advance(int $steps = 1): void { - $this->newProgressBarAction(__FUNCTION__, $steps); - } - - final public function setProgress(int $step): void { - $this->newProgressBarAction(__FUNCTION__, $step); - } - - final public function display(): void { - $this->newProgressBarAction(__FUNCTION__); - } - - final public function clear(): void { - $this->newProgressBarAction(__FUNCTION__); - } - - final public function getStartedAt(): ?float { - return $this->started_at ?? null; - } - - final public function getFinishedAt(): ?float { - return $this->finished_at ?? null; - } - - final public function getProcessedTask(): ProcessedTask { - if ($this->state !== self::STATE_Finished) { - throw new RuntimeException('This Worker hasn\'t been started'); - } - - return new ProcessedTask(get_class($this), $this->result); - } - - private function newProgressBarAction(string $action, ...$args): void { - // check if parallel is available - if (extension_loaded('parallel')) { - // report memory usage - $this->progressBarChannel->send(new StatsReportMessage( - worker_id: $this->identifier, - memory_usage: memory_get_usage(), - )); - // request ProgressBar action - $this->progressBarChannel->send(new ProgressBarActionMessage( - action: $action, - args: $args, - )); - - return; - } - - // redirect action to ProgressBar executor - ($this->progressBarChannel)($action, $args); - } - -} diff --git a/src/Parallel/ProcessedTask.php b/src/Parallel/ProcessedTask.php deleted file mode 100644 index ee5865a..0000000 --- a/src/Parallel/ProcessedTask.php +++ /dev/null @@ -1,30 +0,0 @@ -worker_class; - } - - /** - * @return mixed Result of the processed Task - */ - public function getResult(): mixed { - return $this->result; - } - -} diff --git a/src/Parallel/Scheduler.php b/src/Parallel/Scheduler.php deleted file mode 100644 index 38a9b21..0000000 --- a/src/Parallel/Scheduler.php +++ /dev/null @@ -1,471 +0,0 @@ - 0, 'peak' => 0 ]; - - /** - * @var array Total of items processed per second (non-threaded) - */ - private array $items = []; - - /** - * @var Future|null Thread controlling the ProgressBar - */ - private ?Future $progressBarThread = null; - - /** - * @var Channel|null Channel of communication between ProgressBar and Tasks - */ - private ?Channel $progressBarChannel = null; - - /** - * Disable public constructor, usage only available through singleton instance - */ - private function __construct() { - $this->uuid = substr(md5(uniqid(self::class, true)), 0, 16); - } - - /** - * @return self Singleton instance - */ - private static function instance(): self { - return self::$instance ??= new self(); - } - - public static function registerWorkerWithProgressBar(RegisteredWorker $registered_worker, int $steps = 0): void { - self::instance()->initProgressBar(); - - if ( !extension_loaded('parallel')) { - // check if ProgressBar isn't already started - if ( !self::instance()->progressBarStarted) { - // start ProgressBar - self::instance()->progressBar->start($steps); - self::instance()->progressBarStarted = true; - - } else { - // update steps - self::instance()->progressBar->setMaxSteps($steps); - } - } - - // register Worker ProgressBar - self::instance()->progressBarChannel?->send(new ProgressBarRegistrationMessage( - worker: $registered_worker->getWorkerClass(), - steps: $steps, - )); - } - - private function initProgressBar(): void { - // init ProgressBar only if not already working - if ($this->progressBar !== null || $this->progressBarThread !== null) return; - - // start a normal ProgressBar if parallel isn't available (non-threaded) - if ( !extension_loaded('parallel')) { - // create a non-threaded ProgressBar instance - $this->progressBar = $this->createProgressBarInstance(); - return; - } - - // create a channel of communication between ProgressBar and Tasks - $this->progressBarChannel = Channel::make(sprintf('progress-bar@%s', $this->uuid)); - - // main thread memory reporter - // FIXME this closure is copied and runs inside a thread, so memory report isn't accurate - $main_memory_usage = static fn() => memory_get_usage(); - - // decouple progress bar to a separated thread - $this->progressBarThread = run(static function(string $uuid, Closure $createProgressBarInstance, Closure $main_memory_usage): void { - // create ProgressBar worker instance - $progressBarWorker = new ProgressBarWorker($uuid); - // start ProgressBar - $progressBarWorker->start($createProgressBarInstance, $main_memory_usage); - - }, [ - // send UUID for starter channel - $this->uuid, - // send ProgressBar creator - fn() => $this->createProgressBarInstance(), - // send main memory usage reporter - $main_memory_usage, - ]); - - // wait for ProgressBar thread to start - if ($this->progressBarChannel->recv() !== true) { - throw new RuntimeException('Failed to start ProgressBar'); - } - } - - private function createProgressBarInstance(): ProgressBar { - $progressBar = new ProgressBar(new ConsoleOutput()); - - // configure ProgressBar settings - $progressBar->setBarWidth( 80 ); - $progressBar->setRedrawFrequency( 100 ); - $progressBar->minSecondsBetweenRedraws( 0.1 ); - $progressBar->maxSecondsBetweenRedraws( 0.2 ); - $progressBar->setFormat(" %current% of %max%: %message%\n". - " [%bar%] %percent:3s%%\n". - " elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s".(extension_loaded('parallel') ? "\n" : ','). - " memory: %threads_memory%\n"); - // set initial values - $progressBar->setMessage('Starting...'); - $progressBar->setMessage('??', 'items_per_second'); - $progressBar->setMessage('??', 'threads_memory'); - - return $progressBar; - } - - /** - * Register a worker class to process tasks - * - * @param string | Closure $worker Worker class to be used for processing tasks - * @param mixed ...$args Arguments passed to Worker constructor - * - * @return RegisteredWorker - */ - public static function using(string | Closure $worker, ...$args): RegisteredWorker { - // convert Closure to ParallelWorker instance - self::instance()->registered_workers[] = $registered_worker = new RegisteredWorker( - identifier: count(self::instance()->registered_workers), - worker_class: is_string($worker) ? $worker : Worker::class, - closure: $worker instanceof Closure ? $worker : null, - args: $args, - ); - - return $registered_worker; - } - - /** - * Schedule worker task for execution in parallel, passing ...$data at execution time - * - * @param mixed ...$data Array of arguments to be passed to worker task at execution time - * - * @throws Runtime\Error\Closed if \parallel\Runtime was closed - * @throws Runtime\Error\IllegalFunction if task is a closure created from an internal function - * @throws Runtime\Error\IllegalInstruction if task contains illegal instructions - * @throws Runtime\Error\IllegalParameter if task accepts or argv contains illegal variables - * @throws Runtime\Error\IllegalReturn | Throwable if task returns illegally - * - * @see Runtime::run() for more details - * @link https://www.php.net/manual/en/parallel.run - */ - public static function runTask(mixed ...$data): void { - // check if a worker was defined - if (($worker_id = count(self::instance()->registered_workers) - 1) < 0) { - // reject task scheduling, no worker is defined - throw new RuntimeException('No worker is defined'); - } - - // get registered worker - $registered_worker = self::instance()->registered_workers[$worker_id]; - // register a pending task linked with the registered Worker - self::instance()->pendingTasks[] = new PendingTask($registered_worker, $data); - - do { - // remove finished tasks - self::instance()->cleanFinishedTasks(); - // if pending tasks count exceeds available CPU, wait 10ms - if ($pending_eq_cpu = (count(self::instance()->pendingTasks) >= self::instance()->getMaxCpuUsage())) usleep(10_000); - - // wait if all CPU are used and pending tasks exceed available CPU - } while ($pending_eq_cpu && !self::instance()->hasCpuAvailable()); - - // start the next available task - self::instance()->runNextTask(); - } - - /** - * Returns the result of every processed task - * - * @return ProcessedTask[] | Generator Results of processed tasks - */ - public static function getProcessedTasks(): Generator | array { - // start all pending tasks, send all available results, until there is no more results available - while ( !empty(self::instance()->pendingTasks) || !empty(self::instance()->futures) || !empty(self::instance()->results)) { - // send available results - while (null !== $result = array_shift(self::instance()->results)) - // send already processed result - yield $result; - - // clear finished tasks - self::instance()->cleanFinishedTasks(); - - // start available tasks - while (self::instance()->hasCpuAvailable() && !empty(self::instance()->pendingTasks)) - // start the next available task - self::instance()->runNextTask(); - - // check there are tasks running - if ( !empty(self::instance()->futures)) - // wait for any task to finish - while ( empty(array_filter(self::instance()->futures, static fn(Future $future): bool => $future->done()))) usleep(10_000); - } - } - - /** - * Stops all running tasks. If force is set to false, waits gracefully for all running tasks to finish execution - * - * @param bool $force Flag to force task cancellation - */ - public static function stop(bool $force = true): void { - // if parallel isn't enabled, just finish progress bar and return - if ( !extension_loaded('parallel')) { - // self::instance()->progress->finish(); - return; - } - - // cancel all running tasks - if ($force) array_map(static fn(Future $future): bool => $future->cancel(), self::instance()->futures); - - // wait for all tasks to finish - while ( !empty(array_filter(self::instance()->futures, static fn(Future $future): bool => !$future->done()))) usleep(10_000); - } - - private function runNextTask(): void { - // check if there is an available CPU - if ( !empty($this->pendingTasks) && $this->hasCpuAvailable()) { - // get next available pending task - $pending_task = array_shift($this->pendingTasks); - - // create starter channel to wait threads start event - $this->starter ??= extension_loaded('parallel') ? Channel::make(sprintf('starter@%s', $this->uuid)) : null; - - // process task inside a thread (if parallel extension is available) - if (extension_loaded('parallel')) { - // parallel available, process task inside a thread - $this->futures[] = run(static function(string $uuid, PendingTask $pending_task): ProcessedTask { - // get registered worker - $registered_worker = $pending_task->getRegisteredWorker(); - // get Worker class to instantiate - $worker_class = $registered_worker->getWorkerClass(); - - /** @var ParallelWorker $worker Instance of the Worker */ - $worker = new $worker_class(...$registered_worker->getArgs()); - // build task params - $params = $worker instanceof Worker - // process task using local Worker - ? [ $registered_worker->getClosure(), ...$pending_task->getData() ] - // process task using user Worker - : [ ...$pending_task->getData() ]; - - // check if worker has ProgressBar enabled - if ($registered_worker->hasProgressEnabled()) { - // connect worker to ProgressBar - $worker->connectProgressBar($uuid, $GLOBALS['worker_thread_id'] ??= sprintf('%s@%s', $uuid, substr(md5(uniqid($worker_class, true)), 0, 16))); - } - - // notify that thread started - Channel::open(sprintf('starter@%s', $uuid))->send(true); - - // process task - $worker->start(...$params); - - // return Worker result - return $worker->getProcessedTask(); - }, [ - // send UUID for starter channel - $this->uuid, - // send pending task to process - $pending_task, - ]); - - } else { - // get registered worker - $registered_worker = $pending_task->getRegisteredWorker(); - // get Worker class to instantiate - $worker_class = $registered_worker->getWorkerClass(); - - /** @var ParallelWorker $worker Instance of the Worker */ - $worker = new $worker_class(...$registered_worker->getArgs()); - // build task params - $params = $worker instanceof Worker - // process task using local Worker - ? [ $registered_worker->getClosure(), ...$pending_task->getData() ] - // process task using user Worker - : [ ...$pending_task->getData() ]; - - // check if worker has ProgressBar enabled - if ($registered_worker->hasProgressEnabled()) { - // connect worker to ProgressBar - $worker->connectProgressBar(function(string $action, array $args) { - // update stats - if ($action === 'advance') { - // count processed item - $this->items[ time() ] = ($this->items[ time() ] ?? 0) + 1; - } - // update ProgressBar memory usage report - $this->progressBar->setMessage($this->getMemoryUsage(), 'threads_memory'); - // update ProgressBar items per second report - $this->progressBar->setMessage($this->getItemsPerSecond(), 'items_per_second'); - - // execute progress bar action - $this->progressBar->$action(...$args); - }); - } - - // process task using worker - $worker->start(...$params); - - // store Worker result - $this->futures[] = $worker->getProcessedTask(); - } - - // wait for thread to start - if (($this->starter?->recv() ?? true) !== true) { - throw new RuntimeException('Failed to start Task'); - } - } - } - - private function getMemoryUsage(): string { - // update memory usage for this thread - $this->memory_stats['current'] = memory_get_usage(true); - // update peak memory usage - if ($this->memory_stats['current'] > $this->memory_stats['peak']) { - $this->memory_stats['peak'] = $this->memory_stats['current']; - } - - // current memory used - $main = Helper::formatMemory($this->memory_stats['current']); - // peak memory usage - $peak = Helper::formatMemory($this->memory_stats['peak']); - - return "$main, ↑ $peak"; - } - - private function getItemsPerSecond(): string { - // check for empty list - if (empty($this->items)) return '0'; - - // keep only last 15s for average - $this->items = array_slice($this->items, -15, preserve_keys: true); - - // return the average of items processed per second - return '~'.number_format(floor(array_sum($this->items) / count($this->items) * 100) / 100, 2); - } - - private function getMaxCpuUsage(): int { - // return configured max CPU usage - return $this->max_cpu_count ??= (isset($_SERVER['PARALLEL_MAX_COUNT']) ? (int) $_SERVER['PARALLEL_MAX_COUNT'] : cpu_count( (float) ($_SERVER['PARALLEL_MAX_PERCENT'] ?? 1.0) )); - } - - private function hasCpuAvailable(): bool { - // return if there is available CPU - return count($this->futures) < $this->getMaxCpuUsage(); - } - - private function cleanFinishedTasks(): void { - // release finished tasks from futures - $finished_tasks = []; - foreach ($this->futures as $idx => $future) { - // check if future is already done working - if ( !extension_loaded('parallel') || $future->done()) { - // store the ProcessedTask - try { $this->results[] = extension_loaded('parallel') ? $future->value() : $future; - } catch (Throwable) {} - - // add future idx to finished tasks list - $finished_tasks[] = $idx; - } - } - - // remove finished tasks from futures - foreach ($finished_tasks as $idx) unset($this->futures[$idx]); - } - - /** - * Ensures that everything gets closed - */ - public static function disconnect(): void { - // check if extension is loaded - if ( !extension_loaded('parallel')) return; - - try { - // send message to ProgressBar thread to stop execution - self::instance()->progressBarChannel?->send(Type::Close); - // wait progress thread to finish - self::instance()->progressBarThread?->value(); - // close ProgressBar communication channel - self::instance()->progressBarChannel?->close(); - - self::instance()->progressBarChannel = null; - self::instance()->progressBarThread = null; - - } catch (Channel\Error\Closed | Throwable) {} - - // kill all running threads - while ($task = array_shift(self::instance()->futures)) try { $task->cancel(); } catch (Exception) {} - - try { - // task start watcher - self::instance()->starter?->close(); - self::instance()->starter = null; - - } catch (Channel\Error\Closed) {} - } - - public function __destruct() { - // ensure that we execute disconnect - self::disconnect(); - } - -} diff --git a/src/ParallelWorker.php b/src/ParallelWorker.php new file mode 100644 index 0000000..9691b8f --- /dev/null +++ b/src/ParallelWorker.php @@ -0,0 +1,79 @@ +state; + } + + final public function start(...$args): void { + if ($this->state !== self::STATE_New) { + throw new RuntimeException('This Worker has been already started'); + } + + $this->state = self::STATE_Running; + $this->started_at = microtime(true); + + try { $this->result = $this->process(...$args); + } catch (Throwable) {} + + $this->finished_at = microtime(true); + $this->state = self::STATE_Finished; + } + + /** + * Processes task data and returns the result + * + * @return mixed Task processing result + */ + abstract protected function process(): mixed; + + final public function getStartedAt(): ?float { + return $this->started_at ?? null; + } + + final public function getFinishedAt(): ?float { + return $this->finished_at ?? null; + } + + final public function getResult(): mixed { + if ($this->state !== self::STATE_Finished) { + throw new RuntimeException('This Worker hasn\'t been yet processed the task'); + } + + return $this->result; + } + +} diff --git a/src/RegisteredWorker.php b/src/RegisteredWorker.php new file mode 100644 index 0000000..ad23603 --- /dev/null +++ b/src/RegisteredWorker.php @@ -0,0 +1,76 @@ +getClosure() === null) { + return $this->getWorkerClass(); + } + + return sprintf('%s@%.0u', $this->getWorkerClass(), $this->identifier); + } + + /** + * Enables a ProgressBar for the worker + * + * @param bool $with_progress Flag to enable/disable the ProgressBar + */ + public function withProgress(bool $with_progress = true, int $steps = 0): void { + // check if caller is Runner + $caller = debug_backtrace(!DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1]; + if (($caller['class'] ?? null) === Internals\Runner::class || !PARALLEL_EXT_LOADED) { + // enable with progress flag + $this->with_progress = $with_progress; + $this->steps = $steps; + + return; + } + + // redirect call to Runner instance + $this->getRunnerChannel()->send(new Commands\ProgressBar\EnableProgressBarMessage($this->getIdentifier(), $steps)); + // wait until Runner updates worker flag + $this->getRunnerChannel()->receive(); + } + + public function hasProgressEnabled(): bool { + return $this->with_progress; + } + + public function getSteps(): int { + return $this->steps; + } + + public function getWorkerClass(): string { + return $this->worker_class; + } + + public function getClosure(): ?Closure { + return $this->closure; + } + + public function getArgs(): array { + return $this->args; + } + +} diff --git a/src/Scheduler.php b/src/Scheduler.php new file mode 100644 index 0000000..9a4e35d --- /dev/null +++ b/src/Scheduler.php @@ -0,0 +1,202 @@ +uuid = substr(md5(uniqid(self::class, true)), 0, 16); + $this->runner = PARALLEL_EXT_LOADED + // create a Runner instance inside a thread + ? (new Runtime(PARALLEL_AUTOLOADER))->run(static function($uuid): void { + // create runner instance + $runner = new Internals\Runner($uuid); + // listen for events + $runner->listen(); + }, [ $this->uuid ]) + + // create runner instance for non-threaded environment + : new Internals\Runner($this->uuid); + + // wait until Runner starts listening for events + if (PARALLEL_EXT_LOADED) $this->recv(); + } + + /** + * @return self Singleton instance + */ + private static function instance(): self { + return self::$instance ??= new self(); + } + + /** + * Register a worker class to process tasks + * + * @param string | Closure $worker Worker class to be used for processing tasks + * @param mixed ...$args Arguments passed to Worker constructor + * + * @return RegisteredWorker + */ + public static function using(string | Closure $worker, ...$args): RegisteredWorker { + // check if worker is already registered + if (is_string($worker) && false !== $registered_worker = self::instance()->getRegisteredWorker($worker)) { + if ( !empty($args)) { + // args must not be defined if worker already exists + throw new RuntimeException(sprintf('Worker "%s" is already defined, you can\'t specify new constructor parameters!', $worker)); + } + + return $registered_worker; + } + + return self::instance()->registerWorker($worker, $args); + } + + /** + * Schedule worker task for execution in parallel, passing ...$data at execution time + * + * @param mixed ...$data Array of arguments to be passed to worker task at execution time + * + * @return int Queued Task identifier + * + * @throws Runtime\Error\Closed if \parallel\Runtime was closed + * @throws Runtime\Error\IllegalFunction if task is a closure created from an internal function + * @throws Runtime\Error\IllegalInstruction if task contains illegal instructions + * @throws Runtime\Error\IllegalParameter if task accepts or argv contains illegal variables + * @throws Runtime\Error\IllegalReturn | Throwable if task returns illegally + * + * @see Runtime::run() for more details + * @link https://www.php.net/manual/en/parallel.run + */ + public static function runTask(mixed ...$data): int { + $message = new Commands\Runner\QueueTaskMessage($data); + + if (PARALLEL_EXT_LOADED) { + self::instance()->send($message); + + // get queued task and check if there was an exception thrown + if (($task_id = self::instance()->recv()) instanceof ParallelException) { + // redirect exception + throw new RuntimeException($task_id->getMessage()); + } + + return $task_id; + } + + return self::instance()->runner->processMessage($message); + } + + /** + * Calling this method will pause execution until all tasks are finished. + */ + public static function awaitTasksCompletion(): bool { + $message = new Commands\Runner\WaitTasksCompletionMessage(); + + if (PARALLEL_EXT_LOADED) { + $has_pending_tasks = false; + do { + self::instance()->send($message); + if ($has_pending_tasks) { + usleep(25_000); + } + } while ($has_pending_tasks = self::instance()->recv()); + + return true; + } + + return self::instance()->runner->processMessage($message); + } + + /** + * Returns the list of tasks + * + * @return Task[] | Generator List of Tasks + */ + public static function getTasks(): Generator | array { + $message = new Commands\Runner\GetTasksMessage(); + + if (PARALLEL_EXT_LOADED) { + self::instance()->send($message); + + while (false !== $task = self::instance()->recv(true)) { + yield $task; + } + + return; + } + + yield from self::instance()->runner->processMessage($message); + } + + /** + * Remove all registered Tasks.
+ * **IMPORTANT**: This will stop processing Tasks immediately and remove **all** Tasks. + * + * @return bool + */ + public static function removeTasks(): bool { + $message = new Commands\Runner\RemoveTasksMessage(); + + if (PARALLEL_EXT_LOADED) { + self::instance()->send($message); + + return self::instance()->recv(); + } + + return self::instance()->runner->processMessage($message); + } + + /** + * Stops all running tasks. If force is set to false, waits gracefully for all running tasks to finish execution + * + * @param bool $force Flag to force task cancellation + */ + public static function stop(bool $force = true): void { + // check if extension isn't loaded and just return + if ( !PARALLEL_EXT_LOADED) return; + + if ($force) self::removeTasks(); + + self::awaitTasksCompletion(); + } + + /** + * Ensures that everything gets closed + */ + public function __destruct() { + // remove all Tasks + self::removeTasks(); + + // check if extension isn't loaded and just return + if ( !PARALLEL_EXT_LOADED) return; + + try { + // stop Runner instance + self::instance()->send(Event\Type::Close); + // wait until Runner instance shutdowns + self::instance()->recv(); + + } catch (Channel\Error\Closed | Throwable) {} + } + +} diff --git a/src/Task.php b/src/Task.php new file mode 100644 index 0000000..20c0458 --- /dev/null +++ b/src/Task.php @@ -0,0 +1,83 @@ +identifier; + } + + public function getWorkerClass(): string { + return $this->worker_class; + } + + public function getWorkerId(): int { + return $this->worker_id; + } + + public function getData(): mixed { + return $this->data; + } + + /** @internal */ + public function setState(int $state): self { + $this->state = $state; + + return $this; + } + + public function getState(): int { + return $this->state; + } + + /** @internal */ + public function setResult(mixed $result): self { + $this->result = $result; + + return $this; + } + + public function getResult(): mixed { + return $this->result; + } + + public function isPending(): bool { + return $this->getState() === self::STATE_Pending; + } + + public function isBeingProcessed(): bool { + return $this->getState() === self::STATE_Processing; + } + + public function wasProcessed(): bool { + return $this->getState() === self::STATE_Processed; + } + +} diff --git a/src/autoload.php b/src/autoload.php new file mode 100644 index 0000000..88a93ef --- /dev/null +++ b/src/autoload.php @@ -0,0 +1,15 @@ +expectException(RuntimeException::class); + $this->expectExceptionMessage('No worker is defined'); Scheduler::runTask(123); } - /** - * @depends testThatParallelExtensionIsAvailable - */ + public function testThatWorkersCanBeRegistered(): void { + $this->assertInstanceOf(RegisteredWorker::class, + Scheduler::using(Workers\EmptyWorker::class, [ true, 123, 'false' ])); + } + public function testThatClosureCanBeUsedAsWorker(): void { - Scheduler::using(static function($input) { - usleep(random_int(100, 500) * 1000); - return $input * 2; - }); + Scheduler::using(static fn($input) => $input * 2); foreach ($tasks = range(1, 10) as $task) { try { Scheduler::runTask($task); @@ -47,19 +45,21 @@ public function testThatClosureCanBeUsedAsWorker(): void { } } + Scheduler::awaitTasksCompletion(); + $results = []; - foreach (Scheduler::getProcessedTasks() as $processed_task) { - $this->assertEquals(Worker::class, $processed_task->getWorkerClass()); - $results[] = $processed_task->getResult(); + foreach (Scheduler::getTasks() as $task) { + $this->assertEquals(Worker::class, $task->getWorkerClass()); + $results[] = $task->getResult(); } // tasks results must be the same count $this->assertCount(count($tasks), $results); + + // remove all Tasks + Scheduler::removeTasks(); } - /** - * @depends testThatParallelExtensionIsAvailable - */ - public function testParallel(): void { + public function testProgressBar(): void { $workers = [ Workers\TestWorker::class, Workers\AnotherWorker::class, @@ -90,17 +90,20 @@ public function testParallel(): void { } } + Scheduler::awaitTasksCompletion(); + $results = []; // fetch processed tasks and store their results - foreach (Scheduler::getProcessedTasks() as $processed_task) { - $result = $processed_task->getResult(); + foreach (Scheduler::getTasks() as $task) { + $result = $task->getResult(); echo sprintf("Task result from #%s => %u\n", - $worker_class = $processed_task->getWorkerClass(), + $worker_class = $task->getWorkerClass(), $result[1]); $results[$worker_class][] = $result; } - Scheduler::disconnect(); + // remove all Tasks + Scheduler::removeTasks(); // check results foreach ($workers as $worker) { @@ -111,12 +114,33 @@ public function testParallel(): void { // tasks results must be the same count $this->assertCount(count($worker_tasks), $worker_results); - // tasks results must be in different order - $this->assertNotEquals($worker_tasks, array_column($worker_results, 0), 'Arrays are in the same order'); + // tasks results must not be in different order + $this->assertEquals($worker_tasks, array_column($worker_results, 0), 'Arrays are in different order'); $result = array_shift($worker_results); $this->assertEquals($result[1], $result[0] * array_product($multipliers)); } } + public function testThatChannelsDontOverlap(): void { + Scheduler::using(Workers\WorkerWithSubWorkers::class); + + foreach (range(1, 10) as $task) { + try { Scheduler::runTask($task); + } catch (Throwable) { + Scheduler::stop(); + } + } + + Scheduler::awaitTasksCompletion(); + + foreach (Scheduler::getTasks() as $task) { + // task result must be the same count as sub-tasks + $this->assertCount($task->getData()[0], $task->getResult()); + } + + // remove all Tasks + Scheduler::removeTasks(); + } + } diff --git a/tests/Workers/AnotherWorker.php b/tests/Workers/AnotherWorker.php index 54e8b2a..d73954b 100644 --- a/tests/Workers/AnotherWorker.php +++ b/tests/Workers/AnotherWorker.php @@ -11,7 +11,7 @@ public function __construct( ) {} protected function process(int $number = 0): array { - $microseconds = random_int(100, 1000); + $microseconds = random_int(10, 100); $this->setMessage(sprintf("AnotherWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds)); usleep($microseconds * 1000); diff --git a/tests/Workers/EmptyWorker.php b/tests/Workers/EmptyWorker.php new file mode 100644 index 0000000..5904b1c --- /dev/null +++ b/tests/Workers/EmptyWorker.php @@ -0,0 +1,13 @@ +setMessage(sprintf("TestWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds)); usleep($microseconds * 1000); diff --git a/tests/Workers/WorkerWithSubWorkers.php b/tests/Workers/WorkerWithSubWorkers.php new file mode 100644 index 0000000..a75e30f --- /dev/null +++ b/tests/Workers/WorkerWithSubWorkers.php @@ -0,0 +1,33 @@ +getResult(); + } + + Scheduler::removeTasks(); + + return $results; + } + +}