Skip to content

Commit

Permalink
added version control of ArrayNode eventing
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Oct 3, 2023
1 parent b35cc95 commit fdec3b3
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 160 deletions.
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ var (
},
ClusterID: "propeller",
CreateFlyteWorkflowCRD: false,
ArrayNodeEventVersion: 0,
}
)

Expand Down Expand Up @@ -150,6 +151,7 @@ type Config struct {
ExcludeDomainLabel []string `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"`
ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"`
CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

158 changes: 158 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package array

import (
"context"
"fmt"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
idlcore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/golang/protobuf/ptypes"
)

type arrayEventRecorder interface {
interfaces.EventRecorder
process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32)
finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext, taskPhase idlcore.TaskExecution_Phase, taskPhaseVersion uint32, eventConfig *config.EventConfig) error
finalizeRequired(ctx context.Context) bool
}

type externalResourcesEventRecorder struct {
interfaces.EventRecorder
externalResources []*event.ExternalResourceInfo
nodeEvents []*event.NodeExecutionEvent
taskEvents []*event.TaskExecutionEvent
}

func (e *externalResourcesEventRecorder) RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent, eventConfig *config.EventConfig) error {
e.nodeEvents = append(e.nodeEvents, event)
return nil
}

func (e *externalResourcesEventRecorder) RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent, eventConfig *config.EventConfig) error {
e.taskEvents = append(e.taskEvents, event)
return nil
}

func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) {
// process events
cacheStatus := idlcore.CatalogCacheStatus_CACHE_DISABLED
for _, nodeExecutionEvent := range e.nodeEvents {
switch target := nodeExecutionEvent.TargetMetadata.(type) {
case *event.NodeExecutionEvent_TaskNodeMetadata:
if target.TaskNodeMetadata != nil {
cacheStatus = target.TaskNodeMetadata.CacheStatus
}
}
}

// fastcache will not emit task events for cache hits. we need to manually detect a
// transition to `SUCCEEDED` and add an `ExternalResourceInfo` for it.
if cacheStatus == idlcore.CatalogCacheStatus_CACHE_HIT && len(e.taskEvents) == 0 {
e.externalResources = append(e.externalResources, &event.ExternalResourceInfo{
ExternalId: buildSubNodeID(nCtx, index, retryAttempt),
Index: uint32(index),
RetryAttempt: retryAttempt,
Phase: idlcore.TaskExecution_SUCCEEDED,
CacheStatus: cacheStatus,
})
}

for _, taskExecutionEvent := range e.taskEvents {
for _, log := range taskExecutionEvent.Logs {
log.Name = fmt.Sprintf("%s-%d", log.Name, index)
}

e.externalResources = append(e.externalResources, &event.ExternalResourceInfo{
ExternalId: buildSubNodeID(nCtx, index, retryAttempt),
Index: uint32(index),
Logs: taskExecutionEvent.Logs,
RetryAttempt: retryAttempt,
Phase: taskExecutionEvent.Phase,
CacheStatus: cacheStatus,
})
}

// clear nodeEvents and taskEvents
e.nodeEvents = e.nodeEvents[:0]
e.taskEvents = e.taskEvents[:0]
}

func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext,
taskPhase idlcore.TaskExecution_Phase, taskPhaseVersion uint32, eventConfig *config.EventConfig) error {

// build TaskExecutionEvent
occurredAt, err := ptypes.TimestampProto(time.Now())
if err != nil {
return err
}

nodeExecutionID := nCtx.NodeExecutionMetadata().GetNodeExecutionID()
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nodeExecutionID.NodeId)
if err != nil {
return err
}
nodeExecutionID.NodeId = currentNodeUniqueID
}

workflowExecutionID := nodeExecutionID.ExecutionId

taskExecutionEvent := &event.TaskExecutionEvent{
TaskId: &idlcore.Identifier{
ResourceType: idlcore.ResourceType_TASK,
Project: workflowExecutionID.Project,
Domain: workflowExecutionID.Domain,
Name: nCtx.NodeID(),
Version: "v1", // this value is irrelevant but necessary for the identifier to be valid
},
ParentNodeExecutionId: nodeExecutionID,
RetryAttempt: 0, // ArrayNode will never retry
Phase: taskPhase,
PhaseVersion: taskPhaseVersion,
OccurredAt: occurredAt,
Metadata: &event.TaskExecutionMetadata{
ExternalResources: e.externalResources,
},
TaskType: "k8s-array",
EventVersion: 1,
}

// record TaskExecutionEvent
return e.EventRecorder.RecordTaskEvent(ctx, taskExecutionEvent, eventConfig)
}

func (e *externalResourcesEventRecorder) finalizeRequired(ctx context.Context) bool {
return len(e.externalResources) > 0
}

type passThroughEventRecorder struct {
interfaces.EventRecorder
}

func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) {}

func (*passThroughEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext,
taskPhase idlcore.TaskExecution_Phase, taskPhaseVersion uint32, eventConfig *config.EventConfig) error {
return nil
}

func (*passThroughEventRecorder) finalizeRequired(ctx context.Context) bool {
return false
}

func newArrayEventRecorder(eventRecorder interfaces.EventRecorder) arrayEventRecorder {
if config.GetConfig().ArrayNodeEventVersion == 0 {
return &externalResourcesEventRecorder{
EventRecorder: eventRecorder,
}
}

return &passThroughEventRecorder{
EventRecorder: eventRecorder,
}
}
Loading

0 comments on commit fdec3b3

Please sign in to comment.