Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ArrayNode subNode timeouts #6054

Merged
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ type ExecutableArrayNodeStatus interface {
GetSubNodeTaskPhases() bitarray.CompactArray
GetSubNodeRetryAttempts() bitarray.CompactArray
GetSubNodeSystemFailures() bitarray.CompactArray
GetSubNodeDeltaTimestamps() bitarray.CompactArray
GetTaskPhaseVersion() uint32
}

Expand All @@ -302,6 +303,7 @@ type MutableArrayNodeStatus interface {
SetSubNodeTaskPhases(subNodeTaskPhases bitarray.CompactArray)
SetSubNodeRetryAttempts(subNodeRetryAttempts bitarray.CompactArray)
SetSubNodeSystemFailures(subNodeSystemFailures bitarray.CompactArray)
SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray)
SetTaskPhaseVersion(taskPhaseVersion uint32)
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 19 additions & 7 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,14 @@

type ArrayNodeStatus struct {
MutableStruct
Phase ArrayNodePhase `json:"phase,omitempty"`
ExecutionError *core.ExecutionError `json:"executionError,omitempty"`
SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"`
SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"`
SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"`
SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"`
TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"`
Phase ArrayNodePhase `json:"phase,omitempty"`
ExecutionError *core.ExecutionError `json:"executionError,omitempty"`
SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"`
SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"`
SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"`
SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"`
SubNodeDeltaTimestamps bitarray.CompactArray `json:"subtimestamps,omitempty"`
TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"`
}

func (in *ArrayNodeStatus) GetArrayNodePhase() ArrayNodePhase {
Expand Down Expand Up @@ -305,6 +306,17 @@
}
}

func (in *ArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray {
return in.SubNodeDeltaTimestamps

Check warning on line 310 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L309-L310

Added lines #L309 - L310 were not covered by tests
}

func (in *ArrayNodeStatus) SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) {
if in.SubNodeDeltaTimestamps != subNodeDeltaTimestamps {
in.SetDirty()
in.SubNodeDeltaTimestamps = subNodeDeltaTimestamps
}

Check warning on line 317 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L313-L317

Added lines #L313 - L317 were not covered by tests
}

func (in *ArrayNodeStatus) GetTaskPhaseVersion() uint32 {
return in.TaskPhaseVersion
}
Expand Down
32 changes: 32 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"math"
"strconv"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
Expand All @@ -28,6 +31,11 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const (
// value is 3 days of seconds which is covered by 18 bits (262144)
MAX_DELTA_TIMESTAMP = 259200
)

var (
nilLiteral = &idlcore.Literal{
Value: &idlcore.Literal_Scalar{
Expand Down Expand Up @@ -254,6 +262,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: maxAttemptsValue},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: maxSystemFailuresValue},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: MAX_DELTA_TIMESTAMP},
} {

*item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115
Expand Down Expand Up @@ -380,6 +389,20 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
arrayNodeState.SubNodeRetryAttempts.SetItem(index, uint64(subNodeStatus.GetAttempts()))
arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures()))

if arrayNodeState.SubNodeDeltaTimestamps.BitSet != nil {
startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt()
subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt()
if subNodeStartedAt == nil {
// subNodeStartedAt == nil indicates either (1) node has not started or (2) node status has
// been reset (ex. retryable failure). in both cases we set the delta timestamp to 0
arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, 0)
} else if startedAt != nil && arrayNodeState.SubNodeDeltaTimestamps.GetItem(index) == 0 {
// otherwise if `SubNodeDeltaTimestamps` is unset, we compute the delta and set it
deltaDuration := uint64(subNodeStartedAt.Time.Sub(startedAt.Time).Seconds())
arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, deltaDuration)
}
}

// increment task phase version if subNode phase or task phase changed
if subNodeStatus.GetPhase() != nodeExecutionRequest.nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != nodeExecutionRequest.taskPhase {
incrementTaskPhaseVersion = true
Expand Down Expand Up @@ -767,6 +790,14 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter
return nil, nil, nil, nil, nil, nil, err
}

// compute start time for subNode using delta timestamp from ArrayNode NodeStatus
var startedAt *metav1.Time
if nCtx.NodeStatus().GetLastAttemptStartedAt() != nil && arrayNodeState.SubNodeDeltaTimestamps.BitSet != nil {
if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 {
startedAt = &metav1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} // #nosec G115
}
}

subNodeStatus := &v1alpha1.NodeStatus{
Phase: nodePhase,
DataDir: subDataDir,
Expand All @@ -777,6 +808,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter
Phase: taskPhase,
PluginState: pluginStateBytes,
},
LastAttemptStartedAt: startedAt,
}

// initialize mocks
Expand Down
93 changes: 72 additions & 21 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -184,9 +186,15 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
nCtx.OnNodeStateWriter().Return(nodeStateWriter)

