From e6d9588065a0c6f6a90a56c4254e778198bea795 Mon Sep 17 00:00:00 2001 From: Anton Tsitou Date: Mon, 27 May 2024 10:11:17 -0400 Subject: [PATCH] Fix decoding Payloads in case when an external Temporal SDK returns empty payload that doesn't contain even NULL value. (#442) Co-authored-by: Anton Titov --- psalm-baseline.xml | 10 --- src/DataConverter/EncodedValues.php | 26 ++++-- src/DataConverter/ValuesInterface.php | 5 +- src/Internal/Workflow/WorkflowContext.php | 4 +- tests/Fixtures/WorkerMock.php | 21 +++++ .../Fixtures/src/Activity/SimpleActivity.php | 9 ++ .../src/Workflow/VoidActivityStubWorkflow.php | 33 +++++++ tests/Functional/WorkflowTestCase.php | 89 +++++++++++++------ 8 files changed, 146 insertions(+), 51 deletions(-) create mode 100644 tests/Fixtures/src/Workflow/VoidActivityStubWorkflow.php diff --git a/psalm-baseline.xml b/psalm-baseline.xml index 9f5d80b15..045712fb1 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -244,9 +244,6 @@ - - - @@ -1200,9 +1197,6 @@ - - - request($request), $returnType)]]> @@ -1319,9 +1313,6 @@ - - - @@ -1348,7 +1339,6 @@ - EncodedValues::decodePromise(]]> diff --git a/src/DataConverter/EncodedValues.php b/src/DataConverter/EncodedValues.php index 0e3805949..0569c29bf 100644 --- a/src/DataConverter/EncodedValues.php +++ b/src/DataConverter/EncodedValues.php @@ -69,9 +69,6 @@ public static function fromPayloads(Payloads $payloads, DataConverterInterface $ return static::fromPayloadCollection($payloads->getPayloads(), $dataConverter); } - /** - * @return Payloads - */ public function toPayloads(): Payloads { return new Payloads(['payloads' => $this->toProtoCollection()]); @@ -102,7 +99,7 @@ public static function sliceValues( * Decode promise response upon returning it to the domain layer. * * @param PromiseInterface $promise - * @param Type|string|null $type + * @param string|\ReflectionClass|\ReflectionType|Type|null $type * * @return PromiseInterface */ @@ -119,17 +116,18 @@ function ($value) use ($type) { ); } - /** - * @param Type|string|null $type - * - * @return mixed - */ public function getValue(int|string $index, $type = null): mixed { if (\is_array($this->values) && \array_key_exists($index, $this->values)) { return $this->values[$index]; } + // External SDKs might return an empty array with metadata, alias to null + // Most likely this is a void type + if ($index === 0 && $this->count() === 0 && $this->isVoidType($type)) { + return null; + } + if ($this->converter === null) { throw new \LogicException('DataConverter is not set'); } @@ -211,6 +209,16 @@ public function isEmpty(): bool return $this->count() === 0; } + private function isVoidType(mixed $type = null): bool + { + return match (true) { + $type === null => true, + $type instanceof Type => \in_array($type->getName(), [Type::TYPE_VOID, Type::TYPE_NULL], true), + $type instanceof \ReflectionNamedType => $type->getName() === Type::TYPE_VOID, + default => false, + }; + } + /** * Returns collection of {@see Payloads}. * diff --git a/src/DataConverter/ValuesInterface.php b/src/DataConverter/ValuesInterface.php index 05599a785..e08f9e1cc 100644 --- a/src/DataConverter/ValuesInterface.php +++ b/src/DataConverter/ValuesInterface.php @@ -35,8 +35,11 @@ public function setDataConverter(DataConverterInterface $converter); /** * Get value by it's index. * + * Returns {@see null} if there are no values and $type has null value + * like {@see null}, {@see Type::TYPE_VOID} or {@see Type::TYPE_NULL}. + * * @param int $index - * @param Type|TypeEnum|mixed $type + * @param string|\ReflectionClass|\ReflectionType|Type|null $type * @return mixed */ public function getValue(int $index, $type); diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 16b141101..22ae83e0f 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -228,7 +228,7 @@ public function getVersion(string $changeId, int $minSupported, int $maxSupporte public function sideEffect(callable $context): PromiseInterface { $value = null; - $closure = \Closure::fromCallable($context); + $closure = $context(...); try { if (!$this->isReplaying()) { @@ -249,7 +249,7 @@ public function sideEffect(callable $context): PromiseInterface } catch (\Throwable) { } - $last = fn() => EncodedValues::decodePromise( + $last = fn(): PromiseInterface => EncodedValues::decodePromise( $this->request(new SideEffect(EncodedValues::fromValues([$value]))), $returnType, ); diff --git a/tests/Fixtures/WorkerMock.php b/tests/Fixtures/WorkerMock.php index 6d7054949..b57784fc8 100644 --- a/tests/Fixtures/WorkerMock.php +++ b/tests/Fixtures/WorkerMock.php @@ -11,6 +11,9 @@ namespace Temporal\Tests\Fixtures; +use Temporal\Api\Failure\V1\Failure; +use Temporal\DataConverter\DataConverter; +use Temporal\Exception\Failure\FailureConverter; use Temporal\Tests\TestCase; use Temporal\Worker\Transport\HostConnectionInterface; use Temporal\Worker\WorkerFactoryInterface; @@ -122,6 +125,24 @@ public function send(string $frame): void dump($frame); } + if ($pair[0] !== $frame) { + // Parse error if exists + $json = \json_decode($frame, true); + if (\is_array($json)) { + foreach ($json as $part) { + if (isset($part['failure'])) { + $failure = new Failure(); + try { + $failure->mergeFromString(\base64_decode($part['failure'])); + } catch (\Throwable) { + continue; + } + throw FailureConverter::mapFailureToException($failure, DataConverter::createDefault()); + } + } + } + } + $this->testCase->assertEquals($pair[0], $frame); $this->indexIn++; diff --git a/tests/Fixtures/src/Activity/SimpleActivity.php b/tests/Fixtures/src/Activity/SimpleActivity.php index 4471a28f0..8d6853c63 100644 --- a/tests/Fixtures/src/Activity/SimpleActivity.php +++ b/tests/Fixtures/src/Activity/SimpleActivity.php @@ -11,6 +11,7 @@ namespace Temporal\Tests\Activity; +use React\Promise\PromiseInterface; use Temporal\Activity; use Temporal\Activity\ActivityInterface; use Temporal\Activity\ActivityMethod; @@ -149,4 +150,12 @@ public function namedArguments( 'optionalNullableString' => $optionalNullableString, ]; } + + /** + * @return PromiseInterface + */ + #[ActivityMethod] + public function empty(): void + { + } } diff --git a/tests/Fixtures/src/Workflow/VoidActivityStubWorkflow.php b/tests/Fixtures/src/Workflow/VoidActivityStubWorkflow.php new file mode 100644 index 000000000..4b5796256 --- /dev/null +++ b/tests/Fixtures/src/Workflow/VoidActivityStubWorkflow.php @@ -0,0 +1,33 @@ +withStartToCloseTimeout(5) + ); + + return yield $simple->empty(); + } +} diff --git a/tests/Functional/WorkflowTestCase.php b/tests/Functional/WorkflowTestCase.php index 3b3abf518..0a3354198 100644 --- a/tests/Functional/WorkflowTestCase.php +++ b/tests/Functional/WorkflowTestCase.php @@ -11,6 +11,7 @@ namespace Temporal\Tests\Functional; +use Temporal\Api\Common\V1\Payloads; use Temporal\Common\Uuid; use Temporal\Tests\Fixtures\Splitter; use Temporal\Tests\Fixtures\WorkerMock; @@ -29,105 +30,105 @@ public function setUp(): void $_SERVER['RR_RPC'] = 'tcp://127.0.0.1:6001'; } - public function testSplitter() + public function testSplitter(): void { $splitter = Splitter::create('Test_ExecuteSimpleWorkflow_1.log'); $this->assertNotEmpty($splitter->getQueue()); } - public function testSimpleWorkflow() + public function testSimpleWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_ExecuteSimpleWorkflow_1.log')->getQueue()); } - public function testTimer() + public function testTimer(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_Timer.log')->getQueue()); } - public function testGetQuery() + public function testGetQuery(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_GetQuery.log')->getQueue()); } - public function testCancelledWithCompensationWorkflow() + public function testCancelledWithCompensationWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_CancelledWithCompensationWorkflow.log')->getQueue()); } - public function testCancelledNestedWorkflow() + public function testCancelledNestedWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_CancelledNestedWorkflow.log')->getQueue()); } - public function testCancelledMidflightWorkflow() + public function testCancelledMidflightWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_CancelledMidflightWorkflow.log')->getQueue()); } - public function testSendSignalBeforeCompletingWorkflow() + public function testSendSignalBeforeCompletingWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_SendSignalBeforeCompletingWorkflow.log')->getQueue()); } - public function testActivityStubWorkflow() + public function testActivityStubWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_ActivityStubWorkflow.log')->getQueue()); } - public function testBinaryPayload() + public function testBinaryPayload(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_BinaryPayload.log')->getQueue()); } - public function testContinueAsNew() + public function testContinueAsNew(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_ContinueAsNew.log')->getQueue()); } - public function testEmptyWorkflow() + public function testEmptyWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_EmptyWorkflow.log')->getQueue()); } - public function testSideEffectWorkflow() + public function testSideEffectWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_SideEffect.log')->getQueue()); } - public function testExecuteWorkflowWithParallelScopes() + public function testExecuteWorkflowWithParallelScopes(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_ExecuteWorkflowWithParallelScopes.log')->getQueue()); } - public function testActivity() + public function testActivity(): void { $worker = WorkerMock::createMock(); @@ -137,63 +138,63 @@ public function testActivity() /** * @group skip-ext-protobuf */ - public function testExecuteProtoWorkflow() + public function testExecuteProtoWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_ExecuteProtoWorkflow.log')->getQueue()); } - public function testExecuteSimpleDTOWorkflow() + public function testExecuteSimpleDTOWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_ExecuteSimpleDTOWorkflow.log')->getQueue()); } - public function testExecuteSimpleWorkflowWithSequenceInBatch() + public function testExecuteSimpleWorkflowWithSequenceInBatch(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_ExecuteSimpleWorkflowWithSequenceInBatch.log')->getQueue()); } - public function testPromiseChaining() + public function testPromiseChaining(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_PromiseChaining.log')->getQueue()); } - public function testMultipleWorkflowsInSingleWorker() + public function testMultipleWorkflowsInSingleWorker(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_MultipleWorkflowsInSingleWorker.log')->getQueue()); } - public function testSignalChildViaStubWorkflow() + public function testSignalChildViaStubWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_SignalChildViaStubWorkflow.log')->getQueue()); } - public function testExecuteChildStubWorkflow() + public function testExecuteChildStubWorkflow(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_ExecuteChildStubWorkflow.log')->getQueue()); } - public function testExecuteChildStubWorkflow_02() + public function testExecuteChildStubWorkflow_02(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_ExecuteChildStubWorkflow_02.log')->getQueue()); } - public function testExecuteChildWorkflow() + public function testExecuteChildWorkflow(): void { $worker = WorkerMock::createMock(); @@ -210,28 +211,28 @@ public function testExecuteChildWorkflowNamespaced(): void $worker->run($this, Splitter::create('Test_ExecuteChildWorkflowNamespaced.log')->getQueue()); } - public function testRuntimeSignal() + public function testRuntimeSignal(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_RuntimeSignal.log')->getQueue()); } - public function testSignalStepsAndRuntimeQuery() + public function testSignalStepsAndRuntimeQuery(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_SignalSteps.log')->getQueue()); } - public function testBatchedSignal_WithPauses() + public function testBatchedSignal_WithPauses(): void { $worker = WorkerMock::createMock(); $worker->run($this, Splitter::create('Test_BatchedSignal.log')->getQueue()); } - public function testBatchedSignal_Combined() + public function testBatchedSignal_Combined(): void { $worker = WorkerMock::createMock(); @@ -332,7 +333,7 @@ public function testAwaitWithFewParallelTimeouts_Leaks(): void * That case mustn't leak. * @see \Temporal\Tests\Workflow\AwaitWithSingleTimeoutWorkflow */ - public function testAwaitWithOneTimer_Leaks() + public function testAwaitWithOneTimer_Leaks(): void { $worker = WorkerMock::createMock(); @@ -357,4 +358,34 @@ public function testAwaitWithOneTimer_Leaks() $this->assertSame(0, $after - $before); } + + /** + * Test case when an external Temporal SDK returns empty payload that doesn't contain even NULL value. + * + * In this case {@see \Temporal\DataConverter\EncodedValues::getValue()} should return {@see null} + * if there is no non-nullable expected type. + */ + public function testEmptyPayload(): void + { + $worker = WorkerMock::createMock(); + + $id1 = 9001; + $id2 = 9002; + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + $emptyPayload = (new Payloads()); + $emptyPayloadStr = \base64_encode($emptyPayload->serializeToString()); + + $log = <<run($this, Splitter::createFromString($log)->getQueue()); + } } +