Skip to content

Commit

Permalink
trying to make it work!
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 committed Nov 15, 2023
1 parent 97979d9 commit 6730797
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
22 changes: 17 additions & 5 deletions flytepropeller/pkg/controller/executors/failure_node_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,23 @@ import (
)

type FailureNodeLookup struct {
NodeSpec *v1alpha1.NodeSpec
NodeStatus v1alpha1.ExecutableNodeStatus
NodeSpec *v1alpha1.NodeSpec
NodeStatus v1alpha1.ExecutableNodeStatus
StartNode v1alpha1.ExecutableNode
StartNodeStatus v1alpha1.ExecutableNodeStatus
}

func (f FailureNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) {
if nodeID == v1alpha1.StartNodeID {
return f.StartNode, true
}
return f.NodeSpec, true
}

func (f FailureNodeLookup) GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus {
if id == v1alpha1.StartNodeID {
return f.StartNodeStatus
}
return f.NodeStatus
}

Expand All @@ -26,9 +34,13 @@ func (f FailureNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, erro
return nil, nil
}

func NewFailureNodeLookup(nodeSpec *v1alpha1.NodeSpec, nodeStatus v1alpha1.ExecutableNodeStatus) NodeLookup {
func NewFailureNodeLookup(nodeSpec *v1alpha1.NodeSpec, startNode v1alpha1.ExecutableNode, nodeStatusGetter v1alpha1.NodeStatusGetter) NodeLookup {
startNodeStatus := nodeStatusGetter.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID)
errNodeStatus := nodeStatusGetter.GetNodeExecutionStatus(context.TODO(), nodeSpec.GetID())
return FailureNodeLookup{
NodeSpec: nodeSpec,
NodeStatus: nodeStatus,
NodeSpec: nodeSpec,
NodeStatus: errNodeStatus,
StartNode: startNode,
StartNodeStatus: startNodeStatus,
}
}
3 changes: 1 addition & 2 deletions flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())

// TODO: GetNodeExecutionStatus doesn't work. How do we get the error node status from CRD
status := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, errorNode.GetID())
failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), status)
failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), w.GetNode(v1alpha1.StartNodeID), w.GetExecutionStatus())

Check failure on line 177 in flytepropeller/pkg/controller/workflow/executor.go

View workflow job for this annotation

GitHub Actions / compile

multiple-value w.GetNode(v1alpha1.StartNodeID) (value of type ("github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1".ExecutableNode, bool)) in single-value context
state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, failureNodeLookup, errorNode)
logger.Infof(ctx, "FailureNode [%v] finished with state [%v]", errorNode, state)
logger.Infof(ctx, "FailureNode [%v] finished with error [%v]", errorNode, err)
Expand Down

0 comments on commit 6730797

Please sign in to comment.