Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't replay commands from non-completed task #1750

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"fmt"
"math"
"reflect"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -1023,6 +1024,8 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
return !wfPanicked && isInReplayer
}

var wftCompletedIndex int64

metricsHandler := w.wth.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName()))
start := time.Now()
// This is set to nil once recorded
Expand All @@ -1045,10 +1048,26 @@ ProcessEvents:
binaryChecksum := nextTask.binaryChecksum
nextTaskBuildId := nextTask.buildID
admittedUpdates := nextTask.admittedMsgs
// Only replay up to the last completed event
if len(reorderedEvents) > 0 {
if reorderedEvents[len(reorderedEvents)-1].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
wftCompletedIndex = reorderedEvents[len(reorderedEvents)-2].EventId
} else if reorderedEvents[len(reorderedEvents)-1].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED || reorderedEvents[len(reorderedEvents)-1].GetEventType() == enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED {
wftCompletedIndex = reorderedEvents[len(reorderedEvents)-1].EventId
} else
// If completed + task scheduled, that's a WFT heartbeat, does not complete sequence
if (len(reorderedEvents) == 1 && reorderedEvents[0].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) || (len(reorderedEvents) > 1 && reorderedEvents[0].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED && reorderedEvents[1].GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED) {
wftCompletedIndex = reorderedEvents[0].EventId
}
}

