Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't replay commands from non-completed task #1750

Merged
merged 16 commits into from
Feb 3, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add more complicated test, fix replay partial history logic by breaki…
…ng early, so we don't process events not in history yet
yuandrew committed Jan 21, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 6a6b3441b05c069d12e0f3201fa2a08576f57f39
71 changes: 23 additions & 48 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
@@ -32,7 +32,6 @@ import (
"fmt"
"math"
"reflect"
"strconv"
"strings"
"sync"
"time"
@@ -1013,18 +1012,19 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
var replayOutbox []outboxEntry
var replayCommands []*commandpb.Command
var respondEvents []*historypb.HistoryEvent
var partialHistory bool

taskMessages := workflowTask.task.GetMessages()
skipReplayCheck := w.skipReplayCheck()
isInReplayer := IsReplayNamespace(w.wth.namespace)
shouldForceReplayCheck := func() bool {
isInReplayer := IsReplayNamespace(w.wth.namespace)
// If we are in the replayer we should always check the history replay, even if the workflow is completed
// Skip if the workflow panicked to avoid potentially breaking old histories
_, wfPanicked := w.err.(*workflowPanicError)
return !wfPanicked && isInReplayer
}

var wftCompletedIndex int64
curReplayCmdsIndex := -1

metricsHandler := w.wth.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName()))
start := time.Now()
@@ -1048,26 +1048,21 @@ ProcessEvents:
binaryChecksum := nextTask.binaryChecksum
nextTaskBuildId := nextTask.buildID
admittedUpdates := nextTask.admittedMsgs
// Only replay up to the last completed event
if len(reorderedEvents) > 0 {
if reorderedEvents[len(reorderedEvents)-1].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
wftCompletedIndex = reorderedEvents[len(reorderedEvents)-2].EventId
} else if reorderedEvents[len(reorderedEvents)-1].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED || reorderedEvents[len(reorderedEvents)-1].GetEventType() == enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED {
wftCompletedIndex = reorderedEvents[len(reorderedEvents)-1].EventId
} else
// If completed + task scheduled, that's a WFT heartbeat, does not complete sequence
if (len(reorderedEvents) == 1 && reorderedEvents[0].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) || (len(reorderedEvents) > 1 && reorderedEvents[0].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED && reorderedEvents[1].GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED) {
wftCompletedIndex = reorderedEvents[0].EventId
}

// Peak ahead to confirm there are no more events
isLastWFTForPartialWFE := len(reorderedEvents) > 0 &&
reorderedEvents[len(reorderedEvents)-1].EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED &&
len(reorderedHistory.next) == 0 &&
isInReplayer
if isLastWFTForPartialWFE {
partialHistory = true
break ProcessEvents
}

