From bf50ad88dc5783b681774ac93f98daee54bf61f5 Mon Sep 17 00:00:00 2001 From: Rob Landers Date: Mon, 26 Jul 2021 00:30:11 +0200 Subject: [PATCH] Add actor reentrancy support & reminder partitioning (#112) * Update actor configuration * Send token in requests * Add unit tests to test config * resolves #97 too * Add that to the test too --- src/lib/Actors/ActorConfig.php | 53 ++++++++++++---------- src/lib/Actors/ReentrantConfig.php | 14 ++++++ src/lib/Client/DaprHttpClient.php | 5 ++ src/lib/Middleware/Defaults/ActorToken.php | 38 ++++++++++++++++ tests/ActorConfigTest.php | 33 +++++++++++--- 5 files changed, 114 insertions(+), 29 deletions(-) create mode 100644 src/lib/Actors/ReentrantConfig.php create mode 100644 src/lib/Middleware/Defaults/ActorToken.php diff --git a/src/lib/Actors/ActorConfig.php b/src/lib/Actors/ActorConfig.php index 4b4e961..29552dd 100644 --- a/src/lib/Actors/ActorConfig.php +++ b/src/lib/Actors/ActorConfig.php @@ -35,7 +35,9 @@ public function __construct( protected DateInterval|null $idle_timeout = null, protected DateInterval|null $scan_interval = null, protected DateInterval|null $drain_timeout = null, - protected bool|null $drain_enabled = null + protected bool|null $drain_enabled = null, + protected ReentrantConfig|null $reentrantConfig = null, + protected int|null $remindersStoragePartitions = null ) { } @@ -49,7 +51,9 @@ public function __construct( */ public function get_actor_type_from_dapr_type(string $dapr_type): string|null { - if($this->actor_name_to_type[$dapr_type] ?? false) return $this->actor_name_to_type[$dapr_type]; + if ($this->actor_name_to_type[$dapr_type] ?? false) { + return $this->actor_name_to_type[$dapr_type]; + } $actors = array_combine($this->get_supported_actors(), $this->actor_name_to_type); @@ -106,31 +110,34 @@ public function drain_enabled(): bool|null } #[ArrayShape([ - 'entities' => "", - 'drainRebalancedActors' => "", + 'entities' => "", + 'drainRebalancedActors' => "", 'drainOngoingCallTimeout' => "mixed", - 'actorScanInterval' => "mixed", - 'actorIdleTimeout' => "mixed", + 'actorScanInterval' => "mixed", + 'actorIdleTimeout' => "mixed", ])] public function serialize( mixed $value, ISerializer $serializer ): array { - $return = [ - 'entities' => $value->get_supported_actors(), - ]; - if ($a = $value->get_idle_timeout()) { - $return['actorIdleTimeout'] = Formats::normalize_interval($a); - } - if ($a = $value->get_scan_interval()) { - $return['actorScanInterval'] = Formats::normalize_interval($a); - } - if ($a = $value->get_drain_timeout()) { - $return['drainOngoingCallTimeout'] = Formats::normalize_interval($a); - } - if ($a = $value->drain_enabled()) { - $return['drainRebalancedActors'] = $a; - } - - return $return; + return array_merge( + [ + 'entities' => $this->get_supported_actors(), + ], + $this->idle_timeout ? ['actorIdleTimeout' => Formats::normalize_interval($this->idle_timeout)] : [], + $this->scan_interval ? ['actorScanInterval' => Formats::normalize_interval($this->scan_interval)] : [], + $this->drain_timeout ? [ + 'drainOngoingCallTimeout' => Formats::normalize_interval( + $this->drain_timeout + ) + ] : [], + $this->drain_enabled ? ['drainRebalancedActors' => $this->drain_enabled] : [], + $this->reentrantConfig ? [ + 'reentrancy' => array_merge( + ['enabled' => true], + $this->reentrantConfig->max_stack_depth ? ['maxStackDepth' => $this->reentrantConfig->max_stack_depth] : [] + ) + ] : [], + $this->remindersStoragePartitions === null ? [] : ['remindersStoragePartitions' => $this->remindersStoragePartitions], + ); } } diff --git a/src/lib/Actors/ReentrantConfig.php b/src/lib/Actors/ReentrantConfig.php new file mode 100644 index 0000000..69a8e67 --- /dev/null +++ b/src/lib/Actors/ReentrantConfig.php @@ -0,0 +1,14 @@ +getDaprToken(); } + if (!empty(ActorToken::$token)) { + $options['headers']['Dapr-Reentrancy-Id'] = &ActorToken::$token; + } + $this->httpClient = new Client($options); } diff --git a/src/lib/Middleware/Defaults/ActorToken.php b/src/lib/Middleware/Defaults/ActorToken.php new file mode 100644 index 0000000..bd64c31 --- /dev/null +++ b/src/lib/Middleware/Defaults/ActorToken.php @@ -0,0 +1,38 @@ +hasHeader('Dapr-Reentrancy-Id')) { + self::$token = $request->getHeader('Dapr-Reentrancy-Id'); + } else { + self::$token = []; + } + + return $request; + } + + public function response(ResponseInterface $response): ResponseInterface + { + if (!empty(self::$token)) { + return $response->withHeader('Dapr-Reentrancy-Id', self::$token); + } + + return $response; + } +} diff --git a/tests/ActorConfigTest.php b/tests/ActorConfigTest.php index 26a32a6..8e6c511 100644 --- a/tests/ActorConfigTest.php +++ b/tests/ActorConfigTest.php @@ -4,7 +4,7 @@ use Dapr\Serialization\ISerializer; use Fixtures\ITestActor; -require_once __DIR__.'/DaprTests.php'; +require_once __DIR__ . '/DaprTests.php'; /** * Class ActorConfigTest @@ -14,7 +14,7 @@ class ActorConfigTest extends DaprTests public function testSerialization() { $serializer = $this->container->get(ISerializer::class); - $config = new ActorConfig( + $config = new ActorConfig( [ITestActor::class], new DateInterval('PT1S'), new DateInterval('PT2S'), @@ -23,11 +23,32 @@ public function testSerialization() ); $this->assertSame( [ - 'entities' => ['TestActor'], - 'actorIdleTimeout' => '0h0m1s0us', - 'actorScanInterval' => '0h0m2s0us', + 'entities' => ['TestActor'], + 'actorIdleTimeout' => '0h0m1s0us', + 'actorScanInterval' => '0h0m2s0us', 'drainOngoingCallTimeout' => '0h0m3s0us', - 'drainRebalancedActors' => true, + 'drainRebalancedActors' => true, + ], + $serializer->as_array($config) + ); + } + + public function testReentrancy() + { + $serializer = $this->container->get(ISerializer::class); + $config = new ActorConfig( + [ITestActor::class], + reentrantConfig: new \Dapr\Actors\ReentrantConfig(12), + remindersStoragePartitions: 1 + ); + $this->assertSame( + [ + 'entities' => ['TestActor'], + 'reentrancy' => [ + 'enabled' => true, + 'maxStackDepth' => 12 + ], + 'remindersStoragePartitions' => 1 ], $serializer->as_array($config) );