Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Servers connection settings #22

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 44 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,29 @@ $client = new Client($configuration);
$client->ping(); // true
```

As an alternative to the `host` and `port` configuration options you can specify an array of host/port values in the `server` option. When the array contains more than one entry the client will attempt to connect to each entry in turn. If it fails to connect it will go to the next entry in the array.

By default, the order of the host:port entries is randomly shuffled on startup. In situations where multiple clients have the same connection configuration this shuffle will balance the clients across multiple servers. If you want to manage client connections directly while also availing of the redundancy of multiple host:port configurations you can disable the shuffle by setting the `serversRandomize` option to false.

```php
use Basis\Nats\Client;
use Basis\Nats\Configuration;


$configuration = new Configuration([
'servers' => ['localhost:4222', 'localhost:4221', 'localhost:4220'],
'serversRandomize' => false
]);

// default delay mode is constant - first retry be in 1ms, second in 1ms, third in 1ms
$configuration->setDelay(0.001);


$client = new Client($configuration);
$client->ping(); // true

```

## Publish Subscribe

```php
Expand Down Expand Up @@ -296,19 +319,24 @@ model name : Intel(R) Core(TM) i5-4670K CPU @ 3.40GHz

The following is the list of configuration options and default values.

| Option | Default | Description |
|----------------|------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `inboxPrefix` | `"_INBOX"` | Sets de prefix for automatically created inboxes |
| `jwt` | | Token for [JWT Authentication](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/jwt). Alternatively you can use [CredentialsParser](#using-nkeys-with-jwt) |
| `nkey` | | Ed25519 based public key signature used for [NKEY Authentication](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/nkey_auth). |
| `pass` | | Sets the password for a connection. |
| `pedantic` | `false` | Turns on strict subject format checks. |
| `pingInterval` | `2` | Number of seconds between client-sent pings. |
| `port` | `4222` | Port to connect to (only used if `servers` is not specified). |
| `timeout` | 1 | Number of seconds the client will wait for a connection to be established. |
| `token` | | Sets a authorization token for a connection. |
| `tlsKeyFile` | | TLS 1.2 Client key file path. |
| `tlsCertFile` | | TLS 1.2 Client certificate file path. |
| `tlsCaFile` | | TLS 1.2 CA certificate filepath. |
| `user` | | Sets the username for a connection. |
| `verbose` | `false` | Turns on `+OK` protocol acknowledgements. |
| Option | Default | Description |
|------------------------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `ignoreClusterUpdates` | `false` | If true the client will ignore `connect_urls` and `ldm` in INFO messages. |
| `inboxPrefix` | `"_INBOX"` | Sets de prefix for automatically created inboxes |
| `jwt` | | Token for [JWT Authentication](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/jwt). Alternatively you can use [CredentialsParser](#using-nkeys-with-jwt) |
| `maxReconnectAttempts` | 10 | Sets the maximum number of reconnect attempts per server. The value of -1 specifies no limit. |
| `nkey` | | Ed25519 based public key signature used for [NKEY Authentication](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/nkey_auth). |
| `pass` | | Sets the password for a connection. |
| `pedantic` | `false` | Turns on strict subject format checks. |
| `pingInterval` | `2` | Number of seconds between client-sent pings. |
| `port` | `4222` | Port to connect to (only used if `servers` is not specified). |
| `reconnectTimeWait` | `0.2` | If disconnected, the client will wait the specified number of seconds between reconnect attempts on a per server basis. |
| `servers` | | Array of host:port values for servers |
| `serversRandomize` | `true` | Toggle on/off the server array shuffle. |
| `timeout` | 1 | Number of seconds the client will wait for a connection to be established. |
| `token` | | Sets a authorization token for a connection. |
| `tlsKeyFile` | | TLS 1.2 Client key file path. |
| `tlsCertFile` | | TLS 1.2 Client certificate file path. |
| `tlsCaFile` | | TLS 1.2 CA certificate filepath. |
| `user` | | Sets the username for a connection. |
| `verbose` | `false` | Turns on `+OK` protocol acknowledgements. |
41 changes: 34 additions & 7 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Basis\Nats;

use Basis\Nats\Connection\ServerPool;
use Basis\Nats\Message\Connect;
use Basis\Nats\Message\Factory;
use Basis\Nats\Message\Info;
Expand All @@ -28,6 +29,7 @@ class Client
public readonly Api $api;

private readonly ?Authenticator $authenticator;
private readonly ServerPool $serversManager;

private $socket;
private $context;
Expand All @@ -38,14 +40,20 @@ class Client
private array $subscriptions = [];

private bool $skipInvalidMessages = false;
private bool $tlsEnabled = false;
private int $counter = 1;

/**
* @throws Exception
*/
public function __construct(
public readonly Configuration $configuration = new Configuration(),
public ?LoggerInterface $logger = null,
) {
$this->api = new Api($this);

$this->authenticator = Authenticator::create($this->configuration);
$this->serversManager = new ServerPool($this->configuration);
}

public function api($command, array $args = [], ?Closure $callback = null): ?object
Expand Down Expand Up @@ -82,12 +90,26 @@ public function connect(): self

$config = $this->configuration;

$dsn = "$config->host:$config->port";
if ($this->serversManager->hasServers()) {
if ($this->serversManager->allExceededReconnectionAttempts()) {
throw new Exception("Connection error: maximum reconnect attempts exceeded for all servers.");
}
$dsn = $this->serversManager->nextServer()->getConnectionString();
} else {
$dsn = "$config->host:$config->port";
}

$flags = STREAM_CLIENT_CONNECT;
$this->context = stream_context_create();
$this->socket = @stream_socket_client($dsn, $errorCode, $errorMessage, $config->timeout, $flags, $this->context);

if ($errorCode || !$this->socket) {
if ($config->reconnect && $this->serversManager->hasServers()) {
$this->logger?->error("Error connecting to: (" . $dsn . ")");
$this->socket = null;
$this->tlsEnabled = false;
return $this->connect(); /** @phan-suppress-current-line PhanPossiblyInfiniteRecursionSameParams */
}
throw new Exception($errorMessage ?: "Connection error", $errorCode);
}

Expand Down Expand Up @@ -336,11 +358,15 @@ public function process(null|int|float $timeout = 0)
*/
private function handleInfoMessage(Info $info): void
{
if (isset($info->tls_verify) && $info->tls_verify) {
$this->enableTls(true);
} elseif (isset($info->tls_required) && $info->tls_required) {
$this->enableTls(false);
if (!$this->tlsEnabled) {
if (isset($info->tls_verify) && $info->tls_verify) {
$this->enableTls(true);
} elseif (isset($info->tls_required) && $info->tls_required) {
$this->enableTls(false);
}
}

$this->serversManager->processInfoMessage($info);
}


Expand Down Expand Up @@ -380,12 +406,12 @@ private function enableTls(bool $requireClientCert): void
)) {
throw new Exception('Failed to connect: Error enabling TLS');
}
$this->tlsEnabled = true;
}


