Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Workflow::upsertMemo() #562

Merged
merged 5 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"roadrunner-php/roadrunner-api-dto": "^1.10.0",
"roadrunner-php/version-checker": "^1.0.1",
"spiral/attributes": "^3.1.8",
"spiral/roadrunner": "^2024.3.2",
"spiral/roadrunner": "^2024.3.3",
"spiral/roadrunner-cli": "^2.6",
"spiral/roadrunner-kv": "^4.3",
"spiral/roadrunner-worker": "^3.6.1",
Expand Down
86 changes: 2 additions & 84 deletions psalm-baseline.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="6.2.0@814dfde37b43a1fe6d9b0996e08b19661af53bc5">
<files psalm-version="6.4.1@09a200c15910905ddc49e5edd37b73f9c78f7580">
<file src="src/Activity.php">
<ImplicitToStringCast>
<code><![CDATA[$type]]></code>
Expand Down Expand Up @@ -984,89 +984,11 @@
<code><![CDATA[PromiseInterface]]></code>
</TooManyTemplateParams>
</file>
<file src="src/Internal/Transport/Request/Cancel.php">
<MissingImmutableAnnotation>
<code><![CDATA[Cancel]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/CancelExternalWorkflow.php">
<MissingImmutableAnnotation>
<code><![CDATA[CancelExternalWorkflow]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/CompleteWorkflow.php">
<MissingImmutableAnnotation>
<code><![CDATA[CompleteWorkflow]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/ContinueAsNew.php">
<MissingImmutableAnnotation>
<code><![CDATA[ContinueAsNew]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/ExecuteActivity.php">
<MissingImmutableAnnotation>
<code><![CDATA[ExecuteActivity]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/ExecuteChildWorkflow.php">
<MissingImmutableAnnotation>
<code><![CDATA[ExecuteChildWorkflow]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/ExecuteLocalActivity.php">
<MissingImmutableAnnotation>
<code><![CDATA[ExecuteLocalActivity]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/GetChildWorkflowExecution.php">
<MissingImmutableAnnotation>
<code><![CDATA[GetChildWorkflowExecution]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/GetVersion.php">
<MissingImmutableAnnotation>
<code><![CDATA[GetVersion]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/NewTimer.php">
<MissingImmutableAnnotation>
<code><![CDATA[NewTimer]]></code>
</MissingImmutableAnnotation>
<PossiblyNullPropertyFetch>
<code><![CDATA[CarbonInterval::make($interval)->totalMilliseconds]]></code>
</PossiblyNullPropertyFetch>
</file>
<file src="src/Internal/Transport/Request/Panic.php">
<MissingImmutableAnnotation>
<code><![CDATA[Panic]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/SideEffect.php">
<MissingImmutableAnnotation>
<code><![CDATA[SideEffect]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/SignalExternalWorkflow.php">
<MissingImmutableAnnotation>
<code><![CDATA[SignalExternalWorkflow]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/UndefinedResponse.php">
<MissingImmutableAnnotation>
<code><![CDATA[UndefinedResponse]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/UpsertSearchAttributes.php">
<MissingImmutableAnnotation>
<code><![CDATA[UpsertSearchAttributes]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Request/UpsertTypedSearchAttributes.php">
<MissingImmutableAnnotation>
<code><![CDATA[UpsertTypedSearchAttributes]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Internal/Transport/Router/CancelWorkflow.php">
<DocblockTypeContradiction>
<code><![CDATA[$process === null]]></code>
Expand Down Expand Up @@ -1127,6 +1049,7 @@
<file src="src/Internal/Transport/Router/StartWorkflow.php">
<PossiblyFalseArgument>
<code><![CDATA[\json_encode($param)]]></code>
<code><![CDATA[\json_encode($param)]]></code>
</PossiblyFalseArgument>
<UnnecessaryVarAnnotation>
<code><![CDATA[Input]]></code>
Expand Down Expand Up @@ -1427,11 +1350,6 @@
<code><![CDATA[$this->failure]]></code>
</NullableReturnStatement>
</file>
<file src="src/Worker/Transport/Command/Client/Request.php">
<MissingImmutableAnnotation>
<code><![CDATA[Request]]></code>
</MissingImmutableAnnotation>
</file>
<file src="src/Worker/Transport/Command/Client/SuccessClientResponse.php">
<InvalidNullableReturnType>
<code><![CDATA[ValuesInterface]]></code>
Expand Down
36 changes: 36 additions & 0 deletions src/Interceptor/WorkflowOutboundCalls/UpsertMemoInput.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Interceptor\WorkflowOutboundCalls;

/**
* @psalm-immutable
*/
final class UpsertMemoInput
{
/**
* @param array<non-empty-string, mixed> $memo
*
* @no-named-arguments
* @internal Don't use the constructor. Use {@see self::with()} instead.
*/
public function __construct(
public readonly array $memo,
) {}

public function with(
?array $memo = null,
): self {
return new self(
$memo ?? $this->memo,
);
}
}
6 changes: 6 additions & 0 deletions src/Interceptor/WorkflowOutboundCallsInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Temporal\Interceptor\WorkflowOutboundCalls\SideEffectInput;
use Temporal\Interceptor\WorkflowOutboundCalls\SignalExternalWorkflowInput;
use Temporal\Interceptor\WorkflowOutboundCalls\TimerInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertMemoInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertSearchAttributesInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertTypedSearchAttributesInput;
use Temporal\Internal\Interceptor\Interceptor;
Expand Down Expand Up @@ -113,6 +114,11 @@ public function continueAsNew(ContinueAsNewInput $input, callable $next): Promis
*/
public function getVersion(GetVersionInput $input, callable $next): PromiseInterface;

/**
* @param callable(UpsertMemoInput): PromiseInterface $next
*/
public function upsertMemo(UpsertMemoInput $input, callable $next): PromiseInterface;

/**
* @param callable(UpsertSearchAttributesInput): PromiseInterface $next
*/
Expand Down
29 changes: 29 additions & 0 deletions src/Internal/Transport/Request/UpsertMemo.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Temporal\Internal\Transport\Request;

use Temporal\Worker\Transport\Command\Client\Request;

final class UpsertMemo extends Request
{
public const NAME = 'UpsertMemo';

/**
* @param array<string, mixed> $memo
*/
public function __construct(
private readonly array $memo,
) {
parent::__construct(self::NAME, ['memo' => (object) $memo]);
}

/**
* @return array<string, mixed>
*/
public function getMemo(): array
{
return $this->memo;
}
}
29 changes: 29 additions & 0 deletions src/Internal/Transport/Router/StartWorkflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Temporal\Internal\Transport\Router;

use React\Promise\Deferred;
use Temporal\Api\Common\V1\Memo;
use Temporal\Api\Common\V1\SearchAttributes;
use Temporal\Common\TypedSearchAttributes;
use Temporal\DataConverter\EncodedCollection;
Expand Down Expand Up @@ -58,8 +59,10 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred

// Search Attributes and Typed Search Attributes
$searchAttributes = $this->convertSearchAttributes($options['info']['SearchAttributes'] ?? null);
$memo = $this->convertMemo($options['info']['Memo'] ?? null);
$options['info']['SearchAttributes'] = $searchAttributes?->getValues();
$options['info']['TypedSearchAttributes'] = $this->prepareTypedSA($options['search_attributes'] ?? null);
$options['info']['Memo'] = $memo?->getValues();

/** @var Input $input */
$input = $this->services->marshaller->unmarshal($options, new Input());
Expand Down Expand Up @@ -156,6 +159,32 @@ private function convertSearchAttributes(?array $param): ?EncodedCollection
}
}

private function convertMemo(?array $param): ?EncodedCollection
{
if (!\is_array($param)) {
return null;
}

if ($param === []) {
return EncodedCollection::empty();
}

try {
$memo = (new Memo());
$memo->mergeFromJsonString(
\json_encode($param),
true,
);

return EncodedCollection::fromPayloadCollection(
$memo->getFields(),
$this->services->dataConverter,
);
} catch (\Throwable) {
return null;
}
}

private function prepareTypedSA(?array $param): TypedSearchAttributes
{
return $param === null
Expand Down
39 changes: 39 additions & 0 deletions src/Internal/Workflow/WorkflowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
use Temporal\Interceptor\WorkflowOutboundCalls\PanicInput;
use Temporal\Interceptor\WorkflowOutboundCalls\SideEffectInput;
use Temporal\Interceptor\WorkflowOutboundCalls\TimerInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertMemoInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertSearchAttributesInput;
use Temporal\Interceptor\WorkflowOutboundCalls\UpsertTypedSearchAttributesInput;
use Temporal\Interceptor\WorkflowOutboundCallsInterceptor;
Expand All @@ -55,6 +56,7 @@
use Temporal\Internal\Transport\Request\NewTimer;
use Temporal\Internal\Transport\Request\Panic;
use Temporal\Internal\Transport\Request\SideEffect;
use Temporal\Internal\Transport\Request\UpsertMemo;
use Temporal\Internal\Transport\Request\UpsertSearchAttributes;
use Temporal\Internal\Transport\Request\UpsertTypedSearchAttributes;
use Temporal\Internal\Workflow\Process\HandlerState;
Expand Down Expand Up @@ -448,10 +450,43 @@ public function allHandlersFinished(): bool
return !$this->handlers->hasRunningHandlers();
}

public function upsertMemo(array $values): void
{
$this->callsInterceptor->with(
function (UpsertMemoInput $input): PromiseInterface {
if ($input->memo === []) {
return resolve();
}

$result = $this->request(new UpsertMemo($input->memo), false);

/** @psalm-suppress UnsupportedPropertyReferenceUsage $memo */
$memo = &$this->input->info->memo;
$memo ??= [];
foreach ($input->memo as $name => $value) {
if ($value === null) {
unset($memo[$name]);
continue;
}

$memo[$name] = $value;
}

return $result;
},
/** @see WorkflowOutboundCallsInterceptor::upsertMemo() */
'upsertMemo',
)(new UpsertMemoInput($values));
}

public function upsertSearchAttributes(array $searchAttributes): void
{
$this->callsInterceptor->with(
function (UpsertSearchAttributesInput $input): PromiseInterface {
if ($input->searchAttributes === []) {
return resolve();
}

$result = $this->request(new UpsertSearchAttributes($input->searchAttributes), false);

/** @psalm-suppress UnsupportedPropertyReferenceUsage $sa */
Expand All @@ -476,6 +511,10 @@ public function upsertTypedSearchAttributes(SearchAttributeUpdate ...$updates):
{
$this->callsInterceptor->with(
function (UpsertTypedSearchAttributesInput $input): PromiseInterface {
if ($input->updates === []) {
return resolve();
}

$result = $this->request(new UpsertTypedSearchAttributes($input->updates), false);

// Merge changes
Expand Down
3 changes: 3 additions & 0 deletions src/Worker/Transport/Command/Common/RequestTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public function getHeader(): Header
return $this->header;
}

/**
* @psalm-external-mutation-free
*/
public function withHeader(HeaderInterface $header): self
{
$clone = clone $this;
Expand Down
Loading
Loading