Skip to content

Commit

Permalink
Merge pull request #7 from hschimpf/feature/decoupled-runner
Browse files Browse the repository at this point in the history
[v2.0] Framework Redesign
  • Loading branch information
hschimpf authored Apr 7, 2023
2 parents 2b54963 + 0c978b0 commit 6820c32
Show file tree
Hide file tree
Showing 55 changed files with 2,140 additions and 973 deletions.
85 changes: 65 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ composer require hds-solutions/parallel-sdk
## Usage
Firstly, you need to set the bootstrap file for parallel. Setting the composer's autoloader is enough. See reference [#1](#references) for more info.
```php
// check if extension is loaded to allow deploying even in envorinments where parallel isn't installed
// check if extension is loaded to allow deploying even in environments where parallel isn't installed
if (extension_loaded('parallel')) {
// set the path to composer's autoloader
parallel\bootstrap(__DIR__.'/vendor/autoload.php');
Expand Down Expand Up @@ -80,19 +80,37 @@ final class ExampleWorker extends ParallelWorker {
```php
use HDSSolutions\Console\Parallel\Scheduler;

$worker = new ExampleWorker();
Scheduler::using($worker);
Scheduler::using(ExampleWorker::class);
```

You can also send parameters to the Worker's constructor.
```php
use HDSSolutions\Console\Parallel\ParallelWorker;

final class ExampleWorker extends ParallelWorker {

public function __construct(
private array $multipliers,
) {}

}
```

```php
use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::using(ExampleWorker::class, [ 2, 4, 8 ]);
```

### Schedule tasks
After defining a Worker, you can schedule tasks that will run in parallel.
```php
use HDSSolutions\Console\Parallel\Scheduler;

