Skip to content

Commit

Permalink
Ignore history events with worker_may_ignore: true. (#2000)
Browse files Browse the repository at this point in the history
## What was changed

* Unknown event types with `worker_may_ignore: true` are skipped
* All other unknown event types continue to throw
* Added new replay tests verifying that `worker_may_ignore` is handled correctly

## Why?

The new `worker_may_ignore` flag is intended to mark events that can be handled as no-ops if the SDK doesn't know the event type.
  • Loading branch information
chronos-tachyon authored Mar 4, 2024
1 parent ad1dabc commit b182d78
Show file tree
Hide file tree
Showing 5 changed files with 564 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import io.temporal.api.command.v1.*;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.common.v1.*;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.*;
Expand Down Expand Up @@ -60,7 +58,8 @@ enum HandleEventStatus {

/** Initial set of SDK flags that will be set on all new workflow executions. */
private static final List<SdkFlag> initialFlags =
Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
Collections.unmodifiableList(
Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));

/**
* EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
Expand Down Expand Up @@ -159,7 +158,7 @@ enum HandleEventStatus {

private final WFTBuffer wftBuffer = new WFTBuffer();

private List<Message> messages = new ArrayList<Message>();
private List<Message> messages = new ArrayList<>();

private final SdkFlags flags;

Expand Down Expand Up @@ -358,8 +357,8 @@ private void handleSingleEventLookahead(HistoryEvent event) {
}

private List<Message> takeLTE(long eventId) {
List<Message> m = new ArrayList<Message>();
List<Message> remainingMessages = new ArrayList<Message>();
List<Message> m = new ArrayList<>();
List<Message> remainingMessages = new ArrayList<>();
for (Message msg : this.messages) {
if (msg.getEventId() > eventId) {
remainingMessages.add(msg);
Expand Down Expand Up @@ -430,12 +429,16 @@ private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
replaying = false;
}

Long initialCommandEventId = getInitialCommandEventId(event);
EntityStateMachine c = stateMachines.get(initialCommandEventId);
final OptionalLong initialCommandEventId = getInitialCommandEventId(event);
if (!initialCommandEventId.isPresent()) {
return;
}

EntityStateMachine c = stateMachines.get(initialCommandEventId.getAsLong());
if (c != null) {
c.handleEvent(event, hasNextEvent);
if (c.isFinalState()) {
stateMachines.remove(initialCommandEventId);
stateMachines.remove(initialCommandEventId.getAsLong());
}
} else {
handleNonStatefulEvent(event, hasNextEvent);
Expand Down Expand Up @@ -585,9 +588,7 @@ public void sendMessage(Message message) {

public List<Message> takeMessages() {
List<Message> result = new ArrayList<>(messageOutbox.size());
for (Message message : messageOutbox) {
result.add(message);
}
result.addAll(messageOutbox);
messageOutbox.clear();
return result;
}
Expand Down Expand Up @@ -960,10 +961,9 @@ public boolean getVersion(
VersionStateMachine stateMachine =
versions.computeIfAbsent(
changeId,
(idKey) -> {
return VersionStateMachine.newInstance(
changeId, this::isReplaying, commandSink, stateMachineSink);
});
(idKey) ->
VersionStateMachine.newInstance(
changeId, this::isReplaying, commandSink, stateMachineSink));
return stateMachine.getVersion(
minSupported,
maxSupported,
Expand Down Expand Up @@ -1194,60 +1194,85 @@ public void updateRunId(String currentRunId) {
}
}

private long getInitialCommandEventId(HistoryEvent event) {
/**
* Extracts the eventId of the "initial command" for the given event.
*
* <p>The "initial command" is the event which started a group of related events:
* ActivityTaskScheduled, TimerStarted, and so on; for events which are not part of a group, the
* event's own eventId is returned. If the event has an unknown type but is marked as ignorable,
* then {@link OptionalLong#empty()} is returned instead.
*
* @return the eventId of the initial command, or {@link OptionalLong#empty()}
*/
private OptionalLong getInitialCommandEventId(HistoryEvent event) {
switch (event.getEventType()) {
case EVENT_TYPE_ACTIVITY_TASK_STARTED:
return event.getActivityTaskStartedEventAttributes().getScheduledEventId();
return OptionalLong.of(event.getActivityTaskStartedEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
return event.getActivityTaskCompletedEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getActivityTaskCompletedEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_FAILED:
return event.getActivityTaskFailedEventAttributes().getScheduledEventId();
return OptionalLong.of(event.getActivityTaskFailedEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
return event.getActivityTaskTimedOutEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getActivityTaskTimedOutEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
return event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId());
case EVENT_TYPE_ACTIVITY_TASK_CANCELED:
return event.getActivityTaskCanceledEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getActivityTaskCanceledEventAttributes().getScheduledEventId());
case EVENT_TYPE_TIMER_FIRED:
return event.getTimerFiredEventAttributes().getStartedEventId();
return OptionalLong.of(event.getTimerFiredEventAttributes().getStartedEventId());
case EVENT_TYPE_TIMER_CANCELED:
return event.getTimerCanceledEventAttributes().getStartedEventId();
return OptionalLong.of(event.getTimerCanceledEventAttributes().getStartedEventId());
case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
return event
.getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
.getInitiatedEventId();
return OptionalLong.of(
event
.getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
.getInitiatedEventId());
case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
return event
.getExternalWorkflowExecutionCancelRequestedEventAttributes()
.getInitiatedEventId();
return OptionalLong.of(
event
.getExternalWorkflowExecutionCancelRequestedEventAttributes()
.getInitiatedEventId());
case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
return event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
return event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
return event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
return event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
return event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
return event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId());
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
return event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
return event
.getSignalExternalWorkflowExecutionFailedEventAttributes()
.getInitiatedEventId();
return OptionalLong.of(
event.getSignalExternalWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
return event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId();
return OptionalLong.of(
event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId());
case EVENT_TYPE_WORKFLOW_TASK_STARTED:
return event.getWorkflowTaskStartedEventAttributes().getScheduledEventId();
return OptionalLong.of(event.getWorkflowTaskStartedEventAttributes().getScheduledEventId());
case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
return event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId());
case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
return event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId();
return OptionalLong.of(
event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId());
case EVENT_TYPE_WORKFLOW_TASK_FAILED:
return event.getWorkflowTaskFailedEventAttributes().getScheduledEventId();
return OptionalLong.of(event.getWorkflowTaskFailedEventAttributes().getScheduledEventId());

case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
case EVENT_TYPE_TIMER_STARTED:
Expand All @@ -1266,12 +1291,14 @@ private long getInitialCommandEventId(HistoryEvent event) {
case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
return event.getEventId();
case UNRECOGNIZED:
case EVENT_TYPE_UNSPECIFIED:
return OptionalLong.of(event.getEventId());

default:
if (event.getWorkerMayIgnore()) {
return OptionalLong.empty();
}
throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
}
throw new IllegalStateException("unreachable");
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.replay;

import static io.temporal.testing.WorkflowHistoryLoader.readHistoryFromResource;

import io.temporal.activity.*;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import io.temporal.workflow.*;
import java.time.Duration;
import org.junit.*;
import org.junit.rules.Timeout;

/**
 * Replay tests driven by recorded workflow histories loaded from test resources.
 *
 * <p>Each replay test loads one JSON history and feeds it to a worker that has {@link
 * MyWorkflowImpl} and {@link MyActivityImpl} registered. Judging by the resource names, the
 * histories cover a clean run, a run containing an unknown event that may be ignored, and a run
 * containing an unknown event that may not be ignored (TODO confirm against the JSON resources).
 */
public class UnknownHistoryEventReplayerTest {

  public static final String TASK_QUEUE = "unknown-history-event";
  public static final String RES_CLEAN = "testUnknownHistoryEventClean.json";
  public static final String RES_MAY_IGNORE = "testUnknownHistoryEventMayIgnore.json";
  public static final String RES_MAY_NOT_IGNORE = "testUnknownHistoryEventMayNotIgnore.json";

  /** Fails any single test that runs for longer than ten seconds. */
  @Rule public Timeout testTimeout = Timeout.seconds(10);

  private TestWorkflowEnvironment testEnvironment;
  private Worker worker;

  /** Creates a fresh test environment with one worker polling {@link #TASK_QUEUE}. */
  @Before
  public void setUp() {
    testEnvironment = TestWorkflowEnvironment.newInstance();
    worker = testEnvironment.newWorker(TASK_QUEUE);
    worker.registerWorkflowImplementationTypes(MyWorkflowImpl.class);
    worker.registerActivitiesImplementations(new MyActivityImpl());
    testEnvironment.start();
  }

  /** Releases the test environment created in {@link #setUp()}. */
  @After
  public void tearDown() {
    testEnvironment.close();
  }

  /**
   * Executes the workflow for real and prints the resulting history as JSON to standard output.
   * Makes no assertions beyond the workflow completing without throwing.
   */
  @Test
  public void testRun() {
    final String workflowId = "plain-run";
    WorkflowClient workflowClient = testEnvironment.getWorkflowClient();
    MyWorkflow workflow =
        workflowClient.newWorkflowStub(
            MyWorkflow.class,
            WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(workflowId).build());
    workflow.execute();
    System.out.println(workflowClient.fetchHistory(workflowId).toJson(true));
  }

  /** Replays the history stored in {@link #RES_CLEAN}; passes if replay does not throw. */
  @Test
  public void testClean() throws Exception {
    replayFromResource(RES_CLEAN);
  }

  /** Replays the history stored in {@link #RES_MAY_IGNORE}; passes if replay does not throw. */
  @Test
  public void testMayIgnore() throws Exception {
    replayFromResource(RES_MAY_IGNORE);
  }

  /**
   * Replays the history stored in {@link #RES_MAY_NOT_IGNORE}; passes only if a {@link
   * RuntimeException} is thrown.
   */
  @Test(expected = RuntimeException.class)
  public void testMayNotIgnore() throws Exception {
    replayFromResource(RES_MAY_NOT_IGNORE);
  }

  /** Loads the named history resource and replays it on the registered worker. */
  private void replayFromResource(String resourceName) throws Exception {
    worker.replayWorkflowExecution(readHistoryFromResource(resourceName));
  }

  /** Workflow contract used by the tests: a single no-argument entry point. */
  @WorkflowInterface
  public interface MyWorkflow {

    @WorkflowMethod
    void execute();
  }

  /** Activity contract used by the tests: a single no-argument method. */
  @ActivityInterface
  public interface MyActivity {

    @ActivityMethod
    void execute();
  }

  /** Workflow implementation that invokes {@link MyActivity#execute()} once as a local activity. */
  public static class MyWorkflowImpl implements MyWorkflow {

    @Override
    public void execute() {
      LocalActivityOptions localOptions =
          LocalActivityOptions.newBuilder()
              .setScheduleToCloseTimeout(Duration.ofSeconds(1))
              .build();
      Workflow.newLocalActivityStub(MyActivity.class, localOptions).execute();
    }
  }

  /** Activity implementation that does nothing. */
  public static class MyActivityImpl implements MyActivity {

    @Override
    public void execute() {}
  }
}
Loading

0 comments on commit b182d78

Please sign in to comment.