Skip to content

Commit

Permalink
fixed unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Oct 4, 2023
1 parent 905e30d commit c3b177e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 21 deletions.
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index
NodeId: subNodeID,
ExecutionId: workflowExecutionID,
},
Phase: idlcore.NodeExecution_UNDEFINED,
Phase: nodePhase,
OccurredAt: timestamp,
ParentNodeMetadata: &event.ParentNodeExecutionMetadata{
NodeId: nCtx.NodeID(),
Expand All @@ -194,7 +194,7 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index
Version: "v1", // this value is irrelevant but necessary for the identifier to be valid
},
ParentNodeExecutionId: nodeExecutionEvent.Id,
Phase: idlcore.TaskExecution_UNDEFINED,
Phase: taskPhase,
TaskType: "k8s-array",
OccurredAt: timestamp,
ReportedAt: timestamp,
Expand Down
27 changes: 27 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/event_recorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package array

import (
"context"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
)

type bufferedEventRecorder struct {
taskExecutionEvents []*event.TaskExecutionEvent
nodeExecutionEvents []*event.NodeExecutionEvent
}

func (b *bufferedEventRecorder) RecordTaskEvent(ctx context.Context, taskExecutionEvent *event.TaskExecutionEvent, eventConfig *config.EventConfig) error {
b.taskExecutionEvents = append(b.taskExecutionEvents, taskExecutionEvent)
return nil
}

func (b *bufferedEventRecorder) RecordNodeEvent(ctx context.Context, nodeExecutionEvent *event.NodeExecutionEvent, eventConfig *config.EventConfig) error {
b.nodeExecutionEvents = append(b.nodeExecutionEvents, nodeExecutionEvent)
return nil
}

func newBufferedEventRecorder() *bufferedEventRecorder {
return &bufferedEventRecorder{}
}
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu

// initialize subNode status by faking events
for i := 0; i < size; i++ {
if err := sendEvents(ctx, nCtx, i, 0, idlcore.NodeExecution_UNDEFINED, idlcore.TaskExecution_UNDEFINED, eventRecorder, a.eventConfig); err != nil {
if err := sendEvents(ctx, nCtx, i, 0, idlcore.NodeExecution_QUEUED, idlcore.TaskExecution_UNDEFINED, eventRecorder, a.eventConfig); err != nil {
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}

Expand Down
37 changes: 19 additions & 18 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte

nCtx := &mocks.NodeExecutionContext{}
nCtx.OnMaxDatasetSizeBytes().Return(9999999)
nCtx.OnCurrentAttempt().Return(uint32(0))

// ContextualNodeLookup
nodeLookup := &execmocks.NodeLookup{}
Expand Down Expand Up @@ -243,7 +244,7 @@ func TestAbort(t *testing.T) {
}

// create NodeExecutionContext
eventRecorder := newArrayEventRecorder()
eventRecorder := newBufferedEventRecorder()
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)

// evaluate node
Expand All @@ -252,15 +253,15 @@ func TestAbort(t *testing.T) {

nodeHandler.AssertNumberOfCalls(t, "Abort", len(test.expectedExternalResourcePhases))
if len(test.expectedExternalResourcePhases) > 0 {
assert.Equal(t, 1, len(eventRecorder.taskEvents))
assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents))

externalResources := eventRecorder.taskEvents[0].Metadata.GetExternalResources()
externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources()
assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources))
for i, expectedPhase := range test.expectedExternalResourcePhases {
assert.Equal(t, expectedPhase, externalResources[i].Phase)
}
} else {
assert.Equal(t, 0, len(eventRecorder.taskEvents))
assert.Equal(t, 0, len(eventRecorder.taskExecutionEvents))
}
})
}
Expand Down Expand Up @@ -339,7 +340,7 @@ func TestFinalize(t *testing.T) {
}

// create NodeExecutionContext
eventRecorder := newArrayEventRecorder()
eventRecorder := newBufferedEventRecorder()
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)

