Skip to content

Commit

Permalink
Merge pull request #8 from php-etl/fix/execute-actions-in-order
Browse files Browse the repository at this point in the history
Respect loading order to execute actions & pipelines
  • Loading branch information
clemzarch authored Sep 12, 2023
2 parents 78ce3dd + 575a6d3 commit e35c494
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
42 changes: 42 additions & 0 deletions src/ActionProxy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

declare(strict_types=1);

namespace Kiboko\Component\Runtime\Workflow;

use Kiboko\Component\Runtime\Action\Console as ActionConsoleRuntime;
use Kiboko\Component\State;
use Kiboko\Contract\Action\ExecutingActionInterface;
use Kiboko\Contract\Satellite\RunnableInterface;
use Symfony\Component\Console\Output\ConsoleOutput;

class ActionProxy implements RunnableInterface
{
/** @var list<callable> */
private array $queuedCalls = [];

public function __construct(
callable $factory,
private readonly ConsoleOutput $output,
private readonly ExecutingActionInterface $action,
private readonly State\StateOutput\Workflow $state,
private readonly string $filename,
) {
$this->queuedCalls[] = static function (ActionConsoleRuntime $runtime) use ($factory): void {
$factory($runtime);
};
}

public function run(int $interval = 1000): int
{
$runtime = new ActionConsoleRuntime($this->output, $this->action, $this->state->withAction($this->filename));

foreach ($this->queuedCalls as $queuedCall) {
$queuedCall($runtime);
}

$this->queuedCalls = [];

return $runtime->run($interval);
}
}
7 changes: 3 additions & 4 deletions src/Console.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@

use Kiboko\Component\Action\Action;
use Kiboko\Component\Pipeline\Pipeline;
use Kiboko\Component\Runtime\Action\ActionRuntimeInterface;
use Kiboko\Component\Runtime\Action\Console as ActionConsoleRuntime;
use Kiboko\Component\Runtime\Pipeline\PipelineRuntimeInterface;
use Kiboko\Component\State;
use Kiboko\Contract\Pipeline\PipelineRunnerInterface;
use Kiboko\Contract\Satellite\RunnableInterface;
use Kiboko\Contract\Satellite\RunnableInterface as JobRunnableInterface;
use Symfony\Component\Console\Output\ConsoleOutput;

Expand All @@ -37,13 +36,13 @@ public function loadPipeline(string $filename): PipelineRuntimeInterface
return new PipelineProxy($factory, $this->output, $pipeline, $this->state, basename($filename));
}

public function loadAction(string $filename): ActionRuntimeInterface
public function loadAction(string $filename): RunnableInterface
{
$factory = require $filename;

$action = new Action();

return $factory(new ActionConsoleRuntime($this->output, $action, $this->state->withAction(basename($filename))));
return new ActionProxy($factory, $this->output, $action, $this->state, basename($filename));
}

public function job(JobRunnableInterface $job): self
Expand Down
4 changes: 1 addition & 3 deletions src/WorkflowRuntimeInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,4 @@
use Kiboko\Contract\Pipeline\SchedulingInterface;
use Kiboko\Contract\Satellite\RunnableInterface;

interface WorkflowRuntimeInterface extends SchedulingInterface, RunnableInterface
{
}
interface WorkflowRuntimeInterface extends SchedulingInterface, RunnableInterface {}

0 comments on commit e35c494

Please sign in to comment.