Skip to content

Commit

Permalink
Merge pull request #1 from digitalcz/buffered-stream
Browse files Browse the repository at this point in the history
Replace CachingStream with BufferedStream
  • Loading branch information
spajxo authored Feb 27, 2023
2 parents d35be68 + fdd799d commit 4ed124c
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 161 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes will be documented in this file.

Updates should follow the [Keep a CHANGELOG](http://keepachangelog.com/) principles.

## [0.3.0] 2022-02-27
### Added
- Add BufferedStream that buffers stream data, to make it seekable

### Removed
- Remove CachingStream in favor of BufferedStream

## [0.2.1] 2022-02-22
### Fixed
- Fixed StreamWrapper invalid schema
Expand Down
163 changes: 163 additions & 0 deletions src/BufferedStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
<?php

declare(strict_types=1);

namespace DigitalCz\Streams;

use InvalidArgumentException;
use Psr\Http\Message\StreamInterface as PsrStreamInterface;
use Throwable;

final class BufferedStream implements StreamInterface
{
/** @var StreamInterface The original source stream */
protected StreamInterface $source;

/** @var StreamInterface The buffer stream */
protected StreamInterface $buffer;

/** @var int How many bytes were written from original to stream */
protected int $written = 0;

public function __construct(StreamInterface $source)
{
$this->source = $source;
$this->buffer = Stream::temp('rw+');
}

public function getSize(): ?int
{
if ($this->source->getSize() === null && $this->source->eof()) {
return $this->written;
}

return $this->source->getSize();
}

/** @inheritDoc */
public function seek($offset, $whence = SEEK_SET): void
{
$offset = match ($whence) {
SEEK_SET => $offset,
SEEK_CUR => $offset + $this->tell(),
SEEK_END => $offset + ($this->getSize() ?? 0),
default => throw new InvalidArgumentException('Invalid whence'),
};

$diff = $offset - $this->written;

if ($diff > 0) {
$this->read($diff);
} else {
$this->buffer->seek($offset);
}
}

/** @inheritDoc */
public function read($length): string
{
$data = '';

if ($this->buffer->tell() !== $this->written) {
$data = $this->buffer->read($length);
}

$bytesRead = strlen($data);

if ($bytesRead < $length) {
$sourceData = $this->source->read($length - $bytesRead);
$this->written += $this->buffer->write($sourceData);
$data .= $sourceData;
}

return $data;
}

/** @inheritDoc */
public function write($string): int
{
throw new StreamException('This stream is not writable');
}

public function isWritable(): bool
{
return false;
}

public function eof(): bool
{
return $this->source->eof() && $this->buffer->tell() === $this->written;
}

public function getContents(): string
{
$data = '';

while (!$this->eof()) {
$data .= $this->read(1024 ^ 2);
}

return $data;
}

public function close(): void
{
$this->source->close();
$this->buffer->close();
}

/**
* @return resource|null
*/
public function detach()
{
// read all data from source
$this->getContents();

$this->source->close();

return $this->buffer->detach();
}

/** @inheritDoc */
public function getMetadata($key = null)
{
return $this->buffer->getMetadata($key);
}

public function isSeekable(): bool
{
return $this->buffer->isSeekable();
}

public function isReadable(): bool
{
return $this->buffer->isReadable();
}

public function tell(): int
{
return $this->buffer->tell();
}

public function rewind(): void
{
$this->buffer->rewind();
}

public function copy(PsrStreamInterface $source): int
{
throw new StreamException('This stream is not writable');
}

public function __toString(): string
{
try {
$this->rewind();

return $this->getContents();
} catch (Throwable) {
return '';
}
}
}
110 changes: 0 additions & 110 deletions src/CachingStream.php

This file was deleted.

88 changes: 88 additions & 0 deletions tests/BufferedStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

declare(strict_types=1);

namespace DigitalCz\Streams;

use Http\Psr7Test\StreamIntegrationTest;

/**
* @covers \DigitalCz\Streams\BufferedStream
*/
class BufferedStreamTest extends StreamIntegrationTest
{
/** @var mixed[] */
protected $skippedTests = [ // phpcs:ignore
'testRewindNotSeekable' => 'BufferedStream makes stream seekable',
'testIsNotSeekable' => 'BufferedStream makes stream seekable',
'testWrite' => 'BufferedStream is not writable',
'testIsWritable' => 'BufferedStream is not writable',
'testDetach' => 'BufferedStream returns buffer handle on detach',
];

public function testUseOriginalIfAvailable(): void
{
$stream = Stream::from('test');
$buffered = new BufferedStream($stream);
self::assertSame(4, $buffered->getSize());
}

public function testReadCachedByte(): void
{
$stream = Stream::from('testing');
$buffered = new BufferedStream($stream);

$buffered->seek(5);
self::assertSame('n', $buffered->read(1));
$buffered->seek(0);
self::assertSame('t', $buffered->read(1));
}

public function testCanSeekNearEndWithSeekEnd(): void
{
$stream = Stream::from(implode('', range('a', 'z')));
$buffered = new BufferedStream($stream);
$buffered->seek(-1, SEEK_END);
self::assertSame(25, $buffered->tell());
self::assertSame('z', $buffered->read(1));
self::assertSame(26, $buffered->getSize());
}

public function testCanSeekToEndWithSeekEnd(): void
{
$stream = Stream::from(implode('', range('a', 'z')));
$buffered = new BufferedStream($stream);
$buffered->seek(0, SEEK_END);
self::assertSame(26, $stream->tell());
self::assertSame('', $buffered->read(1));
self::assertSame(26, $buffered->getSize());
}

public function testTell(): void
{
$resource = fopen('php://memory', 'wb');
self::assertNotFalse($resource);
fwrite($resource, 'abcdef');
$stream = $this->createStream($resource);

self::assertSame(0, $stream->tell());
$stream->seek(3);
self::assertSame(3, $stream->tell());
$stream->seek(6);
self::assertSame(6, $stream->tell());
}

/**
* @inheritDoc
*/
public function createStream($data): StreamInterface
{
$source = Stream::from($data);

if ($source->isSeekable()) {
$source->rewind();
}

return new BufferedStream($source);
}
}
Loading

0 comments on commit 4ed124c

Please sign in to comment.