Skip to content

Commit

Permalink
Implemented NATS Microservices
Browse files Browse the repository at this point in the history
[+] Updated README.md to include example of microservice implementation.
[+] Updated Client.php to include the service() function
[+] Implemented the required standards outlined by NATS Micro to support all "nats micro" cli calls.
  • Loading branch information
EmilJimenez21 committed Sep 15, 2024
1 parent 206d522 commit 3bb277b
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 0 deletions.
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,46 @@ $stream->put('mailer.bye', $payload);

```

## Microservices
The services feature provides a simple way to create microservices that leverage NATS.

In the example below, you will see an example of creating an index function for the posts microservice. The request can
be accessed under "v1.posts.index". You can add multiple endpoints to support your service.
```php
// Define a service
$service = $client->service('PostsService', 'This service is responsible for handling all things post related.', '1.0');

// Create the version group
$version = $service->addGroup('v1');

// Create the service group
$posts = $version->addGroup('posts')

// Create the endpoint handler
class IndexPosts implements \Basis\Nats\Service\ServiceHandler {

public function handle(\Basis\Nats\Message\Payload $payload): array
{
// Your application logic
return [
'hello' => 'world'
];
}
}

// Create an endpoint
$posts->addEndpoint(
'index',
new IndexPosts()
);

// Run the service
while (true) {
$client->process();
}
```


## Key Value Storage
```php
$bucket = $client->getApi()->getBucket('bucket_name');
Expand Down
10 changes: 10 additions & 0 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Basis\Nats\Message\Publish;
use Basis\Nats\Message\Subscribe;
use Basis\Nats\Message\Unsubscribe;
use Basis\Nats\Service\Service;
use Closure;
use Exception;
use LogicException;
Expand All @@ -24,6 +25,8 @@ class Client
private array $handlers = [];
private array $subscriptions = [];

private array $services = [];

private bool $skipInvalidMessages = false;

public function __construct(
Expand Down Expand Up @@ -260,4 +263,11 @@ public function disconnect(): self

return $this;
}

public function service(string $name, string $description, string $version): Service
{
$this->services[$name] = new Service($this, $name, $description, $version);

return $this->services[$name];
}
}
10 changes: 10 additions & 0 deletions src/Service/EndpointHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Basis\Nats\Service;

use Basis\Nats\Message\Payload;

interface EndpointHandler
{
public function handle(Payload $payload): array;
}
206 changes: 206 additions & 0 deletions src/Service/Service.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
<?php

namespace Basis\Nats\Service;

use Basis\Nats\Client;
use Basis\Nats\Message\Payload;

