Skip to content

Commit

Permalink
Fix premature server shutdown after buffer-size is exceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
ostrolucky committed Apr 22, 2019
1 parent 80aa37a commit 6aa88c3
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 26 deletions.
12 changes: 11 additions & 1 deletion src/Bufferer/PipeBufferer.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class PipeBufferer extends AbstractBufferer
* @var Server
*/
private $server;
/**
* @var Promise
*/
private $promiseThatIsResolvedWhenSomebodyConnects;
/**
* @var Deferred
*/
Expand Down Expand Up @@ -56,12 +60,14 @@ public function __construct(
OutputStream $outputStream,
ConsoleSectionOutput $output,
Server $server,
Promise $promiseThatIsResolvedWhenSomebodyConnects,
int $bufferSize
) {
$this->logger = $logger;
$this->inputStream = $inputStream;
$this->outputStream = $outputStream;
$this->server = $server;
$this->promiseThatIsResolvedWhenSomebodyConnects = $promiseThatIsResolvedWhenSomebodyConnects;
$this->mimeType = new Deferred();
$this->progressBar = new ProgressBar($output, 0, 'buffer');
$this->bufferSize = $bufferSize;
Expand Down Expand Up @@ -92,13 +98,17 @@ public function __invoke(): Promise
$this->progressBar->advance(strlen($chunk));
$this->resolveDeferrer();

// This happens after write in order to give bufferer a chance to detect mimeType even if bufferSize is 0
if ($this->progressBar->step < $this->bufferSize) {
continue;
}

$this->logger->warning(
'Max buffer size reached. Disabling buffering and falling back to piping stdin to socket directly. No new client connections will be accepted.'
'Max buffer size reached. Disabling buffering and falling back to piping stdin to socket directly. Only one client connection allowed.'
);

// Wait until at least one client connects. For first client it's safe to consume STDIN directly.
yield $this->promiseThatIsResolvedWhenSomebodyConnects;
$this->server->close();

break;
Expand Down
12 changes: 9 additions & 3 deletions src/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use Amp\Socket\Server;
use Ostrolucky\Stdinho\Bufferer\AbstractBufferer;
use Ostrolucky\Stdinho\Bufferer\PipeBufferer;
Expand Down Expand Up @@ -112,15 +114,16 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$connectionsLimit = (float)$input->getOption('connections-limit');
$filePath = $input->getOption('file');

$newConnDefer = new Deferred();
$server = listen($addressPort);
$logger = new ConsoleLogger($firstSection = $output->section());
$bufferer = $this->createBufferer($output, $logger, $server, $filePath, $bufferSize);
$bufferer = $this->createBufferer($output, $logger, $server, $newConnDefer->promise(), $filePath, $bufferSize);

$firstSection->writeln(
"<info>Connection opened at http://{$server->getAddress()}\nPress CTRL+C to exit.</info>\n"
);

Loop::run(function () use (&$connectionsLimit, $server, $logger, $output, $bufferer) {
Loop::run(function () use ($newConnDefer, &$connectionsLimit, $server, $logger, $output, $bufferer) {
asyncCoroutine($bufferer)();

while ($connectionsLimit-- && ($socket = yield $server->accept())) {
Expand All @@ -129,7 +132,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$bufferer,
$output,
$this->customHttpHeaders,
new ResourceInputStream(fopen($bufferer->filePath, 'rb'))
new ResourceInputStream(fopen($bufferer->filePath, 'rb')),
$newConnDefer
);
asyncCoroutine($responder)($socket);
}
Expand All @@ -142,6 +146,7 @@ private function createBufferer(
ConsoleOutput $output,
LoggerInterface $logger,
Server $server,
Promise $promiseThatIsResolvedWhenSomebodyConnects,
?string $filePath,
?string $bufferSize
): AbstractBufferer {
Expand All @@ -155,6 +160,7 @@ private function createBufferer(
new ResourceOutputStream($filePath ? fopen($filePath, 'wb') : tmpfile()),
$output->section(),
$server,
$promiseThatIsResolvedWhenSomebodyConnects,
(int)($bufferSize ?? disk_free_space($filePath ?: sys_get_temp_dir()) * .9)
);
}
Expand Down
13 changes: 12 additions & 1 deletion src/Responder.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Amp\ByteStream\InputStream;
use Amp\ByteStream\StreamException;
use Amp\Deferred;
use Amp\Socket\Socket;
use Ostrolucky\Stdinho\Bufferer\AbstractBufferer;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -33,6 +34,10 @@ class Responder
* @var InputStream
*/
private $inputStream;
/**
* @var Deferred
*/
private $defererThatIsResolvedWhenSomebodyConnects;

/**
* @param string[] $customHttpHeaders
Expand All @@ -42,13 +47,15 @@ public function __construct(
AbstractBufferer $bufferer,
ConsoleOutput $consoleOutput,
array $customHttpHeaders,
InputStream $inputStream
InputStream $inputStream,
Deferred $defererThatIsResolvedWhenSomebodyConnects
) {
$this->logger = $logger;
$this->bufferer = $bufferer;
$this->consoleOutput = $consoleOutput;
$this->customHttpHeaders = $customHttpHeaders;
$this->inputStream = $inputStream;
$this->defererThatIsResolvedWhenSomebodyConnects = $defererThatIsResolvedWhenSomebodyConnects;
}

public function __invoke(Socket $socket): \Generator
Expand All @@ -67,6 +74,10 @@ public function __invoke(Socket $socket): \Generator
$header[] = "Content-Length: {$this->bufferer->getCurrentProgress()}";
}

// Only at this point it's safe to resolve the promise, otherwise Responder
// would think everything was buffered and specify wrong content length!
$this->defererThatIsResolvedWhenSomebodyConnects->resolve();

$progressBar = new ProgressBar(
$this->consoleOutput->section(),
$this->bufferer->getCurrentProgress(),
Expand Down
40 changes: 29 additions & 11 deletions tests/FunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class FunctionalTest extends TestCase
* @var DefaultClient
*/
private $httpClient;
/**
* @var string
*/
private $command;

public static function setUpBeforeClass(): void
{
Expand All @@ -31,21 +35,15 @@ public static function setUpBeforeClass(): void

protected function setUp(): void
{
Loop::run(function () {
Loop::run(function (): void {
$isCoverageEnabled = array_filter($_SERVER['argv'], function (string $arg) {
return strpos($arg, '--coverage') === 0;
});
$script = $isCoverageEnabled ? 'coverage-enabling-bin-wrapper.php' : '../bin/stdinho';

$this->process = new Process('php '.__DIR__."/$script --connections-limit=1 localhost:1338");

yield $this->process->start();
$executable = $isCoverageEnabled ? 'coverage-enabling-bin-wrapper.php' : '../bin/stdinho';
$this->command = 'php '.__DIR__."/$executable --connections-limit=1 localhost:1338 ";

$this->httpClient = new DefaultClient();
$this->httpClient->setOption(DefaultClient::OP_TRANSFER_TIMEOUT, 400);

// wait till server booted and listens to connections
yield $this->process->getStdout()->read();
});
}

Expand All @@ -58,10 +56,14 @@ protected function tearDown(): void

/**
* Tests syncing of PipeBufferer and Responder, essentially tests commit be4421d6911888e839f3a850b650ee2c6de30b25
*
* @dataProvider cliArgumentsProvider
*/
public function testStdinIsWrittenToSocketASAP(): void
public function testStdinIsWrittenToSocketASAP(string $cliArguments): void
{
Loop::run(function () {
Loop::run(function () use ($cliArguments) {
yield from $this->bootServer($cliArguments);

yield $this->process->getStdin()->write('foo');
yield new Delayed(60);

Expand All @@ -75,4 +77,20 @@ public function testStdinIsWrittenToSocketASAP(): void
self::assertEquals('bar', yield $response->getBody()->read());
});
}

/**
* @return string[][]
*/
public function cliArgumentsProvider(): array
{
return [[''], ['--buffer-size=0']];
}

private function bootServer(string $arguments): \Generator
{
$this->process = new Process($this->command.$arguments);
yield $this->process->start();
// wait till server booted and listens to connections
yield $this->process->getStdout()->read();
}
}
19 changes: 10 additions & 9 deletions tests/IntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\Deferred;
use Amp\Delayed;
use Amp\Loop;
use Amp\Socket\ClientSocket;
Expand All @@ -25,35 +26,35 @@ class IntegrationTest extends TestCase
{
public function testBufferOverflow(): void
{
$buffererInputStream = $this->createMock(InputStream::class);
$buffererOutputStream = $this->createMock(OutputStream::class);
$buffererInput = $this->createMock(InputStream::class);
$buffererOutput = $this->createMock(OutputStream::class);
$responderInputStream = $this->createMock(InputStream::class);
$consoleOutput = $this->createMock(ConsoleOutput::class);
$sectionOutput = $this->createMock(ConsoleSectionOutput::class);
$section = $this->createMock(ConsoleSectionOutput::class);
$server = $this->createMock(Server::class);
$logger = new TestLogger();

$bufferer = new PipeBufferer($logger, $buffererInputStream, $buffererOutputStream, $sectionOutput, $server, 3);
$responder = new Responder($logger, $bufferer, $consoleOutput, [], $responderInputStream);
$bufferer = new PipeBufferer($logger, $buffererInput, $buffererOutput, $section, $server, new Success(), 3);
$responder = new Responder($logger, $bufferer, $consoleOutput, [], $responderInputStream, new Deferred());

$socket = $this->getMockBuilder(ClientSocket::class)
->disableOriginalConstructor()
->setMethods(['read', 'write', 'getRemoteAddress', 'end'])
->getMock()
;

$consoleOutput->method('section')->willReturn($sectionOutput);
$sectionOutput->method('getFormatter')->willReturn($this->createMock(OutputFormatterInterface::class));
$consoleOutput->method('section')->willReturn($section);
$section->method('getFormatter')->willReturn($this->createMock(OutputFormatterInterface::class));
$socket->method('read')->willReturn(new Success(''));

$buffererInputStream->method('read')->willReturn(
$buffererInput->method('read')->willReturn(
new Delayed(0, $foo = 'foo'),
new Delayed(0, $bar = 'bar'),
new Delayed(0, $baz = 'baz'),
new Success()
);

$buffererOutputStream
$buffererOutput
->expects($this->exactly(1))
->method('write')
->willReturn(new Success())
Expand Down
4 changes: 3 additions & 1 deletion tests/ResponderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Amp\ByteStream\InputStream;
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Socket\ClientSocket;
use Amp\Success;
use Ostrolucky\Stdinho\Bufferer\ResolvedBufferer;
Expand All @@ -24,7 +25,8 @@ public function testResponderHandlesClientAbruptDisconnect(): void
new ResolvedBufferer(__FILE__),
$this->createMock(ConsoleOutput::class),
[],
$this->createMock(InputStream::class)
$this->createMock(InputStream::class),
new Deferred()
);

$socket = $this->getMockBuilder(ClientSocket::class)
Expand Down

0 comments on commit 6aa88c3

Please sign in to comment.