Skip to content

Commit

Permalink
Add lock and unlock to File interface
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 8, 2024
1 parent 28b38a8 commit 3596a1a
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 88 deletions.
2 changes: 2 additions & 0 deletions composer-require-check.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
"deleteFile",
"filesystem",
"openFile",
"lock",
"unlock",
"eio_cancel",
"eio_chmod",
"eio_chown",
Expand Down
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
"psr-4": {
"Amp\\File\\": "src"
},
"files": ["src/functions.php"]
"files": [
"src/functions.php",
"src/Internal/functions.php"
]
},
"autoload-dev": {
"psr-4": {
Expand Down
62 changes: 36 additions & 26 deletions src/Driver/BlockingFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Amp\Cancellation;
use Amp\DeferredFuture;
use Amp\File\File;
use Amp\File\Internal;
use Amp\File\LockMode;
use Amp\File\Whence;

/**
Expand Down Expand Up @@ -55,18 +57,26 @@ public function __destruct()
}
}

public function lock(LockMode $mode): bool
{
return Internal\lock($this->path, $this->getFileHandle(), $mode);
}

public function unlock(): void
{
Internal\unlock($this->path, $this->getFileHandle());
}

public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}
$handle = $this->getFileHandle();

try {
\set_error_handler(function (int $type, string $message): never {
throw new StreamException("Failed reading from file '{$this->path}': {$message}");
});

$data = \fread($this->handle, $length);
$data = \fread($handle, $length);
if ($data === false) {
throw new StreamException("Failed reading from file '{$this->path}'");
}
Expand All @@ -79,16 +89,14 @@ public function read(?Cancellation $cancellation = null, int $length = self::DEF

public function write(string $bytes): void
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}
$handle = $this->getFileHandle();

try {
\set_error_handler(function (int $type, string $message): never {
throw new StreamException("Failed writing to file '{$this->path}': {$message}");
});

$length = \fwrite($this->handle, $bytes);
$length = \fwrite($handle, $bytes);
if ($length === false) {
throw new StreamException("Failed writing to file '{$this->path}'");
}
Expand Down Expand Up @@ -146,16 +154,14 @@ public function onClose(\Closure $onClose): void

public function truncate(int $size): void
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}
$handle = $this->getFileHandle();

try {
\set_error_handler(function (int $type, string $message): never {
throw new StreamException("Could not truncate file '{$this->path}': {$message}");
});

if (!\ftruncate($this->handle, $size)) {
if (!\ftruncate($handle, $size)) {
throw new StreamException("Could not truncate file '{$this->path}'");
}
} finally {
Expand All @@ -165,9 +171,7 @@ public function truncate(int $size): void

public function seek(int $position, Whence $whence = Whence::Start): int
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}
$handle = $this->getFileHandle();

$mode = match ($whence) {
Whence::Start => SEEK_SET,
Expand All @@ -181,7 +185,7 @@ public function seek(int $position, Whence $whence = Whence::Start): int
throw new StreamException("Could not seek in file '{$this->path}': {$message}");
});

if (\fseek($this->handle, $position, $mode) === -1) {
if (\fseek($handle, $position, $mode) === -1) {
throw new StreamException("Could not seek in file '{$this->path}'");
}

Expand All @@ -193,20 +197,12 @@ public function seek(int $position, Whence $whence = Whence::Start): int

public function tell(): int
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}

return \ftell($this->handle);
return \ftell($this->getFileHandle());
}

public function eof(): bool
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}

return \feof($this->handle);
return \feof($this->getFileHandle());
}

public function getPath(): string
Expand Down Expand Up @@ -238,4 +234,18 @@ public function getId(): int
{
return $this->id;
}

/**
* @return resource
*
* @throws ClosedException
*/
private function getFileHandle()
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}

return $this->handle;
}
}
26 changes: 20 additions & 6 deletions src/Driver/EioFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,36 @@ final class EioFile extends Internal\QueuedWritesFile
{
private readonly Internal\EioPoll $poll;

/** @var resource eio file handle. */
private $fh;
/** @var int eio file handle resource ID. */
private int $fh;

/** @var resource|closed-resource */
private $fd;

private ?Future $closing = null;

private readonly DeferredFuture $onClose;

/**
* @param resource $fh
*/
public function __construct(Internal\EioPoll $poll, $fh, string $path, string $mode, int $size)
public function __construct(Internal\EioPoll $poll, int $fh, string $path, string $mode, int $size)
{
parent::__construct($path, $mode, $size);

$this->poll = $poll;
$this->fh = $fh;
$this->fd = \fopen('php://fd/' . $this->fh, 'r');

$this->onClose = new DeferredFuture;
}

protected function getFileHandle()
{
if (!\is_resource($this->fd)) {
throw new ClosedException("The file has been closed");
}

return $this->fd;
}

public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->isReading || !$this->queue->isEmpty()) {
Expand Down Expand Up @@ -99,6 +109,10 @@ public function close(): void
return;
}

if (\is_resource($this->fd)) {
\fclose($this->fd);
}

$this->closing = $this->onClose->getFuture();
$this->poll->listen();

