Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add async client #62

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
467f106
add async client
withinboredom Feb 11, 2024
8043672
handle async process calls
withinboredom Feb 11, 2024
c643312
update dispatch to use channels
withinboredom Feb 11, 2024
87197d8
still need to call process for non-backgrounded tasks
withinboredom Feb 11, 2024
3b3eaf7
it appears this is the bottleneck
withinboredom Feb 11, 2024
c344d64
connect when backgrounding if we have not already
withinboredom Feb 11, 2024
581e017
run phpcs-fixer
withinboredom Feb 13, 2024
46c2a70
rename client
withinboredom Feb 13, 2024
35cd867
send ping at specified intervals
withinboredom Feb 13, 2024
57c71cf
ensure result is null
withinboredom Feb 14, 2024
4676b2d
refactor auto-pings
withinboredom Feb 14, 2024
2dcd4c2
this seems to perform a bit better
withinboredom Feb 14, 2024
3ed8344
properly handle HMSG
withinboredom Feb 14, 2024
825e29a
handle closure and fix an undefined variable warning
withinboredom Feb 14, 2024
17577f1
should be an empty string
withinboredom Feb 14, 2024
7de47b6
prevent infinite pings
withinboredom Feb 16, 2024
85ddc04
add bench and better perf
withinboredom Feb 16, 2024
9064cd8
Merge branch 'basis-company:main' into amphp
withinboredom Feb 24, 2024
9e5bafc
update tests to find failures
withinboredom Feb 24, 2024
ec5b1de
enable more tests
withinboredom Feb 24, 2024
8ba6743
fix code style
withinboredom Feb 24, 2024
0a7f879
fix typo
withinboredom Feb 24, 2024
a6c7553
ensure connect property is set
withinboredom Feb 24, 2024
db58363
need to connect first
withinboredom Feb 24, 2024
f6d5af7
incompatible test
withinboredom Feb 24, 2024
aab76db
fix issues with tls
withinboredom Feb 24, 2024
2435ad5
ensure logger is always set
withinboredom Feb 24, 2024
b1587b2
get client tests passing
withinboredom Feb 24, 2024
4510d1e
fix tests
withinboredom Feb 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
"description": "nats jetstream client for php",
"keywords": ["nats", "client", "streaming", "jetstream", "queue", "messaging", "subscribe", "publish", "request", "response", "bucket", "key-value", "storage"],
"require": {
"php": ">=8.1"
"php": ">=8.1",
"amphp/parser": "^1.1",
"amphp/socket": "^2.2"
},
"require-dev": {
"phpunit/phpunit": "^9.5",
"monolog/monolog": "^2.3.5",
"ext-sodium": "*",
"friendsofphp/php-cs-fixer": "^3.9",
"phan/phan": "^5.3"
"phan/phan": "^5.3",
"phpbench/phpbench": "^1.2"
},
"suggest": {
"paragonie/sodium_compat": "Provides Ed25519 for nkey authentication if sodium is not available",
"ext-sodium": "Provides Ed25519 for nkey authentication"
"ext-sodium": "Provides Ed25519 for nkey authentication",
"amphp/parser": "Provides async parsing of messages",
"amphp/socket": "Provides support for the async client"
},
"license": "mit",
"autoload": {
Expand Down
19 changes: 19 additions & 0 deletions phpbench.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"$schema":"./vendor/phpbench/phpbench/phpbench.schema.json",
"runner.bootstrap": "vendor/autoload.php",
"report.generators": {
"clients": {
"title": "client comparisons",
"generator": "component",
"filter": "benchmark_name = 'AsyncPerformanceBench'",
"components": [
{
"component": "bar_chart_aggregate",
"bar_partition": "'Client: ' ~ variant_params['client']",
"x_partition": "variant_params['messages'] ~ ' msgs'",
"y_expr": "mode(partition['result_time_avg']) as time"
}
]
}
}
}
213 changes: 213 additions & 0 deletions src/AmpClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
<?php

declare(strict_types=1);

namespace Basis\Nats;

use Amp\ByteStream\ClosedException;
use Amp\Socket\Certificate;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext;
use Amp\Sync\Barrier;
use Amp\TimeoutCancellation;
use Basis\Nats\Async\Socket;
use Basis\Nats\Message\Connect;
use Basis\Nats\Message\Info;
use Basis\Nats\Message\Msg;
use Basis\Nats\Message\Ping;
use Basis\Nats\Message\Prototype;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Revolt\EventLoop;

use function Amp\Socket\socketConnector;
use function Amp\Sync\createChannelPair;

