diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java index 30a3eeb2c..fbe6a7103 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java @@ -338,7 +338,7 @@ boolean getVersion( /** * @return eventId of the last / currently active workflow task of this workflow */ - long getCurrentWorkflowTaskStartedEventId(); + long getLastWorkflowTaskStartedEventId(); /** * @return size of Workflow history in bytes up until the current moment of execution. This value @@ -405,4 +405,13 @@ boolean getVersion( * @return true if this flag may currently be used. */ boolean tryUseSdkFlag(SdkFlag flag); + + /** + * @return The Build ID of the worker which executed the current Workflow Task. May be empty the + * task was completed by a worker without a Build ID. If this worker is the one executing this + * task for the first time and has a Build ID set, then its ID will be used. This value may + * change over the lifetime of the workflow run, but is deterministic and safe to use for + * branching. + */ + Optional getCurrentBuildId(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java index 8d5a99c23..61e8e964d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java @@ -253,6 +253,19 @@ public boolean tryUseSdkFlag(SdkFlag flag) { return workflowStateMachines.tryUseSdkFlag(flag); } + @Override + public Optional getCurrentBuildId() { + String curTaskBID = workflowStateMachines.getCurrentTaskBuildId(); + // The current task started id == 0 check is to avoid setting the build id to this worker's ID + // in the event we're + // servicing a query, in which case we do want to use the ID from history. + if (!workflowStateMachines.isReplaying() + && workflowStateMachines.getCurrentWFTStartedEventId() != 0) { + curTaskBID = workerOptions.getBuildId(); + } + return Optional.ofNullable(curTaskBID); + } + @Override public Functions.Proc1 newTimer( Duration delay, Functions.Proc1 callback) { @@ -356,8 +369,8 @@ public Map getHeader() { } @Override - public long getCurrentWorkflowTaskStartedEventId() { - return workflowStateMachines.getCurrentStartedEventId(); + public long getLastWorkflowTaskStartedEventId() { + return workflowStateMachines.getLastWFTStartedEventId(); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java index facd909a3..01c0448a9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java @@ -150,7 +150,7 @@ public WorkflowTaskResult handleWorkflowTask( TimeUnit.NANOSECONDS); if (workflowTask.getPreviousStartedEventId() - < workflowStateMachines.getCurrentStartedEventId()) { + < workflowStateMachines.getLastWFTStartedEventId()) { // if previousStartedEventId < currentStartedEventId - the last workflow task handled by // these state machines is ahead of the last handled workflow task known by the server. // Something is off, the server lost progress. @@ -219,7 +219,7 @@ public QueryResult handleDirectQueryWorkflowTask( @Override public void setCurrentStartedEvenId(Long eventId) { - workflowStateMachines.setCurrentStartedEventId(eventId); + workflowStateMachines.setLastWFTStartedEventId(eventId); } private void handleWorkflowTaskImpl( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 1319554fa..aa42f0285 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -49,6 +49,7 @@ import io.temporal.workflow.Functions; import java.nio.charset.StandardCharsets; import java.util.*; +import javax.annotation.Nullable; public final class WorkflowStateMachines { @@ -79,7 +80,10 @@ enum HandleEventStatus { private long workflowTaskStartedEventId; /** EventId of the last WorkflowTaskStarted event handled by these state machines. */ - private long currentStartedEventId; + private long lastWFTStartedEventId; + + /** The Build ID used in the current WFT if already completed and set (may be null) */ + private String currentTaskBuildId; private long historySize; @@ -201,27 +205,36 @@ public void setWorklfowStartedEventId(long workflowTaskStartedEventId) { this.workflowTaskStartedEventId = workflowTaskStartedEventId; } - public void setCurrentStartedEventId(long eventId) { + public void setLastWFTStartedEventId(long eventId) { // We have to drop any state machines (which should only be one workflow task machine) // created when handling the speculative workflow task for (long i = this.lastHandledEventId; i > eventId; i--) { stateMachines.remove(i); } - this.currentStartedEventId = eventId; + this.lastWFTStartedEventId = eventId; // When we reset the event ID on a speculative WFT we need to move this counter back // to the last WFT completed to allow new tasks to be processed. Assume the WFT complete // always follows the WFT started. this.lastHandledEventId = eventId + 1; } - public long getCurrentStartedEventId() { - return currentStartedEventId; + public long getLastWFTStartedEventId() { + return lastWFTStartedEventId; + } + + public long getCurrentWFTStartedEventId() { + return workflowTaskStartedEventId; } public long getHistorySize() { return historySize; } + @Nullable + public String getCurrentTaskBuildId() { + return currentTaskBuildId; + } + public boolean isContinueAsNewSuggested() { return isContinueAsNewSuggested; } @@ -329,6 +342,10 @@ private void handleSingleEventLookahead(HistoryEvent event) { case EVENT_TYPE_WORKFLOW_TASK_COMPLETED: WorkflowTaskCompletedEventAttributes completedEvent = event.getWorkflowTaskCompletedEventAttributes(); + String maybeBuildId = completedEvent.getWorkerVersion().getBuildId(); + if (!maybeBuildId.isEmpty()) { + currentTaskBuildId = maybeBuildId; + } for (Integer flag : completedEvent.getSdkMetadata().getLangUsedFlagsList()) { SdkFlag sdkFlag = SdkFlag.getValue(flag); if (sdkFlag.equals(SdkFlag.UNKNOWN)) { @@ -703,7 +720,7 @@ private long setCurrentTimeMillis(long currentTimeMillis) { } public long getLastStartedEventId() { - return currentStartedEventId; + return lastWFTStartedEventId; } /** @@ -1164,7 +1181,7 @@ public void workflowTaskStarted( value.nonReplayWorkflowTaskStarted(); } } - WorkflowStateMachines.this.currentStartedEventId = startedEventId; + WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId; WorkflowStateMachines.this.historySize = historySize; WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested; @@ -1282,6 +1299,6 @@ private String createEventHandlingMessage(HistoryEvent event) { private String createShortCurrentStateMessagePostfix() { return String.format( "{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}", - this.workflowTaskStartedEventId, this.currentStartedEventId); + this.workflowTaskStartedEventId, this.lastWFTStartedEventId); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java index fc546c951..f44132348 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java @@ -137,7 +137,7 @@ public String getCronSchedule() { @Override public long getHistoryLength() { - return context.getCurrentWorkflowTaskStartedEventId(); + return context.getLastWorkflowTaskStartedEventId(); } @Override @@ -150,6 +150,11 @@ public boolean isContinueAsNewSuggested() { return context.isContinueAsNewSuggested(); } + @Override + public Optional getCurrentBuildId() { + return context.getCurrentBuildId(); + } + @Override public String toString() { return "WorkflowInfo{" diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java index cff44064b..bbd6597b8 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java @@ -164,4 +164,13 @@ public interface WorkflowInfo { * value changes during the lifetime of a Workflow Execution. */ boolean isContinueAsNewSuggested(); + + /** + * @return The Build ID of the worker which executed the current Workflow Task. May be empty the + * task was completed by a worker without a Build ID. If this worker is the one executing this + * task for the first time and has a Build ID set, then its ID will be used. This value may + * change over the lifetime of the workflow run, but is deterministic and safe to use for + * branching. + */ + Optional getCurrentBuildId(); } diff --git a/temporal-sdk/src/test/java/io/temporal/worker/BuildIdVersioningTest.java b/temporal-sdk/src/test/java/io/temporal/worker/BuildIdVersioningTest.java index b51fce023..22a27922f 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/BuildIdVersioningTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/BuildIdVersioningTest.java @@ -24,18 +24,19 @@ import io.temporal.activity.Activity; import io.temporal.activity.ActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.client.BuildIdOperation; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.internal.Signal; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowMethod; -import io.temporal.workflow.WorkflowQueue; +import io.temporal.workflow.*; import io.temporal.workflow.shared.TestActivities; import io.temporal.workflow.shared.TestWorkflows; import java.time.Duration; import java.util.UUID; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -45,7 +46,6 @@ public class BuildIdVersioningTest { SDKTestWorkflowRule.newBuilder() .setWorkerOptions( WorkerOptions.newBuilder().setBuildId("1.0").setUseBuildIdForVersioning(true).build()) - .setWorkflowTypes(BuildIdVersioningTest.TestVersioningWorkflowImpl.class) .setActivityImplementations(new BuildIdVersioningTest.ActivityImpl()) .setDoNotStart(true) .build(); @@ -57,6 +57,10 @@ public void testBuildIdVersioningDataSetProperly() { String taskQueue = testWorkflowRule.getTaskQueue(); WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + testWorkflowRule + .getWorker() + .registerWorkflowImplementationTypes( + BuildIdVersioningTest.TestVersioningWorkflowImpl.class); // Add 1.0 to the queue workflowClient.updateWorkerBuildIdCompatability( @@ -120,6 +124,75 @@ public void testBuildIdVersioningDataSetProperly() { w2F.shutdown(); } + private static final Signal ACTIVITY_RAN = new Signal(); + + @Test + public void testCurrentBuildIDSetProperly() throws InterruptedException { + assumeTrue( + "Test Server doesn't support versioning yet", SDKTestWorkflowRule.useExternalService); + + String taskQueue = testWorkflowRule.getTaskQueue(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + testWorkflowRule + .getWorker() + .registerWorkflowImplementationTypes( + BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class); + + // Add 1.0 to the queue + workflowClient.updateWorkerBuildIdCompatability( + taskQueue, BuildIdOperation.newIdInNewDefaultSet("1.0")); + + // Now start the worker (to avoid poll timeout while queue is unversioned) + testWorkflowRule.getTestEnvironment().start(); + + // Start a workflow + String workflowId = "build-id-versioning-1.0-" + UUID.randomUUID(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(taskQueue).toBuilder() + .setWorkflowId(workflowId) + .build(); + TestWorkflows.QueryableWorkflow wf1 = + workflowClient.newWorkflowStub(TestWorkflows.QueryableWorkflow.class, options); + WorkflowClient.start(wf1::execute); + + Assert.assertEquals("1.0", wf1.getState()); + + // Wait for activity to run + ACTIVITY_RAN.waitForSignal(); + Assert.assertEquals("1.0", wf1.getState()); + + testWorkflowRule.getTestEnvironment().shutdown(); + workflowClient + .getWorkflowServiceStubs() + .blockingStub() + .resetStickyTaskQueue( + io.temporal.api.workflowservice.v1.ResetStickyTaskQueueRequest.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setExecution(WorkflowExecution.newBuilder().setWorkflowId(workflowId).build()) + .build()); + + // Add 1.1 to the queue + workflowClient.updateWorkerBuildIdCompatability( + taskQueue, BuildIdOperation.newCompatibleVersion("1.1", "1.0")); + + WorkerFactory w11F = + WorkerFactory.newInstance(workflowClient, testWorkflowRule.getWorkerFactoryOptions()); + Worker w11 = + w11F.newWorker( + taskQueue, + WorkerOptions.newBuilder().setBuildId("1.1").setUseBuildIdForVersioning(true).build()); + w11.registerWorkflowImplementationTypes(BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class); + w11.registerActivitiesImplementations(new BuildIdVersioningTest.ActivityImpl()); + w11F.start(); + + Assert.assertEquals("1.0", wf1.getState()); + wf1.mySignal("finish"); + + Assert.assertEquals("1.1", wf1.getState()); + + w11F.shutdown(); + } + public static class TestVersioningWorkflowImpl implements TestWorkflows.QueryableWorkflow { WorkflowQueue sigQueue = Workflow.newWorkflowQueue(1); private final TestActivities.TestActivity1 activity = @@ -156,4 +229,43 @@ public String execute(String input) { return Activity.getExecutionContext().getInfo().getActivityType() + "-" + input; } } + + public static class TestCurrentBuildIdWorkflow implements TestWorkflows.QueryableWorkflow { + private final TestActivities.TestActivity1 activity = + Workflow.newActivityStub( + TestActivities.TestActivity1.class, + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(10)).build()); + private boolean doFinish = false; + private String lastBuildId; + + @WorkflowMethod + public String execute() { + updateBuildId(); + Workflow.sleep(1); + updateBuildId(); + if (Workflow.getInfo().getCurrentBuildId().orElse("").equals("1.0")) { + activity.execute("foo"); + updateBuildId(); + ACTIVITY_RAN.signal(); + } + Workflow.await(() -> doFinish); + updateBuildId(); + return "Yay done"; + } + + private void updateBuildId() { + lastBuildId = Workflow.getInfo().getCurrentBuildId().orElse(""); + } + + @Override + public void mySignal(String arg) { + doFinish = true; + } + + @Override + public String getState() { + // Workflow.getInfo isn't accessible in queries, so we do this + return lastBuildId; + } + } } diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java index 18be08aaf..25daa91e0 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -319,6 +319,11 @@ public boolean tryUseSdkFlag(SdkFlag flag) { return false; } + @Override + public Optional getCurrentBuildId() { + return Optional.empty(); + } + @Override public int getAttempt() { return 1; @@ -353,7 +358,7 @@ public Map getHeader() { } @Override - public long getCurrentWorkflowTaskStartedEventId() { + public long getLastWorkflowTaskStartedEventId() { return 0; }