Skip to content

Commit

Permalink
Merge pull request hschimpf#11 from hschimpf/bugfix/stop-running-tasks
Browse files Browse the repository at this point in the history
Remove tasks from process queue and stopping running tasks
  • Loading branch information
hschimpf authored Apr 8, 2023
2 parents 6820c32 + 2852b82 commit 2be3400
Show file tree
Hide file tree
Showing 13 changed files with 261 additions and 23 deletions.
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,57 @@ foreach (Scheduler::getTasks() as $task) {
}
```

### Remove pending tasks
You can stop processing queued tasks if your process needs to stop earlier.
```php
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

// this will remove tasks from the pending queue
Scheduler::removePendingTasks();

// after cleaning the queue, you should wait for tasks that are currently being processed to finish
Scheduler::awaitTasksCompletion();

$results = [];
$unprocessed_tasks = [];
foreach (Scheduler::getTasks() as $task) {
if ($task->wasProcessed()) {
$results[] = $task->getResult();
} else {
// tasks that were not processed, will remain in the Pending state
$unprocessed_tasks[] = $task;
}
}
```

### Stop all processing immediately
If you need to stop all right away, you can call the `Scheduler::stop()` method. This will stop processing all tasks immediately.
```php
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

// this will stop processing tasks immediately
Scheduler::stop();

