Skip to content

Commit

Permalink
Task management for removing & stopping tasks
Browse files Browse the repository at this point in the history
- FIX: Missing implementation of Runner::stopRunningTasks() hschimpf#10
- Added Scheduler::removePendingTasks()
- Renamed Scheduler::removeTasks() to removeAllTasks()
- Added tests for removePendingTasks() and removeAllTasks()
  • Loading branch information
hschimpf committed Apr 8, 2023
1 parent c1701bb commit 5ca7d49
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
use HDSSolutions\Console\Parallel\Internals\Runner;

/**
* Message sent to {@see Runner} to execute {@see Runner::removeTasks()} action
* Message sent to {@see Runner} to execute {@see Runner::removeAllTasks()} action
*/
final class RemoveTasksMessage extends ParallelCommandMessage {
final class RemoveAllTasksMessage extends ParallelCommandMessage {

public function __construct() {
parent::__construct('remove_tasks');
parent::__construct('remove_all_tasks');
}

}
17 changes: 17 additions & 0 deletions src/Internals/Commands/Runner/RemovePendingTasksMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Internals\Commands\Runner;

use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage;
use HDSSolutions\Console\Parallel\Internals\Runner;

/**
* Message sent to {@see Runner} to execute {@see Runner::removePendingTasks()} action
*/
final class RemovePendingTasksMessage extends ParallelCommandMessage {

public function __construct() {
parent::__construct('remove_pending_tasks');
}

}
17 changes: 17 additions & 0 deletions src/Internals/Commands/Runner/StopRunningTasksMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Internals\Commands\Runner;

use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage;
use HDSSolutions\Console\Parallel\Internals\Runner;

/**
* Message sent to {@see Runner} to execute {@see Runner::stopRunningTasks()} action
*/
final class StopRunningTasksMessage extends ParallelCommandMessage {

public function __construct() {
parent::__construct('stop_running_tasks', [ true ]);
}

}
48 changes: 42 additions & 6 deletions src/Internals/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use HDSSolutions\Console\Parallel\RegisteredWorker;
use HDSSolutions\Console\Parallel\Task;
use RuntimeException;
use Throwable;

final class Runner {
use Runner\HasChannels;
Expand Down Expand Up @@ -110,7 +111,7 @@ protected function getTasks(): array | false {
return false;
}

protected function removeTasks(): bool {
protected function removeAllTasks(): bool {
$this->stopRunningTasks();

$this->tasks = [];
Expand All @@ -119,6 +120,46 @@ protected function removeTasks(): bool {
return $this->send(true);
}

protected function removePendingTasks(): bool {
// clear pending tasks
$this->pending_tasks = [];

return $this->send(true);
}

protected function stopRunningTasks(bool $should_return = false): bool {
// kill all running threads
foreach ($this->running_tasks as $task_id => $running_task) {
// check if future is already done working
if ( !PARALLEL_EXT_LOADED || $running_task->done()) {
// store the ProcessedTask
try {
// get the result of the process
[ $task_id, $result ] = PARALLEL_EXT_LOADED ? $running_task->value() : $running_task;
// ignore result if Task was removed, probably through Scheduler::removeTasks()
if (!array_key_exists($task_id, $this->tasks)) continue;
// store result and update state of the Task
$this->tasks[$task_id]
->setResult($result)
->setState(Task::STATE_Processed);
} catch (Throwable) {}

} else {
// cancel running task
try { $running_task->cancel();
} catch (Throwable) {}
// change task state to Cancelled
$this->tasks[$task_id]->setState(Task::STATE_Cancelled);
}
}

$this->running_tasks = [];

if ($should_return) return $this->send(true);

return true;
}

protected function enableProgressBar(string $worker_id, int $steps): bool {
if ( !array_key_exists($worker_id, $this->workers_hashmap)) {
throw new RuntimeException('Worker is not defined');
Expand Down Expand Up @@ -156,11 +197,6 @@ protected function await(): bool {
return true;
}

private function stopRunningTasks(): void {
// TODO stop running tasks
$todo = true;
}

public function __destruct() {
$this->stopProgressBar();
}
Expand Down
34 changes: 29 additions & 5 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Closure;
use Generator;
use HDSSolutions\Console\Parallel\Contracts\Task;
use HDSSolutions\Console\Parallel\Exceptions\ParallelException;
use HDSSolutions\Console\Parallel\Internals\Commands;
use parallel\Channel;
Expand Down Expand Up @@ -148,14 +149,30 @@ public static function getTasks(): Generator | array {
yield from self::instance()->runner->processMessage($message);
}

/**
* Remove all pending tasks from the processing queue.<br>
* Tasks that weren't processed will remain in the {@see Task::STATE_Pending} state
*/
public static function removePendingTasks(): bool {
$message = new Commands\Runner\RemovePendingTasksMessage();

if (PARALLEL_EXT_LOADED) {
self::instance()->send($message);

return self::instance()->recv();
}

return self::instance()->runner->processMessage($message);
}

/**
* Remove all registered Tasks.<br/>
* **IMPORTANT**: This will stop processing Tasks immediately and remove **all** Tasks.
*
* @return bool
*/
public static function removeTasks(): bool {
$message = new Commands\Runner\RemoveTasksMessage();
public static function removeAllTasks(): bool {
$message = new Commands\Runner\RemoveAllTasksMessage();

if (PARALLEL_EXT_LOADED) {
self::instance()->send($message);
Expand All @@ -167,15 +184,22 @@ public static function removeTasks(): bool {
}

/**
* Stops all running tasks. If force is set to false, waits gracefully for all running tasks to finish execution
* Stops all running tasks.<br>
* If force is set to false, waits gracefully for all running tasks to finish execution.<br>
* Tasks that weren't processed will remain in the {@see Task::STATE_Pending} state.<br>
* Tasks that were currently processing, will have the {@see Task::STATE_Cancelled} state.
*
* @param bool $force Flag to force task cancellation
*/
public static function stop(bool $force = true): void {
// check if extension isn't loaded and just return
if ( !PARALLEL_EXT_LOADED) return;

if ($force) self::removeTasks();
self::removePendingTasks();
if ($force) {
self::instance()->send(new Commands\Runner\StopRunningTasksMessage());
self::instance()->recv();
}

self::awaitTasksCompletion();
}
Expand All @@ -185,7 +209,7 @@ public static function stop(bool $force = true): void {
*/
public function __destruct() {
// remove all Tasks
self::removeTasks();
self::removeAllTasks();

// check if extension isn't loaded and just return
if ( !PARALLEL_EXT_LOADED) return;
Expand Down
69 changes: 66 additions & 3 deletions tests/ParallelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public function testThatClosureCanBeUsedAsWorker(): void {
$this->assertCount(count($tasks), $results);

// remove all Tasks
Scheduler::removeTasks();
Scheduler::removeAllTasks();
}

public function testProgressBar(): void {
Expand Down Expand Up @@ -103,7 +103,7 @@ public function testProgressBar(): void {
}

// remove all Tasks
Scheduler::removeTasks();
Scheduler::removeAllTasks();

// check results
foreach ($workers as $worker) {
Expand All @@ -122,6 +122,69 @@ public function testProgressBar(): void {
}
}

public function testThatTasksCanBeRemovedFromQueue(): void {
Scheduler::using(Workers\LongRunningWorker::class);

foreach (range(1000, 20000, 50) as $ms) {
try { Scheduler::runTask($ms);
} catch (Throwable) {
Scheduler::stop();
}
}

// wait 100ms and remove pending tasks
usleep(100_000);
Scheduler::removePendingTasks();

// wait for running tasks to end
Scheduler::awaitTasksCompletion();

$has_pending_tasks = false;
$has_processed_tasks = false;
$has_cancelled_tasks = false;
foreach (Scheduler::getTasks() as $task) {
$has_pending_tasks = $has_pending_tasks || $task->isPending();
$has_processed_tasks = $has_processed_tasks || $task->wasProcessed();
$has_cancelled_tasks = $has_cancelled_tasks || $task->wasCancelled();
}

$this->assertTrue($has_pending_tasks);
$this->assertTrue($has_processed_tasks);
$this->assertFalse($has_cancelled_tasks);

Scheduler::removeAllTasks();
}

public function testThatTasksCanBeCancelled(): void {
Scheduler::using(Workers\LongRunningWorker::class);

foreach (range(100, 20000, 50) as $ms) {
try { Scheduler::runTask($ms);
} catch (Throwable) {
Scheduler::stop();
}
}

// wait 200ms and stop all
usleep(200_000);
Scheduler::stop();

$has_pending_tasks = false;
$has_processed_tasks = false;
$has_cancelled_tasks = false;
foreach (Scheduler::getTasks() as $task) {
$has_pending_tasks = $has_pending_tasks || $task->isPending();
$has_processed_tasks = $has_processed_tasks || $task->wasProcessed();
$has_cancelled_tasks = $has_cancelled_tasks || $task->wasCancelled();
}

$this->assertTrue($has_pending_tasks);
$this->assertTrue($has_processed_tasks);
$this->assertTrue($has_cancelled_tasks);

Scheduler::removeAllTasks();
}

public function testThatChannelsDontOverlap(): void {
Scheduler::using(Workers\WorkerWithSubWorkers::class);

Expand All @@ -140,7 +203,7 @@ public function testThatChannelsDontOverlap(): void {
}

// remove all Tasks
Scheduler::removeTasks();
Scheduler::removeAllTasks();
}

}
19 changes: 19 additions & 0 deletions tests/Workers/LongRunningWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Tests\Workers;

use HDSSolutions\Console\Parallel\ParallelWorker;

final class LongRunningWorker extends ParallelWorker {

protected function process(int $goal = 0): int {
$waited = 0;
do {
usleep(100_000);
$waited += 100;
} while ($waited <= $goal);

return $goal;
}

}
2 changes: 1 addition & 1 deletion tests/Workers/WorkerWithSubWorkers.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ protected function process(int $subtasks = 0): array {
$results[] = $task->getResult();
}

Scheduler::removeTasks();
Scheduler::removeAllTasks();

return $results;
}
Expand Down

0 comments on commit 5ca7d49

Please sign in to comment.