diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 509310b467..7985b52149 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -151,7 +151,7 @@ type Config struct { ExcludeDomainLabel []string `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"` ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"` CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"` - ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"` + ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"` } // KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client. diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder.go b/flytepropeller/pkg/controller/nodes/array/event_recorder.go index 2c66316aad..b0811a8d6a 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder.go @@ -5,12 +5,12 @@ import ( "fmt" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - idlcore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" + idlcore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/golang/protobuf/ptypes" ) @@ -22,10 +22,10 @@ type arrayEventRecorder interface { } type externalResourcesEventRecorder struct { - interfaces.EventRecorder + interfaces.EventRecorder externalResources []*event.ExternalResourceInfo - nodeEvents []*event.NodeExecutionEvent - taskEvents []*event.TaskExecutionEvent + nodeEvents []*event.NodeExecutionEvent + taskEvents []*event.TaskExecutionEvent } func (e *externalResourcesEventRecorder) RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent, eventConfig *config.EventConfig) error { @@ -136,7 +136,8 @@ type passThroughEventRecorder struct { interfaces.EventRecorder } -func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) {} +func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) { +} func (*passThroughEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext, taskPhase idlcore.TaskExecution_Phase, taskPhaseVersion uint32, eventConfig *config.EventConfig) error { @@ -166,16 +167,16 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index timestamp := ptypes.TimestampNow() workflowExecutionID := nCtx.ExecutionContext().GetExecutionID().WorkflowExecutionIdentifier - // send NodeExecutionEvent with UNDEFINED phase + // send NodeExecutionEvent nodeExecutionEvent := &event.NodeExecutionEvent{ Id: &idlcore.NodeExecutionIdentifier{ NodeId: subNodeID, ExecutionId: workflowExecutionID, }, - Phase: nodePhase, + Phase: nodePhase, OccurredAt: timestamp, ParentNodeMetadata: &event.ParentNodeExecutionMetadata{ - NodeId: nCtx.NodeID(), + NodeId: nCtx.NodeID(), }, ReportedAt: timestamp, } @@ -184,7 +185,7 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index return err } - // send TaskExeucutionEvent with UNDEFINED phase + // send TaskExeucutionEvent taskExecutionEvent := &event.TaskExecutionEvent{ TaskId: &idlcore.Identifier{ ResourceType: idlcore.ResourceType_TASK, diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder_test.go b/flytepropeller/pkg/controller/nodes/array/event_recorder_test.go index 94ca8af17d..1dcb7c1ed2 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder_test.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder_test.go @@ -3,8 +3,8 @@ package array import ( "context" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" ) type bufferedEventRecorder struct { diff --git a/flytepropeller/pkg/controller/nodes/array/node_execution_context.go b/flytepropeller/pkg/controller/nodes/array/node_execution_context.go index 69b854e8bc..1edf7b86f4 100644 --- a/flytepropeller/pkg/controller/nodes/array/node_execution_context.go +++ b/flytepropeller/pkg/controller/nodes/array/node_execution_context.go @@ -3,11 +3,11 @@ package array import ( "context" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) type staticInputReader struct { diff --git a/flytepropeller/pkg/controller/nodes/array/utils.go b/flytepropeller/pkg/controller/nodes/array/utils.go index aa71ace760..3fde690770 100644 --- a/flytepropeller/pkg/controller/nodes/array/utils.go +++ b/flytepropeller/pkg/controller/nodes/array/utils.go @@ -5,13 +5,13 @@ import ( "context" "fmt" - idlcore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/codex" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s" "github.com/flyteorg/flyte/flytestdlib/storage" + idlcore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) func appendLiteral(name string, literal *idlcore.Literal, outputLiterals map[string]*idlcore.Literal, length int) {