// Check if we are replaying so we know if we should use the messages in the WFT or the history
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
var msgs *eventMsgIndex
if isReplay {
//if reorderedEvents[0].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
// completedTaskReplayCommandIndex = len(replayCommands)
//}
admittedUpdatesByID := make(map[string]*protocolpb.Message, len(admittedUpdates))
for _, admittedUpdate := range admittedUpdates {
admittedUpdatesByID[admittedUpdate.GetProtocolInstanceId()] = admittedUpdate
Expand Down Expand Up @@ -1181,6 +1200,9 @@ ProcessEvents:
}
}
if isReplay {
// TODO: Why does this have commands that aren't a part of history?
// looks like both replayCommands and respondEvents sometimes hold events/commands that aren't a part of
// the replay history
eventCommands := eventHandler.commandsHelper.getCommands(true)
if !skipReplayCheck {
replayCommands = append(replayCommands, eventCommands...)
Expand All @@ -1195,6 +1217,23 @@ ProcessEvents:
metricsTimer = nil
}

// We do not want to run non-determinism checks on a task start that
// doesn't have a corresponding completed task.
for i, cmd := range replayCommands {
activityId, err := strconv.ParseInt(cmd.GetScheduleActivityTaskCommandAttributes().ActivityId, 10, 64)
if err != nil {
return nil, err
}
if activityId > wftCompletedIndex {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Quinn-With-Two-Ns, when working through the integration test I added, it appeared that both replayCommands and respondEvents sometimes held events/commands with event id's that didn't exist in the history yet. I'm pretty sure this only happens when the history partial (I assume getCommands is eagerly generating the next commands, even if the partial history doesn't have the corresponding event)

Let me know if this assumption is wrong

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand what you mean, do you have an example? What do you mean by with event id's that didn't exist in the history yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When replaying this history

	[event], 0 event_id:1 event_time:{seconds:1736463805 nanos:286266000} event_type:EVENT_TYPE_WORKFLOW_EXECUTION_STARTED task_id:1049456 workflow_execution_started_event_attributes:{workflow_type:{name:"Basic"} task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} workflow_execution_timeout:{seconds:15} workflow_run_timeout:{seconds:15} workflow_task_timeout:{seconds:1} original_execution_run_id:"50e54d91-21e0-4f20-8646-6b9d5d9fb92a" identity:"[email protected]@" first_execution_run_id:"50e54d91-21e0-4f20-8646-6b9d5d9fb92a" attempt:1 workflow_execution_expiration_time:{seconds:1736463820 nanos:286000000} first_workflow_task_backoff:{} header:{} workflow_id:"test-partial-history-replay"}
	[event], 1 event_id:2 event_time:{seconds:1736463805 nanos:286296000} event_type:EVENT_TYPE_WORKFLOW_TASK_SCHEDULED task_id:1049457 workflow_task_scheduled_event_attributes:{task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} start_to_close_timeout:{seconds:1} attempt:1}
	[event], 2 event_id:3 event_time:{seconds:1736463805 nanos:287675000} event_type:EVENT_TYPE_WORKFLOW_TASK_STARTED task_id:1049466 workflow_task_started_event_attributes:{scheduled_event_id:2 identity:"[email protected]@" request_id:"e37014db-16c8-4e08-8d68-3e39d9df1a0c" history_size_bytes:856 worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"}}
	[event], 3 event_id:4 event_time:{seconds:1736463805 nanos:292672000} event_type:EVENT_TYPE_WORKFLOW_TASK_COMPLETED task_id:1049470 workflow_task_completed_event_attributes:{scheduled_event_id:2 started_event_id:3 identity:"[email protected]@" worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"} sdk_metadata:{lang_used_flags:3 sdk_name:"temporal-go" sdk_version:"1.30.1"} metering_metadata:{}}
	[event], 4 event_id:5 event_time:{seconds:1736463805 nanos:292703000} event_type:EVENT_TYPE_ACTIVITY_TASK_SCHEDULED task_id:1049471 activity_task_scheduled_event_attributes:{activity_id:"5" activity_type:{name:"Prefix_ToUpperWithDelay"} task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} header:{} input:{payloads:{metadata:{key:"encoding" value:"json/plain"} data:"\"hello\""} payloads:{metadata:{key:"encoding" value:"json/plain"} data:"1000000000"}} schedule_to_close_timeout:{seconds:5} schedule_to_start_timeout:{seconds:5} start_to_close_timeout:{seconds:5} heartbeat_timeout:{} workflow_task_completed_event_id:4 retry_policy:{initial_interval:{seconds:1} backoff_coefficient:2 maximum_interval:{seconds:100}} use_workflow_build_id:true}
	[event], 5 event_id:6 event_time:{seconds:1736463805 nanos:293543000} event_type:EVENT_TYPE_ACTIVITY_TASK_STARTED task_id:1049478 activity_task_started_event_attributes:{scheduled_event_id:5 identity:"[email protected]@" request_id:"5022f13d-356e-4474-aa7e-ef22ed75c430" attempt:1 worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"}}
	[event], 6 event_id:7 event_time:{seconds:1736463806 nanos:299015000} event_type:EVENT_TYPE_ACTIVITY_TASK_COMPLETED task_id:1049479 activity_task_completed_event_attributes:{result:{payloads:{metadata:{key:"encoding" value:"json/plain"} data:"\"HELLO\""}} scheduled_event_id:5 started_event_id:6 identity:"[email protected]@"}
	[event], 7 event_id:8 event_time:{seconds:1736463806 nanos:299034000} event_type:EVENT_TYPE_WORKFLOW_TASK_SCHEDULED task_id:1049480 workflow_task_scheduled_event_attributes:{task_queue:{name:"Andrews-MacBook-Pro.local:f9cda759-7917-4209-abdf-e5265acb3bdc" kind:TASK_QUEUE_KIND_STICKY normal_name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay"} start_to_close_timeout:{seconds:1} attempt:1}
	[event], 8 event_id:9 event_time:{seconds:1736463806 nanos:302956000} event_type:EVENT_TYPE_WORKFLOW_TASK_STARTED task_id:1049484 workflow_task_started_event_attributes:{scheduled_event_id:8 identity:"[email protected]@" request_id:"b7ffeb4f-5271-4f90-8fd2-6ccaca01e390" history_size_bytes:1819 worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"}}
	[event], 9 event_id:10 event_time:{seconds:1736463806 nanos:312989000} event_type:EVENT_TYPE_WORKFLOW_TASK_COMPLETED task_id:1049488 workflow_task_completed_event_attributes:{scheduled_event_id:8 started_event_id:9 identity:"[email protected]@" worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"} sdk_metadata:{} metering_metadata:{}}

replayCommands contains

[replayCommands] command_type:COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK schedule_activity_task_command_attributes:{activity_id:"11" activity_type:{name:"Prefix_ToUpper"} task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} header:{} input:{payloads:{metadata:{key:"encoding" value:"json/plain"} data:"\"HELLO\""}} schedule_to_close_timeout:{seconds:5} schedule_to_start_timeout:{seconds:5} start_to_close_timeout:{seconds:9} heartbeat_timeout:{} request_eager_execution:true use_workflow_build_id:true}

but that activity_id doesn't exist in the history we're replaying

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like respondEvents properly removes the entry with event_id 11 when we take that out of history

[respondEvents] event_id:11 event_time:{seconds:1736463806 nanos:313101000} event_type:EVENT_TYPE_ACTIVITY_TASK_SCHEDULED task_id:1049489 activity_task_scheduled_event_attributes:{activity_id:"11" activity_type:{name:"Prefix_ToUpper"} task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} header:{} input:{payloads:{metadata:{key:"encoding" value:"json/plain"} data:"\"HELLO\""}} schedule_to_close_timeout:{seconds:5} schedule_to_start_timeout:{seconds:5} start_to_close_timeout:{seconds:5} heartbeat_timeout:{} workflow_task_completed_event_id:10 retry_policy:{initial_interval:{seconds:1} backoff_coefficient:2 maximum_interval:{seconds:100}} use_workflow_build_id:true}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand that history you shared, is that intended to be the whole history? Since if that is the history then that history should contain an activity scheduled event or throw a non determinism exception.

replayCommands = replayCommands[:i]
}
}
for i, event := range respondEvents {
if event.EventId > wftCompletedIndex {
respondEvents = respondEvents[:i]
}
}

// Non-deterministic error could happen in 2 different places:
// 1) the replay commands does not match to history events. This is usually due to non backwards compatible code
// change to workflow logic. For example, change calling one activity to a different activity.
Expand Down Expand Up @@ -2339,3 +2378,14 @@ func traceLog(fn func()) {
fn()
}
}

//func trimEventsToCompletedWFT(commands []*commandpb.Command) []*commandpb.Command {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder to self: remove

// lastIndex := 0
// sawAnyCommandEvent := false
// // wftStartedEventIdToIndex
// for ix, command := range commands {
// last_index := ix
//
// if
// }
//}
21 changes: 21 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,27 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory() {
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_IncompleteWorkflowTask() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{
WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflow"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()),
}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(3),
}

history := &historypb.History{Events: testEvents}
logger := getLogger()
replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{})
require.NoError(s.T(), err)
replayer.RegisterWorkflow(testReplayWorkflow)
err = replayer.ReplayWorkflowHistory(logger, history)
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
Expand Down
35 changes: 35 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6904,3 +6904,38 @@ func (ts *InvalidUTF8Suite) TestBasic() {
// Go's JSON coding stack will replace invalid bytes with the unicode substitute char U+FFFD
ts.Equal("\n�\x01\n\x0ejunk\x12data", response)
}

