Skip to content

Commit

Permalink
added panic handling on worker execution
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Dec 14, 2023
1 parent 8b5284e commit 61ed214
Showing 1 changed file with 40 additions and 19 deletions.
59 changes: 40 additions & 19 deletions flytepropeller/pkg/controller/nodes/array/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package array
import (
"context"
"fmt"
"runtime/debug"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

// nodeExecutionRequest is a request to execute an ArrayNode subNode
Expand Down Expand Up @@ -51,34 +53,53 @@ func (w *worker) run() {
for {
select {
case nodeExecutionRequest := <-w.nodeExecutionRequestChannel:
// execute RecursiveNodeHandler on node
nodeStatus, err := nodeExecutionRequest.nodeExecutor.RecursiveNodeHandler(nodeExecutionRequest.ctx, nodeExecutionRequest.executionContext,
nodeExecutionRequest.dagStructure, nodeExecutionRequest.nodeLookup, nodeExecutionRequest.subNodeSpec)
var nodeStatus interfaces.NodeStatus
var err error
func() {
defer func() {
if r := recover(); r != nil {
stack := debug.Stack()
err = fmt.Errorf("panic when executing ArrayNode subNode, Stack: [%s]", string(stack))
logger.Errorf(nodeExecutionRequest.ctx, err.Error())
}
}()

// execute RecursiveNodeHandler on node
nodeStatus, err = nodeExecutionRequest.nodeExecutor.RecursiveNodeHandler(nodeExecutionRequest.ctx, nodeExecutionRequest.executionContext,
nodeExecutionRequest.dagStructure, nodeExecutionRequest.nodeLookup, nodeExecutionRequest.subNodeSpec)
}()

nodeExecutionRequest.responseChannel <- struct {
interfaces.NodeStatus
error
}{nodeStatus, err}
case gatherOutputsRequest := <-w.gatherOutputsRequestChannel:
// read outputs
outputs, executionErr, err := gatherOutputsRequest.reader.Read(gatherOutputsRequest.ctx)
if err != nil {
gatherOutputsRequest.responseChannel <- struct {
literalMap map[string]*idlcore.Literal
error
}{nil, err}
continue
} else if executionErr != nil {
gatherOutputsRequest.responseChannel <- struct {
literalMap map[string]*idlcore.Literal
error
}{nil, fmt.Errorf("%s", executionErr.String())}
continue
}
var literalMap map[string]*idlcore.Literal
var err error
func() {
defer func() {
if r := recover(); r != nil {
stack := debug.Stack()
err = fmt.Errorf("panic when executing ArrayNode subNode, Stack: [%s]", string(stack))
logger.Errorf(gatherOutputsRequest.ctx, err.Error())
}
}()

// read outputs
outputs, executionErr, gatherErr := gatherOutputsRequest.reader.Read(gatherOutputsRequest.ctx)
if gatherErr != nil {
err = gatherErr
} else if executionErr != nil {
err = fmt.Errorf("%s", executionErr.String())
} else {
literalMap = outputs.GetLiterals()
}
}()

gatherOutputsRequest.responseChannel <- struct {
literalMap map[string]*idlcore.Literal
error
}{outputs.GetLiterals(), nil}
}{literalMap, nil}
}
}
}

0 comments on commit 61ed214

Please sign in to comment.