Skip to content

Commit

Permalink
Merge pull request #15 from hschimpf/2.x-dev
Browse files Browse the repository at this point in the history
- Renamed Task data and result to input and output
- Added an argument to Scheduler::awaitTasksCompletion() to specify a timeout interval
- Updated README with an example on await() with specified timeout interval
- FIX: Wait until ProgressBar worker starts
  • Loading branch information
hschimpf authored Apr 12, 2023
2 parents 5469e29 + 2288124 commit d3a7720
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 41 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ do {
break;

case $task->wasProcessed():
$result = $task->getResult();
$result = $task->getOutput();
break;
}
}
Expand All @@ -188,6 +188,14 @@ use HDSSolutions\Console\Parallel\Scheduler;
Scheduler::awaitTasksCompletion();
```

You can also specify a time limit for waiting. The process will pause until all tasks are processed or until max time has been reached, whatever comes first.
```php
use HDSSolutions\Console\Parallel\Scheduler;

// Pause until all tasks are processed or until 15 minutes pass
Scheduler::awaitTasksCompletion(wait_until: new DateInterval('PT15M'));
```

### Get processed tasks result

```php
Expand All @@ -198,7 +206,7 @@ foreach (Scheduler::getTasks() as $task) {
// you have access to the Worker class that was used to processed the task
$worker = $task->getWorkerClass();
// and the result of the task processed
$result = $task->getResult();
$result = $task->getOutput();
}
```

Expand All @@ -218,7 +226,7 @@ $results = [];
$unprocessed_tasks = [];
foreach (Scheduler::getTasks() as $task) {
if ($task->wasProcessed()) {
$results[] = $task->getResult();
$results[] = $task->getOutput();
} else {
// tasks that were not processed, will remain in the Pending state
$unprocessed_tasks[] = $task;
Expand Down
14 changes: 12 additions & 2 deletions src/Contracts/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@ public function getWorkerClass(): string;
public function getWorkerId(): int;

/**
* @return mixed Data of the Task
* @deprecated Replaced with {@see self::getInput()}
*/
public function getData(): mixed;

/**
* @return mixed Input sent to the Task
*/
public function getInput(): mixed;

/**
* Returns the current state of the Task
*
Expand Down Expand Up @@ -90,8 +95,13 @@ public function wasProcessed(): bool;
public function wasCancelled(): bool;

/**
* @return mixed Result of the Task
* @deprecated Replaced with {@see self::getOutput()}
*/
public function getResult(): mixed;

/**
* @return mixed Output of the Task
*/
public function getOutput(): mixed;

}
9 changes: 5 additions & 4 deletions src/Internals/Commands/Runner/WaitTasksCompletionMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

namespace HDSSolutions\Console\Parallel\Internals\Commands\Runner;

use Closure;
use DateInterval;
use DateTime;
use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage;
use HDSSolutions\Console\Parallel\Internals\Runner;

Expand All @@ -12,10 +13,10 @@
final class WaitTasksCompletionMessage extends ParallelCommandMessage {

/**
* @param Closure $or_until
* @param DateInterval|null $wait_until
*/
public function __construct(Closure $or_until) {
parent::__construct('await', [ $or_until ]);
public function __construct(?DateInterval $wait_until = null) {
parent::__construct('await', [ $wait_until === null ? null : (new DateTime())->add($wait_until)->getTimestamp() ]);
}

}
2 changes: 2 additions & 0 deletions src/Internals/ProgressBarWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ protected function registerWorker(string $worker, int $steps = 0): void {
// update steps
$this->progressBar->setMaxSteps($steps);
}

$this->release();
}

protected function progressBarAction(string $action, array $args): void {
Expand Down
9 changes: 5 additions & 4 deletions src/Internals/ProgressBarWorker/HasProgressBar.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ private function createProgressBar(): void {
$this->progressBar->setRedrawFrequency( 100 );
$this->progressBar->minSecondsBetweenRedraws( 0.1 );
$this->progressBar->maxSecondsBetweenRedraws( 0.2 );
$this->progressBar->setFormat(" %current% of %max%: %message%\n".
" [%bar%] %percent:3s%%\n".
" elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s".(PARALLEL_EXT_LOADED ? "\n" : ',').
" memory: %threads_memory%\n");
$this->progressBar->setFormat(format:
"%current% of %max%: %message%\n".
"[%bar%] %percent:3s%%\n".
"elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s".(PARALLEL_EXT_LOADED ? "\n" : ',').
"memory: %threads_memory%\n");
// set initial values
$this->progressBar->setMessage('Starting...');
$this->progressBar->setMessage('??', 'items_per_second');
Expand Down
7 changes: 4 additions & 3 deletions src/Internals/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected function queueTask(array $data): int {
identifier: count($this->tasks),
worker_class: $worker->getWorkerClass(),
worker_id: $this->selected_worker,
data: $data,
input: $data,
);
// and put identifier on the pending tasks list
$this->pending_tasks[] = $task->getIdentifier();
Expand Down Expand Up @@ -176,6 +176,7 @@ protected function enableProgressBar(string $worker_id, int $steps): bool {
worker: $worker->getWorkerClass(),
steps: $steps,
));
$this->progressbar_channel->receive();

return $this->send(true);
}
Expand All @@ -197,9 +198,9 @@ protected function update(): void {
}
}

protected function await(Closure $or_until): bool {
protected function await(?int $wait_until = null): bool {
if (PARALLEL_EXT_LOADED) {
return $this->send($or_until() === false && ($this->hasPendingTasks() || $this->hasRunningTasks()));
return $this->send(time() <= ($wait_until ?? time()) && ($this->hasPendingTasks() || $this->hasRunningTasks()));
}

return true;
Expand Down
8 changes: 4 additions & 4 deletions src/Internals/Runner/ManagesTasks.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ private function startNextPendingTask(): void {
// build task params
$params = $worker instanceof Worker
// process task using local Worker
? [ $registered_worker->getClosure(), ...$task->getData() ]
? [ $registered_worker->getClosure(), ...$task->getInput() ]
// process task using user Worker
: [ ...$task->getData() ];
: [ ...$task->getInput() ];

// check if worker has ProgressBar enabled
if ($registered_worker->hasProgressEnabled()) {
Expand Down Expand Up @@ -136,9 +136,9 @@ private function startNextPendingTask(): void {
// build task params
$params = $worker instanceof Worker
// process task using local Worker
? [ $registered_worker->getClosure(), ...$task->getData() ]
? [ $registered_worker->getClosure(), ...$task->getInput() ]
// process task using user Worker
: [ ...$task->getData() ];
: [ ...$task->getInput() ];

// check if worker has ProgressBar enabled
if ($registered_worker->hasProgressEnabled()) {
Expand Down
5 changes: 3 additions & 2 deletions src/RegisteredWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ public function getIdentifier(): string {
* @param bool $with_progress Flag to enable/disable the ProgressBar
*/
public function withProgress(bool $with_progress = true, int $steps = 0): void {
// enable with progress flag
$this->with_progress = $with_progress;

// check if caller is Runner
$caller = debug_backtrace(!DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1];
if (($caller['class'] ?? null) === Internals\Runner::class || !PARALLEL_EXT_LOADED) {
// enable with progress flag
$this->with_progress = $with_progress;
$this->steps = $steps;

return;
Expand Down
15 changes: 7 additions & 8 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace HDSSolutions\Console\Parallel;

use Closure;
use DateInterval;
use Generator;
use HDSSolutions\Console\Parallel\Contracts\Task;
use HDSSolutions\Console\Parallel\Exceptions\ParallelException;
Expand Down Expand Up @@ -111,21 +112,19 @@ public static function runTask(mixed ...$data): int {
/**
* Calling this method will pause execution until all tasks are finished.
*
* @param Closure|null $or_until Custom validation to stop waiting.
* @param DateInterval|null $wait_until Should wait until specified DateInterval or until all tasks finished.
*/
public static function awaitTasksCompletion(Closure $or_until = null): bool {
$message = new Commands\Runner\WaitTasksCompletionMessage(
or_until: $or_until ?? static fn() => false,
);
public static function awaitTasksCompletion(DateInterval $wait_until = null): bool {
$message = new Commands\Runner\WaitTasksCompletionMessage($wait_until);

if (PARALLEL_EXT_LOADED) {
$has_pending_tasks = false;
$should_keep_waiting = false;
do {
self::instance()->send($message);
if ($has_pending_tasks) {
if ($should_keep_waiting) {
usleep(25_000);
}
} while ($has_pending_tasks = self::instance()->recv());
} while ($should_keep_waiting = self::instance()->recv());

return true;
}
Expand Down
24 changes: 17 additions & 7 deletions src/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ final class Task implements Contracts\Task {
private int $state = self::STATE_Pending;

/**
* @var mixed Result of the task
* @var mixed Output of the task
*/
private mixed $result = null;
private mixed $output = null;

/**
* @param int $identifier Identifier of the Task
* @param string $worker_class Worker assigned to process this Task
* @param int $worker_id Identifier of the registered worker
* @param mixed $data Data of the Task
* @param mixed $input Input of the Task
*/
public function __construct(
private int $identifier,
private string $worker_class,
private int $worker_id,
private mixed $data = null,
private mixed $input = null,
) {}

public function getIdentifier(): int {
Expand All @@ -42,8 +42,13 @@ public function getWorkerId(): int {
return $this->worker_id;
}

/** @inheritdoc */
public function getData(): mixed {
return $this->data;
return $this->getInput();
}

public function getInput(): mixed {
return $this->input;
}

/** @internal */
Expand All @@ -59,13 +64,18 @@ public function getState(): int {

/** @internal */
public function setResult(mixed $result): self {
$this->result = $result;
$this->output = $result;

return $this;
}

/** @inheritdoc */
public function getResult(): mixed {
return $this->result;
return $this->getOutput();
}

public function getOutput(): mixed {
return $this->output;
}

public function isPending(): bool {
Expand Down
6 changes: 3 additions & 3 deletions tests/ParallelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public function testThatClosureCanBeUsedAsWorker(): void {
$results = [];
foreach (Scheduler::getTasks() as $task) {
$this->assertEquals(Worker::class, $task->getWorkerClass());
$results[] = $task->getResult();
$results[] = $task->getOutput();
}
// tasks results must be the same count
$this->assertCount(count($tasks), $results);
Expand Down Expand Up @@ -95,7 +95,7 @@ public function testProgressBar(): void {
$results = [];
// fetch processed tasks and store their results
foreach (Scheduler::getTasks() as $task) {
$result = $task->getResult();
$result = $task->getOutput();
echo sprintf("Task result from #%s => %u\n",
$worker_class = $task->getWorkerClass(),
$result[1]);
Expand Down Expand Up @@ -199,7 +199,7 @@ public function testThatChannelsDontOverlap(): void {

foreach (Scheduler::getTasks() as $task) {
// task result must be the same count as sub-tasks
$this->assertCount($task->getData()[0], $task->getResult());
$this->assertCount($task->getInput()[0], $task->getOutput());
}

// remove all Tasks
Expand Down
2 changes: 1 addition & 1 deletion tests/Workers/WorkerWithSubWorkers.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected function process(int $subtasks = 0): array {

$results = [];
foreach (Scheduler::getTasks() as $task) {
$results[] = $task->getResult();
$results[] = $task->getOutput();
}

Scheduler::removeAllTasks();
Expand Down

0 comments on commit d3a7720

Please sign in to comment.