Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't replay commands from non-completed task #1750

Merged
merged 16 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,17 +1018,20 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
var replayOutbox []outboxEntry
var replayCommands []*commandpb.Command
var respondEvents []*historypb.HistoryEvent
var partialHistory bool

taskMessages := workflowTask.task.GetMessages()
skipReplayCheck := w.skipReplayCheck()
isInReplayer := IsReplayNamespace(w.wth.namespace)
shouldForceReplayCheck := func() bool {
isInReplayer := IsReplayNamespace(w.wth.namespace)
// If we are in the replayer we should always check the history replay, even if the workflow is completed
// Skip if the workflow panicked to avoid potentially breaking old histories
_, wfPanicked := w.err.(*workflowPanicError)
return !wfPanicked && isInReplayer
}

curReplayCmdsIndex := -1

metricsHandler := w.wth.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName()))
start := time.Now()
// This is set to nil once recorded
Expand All @@ -1051,6 +1054,17 @@ ProcessEvents:
binaryChecksum := nextTask.binaryChecksum
nextTaskBuildId := nextTask.buildID
admittedUpdates := nextTask.admittedMsgs

// Peak ahead to confirm there are no more events
isLastWFTForPartialWFE := len(reorderedEvents) > 0 &&
reorderedEvents[len(reorderedEvents)-1].EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED &&
len(reorderedHistory.next) == 0 &&
isInReplayer
if isLastWFTForPartialWFE {
partialHistory = true
break ProcessEvents
}

// Check if we are replaying so we know if we should use the messages in the WFT or the history
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
var msgs *eventMsgIndex
Expand Down Expand Up @@ -1092,6 +1106,10 @@ ProcessEvents:
if len(reorderedEvents) == 0 {
break ProcessEvents
}
// Since replayCommands updates a loop early, keep track of index before the
// early update to handle replaying incomplete WFE
curReplayCmdsIndex = len(replayCommands)

if binaryChecksum == "" {
w.workflowInfo.BinaryChecksum = w.wth.workerBuildID
} else {
Expand Down Expand Up @@ -1196,6 +1214,10 @@ ProcessEvents:
}
}

if partialHistory && curReplayCmdsIndex != -1 {
replayCommands = replayCommands[:curReplayCmdsIndex]
}

if metricsTimer != nil {
metricsTimer.Record(time.Since(start))
metricsTimer = nil
Expand Down
21 changes: 21 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,27 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory() {
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_IncompleteWorkflowExecution() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{
WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflow"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()),
}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(3),
}

history := &historypb.History{Events: testEvents}
logger := getLogger()
replayer, err := NewWorkflowReplayer(WorkflowReplayerOptions{})
require.NoError(s.T(), err)
replayer.RegisterWorkflow(testReplayWorkflow)
err = replayer.ReplayWorkflowHistory(logger, history)
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() {
taskQueue := "taskQueue1"
testEvents := []*historypb.HistoryEvent{
Expand Down
43 changes: 43 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7024,3 +7024,46 @@ func (c *coroutineCountingWorkflowOutboundInterceptor) Go(
f(ctx)
})
}

func (ts *IntegrationTestSuite) TestPartialHistoryReplayFuzzer() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Run the workflow
run, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions("test-partial-history-replay-fuzzer"), ts.workflows.CommandsFuzz)
ts.NotNil(run)
ts.NoError(err)
ts.NoError(run.Get(ctx, nil))

// Obtain history
var history historypb.History
iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false,
enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter.HasNext() {
event, err := iter.Next()
ts.NoError(err)
history.Events = append(history.Events, event)
}

var startedPoints []int
for i, event := range history.Events {
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
startedPoints = append(startedPoints, i)
}
}
startedPoints = append(startedPoints, len(history.Events)-1)