foreach (range(1, 100) as $task) {
foreach (range(1, 100) as $task_data) {
try {
// tasks will start as soon as a thread is available
Scheduler::runTask($task);
Scheduler::runTask($task_data);

} catch (Throwable) {
// if no Worker was defined, a RuntimeException will be thrown
Expand All @@ -101,17 +119,52 @@ foreach (range(1, 100) as $task) {
}
```

### Check Tasks state
Every task has an state. There is also helper functions to check current Task state:
```php
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\Task;

do {
$all_processed = true;
foreach (Scheduler::getTasks() as $task) {
switch (true) {
case $task->isPending():
$all_processed = false;
break;

case $task->isBeingProcessed():
$all_processed = false;
break;

case $task->wasProcessed():
$result = $task->getResult();
break;
}
}
} while ($all_processed == false);
```

### Wait for tasks completion
Instead of checking every task state, you can wait for all tasks to be processed before continue your code execution.
```php
use HDSSolutions\Console\Parallel\Scheduler;

// This will pause execution until all tasks are processed
Scheduler::awaitTasksCompletion();
```

### Get processed tasks result

```php
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Parallel\ProcessedTask;
use HDSSolutions\Console\Parallel\Task;

foreach (Scheduler::getProcessedTasks() as $processed_task) {
foreach (Scheduler::getTasks() as $task) {
// you have access to the Worker class that was used to processed the task
$worker = $processed_task->getWorkerClass();
$worker = $task->getWorkerClass();
// and the result of the task processed
$result = $processed_task->getResult();
$result = $task->getResult();
}
```

Expand All @@ -126,9 +179,8 @@ use HDSSolutions\Console\Parallel\Scheduler;

$tasks = range(1, 10);

$worker = new ExampleWorker();
Scheduler::using($worker)
->withProgress(steps: count($tasks);
Scheduler::using(ExampleWorker::class)
->withProgress(steps: count($tasks));
```

#### Usage from Worker
Expand Down Expand Up @@ -166,14 +218,6 @@ final class ExampleWorker extends ParallelWorker {
memory: 562 KiB, threads: 12x ~474 KiB, Σ 5,6 MiB ↑ 5,6 MiB
```

## Graceful close all resources
This method will close all resources used internally by the `Scheduler` instance.
```php
use HDSSolutions\Console\Parallel\Scheduler;

Scheduler::disconnect();
```

### References
1. [parallel\bootstrap()](https://www.php.net/manual/en/parallel.bootstrap.php)
2. [Parallel\Runtime::run() Task Characteristics](https://www.php.net/manual/en/parallel-runtime.run.php#refsect1-parallel-runtime.run-closure-characteristics)
Expand All @@ -186,3 +230,4 @@ If you encounter any security related issue, feel free to raise a ticket on the

# Licence
GPL-3.0 Please see [License File](LICENSE) for more information.

2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"src/helpers.php"
],
"psr-4": {
"HDSSolutions\\Console\\": "src/"
"HDSSolutions\\Console\\Parallel\\": "src/"
}
},
"require-dev": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace HDSSolutions\Console\Parallel\Contracts;

use HDSSolutions\Console\Parallel\ProcessedTask;

interface ParallelWorker {

/**
Expand Down Expand Up @@ -92,10 +90,8 @@ public function getStartedAt(): ?float;
public function getFinishedAt(): ?float;

/**
* Returns the processed task
*
* @return ProcessedTask Processed task
* @return mixed Processed Task result
*/
public function getProcessedTask(): ProcessedTask;
public function getResult(): mixed;

}
86 changes: 86 additions & 0 deletions src/Contracts/Task.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Contracts;

interface Task {

/**
* ## Task has not yet been processed
*/
public const STATE_Pending = 0;

/**
* ## Task processing is starting
*/
public const STATE_Starting = 1;

/**
* ## Task is being currently processed
*/
public const STATE_Processing = 2;

/**
* ## Task has been processed
*/
public const STATE_Processed = 3;

/**
* ## Available states of the Task
*/
public const STATES = [
self::STATE_Pending,
self::STATE_Starting,
self::STATE_Processing,
self::STATE_Processed,
];

/**
* @return int Identifier of the task
*/
public function getIdentifier(): int;

/**
* @return string Worker class assigned to process this Task
*/
public function getWorkerClass(): string;

/**
* @return int Identifier of the registered worker
* @internal
*/
public function getWorkerId(): int;

/**
* @return mixed Data of the Task
*/
public function getData(): mixed;

/**
* Returns the current state of the Task
*
* @return int Current Task state
* @see Task::STATES
*/
public function getState(): int;

/**
* @return bool True if the task is pending
*/
public function isPending(): bool;

/**
* @return bool True if the task is being currently processed
*/
public function isBeingProcessed(): bool;

/**
* @return bool True if the Task was processed
*/
public function wasProcessed(): bool;

/**
* @return mixed Result of the Task
*/
public function getResult(): mixed;

}
32 changes: 32 additions & 0 deletions src/Contracts/TwoWayChannel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Contracts;

interface TwoWayChannel {

/**
* Shall receive a value from input channel
*
* @return mixed
*/
public function receive(): mixed;

/**
* Shall send the given value to output channel
*
* @param mixed $value Value to send
*
* @return mixed
*/
public function send(mixed $value): mixed;

/**
* Shall send true as value to output channel
*
* @return bool
*/
public function release(): bool;

public function close(): void;

}
35 changes: 35 additions & 0 deletions src/Exceptions/ParallelException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Exceptions;

use Throwable;

final class ParallelException {

private string $message;

private string $file;

private int $line;

public function __construct(
Throwable $e,
) {
$this->message = $e->getMessage();
$this->file = $e->getFile();
$this->line = $e->getLine();
}

public function getMessage(): string {
return $this->message;
}

public function getFile(): string {
return $this->file;
}

public function getLine(): int {
return $this->line;
}

}
26 changes: 26 additions & 0 deletions src/Internals/Commands/ParallelCommandMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Internals\Commands;

use RuntimeException;

abstract class ParallelCommandMessage {

/**
* @param string $action Action to execute
* @param array $args Arguments to pass to the action
*/
public function __construct(
private string $action,
private array $args = [],
) {}

public function __get(string $name) {
if ( !property_exists($this, $name)) {
throw new RuntimeException(sprintf('Invalid property "%s"', $name));
}

return $this->$name;
}

}
21 changes: 21 additions & 0 deletions src/Internals/Commands/ProgressBar/EnableProgressBarMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Internals\Commands\ProgressBar;

use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage;
use HDSSolutions\Console\Parallel\Internals\Runner;

/**
* Message sent to {@see Runner} to execute {@see Runner::enableProgressBar()} action
*/
final class EnableProgressBarMessage extends ParallelCommandMessage {

/**
* @param string $identifier
* @param int $steps
*/
public function __construct(string $identifier, int $steps) {
parent::__construct('enable_progress_bar', [ $identifier, $steps ]);
}

}
20 changes: 20 additions & 0 deletions src/Internals/Commands/ProgressBar/ProgressBarActionMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Internals\Commands\ProgressBar;

use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage;

/**
* Message sent to {@see ProgressBarWorker} to execute {@see ProgressBarWorker::progressBarAction()}
*/
final class ProgressBarActionMessage extends ParallelCommandMessage {

/**
* @param string $action
* @param array $args
*/
public function __construct(string $action, array $args) {
parent::__construct('progress_bar_action', [ $action, $args ]);
}

}
Loading

0 comments on commit 6820c32

Please sign in to comment.