Skip to content

Commit

Permalink
feat: add async cache implementation (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
L3tum authored Jun 17, 2024
1 parent a9e13c8 commit 9f294a4
Show file tree
Hide file tree
Showing 10 changed files with 1,224 additions and 14 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
"ext-json": "*",
"psr/simple-cache": "2 - 3",
"roadrunner-php/roadrunner-api-dto": "^1.0",
"spiral/goridge": "^4.2",
"spiral/roadrunner": "^2023.1 || ^2024.1",
"spiral/goridge": "^4.0",
"google/protobuf": "^3.7"
},
"autoload": {
Expand Down
146 changes: 146 additions & 0 deletions src/AsyncCache.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
<?php

namespace Spiral\RoadRunner\KeyValue;

use DateInterval;
use RoadRunner\KV\DTO\V1\Response;
use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\Exception\RPCException;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\KeyValue\Exception\KeyValueException;
use Spiral\RoadRunner\KeyValue\Exception\StorageException;
use Spiral\RoadRunner\KeyValue\Serializer\DefaultSerializer;
use Spiral\RoadRunner\KeyValue\Serializer\SerializerInterface;
use function sprintf;
use function str_contains;
use function str_replace;

/**
* @psalm-suppress PropertyNotSetInConstructor
*/
class AsyncCache extends Cache implements AsyncStorageInterface
{
/**
* @var positive-int[]
*/
protected array $callsInFlight = [];

/**
* @param AsyncRPCInterface $rpc
* @param non-empty-string $name
*/
public function __construct(
RPCInterface $rpc,
string $name,
SerializerInterface $serializer = new DefaultSerializer()
) {
parent::__construct($rpc, $name, $serializer);

// This should result in things like the Symfony ContainerBuilder throwing during build instead of runtime.
assert($this->rpc instanceof AsyncRPCInterface);
}

/**
* Note: The current PSR-16 implementation always returns true or
* exception on error.
*
* {@inheritDoc}
*
* @throws KeyValueException
* @throws RPCException
*/
public function deleteAsync(string $key): bool
{
return $this->deleteMultipleAsync([$key]);
}

/**
* Note: The current PSR-16 implementation always returns true or
* exception on error.
*
* {@inheritDoc}
*
* @psalm-param iterable<string> $keys
*
* @throws KeyValueException
* @throws RPCException
*/
public function deleteMultipleAsync(iterable $keys): bool
{
assert($this->rpc instanceof AsyncRPCInterface);

// Handle someone never calling commitAsync()
if (count($this->callsInFlight) > 1000) {
$this->commitAsync();
}

$this->callsInFlight[] = $this->rpc->callAsync('kv.Delete', $this->requestKeys($keys));

return true;
}

/**
* {@inheritDoc}
*
* @psalm-param positive-int|\DateInterval|null $ttl
* @psalm-suppress MoreSpecificImplementedParamType
* @throws KeyValueException
* @throws RPCException
*/
public function setAsync(string $key, mixed $value, null|int|DateInterval $ttl = null): bool
{
return $this->setMultipleAsync([$key => $value], $ttl);
}

/**
* {@inheritDoc}
*
* @psalm-param iterable<string, mixed> $values
* @psalm-param positive-int|\DateInterval|null $ttl
* @psalm-suppress MoreSpecificImplementedParamType
* @throws KeyValueException
* @throws RPCException
*/
public function setMultipleAsync(iterable $values, null|int|DateInterval $ttl = null): bool
{
assert($this->rpc instanceof AsyncRPCInterface);

// Handle someone never calling commitAsync()
if (count($this->callsInFlight) > 1000) {
$this->commitAsync();
}

$this->callsInFlight[] = $this->rpc->callAsync(
'kv.Set',
$this->requestValues($values, $this->ttlToRfc3339String($ttl))
);

return true;
}

/**
* @throws KeyValueException
* @throws RPCException
*/
public function commitAsync(): bool
{
assert($this->rpc instanceof AsyncRPCInterface);

try {
$this->rpc->getResponses($this->callsInFlight, Response::class);
} catch (ServiceException $e) {
$message = str_replace(["\t", "\n"], ' ', $e->getMessage());

if (str_contains($message, 'no such storage')) {
throw new StorageException(sprintf(self::ERROR_INVALID_STORAGE, $this->name));
}

throw new KeyValueException($message, $e->getCode(), $e);
} finally {
$this->callsInFlight = [];
}

return true;
}
}
73 changes: 73 additions & 0 deletions src/AsyncStorageInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace Spiral\RoadRunner\KeyValue;

use DateInterval;
use Spiral\RoadRunner\KeyValue\Exception\KeyValueException;

interface AsyncStorageInterface extends StorageInterface
{
/**
* Needs to be called to make sure all async calls have completed successfully.
*
* @throws KeyValueException
*/
public function commitAsync(): bool;

/**
* Persists a set of key => value pairs in the cache, with an optional TTL.
*
* @param iterable $values A list of key => value pairs for a multiple-set operation.
* @param null|int|\DateInterval $ttl Optional. The TTL value of this item. If no value is sent and
* the driver supports TTL then the library may set a default value
* for it or let the driver take care of that.
*
* @return bool True on success and false on failure.
*
* @throws \Psr\SimpleCache\InvalidArgumentException
* MUST be thrown if $values is neither an array nor a Traversable,
* or if any of the $values are not a legal value.
*/
public function setMultipleAsync(iterable $values, null|int|DateInterval $ttl = null): bool;

/**
* Persists data in the cache, uniquely referenced by a key with an optional expiration TTL time.
*
* @param string $key The key of the item to store.
* @param mixed $value The value of the item to store, must be serializable.
* @param null|int|\DateInterval $ttl Optional. The TTL value of this item. If no value is sent and
* the driver supports TTL then the library may set a default value
* for it or let the driver take care of that.
*
* @return bool True on success and false on failure.
*
* @throws \Psr\SimpleCache\InvalidArgumentException
* MUST be thrown if the $key string is not a legal value.
*/
public function setAsync(string $key, mixed $value, null|int|DateInterval $ttl = null): bool;

/**
* Delete an item from the cache by its unique key.
*
* @param string $key The unique cache key of the item to delete.
*
* @return bool True if the item was successfully removed. False if there was an error.
*
* @throws \Psr\SimpleCache\InvalidArgumentException
* MUST be thrown if the $key string is not a legal value.
*/
public function deleteAsync(string $key): bool;

/**
* Deletes multiple cache items in a single operation.
*
* @param iterable<string> $keys A list of string-based keys to be deleted.
*
* @return bool True if the items were successfully removed. False if there was an error.
*
* @throws \Psr\SimpleCache\InvalidArgumentException
* MUST be thrown if $keys is neither an array nor a Traversable,
* or if any of the $keys are not a legal value.
*/
public function deleteMultipleAsync(iterable $keys): bool;
}
18 changes: 10 additions & 8 deletions src/Cache.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Cache implements StorageInterface
{
use SerializerAwareTrait;

private const ERROR_INVALID_STORAGE =
protected const ERROR_INVALID_STORAGE =
'Storage "%s" has not been defined. Please make sure your '.
'RoadRunner "kv" configuration contains a storage key named "%1$s"';

Expand All @@ -47,9 +47,9 @@ class Cache implements StorageInterface
* @param non-empty-string $name
*/
public function __construct(
RPCInterface $rpc,
private readonly string $name,
SerializerInterface $serializer = new DefaultSerializer()
RPCInterface $rpc,
protected readonly string $name,
SerializerInterface $serializer = new DefaultSerializer()
) {
$this->rpc = $rpc->withCodec(new ProtobufCodec());
$this->zone = new \DateTimeZone('UTC');
Expand Down Expand Up @@ -108,7 +108,7 @@ public function getMultipleTtl(iterable $keys = []): iterable
/**
* @return array<string, Item>
*/
private function createIndex(Response $response): array
protected function createIndex(Response $response): array
{
$result = [];

Expand Down Expand Up @@ -146,7 +146,7 @@ private function call(string $method, Request $request): Response
* @param iterable<string> $keys
* @throws InvalidArgumentException
*/
private function requestKeys(iterable $keys): Request
protected function requestKeys(iterable $keys): Request
{
$items = [];

Expand Down Expand Up @@ -257,8 +257,9 @@ public function setMultiple(iterable $values, null|int|\DateInterval $ttl = null
/**
* @param iterable<string, mixed> $values
* @throws SerializationException
* @throws InvalidArgumentException
*/
private function requestValues(iterable $values, string $ttl): Request
protected function requestValues(iterable $values, string $ttl): Request
{
$items = [];
$serializer = $this->getSerializer();
Expand All @@ -279,8 +280,9 @@ private function requestValues(iterable $values, string $ttl): Request

/**
* @throws InvalidArgumentException
* @throws \Exception
*/
private function ttlToRfc3339String(null|int|\DateInterval $ttl): string
protected function ttlToRfc3339String(null|int|\DateInterval $ttl): string
{
if ($ttl === null) {
return '';
Expand Down
9 changes: 7 additions & 2 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

namespace Spiral\RoadRunner\KeyValue;

use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\KeyValue\Serializer\DefaultSerializer;
use Spiral\RoadRunner\KeyValue\Serializer\SerializerAwareTrait;
use Spiral\RoadRunner\KeyValue\Serializer\SerializerInterface;
use Spiral\RoadRunner\KeyValue\Serializer\DefaultSerializer;

/**
* @psalm-suppress PropertyNotSetInConstructor
Expand All @@ -25,6 +26,10 @@ public function __construct(

public function select(string $name): StorageInterface
{
return new Cache($this->rpc, $name, $this->getSerializer());
if ($this->rpc instanceof AsyncRPCInterface) {
return new AsyncCache($this->rpc, $name, $this->getSerializer());
} else {
return new Cache($this->rpc, $name, $this->getSerializer());
}
}
}
Loading

0 comments on commit 9f294a4

Please sign in to comment.