// Replay partial history, cutting off at each WFT_STARTED event
for i := len(startedPoints) - 1; i >= 0; i-- {
point := startedPoints[i]
history.Events = history.Events[:point+1]

replayer := worker.NewWorkflowReplayer()

ts.NoError(err)
replayer.RegisterWorkflow(ts.workflows.CommandsFuzz)
replayer.RegisterWorkflow(ts.workflows.childWorkflowWaitOnSignal)
ts.NoError(replayer.ReplayWorkflowHistory(nil, &history))
}
}
150 changes: 150 additions & 0 deletions test/replaytests/partial-replay-non-command-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2025-01-21T21:13:17.763980Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1048587",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "TripWorkflow"
},
"taskQueue": {
"name": "recovery",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"input": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg=="
},
"data": "MA=="
}
]
},
"workflowExecutionTimeout": "0s",
"workflowRunTimeout": "0s",
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "7360b8c8-735b-4364-a950-9f8bb78c04e5",
"identity": "[email protected]@",
"firstExecutionRunId": "7360b8c8-735b-4364-a950-9f8bb78c04e5",
"attempt": 1,
"firstWorkflowTaskBackoff": "0s",
"header": {},
"workflowId": "trip_workflow"
}
},
{
"eventId": "2",
"eventTime": "2025-01-21T21:13:17.764040Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048588",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "recovery",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2025-01-21T21:13:17.766282Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1048593",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "[email protected]@",
"requestId": "e116305f-6b36-414a-ac33-a1ca9c9a1640",
"historySizeBytes": "279",
"workerVersion": {
"buildId": "0f02752b442ba36079c7735a5ea5e1ee"
}
}
},
{
"eventId": "4",
"eventTime": "2025-01-21T21:13:17.768731Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1048597",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "[email protected]@",
"workerVersion": {
"buildId": "0f02752b442ba36079c7735a5ea5e1ee"
},
"sdkMetadata": {
"langUsedFlags": [
3
],
"sdkName": "temporal-go",
"sdkVersion": "1.31.0"
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2025-01-21T21:13:40.639292Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
"taskId": "1048600",
"workflowExecutionSignaledEventAttributes": {
"signalName": "trip_event",
"input": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg=="
},
"data": "eyJJRCI6IiIsIlRvdGFsIjoxMH0="
}
]
},
"identity": "[email protected]@",
"header": {}
}
},
{
"eventId": "6",
"eventTime": "2025-01-21T21:13:40.639294Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048601",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "Andrews-MacBook-Pro.local:1bee34bb-8c2b-4738-84b5-25f257233211",
"kind": "TASK_QUEUE_KIND_STICKY",
"normalName": "recovery"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "7",
"eventTime": "2025-01-21T21:13:45.641420Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT",
"taskId": "1048605",
"workflowTaskTimedOutEventAttributes": {
"scheduledEventId": "6",
"timeoutType": "TIMEOUT_TYPE_SCHEDULE_TO_START"
}
},
{
"eventId": "8",
"eventTime": "2025-01-21T21:13:45.641428Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1048606",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "recovery",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
}
]
}
9 changes: 9 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,15 @@ func (s *replayTestSuite) TestSelectorNonBlocking() {
require.NoError(s.T(), err)
}

func (s *replayTestSuite) TestPartialReplayNonCommandEvent() {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(TripWorkflow)
// Verify we can replay partial history that has ended on a non-command event
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "partial-replay-non-command-event.json")
s.NoError(err)
require.NoError(s.T(), err)
}

type captureConverter struct {
converter.DataConverter
toPayloads []interface{}
Expand Down
20 changes: 20 additions & 0 deletions test/replaytests/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,3 +656,23 @@ func SelectorBlockingDefaultActivity(ctx context.Context, value string) (string,
logger.Info("Activity", "value", value)
return value + " was logged!", nil
}

func TripWorkflow(ctx workflow.Context, tripCounter int) error {
logger := workflow.GetLogger(ctx)
workflowID := workflow.GetInfo(ctx).WorkflowExecution.ID
logger.Info("Trip Workflow Started for User.",
"User", workflowID,
"TripCounter", tripCounter)

// TripCh to wait on trip completed event signals
tripCh := workflow.GetSignalChannel(ctx, "trip_event")
for i := 0; i < 10; i++ {
var trip int
tripCh.Receive(ctx, &trip)
logger.Info("Trip complete event received.", "Total", trip)
tripCounter++
}

logger.Info("Starting a new run.", "TripCounter", tripCounter)
return workflow.NewContinueAsNewError(ctx, "TripWorkflow", tripCounter)
}
Loading
Loading