diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder.go b/flytepropeller/pkg/controller/nodes/array/event_recorder.go index ab67faf76c..2c66316aad 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder.go @@ -172,7 +172,7 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index NodeId: subNodeID, ExecutionId: workflowExecutionID, }, - Phase: idlcore.NodeExecution_UNDEFINED, + Phase: nodePhase, OccurredAt: timestamp, ParentNodeMetadata: &event.ParentNodeExecutionMetadata{ NodeId: nCtx.NodeID(), @@ -194,7 +194,7 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index Version: "v1", // this value is irrelevant but necessary for the identifier to be valid }, ParentNodeExecutionId: nodeExecutionEvent.Id, - Phase: idlcore.TaskExecution_UNDEFINED, + Phase: taskPhase, TaskType: "k8s-array", OccurredAt: timestamp, ReportedAt: timestamp, diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder_test.go b/flytepropeller/pkg/controller/nodes/array/event_recorder_test.go new file mode 100644 index 0000000000..94ca8af17d --- /dev/null +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder_test.go @@ -0,0 +1,27 @@ +package array + +import ( + "context" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" +) + +type bufferedEventRecorder struct { + taskExecutionEvents []*event.TaskExecutionEvent + nodeExecutionEvents []*event.NodeExecutionEvent +} + +func (b *bufferedEventRecorder) RecordTaskEvent(ctx context.Context, taskExecutionEvent *event.TaskExecutionEvent, eventConfig *config.EventConfig) error { + b.taskExecutionEvents = append(b.taskExecutionEvents, taskExecutionEvent) + return nil +} + +func (b *bufferedEventRecorder) RecordNodeEvent(ctx context.Context, nodeExecutionEvent *event.NodeExecutionEvent, eventConfig *config.EventConfig) error { + b.nodeExecutionEvents = append(b.nodeExecutionEvents, nodeExecutionEvent) + return nil +} + +func newBufferedEventRecorder() *bufferedEventRecorder { + return &bufferedEventRecorder{} +} diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 8c5ef76dce..7fcfaa82da 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -235,7 +235,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // initialize subNode status by faking events for i := 0; i < size; i++ { - if err := sendEvents(ctx, nCtx, i, 0, idlcore.NodeExecution_UNDEFINED, idlcore.TaskExecution_UNDEFINED, eventRecorder, a.eventConfig); err != nil { + if err := sendEvents(ctx, nCtx, i, 0, idlcore.NodeExecution_QUEUED, idlcore.TaskExecution_UNDEFINED, eventRecorder, a.eventConfig); err != nil { logger.Warnf(ctx, "failed to record ArrayNode events: %v", err) } diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index 7072c0269b..c16cd24969 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -73,6 +73,7 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte nCtx := &mocks.NodeExecutionContext{} nCtx.OnMaxDatasetSizeBytes().Return(9999999) + nCtx.OnCurrentAttempt().Return(uint32(0)) // ContextualNodeLookup nodeLookup := &execmocks.NodeLookup{} @@ -243,7 +244,7 @@ func TestAbort(t *testing.T) { } // create NodeExecutionContext - eventRecorder := newArrayEventRecorder() + eventRecorder := newBufferedEventRecorder() nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) // evaluate node @@ -252,15 +253,15 @@ func TestAbort(t *testing.T) { nodeHandler.AssertNumberOfCalls(t, "Abort", len(test.expectedExternalResourcePhases)) if len(test.expectedExternalResourcePhases) > 0 { - assert.Equal(t, 1, len(eventRecorder.taskEvents)) + assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) - externalResources := eventRecorder.taskEvents[0].Metadata.GetExternalResources() + externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources() assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources)) for i, expectedPhase := range test.expectedExternalResourcePhases { assert.Equal(t, expectedPhase, externalResources[i].Phase) } } else { - assert.Equal(t, 0, len(eventRecorder.taskEvents)) + assert.Equal(t, 0, len(eventRecorder.taskExecutionEvents)) } }) } @@ -339,7 +340,7 @@ func TestFinalize(t *testing.T) { } // create NodeExecutionContext - eventRecorder := newArrayEventRecorder() + eventRecorder := newBufferedEventRecorder() nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) // evaluate node @@ -379,7 +380,7 @@ func TestHandleArrayNodePhaseNone(t *testing.T) { }, expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, expectedTransitionPhase: handler.EPhaseRunning, - expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_QUEUED, idlcore.TaskExecution_QUEUED}, + expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_UNDEFINED, idlcore.TaskExecution_UNDEFINED}, }, { name: "SuccessMultipleInputs", @@ -389,7 +390,7 @@ func TestHandleArrayNodePhaseNone(t *testing.T) { }, expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, expectedTransitionPhase: handler.EPhaseRunning, - expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_QUEUED, idlcore.TaskExecution_QUEUED, idlcore.TaskExecution_QUEUED}, + expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_UNDEFINED, idlcore.TaskExecution_UNDEFINED, idlcore.TaskExecution_UNDEFINED}, }, { name: "FailureDifferentInputListLengths", @@ -406,7 +407,7 @@ func TestHandleArrayNodePhaseNone(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // create NodeExecutionContext - eventRecorder := newArrayEventRecorder() + eventRecorder := newBufferedEventRecorder() literalMap := convertMapToArrayLiterals(test.inputValues) arrayNodeState := &handler.ArrayNodeState{ Phase: v1alpha1.ArrayNodePhaseNone, @@ -422,15 +423,15 @@ func TestHandleArrayNodePhaseNone(t *testing.T) { assert.Equal(t, test.expectedTransitionPhase, transition.Info().GetPhase()) if len(test.expectedExternalResourcePhases) > 0 { - assert.Equal(t, 1, len(eventRecorder.taskEvents)) + assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) - externalResources := eventRecorder.taskEvents[0].Metadata.GetExternalResources() + externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources() assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources)) for i, expectedPhase := range test.expectedExternalResourcePhases { assert.Equal(t, expectedPhase, externalResources[i].Phase) } } else { - assert.Equal(t, 0, len(eventRecorder.taskEvents)) + assert.Equal(t, 0, len(eventRecorder.taskExecutionEvents)) } }) } @@ -595,7 +596,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { } // create NodeExecutionContext - eventRecorder := newArrayEventRecorder() + eventRecorder := newBufferedEventRecorder() nodeSpec := arrayNodeSpec nodeSpec.ArrayNode.Parallelism = uint32(test.parallelism) @@ -607,7 +608,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { nodeHandler := &mocks.NodeHandler{} nodeHandler.OnFinalizeRequired().Return(false) for i, transition := range test.subNodeTransitions { - nodeID := fmt.Sprintf("%s-n%d", nCtx.NodeID(), i) + nodeID := fmt.Sprintf("n%d", i) transitionPhase := test.expectedExternalResourcePhases[i] nodeHandler.OnHandleMatch(mock.Anything, mock.MatchedBy(func(arrayNCtx interfaces.NodeExecutionContext) bool { @@ -637,15 +638,15 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { assert.Equal(t, test.expectedTransitionPhase, transition.Info().GetPhase()) if len(test.expectedExternalResourcePhases) > 0 { - assert.Equal(t, 1, len(eventRecorder.taskEvents)) + assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) - externalResources := eventRecorder.taskEvents[0].Metadata.GetExternalResources() + externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources() assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources)) for i, expectedPhase := range test.expectedExternalResourcePhases { assert.Equal(t, expectedPhase, externalResources[i].Phase) } } else { - assert.Equal(t, 0, len(eventRecorder.taskEvents)) + assert.Equal(t, 0, len(eventRecorder.taskExecutionEvents)) } }) } @@ -705,7 +706,7 @@ func TestHandleArrayNodePhaseSucceeding(t *testing.T) { } // create NodeExecutionContext - eventRecorder := newArrayEventRecorder() + eventRecorder := newBufferedEventRecorder() literalMap := &idlcore.LiteralMap{} nCtx := createNodeExecutionContext(dataStore, eventRecorder, []string{test.outputVariable}, literalMap, &arrayNodeSpec, arrayNodeState) @@ -831,7 +832,7 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) { } // create NodeExecutionContext - eventRecorder := newArrayEventRecorder() + eventRecorder := newBufferedEventRecorder() literalMap := &idlcore.LiteralMap{} nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)