// NodeStatus
nowMinus := time.Now().Add(time.Duration(-5) * time.Second)
metav1NowMinus := metav1.Time{
Time: nowMinus,
}
nCtx.OnNodeStatus().Return(&v1alpha1.NodeStatus{
DataDir: storage.DataReference("s3://bucket/data"),
OutputDir: storage.DataReference("s3://bucket/output"),
DataDir: storage.DataReference("s3://bucket/data"),
OutputDir: storage.DataReference("s3://bucket/output"),
LastAttemptStartedAt: &metav1NowMinus,
StartedAt: &metav1NowMinus,
})

return nCtx
Expand Down Expand Up @@ -252,6 +260,7 @@ func TestAbort(t *testing.T) {
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024},
} {

*item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115
Expand Down Expand Up @@ -348,6 +357,7 @@ func TestFinalize(t *testing.T) {
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024},
} {
*item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115
assert.NoError(t, err)
Expand Down Expand Up @@ -506,25 +516,27 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
}

tests := []struct {
name string
parallelism *uint32
minSuccessRatio *float32
subNodePhases []v1alpha1.NodePhase
subNodeTaskPhases []core.Phase
subNodeTransitions []handler.Transition
expectedArrayNodePhase v1alpha1.ArrayNodePhase
expectedArrayNodeSubPhases []v1alpha1.NodePhase
expectedTransitionPhase handler.EPhase
expectedExternalResourcePhases []idlcore.TaskExecution_Phase
currentWfParallelism uint32
maxWfParallelism uint32
incrementParallelismCount uint32
useFakeEventRecorder bool
eventRecorderFailures uint32
eventRecorderError error
expectedTaskPhaseVersion uint32
expectHandleError bool
expectedEventingCalls int
name string
parallelism *uint32
minSuccessRatio *float32
subNodePhases []v1alpha1.NodePhase
subNodeTaskPhases []core.Phase
subNodeDeltaTimestamps []uint64
subNodeTransitions []handler.Transition
expectedArrayNodePhase v1alpha1.ArrayNodePhase
expectedArrayNodeSubPhases []v1alpha1.NodePhase
expectedDiffArrayNodeSubDeltaTimestamps []bool
expectedTransitionPhase handler.EPhase
expectedExternalResourcePhases []idlcore.TaskExecution_Phase
currentWfParallelism uint32
maxWfParallelism uint32
incrementParallelismCount uint32
useFakeEventRecorder bool
eventRecorderFailures uint32
eventRecorderError error
expectedTaskPhaseVersion uint32
expectHandleError bool
expectedEventingCalls int
}{
{
name: "StartAllSubNodes",
Expand Down Expand Up @@ -827,6 +839,31 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectHandleError: true,
expectedEventingCalls: 1,
},
{
name: "DeltaTimestampUpdates",
parallelism: uint32Ptr(0),
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseRunning,
},
subNodeTaskPhases: []core.Phase{
core.PhaseUndefined,
core.PhaseUndefined,
},
subNodeTransitions: []handler.Transition{
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_SYSTEM, "", "", &handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseRunning,
v1alpha1.NodePhaseRetryableFailure,
},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_FAILED},
incrementParallelismCount: 1,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -858,6 +895,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024},
} {
*item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115
assert.NoError(t, err)
Expand All @@ -867,6 +905,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
arrayNodeState.SubNodePhases.SetItem(i, bitarray.Item(nodePhase)) // #nosec G115
}

for i, deltaTimestmap := range test.subNodeDeltaTimestamps {
arrayNodeState.SubNodeDeltaTimestamps.SetItem(i, deltaTimestmap) // #nosec G115
}

nodeSpec := arrayNodeSpec
nodeSpec.ArrayNode.Parallelism = test.parallelism
nodeSpec.ArrayNode.MinSuccessRatio = test.minSuccessRatio
Expand Down Expand Up @@ -922,6 +964,14 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
assert.Equal(t, expectedPhase, v1alpha1.NodePhase(arrayNodeState.SubNodePhases.GetItem(i))) // #nosec G115
}

for i, expectedDiffDeltaTimestamps := range test.expectedDiffArrayNodeSubDeltaTimestamps {
if expectedDiffDeltaTimestamps {
assert.NotEqual(t, arrayNodeState.SubNodeDeltaTimestamps.GetItem(i), test.subNodeDeltaTimestamps[i])
} else {
assert.Equal(t, arrayNodeState.SubNodeDeltaTimestamps.GetItem(i), test.subNodeDeltaTimestamps[i])
}
}

bufferedEventRecorder, ok := eventRecorder.(*bufferedEventRecorder)
if ok {
if len(test.expectedExternalResourcePhases) > 0 {
Expand Down Expand Up @@ -1301,6 +1351,7 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) {
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024},
} {
*item.arrayReference, err = bitarray.NewCompactArray(uint(len(test.subNodePhases)), bitarray.Item(item.maxValue)) // #nosec G115
assert.NoError(t, err)
Expand Down
Loading
Loading