// evaluate node
Expand Down Expand Up @@ -379,7 +380,7 @@ func TestHandleArrayNodePhaseNone(t *testing.T) {
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_QUEUED, idlcore.TaskExecution_QUEUED},
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_UNDEFINED, idlcore.TaskExecution_UNDEFINED},
},
{
name: "SuccessMultipleInputs",
Expand All @@ -389,7 +390,7 @@ func TestHandleArrayNodePhaseNone(t *testing.T) {
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_QUEUED, idlcore.TaskExecution_QUEUED, idlcore.TaskExecution_QUEUED},
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_UNDEFINED, idlcore.TaskExecution_UNDEFINED, idlcore.TaskExecution_UNDEFINED},
},
{
name: "FailureDifferentInputListLengths",
Expand All @@ -406,7 +407,7 @@ func TestHandleArrayNodePhaseNone(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// create NodeExecutionContext
eventRecorder := newArrayEventRecorder()
eventRecorder := newBufferedEventRecorder()
literalMap := convertMapToArrayLiterals(test.inputValues)
arrayNodeState := &handler.ArrayNodeState{
Phase: v1alpha1.ArrayNodePhaseNone,
Expand All @@ -422,15 +423,15 @@ func TestHandleArrayNodePhaseNone(t *testing.T) {
assert.Equal(t, test.expectedTransitionPhase, transition.Info().GetPhase())

if len(test.expectedExternalResourcePhases) > 0 {
assert.Equal(t, 1, len(eventRecorder.taskEvents))
assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents))

externalResources := eventRecorder.taskEvents[0].Metadata.GetExternalResources()
externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources()
assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources))
for i, expectedPhase := range test.expectedExternalResourcePhases {
assert.Equal(t, expectedPhase, externalResources[i].Phase)
}
} else {
assert.Equal(t, 0, len(eventRecorder.taskEvents))
assert.Equal(t, 0, len(eventRecorder.taskExecutionEvents))
}
})
}
Expand Down Expand Up @@ -595,7 +596,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
}

// create NodeExecutionContext
eventRecorder := newArrayEventRecorder()
eventRecorder := newBufferedEventRecorder()

nodeSpec := arrayNodeSpec
nodeSpec.ArrayNode.Parallelism = uint32(test.parallelism)
Expand All @@ -607,7 +608,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
nodeHandler := &mocks.NodeHandler{}
nodeHandler.OnFinalizeRequired().Return(false)
for i, transition := range test.subNodeTransitions {
nodeID := fmt.Sprintf("%s-n%d", nCtx.NodeID(), i)
nodeID := fmt.Sprintf("n%d", i)
transitionPhase := test.expectedExternalResourcePhases[i]

nodeHandler.OnHandleMatch(mock.Anything, mock.MatchedBy(func(arrayNCtx interfaces.NodeExecutionContext) bool {
Expand Down Expand Up @@ -637,15 +638,15 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
assert.Equal(t, test.expectedTransitionPhase, transition.Info().GetPhase())

if len(test.expectedExternalResourcePhases) > 0 {
assert.Equal(t, 1, len(eventRecorder.taskEvents))
assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents))

externalResources := eventRecorder.taskEvents[0].Metadata.GetExternalResources()
externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources()
assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources))
for i, expectedPhase := range test.expectedExternalResourcePhases {
assert.Equal(t, expectedPhase, externalResources[i].Phase)
}
} else {
assert.Equal(t, 0, len(eventRecorder.taskEvents))
assert.Equal(t, 0, len(eventRecorder.taskExecutionEvents))
}
})
}
Expand Down Expand Up @@ -705,7 +706,7 @@ func TestHandleArrayNodePhaseSucceeding(t *testing.T) {
}

// create NodeExecutionContext
eventRecorder := newArrayEventRecorder()
eventRecorder := newBufferedEventRecorder()
literalMap := &idlcore.LiteralMap{}
nCtx := createNodeExecutionContext(dataStore, eventRecorder, []string{test.outputVariable}, literalMap, &arrayNodeSpec, arrayNodeState)

Expand Down Expand Up @@ -831,7 +832,7 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) {
}

// create NodeExecutionContext
eventRecorder := newArrayEventRecorder()
eventRecorder := newBufferedEventRecorder()
literalMap := &idlcore.LiteralMap{}
nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState)

Expand Down

0 comments on commit c3b177e

Please sign in to comment.