class Service
{
public Client $client;

private string $id;

private string $name;

private string $description = '';

private string $version;

private string $started;

private array $endpoints = [];

private array $groups = [];

private array $subscriptions = [];

public function __construct(
Client $client,
string $name,
string $description = 'Default Description',
string $version = '0.0.1'
) {
$this->client = $client;
$this->id = $this->generateId();
$this->name = $name;
$this->description = $description;
$this->version = $version;
$this->started = date("Y-m-d\TH:i:s.v\Z");

// Register the service verbs to listen for
$this->registerVerbs();
}

private function generateId(): string {
$characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';

$charactersLength = strlen($characters);

$randomString = "";

for ($i = 0; $i < 22; $i++) {
$randomString .= $characters[rand(0, $charactersLength - 1)];
}

return $randomString;
}

private function ping(): array
{
return [
'type' => 'io.nats.micro.v1.ping_response',
'name' => $this->name,
'id' => $this->id,
'version' => $this->version,
];
}

private function info(): array
{
return [
'type' => 'io.nats.micro.v1.info_response',
'name' => $this->name,
'id' => $this->id,
'version' => $this->version,
'description' => $this->description,
'endpoints' => array_reduce($this->endpoints, function ($carry, $endpoint) {
$carry[] = [
'name' => $endpoint->getName(),
'subject' => $endpoint->getSubject(),
'queue_group' => $endpoint->getQueueGroup(),
];

return $carry;
}, []),
];
}

private function stats(): array
{
return [
'type' => 'io.nats.micro.v1.stats_response',
'name' => $this->name,
'id' => $this->id,
'version' => $this->version,
'endpoints' => array_reduce($this->endpoints, function ($carry, ServiceEndpoint $endpoint) {
$carry[] = [
'name' => $endpoint->getName(),
'subject' => $endpoint->getSubject(),
'queue_group' => $endpoint->getQueueGroup(),
'num_requests' => $endpoint->getNumRequests(),
'num_errors' => $endpoint->getNumErrors(),
'last_error' => $endpoint->getLastError(),
'processing_time' => $endpoint->getProcessingTime(),
'average_processing_time' => $endpoint->getAverageProcessingTime(),
];

return $carry;
}, []),
'started' => $this->started,
];
}

public function addGroup(string $name): ServiceGroup
{
$this->groups[$name] = new ServiceGroup($this, $name);

return $this->groups[$name];
}

public function addEndpoint(
string $name,
EndpointHandler $endpointHandler,
array $options = []
): void {
$subject = $name;
$queue_group = 'q';

if (array_key_exists('subject', $options)) {
$subject = $options['subject'];
}

if (array_key_exists('queue_group', $options)) {
$queue_group = $options['queue_group'];
}

$this->endpoints[$name] = new ServiceEndpoint(
$this,
$name,
$subject,
$endpointHandler,
$queue_group
);
}

public function reset(): void
{
array_map(function (ServiceEndpoint $endpoint) {
$endpoint->resetStats();
}, $this->endpoints);
}

private function registerVerbs(): void
{
$verbs = [
'PING' => function (Payload $payload) {
return $this->ping();
},
'INFO' => function (Payload $payload) {
return $this->info();
},
'STATS' => function (Payload $payload) {
return $this->stats();
},
];

foreach ($verbs as $verb => $handler) {
// Add the all handler
$this->addInternalHandler($verb, '', '', "$verb-all", $handler);

// Add the kind handler
$this->addInternalHandler($verb, $this->name, '', "$verb-kind", $handler);

// Add the service id handler
$this->addInternalHandler($verb, $this->name, $this->id, $verb, $handler);
}
}

private function addInternalHandler(
string $verb,
string $kind,
string $id,
string $name,
callable $handler
): void {
$subject = $this->controlSubject($verb, $kind, $id);

$this->subscriptions[$name] = $this->client->subscribe(
$subject,
$handler
);
}

private function controlSubject(string $verb, string $name, string $id): string
{
if ($name == '' && $id == '') {
return "\$SRV.$verb";
}

if ($id == '') {
return "\$SRV.$verb.$name";
}

return "\$SRV.$verb.$name.$id";
}
}
98 changes: 98 additions & 0 deletions src/Service/ServiceEndpoint.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
<?php

namespace Basis\Nats\Service;

use Basis\Nats\Client;
use Basis\Nats\Message\Payload;
use Basis\Nats\Queue;

class ServiceEndpoint
{
private int $num_requests = 1;

private int $num_errors = 0;

private string $last_error = '';

private float $processing_time = 0;

private Client|Queue $subscription;

public function __construct(
private readonly Service $service,
private readonly string $name,
private readonly string $subject,
private readonly EndpointHandler $endpointHandler,
private readonly string $queue_group = 'q'
)
{
$this->subscription = $this->service->client->subscribeQueue(
$this->subject,
$this->queue_group,
function (Payload $message) {
// Start calculating the time
$start = microtime(true);

// Update the endpoint metrics
$this->num_requests += 1;

// Run the handler
$response = $this->endpointHandler->handle($message);

// Add to the total processing time
$this->processing_time += microtime(true) - $start;

// Return the array
return $response;
}
);
}

public function getName(): string
{
return $this->name;
}

public function getSubject(): string
{
return $this->subject;
}

public function getQueueGroup(): string
{
return $this->queue_group;
}

public function getAverageProcessingTime(): float
{
return round($this->getProcessingTime() / $this->getNumRequests());
}

public function getLastError(): string
{
return $this->last_error;
}

public function getNumErrors(): int
{
return $this->num_errors;
}

public function getNumRequests(): int
{
return $this->num_requests;
}

public function getProcessingTime(): float
{
return $this->processing_time * 1e9;
}

public function resetStats(): void
{
$this->num_requests = 1;
$this->num_errors = 0;
$this->processing_time = 0;
$this->last_error = '';
}
}
Loading

0 comments on commit 3bb277b

Please sign in to comment.