Expand Down
30 changes: 30 additions & 0 deletions src/Driver/ParallelFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Amp\DeferredFuture;
use Amp\File\File;
use Amp\File\Internal;
use Amp\File\LockMode;
use Amp\File\PendingOperationError;
use Amp\File\Whence;
use Amp\Future;
Expand Down Expand Up @@ -142,6 +143,35 @@ public function eof(): bool
return $this->pendingWrites === 0 && $this->size <= $this->position;
}

public function lock(LockMode $mode): bool
{
return $this->flock($mode);
}

public function unlock(): void
{
$this->flock(null);
}

private function flock(?LockMode $mode): bool
{
if ($this->id === null) {
throw new ClosedException("The file has been closed");
}

$this->busy = true;

try {
return $this->worker->execute(new Internal\FileTask('flock', [$mode], $this->id));
} catch (TaskFailureException $exception) {
throw new StreamException("Attempting to lock the file failed", 0, $exception);
} catch (WorkerException $exception) {
throw new StreamException("Sending the task to the worker failed", 0, $exception);
} finally {
$this->busy = false;
}
}

public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->id === null) {
Expand Down
11 changes: 11 additions & 0 deletions src/Driver/StatusCachingFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Amp\ByteStream\ReadableStreamIteratorAggregate;
use Amp\Cancellation;
use Amp\File\File;
use Amp\File\LockMode;
use Amp\File\Whence;

/**
Expand Down Expand Up @@ -53,6 +54,16 @@ public function end(): void
}
}

public function lock(LockMode $mode): bool
{
return $this->file->lock($mode);
}

public function unlock(): void
{
$this->file->unlock();
}

public function close(): void
{
$this->file->close();
Expand Down
9 changes: 9 additions & 0 deletions src/Driver/UvFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public function __construct(
$this->onClose = new DeferredFuture;
}

protected function getFileHandle()
{
if ($this->closing) {
throw new ClosedException("The file has been closed");
}

return $this->fh;
}

public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->isReading || !$this->queue->isEmpty()) {
Expand Down
13 changes: 13 additions & 0 deletions src/File.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\File;

use Amp\ByteStream\ClosedException;
use Amp\ByteStream\ReadableStream;
use Amp\ByteStream\WritableStream;
use Amp\Cancellation;
Expand Down Expand Up @@ -54,4 +55,16 @@ public function getMode(): string;
* @param int $size New file size.
*/
public function truncate(int $size): void;

/**
* Makes a non-blocking attempt to lock the file. Returns true if the lock was obtained.
*
* @throws ClosedException If the file has been closed.
*/
public function lock(LockMode $mode): bool;

/**
* @throws ClosedException If the file has been closed.
*/
public function unlock(): void;
}
32 changes: 18 additions & 14 deletions src/FileMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,30 @@ public function __construct(private readonly string $fileName, ?Filesystem $file
$this->directory = \dirname($this->fileName);
}

/**
* @throws SyncException
*/
public function acquire(?Cancellation $cancellation = null): Lock
{
if (!$this->filesystem->isDirectory($this->directory)) {
throw new SyncException(\sprintf('Directory of "%s" does not exist or is not a directory', $this->fileName));
}

// Try to create the lock file. If the file already exists, someone else
// has the lock, so set an asynchronous timer and try again.
// Try to create and lock the file. If flock fails, someone else already has the lock,
// so set an asynchronous timer and try again.
for ($attempt = 0; true; ++$attempt) {
try {
$file = $this->filesystem->openFile($this->fileName, 'x');

// Return a lock object that can be used to release the lock on the mutex.
$lock = new Lock($this->release(...));

$file = $this->filesystem->openFile($this->fileName, 'c');
if ($file->lock(LockMode::Exclusive)) {
return new Lock(fn () => $this->release($file));
}
$file->close();

return $lock;
} catch (FilesystemException) {
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * (2 ** $attempt)), cancellation: $cancellation);
} catch (FilesystemException $exception) {
throw new SyncException($exception->getMessage(), previous: $exception);
}

$multiplier = 2 ** \min(31, $attempt);
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * $multiplier), cancellation: $cancellation);
}
}

Expand All @@ -55,11 +58,12 @@ public function acquire(?Cancellation $cancellation = null): Lock
*
* @throws SyncException
*/
private function release(): void
private function release(File $file): void
{
try {
$this->filesystem->deleteFile($this->fileName);
} catch (\Throwable $exception) {
$this->filesystem->deleteFile($this->fileName); // Delete file while holding the lock.
$file->close();
} catch (FilesystemException $exception) {
throw new SyncException(
'Failed to unlock the mutex file: ' . $this->fileName,
previous: $exception,
Expand Down
9 changes: 9 additions & 0 deletions src/Internal/FileTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ public function run(Channel $channel, Cancellation $cancellation): mixed
$file->close();
return null;

case "flock":
[$mode] = $this->args;
if ($mode) {
return $file->lock($mode);
}

$file->unlock();
return null;

default:
throw new \Error('Invalid operation');
}
Expand Down
Loading

0 comments on commit 3596a1a

Please sign in to comment.