From b8cd12cd01c4f8faea961e67361e8eb8af810619 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Wed, 1 Nov 2023 09:26:13 -0500 Subject: [PATCH] Not revisiting task nodes and correctly incrementing parallelism (#4318) * adding fix for not revisiting task nodes and correctly incrementing parallelism Signed-off-by: Daniel Rammer * fixed unit tests Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer --- .../pkg/controller/nodes/task/handler.go | 6 ++++ .../pkg/controller/nodes/task/handler_test.go | 29 +++++++------------ 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 6a0f44d648..64bf012b80 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -570,6 +570,12 @@ func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContex } if pluginTrns.IsPreviouslyObserved() { logger.Debugf(ctx, "No state change for Task, previously observed same transition. Short circuiting.") + logger.Infof(ctx, "Parallelism now set to [%d].", nCtx.ExecutionContext().IncrementParallelism()) + + // This is a hack to ensure that we do not re-evaluate the same node again in the same round. + if err := nCtx.NodeStateWriter().PutTaskNodeState(ts); err != nil { + return handler.UnknownTransition, err + } return pluginTrns.FinalTransition(ctx) } } diff --git a/flytepropeller/pkg/controller/nodes/task/handler_test.go b/flytepropeller/pkg/controller/nodes/task/handler_test.go index 0e778a5e48..91c0879759 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/task/handler_test.go @@ -530,13 +530,12 @@ func Test_task_Handle_NoCatalog(t *testing.T) { expectedState fakeplugins.NextPhaseState } type want struct { - handlerPhase handler.EPhase - wantErr bool - event bool - eventPhase core.TaskExecution_Phase - skipStateUpdate bool - incrParallel bool - checkpoint bool + handlerPhase handler.EPhase + wantErr bool + event bool + eventPhase core.TaskExecution_Phase + incrParallel bool + checkpoint bool } tests := []struct { name string @@ -666,10 +665,9 @@ func Test_task_Handle_NoCatalog(t *testing.T) { }, }, want{ - handlerPhase: handler.EPhaseRunning, - event: false, - skipStateUpdate: true, - incrParallel: true, + handlerPhase: handler.EPhaseRunning, + event: false, + incrParallel: true, }, }, { @@ -738,13 +736,8 @@ func Test_task_Handle_NoCatalog(t *testing.T) { expectedPhase = pluginCore.PhasePermanentFailure } } - if tt.want.skipStateUpdate { - assert.Equal(t, pluginCore.PhaseUndefined, state.s.PluginPhase) - assert.Equal(t, uint32(0), state.s.PluginPhaseVersion) - } else { - assert.Equal(t, expectedPhase.String(), state.s.PluginPhase.String()) - assert.Equal(t, tt.args.expectedState.PhaseVersion, state.s.PluginPhaseVersion) - } + assert.Equal(t, expectedPhase.String(), state.s.PluginPhase.String()) + assert.Equal(t, tt.args.expectedState.PhaseVersion, state.s.PluginPhaseVersion) if tt.want.checkpoint { assert.Equal(t, "s3://sandbox/x/name-n1-1/_flytecheckpoints", got.Info().GetInfo().TaskNodeInfo.TaskNodeMetadata.CheckpointUri)