Skip to content

Commit

Permalink
Upstream: Add labels to published execution events (#6104)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Dec 12, 2024
1 parent d770918 commit 4063ab8
Show file tree
Hide file tree
Showing 11 changed files with 396 additions and 130 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi
}

if cloudEventsConfig.CloudEventVersion == runtimeInterfaces.CloudEventVersionv2 {
return cloudEventImplementations.NewCloudEventsWrappedPublisher(db, sender, scope, storageClient, urlData, remoteDataConfig)
return cloudEventImplementations.NewCloudEventsWrappedPublisher(db, sender, scope, storageClient, urlData, remoteDataConfig, cloudEventsConfig.EventsPublisherConfig)
}

return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ func (p *Publisher) shouldPublishEvent(notificationType string) bool {
}

type CloudEventWrappedPublisher struct {
db repositoryInterfaces.Repository
sender interfaces.Sender
systemMetrics implementations.EventPublisherSystemMetrics
storageClient *storage.DataStore
urlData dataInterfaces.RemoteURLInterface
remoteDataConfig runtimeInterfaces.RemoteDataConfig
db repositoryInterfaces.Repository
sender interfaces.Sender
systemMetrics implementations.EventPublisherSystemMetrics
storageClient *storage.DataStore
urlData dataInterfaces.RemoteURLInterface
remoteDataConfig runtimeInterfaces.RemoteDataConfig
eventPublisherConfig runtimeInterfaces.EventsPublisherConfig
}

func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context.Context, rawEvent *event.WorkflowExecutionEvent) (*event.CloudEventWorkflowExecution, error) {
Expand All @@ -133,8 +134,8 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
return nil, fmt.Errorf("nil execution id in event [%+v]", rawEvent)
}

// For now, don't append any additional information unless succeeded
if rawEvent.GetPhase() != core.WorkflowExecution_SUCCEEDED {
// For now, don't append any additional information unless succeeded or otherwise configured
if rawEvent.GetPhase() != core.WorkflowExecution_SUCCEEDED && !c.eventPublisherConfig.EnrichAllWorkflowEventTypes {
return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
}, nil
Expand Down Expand Up @@ -193,6 +194,7 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
Principal: spec.GetMetadata().GetPrincipal(),
LaunchPlanId: spec.GetLaunchPlan(),
Labels: spec.GetLabels().GetValues(),
}, nil
}

Expand Down Expand Up @@ -317,6 +319,7 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
Principal: spec.GetMetadata().GetPrincipal(),
LaunchPlanId: spec.GetLaunchPlan(),
Labels: spec.GetLabels().GetValues(),
}, nil
}

Expand All @@ -326,8 +329,24 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil")
}

executionModel, err := c.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: rawEvent.GetParentNodeExecutionId().GetExecutionId().GetProject(),
Domain: rawEvent.GetParentNodeExecutionId().GetExecutionId().GetDomain(),
Name: rawEvent.GetParentNodeExecutionId().GetExecutionId().GetName(),
})
if err != nil {
logger.Warningf(ctx, "couldn't find execution [%+v] for cloud event processing", rawEvent.GetParentNodeExecutionId().GetExecutionId())
return nil, err
}
ex, err := transformers.FromExecutionModel(ctx, executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warningf(ctx, "couldn't transform execution [%+v] for cloud event processing", rawEvent.GetParentNodeExecutionId().GetExecutionId())
return nil, err
}

return &event.CloudEventTaskExecution{
RawEvent: rawEvent,
Labels: ex.GetSpec().GetLabels().GetValues(),
}, nil
}

Expand Down Expand Up @@ -472,14 +491,15 @@ func NewCloudEventsPublisher(sender interfaces.Sender, scope promutils.Scope, ev
}

func NewCloudEventsWrappedPublisher(
db repositoryInterfaces.Repository, sender interfaces.Sender, scope promutils.Scope, storageClient *storage.DataStore, urlData dataInterfaces.RemoteURLInterface, remoteDataConfig runtimeInterfaces.RemoteDataConfig) interfaces.Publisher {
db repositoryInterfaces.Repository, sender interfaces.Sender, scope promutils.Scope, storageClient *storage.DataStore, urlData dataInterfaces.RemoteURLInterface, remoteDataConfig runtimeInterfaces.RemoteDataConfig, eventPublisherConfig runtimeInterfaces.EventsPublisherConfig) interfaces.Publisher {

return &CloudEventWrappedPublisher{
db: db,
sender: sender,
systemMetrics: implementations.NewEventPublisherSystemMetrics(scope.NewSubScope("cloudevents_publisher")),
storageClient: storageClient,
urlData: urlData,
remoteDataConfig: remoteDataConfig,
db: db,
sender: sender,
systemMetrics: implementations.NewEventPublisherSystemMetrics(scope.NewSubScope("cloudevents_publisher")),
storageClient: storageClient,
urlData: urlData,
remoteDataConfig: remoteDataConfig,
eventPublisherConfig: eventPublisherConfig,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ type EventsPublisherConfig struct {
TopicName string `json:"topicName"`
// Event types: task, node, workflow executions
EventTypes []string `json:"eventTypes"`
// Whether to publish enriched events for all workflow execution events
EnrichAllWorkflowEventTypes bool `json:"enrichAllWorkflowEventTypes"`
}

type ExternalEventsConfig struct {
Expand Down
24 changes: 24 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/event/cloudevents_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4063ab8

Please sign in to comment.