From 46e0d96477b9ca49f74a7d98c46b5190257804a2 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 5 Dec 2023 21:18:40 -0600 Subject: [PATCH 01/11] detecting subNode or task phase updates to increment TaskPhaseVersion on ArrayNode state Signed-off-by: Daniel Rammer --- .../pkg/controller/nodes/array/handler.go | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index dddcd0e7c5..00a9fc747e 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -169,7 +169,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState := nCtx.NodeStateReader().GetArrayNodeState() currentArrayNodePhase := arrayNodeState.Phase - taskPhaseVersion := arrayNodeState.TaskPhaseVersion + incrementTaskPhaseVersion := false eventRecorder := newArrayEventRecorder(nCtx.EventsRecorder()) switch currentArrayNodePhase { @@ -246,6 +246,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu messageCollector := errorcollector.NewErrorMessageCollector() for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) + taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) // do not process nodes in terminal state if isTerminalNodePhase(nodePhase) { @@ -283,6 +284,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } arrayNodeState.SubNodeRetryAttempts.SetItem(i, uint64(subNodeStatus.GetAttempts())) arrayNodeState.SubNodeSystemFailures.SetItem(i, uint64(subNodeStatus.GetSystemFailures())) + + // increment task phase version if subNode phase or task phase changed + if subNodeStatus.GetPhase() != nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != taskPhase { + incrementTaskPhaseVersion = true + } } // process phases of subNodes to determine overall `ArrayNode` phase @@ -429,17 +435,15 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu taskPhase = idlcore.TaskExecution_FAILED } - // need to increment taskPhaseVersion if arrayNodeState.Phase does not change, otherwise - // reset to 0. by incrementing this always we report an event and ensure processing - // every time the ArrayNode is evaluated. if this overhead becomes too large, we will need - // to revisit and only increment when any subNode state changes. + // if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise + // increment it if we detect any changes in subNode state. 
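
The reset-or-increment rule above exists because event consumers treat the (phase, taskPhaseVersion) pair as an idempotency key: an event that repeats both values is dropped as a duplicate, so a changed subNode state must surface as a new version to be reported at all. A minimal sketch of that consumer-side assumption (hypothetical helper, not flyteadmin's actual code):

type eventKey struct {
	phase   string
	version uint32
}

// shouldRecord reports whether an event is new under (phase, version) deduplication.
func shouldRecord(seen map[eventKey]struct{}, phase string, version uint32) bool {
	k := eventKey{phase: phase, version: version}
	if _, ok := seen[k]; ok {
		return false // same phase at the same version was already reported
	}
	seen[k] = struct{}{}
	return true
}

Under that rule, bumping TaskPhaseVersion only when a subNode phase or task phase actually changed avoids emitting and persisting a redundant event on every evaluation.
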
if currentArrayNodePhase != arrayNodeState.Phase {
 		arrayNodeState.TaskPhaseVersion = 0
-	} else {
-		arrayNodeState.TaskPhaseVersion = taskPhaseVersion + 1
+	} else if incrementTaskPhaseVersion {
+		arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1
 	}

-	if err := eventRecorder.finalize(ctx, nCtx, taskPhase, taskPhaseVersion, a.eventConfig); err != nil {
+	if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil {
 		logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
 		return handler.UnknownTransition, err
 	}

From 9a4f5c21f35dfdc078f0f2a7111d482c35bb5885 Mon Sep 17 00:00:00 2001
From: Daniel Rammer
Date: Thu, 7 Dec 2023 01:35:00 -0600
Subject: [PATCH 02/11] not writing an empty inputs file when a node has no inputs

Signed-off-by: Daniel Rammer
---
 flytepropeller/pkg/controller/nodes/executor.go | 2 +-
 flytestdlib/storage/cached_rawstore.go          | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go
index 8e96ee9645..23062a8cb3 100644
--- a/flytepropeller/pkg/controller/nodes/executor.go
+++ b/flytepropeller/pkg/controller/nodes/executor.go
@@ -753,7 +753,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur
 			return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "BindingResolutionFailure", err.Error(), nil), nil
 		}

-		if nodeInputs != nil {
+		if nodeInputs != nil && len(nodeInputs.Literals) > 0 {
 			inputsFile := v1alpha1.GetInputsFile(dataDir)
 			if err := c.store.WriteProtobuf(ctx, inputsFile, storage.Options{}, nodeInputs); err != nil {
 				c.metrics.InputsWriteFailure.Inc(ctx)
diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go
index a37a4cdf6b..913a517a0f 100644
--- a/flytestdlib/storage/cached_rawstore.go
+++ b/flytestdlib/storage/cached_rawstore.go
@@ -35,6 +35,9 @@ type cachedRawStore struct {

 // Head gets metadata about the reference. This should generally be a lightweight operation.
func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Metadata, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Head") + defer span.End() + key := []byte(reference) if oRaw, err := s.cache.Get(key); err == nil { s.metrics.CacheHit.Inc() From 4cc3889cef35f88248c9b7412275da4896e6a21f Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 7 Dec 2023 04:11:13 -0600 Subject: [PATCH 03/11] parallelizing subNode evaluations and output retrievals Signed-off-by: Daniel Rammer --- .../pkg/controller/nodes/array/handler.go | 217 +++++++++++++++++- 1 file changed, 215 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 00a9fc747e..be8b23e1ac 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -49,6 +49,28 @@ type arrayNodeHandler struct { pluginStateBytesStarted []byte } +// TODO @hamersaw - docs +type nodeExecutionRequest struct { + index int + nodePhase v1alpha1.NodePhase + taskPhase int + nodeExecutor interfaces.Node + executionContext executors.ExecutionContext + dagStructure executors.DAGStructure + nodeLookup executors.NodeLookup + subNodeSpec *v1alpha1.NodeSpec + subNodeStatus *v1alpha1.NodeStatus + arrayEventRecorder arrayEventRecorder + responseChannel chan struct {interfaces.NodeStatus; error} +} + +type outputRequest struct { + index int + nodePhase v1alpha1.NodePhase + currentAttempt uint32 + responseChannel chan struct {literalMap map[string]*idlcore.Literal; error} +} + // metrics encapsulates the prometheus metrics for this handler type metrics struct { scope promutils.Scope @@ -244,7 +266,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // process array node subNodes currentParallelism := uint32(0) messageCollector := errorcollector.NewErrorMessageCollector() - for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { + /*for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) @@ -289,6 +311,107 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu if subNodeStatus.GetPhase() != nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != taskPhase { incrementTaskPhaseVersion = true } + }*/ + + // TODO @hamersaw docs + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + workerCount := 10 + nodeExecutionRequestChan := make(chan *nodeExecutionRequest, workerCount) + for i := 0; i < workerCount; i++ { + go func() { + for { + select { + case <-ctx.Done(): + return + case nodeExecutionRequest := <-nodeExecutionRequestChan: + nodeStatus, err := nodeExecutionRequest.nodeExecutor.RecursiveNodeHandler(ctx, nodeExecutionRequest.executionContext, + nodeExecutionRequest.dagStructure, nodeExecutionRequest.nodeLookup, nodeExecutionRequest.subNodeSpec) + nodeExecutionRequest.responseChannel <- struct {interfaces.NodeStatus; error}{nodeStatus, err} + } + } + }() + } + + nodeExecutionRequests := make([]*nodeExecutionRequest, 0) // TODO @hamersaw right-size + for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { + nodePhase := v1alpha1.NodePhase(nodePhaseUint64) + taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) + + // do not process nodes in terminal state + if 
isTerminalNodePhase(nodePhase) { + continue + } + + // create array contexts + subNodeEventRecorder := newArrayEventRecorder(nCtx.EventsRecorder()) + arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, subNodeStatus, err := + a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, ¤tParallelism, subNodeEventRecorder) + if err != nil { + return handler.UnknownTransition, err + } + + nodeExecutionRequest := &nodeExecutionRequest{ + index: i, + nodePhase: nodePhase, + taskPhase: taskPhase, + nodeExecutor: arrayNodeExecutor, + executionContext: arrayExecutionContext, + dagStructure: arrayDAGStructure, + nodeLookup: arrayNodeLookup, + subNodeSpec: subNodeSpec, + subNodeStatus: subNodeStatus, + arrayEventRecorder: subNodeEventRecorder, + responseChannel: make(chan struct {interfaces.NodeStatus; error}, 1), + } + + nodeExecutionRequests = append(nodeExecutionRequests, nodeExecutionRequest) + nodeExecutionRequestChan <- nodeExecutionRequest + + // TODO @hamersaw - need to handle parallelism here somehow?!? + } + + for _, nodeExecutionRequest := range nodeExecutionRequests { + nodeExecutionResponse := <-nodeExecutionRequest.responseChannel + if nodeExecutionResponse.error != nil { + return handler.UnknownTransition, nodeExecutionResponse.error + } + + index := nodeExecutionRequest.index + subNodeStatus := nodeExecutionRequest.subNodeStatus + + // capture subNode error if exists + if nodeExecutionRequest.subNodeStatus.Error != nil { + messageCollector.Collect(index, subNodeStatus.Error.Message) + } + + // process events - TODO @hamersaw - this is a hack + if arrayEventRecorder, ok := nodeExecutionRequest.arrayEventRecorder.(*externalResourcesEventRecorder); ok { + for _, event := range arrayEventRecorder.taskEvents { + eventRecorder.RecordTaskEvent(ctx, event, a.eventConfig) + } + for _, event := range arrayEventRecorder.nodeEvents { + eventRecorder.RecordNodeEvent(ctx, event, a.eventConfig) + } + } + eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts()) + + // update subNode state + arrayNodeState.SubNodePhases.SetItem(index, uint64(subNodeStatus.GetPhase())) + if subNodeStatus.GetTaskNodeStatus() == nil { + // resetting task phase because during retries we clear the GetTaskNodeStatus + arrayNodeState.SubNodeTaskPhases.SetItem(index, uint64(0)) + } else { + arrayNodeState.SubNodeTaskPhases.SetItem(index, uint64(subNodeStatus.GetTaskNodeStatus().GetPhase())) + } + arrayNodeState.SubNodeRetryAttempts.SetItem(index, uint64(subNodeStatus.GetAttempts())) + arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures())) + + // increment task phase version if subNode phase or task phase changed + if subNodeStatus.GetPhase() != nodeExecutionRequest.nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != nodeExecutionRequest.taskPhase { + incrementTaskPhaseVersion = true + } } // process phases of subNodes to determine overall `ArrayNode` phase @@ -350,7 +473,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu )), nil case v1alpha1.ArrayNodePhaseSucceeding: outputLiterals := make(map[string]*idlcore.Literal) - for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { + /*for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) if nodePhase != v1alpha1.NodePhaseSucceeded { @@ -397,6 +520,96 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu appendLiteral(name, 
literal, outputLiterals, len(arrayNodeState.SubNodePhases.GetItems())) } } + }*/ + + // TODO @hamersaw docs + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + workerCount := 10 + outputRequestChannel := make(chan *outputRequest, workerCount) + for i := 0; i < workerCount; i++ { + go func() { + for { + select { + case <-ctx.Done(): + return + case outputRequest := <-outputRequestChannel: + if outputRequest.nodePhase != v1alpha1.NodePhaseSucceeded { + // retrieve output variables from task template + var outputLiterals map[string]*idlcore.Literal + task, err := nCtx.ExecutionContext().GetTask(*arrayNode.GetSubNodeSpec().TaskRef) + if err != nil { + // Should never happen + outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} + continue + } + + if task.CoreTask() != nil && task.CoreTask().Interface != nil && task.CoreTask().Interface.Outputs != nil { + for name := range task.CoreTask().Interface.Outputs.Variables { + outputLiterals[name] = nilLiteral + } + } + + outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{outputLiterals, nil} + } else { + // initialize subNode reader + subDataDir, subOutputDir, err := constructOutputReferences(ctx, nCtx, + strconv.Itoa(outputRequest.index), strconv.Itoa(int(outputRequest.currentAttempt))) + if err != nil { + outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} + continue + } + + // checkpoint paths are not computed here because this function is only called when writing + // existing cached outputs. if this functionality changes this will need to be revisited. + outputPaths := ioutils.NewCheckpointRemoteFilePaths(ctx, nCtx.DataStore(), subOutputDir, ioutils.NewRawOutputPaths(ctx, subDataDir), "") + reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes()) + + // read outputs + outputs, executionErr, err := reader.Read(ctx) + if err != nil { + outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} + continue + } else if executionErr != nil { + outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{ + nil, + errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), + "execution error ArrayNode output, bad state: %s", executionErr.String())} + continue + } + + outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{outputs.GetLiterals(), nil} + } + } + } + }() + } + + outputRequests := make([]*outputRequest, 0) // TODO @hamersaw right-size + for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { + nodePhase := v1alpha1.NodePhase(nodePhaseUint64) + outputRequest := &outputRequest{ + index: i, + nodePhase: nodePhase, + currentAttempt: uint32(arrayNodeState.SubNodeRetryAttempts.GetItem(i)), + responseChannel: make(chan struct{literalMap map[string]*idlcore.Literal; error}, 1), + } + + outputRequests = append(outputRequests, outputRequest) + outputRequestChannel <- outputRequest + } + + for _, outputRequest := range outputRequests { + outputResponse := <-outputRequest.responseChannel + if outputResponse.error != nil { + return handler.UnknownTransition, outputResponse.error + } + + // append literal for all output variables + for name, literal := range outputResponse.literalMap { + appendLiteral(name, literal, outputLiterals, len(arrayNodeState.SubNodePhases.GetItems())) + } } outputLiteralMap := &idlcore.LiteralMap{ From 
dc845ae35e34d913899000a6390c073cd31de6f1 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 8 Dec 2023 16:53:58 -0600 Subject: [PATCH 04/11] refactored workers out to ArrayHandler level Signed-off-by: Daniel Rammer --- .../pkg/controller/nodes/array/handler.go | 265 ++++-------------- .../controller/nodes/array/handler_test.go | 8 +- .../pkg/controller/nodes/array/worker.go | 65 +++++ 3 files changed, 132 insertions(+), 206 deletions(-) create mode 100644 flytepropeller/pkg/controller/nodes/array/worker.go diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index be8b23e1ac..3a8acbc1c8 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -42,33 +42,13 @@ var ( // arrayNodeHandler is a handle implementation for processing array nodes type arrayNodeHandler struct { - eventConfig *config.EventConfig - metrics metrics - nodeExecutor interfaces.Node - pluginStateBytesNotStarted []byte - pluginStateBytesStarted []byte -} - -// TODO @hamersaw - docs -type nodeExecutionRequest struct { - index int - nodePhase v1alpha1.NodePhase - taskPhase int - nodeExecutor interfaces.Node - executionContext executors.ExecutionContext - dagStructure executors.DAGStructure - nodeLookup executors.NodeLookup - subNodeSpec *v1alpha1.NodeSpec - subNodeStatus *v1alpha1.NodeStatus - arrayEventRecorder arrayEventRecorder - responseChannel chan struct {interfaces.NodeStatus; error} -} - -type outputRequest struct { - index int - nodePhase v1alpha1.NodePhase - currentAttempt uint32 - responseChannel chan struct {literalMap map[string]*idlcore.Literal; error} + eventConfig *config.EventConfig + gatherOutputsRequestChannel chan *gatherOutputsRequest + metrics metrics + nodeExecutionRequestChannel chan *nodeExecutionRequest + nodeExecutor interfaces.Node + pluginStateBytesNotStarted []byte + pluginStateBytesStarted []byte } // metrics encapsulates the prometheus metrics for this handler @@ -266,73 +246,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // process array node subNodes currentParallelism := uint32(0) messageCollector := errorcollector.NewErrorMessageCollector() - /*for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { - nodePhase := v1alpha1.NodePhase(nodePhaseUint64) - taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) - - // do not process nodes in terminal state - if isTerminalNodePhase(nodePhase) { - continue - } - - // create array contexts - arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, subNodeStatus, err := - a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, ¤tParallelism, eventRecorder) - if err != nil { - return handler.UnknownTransition, err - } - - // execute subNode - _, err = arrayNodeExecutor.RecursiveNodeHandler(ctx, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec) - if err != nil { - return handler.UnknownTransition, err - } - - // capture subNode error if exists - if subNodeStatus.Error != nil { - messageCollector.Collect(i, subNodeStatus.Error.Message) - } - - // process events - eventRecorder.process(ctx, nCtx, i, subNodeStatus.GetAttempts()) - - // update subNode state - arrayNodeState.SubNodePhases.SetItem(i, uint64(subNodeStatus.GetPhase())) - if subNodeStatus.GetTaskNodeStatus() == nil { - // resetting task phase because during retries we clear the GetTaskNodeStatus - 
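
A side note on this branch: the handler nil-checks GetTaskNodeStatus() here, yet elsewhere calls subNodeStatus.GetTaskNodeStatus().GetPhase() with no check. That is only safe if the getters follow Go's nil-receiver convention, sketched below for illustration (not the actual v1alpha1 definitions):

// A nil-safe getter: calling it on a nil receiver yields the zero value.
type taskNodeStatus struct {
	Phase int
}

func (t *taskNodeStatus) GetPhase() int {
	if t == nil {
		return 0 // mirrors resetting the task phase when a retry cleared the status
	}
	return t.Phase
}
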
arrayNodeState.SubNodeTaskPhases.SetItem(i, uint64(0)) - } else { - arrayNodeState.SubNodeTaskPhases.SetItem(i, uint64(subNodeStatus.GetTaskNodeStatus().GetPhase())) - } - arrayNodeState.SubNodeRetryAttempts.SetItem(i, uint64(subNodeStatus.GetAttempts())) - arrayNodeState.SubNodeSystemFailures.SetItem(i, uint64(subNodeStatus.GetSystemFailures())) - - // increment task phase version if subNode phase or task phase changed - if subNodeStatus.GetPhase() != nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != taskPhase { - incrementTaskPhaseVersion = true - } - }*/ - - // TODO @hamersaw docs - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - workerCount := 10 - nodeExecutionRequestChan := make(chan *nodeExecutionRequest, workerCount) - for i := 0; i < workerCount; i++ { - go func() { - for { - select { - case <-ctx.Done(): - return - case nodeExecutionRequest := <-nodeExecutionRequestChan: - nodeStatus, err := nodeExecutionRequest.nodeExecutor.RecursiveNodeHandler(ctx, nodeExecutionRequest.executionContext, - nodeExecutionRequest.dagStructure, nodeExecutionRequest.nodeLookup, nodeExecutionRequest.subNodeSpec) - nodeExecutionRequest.responseChannel <- struct {interfaces.NodeStatus; error}{nodeStatus, err} - } - } - }() - } nodeExecutionRequests := make([]*nodeExecutionRequest, 0) // TODO @hamersaw right-size for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { @@ -353,6 +266,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } nodeExecutionRequest := &nodeExecutionRequest{ + ctx: ctx, index: i, nodePhase: nodePhase, taskPhase: taskPhase, @@ -367,7 +281,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } nodeExecutionRequests = append(nodeExecutionRequests, nodeExecutionRequest) - nodeExecutionRequestChan <- nodeExecutionRequest + a.nodeExecutionRequestChannel <- nodeExecutionRequest // TODO @hamersaw - need to handle parallelism here somehow?!? 
} @@ -376,6 +290,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu nodeExecutionResponse := <-nodeExecutionRequest.responseChannel if nodeExecutionResponse.error != nil { return handler.UnknownTransition, nodeExecutionResponse.error + // TODO @hamersaw - error message collector } index := nodeExecutionRequest.index @@ -386,7 +301,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu messageCollector.Collect(index, subNodeStatus.Error.Message) } - // process events - TODO @hamersaw - this is a hack + // process events if arrayEventRecorder, ok := nodeExecutionRequest.arrayEventRecorder.(*externalResourcesEventRecorder); ok { for _, event := range arrayEventRecorder.taskEvents { eventRecorder.RecordTaskEvent(ctx, event, a.eventConfig) @@ -472,33 +387,39 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu nil, )), nil case v1alpha1.ArrayNodePhaseSucceeding: - outputLiterals := make(map[string]*idlcore.Literal) - /*for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { + gatherOutputsRequests := make([]*gatherOutputsRequest, 0) // TODO hamersaw - right size + for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) + gatherOutputsRequest := &gatherOutputsRequest{ + ctx: ctx, + responseChannel: make(chan struct{literalMap map[string]*idlcore.Literal; error}, 1), + } if nodePhase != v1alpha1.NodePhaseSucceeded { // retrieve output variables from task template - var outputVariables map[string]*idlcore.Variable + outputLiterals := make(map[string]*idlcore.Literal) task, err := nCtx.ExecutionContext().GetTask(*arrayNode.GetSubNodeSpec().TaskRef) if err != nil { // Should never happen - return handler.UnknownTransition, err + gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} + continue } if task.CoreTask() != nil && task.CoreTask().Interface != nil && task.CoreTask().Interface.Outputs != nil { - outputVariables = task.CoreTask().Interface.Outputs.Variables + for name := range task.CoreTask().Interface.Outputs.Variables { + outputLiterals[name] = nilLiteral + } } - // append nil literal for all output variables - for name := range outputVariables { - appendLiteral(name, nilLiteral, outputLiterals, len(arrayNodeState.SubNodePhases.GetItems())) - } + gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{outputLiterals, nil} } else { // initialize subNode reader - currentAttempt := uint32(arrayNodeState.SubNodeRetryAttempts.GetItem(i)) - subDataDir, subOutputDir, err := constructOutputReferences(ctx, nCtx, strconv.Itoa(i), strconv.Itoa(int(currentAttempt))) + currentAttempt := int(arrayNodeState.SubNodeRetryAttempts.GetItem(i)) + subDataDir, subOutputDir, err := constructOutputReferences(ctx, nCtx, + strconv.Itoa(i), strconv.Itoa(currentAttempt)) if err != nil { - return handler.UnknownTransition, err + gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} + continue } // checkpoint paths are not computed here because this function is only called when writing @@ -506,104 +427,19 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu outputPaths := ioutils.NewCheckpointRemoteFilePaths(ctx, nCtx.DataStore(), subOutputDir, ioutils.NewRawOutputPaths(ctx, subDataDir), "") reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, 
nCtx.MaxDatasetSizeBytes()) - // read outputs - outputs, executionErr, err := reader.Read(ctx) - if err != nil { - return handler.UnknownTransition, err - } else if executionErr != nil { - return handler.UnknownTransition, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), - "execution error ArrayNode output, bad state: %s", executionErr.String()) - } - - // copy individual subNode output literals into a collection of output literals - for name, literal := range outputs.GetLiterals() { - appendLiteral(name, literal, outputLiterals, len(arrayNodeState.SubNodePhases.GetItems())) - } - } - }*/ - - // TODO @hamersaw docs - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - workerCount := 10 - outputRequestChannel := make(chan *outputRequest, workerCount) - for i := 0; i < workerCount; i++ { - go func() { - for { - select { - case <-ctx.Done(): - return - case outputRequest := <-outputRequestChannel: - if outputRequest.nodePhase != v1alpha1.NodePhaseSucceeded { - // retrieve output variables from task template - var outputLiterals map[string]*idlcore.Literal - task, err := nCtx.ExecutionContext().GetTask(*arrayNode.GetSubNodeSpec().TaskRef) - if err != nil { - // Should never happen - outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} - continue - } - - if task.CoreTask() != nil && task.CoreTask().Interface != nil && task.CoreTask().Interface.Outputs != nil { - for name := range task.CoreTask().Interface.Outputs.Variables { - outputLiterals[name] = nilLiteral - } - } - - outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{outputLiterals, nil} - } else { - // initialize subNode reader - subDataDir, subOutputDir, err := constructOutputReferences(ctx, nCtx, - strconv.Itoa(outputRequest.index), strconv.Itoa(int(outputRequest.currentAttempt))) - if err != nil { - outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} - continue - } - - // checkpoint paths are not computed here because this function is only called when writing - // existing cached outputs. if this functionality changes this will need to be revisited. 
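
The one-shot response channels used throughout this change are typed as anonymous structs with an embedded error, so a single send carries both the payload and its failure. A standalone sketch of the pattern (hypothetical names, simplified payload):

// doubleRequest pairs an input with a buffered one-shot response channel.
type doubleRequest struct {
	input    int
	response chan struct {
		result int
		error // embedded; read back as resp.error
	}
}

func doubleWorker(requests <-chan *doubleRequest) {
	for req := range requests {
		// The size-1 buffer lets the worker send without waiting on the caller.
		req.response <- struct {
			result int
			error
		}{req.input * 2, nil}
	}
}

The caller then receives once and branches on the embedded field — resp := <-req.response; if resp.error != nil { ... } — which is exactly how Handle consumes nodeExecutionResponse.error above.
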
- outputPaths := ioutils.NewCheckpointRemoteFilePaths(ctx, nCtx.DataStore(), subOutputDir, ioutils.NewRawOutputPaths(ctx, subDataDir), "") - reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes()) - - // read outputs - outputs, executionErr, err := reader.Read(ctx) - if err != nil { - outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} - continue - } else if executionErr != nil { - outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{ - nil, - errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), - "execution error ArrayNode output, bad state: %s", executionErr.String())} - continue - } - - outputRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{outputs.GetLiterals(), nil} - } - } - } - }() - } - - outputRequests := make([]*outputRequest, 0) // TODO @hamersaw right-size - for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { - nodePhase := v1alpha1.NodePhase(nodePhaseUint64) - outputRequest := &outputRequest{ - index: i, - nodePhase: nodePhase, - currentAttempt: uint32(arrayNodeState.SubNodeRetryAttempts.GetItem(i)), - responseChannel: make(chan struct{literalMap map[string]*idlcore.Literal; error}, 1), + gatherOutputsRequest.reader = &reader + a.gatherOutputsRequestChannel <- gatherOutputsRequest } - outputRequests = append(outputRequests, outputRequest) - outputRequestChannel <- outputRequest + gatherOutputsRequests = append(gatherOutputsRequests, gatherOutputsRequest) } - for _, outputRequest := range outputRequests { - outputResponse := <-outputRequest.responseChannel + outputLiterals := make(map[string]*idlcore.Literal) + for _, gatherOutputsRequest := range gatherOutputsRequests { + outputResponse := <-gatherOutputsRequest.responseChannel if outputResponse.error != nil { return handler.UnknownTransition, outputResponse.error + // TODO - error message collector } // append literal for all output variables @@ -673,6 +509,19 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // Setup handles any initialization requirements for this handler func (a *arrayNodeHandler) Setup(_ context.Context, _ interfaces.SetupContext) error { + // start workers + workerCount := 8 + for i := 0; i < workerCount; i++ { + worker := worker{ + gatherOutputsRequestChannel: a.gatherOutputsRequestChannel, + nodeExecutionRequestChannel: a.nodeExecutionRequestChannel, + } + + go func(){ + worker.run() + }() + } + return nil } @@ -689,13 +538,19 @@ func New(nodeExecutor interfaces.Node, eventConfig *config.EventConfig, scope pr return nil, err } + /*nodeExecutionRequestChannel := make(chan *nodeExecutionRequest, workerCount) + for i := 0; i < workerCount; i++ { + go func() {*/ + arrayScope := scope.NewSubScope("array") return &arrayNodeHandler{ - eventConfig: eventConfig, - metrics: newMetrics(arrayScope), - nodeExecutor: nodeExecutor, - pluginStateBytesNotStarted: pluginStateBytesNotStarted, - pluginStateBytesStarted: pluginStateBytesStarted, + eventConfig: eventConfig, + gatherOutputsRequestChannel: make(chan *gatherOutputsRequest), + metrics: newMetrics(arrayScope), + nodeExecutionRequestChannel: make(chan *nodeExecutionRequest), + nodeExecutor: nodeExecutor, + pluginStateBytesNotStarted: pluginStateBytesNotStarted, + pluginStateBytesStarted: pluginStateBytesStarted, }, nil } diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go 
b/flytepropeller/pkg/controller/nodes/array/handler_test.go index f9086218c2..fb4db59a2f 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -62,7 +62,13 @@ func createArrayNodeHandler(ctx context.Context, t *testing.T, nodeHandler inter assert.NoError(t, err) // return ArrayNodeHandler - return New(nodeExecutor, eventConfig, scope) + arrayNodeHandler, err := New(nodeExecutor, eventConfig, scope) + if err != nil { + return nil, err + } + + err = arrayNodeHandler.Setup(ctx, nil) + return arrayNodeHandler, err } func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder interfaces.EventRecorder, outputVariables []string, diff --git a/flytepropeller/pkg/controller/nodes/array/worker.go b/flytepropeller/pkg/controller/nodes/array/worker.go new file mode 100644 index 0000000000..43c09eb0b4 --- /dev/null +++ b/flytepropeller/pkg/controller/nodes/array/worker.go @@ -0,0 +1,65 @@ +package array + +import ( + "context" + "fmt" + + idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils" + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" +) + +// TODO @hamersaw - docs +type nodeExecutionRequest struct { + ctx context.Context + index int + nodePhase v1alpha1.NodePhase + taskPhase int + nodeExecutor interfaces.Node + executionContext executors.ExecutionContext + dagStructure executors.DAGStructure + nodeLookup executors.NodeLookup + subNodeSpec *v1alpha1.NodeSpec + subNodeStatus *v1alpha1.NodeStatus + arrayEventRecorder arrayEventRecorder + responseChannel chan struct {interfaces.NodeStatus; error} +} + +// TODO @hamersaw - docs +type gatherOutputsRequest struct { + ctx context.Context + reader *ioutils.RemoteFileOutputReader + responseChannel chan struct {literalMap map[string]*idlcore.Literal; error} +} + +// TODO @hamersaw - docs +type worker struct { + gatherOutputsRequestChannel chan *gatherOutputsRequest + nodeExecutionRequestChannel chan *nodeExecutionRequest +} + +// TODO @hamersaw - docs +func (w *worker) run() { + for { + select { + case nodeExecutionRequest := <-w.nodeExecutionRequestChannel: + nodeStatus, err := nodeExecutionRequest.nodeExecutor.RecursiveNodeHandler(nodeExecutionRequest.ctx, nodeExecutionRequest.executionContext, + nodeExecutionRequest.dagStructure, nodeExecutionRequest.nodeLookup, nodeExecutionRequest.subNodeSpec) + nodeExecutionRequest.responseChannel <- struct {interfaces.NodeStatus; error}{nodeStatus, err} + case gatherOutputsRequest := <-w.gatherOutputsRequestChannel: + // read outputs + outputs, executionErr, err := gatherOutputsRequest.reader.Read(gatherOutputsRequest.ctx) + if err != nil { + gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} + continue + } else if executionErr != nil { + gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, fmt.Errorf("%s", executionErr.String())} + continue + } + + gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{outputs.GetLiterals(), nil} + } + } +} From 637433371a96aca6f07d0d7c03219ca36c6aedef Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 8 Dec 2023 17:13:33 -0600 Subject: [PATCH 05/11] handling worker errors 
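
Instead of returning on the first failed response, the handler now drains every outstanding response channel and aggregates the failures, so all in-flight results are consumed and every error is surfaced rather than only the first. The shape of the change (collector simplified to a slice; the real code uses errorcollector.NewErrorMessageCollector):

// Drain all responses, then report the aggregate (sketch, not the exact code).
var workerErrs []string
for i, req := range requests {
	resp := <-req.responseChannel // receive even after earlier failures
	if resp.error != nil {
		workerErrs = append(workerErrs, fmt.Sprintf("[%d]: %v", i, resp.error))
		continue
	}
	// ... process the successful response ...
}
if len(workerErrs) > 0 {
	return handler.UnknownTransition, fmt.Errorf("worker error(s) encountered: %s", strings.Join(workerErrs, "; "))
}
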
Signed-off-by: Daniel Rammer --- .../pkg/controller/nodes/array/handler.go | 38 +++++++++++++------ .../pkg/controller/nodes/array/worker.go | 1 + 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 3a8acbc1c8..194bc88a81 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -245,9 +245,12 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu case v1alpha1.ArrayNodePhaseExecuting: // process array node subNodes currentParallelism := uint32(0) - messageCollector := errorcollector.NewErrorMessageCollector() + nodeExecutionRequestsSize := int(arrayNode.GetParallelism()) + if nodeExecutionRequestsSize == 0 { + nodeExecutionRequestsSize = len(arrayNodeState.SubNodePhases.GetItems()) + } - nodeExecutionRequests := make([]*nodeExecutionRequest, 0) // TODO @hamersaw right-size + nodeExecutionRequests := make([]*nodeExecutionRequest, 0, nodeExecutionRequestsSize) for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) @@ -286,11 +289,13 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // TODO @hamersaw - need to handle parallelism here somehow?!? } - for _, nodeExecutionRequest := range nodeExecutionRequests { + workerErrorCollector := errorcollector.NewErrorMessageCollector() + subNodeFailureCollector := errorcollector.NewErrorMessageCollector() + for i, nodeExecutionRequest := range nodeExecutionRequests { nodeExecutionResponse := <-nodeExecutionRequest.responseChannel if nodeExecutionResponse.error != nil { - return handler.UnknownTransition, nodeExecutionResponse.error - // TODO @hamersaw - error message collector + workerErrorCollector.Collect(i, nodeExecutionResponse.error.Error()) + continue } index := nodeExecutionRequest.index @@ -298,7 +303,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // capture subNode error if exists if nodeExecutionRequest.subNodeStatus.Error != nil { - messageCollector.Collect(index, subNodeStatus.Error.Message) + subNodeFailureCollector.Collect(index, subNodeStatus.Error.Message) } // process events @@ -329,6 +334,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } } + // if any workers failed then return the error + if workerErrorCollector.Length() > 0 { + return handler.UnknownTransition, fmt.Errorf("worker error(s) encountered: %s", workerErrorCollector.Summary(events.MaxErrorMessageLength)) + } + // process phases of subNodes to determine overall `ArrayNode` phase successCount := 0 failedCount := 0 @@ -359,7 +369,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // if there is a failing node set the error message if it has not been previous set if failingCount > 0 && arrayNodeState.Error == nil { arrayNodeState.Error = &idlcore.ExecutionError{ - Message: messageCollector.Summary(events.MaxErrorMessageLength), + Message: subNodeFailureCollector.Summary(events.MaxErrorMessageLength), } } @@ -387,7 +397,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu nil, )), nil case v1alpha1.ArrayNodePhaseSucceeding: - gatherOutputsRequests := make([]*gatherOutputsRequest, 0) // TODO hamersaw - right size + gatherOutputsRequests := 
make([]*gatherOutputsRequest, 0, len(arrayNodeState.SubNodePhases.GetItems())) for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) gatherOutputsRequest := &gatherOutputsRequest{ @@ -435,11 +445,12 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } outputLiterals := make(map[string]*idlcore.Literal) - for _, gatherOutputsRequest := range gatherOutputsRequests { + workerErrorCollector := errorcollector.NewErrorMessageCollector() + for i, gatherOutputsRequest := range gatherOutputsRequests { outputResponse := <-gatherOutputsRequest.responseChannel if outputResponse.error != nil { - return handler.UnknownTransition, outputResponse.error - // TODO - error message collector + workerErrorCollector.Collect(i, outputResponse.error.Error()) + continue } // append literal for all output variables @@ -448,6 +459,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } } + // if any workers failed then return the error + if workerErrorCollector.Length() > 0 { + return handler.UnknownTransition, fmt.Errorf("worker error(s) encountered: %s", workerErrorCollector.Summary(events.MaxErrorMessageLength)) + } + outputLiteralMap := &idlcore.LiteralMap{ Literals: outputLiterals, } diff --git a/flytepropeller/pkg/controller/nodes/array/worker.go b/flytepropeller/pkg/controller/nodes/array/worker.go index 43c09eb0b4..96ae1c1a37 100644 --- a/flytepropeller/pkg/controller/nodes/array/worker.go +++ b/flytepropeller/pkg/controller/nodes/array/worker.go @@ -45,6 +45,7 @@ func (w *worker) run() { for { select { case nodeExecutionRequest := <-w.nodeExecutionRequestChannel: + // execute RecurseNodeHandler on node nodeStatus, err := nodeExecutionRequest.nodeExecutor.RecursiveNodeHandler(nodeExecutionRequest.ctx, nodeExecutionRequest.executionContext, nodeExecutionRequest.dagStructure, nodeExecutionRequest.nodeLookup, nodeExecutionRequest.subNodeSpec) nodeExecutionRequest.responseChannel <- struct {interfaces.NodeStatus; error}{nodeStatus, err} From 5a8c6581a1ee5e7060885ca9453f9612fb906c9b Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 8 Dec 2023 17:29:37 -0600 Subject: [PATCH 06/11] implemented parallelism controls with workers Signed-off-by: Daniel Rammer --- .../nodes/array/execution_context.go | 21 ++---- .../pkg/controller/nodes/array/handler.go | 74 ++++++++++++------- .../controller/nodes/array/handler_test.go | 3 +- .../nodes/array/node_execution_context.go | 6 +- .../array/node_execution_context_builder.go | 34 ++++----- .../pkg/controller/nodes/array/worker.go | 46 ++++++++---- 6 files changed, 104 insertions(+), 80 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/execution_context.go b/flytepropeller/pkg/controller/nodes/array/execution_context.go index 2191b9c7d2..4fb5a8a214 100644 --- a/flytepropeller/pkg/controller/nodes/array/execution_context.go +++ b/flytepropeller/pkg/controller/nodes/array/execution_context.go @@ -14,35 +14,24 @@ const ( type arrayExecutionContext struct { executors.ExecutionContext - executionConfig v1alpha1.ExecutionConfig - currentParallelism *uint32 + executionConfig v1alpha1.ExecutionConfig } func (a *arrayExecutionContext) GetExecutionConfig() v1alpha1.ExecutionConfig { return a.executionConfig } -func (a *arrayExecutionContext) CurrentParallelism() uint32 { - return *a.currentParallelism -} - -func (a *arrayExecutionContext) IncrementParallelism() uint32 { - *a.currentParallelism = *a.currentParallelism + 1 - return 
*a.currentParallelism -} - -func newArrayExecutionContext(executionContext executors.ExecutionContext, subNodeIndex int, currentParallelism *uint32, maxParallelism uint32) *arrayExecutionContext { +func newArrayExecutionContext(executionContext executors.ExecutionContext, subNodeIndex int) *arrayExecutionContext { executionConfig := executionContext.GetExecutionConfig() if executionConfig.EnvironmentVariables == nil { executionConfig.EnvironmentVariables = make(map[string]string) } executionConfig.EnvironmentVariables[JobIndexVarName] = FlyteK8sArrayIndexVarName executionConfig.EnvironmentVariables[FlyteK8sArrayIndexVarName] = strconv.Itoa(subNodeIndex) - executionConfig.MaxParallelism = maxParallelism + executionConfig.MaxParallelism = 0 // hardcoded to 0 because parallelism is handled by the array node return &arrayExecutionContext{ - ExecutionContext: executionContext, - executionConfig: executionConfig, - currentParallelism: currentParallelism, + ExecutionContext: executionContext, + executionConfig: executionConfig, } } diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 194bc88a81..587b2aeb48 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -72,7 +72,6 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut messageCollector := errorcollector.NewErrorMessageCollector() switch arrayNodeState.Phase { case v1alpha1.ArrayNodePhaseExecuting, v1alpha1.ArrayNodePhaseFailing: - currentParallelism := uint32(0) for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) @@ -83,7 +82,7 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut // create array contexts arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, _, err := - a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, ¤tParallelism, eventRecorder) + a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, eventRecorder) if err != nil { return err } @@ -126,7 +125,6 @@ func (a *arrayNodeHandler) Finalize(ctx context.Context, nCtx interfaces.NodeExe messageCollector := errorcollector.NewErrorMessageCollector() switch arrayNodeState.Phase { case v1alpha1.ArrayNodePhaseExecuting, v1alpha1.ArrayNodePhaseFailing, v1alpha1.ArrayNodePhaseSucceeding: - currentParallelism := uint32(0) for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) @@ -137,7 +135,7 @@ func (a *arrayNodeHandler) Finalize(ctx context.Context, nCtx interfaces.NodeExe // create array contexts arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, _, err := - a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, ¤tParallelism, eventRecorder) + a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, eventRecorder) if err != nil { return err } @@ -244,13 +242,12 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting case v1alpha1.ArrayNodePhaseExecuting: // process array node subNodes - currentParallelism := uint32(0) - nodeExecutionRequestsSize := int(arrayNode.GetParallelism()) - if nodeExecutionRequestsSize == 0 { - nodeExecutionRequestsSize = len(arrayNodeState.SubNodePhases.GetItems()) + currentParallelism := 
int(arrayNode.GetParallelism()) + if currentParallelism == 0 { + currentParallelism = len(arrayNodeState.SubNodePhases.GetItems()) } - nodeExecutionRequests := make([]*nodeExecutionRequest, 0, nodeExecutionRequestsSize) + nodeExecutionRequests := make([]*nodeExecutionRequest, 0, currentParallelism) for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) @@ -263,7 +260,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // create array contexts subNodeEventRecorder := newArrayEventRecorder(nCtx.EventsRecorder()) arrayNodeExecutor, arrayExecutionContext, arrayDAGStructure, arrayNodeLookup, subNodeSpec, subNodeStatus, err := - a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, ¤tParallelism, subNodeEventRecorder) + a.buildArrayNodeContext(ctx, nCtx, &arrayNodeState, arrayNode, i, subNodeEventRecorder) if err != nil { return handler.UnknownTransition, err } @@ -273,20 +270,29 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu index: i, nodePhase: nodePhase, taskPhase: taskPhase, - nodeExecutor: arrayNodeExecutor, + nodeExecutor: arrayNodeExecutor, executionContext: arrayExecutionContext, dagStructure: arrayDAGStructure, nodeLookup: arrayNodeLookup, subNodeSpec: subNodeSpec, subNodeStatus: subNodeStatus, arrayEventRecorder: subNodeEventRecorder, - responseChannel: make(chan struct {interfaces.NodeStatus; error}, 1), + responseChannel: make(chan struct { + interfaces.NodeStatus + error + }, 1), } nodeExecutionRequests = append(nodeExecutionRequests, nodeExecutionRequest) a.nodeExecutionRequestChannel <- nodeExecutionRequest - // TODO @hamersaw - need to handle parallelism here somehow?!? + // TODO - this is a naive implementation of parallelism, if we want to support more + // complex subNodes (ie. dynamics / subworkflows) we need to revisit this so that + // parallelism is handled during subNode evaluations. 
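
One shape the revisit mentioned in the TODO above could take is bounding concurrency at evaluation time with a semaphore held for the duration of each subNode evaluation, whereas the code that follows simply caps how many requests are dispatched per round. Sketch only, with hypothetical names — not what this patch implements:

// Hypothetical evaluation-time bound: a slot is held while a subNode evaluates.
sem := make(chan struct{}, maxParallelism)
evaluate := func(req *nodeExecutionRequest) {
	sem <- struct{}{}        // acquire a parallelism slot
	defer func() { <-sem }() // release when the evaluation returns
	// ... RecursiveNodeHandler call, as in worker.run ...
}
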
+ currentParallelism-- + if currentParallelism == 0 { + break + } } workerErrorCollector := errorcollector.NewErrorMessageCollector() @@ -306,13 +312,17 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu subNodeFailureCollector.Collect(index, subNodeStatus.Error.Message) } - // process events + // process events by copying from internal event recorder if arrayEventRecorder, ok := nodeExecutionRequest.arrayEventRecorder.(*externalResourcesEventRecorder); ok { for _, event := range arrayEventRecorder.taskEvents { - eventRecorder.RecordTaskEvent(ctx, event, a.eventConfig) + if err := eventRecorder.RecordTaskEvent(ctx, event, a.eventConfig); err != nil { + return handler.UnknownTransition, err + } } for _, event := range arrayEventRecorder.nodeEvents { - eventRecorder.RecordNodeEvent(ctx, event, a.eventConfig) + if err := eventRecorder.RecordNodeEvent(ctx, event, a.eventConfig); err != nil { + return handler.UnknownTransition, err + } } } eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts()) @@ -401,8 +411,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) gatherOutputsRequest := &gatherOutputsRequest{ - ctx: ctx, - responseChannel: make(chan struct{literalMap map[string]*idlcore.Literal; error}, 1), + ctx: ctx, + responseChannel: make(chan struct { + literalMap map[string]*idlcore.Literal + error + }, 1), } if nodePhase != v1alpha1.NodePhaseSucceeded { @@ -411,7 +424,10 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu task, err := nCtx.ExecutionContext().GetTask(*arrayNode.GetSubNodeSpec().TaskRef) if err != nil { // Should never happen - gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} + gatherOutputsRequest.responseChannel <- struct { + literalMap map[string]*idlcore.Literal + error + }{nil, err} continue } @@ -421,14 +437,20 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } } - gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{outputLiterals, nil} + gatherOutputsRequest.responseChannel <- struct { + literalMap map[string]*idlcore.Literal + error + }{outputLiterals, nil} } else { // initialize subNode reader currentAttempt := int(arrayNodeState.SubNodeRetryAttempts.GetItem(i)) subDataDir, subOutputDir, err := constructOutputReferences(ctx, nCtx, strconv.Itoa(i), strconv.Itoa(currentAttempt)) if err != nil { - gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} + gatherOutputsRequest.responseChannel <- struct { + literalMap map[string]*idlcore.Literal + error + }{nil, err} continue } @@ -533,7 +555,7 @@ func (a *arrayNodeHandler) Setup(_ context.Context, _ interfaces.SetupContext) e nodeExecutionRequestChannel: a.nodeExecutionRequestChannel, } - go func(){ + go func() { worker.run() }() } @@ -575,7 +597,7 @@ func New(nodeExecutor interfaces.Node, eventConfig *config.EventConfig, scope pr // but need many different execution details, for example setting input values as a singular item rather than a collection, // injecting environment variables for flytekit maptask execution, aggregating eventing so that rather than tracking state for // each subnode individually it sends a single event for the whole ArrayNode, and many more. 
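
The behavior described in the doc comment above is implemented as a decorator around the executor's NodeExecutionContextBuilder: the wrapper delegates to the base builder and swaps in an array-aware context only when the requested node is the subNode being evaluated. Conceptually, with the interfaces heavily simplified and names hypothetical:

type nodeContext struct{ /* fields elided */ }

type contextBuilder interface {
	Build(nodeID string) (*nodeContext, error)
}

// arrayCtxBuilder decorates a base builder, overriding the context for one subNode.
type arrayCtxBuilder struct {
	base      contextBuilder
	subNodeID string
	wrap      func(*nodeContext) *nodeContext // array-specific overrides
}

func (b *arrayCtxBuilder) Build(nodeID string) (*nodeContext, error) {
	nCtx, err := b.base.Build(nodeID)
	if err != nil || nodeID != b.subNodeID {
		return nCtx, err
	}
	// e.g. single-item input reader, aggregated event recorder, subNode status
	return b.wrap(nCtx), nil
}

The real counterpart is arrayNodeExecutionContextBuilder.BuildNodeExecutionContext in node_execution_context_builder.go, shown later in this patch.
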
-func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx interfaces.NodeExecutionContext, arrayNodeState *handler.ArrayNodeState, arrayNode v1alpha1.ExecutableArrayNode, subNodeIndex int, currentParallelism *uint32, eventRecorder arrayEventRecorder) ( +func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx interfaces.NodeExecutionContext, arrayNodeState *handler.ArrayNodeState, arrayNode v1alpha1.ExecutableArrayNode, subNodeIndex int, eventRecorder arrayEventRecorder) ( interfaces.Node, executors.ExecutionContext, executors.DAGStructure, executors.NodeLookup, *v1alpha1.NodeSpec, *v1alpha1.NodeStatus, error) { nodePhase := v1alpha1.NodePhase(arrayNodeState.SubNodePhases.GetItem(subNodeIndex)) @@ -640,12 +662,10 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter if err != nil { return nil, nil, nil, nil, nil, nil, err } - arrayExecutionContext := newArrayExecutionContext( - executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo), - subNodeIndex, currentParallelism, arrayNode.GetParallelism()) + arrayExecutionContext := newArrayExecutionContext(executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo), subNodeIndex) arrayNodeExecutionContextBuilder := newArrayNodeExecutionContextBuilder(a.nodeExecutor.GetNodeExecutionContextBuilder(), - subNodeID, subNodeIndex, subNodeStatus, inputReader, currentParallelism, arrayNode.GetParallelism(), eventRecorder) + subNodeID, subNodeIndex, subNodeStatus, inputReader, eventRecorder) arrayNodeExecutor := a.nodeExecutor.WithNodeExecutionContextBuilder(arrayNodeExecutionContextBuilder) return arrayNodeExecutor, arrayExecutionContext, &arrayNodeLookup, &arrayNodeLookup, &subNodeSpec, subNodeStatus, nil diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index fb4db59a2f..b2e85c0979 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -502,11 +502,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { }, subNodeTransitions: []handler.Transition{ handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), - handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, expectedTransitionPhase: handler.EPhaseRunning, - expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_QUEUED}, + expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING}, }, { name: "AllSubNodesSuccedeed", diff --git a/flytepropeller/pkg/controller/nodes/array/node_execution_context.go b/flytepropeller/pkg/controller/nodes/array/node_execution_context.go index 17d46d2944..6ef7bb01c1 100644 --- a/flytepropeller/pkg/controller/nodes/array/node_execution_context.go +++ b/flytepropeller/pkg/controller/nodes/array/node_execution_context.go @@ -104,8 +104,10 @@ func (a *arrayNodeExecutionContext) TaskReader() interfaces.TaskReader { return a.taskReader } -func newArrayNodeExecutionContext(nodeExecutionContext interfaces.NodeExecutionContext, inputReader io.InputReader, eventRecorder arrayEventRecorder, subNodeIndex int, nodeStatus *v1alpha1.NodeStatus, currentParallelism *uint32, maxParallelism uint32) *arrayNodeExecutionContext { - arrayExecutionContext := 
newArrayExecutionContext(nodeExecutionContext.ExecutionContext(), subNodeIndex, currentParallelism, maxParallelism) +func newArrayNodeExecutionContext(nodeExecutionContext interfaces.NodeExecutionContext, inputReader io.InputReader, + eventRecorder arrayEventRecorder, subNodeIndex int, nodeStatus *v1alpha1.NodeStatus) *arrayNodeExecutionContext { + + arrayExecutionContext := newArrayExecutionContext(nodeExecutionContext.ExecutionContext(), subNodeIndex) return &arrayNodeExecutionContext{ NodeExecutionContext: nodeExecutionContext, eventRecorder: eventRecorder, diff --git a/flytepropeller/pkg/controller/nodes/array/node_execution_context_builder.go b/flytepropeller/pkg/controller/nodes/array/node_execution_context_builder.go index 6d0cfd3bfb..b66ae4a54d 100644 --- a/flytepropeller/pkg/controller/nodes/array/node_execution_context_builder.go +++ b/flytepropeller/pkg/controller/nodes/array/node_execution_context_builder.go @@ -10,14 +10,12 @@ import ( ) type arrayNodeExecutionContextBuilder struct { - nCtxBuilder interfaces.NodeExecutionContextBuilder - subNodeID v1alpha1.NodeID - subNodeIndex int - subNodeStatus *v1alpha1.NodeStatus - inputReader io.InputReader - currentParallelism *uint32 - maxParallelism uint32 - eventRecorder arrayEventRecorder + nCtxBuilder interfaces.NodeExecutionContextBuilder + subNodeID v1alpha1.NodeID + subNodeIndex int + subNodeStatus *v1alpha1.NodeStatus + inputReader io.InputReader + eventRecorder arrayEventRecorder } func (a *arrayNodeExecutionContextBuilder) BuildNodeExecutionContext(ctx context.Context, executionContext executors.ExecutionContext, @@ -31,23 +29,21 @@ func (a *arrayNodeExecutionContextBuilder) BuildNodeExecutionContext(ctx context if currentNodeID == a.subNodeID { // overwrite NodeExecutionContext for ArrayNode execution - nCtx = newArrayNodeExecutionContext(nCtx, a.inputReader, a.eventRecorder, a.subNodeIndex, a.subNodeStatus, a.currentParallelism, a.maxParallelism) + nCtx = newArrayNodeExecutionContext(nCtx, a.inputReader, a.eventRecorder, a.subNodeIndex, a.subNodeStatus) } return nCtx, nil } -func newArrayNodeExecutionContextBuilder(nCtxBuilder interfaces.NodeExecutionContextBuilder, subNodeID v1alpha1.NodeID, subNodeIndex int, subNodeStatus *v1alpha1.NodeStatus, - inputReader io.InputReader, currentParallelism *uint32, maxParallelism uint32, eventRecorder arrayEventRecorder) interfaces.NodeExecutionContextBuilder { +func newArrayNodeExecutionContextBuilder(nCtxBuilder interfaces.NodeExecutionContextBuilder, subNodeID v1alpha1.NodeID, subNodeIndex int, + subNodeStatus *v1alpha1.NodeStatus, inputReader io.InputReader, eventRecorder arrayEventRecorder) interfaces.NodeExecutionContextBuilder { return &arrayNodeExecutionContextBuilder{ - nCtxBuilder: nCtxBuilder, - subNodeID: subNodeID, - subNodeIndex: subNodeIndex, - subNodeStatus: subNodeStatus, - inputReader: inputReader, - currentParallelism: currentParallelism, - maxParallelism: maxParallelism, - eventRecorder: eventRecorder, + nCtxBuilder: nCtxBuilder, + subNodeID: subNodeID, + subNodeIndex: subNodeIndex, + subNodeStatus: subNodeStatus, + inputReader: inputReader, + eventRecorder: eventRecorder, } } diff --git a/flytepropeller/pkg/controller/nodes/array/worker.go b/flytepropeller/pkg/controller/nodes/array/worker.go index 96ae1c1a37..8a5342a8a0 100644 --- a/flytepropeller/pkg/controller/nodes/array/worker.go +++ b/flytepropeller/pkg/controller/nodes/array/worker.go @@ -15,23 +15,29 @@ import ( type nodeExecutionRequest struct { ctx context.Context index int - nodePhase v1alpha1.NodePhase - 
taskPhase int - nodeExecutor interfaces.Node - executionContext executors.ExecutionContext - dagStructure executors.DAGStructure - nodeLookup executors.NodeLookup - subNodeSpec *v1alpha1.NodeSpec - subNodeStatus *v1alpha1.NodeStatus + nodePhase v1alpha1.NodePhase + taskPhase int + nodeExecutor interfaces.Node + executionContext executors.ExecutionContext + dagStructure executors.DAGStructure + nodeLookup executors.NodeLookup + subNodeSpec *v1alpha1.NodeSpec + subNodeStatus *v1alpha1.NodeStatus arrayEventRecorder arrayEventRecorder - responseChannel chan struct {interfaces.NodeStatus; error} + responseChannel chan struct { + interfaces.NodeStatus + error + } } // TODO @hamersaw - docs type gatherOutputsRequest struct { ctx context.Context reader *ioutils.RemoteFileOutputReader - responseChannel chan struct {literalMap map[string]*idlcore.Literal; error} + responseChannel chan struct { + literalMap map[string]*idlcore.Literal + error + } } // TODO @hamersaw - docs @@ -48,19 +54,31 @@ func (w *worker) run() { // execute RecurseNodeHandler on node nodeStatus, err := nodeExecutionRequest.nodeExecutor.RecursiveNodeHandler(nodeExecutionRequest.ctx, nodeExecutionRequest.executionContext, nodeExecutionRequest.dagStructure, nodeExecutionRequest.nodeLookup, nodeExecutionRequest.subNodeSpec) - nodeExecutionRequest.responseChannel <- struct {interfaces.NodeStatus; error}{nodeStatus, err} + nodeExecutionRequest.responseChannel <- struct { + interfaces.NodeStatus + error + }{nodeStatus, err} case gatherOutputsRequest := <-w.gatherOutputsRequestChannel: // read outputs outputs, executionErr, err := gatherOutputsRequest.reader.Read(gatherOutputsRequest.ctx) if err != nil { - gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, err} + gatherOutputsRequest.responseChannel <- struct { + literalMap map[string]*idlcore.Literal + error + }{nil, err} continue } else if executionErr != nil { - gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{nil, fmt.Errorf("%s", executionErr.String())} + gatherOutputsRequest.responseChannel <- struct { + literalMap map[string]*idlcore.Literal + error + }{nil, fmt.Errorf("%s", executionErr.String())} continue } - gatherOutputsRequest.responseChannel <- struct{literalMap map[string]*idlcore.Literal; error}{outputs.GetLiterals(), nil} + gatherOutputsRequest.responseChannel <- struct { + literalMap map[string]*idlcore.Literal + error + }{outputs.GetLiterals(), nil} } } } From 4865d43f70c38cbb05048f844654cd59964a9ca4 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 8 Dec 2023 17:37:50 -0600 Subject: [PATCH 07/11] added configuration for node execution worker count Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/config/config.go | 8 +++++--- .../pkg/controller/config/config_flags.go | 1 + .../pkg/controller/config/config_flags_test.go | 14 ++++++++++++++ .../pkg/controller/nodes/array/handler.go | 3 +-- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 10a83cd2fc..50c5d12422 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -112,9 +112,10 @@ var ( EventConfig: EventConfig{ RawOutputPolicy: RawOutputPolicyReference, }, - ClusterID: "propeller", - CreateFlyteWorkflowCRD: false, - ArrayNodeEventVersion: 0, + ClusterID: "propeller", + CreateFlyteWorkflowCRD: false, + ArrayNodeEventVersion: 0, + 
NodeExecutionWorkerCount: 8, } ) @@ -155,6 +156,7 @@ type Config struct { ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"` CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"` ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"` + NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"` } // KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client. diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index 8e9c71bcdb..07a4fba742 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -108,5 +108,6 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "cluster-id"), defaultConfig.ClusterID, "Unique cluster id running this flytepropeller instance with which to annotate execution events") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "create-flyteworkflow-crd"), defaultConfig.CreateFlyteWorkflowCRD, "Enable creation of the FlyteWorkflow CRD on startup") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "array-node-event-version"), defaultConfig.ArrayNodeEventVersion, "ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "node-execution-worker-count"), defaultConfig.NodeExecutionWorkerCount, "Number of workers to evaluate node executions, currently only used for array nodes") return cmdFlags } diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index f48d01ebea..54da9e9fe1 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -911,4 +911,18 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_node-execution-worker-count", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("node-execution-worker-count", testValue) + if vInt, err := cmdFlags.GetInt("node-execution-worker-count"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.NodeExecutionWorkerCount) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 587b2aeb48..a77578b676 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -548,8 +548,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // Setup handles any initialization requirements for this handler func (a *arrayNodeHandler) Setup(_ context.Context, _ interfaces.SetupContext) error { // start workers - workerCount := 8 - for i := 0; i < workerCount; i++ { + for i := 0; i < config.GetConfig().NodeExecutionWorkerCount; i++ { worker := worker{ gatherOutputsRequestChannel: a.gatherOutputsRequestChannel, nodeExecutionRequestChannel: a.nodeExecutionRequestChannel, From d0c19a5b6a1b8920a73637a75e4990c7ee3c278a Mon Sep 17 00:00:00 2001 From: Daniel 
Rammer Date: Fri, 8 Dec 2023 19:12:36 -0600 Subject: [PATCH 08/11] docs

Signed-off-by: Daniel Rammer
---
 flytepropeller/pkg/controller/nodes/array/worker.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flytepropeller/pkg/controller/nodes/array/worker.go b/flytepropeller/pkg/controller/nodes/array/worker.go
index 8a5342a8a0..0856504771 100644
--- a/flytepropeller/pkg/controller/nodes/array/worker.go
+++ b/flytepropeller/pkg/controller/nodes/array/worker.go
@@ -11,7 +11,7 @@ import (
 	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
 )
 
-// TODO @hamersaw - docs
+// nodeExecutionRequest is a request to execute an ArrayNode subNode
 type nodeExecutionRequest struct {
 	ctx   context.Context
 	index int
@@ -30,7 +30,7 @@ type nodeExecutionRequest struct {
 	}
 }
 
-// TODO @hamersaw - docs
+// gatherOutputsRequest is a request to read outputs from an ArrayNode subNode
 type gatherOutputsRequest struct {
 	ctx             context.Context
 	reader          *ioutils.RemoteFileOutputReader
@@ -40,13 +40,13 @@ type gatherOutputsRequest struct {
 	}
 }
 
-// TODO @hamersaw - docs
+// worker parallelizes I/O-bound operations during ArrayNode execution
 type worker struct {
 	gatherOutputsRequestChannel chan *gatherOutputsRequest
 	nodeExecutionRequestChannel chan *nodeExecutionRequest
 }
 
-// TODO @hamersaw - docs
+// run starts the worker's main processing loop
 func (w *worker) run() {
 	for {
 		select {

From f059a6367d3c00fc233578ae93500dc5e317539d Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 8 Dec 2023 19:16:05 -0600 Subject: [PATCH 09/11] spaces instead of tabs ...

Signed-off-by: Daniel Rammer
---
 flytepropeller/pkg/controller/config/config.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go
index 50c5d12422..1afc986287 100644
--- a/flytepropeller/pkg/controller/config/config.go
+++ b/flytepropeller/pkg/controller/config/config.go
@@ -156,7 +156,7 @@ type Config struct {
 	ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"`
 	CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
 	ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
-	NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
+	NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
 }
 
 // KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
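Reviewer note: the knob added in PATCH 07 (and gofmt'ed in PATCH 09) is consumed through config.GetConfig() in the handler's Setup. A minimal sketch of reading it in isolation; the import path, accessor, field name, and default of 8 all come from the diffs above, while the main wrapper is purely illustrative:

    package main

    import (
    	"fmt"

    	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
    )

    func main() {
    	// GetConfig returns the active controller configuration; with no override
    	// in place, NodeExecutionWorkerCount falls back to the default of 8.
    	cfg := config.GetConfig()
    	fmt.Printf("ArrayNode worker count: %d\n", cfg.NodeExecutionWorkerCount)
    }

Overrides flow through the generated node-execution-worker-count flag or the matching config file key; both land on the same struct field that Setup uses to size the worker pool.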
From eb40fe8f0d5482ba11fd7bb65292ec7be4b33c4d Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 8 Dec 2023 19:18:39 -0600 Subject: [PATCH 10/11] removed dead code

Signed-off-by: Daniel Rammer
---
 flytepropeller/pkg/controller/nodes/array/handler.go | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go
index a77578b676..f1e2ef64fc 100644
--- a/flytepropeller/pkg/controller/nodes/array/handler.go
+++ b/flytepropeller/pkg/controller/nodes/array/handler.go
@@ -575,10 +575,6 @@ func New(nodeExecutor interfaces.Node, eventConfig *config.EventConfig, scope pr
 		return nil, err
 	}
 
-	/*nodeExecutionRequestChannel := make(chan *nodeExecutionRequest, workerCount)
-	for i := 0; i < workerCount; i++ {
-		go func() {*/
-
 	arrayScope := scope.NewSubScope("array")
 	return &arrayNodeHandler{
 		eventConfig: eventConfig,

From 61ed21427c935a90f7c98bb6e5fa6a8f2394b739 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 14 Dec 2023 09:03:25 -0600 Subject: [PATCH 11/11] added panic handling on worker execution

Signed-off-by: Daniel Rammer
---
 .../pkg/controller/nodes/array/worker.go      | 59 +++++++++++++------
 1 file changed, 40 insertions(+), 19 deletions(-)

diff --git a/flytepropeller/pkg/controller/nodes/array/worker.go b/flytepropeller/pkg/controller/nodes/array/worker.go
index 0856504771..b5b5db49da 100644
--- a/flytepropeller/pkg/controller/nodes/array/worker.go
+++ b/flytepropeller/pkg/controller/nodes/array/worker.go
@@ -3,12 +3,14 @@ package array
 import (
 	"context"
 	"fmt"
+	"runtime/debug"
 
 	idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
+	"github.com/flyteorg/flyte/flytestdlib/logger"
 )
 
 // nodeExecutionRequest is a request to execute an ArrayNode subNode
@@ -51,34 +53,53 @@ func (w *worker) run() {
 	for {
 		select {
 		case nodeExecutionRequest := <-w.nodeExecutionRequestChannel:
-			// execute RecurseNodeHandler on node
-			nodeStatus, err := nodeExecutionRequest.nodeExecutor.RecursiveNodeHandler(nodeExecutionRequest.ctx, nodeExecutionRequest.executionContext,
-				nodeExecutionRequest.dagStructure, nodeExecutionRequest.nodeLookup, nodeExecutionRequest.subNodeSpec)
+			var nodeStatus interfaces.NodeStatus
+			var err error
+			func() {
+				defer func() {
+					if r := recover(); r != nil {
+						stack := debug.Stack()
+						err = fmt.Errorf("panic when executing ArrayNode subNode, Stack: [%s]", string(stack))
+						logger.Errorf(nodeExecutionRequest.ctx, err.Error())
+					}
+				}()
+
+				// execute RecursiveNodeHandler on node
+				nodeStatus, err = nodeExecutionRequest.nodeExecutor.RecursiveNodeHandler(nodeExecutionRequest.ctx, nodeExecutionRequest.executionContext,
+					nodeExecutionRequest.dagStructure, nodeExecutionRequest.nodeLookup, nodeExecutionRequest.subNodeSpec)
+			}()
+
 			nodeExecutionRequest.responseChannel <- struct {
 				interfaces.NodeStatus
 				error
 			}{nodeStatus, err}
 		case gatherOutputsRequest := <-w.gatherOutputsRequestChannel:
-			// read outputs
-			outputs, executionErr, err := gatherOutputsRequest.reader.Read(gatherOutputsRequest.ctx)
-			if err != nil {
-				gatherOutputsRequest.responseChannel <- struct {
-					literalMap map[string]*idlcore.Literal
-					error
-				}{nil, err}
-				continue
-			} else if executionErr != nil {
-				gatherOutputsRequest.responseChannel <- struct {
-					literalMap map[string]*idlcore.Literal
-					error
-				}{nil, fmt.Errorf("%s", executionErr.String())}
-				continue
-			}
+			var literalMap map[string]*idlcore.Literal
+			var err error
+			func() {
+				defer func() {
+					if r := recover(); r != nil {
+						stack := debug.Stack()
+						err = fmt.Errorf("panic when gathering ArrayNode subNode outputs, Stack: [%s]", string(stack))
+						logger.Errorf(gatherOutputsRequest.ctx, err.Error())
+					}
+				}()
+
+				// read outputs
+				outputs, executionErr, gatherErr := gatherOutputsRequest.reader.Read(gatherOutputsRequest.ctx)
+				if gatherErr != nil {
+					err = gatherErr
+				} else if executionErr != nil {
+					err = fmt.Errorf("%s", executionErr.String())
+				} else {
+					literalMap = outputs.GetLiterals()
+				}
+			}()
 			gatherOutputsRequest.responseChannel <- struct {
 				literalMap map[string]*idlcore.Literal
 				error
-			}{outputs.GetLiterals(), nil}
+			}{literalMap, err}
 		}
 	}
 }
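Reviewer note: the panic isolation in PATCH 11 relies on err being captured by the immediately invoked closure and assigned inside the deferred recover, so a panic in RecursiveNodeHandler or output gathering becomes an ordinary error on the response channel instead of killing the long-lived worker goroutine. A self-contained sketch of the same technique; the names here (safely, fn) are illustrative and not from the patch, and it uses a named return where the patch uses captured locals:

    package main

    import (
    	"fmt"
    	"runtime/debug"
    )

    // safely runs fn and converts a panic into an error, so a worker loop
    // survives a misbehaving work item instead of crashing the process.
    func safely(fn func() error) (err error) {
    	defer func() {
    		if r := recover(); r != nil {
    			err = fmt.Errorf("panic: %v, Stack: [%s]", r, string(debug.Stack()))
    		}
    	}()
    	return fn()
    }

    func main() {
    	err := safely(func() error { panic("boom") })
    	fmt.Println(err) // the surrounding loop keeps running; the panic surfaces as an error
    }

Both variants work for the same reason: the deferred function runs after the panic unwinds the closure, and at that point it can still write to the enclosing error before the result is sent on the response channel.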