Skip to content

Commit

Permalink
Add locking to File interface (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski authored Jan 9, 2025
1 parent 7f8b287 commit 5b6d389
Show file tree
Hide file tree
Showing 16 changed files with 548 additions and 88 deletions.
6 changes: 5 additions & 1 deletion composer-require-check.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
"deleteFile",
"filesystem",
"openFile",
"lock",
"tryLock",
"unlock",
"eio_cancel",
"eio_chmod",
"eio_chown",
Expand Down Expand Up @@ -83,7 +86,8 @@
"uv_fs_utime",
"uv_fs_write",
"uv_fs_close",
"uv_strerror"
"uv_strerror",
"Amp\\Process\\IS_WINDOWS"
],
"php-core-extensions": [
"Core",
Expand Down
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
"psr-4": {
"Amp\\File\\": "src"
},
"files": ["src/functions.php"]
"files": [
"src/functions.php",
"src/Internal/functions.php"
]
},
"autoload-dev": {
"psr-4": {
Expand Down
85 changes: 59 additions & 26 deletions src/Driver/BlockingFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Amp\Cancellation;
use Amp\DeferredFuture;
use Amp\File\File;
use Amp\File\Internal;
use Amp\File\LockType;
use Amp\File\Whence;

/**
Expand All @@ -24,6 +26,8 @@ final class BlockingFile implements File, \IteratorAggregate

private readonly DeferredFuture $onClose;

private ?LockType $lockType = null;

/**
* @param resource $handle An open filesystem descriptor.
* @param string $path File path.
Expand Down Expand Up @@ -55,18 +59,46 @@ public function __destruct()
}
}

public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
/**
* Returns the currently active lock mode, or null if the file is not locked.
*/
public function getLockType(): ?LockType
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
return $this->lockType;
}

public function lock(LockType $type, ?Cancellation $cancellation = null): void
{
Internal\lock($this->path, $this->getFileHandle(), $type, $cancellation);
$this->lockType = $type;
}

public function tryLock(LockType $type): bool
{
$locked = Internal\tryLock($this->path, $this->getFileHandle(), $type);
if ($locked) {
$this->lockType = $type;
}

return $locked;
}

public function unlock(): void
{
Internal\unlock($this->path, $this->getFileHandle());
$this->lockType = null;
}

public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
$handle = $this->getFileHandle();

try {
\set_error_handler(function (int $type, string $message): never {
throw new StreamException("Failed reading from file '{$this->path}': {$message}");
});

$data = \fread($this->handle, $length);
$data = \fread($handle, $length);
if ($data === false) {
throw new StreamException("Failed reading from file '{$this->path}'");
}
Expand All @@ -79,16 +111,14 @@ public function read(?Cancellation $cancellation = null, int $length = self::DEF

public function write(string $bytes): void
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}
$handle = $this->getFileHandle();

try {
\set_error_handler(function (int $type, string $message): never {
throw new StreamException("Failed writing to file '{$this->path}': {$message}");
});

$length = \fwrite($this->handle, $bytes);
$length = \fwrite($handle, $bytes);
if ($length === false) {
throw new StreamException("Failed writing to file '{$this->path}'");
}
Expand Down Expand Up @@ -131,6 +161,7 @@ public function close(): void
throw new StreamException("Failed closing file '{$this->path}'");
} finally {
\restore_error_handler();
$this->lockType = null;
}
}

Expand All @@ -146,16 +177,14 @@ public function onClose(\Closure $onClose): void

public function truncate(int $size): void
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}
$handle = $this->getFileHandle();

try {
\set_error_handler(function (int $type, string $message): never {
throw new StreamException("Could not truncate file '{$this->path}': {$message}");
});

if (!\ftruncate($this->handle, $size)) {
if (!\ftruncate($handle, $size)) {
throw new StreamException("Could not truncate file '{$this->path}'");
}
} finally {
Expand All @@ -165,9 +194,7 @@ public function truncate(int $size): void

public function seek(int $position, Whence $whence = Whence::Start): int
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}
$handle = $this->getFileHandle();

$mode = match ($whence) {
Whence::Start => SEEK_SET,
Expand All @@ -181,7 +208,7 @@ public function seek(int $position, Whence $whence = Whence::Start): int
throw new StreamException("Could not seek in file '{$this->path}': {$message}");
});

if (\fseek($this->handle, $position, $mode) === -1) {
if (\fseek($handle, $position, $mode) === -1) {
throw new StreamException("Could not seek in file '{$this->path}'");
}

Expand All @@ -193,20 +220,12 @@ public function seek(int $position, Whence $whence = Whence::Start): int

public function tell(): int
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}

return \ftell($this->handle);
return \ftell($this->getFileHandle());
}