class AmpClient extends Client
{
public readonly Api $api;

private Socket $socket;

public function __construct(
Configuration $configuration = new Configuration(),
LoggerInterface|null $logger = new NullLogger(),
) {
parent::__construct($configuration, $logger);
$this->logger ??= new NullLogger();
}

public function setTimeout(float $value): Client
{
throw new \LogicException('timeout is set via configuration');
}

public function ping(): bool
{
$this->send(new Ping([]));

// todo: handle this result better
return true;
}

protected function send(Prototype $message): self
{
if($this->configuration->reconnect || ($this->socket ?? null) === null) {
$this->connect();
}

$line = $message->render() . "\r\n";

try {
$this->socket->write($line);
} catch (ClosedException) {
if($this->configuration->reconnect || ($this->socket ?? null) === null) {
$this->connect();
return $this->send($message);
}
throw new \LogicException('Socket read timeout');
}

return $this;
}

public function connect(): Client
{
if (isset($this->socket) && !$this->socket->isClosed()) {
return $this;
}

$config = $this->configuration;
$dsn = "$config->host:$config->port";

try {
$context = (new ConnectContext())->withConnectTimeout($config->timeout);
$tlsContext = null;
if ($config->tlsKeyFile || $config->tlsCertFile) {
$tlsContext ??= new ClientTlsContext();
$tlsContext = $tlsContext->withoutPeerVerification()
->withCertificate(new Certificate($config->tlsCertFile, $config->tlsKeyFile));

$this->logger->debug(
'tls connection params',
['certFile' => is_readable($config->tlsCertFile), 'key' => is_readable($config->tlsKeyFile)]
);
}
if ($config->tlsCaFile) {
$tlsContext ??= new ClientTlsContext();
$tlsContext = $tlsContext
->withCaFile($config->tlsCaFile);
}
if ($tlsContext) {
$context = $context->withTlsContext($tlsContext->withMinimumVersion(ClientTlsContext::TLSv1_2));
}
$this->socket = new Socket(
socketConnector()->connect($dsn, $context),
logger: $this->logger ?? new NullLogger(),
idleTimeout: $config->pingInterval
);
} catch (\Throwable $exception) {
// todo: handle exception
throw $exception;
}

$this->info = $info = $this->process($config->timeout);
assert($info instanceof Info);

$this->connect = $connect = new Connect($config->getOptions());
if ($this->name) {
$connect->name = $this->name;
}
if (isset($info->nonce) && $this->authenticator) {
$connect->sig = $this->authenticator->sign($info->nonce);
$connect->nkey = $this->authenticator->getPublicKey();
}

$this->send($connect);

return $this;
}

public function process(null|int|float $timeout = 0, bool $reply = true, bool $async = false): Info|null
{
if ($this->socket->isAsync()) {
return null;
}
$this->lastDataReadFailureAt = null;
$message = $this->socket->read($timeout, $reply);
if ($message === null) {
return null;
}

return $this->onMessage($message, $reply);
}

protected function onMessage(Prototype $message, bool $reply = true, bool $async = false): Info|null
{
switch ($message::class) {
case Info::class:
if (($message->tls_verify ?? false) || ($message->tls_required ?? false)) {
$this->socket->enableTls();
}
return $message;
case Msg::class:
assert($message instanceof Msg);
$handler = $this->handlers[$message->sid] ?? null;
if (!$handler) {
if ($this->skipInvalidMessages) {
return null;
}
throw new \LogicException('No handler for ' . $message->sid);
}

if ($async) {
EventLoop::queue(function () use ($handler, $message, $reply) {
$result = $handler($message->payload, $message->replyTo);
if ($reply && $message->replyTo) {
$this->publish($message->replyTo, $result);
}
});
} else {
$result = $handler($message->payload, $message->replyTo);
if ($reply && $message->replyTo) {
$this->publish($message->replyTo, $result);
}
}
break;
}

return null;
}

public function background(bool $enableAutoReply, int $concurrency = 10): \Closure
{
if($this->configuration->reconnect || ($this->socket ?? null) === null) {
$this->connect();
}
$this->socket->switchToAsync(
$concurrency,
fn (Prototype|null $message) => $message && $this->onMessage($message, $enableAutoReply, false)
);
return $this->socket->switchToSync(...);
}

public function dispatch(string $name, mixed $payload, ?float $timeout = null)
{
$timeoutCancellation = null;
if ($timeout !== null) {
$timeoutCancellation = new TimeoutCancellation($timeout);
}

$writeBarrier = new Barrier(1);
$value = null;

$this->request($name, $payload, function ($result) use ($writeBarrier, &$value) {
$value = $result;
$writeBarrier->arrive();
});

$writeBarrier->await($timeoutCancellation);

return $value;
}
}
44 changes: 44 additions & 0 deletions src/Async/Parser.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Basis\Nats\Async;

use Amp\Pipeline\Queue;

class Parser extends \Amp\Parser\Parser
{
private const CRLF = "\r\n";

public function __construct(Queue $queue)
{
parent::__construct(self::parser($queue));
}

private static function parser(Queue $queue): \Generator
{
while(true) {
try {
$line = yield self::CRLF;

if(str_starts_with($line, 'MSG')) {
$payload = yield self::CRLF;
$queue->push([$line, $payload]);
continue;
}

if(str_starts_with($line, 'HMSG')) {
// headers contain CRLF, but are deliminated by CRLFCRLF
$headers = yield self::CRLF.self::CRLF;
$payload = yield self::CRLF;
$queue->push([$line, "$headers\r\n\r\n$payload"]);
}

$queue->push($line);
} catch(\Throwable $exception) {
// todo: handle exception?
throw $exception;
}
}
}
}
Loading