diff --git a/README.md b/README.md index 3afa5e8..a213e28 100644 --- a/README.md +++ b/README.md @@ -20,9 +20,18 @@ composer require hds-solutions/parallel-sdk ``` ## Usage +Firstly, you need to set the bootstrap file for parallel. Setting the composer's autoloader is enough. See reference [#1](#references) for more info. +```php +// check if extension is loaded to allow deploying even in envorinments where parallel isn't installed +if (extension_loaded('parallel')) { + // set the path to composer's autoloader + parallel\bootstrap(__DIR__.'/vendor/autoload.php'); +} +``` + You need to define a `Worker` that will process the tasks. There are two options: 1. Using an anonymous function as a `Worker`. -2. Creating a class that extends from `ParallelWorker` and implements the `processTask()` method. +2. Creating a class that extends from `ParallelWorker` and implements the `process()` method. Then you can schedule tasks to run in parallel using `Scheduler::runTask()` method. @@ -31,7 +40,7 @@ Defining an anonymous function as a `Worker` to process the tasks. ```php use HDSSolutions\Console\Parallel\Scheduler; -Scheduler::with(static function(int $number): int { +Scheduler::using(static function(int $number): int { // here you do some work with the received data // this portion of code will run on a separated thread @@ -55,7 +64,7 @@ use HDSSolutions\Console\Parallel\ParallelWorker; final class ExampleWorker extends ParallelWorker { - protected function processTask(int $number = 0): int { + protected function process(int $number = 0): int { // example process $microseconds = random_int(100, 500); echo sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); @@ -72,7 +81,7 @@ final class ExampleWorker extends ParallelWorker { use HDSSolutions\Console\Parallel\Scheduler; $worker = new ExampleWorker(); -Scheduler::with($worker); +Scheduler::using($worker); ``` ### Schedule tasks @@ -87,24 +96,76 @@ foreach (range(1, 100) as $task) { } catch (Throwable) { // if no Worker was defined, a RuntimeException will be thrown - // also, Workers have some limitations, see Reference #1 for more info + // also, Workers have some limitations, see Reference #2 for more info } } ``` ### Get processed tasks result + ```php use HDSSolutions\Console\Parallel\Scheduler; use HDSSolutions\Console\Parallel\ProcessedTask; foreach (Scheduler::getProcessedTasks() as $processed_task) { - // you have access to Worker that processed the task - $worker = $processed_task->getWorker(); + // you have access to the Worker class that was used to processed the task + $worker = $processed_task->getWorkerClass(); // and the result of the task processed $result = $processed_task->getResult(); } ``` +### ProgressBar + +#### Requeriments +- `symfony/console` package +- Enable a ProgressBar for the worker calling the `withProgress()` method. + +```php +use HDSSolutions\Console\Parallel\Scheduler; + +$tasks = range(1, 10); + +$worker = new ExampleWorker(); +Scheduler::using($worker) + ->withProgress(steps: count($tasks); +``` + +#### Usage from Worker +Available methods are: +- `setMessage(string $message)` +- `advance(int $steps)` +- `setProgress(int $step)` +- `display()` +- `clear()` + +```php +use HDSSolutions\Console\Parallel\ParallelWorker; + +final class ExampleWorker extends ParallelWorker { + + protected function process(int $number = 0): int { + // example process + $microseconds = random_int(100, 500); + $this->setMessage(sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds)); + usleep($microseconds * 1000); + $this->advance(); + // end example process + + return $number; + } + +} +``` + +#### Example output +```bash + 28 of 52: ExampleWorker >> Hello from task #123, I'll wait 604ms + [===========================================>------------------------------------] 53% + elapsed: 2 secs, remaining: 2 secs, ~13.50 items/s + memory: 562 KiB, threads: 12x ~474 KiB, Σ 5,6 MiB ↑ 5,6 MiB +``` + ## Graceful close all resources This method will close all resources used internally by the `Scheduler` instance. ```php @@ -114,4 +175,14 @@ Scheduler::disconnect(); ``` ### References -1. [Parallel\Runtime::run() Task Characteristics](https://www.php.net/manual/en/parallel-runtime.run.php#refsect1-parallel-runtime.run-closure-characteristics) +1. [parallel\bootstrap()](https://www.php.net/manual/en/parallel.bootstrap.php) +2. [Parallel\Runtime::run() Task Characteristics](https://www.php.net/manual/en/parallel-runtime.run.php#refsect1-parallel-runtime.run-closure-characteristics) + +# Security Vulnerabilities +If you encounter any security related issue, feel free to raise a ticket on the issue traker. + +# Contributors +- [Hermann D. Schimpf](https://hds-solutions.net) + +# Licence +GPL-3.0 Please see [License File](LICENSE) for more information. diff --git a/composer.json b/composer.json index 37b935f..1abd786 100644 --- a/composer.json +++ b/composer.json @@ -10,7 +10,8 @@ } ], "suggest": { - "ext-parallel": "Allows to run multi-threaded processes" + "ext-parallel": "Allows to run multi-threaded processes", + "symfony/console": "Allows usage of a shared ProgressBar between the Workers" }, "require": { "php": ">=8.0" @@ -25,7 +26,8 @@ }, "require-dev": { "phpunit/phpunit": "^9.6", - "roave/security-advisories": "dev-latest" + "roave/security-advisories": "dev-latest", + "symfony/console": "^6.0" }, "autoload-dev": { "psr-4": { diff --git a/phpunit.xml b/phpunit.xml index 0b4896f..31aace4 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -1,7 +1,7 @@ diff --git a/src/Parallel/Contracts/ParallelWorker.php b/src/Parallel/Contracts/ParallelWorker.php new file mode 100644 index 0000000..69cc8ca --- /dev/null +++ b/src/Parallel/Contracts/ParallelWorker.php @@ -0,0 +1,101 @@ +registered_worker; + } + + /** + * @return mixed Data of the Task + */ + public function getData(): mixed { + return $this->data; + } + +} diff --git a/src/Parallel/Internals/ProgressBarWorker.php b/src/Parallel/Internals/ProgressBarWorker.php new file mode 100644 index 0000000..440bdf7 --- /dev/null +++ b/src/Parallel/Internals/ProgressBarWorker.php @@ -0,0 +1,149 @@ +uuid)); + // notify successful start + $progresBarChannel->send(true); + + // create ProgressBar instance + $this->progressBar = $createProgressBarInstance(); + + // threads memory usage and peak + $this->threads_memory = [ + 'current' => [ $main_memory_usage() ], + 'peak' => [ memory_get_usage() ], + ]; + + // get next message + while (Type::Close !== $message = $progresBarChannel->recv()) { + // check for close event and exit loop + if ($message === Type::Close) break; + + switch ($message_class = get_class($message)) { + case ProgressBarRegistrationMessage::class: + $this->registerWorker($message->steps); + break; + + case StatsReportMessage::class: + // update memory usage for this thread + $this->threads_memory['current'][0] = $main_memory_usage(); + // update peak memory usage + if ($this->threads_memory['current'][0] > $this->threads_memory['peak'][0]) { + $this->threads_memory['peak'][0] = $this->threads_memory['current'][0]; + } + + // save memory usage of thread + $this->threads_memory['current'][$message->worker_id] = $message->memory_usage; + // update peak memory usage + if ($this->threads_memory['current'][$message->worker_id] > ($this->threads_memory['peak'][$message->worker_id] ?? 0)) { + $this->threads_memory['peak'][$message->worker_id] = $this->threads_memory['current'][$message->worker_id]; + } + + // update ProgressBar memory report + $this->progressBar->setMessage($this->getMemoryUsage(), 'threads_memory'); + break; + + case ProgressBarActionMessage::class: + // redirect action to ProgressBar instance + $this->progressBar->{$message->action}(...$message->args); + if ($message->action === 'advance') { + // count processed item + $this->items[ time() ] = ($this->items[ time() ] ?? 0) + 1; + // update ProgressBar items per second report + $this->progressBar->setMessage($this->getItemsPerSecond(), 'items_per_second'); + } + break; + + default: + throw new RuntimeException(sprintf('Unsupported message type: %s', $message_class)); + } + + } + + // end progress bar + $this->progressBar->finish(); + + return true; + } + + private function registerWorker(int $steps = 0): void { + // check if ProgressBar isn't already started + if ( !$this->progressBarStarted) { + // start Worker ProgressBar + $this->progressBar->start($steps); + $this->progressBarStarted = true; + + } else { + // update steps + $this->progressBar->setMaxSteps($steps); + } + } + + private function getMemoryUsage(): string { + // main memory used + $main = Helper::formatMemory($this->threads_memory['current'][0]); + // total memory used (sum of all threads) + $total = Helper::formatMemory($total_raw = array_sum($this->threads_memory['current'])); + // average of each thread + $average = Helper::formatMemory((int) ($total_raw / (($count = count($this->threads_memory['current']) - 1) > 0 ? $count : 1))); + // peak memory usage + $peak = Helper::formatMemory(array_sum($this->threads_memory['peak'])); + + return "$main, threads: {$count}x ~$average, Σ $total ↑ $peak"; + } + + private function getItemsPerSecond(): string { + // check for empty list + if (empty($this->items)) return '0'; + + // keep only last 15s for average + $this->items = array_slice($this->items, -15, preserve_keys: true); + + // return the average of items processed per second + return '~'.number_format(floor(array_sum($this->items) / count($this->items) * 100) / 100, 2); + } + +} diff --git a/src/Parallel/Internals/RegisteredWorker.php b/src/Parallel/Internals/RegisteredWorker.php new file mode 100644 index 0000000..72a2d3f --- /dev/null +++ b/src/Parallel/Internals/RegisteredWorker.php @@ -0,0 +1,54 @@ +identifier; + } + + /** + * Enables a ProgressBar for the worker + * + * @param bool $with_progress Flag to enable/disable the ProgressBar + */ + public function withProgress(bool $with_progress = true, int $steps = 0): void { + if (false === $this->with_progress = $with_progress) return; + + // enable ProgressBar thread + Scheduler::registerWorkerWithProgressBar($this, $steps); + } + + public function hasProgressEnabled(): bool { + return $this->with_progress; + } + + public function getWorkerClass(): string { + return $this->worker_class; + } + + public function getClosure(): ?Closure { + return $this->closure; + } + + public function getArgs(): array { + return $this->args; + } + +} diff --git a/src/Parallel/Internals/Worker.php b/src/Parallel/Internals/Worker.php new file mode 100644 index 0000000..9972215 --- /dev/null +++ b/src/Parallel/Internals/Worker.php @@ -0,0 +1,15 @@ +processTask( ...func_get_args() ); + /** + * @var Channel|Closure|null Channel of communication between Task and ProgressBar + */ + private Channel | Closure | null $progressBarChannel = null; + + /** + * @var float Time when process started + */ + private float $started_at; + + /** + * @var float Time when process finished + */ + private float $finished_at; + + /** + * @var mixed Worker execution result + */ + private mixed $result; + + final public function getState(): int { + return $this->state; + } + + final public function connectProgressBar(string | Closure $uuid, string $identifier = null): bool { + if ( !extension_loaded('parallel')) { + $this->progressBarChannel = $uuid; + + return true; + } + + // store worker identifier + $this->identifier = $identifier; + // connect to channel + $this->progressBarChannel = Channel::open(sprintf('progress-bar@%s', $uuid)); + + return true; + } + + final public function start(...$args): void { + if ($this->state !== self::STATE_New) { + throw new RuntimeException('This Worker has been already started'); + } + + $this->state = self::STATE_Running; + $this->started_at = microtime(true); + + try { $this->result = $this->process(...$args); + } catch (Throwable) {} + + $this->finished_at = microtime(true); + $this->state = self::STATE_Finished; } - final public function onTaskFinished(Closure $callback): self { - // register callback - $this->taskFinishedCallback = $callback; + /** + * Processes task data and returns the result + * + * @return mixed Task processing result + */ + abstract protected function process(): mixed; + + final public function setMessage(string $message, string $name = 'message'): void { + $this->newProgressBarAction(__FUNCTION__, $message, $name); + } - return $this; + final public function advance(int $steps = 1): void { + $this->newProgressBarAction(__FUNCTION__, $steps); } - final public function dispatchTaskFinished(...$result): void { - // pass task result to callback - ($this->taskFinishedCallback)(...$result); + final public function setProgress(int $step): void { + $this->newProgressBarAction(__FUNCTION__, $step); + } + + final public function display(): void { + $this->newProgressBarAction(__FUNCTION__); + } + + final public function clear(): void { + $this->newProgressBarAction(__FUNCTION__); + } + + final public function getStartedAt(): ?float { + return $this->started_at ?? null; + } + + final public function getFinishedAt(): ?float { + return $this->finished_at ?? null; + } + + final public function getProcessedTask(): ProcessedTask { + if ($this->state !== self::STATE_Finished) { + throw new RuntimeException('This Worker hasn\'t been started'); + } + + return new ProcessedTask(get_class($this), $this->result); + } + + private function newProgressBarAction(string $action, ...$args): void { + // check if parallel is available + if (extension_loaded('parallel')) { + // report memory usage + $this->progressBarChannel->send(new StatsReportMessage( + worker_id: $this->identifier, + memory_usage: memory_get_usage(), + )); + // request ProgressBar action + $this->progressBarChannel->send(new ProgressBarActionMessage( + action: $action, + args: $args, + )); + + return; + } + + // redirect action to ProgressBar executor + ($this->progressBarChannel)($action, $args); } } diff --git a/src/Parallel/PendingTask.php b/src/Parallel/PendingTask.php deleted file mode 100644 index 427e308..0000000 --- a/src/Parallel/PendingTask.php +++ /dev/null @@ -1,30 +0,0 @@ -worker_id; - } - - /** - * @return mixed Data of the Task - */ - public function getData(): mixed { - return $this->data; - } - -} diff --git a/src/Parallel/ProcessedTask.php b/src/Parallel/ProcessedTask.php index 3198665..ee5865a 100644 --- a/src/Parallel/ProcessedTask.php +++ b/src/Parallel/ProcessedTask.php @@ -5,23 +5,23 @@ final class ProcessedTask { /** - * @param ParallelWorker $worker Worker that processed the task - * @param mixed $result Result of the task + * @param string $worker_class Worker class used to process the Task + * @param mixed $result Result of the Task */ public function __construct( - private ParallelWorker $worker, + private string $worker_class, private mixed $result, ) {} /** - * @return ParallelWorker + * @return string Worker class that processed the Task */ - public function getWorker(): ParallelWorker { - return $this->worker; + public function getWorkerClass(): string { + return $this->worker_class; } /** - * @return mixed + * @return mixed Result of the processed Task */ public function getResult(): mixed { return $this->result; diff --git a/src/Parallel/Scheduler.php b/src/Parallel/Scheduler.php index 7bedc7c..38a9b21 100644 --- a/src/Parallel/Scheduler.php +++ b/src/Parallel/Scheduler.php @@ -5,10 +5,19 @@ use Closure; use Exception; use Generator; +use HDSSolutions\Console\Parallel\Internals\Messages\ProgressBarRegistrationMessage; +use HDSSolutions\Console\Parallel\Internals\PendingTask; +use HDSSolutions\Console\Parallel\Internals\ProgressBarWorker; +use HDSSolutions\Console\Parallel\Internals\RegisteredWorker; +use HDSSolutions\Console\Parallel\Internals\Worker; use parallel\Channel; +use parallel\Events\Event\Type; use parallel\Future; use parallel\Runtime; use RuntimeException; +use Symfony\Component\Console\Helper\Helper; +use Symfony\Component\Console\Helper\ProgressBar; +use Symfony\Component\Console\Output\ConsoleOutput; use Throwable; use function parallel\run; @@ -17,8 +26,11 @@ final class Scheduler { /** @var Scheduler Singleton instance */ private static self $instance; - /** @var ParallelWorker[] Registered workers */ - private array $workers; + /** @var string Unique ID of the instance */ + private string $uuid; + + /** @var RegisteredWorker[] Registered workers */ + private array $registered_workers = []; /** @var ?Channel Channel to wait threads start */ private ?Channel $starter = null; @@ -34,10 +46,43 @@ final class Scheduler { /** @var ?int Max CPU usage count */ private ?int $max_cpu_count = null; + + /** + * @var ProgressBar|null ProgressBar instance for non-threaded Tasks execution + */ + private ?ProgressBar $progressBar = null; + + /** + * @var bool Flag to identify if ProgressBar is already started (non-threaded) + */ + private bool $progressBarStarted = false; + + /** + * @var array Memory usage stats (non-threaded) + */ + private array $memory_stats = [ 'current' => 0, 'peak' => 0 ]; + + /** + * @var array Total of items processed per second (non-threaded) + */ + private array $items = []; + + /** + * @var Future|null Thread controlling the ProgressBar + */ + private ?Future $progressBarThread = null; + + /** + * @var Channel|null Channel of communication between ProgressBar and Tasks + */ + private ?Channel $progressBarChannel = null; + /** * Disable public constructor, usage only available through singleton instance */ - private function __construct() {} + private function __construct() { + $this->uuid = substr(md5(uniqid(self::class, true)), 0, 16); + } /** * @return self Singleton instance @@ -46,18 +91,107 @@ private static function instance(): self { return self::$instance ??= new self(); } + public static function registerWorkerWithProgressBar(RegisteredWorker $registered_worker, int $steps = 0): void { + self::instance()->initProgressBar(); + + if ( !extension_loaded('parallel')) { + // check if ProgressBar isn't already started + if ( !self::instance()->progressBarStarted) { + // start ProgressBar + self::instance()->progressBar->start($steps); + self::instance()->progressBarStarted = true; + + } else { + // update steps + self::instance()->progressBar->setMaxSteps($steps); + } + } + + // register Worker ProgressBar + self::instance()->progressBarChannel?->send(new ProgressBarRegistrationMessage( + worker: $registered_worker->getWorkerClass(), + steps: $steps, + )); + } + + private function initProgressBar(): void { + // init ProgressBar only if not already working + if ($this->progressBar !== null || $this->progressBarThread !== null) return; + + // start a normal ProgressBar if parallel isn't available (non-threaded) + if ( !extension_loaded('parallel')) { + // create a non-threaded ProgressBar instance + $this->progressBar = $this->createProgressBarInstance(); + return; + } + + // create a channel of communication between ProgressBar and Tasks + $this->progressBarChannel = Channel::make(sprintf('progress-bar@%s', $this->uuid)); + + // main thread memory reporter + // FIXME this closure is copied and runs inside a thread, so memory report isn't accurate + $main_memory_usage = static fn() => memory_get_usage(); + + // decouple progress bar to a separated thread + $this->progressBarThread = run(static function(string $uuid, Closure $createProgressBarInstance, Closure $main_memory_usage): void { + // create ProgressBar worker instance + $progressBarWorker = new ProgressBarWorker($uuid); + // start ProgressBar + $progressBarWorker->start($createProgressBarInstance, $main_memory_usage); + + }, [ + // send UUID for starter channel + $this->uuid, + // send ProgressBar creator + fn() => $this->createProgressBarInstance(), + // send main memory usage reporter + $main_memory_usage, + ]); + + // wait for ProgressBar thread to start + if ($this->progressBarChannel->recv() !== true) { + throw new RuntimeException('Failed to start ProgressBar'); + } + } + + private function createProgressBarInstance(): ProgressBar { + $progressBar = new ProgressBar(new ConsoleOutput()); + + // configure ProgressBar settings + $progressBar->setBarWidth( 80 ); + $progressBar->setRedrawFrequency( 100 ); + $progressBar->minSecondsBetweenRedraws( 0.1 ); + $progressBar->maxSecondsBetweenRedraws( 0.2 ); + $progressBar->setFormat(" %current% of %max%: %message%\n". + " [%bar%] %percent:3s%%\n". + " elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s".(extension_loaded('parallel') ? "\n" : ','). + " memory: %threads_memory%\n"); + // set initial values + $progressBar->setMessage('Starting...'); + $progressBar->setMessage('??', 'items_per_second'); + $progressBar->setMessage('??', 'threads_memory'); + + return $progressBar; + } + /** - * Register the worker to process tasks + * Register a worker class to process tasks * - * @param Closure|ParallelWorker $worker Worker to process tasks + * @param string | Closure $worker Worker class to be used for processing tasks + * @param mixed ...$args Arguments passed to Worker constructor * - * @return ParallelWorker + * @return RegisteredWorker */ - public static function with(Closure | ParallelWorker $worker): ParallelWorker { + public static function using(string | Closure $worker, ...$args): RegisteredWorker { // convert Closure to ParallelWorker instance - self::instance()->workers[] = $worker instanceof ParallelWorker ? $worker : new Worker($worker); - - return $worker; + self::instance()->registered_workers[] = $registered_worker = new RegisteredWorker( + identifier: count(self::instance()->registered_workers), + worker_class: is_string($worker) ? $worker : Worker::class, + closure: $worker instanceof Closure ? $worker : null, + args: $args, + ); + + return $registered_worker; } /** @@ -76,13 +210,15 @@ public static function with(Closure | ParallelWorker $worker): ParallelWorker { */ public static function runTask(mixed ...$data): void { // check if a worker was defined - if (($worker_id = count(self::instance()->workers) - 1) < 0) { + if (($worker_id = count(self::instance()->registered_workers) - 1) < 0) { // reject task scheduling, no worker is defined throw new RuntimeException('No worker is defined'); } - // save data to pending tasks - self::instance()->pendingTasks[] = new PendingTask($worker_id, $data); + // get registered worker + $registered_worker = self::instance()->registered_workers[$worker_id]; + // register a pending task linked with the registered Worker + self::instance()->pendingTasks[] = new PendingTask($registered_worker, $data); do { // remove finished tasks @@ -142,21 +278,6 @@ public static function stop(bool $force = true): void { // wait for all tasks to finish while ( !empty(array_filter(self::instance()->futures, static fn(Future $future): bool => !$future->done()))) usleep(10_000); - // close channels - self::instance()->__starter?->close(); - self::instance()->__starter = null; - } - - /** - * Ensures that everything gets closed - */ - public static function disconnect(): void { - // check if extension is loaded - if ( !extension_loaded('parallel')) return; - // kill all running threads - while ($task = array_shift(self::instance()->futures)) try { $task->cancel(); } catch (Exception) {} - // task start watcher - try { self::instance()->starter?->close(); } catch (Channel\Error\Closed) {} } private function runNextTask(): void { @@ -164,49 +285,122 @@ private function runNextTask(): void { if ( !empty($this->pendingTasks) && $this->hasCpuAvailable()) { // get next available pending task $pending_task = array_shift($this->pendingTasks); - // get worker ID from pending task - $worker_id = $pending_task->getWorkerId(); // create starter channel to wait threads start event - $this->starter ??= extension_loaded('parallel') ? Channel::make('starter') : null; + $this->starter ??= extension_loaded('parallel') ? Channel::make(sprintf('starter@%s', $this->uuid)) : null; // process task inside a thread (if parallel extension is available) - $this->futures[] = ( !extension_loaded('parallel') - // normal execution (non-threaded) - ? [ $worker_id, $this->workers[$worker_id](...$pending_task->getData()) ] - + if (extension_loaded('parallel')) { // parallel available, process task inside a thread - : run(static function(...$data): array { - /** @var int $worker_id */ - $worker_id = array_shift($data); - /** @var ParallelWorker $worker */ - $worker = array_shift($data); + $this->futures[] = run(static function(string $uuid, PendingTask $pending_task): ProcessedTask { + // get registered worker + $registered_worker = $pending_task->getRegisteredWorker(); + // get Worker class to instantiate + $worker_class = $registered_worker->getWorkerClass(); + + /** @var ParallelWorker $worker Instance of the Worker */ + $worker = new $worker_class(...$registered_worker->getArgs()); + // build task params + $params = $worker instanceof Worker + // process task using local Worker + ? [ $registered_worker->getClosure(), ...$pending_task->getData() ] + // process task using user Worker + : [ ...$pending_task->getData() ]; + + // check if worker has ProgressBar enabled + if ($registered_worker->hasProgressEnabled()) { + // connect worker to ProgressBar + $worker->connectProgressBar($uuid, $GLOBALS['worker_thread_id'] ??= sprintf('%s@%s', $uuid, substr(md5(uniqid($worker_class, true)), 0, 16))); + } // notify that thread started - Channel::open('starter')->send(true); + Channel::open(sprintf('starter@%s', $uuid))->send(true); - // process task using specified worker - $result = $worker(...$data); + // process task + $worker->start(...$params); - // execute finished event - try { $worker->dispatchTaskFinished($result); - } catch (Exception) {} - - // return worker ID and result - return [ $worker_id, $result ]; + // return Worker result + return $worker->getProcessedTask(); }, [ - // send worker ID for returning value - $worker_id, - // worker to process task - $this->workers[$worker_id], - // task data to pass to the worker - ...$pending_task->getData(), - ]) - ); + // send UUID for starter channel + $this->uuid, + // send pending task to process + $pending_task, + ]); + + } else { + // get registered worker + $registered_worker = $pending_task->getRegisteredWorker(); + // get Worker class to instantiate + $worker_class = $registered_worker->getWorkerClass(); + + /** @var ParallelWorker $worker Instance of the Worker */ + $worker = new $worker_class(...$registered_worker->getArgs()); + // build task params + $params = $worker instanceof Worker + // process task using local Worker + ? [ $registered_worker->getClosure(), ...$pending_task->getData() ] + // process task using user Worker + : [ ...$pending_task->getData() ]; + + // check if worker has ProgressBar enabled + if ($registered_worker->hasProgressEnabled()) { + // connect worker to ProgressBar + $worker->connectProgressBar(function(string $action, array $args) { + // update stats + if ($action === 'advance') { + // count processed item + $this->items[ time() ] = ($this->items[ time() ] ?? 0) + 1; + } + // update ProgressBar memory usage report + $this->progressBar->setMessage($this->getMemoryUsage(), 'threads_memory'); + // update ProgressBar items per second report + $this->progressBar->setMessage($this->getItemsPerSecond(), 'items_per_second'); + + // execute progress bar action + $this->progressBar->$action(...$args); + }); + } + + // process task using worker + $worker->start(...$params); + + // store Worker result + $this->futures[] = $worker->getProcessedTask(); + } // wait for thread to start - $this->starter?->recv(); + if (($this->starter?->recv() ?? true) !== true) { + throw new RuntimeException('Failed to start Task'); + } + } + } + + private function getMemoryUsage(): string { + // update memory usage for this thread + $this->memory_stats['current'] = memory_get_usage(true); + // update peak memory usage + if ($this->memory_stats['current'] > $this->memory_stats['peak']) { + $this->memory_stats['peak'] = $this->memory_stats['current']; } + + // current memory used + $main = Helper::formatMemory($this->memory_stats['current']); + // peak memory usage + $peak = Helper::formatMemory($this->memory_stats['peak']); + + return "$main, ↑ $peak"; + } + + private function getItemsPerSecond(): string { + // check for empty list + if (empty($this->items)) return '0'; + + // keep only last 15s for average + $this->items = array_slice($this->items, -15, preserve_keys: true); + + // return the average of items processed per second + return '~'.number_format(floor(array_sum($this->items) / count($this->items) * 100) / 100, 2); } private function getMaxCpuUsage(): int { @@ -225,16 +419,10 @@ private function cleanFinishedTasks(): void { foreach ($this->futures as $idx => $future) { // check if future is already done working if ( !extension_loaded('parallel') || $future->done()) { - try { - // get result to release thread - $result = extension_loaded('parallel') ? $future->value() : $future; - // get worker identifier - $worker_id = array_shift($result); - // get process result - $result = array_shift($result); - // store Task result - $this->results[] = new ProcessedTask($this->workers[$worker_id], $result); + // store the ProcessedTask + try { $this->results[] = extension_loaded('parallel') ? $future->value() : $future; } catch (Throwable) {} + // add future idx to finished tasks list $finished_tasks[] = $idx; } @@ -244,6 +432,37 @@ private function cleanFinishedTasks(): void { foreach ($finished_tasks as $idx) unset($this->futures[$idx]); } + /** + * Ensures that everything gets closed + */ + public static function disconnect(): void { + // check if extension is loaded + if ( !extension_loaded('parallel')) return; + + try { + // send message to ProgressBar thread to stop execution + self::instance()->progressBarChannel?->send(Type::Close); + // wait progress thread to finish + self::instance()->progressBarThread?->value(); + // close ProgressBar communication channel + self::instance()->progressBarChannel?->close(); + + self::instance()->progressBarChannel = null; + self::instance()->progressBarThread = null; + + } catch (Channel\Error\Closed | Throwable) {} + + // kill all running threads + while ($task = array_shift(self::instance()->futures)) try { $task->cancel(); } catch (Exception) {} + + try { + // task start watcher + self::instance()->starter?->close(); + self::instance()->starter = null; + + } catch (Channel\Error\Closed) {} + } + public function __destruct() { // ensure that we execute disconnect self::disconnect(); diff --git a/src/Parallel/Worker.php b/src/Parallel/Worker.php deleted file mode 100644 index fbcfe18..0000000 --- a/src/Parallel/Worker.php +++ /dev/null @@ -1,18 +0,0 @@ -processor)($broadcast, ...$data); - } - -} diff --git a/tests/AnotherWorker.php b/tests/AnotherWorker.php deleted file mode 100644 index 3ada416..0000000 --- a/tests/AnotherWorker.php +++ /dev/null @@ -1,17 +0,0 @@ -> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); - usleep($microseconds * 1000); - - return $number; - } - -} diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index 5c920eb..3091ef3 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -2,8 +2,11 @@ namespace HDSSolutions\Console\Tests; +use HDSSolutions\Console\Parallel\Internals\Worker; use HDSSolutions\Console\Parallel\Scheduler; +use HDSSolutions\Console\Tests\Workers; use PHPUnit\Framework\TestCase; +use RuntimeException; use Throwable; use function parallel\bootstrap; @@ -15,32 +18,71 @@ public function testThatParallelExtensionIsAvailable(): void { // check if extension is available if ($loaded) { - // set parallel bootstrap file - bootstrap(__DIR__.'/config/bootstrap.php'); + // set bootstrap file + bootstrap(__DIR__.'/../vendor/autoload.php'); } } + /** + * @depends testThatParallelExtensionIsAvailable + */ + public function testThatWorkerMustBeDefinedValidates(): void { + $this->expectException(RuntimeException::class); + Scheduler::runTask(123); + } + + /** + * @depends testThatParallelExtensionIsAvailable + */ + public function testThatClosureCanBeUsedAsWorker(): void { + Scheduler::using(static function($input) { + usleep(random_int(100, 500) * 1000); + return $input * 2; + }); + + foreach ($tasks = range(1, 10) as $task) { + try { Scheduler::runTask($task); + } catch (Throwable) { + Scheduler::stop(); + } + } + + $results = []; + foreach (Scheduler::getProcessedTasks() as $processed_task) { + $this->assertEquals(Worker::class, $processed_task->getWorkerClass()); + $results[] = $processed_task->getResult(); + } + // tasks results must be the same count + $this->assertCount(count($tasks), $results); + } + /** * @depends testThatParallelExtensionIsAvailable */ public function testParallel(): void { $workers = [ - new TestWorker(), - new AnotherWorker(), + Workers\TestWorker::class, + Workers\AnotherWorker::class, ]; - $tasks = []; + $multipliers = [ 2, 4, 8 ]; + + // build example "tasks" + $tasks = []; + $total = 0; foreach ($workers as $idx => $worker) { + $tasks[$worker] = range(($idx + 1) * 100, ($idx + 1) * 100 + 25); + $total += count($tasks[$worker]); + } + + foreach ($workers as $worker) { // register worker - Scheduler::with($worker) - // register task finished callback - ->onTaskFinished(static function($task_no) { - echo sprintf("%s finished\n", $task_no); - }); - // build example "tasks" - $tasks[get_class($worker)] = range(($idx + 1) * 100, ($idx + 1) * 100 + 25); + Scheduler::using($worker, $multipliers) + // init progress bar for worker + ->withProgress(steps: $total); + // run example tasks - foreach ($tasks[get_class($worker)] as $task) { + foreach ($tasks[$worker] as $task) { try { Scheduler::runTask($task); } catch (Throwable) { Scheduler::stop(); @@ -50,10 +92,11 @@ public function testParallel(): void { $results = []; // fetch processed tasks and store their results - foreach (Scheduler::getProcessedTasks() as $task_result) { + foreach (Scheduler::getProcessedTasks() as $processed_task) { + $result = $processed_task->getResult(); echo sprintf("Task result from #%s => %u\n", - $worker_class = get_class($task_result->getWorker()), - $result = $task_result->getResult()); + $worker_class = $processed_task->getWorkerClass(), + $result[1]); $results[$worker_class][] = $result; } @@ -62,14 +105,17 @@ public function testParallel(): void { // check results foreach ($workers as $worker) { // get original tasks - $worker_tasks = $tasks[get_class($worker)]; + $worker_tasks = $tasks[$worker]; // get tasks results - $worker_results = $results[get_class($worker)]; + $worker_results = $results[$worker]; // tasks results must be the same count $this->assertCount(count($worker_tasks), $worker_results); // tasks results must be in different order - $this->assertNotEquals($worker_tasks, $worker_results, 'Arrays are in the same order'); + $this->assertNotEquals($worker_tasks, array_column($worker_results, 0), 'Arrays are in the same order'); + + $result = array_shift($worker_results); + $this->assertEquals($result[1], $result[0] * array_product($multipliers)); } } diff --git a/tests/TestWorker.php b/tests/TestWorker.php deleted file mode 100644 index 74842cf..0000000 --- a/tests/TestWorker.php +++ /dev/null @@ -1,17 +0,0 @@ -> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); - usleep($microseconds * 1000); - - return $number; - } - -} diff --git a/tests/Workers/AnotherWorker.php b/tests/Workers/AnotherWorker.php new file mode 100644 index 0000000..54e8b2a --- /dev/null +++ b/tests/Workers/AnotherWorker.php @@ -0,0 +1,25 @@ +setMessage(sprintf("AnotherWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds)); + + usleep($microseconds * 1000); + + $this->setMessage(sprintf('Hey! I finished waiting %sms from task #%u!', $microseconds, $number)); + $this->advance(); + + return [ $number, $number * array_product($this->multipliers) ]; + } + +} diff --git a/tests/Workers/TestWorker.php b/tests/Workers/TestWorker.php new file mode 100644 index 0000000..655149a --- /dev/null +++ b/tests/Workers/TestWorker.php @@ -0,0 +1,25 @@ +setMessage(sprintf("TestWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds)); + + usleep($microseconds * 1000); + + $this->setMessage(sprintf('Hey! I finished waiting %sms from task #%u!', $microseconds, $number)); + $this->advance(); + + return [ $number, $number * array_product($this->multipliers) ]; + } + +} diff --git a/tests/config/bootstrap.php b/tests/config/bootstrap.php deleted file mode 100644 index 5b98d42..0000000 --- a/tests/config/bootstrap.php +++ /dev/null @@ -1,3 +0,0 @@ -