diff --git a/CHANGELOG.md b/CHANGELOG.md index a6d6621..b6f221f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes will be documented in this file. Updates should follow the [Keep a CHANGELOG](http://keepachangelog.com/) principles. +## [0.3.0] 2022-02-27 +### Added +- Add BufferedStream that buffers stream data, to make it seekable + +### Removed +- Remove CachingStream in favor of BufferedStream + ## [0.2.1] 2022-02-22 ### Fixed - Fixed StreamWrapper invalid schema diff --git a/src/BufferedStream.php b/src/BufferedStream.php new file mode 100644 index 0000000..c9ded3d --- /dev/null +++ b/src/BufferedStream.php @@ -0,0 +1,163 @@ +source = $source; + $this->buffer = Stream::temp('rw+'); + } + + public function getSize(): ?int + { + if ($this->source->getSize() === null && $this->source->eof()) { + return $this->written; + } + + return $this->source->getSize(); + } + + /** @inheritDoc */ + public function seek($offset, $whence = SEEK_SET): void + { + $offset = match ($whence) { + SEEK_SET => $offset, + SEEK_CUR => $offset + $this->tell(), + SEEK_END => $offset + ($this->getSize() ?? 0), + default => throw new InvalidArgumentException('Invalid whence'), + }; + + $diff = $offset - $this->written; + + if ($diff > 0) { + $this->read($diff); + } else { + $this->buffer->seek($offset); + } + } + + /** @inheritDoc */ + public function read($length): string + { + $data = ''; + + if ($this->buffer->tell() !== $this->written) { + $data = $this->buffer->read($length); + } + + $bytesRead = strlen($data); + + if ($bytesRead < $length) { + $sourceData = $this->source->read($length - $bytesRead); + $this->written += $this->buffer->write($sourceData); + $data .= $sourceData; + } + + return $data; + } + + /** @inheritDoc */ + public function write($string): int + { + throw new StreamException('This stream is not writable'); + } + + public function isWritable(): bool + { + return false; + } + + public function eof(): bool + { + return $this->source->eof() && $this->buffer->tell() === $this->written; + } + + public function getContents(): string + { + $data = ''; + + while (!$this->eof()) { + $data .= $this->read(1024 ^ 2); + } + + return $data; + } + + public function close(): void + { + $this->source->close(); + $this->buffer->close(); + } + + /** + * @return resource|null + */ + public function detach() + { + // read all data from source + $this->getContents(); + + $this->source->close(); + + return $this->buffer->detach(); + } + + /** @inheritDoc */ + public function getMetadata($key = null) + { + return $this->buffer->getMetadata($key); + } + + public function isSeekable(): bool + { + return $this->buffer->isSeekable(); + } + + public function isReadable(): bool + { + return $this->buffer->isReadable(); + } + + public function tell(): int + { + return $this->buffer->tell(); + } + + public function rewind(): void + { + $this->buffer->rewind(); + } + + public function copy(PsrStreamInterface $source): int + { + throw new StreamException('This stream is not writable'); + } + + public function __toString(): string + { + try { + $this->rewind(); + + return $this->getContents(); + } catch (Throwable) { + return ''; + } + } +} diff --git a/src/CachingStream.php b/src/CachingStream.php deleted file mode 100644 index 6d82a75..0000000 --- a/src/CachingStream.php +++ /dev/null @@ -1,110 +0,0 @@ -original = $stream; - $this->stream = Stream::temp('rb+'); - } - - public function getSize(): ?int - { - $originalSize = $this->original->getSize(); - - if ($originalSize === null) { - return null; - } - - return max($this->stream->getSize(), $originalSize); - } - - /** @inheritDoc */ - public function seek($offset, $whence = SEEK_SET): void - { - $offset = match ($whence) { - SEEK_SET => $offset, - SEEK_CUR => $offset + $this->tell(), - SEEK_END => $offset + ($this->original->getSize() ?? $this->stream->copy($this->original)), - default => throw new InvalidArgumentException('Invalid whence'), - }; - - $diff = $offset - ($this->stream->getSize() ?? 0); - - if ($diff > 0) { - while ($diff > 0 && !$this->original->eof()) { - $this->read($diff); - $diff = $offset - ($this->stream->getSize() ?? 0); - } - } else { - $this->stream->seek($offset); - } - } - - /** @inheritDoc */ - public function read($length): string - { - $data = $this->stream->read($length); - $remaining = $length - strlen($data); - - if ($remaining > 0) { - $originalData = $this->original->read($remaining + $this->skipBytes); - - if ($this->skipBytes > 0) { - $originalData = substr($originalData, $this->skipBytes); - $this->skipBytes = max(0, $this->skipBytes - strlen($originalData)); - } - - $data .= $originalData; - $this->stream->write($originalData); - } - - return $data; - } - - /** @inheritDoc */ - public function write($string): int - { - $overflow = strlen($string) + $this->tell() - $this->original->tell(); - - if ($overflow > 0) { - $this->skipBytes += $overflow; - } - - return $this->stream->write($string); - } - - public function eof(): bool - { - return $this->stream->eof() && $this->original->eof(); - } - - public function getContents(): string - { - $contents = ''; - - while (!$this->eof()) { - $contents .= $this->read(1024 ^ 2); - } - - return $contents; - } - - public function close(): void - { - $this->original->close(); - $this->stream->close(); - } -} diff --git a/tests/BufferedStreamTest.php b/tests/BufferedStreamTest.php new file mode 100644 index 0000000..a18e800 --- /dev/null +++ b/tests/BufferedStreamTest.php @@ -0,0 +1,88 @@ + 'BufferedStream makes stream seekable', + 'testIsNotSeekable' => 'BufferedStream makes stream seekable', + 'testWrite' => 'BufferedStream is not writable', + 'testIsWritable' => 'BufferedStream is not writable', + 'testDetach' => 'BufferedStream returns buffer handle on detach', + ]; + + public function testUseOriginalIfAvailable(): void + { + $stream = Stream::from('test'); + $buffered = new BufferedStream($stream); + self::assertSame(4, $buffered->getSize()); + } + + public function testReadCachedByte(): void + { + $stream = Stream::from('testing'); + $buffered = new BufferedStream($stream); + + $buffered->seek(5); + self::assertSame('n', $buffered->read(1)); + $buffered->seek(0); + self::assertSame('t', $buffered->read(1)); + } + + public function testCanSeekNearEndWithSeekEnd(): void + { + $stream = Stream::from(implode('', range('a', 'z'))); + $buffered = new BufferedStream($stream); + $buffered->seek(-1, SEEK_END); + self::assertSame(25, $buffered->tell()); + self::assertSame('z', $buffered->read(1)); + self::assertSame(26, $buffered->getSize()); + } + + public function testCanSeekToEndWithSeekEnd(): void + { + $stream = Stream::from(implode('', range('a', 'z'))); + $buffered = new BufferedStream($stream); + $buffered->seek(0, SEEK_END); + self::assertSame(26, $stream->tell()); + self::assertSame('', $buffered->read(1)); + self::assertSame(26, $buffered->getSize()); + } + + public function testTell(): void + { + $resource = fopen('php://memory', 'wb'); + self::assertNotFalse($resource); + fwrite($resource, 'abcdef'); + $stream = $this->createStream($resource); + + self::assertSame(0, $stream->tell()); + $stream->seek(3); + self::assertSame(3, $stream->tell()); + $stream->seek(6); + self::assertSame(6, $stream->tell()); + } + + /** + * @inheritDoc + */ + public function createStream($data): StreamInterface + { + $source = Stream::from($data); + + if ($source->isSeekable()) { + $source->rewind(); + } + + return new BufferedStream($source); + } +} diff --git a/tests/CachingStreamTest.php b/tests/CachingStreamTest.php deleted file mode 100644 index 2c2fd82..0000000 --- a/tests/CachingStreamTest.php +++ /dev/null @@ -1,51 +0,0 @@ -getSize()); - } - - public function testReadCachedByte(): void - { - $stream = Stream::from('testing'); - $cached = new CachingStream($stream); - - $cached->seek(5); - self::assertSame('n', $cached->read(1)); - $cached->seek(0); - self::assertSame('t', $cached->read(1)); - } - - public function testCanSeekNearEndWithSeekEnd(): void - { - $stream = Stream::from(implode('', range('a', 'z'))); - $cached = new CachingStream($stream); - $cached->seek(-1, SEEK_END); - self::assertSame(25, $stream->tell()); - self::assertSame('z', $cached->read(1)); - self::assertSame(26, $cached->getSize()); - } - - public function testCanSeekToEndWithSeekEnd(): void - { - $stream = Stream::from(implode('', range('a', 'z'))); - $cached = new CachingStream($stream); - $cached->seek(0, SEEK_END); - self::assertSame(26, $stream->tell()); - self::assertSame('', $cached->read(1)); - self::assertSame(26, $cached->getSize()); - } -}