diff --git a/README.md b/README.md index ff3b0a0..38b02cf 100644 --- a/README.md +++ b/README.md @@ -172,7 +172,7 @@ do { break; case $task->wasProcessed(): - $result = $task->getResult(); + $result = $task->getOutput(); break; } } @@ -188,6 +188,14 @@ use HDSSolutions\Console\Parallel\Scheduler; Scheduler::awaitTasksCompletion(); ``` +You can also specify a time limit for waiting. The process will pause until all tasks are processed or until max time has been reached, whatever comes first. +```php +use HDSSolutions\Console\Parallel\Scheduler; + +// Pause until all tasks are processed or until 15 minutes pass +Scheduler::awaitTasksCompletion(wait_until: new DateInterval('PT15M')); +``` + ### Get processed tasks result ```php @@ -198,7 +206,7 @@ foreach (Scheduler::getTasks() as $task) { // you have access to the Worker class that was used to processed the task $worker = $task->getWorkerClass(); // and the result of the task processed - $result = $task->getResult(); + $result = $task->getOutput(); } ``` @@ -218,7 +226,7 @@ $results = []; $unprocessed_tasks = []; foreach (Scheduler::getTasks() as $task) { if ($task->wasProcessed()) { - $results[] = $task->getResult(); + $results[] = $task->getOutput(); } else { // tasks that were not processed, will remain in the Pending state $unprocessed_tasks[] = $task; diff --git a/src/Contracts/Task.php b/src/Contracts/Task.php index 0f55ee3..b10b7d0 100644 --- a/src/Contracts/Task.php +++ b/src/Contracts/Task.php @@ -57,10 +57,15 @@ public function getWorkerClass(): string; public function getWorkerId(): int; /** - * @return mixed Data of the Task + * @deprecated Replaced with {@see self::getInput()} */ public function getData(): mixed; + /** + * @return mixed Input sent to the Task + */ + public function getInput(): mixed; + /** * Returns the current state of the Task * @@ -90,8 +95,13 @@ public function wasProcessed(): bool; public function wasCancelled(): bool; /** - * @return mixed Result of the Task + * @deprecated Replaced with {@see self::getOutput()} */ public function getResult(): mixed; + /** + * @return mixed Output of the Task + */ + public function getOutput(): mixed; + } diff --git a/src/Internals/Commands/Runner/WaitTasksCompletionMessage.php b/src/Internals/Commands/Runner/WaitTasksCompletionMessage.php index 5b9ff19..c5495eb 100644 --- a/src/Internals/Commands/Runner/WaitTasksCompletionMessage.php +++ b/src/Internals/Commands/Runner/WaitTasksCompletionMessage.php @@ -2,7 +2,8 @@ namespace HDSSolutions\Console\Parallel\Internals\Commands\Runner; -use Closure; +use DateInterval; +use DateTime; use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage; use HDSSolutions\Console\Parallel\Internals\Runner; @@ -12,10 +13,10 @@ final class WaitTasksCompletionMessage extends ParallelCommandMessage { /** - * @param Closure $or_until + * @param DateInterval|null $wait_until */ - public function __construct(Closure $or_until) { - parent::__construct('await', [ $or_until ]); + public function __construct(?DateInterval $wait_until = null) { + parent::__construct('await', [ $wait_until === null ? null : (new DateTime())->add($wait_until)->getTimestamp() ]); } } diff --git a/src/Internals/ProgressBarWorker.php b/src/Internals/ProgressBarWorker.php index 2b39342..1f6fa15 100644 --- a/src/Internals/ProgressBarWorker.php +++ b/src/Internals/ProgressBarWorker.php @@ -52,6 +52,8 @@ protected function registerWorker(string $worker, int $steps = 0): void { // update steps $this->progressBar->setMaxSteps($steps); } + + $this->release(); } protected function progressBarAction(string $action, array $args): void { diff --git a/src/Internals/ProgressBarWorker/HasProgressBar.php b/src/Internals/ProgressBarWorker/HasProgressBar.php index 299a9d9..189b59e 100644 --- a/src/Internals/ProgressBarWorker/HasProgressBar.php +++ b/src/Internals/ProgressBarWorker/HasProgressBar.php @@ -25,10 +25,11 @@ private function createProgressBar(): void { $this->progressBar->setRedrawFrequency( 100 ); $this->progressBar->minSecondsBetweenRedraws( 0.1 ); $this->progressBar->maxSecondsBetweenRedraws( 0.2 ); - $this->progressBar->setFormat(" %current% of %max%: %message%\n". - " [%bar%] %percent:3s%%\n". - " elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s".(PARALLEL_EXT_LOADED ? "\n" : ','). - " memory: %threads_memory%\n"); + $this->progressBar->setFormat(format: + "%current% of %max%: %message%\n". + "[%bar%] %percent:3s%%\n". + "elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s".(PARALLEL_EXT_LOADED ? "\n" : ','). + "memory: %threads_memory%\n"); // set initial values $this->progressBar->setMessage('Starting...'); $this->progressBar->setMessage('??', 'items_per_second'); diff --git a/src/Internals/Runner.php b/src/Internals/Runner.php index 6312092..fc6382b 100644 --- a/src/Internals/Runner.php +++ b/src/Internals/Runner.php @@ -82,7 +82,7 @@ protected function queueTask(array $data): int { identifier: count($this->tasks), worker_class: $worker->getWorkerClass(), worker_id: $this->selected_worker, - data: $data, + input: $data, ); // and put identifier on the pending tasks list $this->pending_tasks[] = $task->getIdentifier(); @@ -176,6 +176,7 @@ protected function enableProgressBar(string $worker_id, int $steps): bool { worker: $worker->getWorkerClass(), steps: $steps, )); + $this->progressbar_channel->receive(); return $this->send(true); } @@ -197,9 +198,9 @@ protected function update(): void { } } - protected function await(Closure $or_until): bool { + protected function await(?int $wait_until = null): bool { if (PARALLEL_EXT_LOADED) { - return $this->send($or_until() === false && ($this->hasPendingTasks() || $this->hasRunningTasks())); + return $this->send(time() <= ($wait_until ?? time()) && ($this->hasPendingTasks() || $this->hasRunningTasks())); } return true; diff --git a/src/Internals/Runner/ManagesTasks.php b/src/Internals/Runner/ManagesTasks.php index f681cba..d749345 100644 --- a/src/Internals/Runner/ManagesTasks.php +++ b/src/Internals/Runner/ManagesTasks.php @@ -88,9 +88,9 @@ private function startNextPendingTask(): void { // build task params $params = $worker instanceof Worker // process task using local Worker - ? [ $registered_worker->getClosure(), ...$task->getData() ] + ? [ $registered_worker->getClosure(), ...$task->getInput() ] // process task using user Worker - : [ ...$task->getData() ]; + : [ ...$task->getInput() ]; // check if worker has ProgressBar enabled if ($registered_worker->hasProgressEnabled()) { @@ -136,9 +136,9 @@ private function startNextPendingTask(): void { // build task params $params = $worker instanceof Worker // process task using local Worker - ? [ $registered_worker->getClosure(), ...$task->getData() ] + ? [ $registered_worker->getClosure(), ...$task->getInput() ] // process task using user Worker - : [ ...$task->getData() ]; + : [ ...$task->getInput() ]; // check if worker has ProgressBar enabled if ($registered_worker->hasProgressEnabled()) { diff --git a/src/RegisteredWorker.php b/src/RegisteredWorker.php index ad23603..0575c41 100644 --- a/src/RegisteredWorker.php +++ b/src/RegisteredWorker.php @@ -37,11 +37,12 @@ public function getIdentifier(): string { * @param bool $with_progress Flag to enable/disable the ProgressBar */ public function withProgress(bool $with_progress = true, int $steps = 0): void { + // enable with progress flag + $this->with_progress = $with_progress; + // check if caller is Runner $caller = debug_backtrace(!DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1]; if (($caller['class'] ?? null) === Internals\Runner::class || !PARALLEL_EXT_LOADED) { - // enable with progress flag - $this->with_progress = $with_progress; $this->steps = $steps; return; diff --git a/src/Scheduler.php b/src/Scheduler.php index dfb9e60..e05637b 100644 --- a/src/Scheduler.php +++ b/src/Scheduler.php @@ -3,6 +3,7 @@ namespace HDSSolutions\Console\Parallel; use Closure; +use DateInterval; use Generator; use HDSSolutions\Console\Parallel\Contracts\Task; use HDSSolutions\Console\Parallel\Exceptions\ParallelException; @@ -111,21 +112,19 @@ public static function runTask(mixed ...$data): int { /** * Calling this method will pause execution until all tasks are finished. * - * @param Closure|null $or_until Custom validation to stop waiting. + * @param DateInterval|null $wait_until Should wait until specified DateInterval or until all tasks finished. */ - public static function awaitTasksCompletion(Closure $or_until = null): bool { - $message = new Commands\Runner\WaitTasksCompletionMessage( - or_until: $or_until ?? static fn() => false, - ); + public static function awaitTasksCompletion(DateInterval $wait_until = null): bool { + $message = new Commands\Runner\WaitTasksCompletionMessage($wait_until); if (PARALLEL_EXT_LOADED) { - $has_pending_tasks = false; + $should_keep_waiting = false; do { self::instance()->send($message); - if ($has_pending_tasks) { + if ($should_keep_waiting) { usleep(25_000); } - } while ($has_pending_tasks = self::instance()->recv()); + } while ($should_keep_waiting = self::instance()->recv()); return true; } diff --git a/src/Task.php b/src/Task.php index 47bb780..94683de 100644 --- a/src/Task.php +++ b/src/Task.php @@ -13,21 +13,21 @@ final class Task implements Contracts\Task { private int $state = self::STATE_Pending; /** - * @var mixed Result of the task + * @var mixed Output of the task */ - private mixed $result = null; + private mixed $output = null; /** * @param int $identifier Identifier of the Task * @param string $worker_class Worker assigned to process this Task * @param int $worker_id Identifier of the registered worker - * @param mixed $data Data of the Task + * @param mixed $input Input of the Task */ public function __construct( private int $identifier, private string $worker_class, private int $worker_id, - private mixed $data = null, + private mixed $input = null, ) {} public function getIdentifier(): int { @@ -42,8 +42,13 @@ public function getWorkerId(): int { return $this->worker_id; } + /** @inheritdoc */ public function getData(): mixed { - return $this->data; + return $this->getInput(); + } + + public function getInput(): mixed { + return $this->input; } /** @internal */ @@ -59,13 +64,18 @@ public function getState(): int { /** @internal */ public function setResult(mixed $result): self { - $this->result = $result; + $this->output = $result; return $this; } + /** @inheritdoc */ public function getResult(): mixed { - return $this->result; + return $this->getOutput(); + } + + public function getOutput(): mixed { + return $this->output; } public function isPending(): bool { diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index 6de2af7..669a2f4 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -50,7 +50,7 @@ public function testThatClosureCanBeUsedAsWorker(): void { $results = []; foreach (Scheduler::getTasks() as $task) { $this->assertEquals(Worker::class, $task->getWorkerClass()); - $results[] = $task->getResult(); + $results[] = $task->getOutput(); } // tasks results must be the same count $this->assertCount(count($tasks), $results); @@ -95,7 +95,7 @@ public function testProgressBar(): void { $results = []; // fetch processed tasks and store their results foreach (Scheduler::getTasks() as $task) { - $result = $task->getResult(); + $result = $task->getOutput(); echo sprintf("Task result from #%s => %u\n", $worker_class = $task->getWorkerClass(), $result[1]); @@ -199,7 +199,7 @@ public function testThatChannelsDontOverlap(): void { foreach (Scheduler::getTasks() as $task) { // task result must be the same count as sub-tasks - $this->assertCount($task->getData()[0], $task->getResult()); + $this->assertCount($task->getInput()[0], $task->getOutput()); } // remove all Tasks diff --git a/tests/Workers/WorkerWithSubWorkers.php b/tests/Workers/WorkerWithSubWorkers.php index f0dc212..d59cfa3 100644 --- a/tests/Workers/WorkerWithSubWorkers.php +++ b/tests/Workers/WorkerWithSubWorkers.php @@ -22,7 +22,7 @@ protected function process(int $subtasks = 0): array { $results = []; foreach (Scheduler::getTasks() as $task) { - $results[] = $task->getResult(); + $results[] = $task->getOutput(); } Scheduler::removeAllTasks();