diff --git a/src/Activity/ActivityCancellationType.php b/src/Activity/ActivityCancellationType.php index 9c596756..ef459a76 100644 --- a/src/Activity/ActivityCancellationType.php +++ b/src/Activity/ActivityCancellationType.php @@ -63,7 +63,7 @@ public function serialize($value) return false; default: - $error = "Option #${value} is currently not supported"; + $error = "Option #{$value} is currently not supported"; throw new \InvalidArgumentException($error); } } diff --git a/src/Internal/Declaration/WorkflowInstance.php b/src/Internal/Declaration/WorkflowInstance.php index dfc7f800..0b0feb9a 100644 --- a/src/Internal/Declaration/WorkflowInstance.php +++ b/src/Internal/Declaration/WorkflowInstance.php @@ -121,4 +121,9 @@ public function addSignalHandler(string $name, callable $handler): void $this->signalHandlers[$name] = $this->createCallableHandler($handler); $this->signalQueue->attach($name, $this->signalHandlers[$name]); } + + public function clearSignalQueue(): void + { + $this->signalQueue->clear(); + } } diff --git a/src/Internal/Declaration/WorkflowInstance/SignalQueue.php b/src/Internal/Declaration/WorkflowInstance/SignalQueue.php index 5fb9a1f7..0b1bdba0 100644 --- a/src/Internal/Declaration/WorkflowInstance/SignalQueue.php +++ b/src/Internal/Declaration/WorkflowInstance/SignalQueue.php @@ -67,6 +67,11 @@ public function attach(string $signal, callable $consumer): void $this->flush($signal); } + public function clear(): void + { + $this->queue = []; + } + /** * @param string $signal * @psalm-suppress UnusedVariable diff --git a/src/Internal/Declaration/WorkflowInstanceInterface.php b/src/Internal/Declaration/WorkflowInstanceInterface.php index 3cefc378..ec953ed1 100644 --- a/src/Internal/Declaration/WorkflowInstanceInterface.php +++ b/src/Internal/Declaration/WorkflowInstanceInterface.php @@ -41,4 +41,6 @@ public function getSignalHandler(string $name): \Closure; * @param callable $handler */ public function addSignalHandler(string $name, callable $handler): void; + + public function clearSignalQueue(): void; } diff --git a/src/Internal/Workflow/Process/Process.php b/src/Internal/Workflow/Process/Process.php index 64ec6243..9f16fddf 100644 --- a/src/Internal/Workflow/Process/Process.php +++ b/src/Internal/Workflow/Process/Process.php @@ -14,7 +14,6 @@ use JetBrains\PhpStorm\Pure; use Temporal\DataConverter\ValuesInterface; use Temporal\Exception\DestructMemorizedInstanceException; -use Temporal\Exception\InvalidArgumentException; use Temporal\Internal\Declaration\WorkflowInstanceInterface; use Temporal\Internal\ServiceContainer; use Temporal\Internal\Workflow\WorkflowContext; @@ -44,11 +43,7 @@ function (?\Throwable $error): void { } ); - try { - $scope->start($handler); - } catch (InvalidArgumentException $e) { - // invalid signal invocation, destroy the scope with no traces - } + $scope->startSignal($handler); } ); diff --git a/src/Internal/Workflow/Process/Scope.php b/src/Internal/Workflow/Process/Scope.php index cdcffdf4..545ba177 100644 --- a/src/Internal/Workflow/Process/Scope.php +++ b/src/Internal/Workflow/Process/Scope.php @@ -19,6 +19,7 @@ use Temporal\Exception\DestructMemorizedInstanceException; use Temporal\Exception\Failure\CanceledFailure; use Temporal\Exception\Failure\TemporalFailure; +use Temporal\Exception\InvalidArgumentException; use Temporal\Internal\ServiceContainer; use Temporal\Internal\Transport\Request\Cancel; use Temporal\Internal\Workflow\ScopeContext; @@ -167,6 +168,7 @@ public function getContext(): WorkflowContext public function start(callable $handler, ValuesInterface $values = null): void { try { + // Create a coroutine generator $this->coroutine = $this->call($handler, $values ?? EncodedValues::empty()); $this->context->resolveConditions(); } catch (\Throwable $e) { @@ -177,6 +179,17 @@ public function start(callable $handler, ValuesInterface $values = null): void $this->next(); } + /** + * @param callable $handler + */ + public function startSignal(callable $handler): void + { + // Create a coroutine generator + $this->coroutine = $this->callSignalHandler($handler); + $this->context->resolveConditions(); + $this->next(); + } + /** * @param \Generator $generator * @return self @@ -322,16 +335,49 @@ function () use ($cancelID): void { */ protected function call(callable $handler, ValuesInterface $values): \Generator { - $this->makeCurrent(); - $result = $handler($values); + try { + $this->makeCurrent(); + $result = $handler($values); - if ($result instanceof \Generator) { - yield from $result; + if ($result instanceof \Generator) { + yield from $result; + + return $result->getReturn(); + } - return $result->getReturn(); + return $result; + } catch (\Throwable $e) { + $this->onException($e); } + } - return $result; + /** + * Call a Signal method. In this case deserialization errors are skipped. + * + * @param callable $handler + * @return \Generator + */ + protected function callSignalHandler(callable $handler): \Generator + { + try { + $this->makeCurrent(); + try { + $result = $handler(); + } catch (InvalidArgumentException) { + // Skip deserialization errors + return null; + } + + if ($result instanceof \Generator) { + yield from $result; + + return $result->getReturn(); + } + + return $result; + } catch (\Throwable $e) { + $this->onException($e); + } } /** diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 2cc3ab8c..212ad412 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -228,6 +228,10 @@ public function isReplaying(): bool */ public function complete(array $result = null, \Throwable $failure = null): PromiseInterface { + if ($failure !== null) { + $this->workflowInstance->clearSignalQueue(); + } + if ($result !== null) { $values = EncodedValues::fromValues($result); } else { diff --git a/tests/Fixtures/src/Workflow/LoopWithSignalCoroutinesWorkflow.php b/tests/Fixtures/src/Workflow/LoopWithSignalCoroutinesWorkflow.php index da6c50ea..d304a8a5 100644 --- a/tests/Fixtures/src/Workflow/LoopWithSignalCoroutinesWorkflow.php +++ b/tests/Fixtures/src/Workflow/LoopWithSignalCoroutinesWorkflow.php @@ -30,7 +30,7 @@ public function __construct() $this->simple = Workflow::newActivityStub( SimpleActivity::class, ActivityOptions::new() - ->withStartToCloseTimeout(5) + ->withStartToCloseTimeout(10) ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(1)) ); } diff --git a/tests/Fixtures/src/Workflow/SignalExceptionsWorkflow.php b/tests/Fixtures/src/Workflow/SignalExceptionsWorkflow.php new file mode 100644 index 00000000..044b2537 --- /dev/null +++ b/tests/Fixtures/src/Workflow/SignalExceptionsWorkflow.php @@ -0,0 +1,81 @@ + $this->greetings !== [] || $this->exit); + if ($this->greetings === [] && $this->exit) { + return $received; + } + + $message = array_shift($this->greetings); + $received[] = $message; + } + } + + #[SignalMethod] + public function failWithName(string $name): void + { + $this->greetings[] = $name; + throw new \RuntimeException("Signal exception $name"); + } + + #[SignalMethod] + public function failInvalidArgument($name = 'foo'): void + { + $this->greetings[] = "invalidArgument $name"; + throw new InvalidArgumentException("Invalid argument $name"); + } + + #[SignalMethod] + public function failActivity($name = 'foo') + { + yield Workflow::newUntypedActivityStub( + ActivityOptions::new() + ->withScheduleToStartTimeout(1) + ->withRetryOptions( + RetryOptions::new()->withMaximumAttempts(1) + ) + ->withStartToCloseTimeout(1), + )->execute('nonExistingActivityName', [$name]); + } + + #[SignalMethod] + public function failRetryable() + { + 10 / 0; + } + + #[SignalMethod] + public function exit(): void + { + $this->exit = true; + } +} diff --git a/tests/Functional/Client/AwaitTestCase.php b/tests/Functional/Client/AwaitTestCase.php index 2b4614a7..44dbfc5f 100644 --- a/tests/Functional/Client/AwaitTestCase.php +++ b/tests/Functional/Client/AwaitTestCase.php @@ -132,14 +132,18 @@ public function testFailSignalSerialization() $wait->addValue('test2'); $wait->addValue('test3'); - // breaks the invocation + /** + * Breaks the invocation + * Deserialization errors must be ignored. Called Signal method in this case will be skipped. + * @link https://github.com/temporalio/sdk-php/pull/331 + */ $wait->addValue(['hello'], 123); $wait->addValue('test4'); $result = $run->getResult(); - asort($result); - $result = array_values($result); + \asort($result); + $result = \array_values($result); $this->assertSame( [ diff --git a/tests/Functional/Client/FailureTestCase.php b/tests/Functional/Client/FailureTestCase.php index 88297e2d..275dc893 100644 --- a/tests/Functional/Client/FailureTestCase.php +++ b/tests/Functional/Client/FailureTestCase.php @@ -11,10 +11,13 @@ namespace Temporal\Tests\Functional\Client; +use PHPUnit\Framework\AssertionFailedError; use Temporal\Exception\Client\WorkflowFailedException; +use Temporal\Exception\Client\WorkflowNotFoundException; use Temporal\Exception\Failure\ActivityFailure; use Temporal\Exception\Failure\ApplicationFailure; use Temporal\Exception\Failure\ChildWorkflowFailure; +use Temporal\Tests\Workflow\SignalExceptionsWorkflow; /** * @group client @@ -80,4 +83,82 @@ public function testChildWorkflowFailurePropagation() $this->assertStringContainsString('SimpleActivity->fail', $e->getPrevious()->getMessage()); } } + + public function testSignalThatThrowsRetryableException() + { + $client = $this->createClient(); + $wf = $client->newWorkflowStub(SignalExceptionsWorkflow::class); + + $run = $client->start($wf); + + $wf->failRetryable(); + + sleep(1); + $wf->exit(); + + // There is no any exception because the workflow has not failed after the `failRetryable` signal. + $this->assertTrue(true); + } + + public function testSignalThatThrowsCustomError() + { + $client = $this->createClient(); + $wf = $client->newWorkflowStub(SignalExceptionsWorkflow::class); + + $run = $client->start($wf); + + $wf->failWithName('test1'); + + try { + // The next + sleep(2); + $wf->exit(); + $this->fail('Signal must fail'); + } catch (AssertionFailedError $e) { + throw $e; + } catch (\Throwable $e) { + $this->assertInstanceOf(WorkflowNotFoundException::class, $e); + // \dump($e); + } + + $this->expectException(WorkflowFailedException::class); + $result = $run->getResult(); + $this->fail(sprintf("Workflow must fail. Got result %s", \print_r($result, true))); + } + + public function testSignalThatThrowsInvalidArgumentException() + { + $client = $this->createClient(); + $wf = $client->newWorkflowStub(SignalExceptionsWorkflow::class); + + $run = $client->start($wf); + + $wf->failInvalidArgument('test1'); + + $this->expectException(WorkflowFailedException::class); + $result = $run->getResult(); + $this->fail(sprintf("Workflow must fail. Got result %s", \print_r($result, true))); + } + + public function testSignalThatThrowsInternalException() + { + $client = $this->createClient(); + $wf = $client->newWorkflowStub(SignalExceptionsWorkflow::class); + + $run = $client->startWithSignal($wf, 'failActivity', ['foo']); + + try { + sleep(8); + $wf->failActivity('foo'); + $this->fail('Signal must fail'); + } catch (AssertionFailedError $e) { + throw $e; + } catch (\Throwable $e) { + $this->assertInstanceOf(WorkflowNotFoundException::class, $e); + } + + $this->expectException(WorkflowFailedException::class); + $result = $run->getResult(); + $this->fail(sprintf("Workflow must fail. Got result %s", \print_r($result, true))); + } } diff --git a/tests/Functional/Client/UntypedWorkflowStubTestCase.php b/tests/Functional/Client/UntypedWorkflowStubTestCase.php index b2a90245..8822f1fa 100644 --- a/tests/Functional/Client/UntypedWorkflowStubTestCase.php +++ b/tests/Functional/Client/UntypedWorkflowStubTestCase.php @@ -38,6 +38,16 @@ public function testUntypedStartAndWaitResult() $this->assertSame('HELLO WORLD', $simple->getResult()); } + public function testUntypedStartWithWrongData() + { + $client = $this->createClient(); + $simple = $client->newUntypedWorkflowStub('SimpleWorkflow'); + $client->start($simple, ['hello world']); + + $this->expectException(WorkflowFailedException::class); + $simple->getResult(); + } + public function testUntypedStartViaClient() { $client = $this->createClient(); diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index dbd68e6e..fd936663 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -6,7 +6,7 @@ services: ports: - "9042:9042" temporal: - image: temporalio/auto-setup:1.16.2 + image: temporalio/auto-setup:1.21.1.0 ports: - "7233:7233" volumes: @@ -17,7 +17,7 @@ services: depends_on: - cassandra temporal-admin-tools: - image: temporalio/admin-tools:1.16.2 + image: temporalio/admin-tools:1.21.1.0 stdin_open: true tty: true environment: @@ -25,7 +25,7 @@ services: depends_on: - temporal temporal-ui: - image: temporalio/ui:2.5.0 + image: temporalio/ui:2.16.2 environment: - TEMPORAL_ADDRESS=temporal:7233 - TEMPORAL_CORS_ORIGINS=http://localhost:3000