Skip to content

Commit

Permalink
Fix transition in LA when handling canceled child wf (#2156)
Browse files Browse the repository at this point in the history
* Fix transition in LA when handling canceled child wf
  • Loading branch information
Quinn-With-Two-Ns authored Jul 30, 2024
1 parent b92c97d commit f7c7341
Show file tree
Hide file tree
Showing 6 changed files with 626 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ enum State {
State.REQUEST_PREPARED,
LocalActivityStateMachine::sendRequest)
.add(State.REQUEST_PREPARED, ExplicitEvent.MARK_AS_SENT, State.REQUEST_SENT)
// This is to cover an edge case where the event loop is
// run more than once while processing a workflow task.
// This can happen due to external cancellation
.add(
State.REQUEST_PREPARED,
ExplicitEvent.NON_REPLAY_WORKFLOW_TASK_STARTED,
State.REQUEST_PREPARED)
.add(
State.REQUEST_SENT,
ExplicitEvent.NON_REPLAY_WORKFLOW_TASK_STARTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ EXECUTING --> REQUEST_PREPARED: SCHEDULE
MARKER_COMMAND_CREATED --> RESULT_NOTIFIED: RECORD_MARKER
REPLAYING --> WAITING_MARKER_EVENT: SCHEDULE
REQUEST_PREPARED --> REQUEST_SENT: MARK_AS_SENT
REQUEST_PREPARED --> REQUEST_PREPARED: NON_REPLAY_WORKFLOW_TASK_STARTED
REQUEST_SENT --> REQUEST_SENT: NON_REPLAY_WORKFLOW_TASK_STARTED
REQUEST_SENT --> MARKER_COMMAND_CREATED: HANDLE_RESULT
RESULT_NOTIFIED --> MARKER_COMMAND_RECORDED: MARKER_RECORDED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,25 @@
import static org.junit.Assert.fail;

import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.ActivityType;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.CommandType;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
import io.temporal.api.history.v1.*;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.internal.history.LocalActivityMarkerUtils;
import io.temporal.internal.worker.LocalActivityResult;
import io.temporal.workflow.ChildWorkflowCancellationType;
import io.temporal.workflow.Functions;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.AfterClass;
import org.junit.Test;

Expand Down Expand Up @@ -343,4 +348,99 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
List<Command> commands = stateMachines.takeCommands();
assertTrue(commands.isEmpty());
}

@Test
public void testLocalActivityStateMachineDuplicateTask() {
class TestListener extends TestEntityManagerListenerBase {
@Override
protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
StartChildWorkflowExecutionParameters childRequest =
new StartChildWorkflowExecutionParameters(
StartChildWorkflowExecutionCommandAttributes.newBuilder(),
ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED);
ExecuteLocalActivityParameters parameters1 =
new ExecuteLocalActivityParameters(
PollActivityTaskQueueResponse.newBuilder()
.setActivityId("id1")
.setActivityType(ActivityType.newBuilder().setName("activity1")),
null,
System.currentTimeMillis(),
null,
false,
null);
// TODO: This is a workaround for the lack of support for child workflow in the test
// framework.
// The test framework has no support for state machines with multiple callbacks.
AtomicReference<Functions.Proc> cc = new AtomicReference<>();
AtomicReference<Functions.Proc2<Optional<Payloads>, Exception>> completionCallback =
new AtomicReference<>();
builder
.<WorkflowExecution, Exception>add2(
(r, c) ->
cc.set(
stateMachines.startChildWorkflow(
childRequest,
c,
(r1, c1) -> {
completionCallback.get().apply(r1, c1);
})))
.add((r) -> cc.get().apply())
.<Optional<Payloads>, Exception>add2(
(r, c) -> {
completionCallback.set(c);
})
.<Optional<Payloads>, LocalActivityCallback.LocalActivityFailedException>add2(
(r, c) -> stateMachines.scheduleLocalActivityTask(parameters1, c));
}
}
/*
1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
5: EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED
6: EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED
7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
8: EVENT_TYPE_WORKFLOW_TASK_STARTED
9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
10: EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED
11: EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED
12: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
13: EVENT_TYPE_WORKFLOW_TASK_STARTED
*/
TestHistoryBuilder h =
new TestHistoryBuilder()
.add(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
.addWorkflowTask()
.add(
EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED,
StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder().build())
.add(
EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED,
ChildWorkflowExecutionStartedEventAttributes.newBuilder()
.setInitiatedEventId(5)
.build())
.addWorkflowTask()
.add(
EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED,
RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder().build())
.addWorkflowTaskScheduled()
.add(
EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED,
ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
.setInitiatedEventId(10)
.build())
.addWorkflowTaskScheduled()
.addWorkflowTaskStarted();

TestListener listener = new TestListener();
stateMachines = newStateMachines(listener);

h.handleWorkflowTask(stateMachines);
List<ExecuteLocalActivityParameters> requests = stateMachines.takeLocalActivityRequests();
assertEquals(1, requests.size());
assertEquals("id1", requests.get(0).getActivityId());
List<Command> commands = stateMachines.takeCommands();
assertTrue(commands.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -553,18 +553,27 @@ private HistoryEvent newAttributes(EventType type, Object attributes) {
result.setWorkflowExecutionUpdateCompletedEventAttributes(
(WorkflowExecutionUpdateCompletedEventAttributes) attributes);
break;
case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
result.setStartChildWorkflowExecutionInitiatedEventAttributes(
(StartChildWorkflowExecutionInitiatedEventAttributes) attributes);
break;
case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
result.setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(
(RequestCancelExternalWorkflowExecutionInitiatedEventAttributes) attributes);
break;
case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
result.setExternalWorkflowExecutionCancelRequestedEventAttributes(
(ExternalWorkflowExecutionCancelRequestedEventAttributes) attributes);
break;

case EVENT_TYPE_UNSPECIFIED:
case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:
case EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.activityTests;

import static org.junit.Assert.assertThrows;

import io.temporal.activity.LocalActivityOptions;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowStub;
import io.temporal.failure.TemporalFailure;
import io.temporal.testing.WorkflowReplayer;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class LocalActivityAfterCancelTest {
private final TestActivitiesImpl activitiesImpl = new TestActivitiesImpl();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestLocalActivityRetry.class, BlockingWorkflow.class)
.setActivityImplementations(activitiesImpl)
.build();

@Test
public void localActivityAfterChildWorkflowCanceled() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
WorkflowClient.execute(workflowStub::execute, "sada");
WorkflowStub.fromTyped(workflowStub).cancel();
WorkflowFailedException exception =
Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("sada"));
Assert.assertEquals(
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED, exception.getWorkflowCloseEventType());
}

@Test
public void testLocalActivityAfterChildWorkflowCanceledReplay() {
assertThrows(
RuntimeException.class,
() ->
WorkflowReplayer.replayWorkflowExecutionFromResource(
"testLocalActivityAfterCancelTest.json",
LocalActivityAfterCancelTest.TestLocalActivityRetry.class));
}

@WorkflowInterface
public static class BlockingWorkflow implements TestWorkflows.TestWorkflowReturnString {
@Override
public String execute() {
Workflow.await(() -> false);
return "";
}
}

public static class TestLocalActivityRetry implements TestWorkflow1 {

@Override
public String execute(String taskQueue) {
try {
ChildWorkflowOptions childOptions =
ChildWorkflowOptions.newBuilder()
.setWorkflowId(Workflow.getInfo().getWorkflowId() + "-child1")
.setCancellationType(ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED)
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL)
.validateAndBuildWithDefaults();
TestWorkflows.TestWorkflowReturnString child =
Workflow.newChildWorkflowStub(
TestWorkflows.TestWorkflowReturnString.class, childOptions);
child.execute();
} catch (TemporalFailure e) {
if (CancellationScope.current().isCancelRequested()) {
Workflow.newDetachedCancellationScope(
() -> {
VariousTestActivities act =
Workflow.newLocalActivityStub(
VariousTestActivities.class,
LocalActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(5))
.validateAndBuildWithDefaults());
act.activity1(10);
})
.run();
throw e;
}
}
return "dsadsa";
}
}
}
Loading

0 comments on commit f7c7341

Please sign in to comment.