Skip to content

Commit

Permalink
Prepopulate ArrayNode output literals with TaskNode interface output …
Browse files Browse the repository at this point in the history
…variables (flyteorg#5080)

* prepopulate output literals with TaskNode interface output variables

Signed-off-by: Daniel Rammer <[email protected]>

* added unit test

Signed-off-by: Daniel Rammer <[email protected]>

* docs

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored and yubofredwang committed Mar 26, 2024
1 parent 35c8604 commit 2459c96
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
32 changes: 32 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,39 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
gatherOutputsRequests = append(gatherOutputsRequests, gatherOutputsRequest)
}

// attempt best effort at initializing outputLiterals with output variable names. currently
// only TaskNode and WorkflowNode contain node interfaces.
outputLiterals := make(map[string]*idlcore.Literal)

switch arrayNode.GetSubNodeSpec().GetKind() {
case v1alpha1.NodeKindTask:
taskID := *arrayNode.GetSubNodeSpec().TaskRef
taskNode, err := nCtx.ExecutionContext().GetTask(taskID)
if err != nil {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(idlcore.ExecutionError_SYSTEM,
errors.BadSpecificationError, fmt.Sprintf("failed to find ArrayNode subNode task with id: '%s'", taskID), nil)), nil
}

if outputs := taskNode.CoreTask().GetInterface().GetOutputs(); outputs != nil {
for name := range outputs.Variables {
outputLiteral := &idlcore.Literal{
Value: &idlcore.Literal_Collection{
Collection: &idlcore.LiteralCollection{
Literals: make([]*idlcore.Literal, 0, len(arrayNodeState.SubNodePhases.GetItems())),
},
},
}

outputLiterals[name] = outputLiteral
}
}
case v1alpha1.NodeKindWorkflow:
// TODO - to support launchplans we will need to process the output interface variables here
fallthrough
default:
logger.Warnf(ctx, "ArrayNode does not support pre-populating outputLiteral collections for node kind '%s'", arrayNode.GetSubNodeSpec().GetKind())
}

workerErrorCollector := errorcollector.NewErrorMessageCollector()
for i, gatherOutputsRequest := range gatherOutputsRequests {
outputResponse := <-gatherOutputsRequest.responseChannel
Expand Down
9 changes: 9 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
ID: "foo",
ArrayNode: &v1alpha1.ArrayNodeSpec{
SubNodeSpec: &v1alpha1.NodeSpec{
Kind: v1alpha1.NodeKindTask,
TaskRef: &taskRef,
},
},
Expand Down Expand Up @@ -830,6 +831,14 @@ func TestHandleArrayNodePhaseSucceeding(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
expectedTransitionPhase: handler.EPhaseSuccess,
},
{
name: "SuccessEmptyInput",
outputValues: []*int{},
outputVariable: "foo",
subNodePhases: []v1alpha1.NodePhase{},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding,
expectedTransitionPhase: handler.EPhaseSuccess,
},
}

for _, test := range tests {
Expand Down

0 comments on commit 2459c96

Please sign in to comment.