Skip to content

Commit

Permalink
Add actor reentrancy support & reminder partitioning (dapr#112)
Browse files Browse the repository at this point in the history
* Update actor configuration

* Send token in requests

* Add unit tests to test config

* resolves dapr#97 too

* Add that to the test too
  • Loading branch information
withinboredom authored Jul 25, 2021
1 parent ea43e72 commit bf50ad8
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 29 deletions.
53 changes: 30 additions & 23 deletions src/lib/Actors/ActorConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public function __construct(
protected DateInterval|null $idle_timeout = null,
protected DateInterval|null $scan_interval = null,
protected DateInterval|null $drain_timeout = null,
protected bool|null $drain_enabled = null
protected bool|null $drain_enabled = null,
protected ReentrantConfig|null $reentrantConfig = null,
protected int|null $remindersStoragePartitions = null
) {
}

Expand All @@ -49,7 +51,9 @@ public function __construct(
*/
public function get_actor_type_from_dapr_type(string $dapr_type): string|null
{
if($this->actor_name_to_type[$dapr_type] ?? false) return $this->actor_name_to_type[$dapr_type];
if ($this->actor_name_to_type[$dapr_type] ?? false) {
return $this->actor_name_to_type[$dapr_type];
}

$actors = array_combine($this->get_supported_actors(), $this->actor_name_to_type);

Expand Down Expand Up @@ -106,31 +110,34 @@ public function drain_enabled(): bool|null
}

#[ArrayShape([
'entities' => "",
'drainRebalancedActors' => "",
'entities' => "",
'drainRebalancedActors' => "",
'drainOngoingCallTimeout' => "mixed",
'actorScanInterval' => "mixed",
'actorIdleTimeout' => "mixed",
'actorScanInterval' => "mixed",
'actorIdleTimeout' => "mixed",
])] public function serialize(
mixed $value,
ISerializer $serializer
): array {
$return = [
'entities' => $value->get_supported_actors(),
];
if ($a = $value->get_idle_timeout()) {
$return['actorIdleTimeout'] = Formats::normalize_interval($a);
}
if ($a = $value->get_scan_interval()) {
$return['actorScanInterval'] = Formats::normalize_interval($a);
}
if ($a = $value->get_drain_timeout()) {
$return['drainOngoingCallTimeout'] = Formats::normalize_interval($a);
}
if ($a = $value->drain_enabled()) {
$return['drainRebalancedActors'] = $a;
}

return $return;
return array_merge(
[
'entities' => $this->get_supported_actors(),
],
$this->idle_timeout ? ['actorIdleTimeout' => Formats::normalize_interval($this->idle_timeout)] : [],
$this->scan_interval ? ['actorScanInterval' => Formats::normalize_interval($this->scan_interval)] : [],
$this->drain_timeout ? [
'drainOngoingCallTimeout' => Formats::normalize_interval(
$this->drain_timeout
)
] : [],
$this->drain_enabled ? ['drainRebalancedActors' => $this->drain_enabled] : [],
$this->reentrantConfig ? [
'reentrancy' => array_merge(
['enabled' => true],
$this->reentrantConfig->max_stack_depth ? ['maxStackDepth' => $this->reentrantConfig->max_stack_depth] : []
)
] : [],
$this->remindersStoragePartitions === null ? [] : ['remindersStoragePartitions' => $this->remindersStoragePartitions],
);
}
}
14 changes: 14 additions & 0 deletions src/lib/Actors/ReentrantConfig.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Dapr\Actors;

/**
* Class ReentrantConfig
* @package Dapr\Actors
*/
class ReentrantConfig
{
public function __construct(public int|null $max_stack_depth = null)
{
}
}
5 changes: 5 additions & 0 deletions src/lib/Client/DaprHttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Dapr\Client;

use Dapr\Deserialization\IDeserializer;
use Dapr\Middleware\Defaults\ActorToken;
use Dapr\Serialization\ISerializer;
use GuzzleHttp\Client;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -47,6 +48,10 @@ public function __construct(
$options['headers']['dapr-api-token'] = $this->getDaprToken();
}

if (!empty(ActorToken::$token)) {
$options['headers']['Dapr-Reentrancy-Id'] = &ActorToken::$token;
}

$this->httpClient = new Client($options);
}

Expand Down
38 changes: 38 additions & 0 deletions src/lib/Middleware/Defaults/ActorToken.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Dapr\Middleware\Defaults;

use Dapr\Middleware\IRequestMiddleware;
use Dapr\Middleware\IResponseMiddleware;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;

/**
* Class ActorToken
* @package Dapr\Middleware\Defaults
*/
class ActorToken implements IRequestMiddleware, IResponseMiddleware
{
// this is intentionally left undefined in order to throw if the client is created before detecting a reentrant token
public static array $token;

public function request(RequestInterface $request): RequestInterface
{
if ($request->hasHeader('Dapr-Reentrancy-Id')) {
self::$token = $request->getHeader('Dapr-Reentrancy-Id');
} else {
self::$token = [];
}

return $request;
}

public function response(ResponseInterface $response): ResponseInterface
{
if (!empty(self::$token)) {
return $response->withHeader('Dapr-Reentrancy-Id', self::$token);
}

return $response;
}
}
33 changes: 27 additions & 6 deletions tests/ActorConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use Dapr\Serialization\ISerializer;
use Fixtures\ITestActor;

require_once __DIR__.'/DaprTests.php';
require_once __DIR__ . '/DaprTests.php';

/**
* Class ActorConfigTest
Expand All @@ -14,7 +14,7 @@ class ActorConfigTest extends DaprTests
public function testSerialization()
{
$serializer = $this->container->get(ISerializer::class);
$config = new ActorConfig(
$config = new ActorConfig(
[ITestActor::class],
new DateInterval('PT1S'),
new DateInterval('PT2S'),
Expand All @@ -23,11 +23,32 @@ public function testSerialization()
);
$this->assertSame(
[
'entities' => ['TestActor'],
'actorIdleTimeout' => '0h0m1s0us',
'actorScanInterval' => '0h0m2s0us',
'entities' => ['TestActor'],
'actorIdleTimeout' => '0h0m1s0us',
'actorScanInterval' => '0h0m2s0us',
'drainOngoingCallTimeout' => '0h0m3s0us',
'drainRebalancedActors' => true,
'drainRebalancedActors' => true,
],
$serializer->as_array($config)
);
}

public function testReentrancy()
{
$serializer = $this->container->get(ISerializer::class);
$config = new ActorConfig(
[ITestActor::class],
reentrantConfig: new \Dapr\Actors\ReentrantConfig(12),
remindersStoragePartitions: 1
);
$this->assertSame(
[
'entities' => ['TestActor'],
'reentrancy' => [
'enabled' => true,
'maxStackDepth' => 12
],
'remindersStoragePartitions' => 1
],
$serializer->as_array($config)
);
Expand Down

0 comments on commit bf50ad8

Please sign in to comment.