From 03f718249f4ea5491fa9c0656ae8d3b5dd96b950 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 16 Sep 2024 16:02:05 -0700 Subject: [PATCH] Add support for user meta data (#2218) Add support for user metadata on certain events --- .../io/temporal/client/WorkflowOptions.java | 76 ++++++++- .../WorkflowOutboundCallsInterceptor.java | 2 + .../WorkflowOutboundCallsInterceptorBase.java | 6 + .../client/RootWorkflowClientInvoker.java | 22 ++- .../internal/client/ScheduleProtoUtil.java | 21 +++ .../client/WorkflowClientRequestFactory.java | 12 +- .../common/WorkflowExecutionUtils.java | 16 ++ .../replay/ReplayWorkflowContext.java | 4 +- .../replay/ReplayWorkflowContextImpl.java | 6 +- .../ChildWorkflowStateMachine.java | 20 ++- ...StartChildWorkflowExecutionParameters.java | 11 +- .../statemachines/TimerStateMachine.java | 23 ++- .../statemachines/WorkflowStateMachines.java | 14 +- .../internal/sync/SyncWorkflowContext.java | 29 +++- .../internal/sync/WorkflowInternal.java | 5 + .../workflow/ChildWorkflowOptions.java | 65 ++++++- .../io/temporal/workflow/TimerOptions.java | 107 ++++++++++++ .../java/io/temporal/workflow/Workflow.java | 11 ++ ...QueryReplayWorkflowRunTaskHandlerTest.java | 1 + .../LocalActivityStateMachineTest.java | 3 +- .../MutableSideEffectStateMachineTest.java | 2 + .../statemachines/TimerStateMachineTest.java | 5 + .../UpdateProtocolStateMachineTest.java | 3 + .../VersionStateMachineTest.java | 14 ++ .../ChildWorkflowMetadataTest.java | 160 ++++++++++++++++++ temporal-serviceclient/src/main/proto | 2 +- .../sync/DummySyncWorkflowContext.java | 3 +- .../TestActivityEnvironmentInternal.java | 6 + .../internal/TracingWorkerInterceptor.java | 13 +- 29 files changed, 621 insertions(+), 41 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/workflow/TimerOptions.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowMetadataTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java index 0f468f667..08d8e3327 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java @@ -23,10 +23,7 @@ import com.google.common.base.Objects; import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; import io.temporal.api.enums.v1.WorkflowIdReusePolicy; -import io.temporal.common.CronSchedule; -import io.temporal.common.MethodRetry; -import io.temporal.common.RetryOptions; -import io.temporal.common.SearchAttributes; +import io.temporal.common.*; import io.temporal.common.context.ContextPropagator; import io.temporal.internal.common.OptionsUtils; import io.temporal.worker.WorkerFactory; @@ -79,6 +76,8 @@ public static WorkflowOptions merge( .setDisableEagerExecution(o.isDisableEagerExecution()) .setStartDelay(o.getStartDelay()) .setWorkflowIdConflictPolicy(o.getWorkflowIdConflictPolicy()) + .setStaticSummary(o.getStaticSummary()) + .setStaticDetails(o.getStaticDetails()) .validateBuildWithDefaults(); } @@ -114,6 +113,10 @@ public static final class Builder { private WorkflowIdConflictPolicy workflowIdConflictpolicy; + private String staticSummary; + + private String staticDetails; + private Builder() {} private Builder(WorkflowOptions options) { @@ -135,6 +138,8 @@ private Builder(WorkflowOptions options) { this.disableEagerExecution = options.disableEagerExecution; this.startDelay = options.startDelay; this.workflowIdConflictpolicy = options.workflowIdConflictpolicy; + this.staticSummary = options.staticSummary; + this.staticDetails = options.staticDetails; } /** @@ -382,6 +387,31 @@ public Builder setStartDelay(Duration startDelay) { return this; } + /** + * Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be + * in single-line Temporal Markdown format. + * + *

Default is none/empty. + */ + @Experimental + public Builder setStaticSummary(String staticSummary) { + this.staticSummary = staticSummary; + return this; + } + + /** + * General fixed details for this workflow execution that will appear in UI/CLI. This can be in + * Temporal Markdown format and can span multiple lines. This is a fixed value on the workflow + * that cannot be updated. + * + *

Default is none/empty. + */ + @Experimental + public Builder setStaticDetails(String staticDetails) { + this.staticDetails = staticDetails; + return this; + } + public WorkflowOptions build() { return new WorkflowOptions( workflowId, @@ -398,7 +428,9 @@ public WorkflowOptions build() { contextPropagators, disableEagerExecution, startDelay, - workflowIdConflictpolicy); + workflowIdConflictpolicy, + staticSummary, + staticDetails); } /** @@ -420,7 +452,9 @@ public WorkflowOptions validateBuildWithDefaults() { contextPropagators, disableEagerExecution, startDelay, - workflowIdConflictpolicy); + workflowIdConflictpolicy, + staticSummary, + staticDetails); } } @@ -454,6 +488,10 @@ public WorkflowOptions validateBuildWithDefaults() { private final WorkflowIdConflictPolicy workflowIdConflictpolicy; + private final String staticSummary; + + private final String staticDetails; + private WorkflowOptions( String workflowId, WorkflowIdReusePolicy workflowIdReusePolicy, @@ -469,7 +507,9 @@ private WorkflowOptions( List contextPropagators, boolean disableEagerExecution, Duration startDelay, - WorkflowIdConflictPolicy workflowIdConflictpolicy) { + WorkflowIdConflictPolicy workflowIdConflictpolicy, + String staticSummary, + String staticDetails) { this.workflowId = workflowId; this.workflowIdReusePolicy = workflowIdReusePolicy; this.workflowRunTimeout = workflowRunTimeout; @@ -485,6 +525,8 @@ private WorkflowOptions( this.disableEagerExecution = disableEagerExecution; this.startDelay = startDelay; this.workflowIdConflictpolicy = workflowIdConflictpolicy; + this.staticSummary = staticSummary; + this.staticDetails = staticDetails; } public String getWorkflowId() { @@ -556,6 +598,14 @@ public WorkflowIdConflictPolicy getWorkflowIdConflictPolicy() { return workflowIdConflictpolicy; } + public String getStaticSummary() { + return staticSummary; + } + + public String getStaticDetails() { + return staticDetails; + } + public Builder toBuilder() { return new Builder(this); } @@ -579,7 +629,9 @@ public boolean equals(Object o) { && Objects.equal(contextPropagators, that.contextPropagators) && Objects.equal(disableEagerExecution, that.disableEagerExecution) && Objects.equal(startDelay, that.startDelay) - && Objects.equal(workflowIdConflictpolicy, that.workflowIdConflictpolicy); + && Objects.equal(workflowIdConflictpolicy, that.workflowIdConflictpolicy) + && Objects.equal(staticSummary, that.staticSummary) + && Objects.equal(staticDetails, that.staticDetails); } @Override @@ -599,7 +651,9 @@ public int hashCode() { contextPropagators, disableEagerExecution, startDelay, - workflowIdConflictpolicy); + workflowIdConflictpolicy, + staticSummary, + staticDetails); } @Override @@ -638,6 +692,10 @@ public String toString() { + startDelay + ", workflowIdConflictpolicy=" + workflowIdConflictpolicy + + ", staticSummary=" + + staticSummary + + ", staticDetails=" + + staticDetails + '}'; } } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index 14005dc18..d3f0b0ac7 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -588,6 +588,8 @@ public DynamicUpdateHandler getHandler() { Promise newTimer(Duration duration); + Promise newTimer(Duration duration, TimerOptions options); + R sideEffect(Class resultClass, Type resultType, Func func); R mutableSideEffect( diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java index eb658ee62..71dc167da 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java @@ -23,6 +23,7 @@ import io.temporal.common.SearchAttributeUpdate; import io.temporal.workflow.Functions.Func; import io.temporal.workflow.Promise; +import io.temporal.workflow.TimerOptions; import java.lang.reflect.Type; import java.time.Duration; import java.util.Map; @@ -90,6 +91,11 @@ public Promise newTimer(Duration duration) { return next.newTimer(duration); } + @Override + public Promise newTimer(Duration duration, TimerOptions options) { + return next.newTimer(duration, options); + } + @Override public R sideEffect(Class resultClass, Type resultType, Func func) { return next.sideEffect(resultClass, resultType, func); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 4d2758305..bf88c46ff 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -21,6 +21,7 @@ package io.temporal.internal.client; import static io.temporal.internal.common.HeaderUtils.intoPayloadMap; +import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData; import io.grpc.Deadline; import io.grpc.Status; @@ -29,6 +30,7 @@ import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage; import io.temporal.api.enums.v1.WorkflowExecutionStatus; import io.temporal.api.query.v1.WorkflowQuery; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.update.v1.*; import io.temporal.api.workflowservice.v1.*; import io.temporal.client.*; @@ -86,6 +88,13 @@ public WorkflowStartOutput start(WorkflowStartInput input) { .build() : null; + @Nullable + UserMetadata userMetadata = + makeUserMetaData( + input.getOptions().getStaticSummary(), + input.getOptions().getStaticDetails(), + dataConverterWithWorkflowContext); + StartWorkflowExecutionRequest.Builder request = requestsHelper.newStartWorkflowExecutionRequest( input.getWorkflowId(), @@ -93,7 +102,8 @@ public WorkflowStartOutput start(WorkflowStartInput input) { input.getHeader(), input.getOptions(), inputArgs.orElse(null), - memo); + memo, + userMetadata); try (@Nullable WorkflowTaskDispatchHandle eagerDispatchHandle = obtainDispatchHandle(input)) { boolean requestEagerExecution = eagerDispatchHandle != null; request.setRequestEagerExecution(requestEagerExecution); @@ -173,6 +183,13 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu .build() : null; + @Nullable + UserMetadata userMetadata = + makeUserMetaData( + workflowStartInput.getOptions().getStaticSummary(), + workflowStartInput.getOptions().getStaticDetails(), + dataConverterWithWorkflowContext); + StartWorkflowExecutionRequestOrBuilder startRequest = requestsHelper.newStartWorkflowExecutionRequest( workflowStartInput.getWorkflowId(), @@ -180,7 +197,8 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu workflowStartInput.getHeader(), workflowStartInput.getOptions(), workflowInput.orElse(null), - memo); + memo, + userMetadata); Optional signalInput = dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments()); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java b/temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java index 9db0b3011..b08fc9904 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java @@ -22,6 +22,7 @@ import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc; import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy; +import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -33,6 +34,7 @@ import io.temporal.api.schedule.v1.ScheduleInfo; import io.temporal.api.schedule.v1.ScheduleSpec; import io.temporal.api.schedule.v1.ScheduleState; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.taskqueue.v1.TaskQueue; import io.temporal.api.workflow.v1.NewWorkflowExecutionInfo; import io.temporal.client.WorkflowOptions; @@ -160,6 +162,16 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction SearchAttributesUtil.encodeTyped(wfOptions.getTypedSearchAttributes())); } + @Nullable + UserMetadata userMetadata = + makeUserMetaData( + wfOptions.getStaticSummary(), + wfOptions.getStaticDetails(), + dataConverterWithWorkflowContext); + if (userMetadata != null) { + workflowRequest.setUserMetadata(userMetadata); + } + Header grpcHeader = toHeaderGrpc( startWorkflowAction.getHeader(), @@ -460,6 +472,15 @@ public io.temporal.client.schedules.ScheduleAction protoToAction(@Nonnull Schedu SearchAttributesUtil.decodeTyped(startWfAction.getSearchAttributes())); } + if (startWfAction.hasUserMetadata()) { + wfOptionsBuilder.setStaticSummary( + dataConverterWithWorkflowContext.fromPayload( + startWfAction.getUserMetadata().getSummary(), String.class, String.class)); + wfOptionsBuilder.setStaticDetails( + dataConverterWithWorkflowContext.fromPayload( + startWfAction.getUserMetadata().getDetails(), String.class, String.class)); + } + builder.setOptions(wfOptionsBuilder.build()); return builder.build(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java index 94fd5ea78..e9382a8c1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java @@ -28,6 +28,7 @@ import com.google.protobuf.ByteString; import io.temporal.api.common.v1.*; import io.temporal.api.enums.v1.HistoryEventFilterType; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.taskqueue.v1.TaskQueue; import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest; import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest; @@ -59,7 +60,8 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest( @Nonnull io.temporal.common.interceptors.Header header, @Nonnull WorkflowOptions options, @Nullable Payloads inputArgs, - @Nullable Memo memo) { + @Nullable Memo memo, + @Nullable UserMetadata userMetadata) { StartWorkflowExecutionRequest.Builder request = StartWorkflowExecutionRequest.newBuilder() .setNamespace(clientOptions.getNamespace()) @@ -108,6 +110,10 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest( request.setWorkflowStartDelay(ProtobufTimeUtils.toProtoDuration(options.getStartDelay())); } + if (userMetadata != null) { + request.setUserMetadata(userMetadata); + } + if (options.getSearchAttributes() != null && !options.getSearchAttributes().isEmpty()) { if (options.getTypedSearchAttributes() != null) { throw new IllegalArgumentException( @@ -183,6 +189,10 @@ SignalWithStartWorkflowExecutionRequest.Builder newSignalWithStartWorkflowExecut request.setWorkflowStartDelay(startParameters.getWorkflowStartDelay()); } + if (startParameters.hasUserMetadata()) { + request.setUserMetadata(startParameters.getUserMetadata()); + } + return request; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java index 06b351e01..94d568930 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java @@ -33,6 +33,7 @@ import io.temporal.api.enums.v1.TimeoutType; import io.temporal.api.enums.v1.WorkflowExecutionStatus; import io.temporal.api.history.v1.*; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder; import io.temporal.client.WorkflowFailedException; import io.temporal.common.converter.DataConverter; @@ -240,6 +241,21 @@ public static WorkflowExecutionStatus getCloseStatus(HistoryEvent event) { } } + public static UserMetadata makeUserMetaData(String summary, String details, DataConverter dc) { + if (summary == null && details == null) { + return null; + } + + UserMetadata.Builder builder = UserMetadata.newBuilder(); + if (summary != null) { + builder.setSummary(dc.toPayload(summary).get()); + } + if (details != null) { + builder.setDetails(dc.toPayload(details).get()); + } + return builder.build(); + } + public static String prettyPrintCommands(Iterable commands) { StringBuilder result = new StringBuilder(); for (Command command : commands) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index c983c4ee3..35b8b75f9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -25,6 +25,7 @@ import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; import io.temporal.api.common.v1.*; import io.temporal.api.failure.v1.Failure; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; import io.temporal.common.RetryOptions; import io.temporal.internal.common.SdkFlag; @@ -205,13 +206,14 @@ void requestCancelExternalWorkflowExecution( * Create a Value that becomes ready after the specified delay. * * @param delay time-interval after which the Value becomes ready. + * @param metadata user metadata to be associated with the timer. * @param callback Callback that is called with null parameter after the specified delay. * CanceledException is passed as a parameter in case of a cancellation. * @return cancellation handle. Invoke {@link io.temporal.workflow.Functions.Proc1#apply(Object)} * to cancel timer. */ Functions.Proc1 newTimer( - Duration delay, Functions.Proc1 callback); + Duration delay, UserMetadata metadata, Functions.Proc1 callback); /** * Executes the provided function once, records its result into the workflow history. The recorded diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index 414183b80..78330da95 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -30,6 +30,7 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.common.RetryOptions; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.ProtobufTimeUtils; @@ -268,7 +269,7 @@ public Optional getCurrentBuildId() { @Override public Functions.Proc1 newTimer( - Duration delay, Functions.Proc1 callback) { + Duration delay, UserMetadata metadata, Functions.Proc1 callback) { if (delay.compareTo(Duration.ZERO) <= 0) { callback.apply(null); return (e) -> {}; @@ -279,7 +280,8 @@ public Functions.Proc1 newTimer( .setTimerId(workflowStateMachines.randomUUID().toString()) .build(); Functions.Proc cancellationHandler = - workflowStateMachines.newTimer(attributes, (event) -> handleTimerCallback(callback, event)); + workflowStateMachines.newTimer( + attributes, metadata, (event) -> handleTimerCallback(callback, event)); return (e) -> cancellationHandler.apply(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ChildWorkflowStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ChildWorkflowStateMachine.java index 72cc781e1..491bfe42f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ChildWorkflowStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ChildWorkflowStateMachine.java @@ -34,6 +34,7 @@ import io.temporal.api.history.v1.ChildWorkflowExecutionTerminatedEventAttributes; import io.temporal.api.history.v1.ChildWorkflowExecutionTimedOutEventAttributes; import io.temporal.api.history.v1.StartChildWorkflowExecutionFailedEventAttributes; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.client.WorkflowException; import io.temporal.client.WorkflowExecutionAlreadyStarted; import io.temporal.common.converter.EncodedValues; @@ -138,6 +139,8 @@ enum State { private StartChildWorkflowExecutionCommandAttributes startAttributes; + private UserMetadata metadata; + private final Functions.Proc2 startedCallback; private final Functions.Proc2, Exception> completionCallback; @@ -146,28 +149,32 @@ enum State { * Creates a new child workflow state machine * * @param attributes child workflow start command attributes + * @param metadata user metadata to be associated with the child workflow * @param startedCallback callback that is notified about child start * @param completionCallback invoked when child reports completion or failure * @return cancellation callback that should be invoked to cancel the child */ public static ChildWorkflowStateMachine newInstance( StartChildWorkflowExecutionCommandAttributes attributes, + UserMetadata metadata, Functions.Proc2 startedCallback, Functions.Proc2, Exception> completionCallback, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { return new ChildWorkflowStateMachine( - attributes, startedCallback, completionCallback, commandSink, stateMachineSink); + attributes, metadata, startedCallback, completionCallback, commandSink, stateMachineSink); } private ChildWorkflowStateMachine( StartChildWorkflowExecutionCommandAttributes startAttributes, + UserMetadata metadata, Functions.Proc2 startedCallback, Functions.Proc2, Exception> completionCallback, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink); this.startAttributes = startAttributes; + this.metadata = metadata; this.workflowType = startAttributes.getWorkflowType().getName(); this.namespace = startAttributes.getNamespace(); this.workflowId = startAttributes.getWorkflowId(); @@ -177,11 +184,16 @@ private ChildWorkflowStateMachine( } public void createStartChildCommand() { - addCommand( + Command.Builder command = Command.newBuilder() .setCommandType(CommandType.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION) - .setStartChildWorkflowExecutionCommandAttributes(startAttributes) - .build()); + .setStartChildWorkflowExecutionCommandAttributes(startAttributes); + + if (metadata != null) { + command.setUserMetadata(metadata); + metadata = null; + } + addCommand(command.build()); startAttributes = null; // avoiding retaining large input for the duration of the child } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/StartChildWorkflowExecutionParameters.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/StartChildWorkflowExecutionParameters.java index 51eb3a232..474be26ee 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/StartChildWorkflowExecutionParameters.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/StartChildWorkflowExecutionParameters.java @@ -21,18 +21,23 @@ package io.temporal.internal.statemachines; import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.workflow.ChildWorkflowCancellationType; +import javax.annotation.Nullable; public final class StartChildWorkflowExecutionParameters { private final StartChildWorkflowExecutionCommandAttributes.Builder request; private final ChildWorkflowCancellationType cancellationType; + private final UserMetadata metadata; public StartChildWorkflowExecutionParameters( StartChildWorkflowExecutionCommandAttributes.Builder request, - ChildWorkflowCancellationType cancellationType) { + ChildWorkflowCancellationType cancellationType, + @Nullable UserMetadata metadata) { this.request = request; this.cancellationType = cancellationType; + this.metadata = metadata; } public StartChildWorkflowExecutionCommandAttributes.Builder getRequest() { @@ -42,4 +47,8 @@ public StartChildWorkflowExecutionCommandAttributes.Builder getRequest() { public ChildWorkflowCancellationType getCancellationType() { return cancellationType; } + + public @Nullable UserMetadata getMetadata() { + return metadata; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/TimerStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/TimerStateMachine.java index 0347ee80b..ede86f03a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/TimerStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/TimerStateMachine.java @@ -27,7 +27,9 @@ import io.temporal.api.enums.v1.EventType; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.history.v1.TimerCanceledEventAttributes; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.workflow.Functions; +import javax.annotation.Nullable; final class TimerStateMachine extends EntityStateMachineInitialCommand< @@ -35,31 +37,38 @@ final class TimerStateMachine private final StartTimerCommandAttributes startAttributes; + private UserMetadata metadata; + private final Functions.Proc1 completionCallback; /** * Creates a new timer state machine * * @param attributes timer command attributes + * @param metadata user metadata to be associate with the timer * @param completionCallback invoked when timer fires or reports cancellation. One of * TimerFiredEvent, TimerCanceledEvent. * @return cancellation callback that should be invoked to initiate timer cancellation */ public static TimerStateMachine newInstance( StartTimerCommandAttributes attributes, + @Nullable UserMetadata metadata, Functions.Proc1 completionCallback, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { - return new TimerStateMachine(attributes, completionCallback, commandSink, stateMachineSink); + return new TimerStateMachine( + attributes, metadata, completionCallback, commandSink, stateMachineSink); } private TimerStateMachine( StartTimerCommandAttributes attributes, + @Nullable UserMetadata metadata, Functions.Proc1 completionCallback, Functions.Proc1 commandSink, Functions.Proc1 stateMachineSink) { super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink, attributes.getTimerId()); this.startAttributes = attributes; + this.metadata = metadata; this.completionCallback = completionCallback; explicitEvent(ExplicitEvent.SCHEDULE); } @@ -127,11 +136,17 @@ enum State { TimerStateMachine::notifyCancellation); private void createStartTimerCommand() { - addCommand( + Command.Builder command = Command.newBuilder() .setCommandType(CommandType.COMMAND_TYPE_START_TIMER) - .setStartTimerCommandAttributes(startAttributes) - .build()); + .setStartTimerCommandAttributes(startAttributes); + + if (metadata != null) { + command.setUserMetadata(metadata); + metadata = null; + } + + addCommand(command.build()); } public void cancel() { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 2bea70238..a5460cc6d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -35,6 +35,7 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.*; import io.temporal.api.protocol.v1.Message; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.workflowservice.v1.GetSystemInfoResponse; import io.temporal.failure.CanceledFailure; import io.temporal.internal.common.*; @@ -812,16 +813,20 @@ public Functions.Proc scheduleActivityTask( * Creates a new timer state machine * * @param attributes timer command attributes + * @param metadata user provided metadata * @param completionCallback invoked when timer fires or reports cancellation. One of * TimerFiredEvent, TimerCanceledEvent. * @return cancellation callback that should be invoked to initiate timer cancellation */ public Functions.Proc newTimer( - StartTimerCommandAttributes attributes, Functions.Proc1 completionCallback) { + StartTimerCommandAttributes attributes, + UserMetadata metadata, + Functions.Proc1 completionCallback) { checkEventLoopExecuting(); TimerStateMachine timer = TimerStateMachine.newInstance( attributes, + metadata, (event) -> { completionCallback.apply(event); // Needed due to immediate cancellation @@ -851,7 +856,12 @@ public Functions.Proc startChildWorkflow( ChildWorkflowCancellationType cancellationType = parameters.getCancellationType(); ChildWorkflowStateMachine child = ChildWorkflowStateMachine.newInstance( - attributes, startedCallback, completionCallback, commandSink, stateMachineSink); + attributes, + parameters.getMetadata(), + startedCallback, + completionCallback, + commandSink, + stateMachineSink); return () -> { if (cancellationType == ChildWorkflowCancellationType.ABANDON) { notifyChildCanceled(completionCallback); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 9e1bfc1b4..a827e34d2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -23,6 +23,7 @@ import static io.temporal.internal.common.HeaderUtils.intoPayloadMap; import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc; import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy; +import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData; import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION; import com.google.common.base.MoreObjects; @@ -44,6 +45,7 @@ import io.temporal.api.enums.v1.ParentClosePolicy; import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.taskqueue.v1.TaskQueue; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse; import io.temporal.client.WorkflowException; @@ -649,6 +651,13 @@ public ChildWorkflowOutput executeChildWorkflow(ChildWorkflowInput inp .build() : null; + @Nullable + UserMetadata userMetadata = + makeUserMetaData( + input.getOptions().getStaticSummary(), + input.getOptions().getStaticDetails(), + dataConverterWithChildWorkflowContext); + StartChildWorkflowExecutionParameters parameters = createChildWorkflowParameters( input.getWorkflowId(), @@ -656,7 +665,8 @@ public ChildWorkflowOutput executeChildWorkflow(ChildWorkflowInput inp input.getOptions(), input.getHeader(), payloads, - memo); + memo, + userMetadata); Functions.Proc1 cancellationCallback = replayContext.startChildWorkflow( @@ -713,7 +723,8 @@ private StartChildWorkflowExecutionParameters createChildWorkflowParameters( ChildWorkflowOptions options, Header header, Optional input, - @Nullable Memo memo) { + @Nullable Memo memo, + @Nullable UserMetadata metadata) { final StartChildWorkflowExecutionCommandAttributes.Builder attributes = StartChildWorkflowExecutionCommandAttributes.newBuilder() .setWorkflowType(WorkflowType.newBuilder().setName(name).build()); @@ -775,7 +786,8 @@ private StartChildWorkflowExecutionParameters createChildWorkflowParameters( .determineUseCompatibleFlag( replayContext.getTaskQueue().equals(options.getTaskQueue()))); } - return new StartChildWorkflowExecutionParameters(attributes, options.getCancellationType()); + return new StartChildWorkflowExecutionParameters( + attributes, options.getCancellationType(), metadata); } private static Header extractContextsAndConvertToBytes( @@ -827,10 +839,21 @@ private static RuntimeException mapChildWorkflowException( @Override public Promise newTimer(Duration delay) { + return newTimer(delay, TimerOptions.newBuilder().build()); + } + + @Override + public Promise newTimer(Duration delay, TimerOptions options) { CompletablePromise p = Workflow.newPromise(); + + @Nullable + UserMetadata userMetadata = + makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext); + Functions.Proc1 cancellationHandler = replayContext.newTimer( delay, + userMetadata, (e) -> runner.executeInWorkflowThread( "timer-callback", diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 8990c9ab4..5fcdd2b5e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -89,6 +89,11 @@ public static Promise newTimer(Duration duration) { return getWorkflowOutboundInterceptor().newTimer(duration); } + public static Promise newTimer(Duration duration, TimerOptions options) { + assertNotReadOnly("schedule timer"); + return getWorkflowOutboundInterceptor().newTimer(duration, options); + } + /** * @param capacity the maximum size of the queue * @return new instance of {@link WorkflowQueue} diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/ChildWorkflowOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/ChildWorkflowOptions.java index 0d2393941..c89a7e3d4 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/ChildWorkflowOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/ChildWorkflowOptions.java @@ -73,6 +73,8 @@ public static final class Builder { private List contextPropagators; private ChildWorkflowCancellationType cancellationType; private VersioningIntent versioningIntent; + private String staticSummary; + private String staticDetails; private Builder() {} @@ -96,6 +98,8 @@ private Builder(ChildWorkflowOptions options) { this.contextPropagators = options.getContextPropagators(); this.cancellationType = options.getCancellationType(); this.versioningIntent = options.getVersioningIntent(); + this.staticSummary = options.getStaticSummary(); + this.staticDetails = options.getStaticDetails(); } /** @@ -303,6 +307,31 @@ public Builder setVersioningIntent(VersioningIntent versioningIntent) { return this; } + /** + * Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be + * in single-line Temporal Markdown format. + * + *

Default is none/empty. + */ + @Experimental + public Builder setStaticSummary(String staticSummary) { + this.staticSummary = staticSummary; + return this; + } + + /** + * General fixed details for this workflow execution that will appear in UI/CLI. This can be in + * Temporal Markdown format and can span multiple lines. This is a fixed value on the workflow + * that cannot be updated. + * + *

Default is none/empty. + */ + @Experimental + public Builder setStaticDetails(String staticDetails) { + this.staticDetails = staticDetails; + return this; + } + public ChildWorkflowOptions build() { return new ChildWorkflowOptions( namespace, @@ -320,7 +349,9 @@ public ChildWorkflowOptions build() { typedSearchAttributes, contextPropagators, cancellationType, - versioningIntent); + versioningIntent, + staticSummary, + staticDetails); } public ChildWorkflowOptions validateAndBuildWithDefaults() { @@ -344,7 +375,9 @@ public ChildWorkflowOptions validateAndBuildWithDefaults() { : cancellationType, versioningIntent == null ? VersioningIntent.VERSIONING_INTENT_UNSPECIFIED - : versioningIntent); + : versioningIntent, + staticSummary, + staticDetails); } } @@ -364,6 +397,8 @@ public ChildWorkflowOptions validateAndBuildWithDefaults() { private final List contextPropagators; private final ChildWorkflowCancellationType cancellationType; private final VersioningIntent versioningIntent; + private final String staticSummary; + private final String staticDetails; private ChildWorkflowOptions( String namespace, @@ -381,7 +416,9 @@ private ChildWorkflowOptions( SearchAttributes typedSearchAttributes, List contextPropagators, ChildWorkflowCancellationType cancellationType, - VersioningIntent versioningIntent) { + VersioningIntent versioningIntent, + String staticSummary, + String staticDetails) { this.namespace = namespace; this.workflowId = workflowId; this.workflowIdReusePolicy = workflowIdReusePolicy; @@ -398,6 +435,8 @@ private ChildWorkflowOptions( this.contextPropagators = contextPropagators; this.cancellationType = cancellationType; this.versioningIntent = versioningIntent; + this.staticSummary = staticSummary; + this.staticDetails = staticDetails; } public String getNamespace() { @@ -468,6 +507,14 @@ public VersioningIntent getVersioningIntent() { return versioningIntent; } + public String getStaticSummary() { + return staticSummary; + } + + public String getStaticDetails() { + return staticDetails; + } + public Builder toBuilder() { return new Builder(this); } @@ -492,7 +539,9 @@ public boolean equals(Object o) { && Objects.equal(typedSearchAttributes, that.typedSearchAttributes) && Objects.equal(contextPropagators, that.contextPropagators) && cancellationType == that.cancellationType - && versioningIntent == that.versioningIntent; + && versioningIntent == that.versioningIntent + && Objects.equal(staticSummary, that.staticSummary) + && Objects.equal(staticDetails, that.staticDetails); } @Override @@ -513,7 +562,9 @@ public int hashCode() { typedSearchAttributes, contextPropagators, cancellationType, - versioningIntent); + versioningIntent, + staticSummary, + staticDetails); } @Override @@ -555,6 +606,10 @@ public String toString() { + cancellationType + ", versioningIntent=" + versioningIntent + + ", staticSummary=" + + staticSummary + + ", staticDetails=" + + staticDetails + '}'; } } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/TimerOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/TimerOptions.java new file mode 100644 index 000000000..98ff2b895 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/workflow/TimerOptions.java @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow; + +import io.temporal.common.Experimental; +import java.util.Objects; + +/** TimerOptions is used to specify options for a timer. */ +public final class TimerOptions { + + public static TimerOptions.Builder newBuilder() { + return new TimerOptions.Builder(); + } + + public static TimerOptions.Builder newBuilder(TimerOptions options) { + return new TimerOptions.Builder(options); + } + + public static TimerOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final TimerOptions DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = TimerOptions.newBuilder().build(); + } + + public static final class Builder { + private String summary; + + private Builder() {} + + private Builder(TimerOptions options) { + if (options == null) { + return; + } + this.summary = options.summary; + } + + /** + * Single-line fixed summary for this timer that will appear in UI/CLI. This can be in + * single-line Temporal Markdown format. + * + *

Default is none/empty. + */ + @Experimental + public Builder setSummary(String summary) { + this.summary = summary; + return this; + } + + public TimerOptions build() { + return new TimerOptions(summary); + } + } + + private final String summary; + + private TimerOptions(String summary) { + this.summary = summary; + } + + public String getSummary() { + return summary; + } + + public Builder toBuilder() { + return new Builder(this); + } + + @Override + public String toString() { + return "TimerOptions{" + "summary='" + summary + '\'' + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TimerOptions that = (TimerOptions) o; + return Objects.equals(summary, that.summary); + } + + @Override + public int hashCode() { + return Objects.hash(summary); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 5925b82d3..fa30673b5 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -496,6 +496,17 @@ public static Promise newTimer(Duration delay) { return WorkflowInternal.newTimer(delay); } + /** + * Create new timer with options. Note that Temporal service time resolution is in seconds. So all + * durations are rounded up to the nearest second. + * + * @return feature that becomes ready when at least specified number of seconds passes. promise is + * failed with {@link CanceledFailure} if enclosing scope is canceled. + */ + public static Promise newTimer(Duration delay, TimerOptions options) { + return WorkflowInternal.newTimer(delay, options); + } + /** * @deprecated use {@link #newWorkflowQueue(int)} instead. An implementation returned by this * method has a bug. diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java index d8e6faa8d..27aaed73b 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java @@ -133,6 +133,7 @@ private ReplayWorkflow createReplayWorkflow(WorkflowExecutionHistory workflowExe .getTimerStartedEventAttributes() .getTimerId()) .build(), + null, historyEvent -> {}); return false; }) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java index 42b1fb3a1..b5fffb8bd 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java @@ -357,7 +357,8 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartChildWorkflowExecutionParameters childRequest = new StartChildWorkflowExecutionParameters( StartChildWorkflowExecutionCommandAttributes.newBuilder(), - ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED); + ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED, + null); ExecuteLocalActivityParameters parameters1 = new ExecuteLocalActivityParameters( PollActivityTaskQueueResponse.newBuilder() diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/MutableSideEffectStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/MutableSideEffectStateMachineTest.java index 45f7e64df..a239d75bf 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/MutableSideEffectStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/MutableSideEffectStateMachineTest.java @@ -234,6 +234,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c)) .add1( (v, c) -> @@ -241,6 +242,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c)) .>add1( (v, c) -> stateMachines.mutableSideEffect("id1", (p) -> Optional.empty(), c)) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/TimerStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/TimerStateMachineTest.java index cb27865d6..6bdd332d2 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/TimerStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/TimerStateMachineTest.java @@ -87,6 +87,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { .setStartToFireTimeout( ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1))) .build(), + null, c)) .add((firedEvent) -> stateMachines.completeWorkflow(Optional.empty())); } @@ -158,6 +159,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { .setStartToFireTimeout( ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1))) .build(), + null, c)) .add( (firedEvent) -> @@ -171,6 +173,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { .setStartToFireTimeout( ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1))) .build(), + null, c)) .add((firedEvent) -> stateMachines.completeWorkflow(converter.toPayloads("result1"))); @@ -232,6 +235,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { .setStartToFireTimeout( ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1))) .build(), + null, c)) .add( (firedEvent) -> { @@ -247,6 +251,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { .setStartToFireTimeout( ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1))) .build(), + null, c)) .add( (firedEvent) -> { diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java index 8d23c8eef..11d7cbfaf 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java @@ -107,6 +107,7 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) .setStartToFireTimeout( ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1))) .build(), + null, c)) .add( (r) -> { @@ -465,6 +466,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { .setStartToFireTimeout( ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1))) .build(), + null, c)); } @@ -488,6 +490,7 @@ protected void signal(HistoryEvent signalEvent, AsyncWorkflowBuilder build .setStartToFireTimeout( ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1))) .build(), + null, c)); } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/VersionStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/VersionStateMachineTest.java index d18d3d1cc..d1c835aeb 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/VersionStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/VersionStateMachineTest.java @@ -384,6 +384,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c); }) .add((v) -> stateMachines.completeWorkflow(converter.toPayloads(v))); @@ -496,6 +497,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c); }) .add1( @@ -504,6 +506,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c)) .add2( (v, c) -> stateMachines.getVersion("id1", maxSupported - 3, maxSupported + 10, c)) @@ -635,6 +638,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c); }) .add1( @@ -643,6 +647,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c)) .add2( (v, c) -> stateMachines.getVersion("id1", maxSupported - 3, maxSupported + 10, c)) @@ -743,6 +748,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c)) .add((v) -> stateMachines.completeWorkflow(converter.toPayloads(v))); } @@ -841,6 +847,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c)) .add1( (v, c) -> @@ -848,6 +855,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c)) /*.add( (v, c) -> stateMachines.getVersion("id1", maxSupported - 3, maxSupported + 10, c))*/ @@ -950,6 +958,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c); stateMachines.completeWorkflow(converter.toPayloads(v)); }); @@ -1009,6 +1018,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c)) .add( (v) -> { @@ -1080,6 +1090,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c); }) .add2( @@ -1181,6 +1192,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { .setStartToFireTimeout( Duration.newBuilder().setSeconds(100).build()) .build(), + null, ignore -> {}))) .add((v) -> cancelTimerProc.get().apply()) .add2( @@ -1193,6 +1205,7 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c); }) .add((v) -> stateMachines.completeWorkflow(converter.toPayloads(null))); @@ -1265,6 +1278,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { StartTimerCommandAttributes.newBuilder() .setStartToFireTimeout(Duration.newBuilder().setSeconds(100).build()) .build(), + null, c); }) .add2( diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowMetadataTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowMetadataTest.java new file mode 100644 index 000000000..59a3c6065 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowMetadataTest.java @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.childWorkflowTests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.ChildWorkflowOptions; +import io.temporal.workflow.TimerOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestWorkflows.*; +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class ChildWorkflowMetadataTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestParentWorkflow.class, TestChild.class) + .build(); + + static final String summary = "my-wf-summary"; + static final String details = "my-wf-details"; + static final String childSummary = "child-summary"; + static final String childDetails = "child-details"; + static final String childTimerSummary = "child-timer-summary"; + + @Before + public void checkRealServer() { + assumeTrue("skipping for test server", SDKTestWorkflowRule.useExternalService); + } + + @Test + public void testChildWorkflowWithMetaData() { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowRunTimeout(Duration.ofSeconds(20)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setStaticSummary(summary) + .setStaticDetails(details) + .build(); + TestWorkflow1 stub = + testWorkflowRule.getWorkflowClient().newWorkflowStub(TestWorkflow1.class, options); + + String childWorkflowId = stub.execute(testWorkflowRule.getTaskQueue()); + + WorkflowExecution exec = WorkflowStub.fromTyped(stub).getExecution(); + assertWorkflowMetadata(exec.getWorkflowId(), summary, details); + assertWorkflowMetadata(childWorkflowId, childSummary, childDetails); + + WorkflowExecutionHistory workflowExecutionHistory = + testWorkflowRule.getWorkflowClient().fetchHistory(childWorkflowId); + List timerStartedEvents = + workflowExecutionHistory.getEvents().stream() + .filter(HistoryEvent::hasTimerStartedEventAttributes) + .collect(Collectors.toList()); + assertEventMetadata(timerStartedEvents.get(0), childTimerSummary, null); + } + + private void assertWorkflowMetadata(String workflowId, String summary, String details) { + DescribeWorkflowExecutionResponse describe = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkflowExecution( + DescribeWorkflowExecutionRequest.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setExecution(WorkflowExecution.newBuilder().setWorkflowId(workflowId).build()) + .build()); + String describedSummary = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + describe.getExecutionConfig().getUserMetadata().getSummary(), + String.class, + String.class); + String describedDetails = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + describe.getExecutionConfig().getUserMetadata().getDetails(), + String.class, + String.class); + assertEquals(summary, describedSummary); + assertEquals(details, describedDetails); + } + + private void assertEventMetadata(HistoryEvent event, String summary, String details) { + if (summary != null) { + String describedSummary = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + event.getUserMetadata().getSummary(), String.class, String.class); + assertEquals(summary, describedSummary); + } + if (details != null) { + String describedDetails = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + event.getUserMetadata().getDetails(), String.class, String.class); + assertEquals(details, describedDetails); + } + } + + public static class TestParentWorkflow implements TestWorkflow1 { + + private final ITestChild child1 = + Workflow.newChildWorkflowStub( + ITestChild.class, + ChildWorkflowOptions.newBuilder() + .setStaticDetails(childDetails) + .setStaticSummary(childSummary) + .build()); + + @Override + public String execute(String taskQueue) { + child1.execute("World!", 1); + return Workflow.getWorkflowExecution(child1).get().getWorkflowId(); + } + } + + public static class TestChild implements ITestChild { + + @Override + public String execute(String arg, int delay) { + Workflow.newTimer( + Duration.ofMillis(delay), + TimerOptions.newBuilder().setSummary(childTimerSummary).build()) + .get(); + return arg.toUpperCase(); + } + } +} diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 39b0f69d1..2ed0a18b7 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 39b0f69d19b67731e1f35fd2d231f2c871091359 +Subproject commit 2ed0a18b7ad6b6052a15da6d9d81bb97e05bc359 diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index 2ad0afd89..e19ddde12 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -26,6 +26,7 @@ import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; import io.temporal.api.common.v1.*; import io.temporal.api.failure.v1.Failure; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.common.RetryOptions; import io.temporal.common.converter.DefaultDataConverter; import io.temporal.failure.CanceledFailure; @@ -243,7 +244,7 @@ public long currentTimeMillis() { @Override public Functions.Proc1 newTimer( - Duration delay, Functions.Proc1 callback) { + Duration delay, UserMetadata metadata, Functions.Proc1 callback) { timer.schedule( new TimerTask() { @Override diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index fd4afa070..6fbe9ea67 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -60,6 +60,7 @@ import io.temporal.workflow.Functions; import io.temporal.workflow.Functions.Func; import io.temporal.workflow.Promise; +import io.temporal.workflow.TimerOptions; import io.temporal.workflow.Workflow; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Type; @@ -414,6 +415,11 @@ public Promise newTimer(Duration duration) { throw new UnsupportedOperationException("not implemented"); } + @Override + public Promise newTimer(Duration duration, TimerOptions options) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public R sideEffect(Class resultClass, Type resultType, Func func) { throw new UnsupportedOperationException("not implemented"); diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 3d30cf662..411a8f6fc 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -28,10 +28,7 @@ import io.temporal.common.SearchAttributeUpdate; import io.temporal.common.interceptors.*; import io.temporal.internal.sync.WorkflowMethodThreadNameStrategy; -import io.temporal.workflow.Functions; -import io.temporal.workflow.Promise; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInfo; +import io.temporal.workflow.*; import io.temporal.workflow.unsafe.WorkflowUnsafe; import java.lang.reflect.Type; import java.time.Duration; @@ -247,6 +244,14 @@ public Promise newTimer(Duration duration) { return next.newTimer(duration); } + @Override + public Promise newTimer(Duration duration, TimerOptions options) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("newTimer " + duration); + } + return next.newTimer(duration, options); + } + @Override public R sideEffect(Class resultClass, Type resultType, Functions.Func func) { if (!WorkflowUnsafe.isReplaying()) {