// Check if we are replaying so we know if we should use the messages in the WFT or the history
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
var msgs *eventMsgIndex
if isReplay {
//if reorderedEvents[0].GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
// completedTaskReplayCommandIndex = len(replayCommands)
//}
admittedUpdatesByID := make(map[string]*protocolpb.Message, len(admittedUpdates))
for _, admittedUpdate := range admittedUpdates {
admittedUpdatesByID[admittedUpdate.GetProtocolInstanceId()] = admittedUpdate
@@ -1105,6 +1100,10 @@ ProcessEvents:
if len(reorderedEvents) == 0 {
break ProcessEvents
}
// Since replayCommands updates a loop early, keep track of index before the
// early update to handle replaying incomplete WFE
curReplayCmdsIndex = len(replayCommands)

if binaryChecksum == "" {
w.workflowInfo.BinaryChecksum = w.wth.workerBuildID
} else {
@@ -1200,9 +1199,6 @@ ProcessEvents:
}
}
if isReplay {
// TODO: Why does this have commands that aren't a part of history?
// looks like both replayCommands and respondEvents sometimes hold events/commands that aren't a part of
// the replay history
eventCommands := eventHandler.commandsHelper.getCommands(true)
if !skipReplayCheck {
replayCommands = append(replayCommands, eventCommands...)
@@ -1212,28 +1208,15 @@ ProcessEvents:
}
}

if partialHistory && curReplayCmdsIndex != -1 {
replayCommands = replayCommands[:curReplayCmdsIndex]
}

if metricsTimer != nil {
metricsTimer.Record(time.Since(start))
metricsTimer = nil
}

// We do not want to run non-determinism checks on a task start that
// doesn't have a corresponding completed task.
for i, cmd := range replayCommands {
activityId, err := strconv.ParseInt(cmd.GetScheduleActivityTaskCommandAttributes().ActivityId, 10, 64)
if err != nil {
return nil, err
}
if activityId > wftCompletedIndex {
replayCommands = replayCommands[:i]
}
}
for i, event := range respondEvents {
if event.EventId > wftCompletedIndex {
respondEvents = respondEvents[:i]
}
}

// Non-deterministic error could happen in 2 different places:
// 1) the replay commands does not match to history events. This is usually due to non backwards compatible code
// change to workflow logic. For example, change calling one activity to a different activity.
@@ -1446,6 +1429,9 @@ func (w *workflowExecutionContextImpl) clearCurrentTask() {
}

func (w *workflowExecutionContextImpl) skipReplayCheck() bool {
//fmt.Println("\nw.currentWorkflowTask.Query", w.currentWorkflowTask.Query)
//fmt.Println("isFullHistory(w.currentWorkflowTask.History)", isFullHistory(w.currentWorkflowTask.History))
//fmt.Println()
return w.currentWorkflowTask.Query != nil || !isFullHistory(w.currentWorkflowTask.History)
}

@@ -2378,14 +2364,3 @@ func traceLog(fn func()) {
fn()
}
}

//func trimEventsToCompletedWFT(commands []*commandpb.Command) []*commandpb.Command {
// lastIndex := 0
// sawAnyCommandEvent := false
// // wftStartedEventIdToIndex
// for ix, command := range commands {
// last_index := ix
//
// if
// }
//}
32 changes: 20 additions & 12 deletions test/integration_test.go
Original file line number Diff line number Diff line change
@@ -6905,13 +6905,13 @@ func (ts *InvalidUTF8Suite) TestBasic() {
ts.Equal("\n�\x01\n\x0ejunk\x12data", response)
}

func (ts *IntegrationTestSuite) TestPartialHistoryReplay() {
func (ts *IntegrationTestSuite) TestPartialHistoryReplayFuzzer() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Run the workflow
run, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions("test-partial-history-replay"), ts.workflows.Basic)
ts.startWorkflowOptions("test-partial-history-replay-fuzzer"), ts.workflows.CommandsFuzz)
ts.NotNil(run)
ts.NoError(err)
ts.NoError(run.Get(ctx, nil))
@@ -6926,16 +6926,24 @@ func (ts *IntegrationTestSuite) TestPartialHistoryReplay() {
history.Events = append(history.Events, event)
}