// in this state, Tasks should have 3 of the following states
foreach (Scheduler::getTasks() as $task) {
switch (true) {
case $task->isPending():
// Task was never processed
break;

case $task->wasProcessed():
// Task was processed by the Worker
break;

case $task->wasCancelled():
// Task was cancelled while was being processed
break;
}
}
```

### ProgressBar

#### Requeriments
Expand Down
11 changes: 11 additions & 0 deletions src/Contracts/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ interface Task {
*/
public const STATE_Processed = 3;

/**
* ## Task processing was cancelled
*/
public const STATE_Cancelled = 4;

/**
* ## Available states of the Task
*/
Expand All @@ -32,6 +37,7 @@ interface Task {
self::STATE_Starting,
self::STATE_Processing,
self::STATE_Processed,
self::STATE_Cancelled,
];

/**
Expand Down Expand Up @@ -78,6 +84,11 @@ public function isBeingProcessed(): bool;
*/
public function wasProcessed(): bool;

/**
* @return bool True if the Task was cancelled during processing
*/
public function wasCancelled(): bool;

/**
* @return mixed Result of the Task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
use HDSSolutions\Console\Parallel\Internals\Runner;

/**
* Message sent to {@see Runner} to execute {@see Runner::removeTasks()} action
* Message sent to {@see Runner} to execute {@see Runner::removeAllTasks()} action
*/
final class RemoveTasksMessage extends ParallelCommandMessage {
final class RemoveAllTasksMessage extends ParallelCommandMessage {

public function __construct() {
parent::__construct('remove_tasks');
parent::__construct('remove_all_tasks');
}

}
17 changes: 17 additions & 0 deletions src/Internals/Commands/Runner/RemovePendingTasksMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Internals\Commands\Runner;

use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage;
use HDSSolutions\Console\Parallel\Internals\Runner;

/**
* Message sent to {@see Runner} to execute {@see Runner::removePendingTasks()} action
*/
final class RemovePendingTasksMessage extends ParallelCommandMessage {

public function __construct() {
parent::__construct('remove_pending_tasks');
}

}
17 changes: 17 additions & 0 deletions src/Internals/Commands/Runner/StopRunningTasksMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Internals\Commands\Runner;

use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage;
use HDSSolutions\Console\Parallel\Internals\Runner;

/**
* Message sent to {@see Runner} to execute {@see Runner::stopRunningTasks()} action
*/
final class StopRunningTasksMessage extends ParallelCommandMessage {

public function __construct() {
parent::__construct('stop_running_tasks', [ true ]);
}

}
5 changes: 1 addition & 4 deletions src/Internals/Common/ListenEventsAndExecuteActions.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ final public function listen(): void {
$this->send(new ParallelException($e));
}

}} catch (Channel\Error\Closed) {
// TODO channel must not be closed
$debug = true;
}
}} catch (Channel\Error\Closed) {}

$this->afterListening();
}
Expand Down
48 changes: 42 additions & 6 deletions src/Internals/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use HDSSolutions\Console\Parallel\RegisteredWorker;
use HDSSolutions\Console\Parallel\Task;
use RuntimeException;
use Throwable;

final class Runner {
use Runner\HasChannels;
Expand Down Expand Up @@ -110,7 +111,7 @@ protected function getTasks(): array | false {
return false;
}

protected function removeTasks(): bool {
protected function removeAllTasks(): bool {
$this->stopRunningTasks();

$this->tasks = [];
Expand All @@ -119,6 +120,46 @@ protected function removeTasks(): bool {
return $this->send(true);
}

protected function removePendingTasks(): bool {
// clear pending tasks
$this->pending_tasks = [];

return $this->send(true);
}

protected function stopRunningTasks(bool $should_return = false): bool {
// kill all running threads
foreach ($this->running_tasks as $task_id => $running_task) {
// check if future is already done working
if ( !PARALLEL_EXT_LOADED || $running_task->done()) {
// store the ProcessedTask
try {
// get the result of the process
[ $task_id, $result ] = PARALLEL_EXT_LOADED ? $running_task->value() : $running_task;
// ignore result if Task was removed, probably through Scheduler::removeTasks()
if (!array_key_exists($task_id, $this->tasks)) continue;
// store result and update state of the Task
$this->tasks[$task_id]
->setResult($result)
->setState(Task::STATE_Processed);
} catch (Throwable) {}

} else {
// cancel running task
try { $running_task->cancel();
} catch (Throwable) {}
// change task state to Cancelled
$this->tasks[$task_id]->setState(Task::STATE_Cancelled);
}
}

$this->running_tasks = [];

if ($should_return) return $this->send(true);

return true;
}

protected function enableProgressBar(string $worker_id, int $steps): bool {
if ( !array_key_exists($worker_id, $this->workers_hashmap)) {
throw new RuntimeException('Worker is not defined');
Expand Down Expand Up @@ -156,11 +197,6 @@ protected function await(): bool {
return true;
}

private function stopRunningTasks(): void {
// TODO stop running tasks
$todo = true;
}

public function __destruct() {
$this->stopProgressBar();
}
Expand Down
1 change: 0 additions & 1 deletion src/Internals/Runner/ManagesTasks.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use HDSSolutions\Console\Parallel\Contracts\ParallelWorker;
use HDSSolutions\Console\Parallel\Internals\Commands;
use HDSSolutions\Console\Parallel\Internals\Messages\ProgressBarRegistrationMessage;
use HDSSolutions\Console\Parallel\Internals\Worker;
use HDSSolutions\Console\Parallel\RegisteredWorker;
use HDSSolutions\Console\Parallel\Task;
Expand Down
34 changes: 29 additions & 5 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Closure;
use Generator;
use HDSSolutions\Console\Parallel\Contracts\Task;
use HDSSolutions\Console\Parallel\Exceptions\ParallelException;
use HDSSolutions\Console\Parallel\Internals\Commands;
use parallel\Channel;
Expand Down Expand Up @@ -148,14 +149,30 @@ public static function getTasks(): Generator | array {
yield from self::instance()->runner->processMessage($message);
}

/**
* Remove all pending tasks from the processing queue.<br>
* Tasks that weren't processed will remain in the {@see Task::STATE_Pending} state
*/
public static function removePendingTasks(): bool {
$message = new Commands\Runner\RemovePendingTasksMessage();

if (PARALLEL_EXT_LOADED) {
self::instance()->send($message);

return self::instance()->recv();
}

return self::instance()->runner->processMessage($message);
}

/**
* Remove all registered Tasks.<br/>
* **IMPORTANT**: This will stop processing Tasks immediately and remove **all** Tasks.
*
* @return bool
*/
public static function removeTasks(): bool {
$message = new Commands\Runner\RemoveTasksMessage();
public static function removeAllTasks(): bool {
$message = new Commands\Runner\RemoveAllTasksMessage();

if (PARALLEL_EXT_LOADED) {
self::instance()->send($message);
Expand All @@ -167,15 +184,22 @@ public static function removeTasks(): bool {
}

/**
* Stops all running tasks. If force is set to false, waits gracefully for all running tasks to finish execution
* Stops all running tasks.<br>
* If force is set to false, waits gracefully for all running tasks to finish execution.<br>
* Tasks that weren't processed will remain in the {@see Task::STATE_Pending} state.<br>
* Tasks that were currently processing, will have the {@see Task::STATE_Cancelled} state.
*
* @param bool $force Flag to force task cancellation
*/
public static function stop(bool $force = true): void {
// check if extension isn't loaded and just return
if ( !PARALLEL_EXT_LOADED) return;

if ($force) self::removeTasks();
self::removePendingTasks();
if ($force) {
self::instance()->send(new Commands\Runner\StopRunningTasksMessage());
self::instance()->recv();
}

self::awaitTasksCompletion();
}
Expand All @@ -185,7 +209,7 @@ public static function stop(bool $force = true): void {
*/
public function __destruct() {
// remove all Tasks
self::removeTasks();
self::removeAllTasks();

// check if extension isn't loaded and just return
if ( !PARALLEL_EXT_LOADED) return;
Expand Down
4 changes: 4 additions & 0 deletions src/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,8 @@ public function wasProcessed(): bool {
return $this->getState() === self::STATE_Processed;
}

public function wasCancelled(): bool {
return $this->getState() === self::STATE_Cancelled;
}

}
Loading

0 comments on commit 2be3400

Please sign in to comment.