diff --git a/bin/composer.json b/bin/composer.json
deleted file mode 100644
index e69de29..0000000
diff --git a/src/Bufferer/AbstractBufferer.php b/src/Bufferer/AbstractBufferer.php
new file mode 100644
index 0000000..76c3cbd
--- /dev/null
+++ b/src/Bufferer/AbstractBufferer.php
@@ -0,0 +1,35 @@
+filePath = $filePath;
+ }
+
+ abstract public function __invoke(): Promise;
+
+ abstract public function isBuffering(): bool;
+
+ abstract public function waitForWrite(): Promise;
+
+ abstract public function getMimeType(): Promise;
+
+ abstract public function getCurrentProgress(): int;
+}
diff --git a/src/Bufferer/BuffererInterface.php b/src/Bufferer/BuffererInterface.php
deleted file mode 100644
index c972710..0000000
--- a/src/Bufferer/BuffererInterface.php
+++ /dev/null
@@ -1,22 +0,0 @@
-logger = $logger;
- $this->inputStream = new ResourceInputStream($inputStream);
- $this->outputStream = new ResourceOutputStream($fOutput = $outputPath ? fopen($outputPath, 'wb') : tmpfile());
+ $this->inputStream = $inputStream;
+ $this->outputStream = $outputStream;
+ $this->server = $server;
$this->mimeType = new Deferred();
- $this->filePath = $outputPath ?: stream_get_meta_data($fOutput)['uri'];
$this->progressBar = new ProgressBar($output, 0, 'buffer');
+ $this->bufferSize = $bufferSize;
+
+ $filePath = '';
+
+ if ($outputStream instanceof ResourceOutputStream) {
+ $filePath = stream_get_meta_data($outputStream->getResource())['uri'];
+ }
+
+ parent::__construct($filePath);
}
public function __invoke(): Promise
@@ -72,34 +80,39 @@ public function __invoke(): Promise
$generator = function (): \Generator {
$this->logger->debug("Saving stdin to $this->filePath");
- $bytesDownloaded = 0;
while (null !== $chunk = yield $this->inputStream->read()) {
yield $this->outputStream->write($chunk);
- if ($bytesDownloaded === 0) {
+ if ($this->progressBar->step === 0) {
$mimeType = (new \finfo(FILEINFO_MIME))->buffer($chunk);
$this->logger->debug(sprintf('Stdin MIME type detected: "%s"', $mimeType));
$this->mimeType->resolve($mimeType);
}
- $this->progressBar->setProgress($bytesDownloaded += strlen($chunk));
+ $this->progressBar->advance(strlen($chunk));
$this->resolveDeferrer();
+
+ if ($this->progressBar->step < $this->bufferSize) {
+ continue;
+ }
+
+ $this->logger->warning(
+ 'Max buffer size reached. Disabling buffering and falling back to piping stdin to socket directly. No new client connections will be accepted.'
+ );
+ $this->server->close();
+
+ break;
}
$this->buffering = false;
$this->progressBar->finish();
- $this->logger->debug("Stdin transfer done, $bytesDownloaded bytes downloaded");
+ $this->logger->debug("Buffering to file stopped, {$this->progressBar->step} bytes stored");
$this->resolveDeferrer();
};
return new Coroutine($generator());
}
- public function getFilePath(): string
- {
- return $this->filePath;
- }
-
public function getMimeType(): Promise
{
return $this->mimeType->promise();
diff --git a/src/Bufferer/ResolvedBufferer.php b/src/Bufferer/ResolvedBufferer.php
index 32d9472..1ee4a4d 100644
--- a/src/Bufferer/ResolvedBufferer.php
+++ b/src/Bufferer/ResolvedBufferer.php
@@ -7,12 +7,8 @@
use Amp\Promise;
use Amp\Success;
-class ResolvedBufferer implements BuffererInterface
+class ResolvedBufferer extends AbstractBufferer
{
- /**
- * @var string
- */
- private $filePath;
/**
* @var int
*/
@@ -24,7 +20,7 @@ class ResolvedBufferer implements BuffererInterface
public function __construct(string $filePath)
{
- $this->filePath = $filePath;
+ parent::__construct($filePath);
$this->filesize = filesize($filePath);
$this->mimeType = (new \finfo(FILEINFO_MIME))->file($filePath);
}
@@ -34,11 +30,6 @@ public function __invoke(): Promise
return new Success();
}
- public function getFilePath(): string
- {
- return $this->filePath;
- }
-
public function isBuffering(): bool
{
return false;
diff --git a/src/Command.php b/src/Command.php
index d0341e9..2923fe8 100644
--- a/src/Command.php
+++ b/src/Command.php
@@ -4,9 +4,14 @@
namespace Ostrolucky\Stdinho;
+use Amp\ByteStream\ResourceInputStream;
+use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;
+use Amp\Socket\Server;
+use Ostrolucky\Stdinho\Bufferer\AbstractBufferer;
use Ostrolucky\Stdinho\Bufferer\PipeBufferer;
use Ostrolucky\Stdinho\Bufferer\ResolvedBufferer;
+use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Exception\InvalidOptionException;
use Symfony\Component\Console\Exception\LogicException;
use Symfony\Component\Console\Input\InputArgument;
@@ -52,6 +57,12 @@ protected function configure(): void
'Determines after how many client connections should program shut down',
INF
)
+ ->addOption(
+ 'buffer-size',
+ 'b',
+ InputOption::VALUE_REQUIRED,
+ 'Buffer size in bytes. By default, it is 90% of available disk space'
+ )
->setDescription('Turn any STDIN/STDOUT into HTTP server')
;
}
@@ -96,28 +107,55 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
+ $addressPort = $input->getArgument('addressPort');
+ $bufferSize = $input->getOption('buffer-size');
+ $connectionsLimit = (float)$input->getOption('connections-limit');
+ $filePath = $input->getOption('file');
+
+ $server = listen($addressPort);
$logger = new ConsoleLogger($firstSection = $output->section());
+ $bufferer = $this->createBufferer($output, $logger, $server, $filePath, $bufferSize);
- $bufferer = $this->hasStdin ?
- new PipeBufferer($logger, STDIN, $input->getOption('file'), $output->section()) :
- new ResolvedBufferer($input->getOption('file'))
- ;
+ $firstSection->writeln(
+ "Connection opened at http://{$server->getAddress()}\nPress CTRL+C to exit.\n"
+ );
- $bufferHandler = asyncCoroutine($bufferer);
- $clientHandler = asyncCoroutine(new Responder($logger, $bufferer, $output, $this->customHttpHeaders));
+ Loop::run(function () use (&$connectionsLimit, $server, $logger, $output, $bufferer) {
+ asyncCoroutine($bufferer)();
- Loop::run(function () use ($input, $clientHandler, $firstSection, $bufferHandler) {
- $bufferHandler();
- $server = listen($input->getArgument('addressPort'));
- $firstSection->writeln(
- "Connection opened at http://{$server->getAddress()}\nPress CTRL+C to exit.\n"
- );
- $connectionsLimit = $input->getOption('connections-limit');
while ($connectionsLimit-- && ($socket = yield $server->accept())) {
- $clientHandler($socket);
+ $responder = new Responder(
+ $logger,
+ $bufferer,
+ $output,
+ $this->customHttpHeaders,
+ new ResourceInputStream(fopen($bufferer->filePath, 'rb'))
+ );
+ asyncCoroutine($responder)($socket);
}
});
return 0;
}
+
+ private function createBufferer(
+ ConsoleOutput $output,
+ LoggerInterface $logger,
+ Server $server,
+ ?string $filePath,
+ ?string $bufferSize
+ ): AbstractBufferer {
+ if (!$this->hasStdin) {
+ return new ResolvedBufferer($filePath);
+ }
+
+ return new PipeBufferer(
+ $logger,
+ new ResourceInputStream(STDIN),
+ new ResourceOutputStream($filePath ? fopen($filePath, 'wb') : tmpfile()),
+ $output->section(),
+ $server,
+ (int)($bufferSize ?? disk_free_space($filePath ?: sys_get_temp_dir()) * .9)
+ );
+ }
}
diff --git a/src/ProgressBar.php b/src/ProgressBar.php
index d6bd815..dcfdaa3 100644
--- a/src/ProgressBar.php
+++ b/src/ProgressBar.php
@@ -102,12 +102,7 @@ public function __construct(ConsoleSectionOutput $output, int $max, string $form
*/
public function advance(int $step): void
{
- $this->setProgress($this->step + $step);
- }
-
- public function setProgress(int $step): void
- {
- $this->step = $step;
+ $this->step += $step;
$this->percent = $this->max ? (float)$step / $this->max : 0;
if (microtime(true) - $this->lastWriteTime < ($this->output->isDecorated() ? .1 : 1)) {
diff --git a/src/Responder.php b/src/Responder.php
index b5507af..5e5d94e 100644
--- a/src/Responder.php
+++ b/src/Responder.php
@@ -4,10 +4,10 @@
namespace Ostrolucky\Stdinho;
-use Amp\ByteStream\ResourceInputStream;
+use Amp\ByteStream\InputStream;
use Amp\ByteStream\StreamException;
use Amp\Socket\Socket;
-use Ostrolucky\Stdinho\Bufferer\BuffererInterface;
+use Ostrolucky\Stdinho\Bufferer\AbstractBufferer;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Output\ConsoleOutput;
@@ -18,7 +18,7 @@ class Responder
*/
private $logger;
/**
- * @var BuffererInterface
+ * @var AbstractBufferer
*/
private $bufferer;
/**
@@ -29,20 +29,26 @@ class Responder
* @var string[]
*/
private $customHttpHeaders = [];
+ /**
+ * @var InputStream
+ */
+ private $inputStream;
/**
* @param string[] $customHttpHeaders
*/
public function __construct(
LoggerInterface $logger,
- BuffererInterface $bufferer,
+ AbstractBufferer $bufferer,
ConsoleOutput $consoleOutput,
- array $customHttpHeaders
+ array $customHttpHeaders,
+ InputStream $inputStream
) {
$this->logger = $logger;
$this->bufferer = $bufferer;
$this->consoleOutput = $consoleOutput;
$this->customHttpHeaders = $customHttpHeaders;
+ $this->inputStream = $inputStream;
}
public function __invoke(Socket $socket): \Generator
@@ -52,7 +58,7 @@ public function __invoke(Socket $socket): \Generator
$header = [
'HTTP/1.1 200 OK',
- 'Content-Disposition: inline; filename="'.basename($this->bufferer->getFilePath()).'"',
+ 'Content-Disposition: inline; filename="'.basename($this->bufferer->filePath).'"',
'Content-Type:'.yield $this->bufferer->getMimeType(),
'Connection: close',
];
@@ -68,8 +74,6 @@ public function __invoke(Socket $socket): \Generator
$remoteAddress
);
- $handle = new ResourceInputStream(fopen($this->bufferer->getFilePath(), 'rb'));
-
try {
yield $socket->write(implode("\r\n", array_merge($header, $this->customHttpHeaders))."\r\n\r\n");
@@ -91,14 +95,23 @@ public function __invoke(Socket $socket): \Generator
continue;
}
- if (($chunk = yield $handle->read()) === null) {
- break; // No more buffering and client caught up to it -> finish download
+ if (null !== $chunk = yield $this->inputStream->read()) {
+ yield $socket->write($chunk);
+
+ $progressBar->max = $this->bufferer->getCurrentProgress();
+ $progressBar->advance(strlen($chunk));
+
+ continue;
}
- yield $socket->write($chunk);
+ if (!$this->bufferer->inputStream) {
+ break; // All input sources depleted -> finish download
+ }
- $progressBar->max = $this->bufferer->getCurrentProgress();
- $progressBar->advance(strlen($chunk));
+ // Use fallback inputStream - handy when PipeBufferer exits sooner than it consumes its inputStream
+ $this->inputStream = $this->bufferer->inputStream;
+ // Prevent other potential Responders to consume same inputStream. This can be solved in future
+ $this->bufferer->inputStream = null;
}
$progressBar->finish();
$this->logger->debug("$remoteAddress finished download");
@@ -106,7 +119,6 @@ public function __invoke(Socket $socket): \Generator
$this->logger->debug("$remoteAddress aborted download");
}
- $handle->close();
$socket->end();
}
}
diff --git a/tests/IntegrationTest.php b/tests/IntegrationTest.php
new file mode 100644
index 0000000..10b61db
--- /dev/null
+++ b/tests/IntegrationTest.php
@@ -0,0 +1,84 @@
+createMock(InputStream::class);
+ $buffererOutputStream = $this->createMock(OutputStream::class);
+ $responderInputStream = $this->createMock(InputStream::class);
+ $consoleOutput = $this->createMock(ConsoleOutput::class);
+ $sectionOutput = $this->createMock(ConsoleSectionOutput::class);
+ $server = $this->createMock(Server::class);
+ $logger = new TestLogger();
+
+ $bufferer = new PipeBufferer($logger, $buffererInputStream, $buffererOutputStream, $sectionOutput, $server, 3);
+ $responder = new Responder($logger, $bufferer, $consoleOutput, [], $responderInputStream);
+
+ $socket = $this->getMockBuilder(ClientSocket::class)
+ ->disableOriginalConstructor()
+ ->setMethods(['read', 'write', 'getRemoteAddress', 'end'])
+ ->getMock()
+ ;
+
+ $consoleOutput->method('section')->willReturn($sectionOutput);
+ $sectionOutput->method('getFormatter')->willReturn($this->createMock(OutputFormatterInterface::class));
+ $socket->method('read')->willReturn(new Success(''));
+
+ $buffererInputStream->method('read')->willReturn(
+ new Delayed(0, $foo = 'foo'),
+ new Delayed(0, $bar = 'bar'),
+ new Delayed(0, $baz = 'baz'),
+ new Success()
+ );
+
+ $buffererOutputStream
+ ->expects($this->exactly(1))
+ ->method('write')
+ ->willReturn(new Success())
+ ;
+
+ $responderInputStream
+ ->expects($this->exactly(2))
+ ->method('read')
+ ->willReturn(new Success($foo), new Success())
+ ;
+
+ $socket
+ ->expects($this->exactly(4))
+ ->method('write')
+ ->withConsecutive([Assert::stringContains('HTTP/1.1 200 OK')], [$foo], [$bar], [$baz])
+ ->willReturn(new Success())
+ ;
+
+ $server->expects($this->exactly(1))->method('close');
+
+ Loop::run(function () use ($socket, $bufferer, $responder): void {
+ asyncCoroutine($bufferer)();
+ asyncCoroutine($responder)($socket);
+ });
+
+ self::assertTrue($logger->hasWarningThatContains('Max buffer size reached'));
+ }
+}
diff --git a/tests/ResponderTest.php b/tests/ResponderTest.php
index e3ed970..3d90a25 100644
--- a/tests/ResponderTest.php
+++ b/tests/ResponderTest.php
@@ -4,6 +4,7 @@
namespace Ostrolucky\Stdinho\Tests;
+use Amp\ByteStream\InputStream;
use Amp\Coroutine;
use Amp\Socket\ClientSocket;
use Amp\Success;
@@ -22,7 +23,8 @@ public function testResponderHandlesClientAbruptDisconnect(): void
$logger = new TestLogger(),
new ResolvedBufferer(__FILE__),
$this->createMock(ConsoleOutput::class),
- []
+ [],
+ $this->createMock(InputStream::class)
);
$socket = $this->getMockBuilder(ClientSocket::class)