Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't replay commands from non-completed task #1750

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

yuandrew
Copy link
Contributor

@yuandrew yuandrew commented Dec 9, 2024

What was changed

Skip any commands that aren't a part of a completed WFT (a.k.a. if WFT start is the last event)

Why?

Fix issue where we're hitting NDE when we shouldn't be.

Checklist

  1. Closes Erroneous extra replay command when replaying mid-workflow tasks #1670

  2. How was this tested:

Added new test

  1. Any docs updates needed?

@yuandrew yuandrew requested a review from a team as a code owner December 9, 2024 23:03
Copy link
Member

@cretz cretz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an integration test that has a full history that replays successfully in the replayer, then confirm it continually succeeds in a replayer popping one event off the end each time?

Also, would like @Quinn-With-Two-Ns to look at this PR when he's available before we merge.

Comment on lines 537 to 539
func isTaskCompletedEvent(event *historypb.HistoryEvent) bool {
return event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just inline this single-use-single-line function, no need for a separate function

@@ -1045,6 +1050,9 @@ ProcessEvents:
binaryChecksum := nextTask.binaryChecksum
nextTaskBuildId := nextTask.buildID
admittedUpdates := nextTask.admittedMsgs
if len(reorderedEvents) > 0 && isTaskCompletedEvent(reorderedEvents[0]) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't check the code, but I assume you have confirmed that for every event set, the initial index is where the task completed event lives? Is there ever a situation in the Go SDK besides partial task during worker replayer where reorderedEvents[0] is not task completed? Are there any concerns about some of this logic affecting non-replayer code paths?

Copy link
Contributor Author

@yuandrew yuandrew Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't able to figure out from looking at code if this is enforced, but I've confirmed that every unit and integ test we run today has task completed at index 0 of reorderedEvents.

Is there ever a situation in the Go SDK besides partial task during worker replayer where reorderedEvents[0] is not task completed? Are there any concerns about some of this logic affecting non-replayer code paths?

Not sure, maybe a question @Quinn-With-Two-Ns can help answer when he gets back

Comment on lines 1206 to 1210
// We do not want to run non-determinism checks on a task start that
// doesn't have a corresponding completed task.
if completedTaskCommandIndex >= 0 {
replayCommands = replayCommands[:completedTaskCommandIndex]
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically same question from above, is there ever a case this if evaluates to true when not using a replayer (i.e. in normal operation)?

@@ -2339,3 +2378,14 @@ func traceLog(fn func()) {
fn()
}
}

//func trimEventsToCompletedWFT(commands []*commandpb.Command) []*commandpb.Command {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder to self: remove

if err != nil {
return nil, err
}
if activityId > wftCompletedIndex {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Quinn-With-Two-Ns, when working through the integration test I added, it appeared that both replayCommands and respondEvents sometimes held events/commands with event id's that didn't exist in the history yet. I'm pretty sure this only happens when the history partial (I assume getCommands is eagerly generating the next commands, even if the partial history doesn't have the corresponding event)

Let me know if this assumption is wrong

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand what you mean, do you have an example? What do you mean by with event id's that didn't exist in the history yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When replaying this history

	[event], 0 event_id:1 event_time:{seconds:1736463805 nanos:286266000} event_type:EVENT_TYPE_WORKFLOW_EXECUTION_STARTED task_id:1049456 workflow_execution_started_event_attributes:{workflow_type:{name:"Basic"} task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} workflow_execution_timeout:{seconds:15} workflow_run_timeout:{seconds:15} workflow_task_timeout:{seconds:1} original_execution_run_id:"50e54d91-21e0-4f20-8646-6b9d5d9fb92a" identity:"[email protected]@" first_execution_run_id:"50e54d91-21e0-4f20-8646-6b9d5d9fb92a" attempt:1 workflow_execution_expiration_time:{seconds:1736463820 nanos:286000000} first_workflow_task_backoff:{} header:{} workflow_id:"test-partial-history-replay"}
	[event], 1 event_id:2 event_time:{seconds:1736463805 nanos:286296000} event_type:EVENT_TYPE_WORKFLOW_TASK_SCHEDULED task_id:1049457 workflow_task_scheduled_event_attributes:{task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} start_to_close_timeout:{seconds:1} attempt:1}
	[event], 2 event_id:3 event_time:{seconds:1736463805 nanos:287675000} event_type:EVENT_TYPE_WORKFLOW_TASK_STARTED task_id:1049466 workflow_task_started_event_attributes:{scheduled_event_id:2 identity:"[email protected]@" request_id:"e37014db-16c8-4e08-8d68-3e39d9df1a0c" history_size_bytes:856 worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"}}
	[event], 3 event_id:4 event_time:{seconds:1736463805 nanos:292672000} event_type:EVENT_TYPE_WORKFLOW_TASK_COMPLETED task_id:1049470 workflow_task_completed_event_attributes:{scheduled_event_id:2 started_event_id:3 identity:"[email protected]@" worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"} sdk_metadata:{lang_used_flags:3 sdk_name:"temporal-go" sdk_version:"1.30.1"} metering_metadata:{}}
	[event], 4 event_id:5 event_time:{seconds:1736463805 nanos:292703000} event_type:EVENT_TYPE_ACTIVITY_TASK_SCHEDULED task_id:1049471 activity_task_scheduled_event_attributes:{activity_id:"5" activity_type:{name:"Prefix_ToUpperWithDelay"} task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} header:{} input:{payloads:{metadata:{key:"encoding" value:"json/plain"} data:"\"hello\""} payloads:{metadata:{key:"encoding" value:"json/plain"} data:"1000000000"}} schedule_to_close_timeout:{seconds:5} schedule_to_start_timeout:{seconds:5} start_to_close_timeout:{seconds:5} heartbeat_timeout:{} workflow_task_completed_event_id:4 retry_policy:{initial_interval:{seconds:1} backoff_coefficient:2 maximum_interval:{seconds:100}} use_workflow_build_id:true}
	[event], 5 event_id:6 event_time:{seconds:1736463805 nanos:293543000} event_type:EVENT_TYPE_ACTIVITY_TASK_STARTED task_id:1049478 activity_task_started_event_attributes:{scheduled_event_id:5 identity:"[email protected]@" request_id:"5022f13d-356e-4474-aa7e-ef22ed75c430" attempt:1 worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"}}
	[event], 6 event_id:7 event_time:{seconds:1736463806 nanos:299015000} event_type:EVENT_TYPE_ACTIVITY_TASK_COMPLETED task_id:1049479 activity_task_completed_event_attributes:{result:{payloads:{metadata:{key:"encoding" value:"json/plain"} data:"\"HELLO\""}} scheduled_event_id:5 started_event_id:6 identity:"[email protected]@"}
	[event], 7 event_id:8 event_time:{seconds:1736463806 nanos:299034000} event_type:EVENT_TYPE_WORKFLOW_TASK_SCHEDULED task_id:1049480 workflow_task_scheduled_event_attributes:{task_queue:{name:"Andrews-MacBook-Pro.local:f9cda759-7917-4209-abdf-e5265acb3bdc" kind:TASK_QUEUE_KIND_STICKY normal_name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay"} start_to_close_timeout:{seconds:1} attempt:1}
	[event], 8 event_id:9 event_time:{seconds:1736463806 nanos:302956000} event_type:EVENT_TYPE_WORKFLOW_TASK_STARTED task_id:1049484 workflow_task_started_event_attributes:{scheduled_event_id:8 identity:"[email protected]@" request_id:"b7ffeb4f-5271-4f90-8fd2-6ccaca01e390" history_size_bytes:1819 worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"}}
	[event], 9 event_id:10 event_time:{seconds:1736463806 nanos:312989000} event_type:EVENT_TYPE_WORKFLOW_TASK_COMPLETED task_id:1049488 workflow_task_completed_event_attributes:{scheduled_event_id:8 started_event_id:9 identity:"[email protected]@" worker_version:{build_id:"a151b5d0c2ed62e204f3dbaac3bccd6a"} sdk_metadata:{} metering_metadata:{}}

replayCommands contains

[replayCommands] command_type:COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK schedule_activity_task_command_attributes:{activity_id:"11" activity_type:{name:"Prefix_ToUpper"} task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} header:{} input:{payloads:{metadata:{key:"encoding" value:"json/plain"} data:"\"HELLO\""}} schedule_to_close_timeout:{seconds:5} schedule_to_start_timeout:{seconds:5} start_to_close_timeout:{seconds:9} heartbeat_timeout:{} request_eager_execution:true use_workflow_build_id:true}

but that activity_id doesn't exist in the history we're replaying

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like respondEvents properly removes the entry with event_id 11 when we take that out of history

[respondEvents] event_id:11 event_time:{seconds:1736463806 nanos:313101000} event_type:EVENT_TYPE_ACTIVITY_TASK_SCHEDULED task_id:1049489 activity_task_scheduled_event_attributes:{activity_id:"11" activity_type:{name:"Prefix_ToUpper"} task_queue:{name:"tq-c36b9ce7-1a22-4c63-b904-816a6319d70a-TestIntegrationSuite/TestPartialHistoryReplay" kind:TASK_QUEUE_KIND_NORMAL} header:{} input:{payloads:{metadata:{key:"encoding" value:"json/plain"} data:"\"HELLO\""}} schedule_to_close_timeout:{seconds:5} schedule_to_start_timeout:{seconds:5} start_to_close_timeout:{seconds:5} heartbeat_timeout:{} workflow_task_completed_event_id:10 retry_policy:{initial_interval:{seconds:1} backoff_coefficient:2 maximum_interval:{seconds:100}} use_workflow_build_id:true}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand that history you shared, is that intended to be the whole history? Since if that is the history then that history should contain an activity scheduled event or throw a non determinism exception.


for _ = range history.Events {
if len(history.Events) >= 3 {
fmt.Println("\n[REPLAY]")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove before merging

defer cancel()

// Run the workflow
run, err := ts.client.ExecuteWorkflow(ctx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this workflow misses some key features that we have struggle with replaying in the past. It only executes activities serially. I would like to see a workflow that has some more events like version markers, signals, and local activities. It also only has 2 WFT. Maybe we can add a small fuzzing workflow that picks from a few commands in a deterministic , but random manner and does a few iterations of that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make sure I understand, you're suggesting writing a workflow that takes in a deterministic seed, and then "randomly" selects different commands to call? As in something along the lines of


for i := 0; i < iterations; i++ {
		cmd := rnd.Intn(3)

		switch cmd {
		case 1:
			var ans1 string
			workflow.GetLogger(ctx).Info("calling ExecuteActivity")
		case 2:
			workflow.GetLogger(ctx).Info("calling ExecuteLocalActivity")
		default:
			workflow.GetLogger(ctx).Warn("Unknown command")
		}
	}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah could generate the random number in a SideEffect so it is deterministic

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Erroneous extra replay command when replaying mid-workflow tasks
3 participants