An implementation of the krakjoe/parallel PHP extension.
This library is designed to work even if the parallel
extension isn't available. In that case, tasks are executed in sequential order.
This lets you deploy your code to any environment and, where parallel
is enabled, take advantage of parallel processing.
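The fallback idea described above can be sketched in a few lines. This is only an illustration of the concept, not the library's implementation: `runEverywhere()` is a hypothetical helper, and the threaded branch uses `parallel\run()` and `parallel\Future::value()` from the extension directly.

```php
<?php

/**
 * Hypothetical helper (not part of this library): runs $worker once per item,
 * in threads when the parallel extension is loaded, sequentially otherwise.
 *
 * @param  array $items input values, one per task
 * @return array results, in the same order as $items
 */
function runEverywhere(Closure $worker, array $items): array
{
    $items = array_values($items);

    if (!extension_loaded('parallel')) {
        // Sequential fallback: same results, one item at a time.
        return array_map($worker, $items);
    }

    // One task per item; each call returns a parallel\Future.
    $futures = [];
    foreach ($items as $item) {
        $futures[] = \parallel\run($worker, [$item]);
    }

    // value() blocks until the corresponding task has finished.
    return array_map(static fn ($future) => $future->value(), $futures);
}

$squares = runEverywhere(static function (int $n): int {
    return $n * $n;
}, [1, 2, 3]);

print_r($squares);
```

The calling code is identical in both environments, which is the property this library provides through its Scheduler.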
You need these dependencies to execute tasks in parallel.
- PHP >= 8.0 with ZTS enabled
- parallel PECL extension (v1.2.5 or higher)
Parallel extension documentation can be found on https://php.net/parallel.
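If you want to verify at runtime whether the requirements listed above are met, a check along these lines may help. `missingParallelRequirements()` is a hypothetical helper written for this example; it relies only on PHP built-ins (`PHP_VERSION_ID`, `PHP_ZTS`, `phpversion()`, `version_compare()`).

```php
<?php

/**
 * Hypothetical helper (not part of this library): lists the unmet requirements.
 * An empty list means tasks can run in threads.
 *
 * @return string[]
 */
function missingParallelRequirements(): array
{
    $missing = [];

    if (PHP_VERSION_ID < 80000) {
        $missing[] = 'PHP >= 8.0 (running '.PHP_VERSION.')';
    }

    if (!PHP_ZTS) {
        $missing[] = 'a ZTS (thread-safe) PHP build';
    }

    // phpversion() returns false when the extension is not loaded
    $version = phpversion('parallel');
    if ($version === false) {
        $missing[] = 'the parallel extension';
    } elseif (version_compare($version, '1.2.5', '<')) {
        $missing[] = 'parallel >= 1.2.5 (found '.$version.')';
    }

    return $missing;
}

$missing = missingParallelRequirements();

echo $missing === []
    ? "Tasks can run in parallel.\n"
    : 'Tasks will run sequentially; missing: '.implode(', ', $missing)."\n";
```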
composer require hds-solutions/parallel-sdk
You should set the bootstrap file for the parallel threads. Pointing it to Composer's autoloader is enough.
// check if the extension is loaded, so the code can be deployed even where parallel isn't installed
if (extension_loaded('parallel')) {
    // set the path to composer's autoloader
    parallel\bootstrap(__DIR__.'/vendor/autoload.php');
}
Behind the scenes, the parallel extension creates an empty Runtime (thread) where the tasks are executed. Every Runtime is a clean, empty, isolated environment without any preloaded classes, functions, or autoloaders from the parent thread/process. This isolation ensures that each runtime starts with a minimal footprint. See references #1 and #2 for more info.
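To see this isolation for yourself, the following sketch uses the extension's `parallel\Runtime` directly, without this library. `ParentOnlyGreeter` and `isVisibleInRuntime()` are hypothetical names made up for the example. Since the Runtime is created without a bootstrap file, the class declared in the parent script is expected not to be visible inside the thread.

```php
<?php

// Hypothetical class that is declared only in the parent script.
final class ParentOnlyGreeter {}

/**
 * Reports whether ParentOnlyGreeter is visible inside a fresh Runtime.
 * Returns null when the parallel extension is not loaded.
 */
function isVisibleInRuntime(): ?bool
{
    if (!extension_loaded('parallel')) {
        return null;
    }

    // No bootstrap file is passed, so the thread starts without an autoloader.
    $runtime = new \parallel\Runtime();

    $future = $runtime->run(static function (): bool {
        // second argument false: do not trigger autoloading
        return class_exists('ParentOnlyGreeter', false);
    });

    // value() blocks until the task has finished
    $visible = $future->value();
    $runtime->close();

    return $visible;
}

echo 'Visible in parent: ';
var_dump(class_exists('ParentOnlyGreeter', false));

echo 'Visible in Runtime: ';
var_dump(isVisibleInRuntime());
```

This is why the bootstrap file matters: it is the place where each thread loads the autoloader and anything else the Worker needs.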
Then you define a Worker that will process the tasks. There are two options:
- Using an anonymous function as a Worker.
- Creating a class that extends from ParallelWorker and implements the process() method.
Then you can schedule tasks to run in parallel using the Scheduler::runTask() method.
Since ZTS is only available on the CLI, you should set the bootstrap file for parallel threads in the artisan file.
#!/usr/bin/env php
<?php
+ // check if parallel extension is loaded
+ if (extension_loaded('parallel')) {
+     // and register the bootstrap file for the threads
+     parallel\bootstrap(__DIR__.'/bootstrap/parallel.php');
+ }
define('LARAVEL_START', microtime(true));
require __DIR__.'/vendor/autoload.php';
$app = require_once __DIR__.'/bootstrap/app.php';
Then, in the bootstrap file for the parallel threads, you just need to get an instance of the app and bootstrap the Laravel kernel. This way you will have all Laravel service providers registered.
bootstrap/parallel.php:
<?php
require __DIR__.'/../vendor/autoload.php';
// Bootstrap the Console Kernel
(require_once __DIR__.'/app.php')
    ->make(Illuminate\Contracts\Console\Kernel::class)
    ->bootstrap();
Defining an anonymous function as a Worker to process the tasks.
use HDSSolutions\Console\Parallel\Scheduler;
Scheduler::using(static function(int $number): int {
    // here you do some work with the received data
    // this portion of code will run on a separate thread

    // example process (the wait time is in milliseconds)
    $milliseconds = random_int(100, 500);
    echo sprintf("AnonymousWorker >> Hello from task #%u, I'll wait %sms\n", $number, $milliseconds);
    usleep($milliseconds * 1000);
    // end example process

    // the data returned will be available later
    return $number;
});
Creating a class that extends the ParallelWorker class. This can be useful for complex processes and helps keep your code clean.
ExampleWorker.php:
use HDSSolutions\Console\Parallel\ParallelWorker;
final class ExampleWorker extends ParallelWorker {
    protected function process(int $number = 0): int {
        // example process (the wait time is in milliseconds)
        $milliseconds = random_int(100, 500);
        echo sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms\n", $number, $milliseconds);
        usleep($milliseconds * 1000);
        // end example process

        return $number;
    }
}
use HDSSolutions\Console\Parallel\Scheduler;
Scheduler::using(ExampleWorker::class);
You can also send parameters to the Worker's constructor.
use HDSSolutions\Console\Parallel\ParallelWorker;
final class ExampleWorker extends ParallelWorker {
    public function __construct(
        private array $multipliers,
    ) {}

    // process() method omitted here for brevity
}
use HDSSolutions\Console\Parallel\Scheduler;
Scheduler::using(ExampleWorker::class, [ 2, 4, 8 ]);
After defining a Worker, you can schedule tasks that will run in parallel.
use HDSSolutions\Console\Parallel\Scheduler;
foreach (range(1, 100) as $task_data) {
    try {
        // tasks will start as soon as a thread is available
        Scheduler::runTask($task_data);
    } catch (Throwable) {
        // if no Worker was defined, a RuntimeException will be thrown
        // also, Workers have some limitations, see Reference #3 for more info
    }
}
Every task has a state. There are also helper functions to check the current Task state:
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;
do {
    $all_processed = true;
    foreach (Scheduler::getTasks() as $task) {
        switch (true) {
            case $task->isPending():
                $all_processed = false;
                break;
            case $task->isBeingProcessed():
                $all_processed = false;
                break;
            case $task->wasProcessed():
                $result = $task->getOutput();
                break;
        }
    }
} while (!$all_processed);
Instead of checking the state of every task, you can wait for all tasks to be processed before continuing your code execution.
use HDSSolutions\Console\Parallel\Scheduler;
// This will pause execution until all tasks are processed
Scheduler::awaitTasksCompletion();
You can also specify a time limit for waiting. The process will pause until all tasks are processed or until the time limit is reached, whichever comes first.
use HDSSolutions\Console\Parallel\Scheduler;
// Pause until all tasks are processed or until 15 minutes pass
Scheduler::awaitTasksCompletion(wait_until: new DateInterval('PT15M'));
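The time limit above is a standard PHP DateInterval built from an ISO 8601 duration string. As a rough illustration of how such a string translates into a deadline (this uses PHP's date classes only and does not show how the library computes its deadline internally):

```php
<?php

// Fixed start time so the example is reproducible.
$start = new DateTimeImmutable('2024-01-01 12:00:00', new DateTimeZone('UTC'));

// 'PT15M' reads as: P (period), T (time part follows), 15M (15 minutes).
$limit = new DateInterval('PT15M');

// Adding the interval to the start time gives the moment the wait would end.
$deadline = $start->add($limit);
$seconds = $deadline->getTimestamp() - $start->getTimestamp();

echo $deadline->format('H:i:s'), "\n"; // 12:15:00
echo $seconds, " seconds\n";           // 900 seconds
```

Other durations follow the same pattern, for example 'PT30S' for 30 seconds or 'PT2H' for 2 hours.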
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;
foreach (Scheduler::getTasks() as $task) {
    // you have access to the Worker class that was used to process the task
    $worker = $task->getWorkerClass();
    // and the result of the processed task
    $result = $task->getOutput();
}
You can stop processing queued tasks if your process needs to stop earlier.
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;
// this will remove tasks from the pending queue
Scheduler::removePendingTasks();

// after cleaning the queue, you should wait for tasks that are currently being processed to finish
Scheduler::awaitTasksCompletion();

$results = [];
$unprocessed_tasks = [];
foreach (Scheduler::getTasks() as $task) {
    if ($task->wasProcessed()) {
        $results[] = $task->getOutput();
    } else {
        // tasks that were not processed will remain in the Pending state
        $unprocessed_tasks[] = $task;
    }
}
You can remove a specific task from the processing queue if you need to.
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;
foreach (Scheduler::getTasks() as $task) {
    // if for some reason you want to remove a task, or just want to free memory when a task finishes
    if (someValidation($task) || $task->wasProcessed()) {
        // this will remove the task from the processing queue
        // IMPORTANT: if the task is already running, it will be stopped
        Scheduler::removeTask($task);
    }
}
If you need to stop everything right away, you can call the Scheduler::stop() method. This will stop processing all tasks immediately.
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;
// this will stop processing tasks immediately
Scheduler::stop();

// at this point, each Task will be in one of the following three states
foreach (Scheduler::getTasks() as $task) {
    switch (true) {
        case $task->isPending():
            // Task was never processed
            break;
        case $task->wasProcessed():
            // Task was processed by the Worker
            break;
        case $task->wasCancelled():
            // Task was cancelled while it was being processed
            break;
    }
}
You can control the maximum percentage or number of CPU cores to use by calling the following methods:
use HDSSolutions\Console\Parallel\Scheduler;
Scheduler::setMaxCpuCountUsage(2); // Use at most two CPU cores
Scheduler::setMaxCpuPercentageUsage(0.5); // Use at most 50% of the total CPU cores
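To get a feel for what a percentage limit means in practice, the sketch below converts a fraction into a whole number of cores. `coresForPercentage()` is a hypothetical helper; rounding down and the minimum of one core are assumptions made for this example, and the library may round differently.

```php
<?php

/**
 * Hypothetical helper (not part of this library): converts a fraction of the
 * available cores into a whole number of cores.
 * Assumptions: the result is rounded down and is never lower than 1.
 */
function coresForPercentage(float $percentage, int $totalCores): int
{
    if ($percentage <= 0.0 || $percentage > 1.0) {
        throw new InvalidArgumentException('Percentage must be in the range (0, 1]');
    }
    if ($totalCores < 1) {
        throw new InvalidArgumentException('Total cores must be at least 1');
    }

    return max(1, (int) floor($totalCores * $percentage));
}

echo coresForPercentage(0.5, 8), "\n";  // 4
echo coresForPercentage(0.25, 6), "\n"; // 1
echo coresForPercentage(0.5, 1), "\n";  // 1
```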
- symfony/console package
- Enable a ProgressBar for the worker by calling the withProgress() method.
use HDSSolutions\Console\Parallel\Scheduler;
$tasks = range(1, 10);
Scheduler::using(ExampleWorker::class)
->withProgress(steps: count($tasks));
Available methods are:
- setMessage(string $message)
- advance(int $steps)
- setProgress(int $step)
- display()
- clear()
use HDSSolutions\Console\Parallel\ParallelWorker;
final class ExampleWorker extends ParallelWorker {
    protected function process(int $number = 0): int {
        // example process (the wait time is in milliseconds)
        $milliseconds = random_int(100, 500);
        $this->setMessage(sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms", $number, $milliseconds));
        usleep($milliseconds * 1000);
        $this->advance();
        // end example process

        return $number;
    }
}
28 of 52: ExampleWorker >> Hello from task #123, I'll wait 604ms
[===========================================>------------------------------------] 53%
elapsed: 2 secs, remaining: 2 secs, ~13.50 items/s
memory: 562 KiB, threads: 12x ~474 KiB, Σ 5,6 MiB ↑ 5,6 MiB
If you encounter any security-related issues, please feel free to raise a ticket on the issue tracker.
Contributions are welcome! If you find any issues or would like to add new features or improvements, please feel free to submit a pull request.
This library is open-source software licensed under the MIT License. Please see the License File for more information.