-
Notifications
You must be signed in to change notification settings - Fork 220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Don't replay commands from non-completed task #1750
base: master
Are you sure you want to change the base?
Changes from 3 commits
a3df5df
6241536
027e581
f30fecf
c316076
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -534,6 +534,10 @@ func isPreloadMarkerEvent(event *historypb.HistoryEvent) bool { | |
return event.GetEventType() == enumspb.EVENT_TYPE_MARKER_RECORDED | ||
} | ||
|
||
func isTaskCompletedEvent(event *historypb.HistoryEvent) bool { | ||
return event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED | ||
} | ||
|
||
func inferMessageFromAcceptedEvent(attrs *historypb.WorkflowExecutionUpdateAcceptedEventAttributes) *protocolpb.Message { | ||
return &protocolpb.Message{ | ||
Id: attrs.GetAcceptedRequestMessageId(), | ||
|
@@ -1022,6 +1026,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo | |
_, wfPanicked := w.err.(*workflowPanicError) | ||
return !wfPanicked && isInReplayer | ||
} | ||
completedTaskCommandIndex := 0 | ||
|
||
metricsHandler := w.wth.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName())) | ||
start := time.Now() | ||
|
@@ -1045,6 +1050,9 @@ ProcessEvents: | |
binaryChecksum := nextTask.binaryChecksum | ||
nextTaskBuildId := nextTask.buildID | ||
admittedUpdates := nextTask.admittedMsgs | ||
if len(reorderedEvents) > 0 && isTaskCompletedEvent(reorderedEvents[0]) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't check the code, but I assume you have confirmed that for every event set, the initial index is where the task completed event lives? Is there ever a situation in the Go SDK besides partial task during worker replayer where There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wasn't able to figure out from looking at code if this is enforced, but I've confirmed that every unit and integ test we run today has
Not sure, maybe a question @Quinn-With-Two-Ns can help answer when he gets back |
||
completedTaskCommandIndex = len(replayCommands) | ||
} | ||
// Check if we are replaying so we know if we should use the messages in the WFT or the history | ||
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) | ||
var msgs *eventMsgIndex | ||
|
@@ -1195,6 +1203,12 @@ ProcessEvents: | |
metricsTimer = nil | ||
} | ||
|
||
// We do not want to run non-determinism checks on a task start that | ||
// doesn't have a corresponding completed task. | ||
if completedTaskCommandIndex >= 0 { | ||
replayCommands = replayCommands[:completedTaskCommandIndex] | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically same question from above, is there ever a case this |
||
|
||
// Non-deterministic error could happen in 2 different places: | ||
// 1) the replay commands does not match to history events. This is usually due to non backwards compatible code | ||
// change to workflow logic. For example, change calling one activity to a different activity. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just inline this single-use-single-line function, no need for a separate function