private function doSubscribe(string $subject, ?string $group, Closure $handler): self
{
$sid = bin2hex(random_bytes(4));
$sid = "" . $this->counter++;

$this->handlers[$sid] = $handler;

Expand Down Expand Up @@ -415,6 +441,7 @@ private function processSocketException(Throwable $e): self
while (true) {
try {
$this->socket = null;
$this->tlsEnabled = false;
$this->connect();
} catch (Throwable $e) {
$this->configuration->delay($iteration++);
Expand Down
12 changes: 12 additions & 0 deletions src/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class Configuration
public readonly ?string $tlsCertFile;
public readonly ?string $tlsCaFile;

public readonly array $servers;
public readonly bool $serversRandomize;
public readonly bool $ignoreClusterUpdates;
public readonly int $maxReconnectAttempts;
public readonly float $reconnectTimeWait;

public const DELAY_CONSTANT = 'constant';
public const DELAY_LINEAR = 'linear';
public const DELAY_EXPONENTIAL = 'exponential';
Expand All @@ -56,6 +62,11 @@ class Configuration
'tlsKeyFile' => null,
'tlsCertFile' => null,
'tlsCaFile' => null,
'servers' => [],
'serversRandomize' => true,
'ignoreClusterUpdates' => false,
'maxReconnectAttempts' => 10,
'reconnectTimeWait' => 0.200
];

/**
Expand All @@ -80,6 +91,7 @@ public function getOptions(): array
'verbose' => $this->verbose,
'version' => $this->version,
'headers' => true,
'protocol' => 1,
];

if ($this->user !== null) {
Expand Down
82 changes: 82 additions & 0 deletions src/Connection/Server.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

namespace Basis\Nats\Connection;

use Exception;

class Server
{
private readonly string $host;
public readonly int $port;
public int $reconnectAttempts;
public bool $lameDuck;
public bool $gossiped;

/**
* @throws Exception
*/
public function __construct(string $hostport, bool $gossiped = false)
{
if (empty($hostport) || !str_contains($hostport, ':') || 2 !== sizeof(explode(":", $hostport))) {
throw new Exception("Invalid server configuration: " .$hostport);
}

$parts = explode(":", $hostport);

if (empty($parts[0]) || !is_numeric($parts[1])) {
throw new Exception("Invalid server configuration: " .$hostport);
}

$this->host = $parts[0];
$this->port = intval($parts[1]);
$this->gossiped = $gossiped;
$this->lameDuck = false;
$this->reconnectAttempts = -1;
}

/**
* @return string
*/
public function getConnectionString(): string
{
return $this->host . ":" . $this->port;
}

/**
*/
public function incrementReconnectAttempts(): void
{
$this->reconnectAttempts = $this->reconnectAttempts + 1;
}

/**
* @return int
*/
public function getReconnectAttempts(): int
{
return $this->reconnectAttempts;
}

/**
* @return int
*/
public function resetReconnectAttempts(): int
{
return $this->reconnectAttempts = 0;
}

/**
*/
public function setLameDuck(bool $lameDuck): void
{
$this->lameDuck = $lameDuck;
}

/**
* @return bool
*/
public function isLameDuck(): bool
{
return $this->lameDuck;
}
}
Loading