From add6c4e8174b81f31ebc373142844bd9c89c4b9d Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 5 Sep 2024 07:45:14 -0700 Subject: [PATCH] Add support for upsert memo (#2202) Add support for upsert memo --- .../WorkflowOutboundCallsInterceptor.java | 2 + .../WorkflowOutboundCallsInterceptorBase.java | 5 + .../common/WorkflowExecutionUtils.java | 3 + .../internal/replay/BasicWorkflowContext.java | 4 - .../replay/ReplayWorkflowContext.java | 3 + .../replay/ReplayWorkflowContextImpl.java | 8 +- .../internal/replay/WorkflowMutableState.java | 17 +++ ...orkflowPropertiesModifiedStateMachine.java | 92 ++++++++++++++ .../statemachines/WorkflowStateMachines.java | 10 ++ .../internal/sync/SyncWorkflowContext.java | 10 ++ .../internal/sync/WorkflowInternal.java | 5 + .../java/io/temporal/workflow/Workflow.java | 11 ++ .../upsertMemoTests/UpsertMemoTest.java | 118 ++++++++++++++++++ .../internal/testservice/StateUtils.java | 33 +++++ .../TestWorkflowMutableStateImpl.java | 37 +++++- .../sync/DummySyncWorkflowContext.java | 11 +- .../TestActivityEnvironmentInternal.java | 5 + .../internal/TracingWorkerInterceptor.java | 8 ++ 18 files changed, 368 insertions(+), 14 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowPropertiesModifiedStateMachine.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/upsertMemoTests/UpsertMemoTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index cac927692..14005dc18 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -617,6 +617,8 @@ R mutableSideEffect( void upsertTypedSearchAttributes(SearchAttributeUpdate... searchAttributeUpdates); + void upsertMemo(Map memo); + /** * Intercepts creation of a workflow child thread. * diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java index de52b822f..eb658ee62 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java @@ -156,6 +156,11 @@ public void upsertTypedSearchAttributes(SearchAttributeUpdate... searchAttrib next.upsertTypedSearchAttributes(searchAttributeUpdates); } + @Override + public void upsertMemo(Map memo) { + next.upsertMemo(memo); + } + @Override public Object newChildThread(Runnable runnable, boolean detached, String name) { return next.newChildThread(runnable, detached, name); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java index 236bc1c1a..06b351e01 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java @@ -299,6 +299,7 @@ public static boolean isCommandEvent(HistoryEvent event) { case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED: case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_REJECTED: case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED: + case EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED: return true; default: return false; @@ -334,6 +335,8 @@ public static EventType getEventTypeForCommand(CommandType commandType) { return EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED; case COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES: return EventType.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES; + case COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES: + return EventType.EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED; } throw new IllegalArgumentException("Unknown commandType"); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/BasicWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/BasicWorkflowContext.java index 3e0932b0b..d30135aaf 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/BasicWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/BasicWorkflowContext.java @@ -128,10 +128,6 @@ public Map getHeader() { return startedAttributes.getHeader().getFieldsMap(); } - public Payload getMemo(String key) { - return startedAttributes.getMemo().getFieldsMap().get(key); - } - int getAttempt() { return startedAttributes.getAttempt(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index fbe6a7103..c983c4ee3 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -401,6 +401,9 @@ boolean getVersion( /** Updates or inserts search attributes used to index workflows. */ void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes); + /** Updates or inserts memos. */ + void upsertMemo(@Nonnull Memo memo); + /** * @return true if this flag may currently be used. */ diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index 61e8e964d..414183b80 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -185,7 +185,7 @@ public long getRunStartedTimestampMillis() { @Override public Payload getMemo(String key) { - return basicWorkflowContext.getMemo(key); + return mutableState.getMemo(key); } @Override @@ -335,6 +335,12 @@ public void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes) { mutableState.upsertSearchAttributes(searchAttributes); } + @Override + public void upsertMemo(@Nonnull Memo memo) { + workflowStateMachines.upsertMemo(memo); + mutableState.upsertMemo(memo); + } + @Override public int getAttempt() { return basicWorkflowContext.getAttempt(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowMutableState.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowMutableState.java index aab045f05..dc2f858e5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowMutableState.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowMutableState.java @@ -21,8 +21,11 @@ package io.temporal.internal.replay; import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes; +import io.temporal.api.common.v1.Memo; +import io.temporal.api.common.v1.Payload; import io.temporal.api.common.v1.SearchAttributes; import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; +import javax.annotation.Nonnull; import javax.annotation.Nullable; class WorkflowMutableState { @@ -31,11 +34,17 @@ class WorkflowMutableState { private boolean workflowMethodCompleted; private Throwable workflowTaskFailureThrowable; private SearchAttributes.Builder searchAttributes; + private Memo.Builder memo; WorkflowMutableState(WorkflowExecutionStartedEventAttributes startedAttributes) { if (startedAttributes.hasSearchAttributes()) { this.searchAttributes = startedAttributes.getSearchAttributes().toBuilder(); } + if (startedAttributes.hasMemo()) { + this.memo = startedAttributes.getMemo().toBuilder(); + } else { + this.memo = Memo.newBuilder(); + } } boolean isCancelRequested() { @@ -76,6 +85,10 @@ SearchAttributes getSearchAttributes() { : searchAttributes.build(); } + public Payload getMemo(String key) { + return memo.build().getFieldsMap().get(key); + } + void upsertSearchAttributes(@Nullable SearchAttributes searchAttributes) { if (searchAttributes == null || searchAttributes.getIndexedFieldsCount() == 0) { return; @@ -85,4 +98,8 @@ void upsertSearchAttributes(@Nullable SearchAttributes searchAttributes) { } this.searchAttributes.putAllIndexedFields(searchAttributes.getIndexedFieldsMap()); } + + public void upsertMemo(@Nonnull Memo memo) { + this.memo.putAllFields(memo.getFieldsMap()); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowPropertiesModifiedStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowPropertiesModifiedStateMachine.java new file mode 100644 index 000000000..6329ef379 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowPropertiesModifiedStateMachine.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.statemachines; + +import io.temporal.api.command.v1.Command; +import io.temporal.api.command.v1.ModifyWorkflowPropertiesCommandAttributes; +import io.temporal.api.enums.v1.CommandType; +import io.temporal.api.enums.v1.EventType; +import io.temporal.workflow.Functions; + +final class WorkflowPropertiesModifiedStateMachine + extends EntityStateMachineInitialCommand< + WorkflowPropertiesModifiedStateMachine.State, + WorkflowPropertiesModifiedStateMachine.ExplicitEvent, + WorkflowPropertiesModifiedStateMachine> { + + private ModifyWorkflowPropertiesCommandAttributes modifiedPropertiesAttributes; + + public static void newInstance( + ModifyWorkflowPropertiesCommandAttributes modifiedPropertiesAttributes, + Functions.Proc1 commandSink, + Functions.Proc1 stateMachineSink) { + new WorkflowPropertiesModifiedStateMachine( + modifiedPropertiesAttributes, commandSink, stateMachineSink); + } + + private WorkflowPropertiesModifiedStateMachine( + ModifyWorkflowPropertiesCommandAttributes modifiedPropertiesAttributes, + Functions.Proc1 commandSink, + Functions.Proc1 stateMachineSink) { + super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink); + this.modifiedPropertiesAttributes = modifiedPropertiesAttributes; + explicitEvent(ExplicitEvent.SCHEDULE); + } + + enum ExplicitEvent { + SCHEDULE + } + + enum State { + CREATED, + MODIFY_COMMAND_CREATED, + MODIFY_COMMAND_RECORDED, + } + + public static final StateMachineDefinition< + State, ExplicitEvent, WorkflowPropertiesModifiedStateMachine> + STATE_MACHINE_DEFINITION = + StateMachineDefinition + .newInstance( + "WorkflowPropertiesModified", State.CREATED, State.MODIFY_COMMAND_RECORDED) + .add( + State.CREATED, + ExplicitEvent.SCHEDULE, + State.MODIFY_COMMAND_CREATED, + WorkflowPropertiesModifiedStateMachine::createModifyCommand) + .add( + State.MODIFY_COMMAND_CREATED, + CommandType.COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES, + State.MODIFY_COMMAND_CREATED) + .add( + State.MODIFY_COMMAND_CREATED, + EventType.EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED, + State.MODIFY_COMMAND_RECORDED); + + private void createModifyCommand() { + addCommand( + Command.newBuilder() + .setCommandType(CommandType.COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES) + .setModifyWorkflowPropertiesCommandAttributes(modifiedPropertiesAttributes) + .build()); + modifiedPropertiesAttributes = null; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 067082f88..2bea70238 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -918,6 +918,14 @@ public void upsertSearchAttributes(SearchAttributes attributes) { UpsertSearchAttributesStateMachine.newInstance(attributes, commandSink, stateMachineSink); } + public void upsertMemo(Memo memo) { + checkEventLoopExecuting(); + WorkflowPropertiesModifiedStateMachine.newInstance( + ModifyWorkflowPropertiesCommandAttributes.newBuilder().setUpsertedMemo(memo).build(), + commandSink, + stateMachineSink); + } + public void completeWorkflow(Optional workflowOutput) { checkEventLoopExecuting(); CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink); @@ -1188,6 +1196,7 @@ private void validateCommand(Command command, HistoryEvent event) { case COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION: case COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION: case COMMAND_TYPE_PROTOCOL_MESSAGE: + case COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES: break; case UNRECOGNIZED: case COMMAND_TYPE_UNSPECIFIED: @@ -1343,6 +1352,7 @@ private OptionalLong getInitialCommandEventId(HistoryEvent event) { case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED: case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED: case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED: + case EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED: return OptionalLong.of(event.getEventId()); default: diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 094b1c8fe..9e1bfc1b4 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1209,6 +1209,16 @@ public void upsertTypedSearchAttributes(SearchAttributeUpdate... searchAttrib replayContext.upsertSearchAttributes(attr); } + @Override + public void upsertMemo(Map memo) { + Preconditions.checkArgument(memo != null, "null memo"); + Preconditions.checkArgument(!memo.isEmpty(), "empty memo"); + replayContext.upsertMemo( + Memo.newBuilder() + .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo)) + .build()); + } + @Nonnull public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) { return runner.newWorkflowThread(runnable, false, name); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 1250acbc3..8990c9ab4 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -722,6 +722,11 @@ public static void upsertTypedSearchAttributes( getWorkflowOutboundInterceptor().upsertTypedSearchAttributes(searchAttributeUpdates); } + public static void upsertMemo(Map memo) { + assertNotReadOnly("upsert memo"); + getWorkflowOutboundInterceptor().upsertMemo(memo); + } + public static DataConverter getDataConverter() { return getRootWorkflowContext().getDataConverter(); } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index e4ead58ea..5925b82d3 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -1186,6 +1186,17 @@ public static void upsertTypedSearchAttributes( WorkflowInternal.upsertTypedSearchAttributes(searchAttributeUpdates); } + /** + * Updates a Workflow Memos by applying {@code memoUpdates} to the existing Memos set attached to + * the workflow. Memos are additional non-indexed information attributed to workflow and can + * return by describing or listing a workflow. The type of value can be any object that are + * serializable by {@link io.temporal.common.converter.DataConverter}. To remove a memo set the + * value null. + */ + public static void upsertMemo(Map memo) { + WorkflowInternal.upsertMemo(memo); + } + /** * Sets the default activity options that will be used for activity stubs that have no {@link * ActivityOptions} specified.
diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/upsertMemoTests/UpsertMemoTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/upsertMemoTests/UpsertMemoTest.java new file mode 100644 index 000000000..944efbbbf --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/upsertMemoTests/UpsertMemoTest.java @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.upsertMemoTests; + +import io.temporal.api.common.v1.Payload; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.client.WorkflowStub; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.common.interceptors.*; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class UpsertMemoTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new WorkerInterceptor()) + .build()) + .setWorkflowTypes(TestWorkflow1Impl.class) + .build(); + + @Test + public void upsertMemo() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("memoValue2", result); + // Verify that describeWorkflowExecution returns the correct final memo + DescribeWorkflowExecutionResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkflowExecution( + DescribeWorkflowExecutionRequest.newBuilder() + .setExecution(WorkflowStub.fromTyped(workflowStub).getExecution()) + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .build()); + Map memo = + Collections.singletonMap( + "memoKey2", DefaultDataConverter.newDefaultInstance().toPayload("memoValue2").get()); + Assert.assertEquals(memo, resp.getWorkflowExecutionInfo().getMemo().getFieldsMap()); + } + + public static class TestWorkflow1Impl implements TestWorkflow1 { + + @Override + public String execute(String testName) { + String memoVal = Workflow.getMemo("memoKey", String.class, String.class); + Assert.assertNull(memoVal); + + Workflow.upsertMemo(Collections.singletonMap("memoKey", "memoValue")); + memoVal = Workflow.getMemo("memoKey", String.class, String.class); + Assert.assertEquals("memoValue", memoVal); + + Workflow.sleep(Duration.ofMillis(100)); + + Workflow.upsertMemo(Collections.singletonMap("memoKey2", "memoValue2")); + memoVal = Workflow.getMemo("memoKey", String.class, String.class); + Assert.assertEquals("memoValue", memoVal); + + Workflow.sleep(Duration.ofMillis(100)); + + Workflow.upsertMemo(Collections.singletonMap("memoKey", null)); + memoVal = Workflow.getMemo("memoKey", String.class, String.class); + Assert.assertNull(memoVal); + return Workflow.getMemo("memoKey2", String.class, String.class); + } + } + + private static class WorkerInterceptor extends WorkerInterceptorBase { + @Override + public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) { + return new WorkflowInboundCallsInterceptorBase(next) { + @Override + public void init(WorkflowOutboundCallsInterceptor outboundCalls) { + next.init(new OutboundCallsInterceptor(outboundCalls)); + } + }; + } + } + + private static class OutboundCallsInterceptor extends WorkflowOutboundCallsInterceptorBase { + public OutboundCallsInterceptor(WorkflowOutboundCallsInterceptor next) { + super(next); + } + } +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateUtils.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateUtils.java index bc18ad489..7678d2035 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateUtils.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateUtils.java @@ -20,7 +20,19 @@ package io.temporal.internal.testservice; +import static io.temporal.common.converter.EncodingKeys.METADATA_ENCODING_KEY; + +import com.google.protobuf.ByteString; +import io.temporal.api.common.v1.Payload; +import io.temporal.common.converter.DefaultDataConverter; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; + class StateUtils { + private static Payload nullPayload = DefaultDataConverter.STANDARD_INSTANCE.toPayload(null).get(); + private static Payload emptyListPayload = + DefaultDataConverter.STANDARD_INSTANCE.toPayload(new String[] {}).get(); /** * @return true if the workflow was completed not by workflow task completion result */ @@ -33,4 +45,25 @@ public static boolean isWorkflowExecutionForcefullyCompleted(StateMachines.State return false; } } + + private static boolean isEqual(Payload a, Payload b) { + String aEnc = a.getMetadataOrDefault(METADATA_ENCODING_KEY, ByteString.EMPTY).toStringUtf8(); + String bEnc = b.getMetadataOrDefault(METADATA_ENCODING_KEY, ByteString.EMPTY).toStringUtf8(); + return aEnc.equals(bEnc) && a.getData().equals(b.getData()); + } + + public static @Nonnull Map mergeMemo( + @Nonnull Map src, @Nonnull Map dst) { + HashMap result = new HashMap(src); + dst.forEach( + (k, v) -> { + // Remove the key if the value is null or encoding is binary/null + if (v == null || isEqual(v, nullPayload) || isEqual(v, emptyListPayload)) { + result.remove(k); + return; + } + result.put(k, v); + }); + return result; + } } diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index d67832028..e762751a5 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -23,6 +23,7 @@ import static io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.*; import static io.temporal.internal.testservice.CronUtils.getBackoffInterval; import static io.temporal.internal.testservice.StateMachines.*; +import static io.temporal.internal.testservice.StateUtils.mergeMemo; import static io.temporal.internal.testservice.TestServiceRetryState.validateAndOverrideRetryPolicy; import com.google.common.base.Preconditions; @@ -36,9 +37,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.temporal.api.command.v1.*; -import io.temporal.api.common.v1.Payloads; -import io.temporal.api.common.v1.RetryPolicy; -import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.common.v1.*; import io.temporal.api.enums.v1.*; import io.temporal.api.errordetails.v1.QueryFailedFailure; import io.temporal.api.failure.v1.ApplicationFailureInfo; @@ -127,6 +126,7 @@ private interface UpdateProcedure { private final Map> queries = new ConcurrentHashMap<>(); public StickyExecutionAttributes stickyExecutionAttributes; + private Map currentMemo; /** * @param retryState present if workflow is a retry @@ -174,6 +174,7 @@ private interface UpdateProcedure { continuedExecutionRunId); this.workflow = StateMachines.newWorkflowStateMachine(data); this.workflowTaskStateMachine = StateMachines.newWorkflowTaskStateMachine(store, startRequest); + this.currentMemo = new HashMap(startRequest.getMemo().getFieldsMap()); } /** Based on overrideStartWorkflowExecutionRequest from historyEngine.go */ @@ -654,6 +655,10 @@ private void processCommand( processUpsertWorkflowSearchAttributes( ctx, d.getUpsertWorkflowSearchAttributesCommandAttributes(), workflowTaskCompletedId); break; + case COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES: + processModifyWorkflowProperties( + ctx, d.getModifyWorkflowPropertiesCommandAttributes(), workflowTaskCompletedId); + break; case COMMAND_TYPE_PROTOCOL_MESSAGE: processProtocolMessageAttributes( ctx, @@ -1600,6 +1605,26 @@ private WorkflowTaskFailedCause processUpsertWorkflowSearchAttributes( return null; } + /** processModifyWorkflowProperties handles ModifyWorkflowPropertiesCommandAttributes */ + private void processModifyWorkflowProperties( + RequestContext ctx, + ModifyWorkflowPropertiesCommandAttributes attr, + long workflowTaskCompletedId) { + // Update workflow properties + currentMemo = mergeMemo(currentMemo, attr.getUpsertedMemo().getFieldsMap()); + + WorkflowPropertiesModifiedEventAttributes.Builder propModifiedEventAttr = + WorkflowPropertiesModifiedEventAttributes.newBuilder() + .setUpsertedMemo(attr.getUpsertedMemo()) + .setWorkflowTaskCompletedEventId(workflowTaskCompletedId); + HistoryEvent event = + HistoryEvent.newBuilder() + .setEventType(EventType.EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED) + .setWorkflowPropertiesModifiedEventAttributes(propModifiedEventAttr) + .build(); + ctx.addEvent(event); + } + /** * processProtocolMessageAttributes handles protocol messages, it is expected to look up the * {@code Message} in the given {@code List} process that message and remove that {@code @@ -2863,6 +2888,10 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution() { } } + private Memo getCurrentMemo() { + return Memo.newBuilder().putAllFields(currentMemo).build(); + } + private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock() { WorkflowExecutionConfig.Builder executionConfig = WorkflowExecutionConfig.newBuilder() @@ -2886,7 +2915,7 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock() executionInfo .setExecution(this.executionId.getExecution()) .setType(this.getStartRequest().getWorkflowType()) - .setMemo(this.startRequest.getMemo()) + .setMemo(this.getCurrentMemo()) // No setAutoResetPoints - the test environment doesn't support that feature .setSearchAttributes(visibilityStore.getSearchAttributesForExecution(executionId)) .setStatus(this.getWorkflowExecutionStatus()) diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index 25daa91e0..2ad0afd89 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -24,11 +24,7 @@ import com.uber.m3.tally.Scope; import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes; import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; -import io.temporal.api.common.v1.Payload; -import io.temporal.api.common.v1.Payloads; -import io.temporal.api.common.v1.SearchAttributes; -import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.api.common.v1.WorkflowType; +import io.temporal.api.common.v1.*; import io.temporal.api.failure.v1.Failure; import io.temporal.common.RetryOptions; import io.temporal.common.converter.DefaultDataConverter; @@ -314,6 +310,11 @@ public void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes) { throw new UnsupportedOperationException("not implemented"); } + @Override + public void upsertMemo(Memo memo) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public boolean tryUseSdkFlag(SdkFlag flag) { return false; diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index d93c5dd67..fd4afa070 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -470,6 +470,11 @@ public void upsertTypedSearchAttributes(SearchAttributeUpdate... searchAttrib throw new UnsupportedOperationException("not implemented"); } + @Override + public void upsertMemo(Map memo) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public Object newChildThread(Runnable runnable, boolean detached, String name) { throw new UnsupportedOperationException("not implemented"); diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index ec4650632..3d30cf662 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -385,6 +385,14 @@ public void upsertTypedSearchAttributes(SearchAttributeUpdate... searchAttrib next.upsertTypedSearchAttributes(searchAttributeUpdates); } + @Override + public void upsertMemo(Map memo) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("upsertMemo"); + } + next.upsertMemo(memo); + } + @Override public Object newChildThread(Runnable runnable, boolean detached, String name) { if (!WorkflowUnsafe.isReplaying()) {