diff --git a/src/Responder.php b/src/Responder.php index 15146e4..972095f 100644 --- a/src/Responder.php +++ b/src/Responder.php @@ -8,6 +8,7 @@ use Amp\ByteStream\StreamException; use Amp\Deferred; use Amp\Socket\Socket; +use Error; use Ostrolucky\Stdinho\Bufferer\AbstractBufferer; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Output\ConsoleOutput; @@ -76,7 +77,11 @@ public function __invoke(Socket $socket): \Generator // Only at this point it's safe to resolve the promise, otherwise Responder // would think everything was buffered and specify wrong content length! - $this->defererThatIsResolvedWhenSomebodyConnects->resolve(); + try { + $this->defererThatIsResolvedWhenSomebodyConnects->resolve(); + } catch (Error $e) { + // Promise might have been already resolved + } $progressBar = new ProgressBar( $this->consoleOutput->section(), @@ -87,43 +92,7 @@ public function __invoke(Socket $socket): \Generator try { yield $socket->write(implode("\r\n", array_merge($header, $this->customHttpHeaders))."\r\n\r\n"); - - while (true) { - $buffererProgress = $this->bufferer->getCurrentProgress(); - - /** - * Not trying to read the buffer when connected client caught up to it is important for following: - * - * 1. Reduce CPU usage and potential block operations thanks to avoiding executing logic in read() - * 2. Prevent ResourceInputStream to close the resource when it detects feof. This serves as workaround. - * - * @see https://github.com/ostrolucky/stdinho/pull/2 - * @see https://github.com/amphp/byte-stream/issues/47 - */ - if ($buffererProgress <= $progressBar->step && $this->bufferer->isBuffering()) { - yield $this->bufferer->waitForWrite(); - - continue; - } - - if (null !== $chunk = yield $this->inputStream->read()) { - yield $socket->write($chunk); - - $progressBar->max = $this->bufferer->getCurrentProgress(); - $progressBar->advance(strlen($chunk)); - - continue; - } - - if (!$this->bufferer->inputStream) { - break; // All input sources depleted -> finish download - } - - // Use fallback inputStream - handy when PipeBufferer exits sooner than it consumes its inputStream - $this->inputStream = $this->bufferer->inputStream; - // Prevent other potential Responders to consume same inputStream. This can be solved in future - $this->bufferer->inputStream = null; - } + yield from $this->write($socket, $progressBar); $progressBar->finish(); $this->logger->debug("$remoteAddress finished download"); } catch (StreamException $exception) { @@ -132,4 +101,44 @@ public function __invoke(Socket $socket): \Generator $socket->end(); } + + private function write(Socket $socket, ProgressBar $progressBar): \Generator + { + while (true) { + $buffererProgress = $this->bufferer->getCurrentProgress(); + + /** + * Not trying to read the buffer when connected client caught up to it is important for following: + * + * 1. Reduce CPU usage and potential block operations thanks to avoiding executing logic in read() + * 2. Prevent ResourceInputStream to close the resource when it detects feof. This serves as workaround. + * + * @see https://github.com/ostrolucky/stdinho/pull/2 + * @see https://github.com/amphp/byte-stream/issues/47 + */ + if ($buffererProgress <= $progressBar->step && $this->bufferer->isBuffering()) { + yield $this->bufferer->waitForWrite(); + + continue; + } + + if (null !== $chunk = yield $this->inputStream->read()) { + yield $socket->write($chunk); + + $progressBar->max = $this->bufferer->getCurrentProgress(); + $progressBar->advance(strlen($chunk)); + + continue; + } + + if (!$this->bufferer->inputStream) { + break; // All input sources depleted -> finish download + } + + // Use fallback inputStream - handy when PipeBufferer exits sooner than it consumes its inputStream + $this->inputStream = $this->bufferer->inputStream; + // Prevent other potential Responders to consume same inputStream. This can be solved in future + $this->bufferer->inputStream = null; + } + } }