Skip to content

Commit

Permalink
Fix crash that occurs when re-connecting
Browse files Browse the repository at this point in the history
  • Loading branch information
ostrolucky committed Aug 3, 2019
1 parent 1d0655b commit 7818ffe
Showing 1 changed file with 47 additions and 38 deletions.
85 changes: 47 additions & 38 deletions src/Responder.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Amp\ByteStream\StreamException;
use Amp\Deferred;
use Amp\Socket\Socket;
use Error;
use Ostrolucky\Stdinho\Bufferer\AbstractBufferer;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Output\ConsoleOutput;
Expand Down Expand Up @@ -76,7 +77,11 @@ public function __invoke(Socket $socket): \Generator

// Only at this point it's safe to resolve the promise, otherwise Responder
// would think everything was buffered and specify wrong content length!
$this->defererThatIsResolvedWhenSomebodyConnects->resolve();
try {
$this->defererThatIsResolvedWhenSomebodyConnects->resolve();
} catch (Error $e) {
// Promise might have been already resolved
}

$progressBar = new ProgressBar(
$this->consoleOutput->section(),
Expand All @@ -87,43 +92,7 @@ public function __invoke(Socket $socket): \Generator

try {
yield $socket->write(implode("\r\n", array_merge($header, $this->customHttpHeaders))."\r\n\r\n");

while (true) {
$buffererProgress = $this->bufferer->getCurrentProgress();

/**
* Not trying to read the buffer when connected client caught up to it is important for following:
*
* 1. Reduce CPU usage and potential block operations thanks to avoiding executing logic in read()
* 2. Prevent ResourceInputStream to close the resource when it detects feof. This serves as workaround.
*
* @see https://github.com/ostrolucky/stdinho/pull/2
* @see https://github.com/amphp/byte-stream/issues/47
*/
if ($buffererProgress <= $progressBar->step && $this->bufferer->isBuffering()) {
yield $this->bufferer->waitForWrite();

continue;
}

if (null !== $chunk = yield $this->inputStream->read()) {
yield $socket->write($chunk);

$progressBar->max = $this->bufferer->getCurrentProgress();
$progressBar->advance(strlen($chunk));

continue;
}

if (!$this->bufferer->inputStream) {
break; // All input sources depleted -> finish download
}

// Use fallback inputStream - handy when PipeBufferer exits sooner than it consumes its inputStream
$this->inputStream = $this->bufferer->inputStream;
// Prevent other potential Responders to consume same inputStream. This can be solved in future
$this->bufferer->inputStream = null;
}
yield from $this->write($socket, $progressBar);
$progressBar->finish();
$this->logger->debug("$remoteAddress finished download");
} catch (StreamException $exception) {
Expand All @@ -132,4 +101,44 @@ public function __invoke(Socket $socket): \Generator

$socket->end();
}

private function write(Socket $socket, ProgressBar $progressBar): \Generator
{
while (true) {
$buffererProgress = $this->bufferer->getCurrentProgress();

/**
* Not trying to read the buffer when connected client caught up to it is important for following:
*
* 1. Reduce CPU usage and potential block operations thanks to avoiding executing logic in read()
* 2. Prevent ResourceInputStream to close the resource when it detects feof. This serves as workaround.
*
* @see https://github.com/ostrolucky/stdinho/pull/2
* @see https://github.com/amphp/byte-stream/issues/47
*/
if ($buffererProgress <= $progressBar->step && $this->bufferer->isBuffering()) {
yield $this->bufferer->waitForWrite();

continue;
}

if (null !== $chunk = yield $this->inputStream->read()) {
yield $socket->write($chunk);

$progressBar->max = $this->bufferer->getCurrentProgress();
$progressBar->advance(strlen($chunk));

continue;
}

if (!$this->bufferer->inputStream) {
break; // All input sources depleted -> finish download
}

// Use fallback inputStream - handy when PipeBufferer exits sooner than it consumes its inputStream
$this->inputStream = $this->bufferer->inputStream;
// Prevent other potential Responders to consume same inputStream. This can be solved in future
$this->bufferer->inputStream = null;
}
}
}

0 comments on commit 7818ffe

Please sign in to comment.