for _ = range history.Events {
if len(history.Events) >= 3 {
fmt.Println("\n[REPLAY]")
for i, event := range history.Events {
fmt.Println("\t[event],", i, event)
}
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(ts.workflows.Basic)
ts.NoError(replayer.ReplayWorkflowHistory(nil, &history))
history.Events = history.Events[:len(history.Events)-1]
var startedPoints []int
for i, event := range history.Events {
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
startedPoints = append(startedPoints, i)
}
}
startedPoints = append(startedPoints, len(history.Events)-1)

// Replay partial history, cutting off at each WFT_STARTED event
for i := len(startedPoints) - 1; i >= 0; i-- {
point := startedPoints[i]
history.Events = history.Events[:point+1]

replayer := worker.NewWorkflowReplayer()

ts.NoError(err)
replayer.RegisterWorkflow(ts.workflows.CommandsFuzz)
replayer.RegisterWorkflow(ts.workflows.childWorkflowWaitOnSignal)
ts.NoError(replayer.ReplayWorkflowHistory(nil, &history))
}
}
150 changes: 150 additions & 0 deletions test/replaytests/partial-replay-non-command-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2025-01-21T21:13:17.763980Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1048587",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "TripWorkflow"
},
"taskQueue": {
"name": "recovery",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"input": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg=="
},
"data": "MA=="
}
]
},
"workflowExecutionTimeout": "0s",
"workflowRunTimeout": "0s",
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "7360b8c8-735b-4364-a950-9f8bb78c04e5",
"identity": "[email protected]@",
"firstExecutionRunId": "7360b8c8-735b-4364-a950-9f8bb78c04e5",
"attempt": 1,
"firstWorkflowTaskBackoff": "0s",
"header": {},
"workflowId": "trip_workflow"
}
},
{
"eventId": "2",
"eventTime": "2025-01-21T21:13:17.764040Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048588",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "recovery",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2025-01-21T21:13:17.766282Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048593",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "[email protected]@",
"requestId": "e116305f-6b36-414a-ac33-a1ca9c9a1640",
"historySizeBytes": "279",
"workerVersion": {
"buildId": "0f02752b442ba36079c7735a5ea5e1ee"
}
}
},
{
"eventId": "4",
"eventTime": "2025-01-21T21:13:17.768731Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048597",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "[email protected]@",
"workerVersion": {
"buildId": "0f02752b442ba36079c7735a5ea5e1ee"
},
"sdkMetadata": {
"langUsedFlags": [
3
],
"sdkName": "temporal-go",
"sdkVersion": "1.31.0"
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2025-01-21T21:13:40.639292Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
"taskId": "1048600",
"workflowExecutionSignaledEventAttributes": {
"signalName": "trip_event",
"input": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg=="
},
"data": "eyJJRCI6IiIsIlRvdGFsIjoxMH0="
}
]
},
"identity": "[email protected]@",
"header": {}
}
},
{
"eventId": "6",
"eventTime": "2025-01-21T21:13:40.639294Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048601",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "Andrews-MacBook-Pro.local:1bee34bb-8c2b-4738-84b5-25f257233211",
"kind": "TASK_QUEUE_KIND_STICKY",
"normalName": "recovery"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "7",
"eventTime": "2025-01-21T21:13:45.641420Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT",
"taskId": "1048605",
"workflowTaskTimedOutEventAttributes": {
"scheduledEventId": "6",
"timeoutType": "TIMEOUT_TYPE_SCHEDULE_TO_START"
}
},
{
"eventId": "8",
"eventTime": "2025-01-21T21:13:45.641428Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048606",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "recovery",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
}
]
}
9 changes: 9 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
@@ -484,6 +484,15 @@ func (s *replayTestSuite) TestSelectorNonBlocking() {
require.NoError(s.T(), err)
}

func (s *replayTestSuite) TestPartialReplayNonCommandEvent() {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(TripWorkflow)
// Verify we can replay partial history that has ended on a non-command event
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "partial-replay-non-command-event.json")
s.NoError(err)
require.NoError(s.T(), err)
}

type captureConverter struct {
converter.DataConverter
toPayloads []interface{}
20 changes: 20 additions & 0 deletions test/replaytests/workflows.go
Original file line number Diff line number Diff line change
@@ -656,3 +656,23 @@ func SelectorBlockingDefaultActivity(ctx context.Context, value string) (string,
logger.Info("Activity", "value", value)
return value + " was logged!", nil
}

func TripWorkflow(ctx workflow.Context, tripCounter int) error {
logger := workflow.GetLogger(ctx)
workflowID := workflow.GetInfo(ctx).WorkflowExecution.ID
logger.Info("Trip Workflow Started for User.",
"User", workflowID,
"TripCounter", tripCounter)

// TripCh to wait on trip completed event signals
tripCh := workflow.GetSignalChannel(ctx, "trip_event")
for i := 0; i < 10; i++ {
var trip int
tripCh.Receive(ctx, &trip)
logger.Info("Trip complete event received.", "Total", trip)
tripCounter++
}

logger.Info("Starting a new run.", "TripCounter", tripCounter)
return workflow.NewContinueAsNewError(ctx, "TripWorkflow", tripCounter)
}
Loading