Skip to content

Commit

Permalink
corrections, rework to use InputStream, update to have child output d…
Browse files Browse the repository at this point in the history
…isplayed when data received.
  • Loading branch information
TheTechsTech committed Feb 27, 2020
1 parent e3d5d0f commit f05a82b
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Processor/Container.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

$task = Processor::decodeTask($serializedClosure);

$output = $task();
$output = \call_user_func($task);

$serializedOutput = \base64_encode(\serialize($output));

Expand Down
8 changes: 8 additions & 0 deletions Processor/InputStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

declare(strict_types=1);

namespace Async\Processor;

class InputStream extends \Symfony\Component\Process\InputStream
{ }
57 changes: 38 additions & 19 deletions Processor/Launcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Launcher implements LauncherInterface
protected $output;
protected $errorOutput;
protected $realOutput;
protected $realTimeType;
protected $realTimeOutput;

protected $startTime;
Expand Down Expand Up @@ -55,7 +56,9 @@ public function start(): LauncherInterface
$this->startTime = \microtime(true);

$this->process->start(function ($type, $buffer) {
$this->realTimeType = $type;
$this->realTimeOutput .= $buffer;
$this->display($buffer);
});

$this->pid = $this->process->getPid();
Expand Down Expand Up @@ -90,11 +93,10 @@ public function yielding()
return yield from $this->run(true);
}

public function display()
public function display($buffer = null)
{
if ($this->showOutput) {
\printf('%s', $this->getRealOutput());
$this->realOutput = null;
\printf('%s', $this->realTime($buffer));
}
}

Expand All @@ -109,12 +111,11 @@ public function wait($waitTimer = 1000, bool $useYield = false)
return $this->triggerTimeout();
}

$this->display();

if ($useYield)
$this->yieldLiveUpdate($this->getRealOutput());
else
$this->triggerOutput($this->getRealOutput());
$this->yieldLiveUpdate($this->realTimeType, $this->realTime($this->realTimeOutput));
else {
$this->triggerOutput($this->realTimeType, $this->realTime($this->realTimeOutput));
}

\usleep($waitTimer);
}
Expand Down Expand Up @@ -175,6 +176,11 @@ public function isTerminated(): bool
return $this->process->isTerminated();
}

public function setInput($input)
{
return $this->process->setInput($input);
}

public function getOutput()
{
if (!$this->output) {
Expand All @@ -196,24 +202,36 @@ public function getRealOutput()
$processOutput = $this->realTimeOutput;

$this->realTimeOutput = null;

$this->realOutput = @\unserialize(\base64_decode((string) $processOutput));

if (!$this->realOutput) {
$this->realOutput = $processOutput;
}
} elseif ($this->realTimeOutput) {
$this->realOutput = @\unserialize(\base64_decode((string) $this->realTimeOutput));
$this->realTimeOutput = null;
}

$this->realOutput = \is_string($this->realOutput)
? \rtrim($this->realOutput, 'Tjs=')
? \str_replace('Tjs=', '', $this->realOutput)
: $this->realOutput;

return $this->realOutput;
}

protected function realTime($buffer = null)
{
if (!empty($buffer)) {
$processOutput = $buffer;
$realOutput = @\unserialize(\base64_decode($processOutput));
if (!$realOutput) {
$realOutput = $processOutput;
}

$realOutput = \is_string($realOutput)
? \str_replace('Tjs=', '', $realOutput)
: $realOutput;

return $realOutput;
}
}

public function getErrorOutput()
{
if (!$this->errorOutput) {
Expand Down Expand Up @@ -287,10 +305,10 @@ public function timeout(callable $callback): LauncherInterface
return $this;
}

public function triggerOutput($update = null)
public function triggerOutput($type, $data = null)
{
foreach ($this->progressCallbacks as $progressCallback) {
$progressCallback($update);
$progressCallback($type, $data);
}
}

Expand All @@ -303,7 +321,7 @@ public function triggerSuccess()
return $this->triggerError();
} else {
$output = $this->getOutput();
$output = !empty($this->output) ? $output : $this->realTimeOutput;
$output = !empty($this->output) ? $output : $this->getRealOutput();
}

foreach ($this->successCallbacks as $callback)
Expand All @@ -330,10 +348,10 @@ public function triggerTimeout()
$callback();
}

public function yieldLiveUpdate($update = null, $type = null)
public function yieldLiveUpdate($type, $data = null)
{
foreach ($this->progressCallbacks as $progressCallback) {
yield $progressCallback($update, $type);
yield $progressCallback($type, $data);
}
}

Expand All @@ -343,6 +361,7 @@ public function yieldSuccess()
return $this->yieldError();
} else {
$output = $this->getOutput();
//$output = !empty($this->output) ? $output : $this->getRealOutput();
}

foreach ($this->successCallbacks as $callback) {
Expand Down
19 changes: 16 additions & 3 deletions Processor/LauncherInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ public function progress(callable $progressCallback): LauncherInterface;

/**
* Call the progressCallbacks on the process output in real time
*
* @param mixed $update
*/
public function triggerOutput($update = null);
public function triggerOutput($type, $data = null);

/**
* Add handlers to be called when the process has errors
Expand Down Expand Up @@ -126,6 +124,8 @@ public function yieldError();

public function yieldTimeout();

public function yieldLiveUpdate($type, $data = null);

/**
* Returns the Pid (process identifier), if applicable.
*
Expand Down Expand Up @@ -168,6 +168,19 @@ public function isTerminated(): bool;
*/
public function isSuccessful(): bool;

/**
* Sets the input.
*
* This content will be passed to the underlying process standard input.
*
* @param string|int|float|bool|resource|\Traversable|null $input The content
*
* @return LauncherInterface
*
* @throws LogicException In case the process is running
*/
public function setInput($input);

/**
* Set process to display output of child process.
*
Expand Down
4 changes: 3 additions & 1 deletion Processor/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public static function init(string $autoload = null)
/**
* Create a sub process for callable, cmd script, or any binary application.
*
* @param mixed $task
* @param mixed $task The command to run and its arguments
* @param int|float|null $timeout The timeout in seconds or null to disable
* @param mixed|null $input The input as stream resource, scalar or \Traversable, or null for no input
*
* @return LauncherInterface
*/
Expand Down
Loading

0 comments on commit f05a82b

Please sign in to comment.