diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 831258776..10c20ee09 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -32,6 +32,7 @@ import ( "fmt" "math" "reflect" + "strconv" "strings" "sync" "time" @@ -1023,6 +1024,8 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo return !wfPanicked && isInReplayer } + var wftCompletedIndex int64 + metricsHandler := w.wth.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName())) start := time.Now() // This is set to nil once recorded @@ -1045,10 +1048,26 @@ ProcessEvents: binaryChecksum := nextTask.binaryChecksum nextTaskBuildId := nextTask.buildID admittedUpdates := nextTask.admittedMsgs + // Only replay up to the last completed event + if len(reorderedEvents) > 0 { + if reorderedEvents[len(reorderedEvents)-1].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED { + wftCompletedIndex = reorderedEvents[len(reorderedEvents)-2].EventId + } else if reorderedEvents[len(reorderedEvents)-1].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED || reorderedEvents[len(reorderedEvents)-1].GetEventType() == enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED { + wftCompletedIndex = reorderedEvents[len(reorderedEvents)-1].EventId + } else + // If completed + task scheduled, that's a WFT heartbeat, does not complete sequence + if (len(reorderedEvents) == 1 && reorderedEvents[0].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) || (len(reorderedEvents) > 1 && reorderedEvents[0].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED && reorderedEvents[1].GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED) { + wftCompletedIndex = reorderedEvents[0].EventId + } + } + // Check if we are replaying so we know if we should use the messages in the WFT or the history isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) var 
msgs *eventMsgIndex if isReplay { + //if reorderedEvents[0].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + // completedTaskReplayCommandIndex = len(replayCommands) + //} admittedUpdatesByID := make(map[string]*protocolpb.Message, len(admittedUpdates)) for _, admittedUpdate := range admittedUpdates { admittedUpdatesByID[admittedUpdate.GetProtocolInstanceId()] = admittedUpdate @@ -1181,6 +1200,9 @@ ProcessEvents: } } if isReplay { + // TODO: Why does this have commands that aren't a part of history? + // looks like both replayCommands and respondEvents sometimes hold events/commands that aren't a part of + // the replay history eventCommands := eventHandler.commandsHelper.getCommands(true) if !skipReplayCheck { replayCommands = append(replayCommands, eventCommands...) @@ -1195,6 +1217,23 @@ ProcessEvents: metricsTimer = nil } + // We do not want to run non-determinism checks on a task start that + // doesn't have a corresponding completed task. + for i, cmd := range replayCommands { + activityId, err := strconv.ParseInt(cmd.GetScheduleActivityTaskCommandAttributes().ActivityId, 10, 64) + if err != nil { + return nil, err + } + if activityId > wftCompletedIndex { + replayCommands = replayCommands[:i] + } + } + for i, event := range respondEvents { + if event.EventId > wftCompletedIndex { + respondEvents = respondEvents[:i] + } + } + // Non-deterministic error could happen in 2 different places: // 1) the replay commands does not match to history events. This is usually due to non backwards compatible code // change to workflow logic. For example, change calling one activity to a different activity. 
@@ -2339,3 +2378,14 @@ func traceLog(fn func()) {
 		fn()
 	}
 }
+
+//func trimEventsToCompletedWFT(commands []*commandpb.Command) []*commandpb.Command {
+//	lastIndex := 0
+//	sawAnyCommandEvent := false
+//	// wftStartedEventIdToIndex
+//	for ix, command := range commands {
+//		last_index := ix
+//
+//		if
+//	}
+//}
diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go
index 6a5a34862..2859d4614 100644
--- a/internal/internal_worker_test.go
+++ b/internal/internal_worker_test.go
@@ -314,6 +314,33 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory() {
 	require.NoError(s.T(), err)
 }
 
+// TestReplayWorkflowHistory_IncompleteWorkflowTask replays a three-event history
+// (execution started, task scheduled, task started, going by the helper names)
+// and requires both replayer construction and the replay itself to succeed.
+func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_IncompleteWorkflowTask() {
+	const queueName = "taskQueue1"
+
+	// The history stops at event 3; no later events are supplied.
+	startedAttrs := &historypb.WorkflowExecutionStartedEventAttributes{
+		WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflow"},
+		TaskQueue:    &taskqueuepb.TaskQueue{Name: queueName},
+		Input:        testEncodeFunctionArgs(converter.GetDefaultDataConverter()),
+	}
+	partialHistory := &historypb.History{
+		Events: []*historypb.HistoryEvent{
+			createTestEventWorkflowExecutionStarted(1, startedAttrs),
+			createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
+			createTestEventWorkflowTaskStarted(3),
+		},
+	}
+	replayLogger := getLogger()
+
+	replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{})
+	require.NoError(s.T(), err)
+	replayer.RegisterWorkflow(testReplayWorkflow)
+	require.NoError(s.T(), replayer.ReplayWorkflowHistory(replayLogger, partialHistory))
+}
+
 func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() {
 	taskQueue := "taskQueue1"
 	testEvents := []*historypb.HistoryEvent{
diff --git a/test/integration_test.go b/test/integration_test.go
index bafe9667a..17c8723f8 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -6904,3 +6904,38 @@ func (ts *InvalidUTF8Suite) TestBasic() {
 	// Go's JSON coding stack will replace invalid bytes with the unicode substitute char U+FFFD
ts.Equal("\n�\x01\n\x0ejunk\x12data", response)
 }
+
+// TestPartialHistoryReplay runs the Basic workflow, downloads its history, and
+// replays every prefix of that history holding at least three events, longest
+// first, asserting that each truncated history replays without error.
+func (ts *IntegrationTestSuite) TestPartialHistoryReplay() {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Run the workflow; check err first so a failed start is reported as such.
+	run, err := ts.client.ExecuteWorkflow(ctx,
+		ts.startWorkflowOptions("test-partial-history-replay"), ts.workflows.Basic)
+	ts.NoError(err)
+	ts.NotNil(run)
+	ts.NoError(run.Get(ctx, nil))
+
+	// Obtain history
+	var history historypb.History
+	iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false,
+		enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
+	for iter.HasNext() {
+		event, err := iter.Next()
+		ts.NoError(err)
+		history.Events = append(history.Events, event)
+	}
+
+	replayer := worker.NewWorkflowReplayer()
+	replayer.RegisterWorkflow(ts.workflows.Basic)
+	// Drop one trailing event per pass; stop once fewer than three would remain.
+	for n := len(history.Events); n >= 3; n-- {
+		history.Events = history.Events[:n]
+		ts.NoError(replayer.ReplayWorkflowHistory(nil, &history),
+			"replay failed for history truncated to %d events", n)
+	}
+}