func (ts *IntegrationTestSuite) TestPartialHistoryReplay() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Run the workflow
run, err := ts.client.ExecuteWorkflow(ctx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this workflow misses some key features that we have struggle with replaying in the past. It only executes activities serially. I would like to see a workflow that has some more events like version markers, signals, and local activities. It also only has 2 WFT. Maybe we can add a small fuzzing workflow that picks from a few commands in a deterministic , but random manner and does a few iterations of that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make sure I understand, you're suggesting writing a workflow that takes in a deterministic seed, and then "randomly" selects different commands to call? As in something along the lines of


for i := 0; i < iterations; i++ {
		cmd := rnd.Intn(3)

		switch cmd {
		case 1:
			var ans1 string
			workflow.GetLogger(ctx).Info("calling ExecuteActivity")
		case 2:
			workflow.GetLogger(ctx).Info("calling ExecuteLocalActivity")
		default:
			workflow.GetLogger(ctx).Warn("Unknown command")
		}
	}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah could generate the random number in a SideEffect so it is deterministic

ts.startWorkflowOptions("test-partial-history-replay"), ts.workflows.Basic)
ts.NotNil(run)
ts.NoError(err)
ts.NoError(run.Get(ctx, nil))

// Obtain history
var history historypb.History
iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false,
enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter.HasNext() {
event, err := iter.Next()
ts.NoError(err)
history.Events = append(history.Events, event)
}

for _ = range history.Events {
if len(history.Events) >= 3 {
fmt.Println("\n[REPLAY]")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove before merging

for i, event := range history.Events {
fmt.Println("\t[event],", i, event)
}
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(ts.workflows.Basic)
ts.NoError(replayer.ReplayWorkflowHistory(nil, &history))
history.Events = history.Events[:len(history.Events)-1]
}
}
}
Loading