diff --git a/src/Internals/Commands/Runner/RemoveTasksMessage.php b/src/Internals/Commands/Runner/RemoveAllTasksMessage.php similarity index 72% rename from src/Internals/Commands/Runner/RemoveTasksMessage.php rename to src/Internals/Commands/Runner/RemoveAllTasksMessage.php index d3f2820..d869f58 100644 --- a/src/Internals/Commands/Runner/RemoveTasksMessage.php +++ b/src/Internals/Commands/Runner/RemoveAllTasksMessage.php @@ -6,12 +6,12 @@ use HDSSolutions\Console\Parallel\Internals\Runner; /** - * Message sent to {@see Runner} to execute {@see Runner::removeTasks()} action + * Message sent to {@see Runner} to execute {@see Runner::removeAllTasks()} action */ -final class RemoveTasksMessage extends ParallelCommandMessage { +final class RemoveAllTasksMessage extends ParallelCommandMessage { public function __construct() { - parent::__construct('remove_tasks'); + parent::__construct('remove_all_tasks'); } } diff --git a/src/Internals/Commands/Runner/RemovePendingTasksMessage.php b/src/Internals/Commands/Runner/RemovePendingTasksMessage.php new file mode 100644 index 0000000..ce14cc2 --- /dev/null +++ b/src/Internals/Commands/Runner/RemovePendingTasksMessage.php @@ -0,0 +1,17 @@ +stopRunningTasks(); $this->tasks = []; @@ -119,6 +120,46 @@ protected function removeTasks(): bool { return $this->send(true); } + protected function removePendingTasks(): bool { + // clear pending tasks + $this->pending_tasks = []; + + return $this->send(true); + } + + protected function stopRunningTasks(bool $should_return = false): bool { + // kill all running threads + foreach ($this->running_tasks as $task_id => $running_task) { + // check if future is already done working + if ( !PARALLEL_EXT_LOADED || $running_task->done()) { + // store the ProcessedTask + try { + // get the result of the process + [ $task_id, $result ] = PARALLEL_EXT_LOADED ? $running_task->value() : $running_task; + // ignore result if Task was removed, probably through Scheduler::removeTasks() + if (!array_key_exists($task_id, $this->tasks)) continue; + // store result and update state of the Task + $this->tasks[$task_id] + ->setResult($result) + ->setState(Task::STATE_Processed); + } catch (Throwable) {} + + } else { + // cancel running task + try { $running_task->cancel(); + } catch (Throwable) {} + // change task state to Cancelled + $this->tasks[$task_id]->setState(Task::STATE_Cancelled); + } + } + + $this->running_tasks = []; + + if ($should_return) return $this->send(true); + + return true; + } + protected function enableProgressBar(string $worker_id, int $steps): bool { if ( !array_key_exists($worker_id, $this->workers_hashmap)) { throw new RuntimeException('Worker is not defined'); @@ -156,11 +197,6 @@ protected function await(): bool { return true; } - private function stopRunningTasks(): void { - // TODO stop running tasks - $todo = true; - } - public function __destruct() { $this->stopProgressBar(); } diff --git a/src/Scheduler.php b/src/Scheduler.php index 9a4e35d..2331580 100644 --- a/src/Scheduler.php +++ b/src/Scheduler.php @@ -4,6 +4,7 @@ use Closure; use Generator; +use HDSSolutions\Console\Parallel\Contracts\Task; use HDSSolutions\Console\Parallel\Exceptions\ParallelException; use HDSSolutions\Console\Parallel\Internals\Commands; use parallel\Channel; @@ -148,14 +149,30 @@ public static function getTasks(): Generator | array { yield from self::instance()->runner->processMessage($message); } + /** + * Remove all pending tasks from the processing queue.
+ * Tasks that weren't processed will remain in the {@see Task::STATE_Pending} state + */ + public static function removePendingTasks(): bool { + $message = new Commands\Runner\RemovePendingTasksMessage(); + + if (PARALLEL_EXT_LOADED) { + self::instance()->send($message); + + return self::instance()->recv(); + } + + return self::instance()->runner->processMessage($message); + } + /** * Remove all registered Tasks.
* **IMPORTANT**: This will stop processing Tasks immediately and remove **all** Tasks. * * @return bool */ - public static function removeTasks(): bool { - $message = new Commands\Runner\RemoveTasksMessage(); + public static function removeAllTasks(): bool { + $message = new Commands\Runner\RemoveAllTasksMessage(); if (PARALLEL_EXT_LOADED) { self::instance()->send($message); @@ -167,7 +184,10 @@ public static function removeTasks(): bool { } /** - * Stops all running tasks. If force is set to false, waits gracefully for all running tasks to finish execution + * Stops all running tasks.
+ * If force is set to false, waits gracefully for all running tasks to finish execution.
+ * Tasks that weren't processed will remain in the {@see Task::STATE_Pending} state.
+ * Tasks that were currently processing, will have the {@see Task::STATE_Cancelled} state. * * @param bool $force Flag to force task cancellation */ @@ -175,7 +195,11 @@ public static function stop(bool $force = true): void { // check if extension isn't loaded and just return if ( !PARALLEL_EXT_LOADED) return; - if ($force) self::removeTasks(); + self::removePendingTasks(); + if ($force) { + self::instance()->send(new Commands\Runner\StopRunningTasksMessage()); + self::instance()->recv(); + } self::awaitTasksCompletion(); } @@ -185,7 +209,7 @@ public static function stop(bool $force = true): void { */ public function __destruct() { // remove all Tasks - self::removeTasks(); + self::removeAllTasks(); // check if extension isn't loaded and just return if ( !PARALLEL_EXT_LOADED) return; diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index ae0c8c9..77587de 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -56,7 +56,7 @@ public function testThatClosureCanBeUsedAsWorker(): void { $this->assertCount(count($tasks), $results); // remove all Tasks - Scheduler::removeTasks(); + Scheduler::removeAllTasks(); } public function testProgressBar(): void { @@ -103,7 +103,7 @@ public function testProgressBar(): void { } // remove all Tasks - Scheduler::removeTasks(); + Scheduler::removeAllTasks(); // check results foreach ($workers as $worker) { @@ -122,6 +122,69 @@ public function testProgressBar(): void { } } + public function testThatTasksCanBeRemovedFromQueue(): void { + Scheduler::using(Workers\LongRunningWorker::class); + + foreach (range(1000, 20000, 50) as $ms) { + try { Scheduler::runTask($ms); + } catch (Throwable) { + Scheduler::stop(); + } + } + + // wait 100ms and remove pending tasks + usleep(100_000); + Scheduler::removePendingTasks(); + + // wait for running tasks to end + Scheduler::awaitTasksCompletion(); + + $has_pending_tasks = false; + $has_processed_tasks = false; + $has_cancelled_tasks = false; + foreach (Scheduler::getTasks() as $task) { + $has_pending_tasks = $has_pending_tasks || $task->isPending(); + $has_processed_tasks = $has_processed_tasks || $task->wasProcessed(); + $has_cancelled_tasks = $has_cancelled_tasks || $task->wasCancelled(); + } + + $this->assertTrue($has_pending_tasks); + $this->assertTrue($has_processed_tasks); + $this->assertFalse($has_cancelled_tasks); + + Scheduler::removeAllTasks(); + } + + public function testThatTasksCanBeCancelled(): void { + Scheduler::using(Workers\LongRunningWorker::class); + + foreach (range(100, 20000, 50) as $ms) { + try { Scheduler::runTask($ms); + } catch (Throwable) { + Scheduler::stop(); + } + } + + // wait 200ms and stop all + usleep(200_000); + Scheduler::stop(); + + $has_pending_tasks = false; + $has_processed_tasks = false; + $has_cancelled_tasks = false; + foreach (Scheduler::getTasks() as $task) { + $has_pending_tasks = $has_pending_tasks || $task->isPending(); + $has_processed_tasks = $has_processed_tasks || $task->wasProcessed(); + $has_cancelled_tasks = $has_cancelled_tasks || $task->wasCancelled(); + } + + $this->assertTrue($has_pending_tasks); + $this->assertTrue($has_processed_tasks); + $this->assertTrue($has_cancelled_tasks); + + Scheduler::removeAllTasks(); + } + public function testThatChannelsDontOverlap(): void { Scheduler::using(Workers\WorkerWithSubWorkers::class); @@ -140,7 +203,7 @@ public function testThatChannelsDontOverlap(): void { } // remove all Tasks - Scheduler::removeTasks(); + Scheduler::removeAllTasks(); } } diff --git a/tests/Workers/LongRunningWorker.php b/tests/Workers/LongRunningWorker.php new file mode 100644 index 0000000..0ea9e09 --- /dev/null +++ b/tests/Workers/LongRunningWorker.php @@ -0,0 +1,19 @@ +getResult(); } - Scheduler::removeTasks(); + Scheduler::removeAllTasks(); return $results; }