public function eof(): bool
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}

return \feof($this->handle);
return \feof($this->getFileHandle());
}

public function getPath(): string
Expand Down Expand Up @@ -238,4 +257,18 @@ public function getId(): int
{
return $this->id;
}

/**
* @return resource
*
* @throws ClosedException
*/
private function getFileHandle()
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}

return $this->handle;
}
}
27 changes: 21 additions & 6 deletions src/Driver/EioFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,36 @@ final class EioFile extends Internal\QueuedWritesFile
{
private readonly Internal\EioPoll $poll;

/** @var resource eio file handle. */
private $fh;
/** @var int eio file handle resource ID. */
private int $fh;

/** @var resource|closed-resource */
private $fd;

private ?Future $closing = null;

private readonly DeferredFuture $onClose;

/**
* @param resource $fh
*/
public function __construct(Internal\EioPoll $poll, $fh, string $path, string $mode, int $size)
public function __construct(Internal\EioPoll $poll, int $fh, string $path, string $mode, int $size)
{
parent::__construct($path, $mode, $size);

$this->poll = $poll;
$this->fh = $fh;
$this->fd = \fopen('php://fd/' . $this->fh, 'r');

$this->onClose = new DeferredFuture;
}

protected function getFileHandle()
{
if (!\is_resource($this->fd)) {
throw new ClosedException("The file has been closed");
}

return $this->fd;
}

public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->isReading || !$this->queue->isEmpty()) {
Expand Down Expand Up @@ -99,6 +109,10 @@ public function close(): void
return;
}

if (\is_resource($this->fd)) {
\fclose($this->fd);
}

$this->closing = $this->onClose->getFuture();
$this->poll->listen();

Expand All @@ -111,6 +125,7 @@ public function close(): void
$this->closing->await();
} finally {
$this->poll->done();
$this->lockType = null;
}
}

Expand Down
50 changes: 50 additions & 0 deletions src/Driver/ParallelFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Amp\DeferredFuture;
use Amp\File\File;
use Amp\File\Internal;
use Amp\File\LockType;
use Amp\File\PendingOperationError;
use Amp\File\Whence;
use Amp\Future;
Expand Down Expand Up @@ -42,6 +43,8 @@ final class ParallelFile implements File, \IteratorAggregate

private readonly DeferredFuture $onClose;

private ?LockType $lockType = null;

public function __construct(
private readonly Internal\FileWorker $worker,
int $id,
Expand Down Expand Up @@ -93,6 +96,7 @@ public function close(): void
$this->closing->await();
} finally {
$this->onClose->complete();
$this->lockType = null;
}
}

Expand Down Expand Up @@ -142,6 +146,52 @@ public function eof(): bool
return $this->pendingWrites === 0 && $this->size <= $this->position;
}

public function lock(LockType $type, ?Cancellation $cancellation = null): void
{
$this->flock('lock', $type, $cancellation);
$this->lockType = $type;
}

public function tryLock(LockType $type): bool
{
$locked = $this->flock('try-lock', $type);
if ($locked) {
$this->lockType = $type;
}

return $locked;
}

public function unlock(): void
{
$this->flock('unlock');
$this->lockType = null;
}

public function getLockType(): ?LockType
{
return $this->lockType;
}

private function flock(string $action, ?LockType $type = null, ?Cancellation $cancellation = null): bool
{
if ($this->id === null) {
throw new ClosedException("The file has been closed");
}

$this->busy = true;

try {
return $this->worker->execute(new Internal\FileTask('flock', [$type, $action], $this->id), $cancellation);
} catch (TaskFailureException $exception) {
throw new StreamException("Attempting to lock the file failed", 0, $exception);
} catch (WorkerException $exception) {
throw new StreamException("Sending the task to the worker failed", 0, $exception);
} finally {
$this->busy = false;
}
}

public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->id === null) {
Expand Down
21 changes: 21 additions & 0 deletions src/Driver/StatusCachingFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Amp\ByteStream\ReadableStreamIteratorAggregate;
use Amp\Cancellation;
use Amp\File\File;
use Amp\File\LockType;
use Amp\File\Whence;

/**
Expand Down Expand Up @@ -53,6 +54,26 @@ public function end(): void
}
}

public function lock(LockType $type, ?Cancellation $cancellation = null): void
{
$this->file->lock($type, $cancellation);
}

public function tryLock(LockType $type): bool
{
return $this->file->tryLock($type);
}

public function unlock(): void
{
$this->file->unlock();
}

public function getLockType(): ?LockType
{
return $this->file->getLockType();
}

public function close(): void
{
$this->file->close();
Expand Down
Loading

0 comments on commit 5b6d389

Please sign in to comment.