Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artf/switch event #4428

Merged
merged 3 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 123 additions & 49 deletions flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flytestdlib/contextutils"

"reflect"
"time"

Expand Down Expand Up @@ -200,47 +203,89 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
}

return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ScheduledAt: spec.GetMetadata().GetScheduledAt(),
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: &workflowInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
}, nil
}

func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Context, rawEvent *event.NodeExecutionEvent) (*event.CloudEventNodeExecution, error) {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
func getNodeExecutionContext(ctx context.Context, identifier *core.NodeExecutionIdentifier) context.Context {
ctx = contextutils.WithProjectDomain(ctx, identifier.ExecutionId.Project, identifier.ExecutionId.Domain)
ctx = contextutils.WithExecutionID(ctx, identifier.ExecutionId.Name)
return contextutils.WithNodeID(ctx, identifier.NodeId)
}

func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Context, rawEvent *event.TaskExecutionEvent) (*event.CloudEventTaskExecution, error) {
// This is a rough copy of the ListTaskExecutions function in TaskExecutionManager. It can be deprecated once we move the processing out of Admin itself.
// Just return the highest retry attempt.
func (c *CloudEventWrappedPublisher) getLatestTaskExecutions(ctx context.Context, nodeExecutionID core.NodeExecutionIdentifier) (*admin.TaskExecution, error) {
ctx = getNodeExecutionContext(ctx, &nodeExecutionID)

if rawEvent == nil {
return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil")
identifierFilters, err := util.GetNodeExecutionIdentifierFilters(ctx, nodeExecutionID)
if err != nil {
return nil, err
}

// For now, don't append any additional information unless succeeded
if rawEvent.Phase != core.TaskExecution_SUCCEEDED {
return &event.CloudEventTaskExecution{
RawEvent: rawEvent,
OutputData: nil,
OutputInterface: nil,
sort := admin.Sort{
Key: "retry_attempt",
Direction: 0,
}
sortParameter, err := common.NewSortParameter(&sort, models.TaskExecutionColumns)
if err != nil {
return nil, err
}

output, err := c.db.TaskExecutionRepo().List(ctx, repositoryInterfaces.ListResourceInput{
InlineFilters: identifierFilters,
Offset: 0,
Limit: 1,
SortParameter: sortParameter,
})
if err != nil {
return nil, err
}
if output.TaskExecutions == nil || len(output.TaskExecutions) == 0 {
logger.Debugf(ctx, "no task executions found for node exec id [%+v]", nodeExecutionID)
return nil, nil
}

taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "failed to transform task execution models for node exec id [%+v] with err: %v", nodeExecutionID, err)
return nil, err
}

return taskExecutionList[0], nil
}

func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Context, rawEvent *event.NodeExecutionEvent) (*event.CloudEventNodeExecution, error) {
if rawEvent == nil || rawEvent.Id == nil {
return nil, fmt.Errorf("nothing to publish, NodeExecution event or ID is nil")
}

// Skip nodes unless they're succeeded and not start nodes
if rawEvent.Phase != core.NodeExecution_SUCCEEDED {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
} else if rawEvent.Id.NodeId == "start-node" {
return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
}, nil
}
// metric

// This gets the parent workflow execution metadata
executionModel, err := c.db.ExecutionRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: rawEvent.ParentNodeExecutionId.ExecutionId.Project,
Domain: rawEvent.ParentNodeExecutionId.ExecutionId.Domain,
Name: rawEvent.ParentNodeExecutionId.ExecutionId.Name,
Project: rawEvent.Id.ExecutionId.Project,
Domain: rawEvent.Id.ExecutionId.Domain,
Name: rawEvent.Id.ExecutionId.Name,
})
if err != nil {
logger.Infof(ctx, "couldn't find execution [%+v] to save termination cause", rawEvent.ParentNodeExecutionId)
logger.Infof(ctx, "couldn't find execution [%+v] for cloud event processing", rawEvent.Id.ExecutionId)
return nil, err
}

Expand All @@ -250,19 +295,9 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
fmt.Printf("there was an error with spec %v %v", err, executionModel.Spec)
}

taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: rawEvent.TaskId.Project,
Domain: rawEvent.TaskId.Domain,
Name: rawEvent.TaskId.Name,
Version: rawEvent.TaskId.Version,
})
if err != nil {
// TODO: metric this
logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", rawEvent.TaskId, err)
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)

// Get inputs/outputs
// This will likely need to move to the artifact service side, given message size limits.
// Replace with call to GetNodeExecutionData
var inputs *core.LiteralMap
if rawEvent.GetInputData() != nil {
inputs = rawEvent.GetInputData()
Expand All @@ -273,9 +308,10 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
fmt.Printf("Error fetching input literal map %v", rawEvent)
}
} else {
logger.Infof(ctx, "Task execution for node exec [%+v] has no input data", rawEvent.ParentNodeExecutionId)
logger.Infof(ctx, "Node execution for node exec [%+v] has no input data", rawEvent.Id)
}

// This will likely need to move to the artifact service side, given message size limits.
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
outputs = rawEvent.GetOutputData()
Expand All @@ -289,16 +325,53 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
}
}

// Fetch the latest task execution if any, and pull out the task interface, if applicable.
// These are optional fields... if the node execution doesn't have a task execution then these will be empty.
var taskExecID *core.TaskExecutionIdentifier
var typedInterface *core.TypedInterface

lte, err := c.getLatestTaskExecutions(ctx, *rawEvent.Id)
if err != nil {
logger.Errorf(ctx, "failed to get latest task execution for node exec id [%+v] with err: %v", rawEvent.Id, err)
return nil, err
}
if lte != nil {
taskModel, err := c.db.TaskRepo().Get(ctx, repositoryInterfaces.Identifier{
Project: lte.Id.TaskId.Project,
Domain: lte.Id.TaskId.Domain,
Name: lte.Id.TaskId.Name,
Version: lte.Id.TaskId.Version,
})
if err != nil {
// TODO: metric this
// metric
logger.Debugf(ctx, "Failed to get task with task id [%+v] with err %v", lte.Id.TaskId, err)
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)
typedInterface = task.Closure.CompiledTask.Template.Interface
taskExecID = lte.Id
}

return &event.CloudEventNodeExecution{
RawEvent: rawEvent,
TaskExecId: taskExecID,
OutputData: outputs,
OutputInterface: typedInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
LaunchPlanId: spec.LaunchPlan,
}, nil
}

func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Context, rawEvent *event.TaskExecutionEvent) (*event.CloudEventTaskExecution, error) {

if rawEvent == nil {
return nil, fmt.Errorf("nothing to publish, TaskExecution event is nil")
}

return &event.CloudEventTaskExecution{
RawEvent: rawEvent,
OutputData: outputs,
OutputInterface: task.Closure.CompiledTask.Template.Interface,
InputData: inputs,
ScheduledAt: spec.GetMetadata().GetScheduledAt(),
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
RawEvent: rawEvent,
}, nil
}

Expand Down Expand Up @@ -359,6 +432,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
phase = e.Phase.String()
eventTime = e.OccurredAt.AsTime()
eventID = fmt.Sprintf("%v.%v", executionID, phase)
eventSource = common.FlyteURLKeyFromNodeExecutionID(*msgType.Event.Id)
finalMsg, err = c.TransformNodeExecutionEvent(ctx, e)
case *event.CloudEventExecutionStart:
topic = "cloudevents.ExecutionStart"
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
}

go func() {
ceCtx := context.Background()
ceCtx := context.TODO()
if err := m.cloudEventsPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil {
logger.Errorf(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
}
Expand Down
63 changes: 46 additions & 17 deletions flyteartifacts/pkg/server/processor/events_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ func (s *ServiceCallHandler) HandleEvent(ctx context.Context, cloudEvent *event2
return s.HandleEventTaskExec(ctx, source, msgType)
case *event.CloudEventNodeExecution:
logger.Debugf(ctx, "Handling CloudEventNodeExecution [%v]", msgType.RawEvent.Id)
return s.HandleEventNodeExec(ctx, msgType)
return s.HandleEventNodeExec(ctx, source, msgType)
default:
return fmt.Errorf("HandleEvent found unknown message type [%T]", msgType)
}
}

func (s *ServiceCallHandler) HandleEventExecStart(_ context.Context, _ *event.CloudEventExecutionStart) error {
// metric
return nil
}

Expand Down Expand Up @@ -171,18 +172,57 @@ func getPartitionsAndTag(ctx context.Context, partialID core.ArtifactID, variabl
return partitions, tag, nil
}

func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source string, evt *event.CloudEventTaskExecution) error {
func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, _ string, evt *event.CloudEventTaskExecution) error {

if evt.RawEvent.Phase != core.TaskExecution_SUCCEEDED {
logger.Debug(ctx, "Skipping non-successful task execution event")
return nil
}
// metric

execID := evt.RawEvent.ParentNodeExecutionId.ExecutionId
return nil
}

func (s *ServiceCallHandler) HandleEventNodeExec(ctx context.Context, source string, evt *event.CloudEventNodeExecution) error {
if evt.RawEvent.Phase != core.NodeExecution_SUCCEEDED {
logger.Debug(ctx, "Skipping non-successful task execution event")
return nil
}
if evt.RawEvent.Id.NodeId == "end-node" {
logger.Debug(ctx, "Skipping end node for %s", evt.RawEvent.Id.ExecutionId.Name)
return nil
}
// metric

execID := evt.RawEvent.Id.ExecutionId
if evt.GetOutputData().GetLiterals() == nil || len(evt.OutputData.Literals) == 0 {
logger.Debugf(ctx, "No output data to process for task event from [%v]: %s", execID, evt.RawEvent.TaskId.Name)
logger.Debugf(ctx, "No output data to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
}

if evt.OutputInterface == nil {
if evt.GetOutputData() != nil {
// metric this as error
logger.Errorf(ctx, "No output interface to process for task event from [%s] node %s, but output data is not nil", execID, evt.RawEvent.Id.NodeId)
}
logger.Debugf(ctx, "No output interface to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
return nil
}

if evt.RawEvent.GetTaskNodeMetadata() != nil {
if evt.RawEvent.GetTaskNodeMetadata().CacheStatus == core.CatalogCacheStatus_CACHE_HIT {
logger.Debugf(ctx, "Skipping cache hit for %s", evt.RawEvent.Id)
return nil
}
}
var taskExecID *core.TaskExecutionIdentifier
if taskExecID = evt.GetTaskExecId(); taskExecID == nil {
logger.Debugf(ctx, "No task execution id to process for task event from [%s] node %s", execID, evt.RawEvent.Id.NodeId)
return nil
}

// See note on the cloudevent_publisher side, we'll have to call one of the get data endpoints to get the actual data
// rather than reading them here. But read here for now.

// Iterate through the output interface. For any outputs that have an artifact ID specified, grab the
// output Literal and construct a Create request and call the service.
for varName, variable := range evt.OutputInterface.Outputs.Variables {
Expand All @@ -191,14 +231,8 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str

output := evt.OutputData.Literals[varName]

taskExecID := core.TaskExecutionIdentifier{
TaskId: evt.RawEvent.TaskId,
NodeExecutionId: evt.RawEvent.ParentNodeExecutionId,
RetryAttempt: evt.RawEvent.RetryAttempt,
}

// Add a tracking tag to the Literal before saving.
version := fmt.Sprintf("%s/%s", source, varName)
version := fmt.Sprintf("%s/%d/%s", source, taskExecID.RetryAttempt, varName)
trackingTag := fmt.Sprintf("%s/%s/%s", execID.Project, execID.Domain, version)
if output.Metadata == nil {
output.Metadata = make(map[string]string, 1)
Expand All @@ -208,7 +242,7 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str
spec := artifact.ArtifactSpec{
Value: output,
Type: evt.OutputInterface.Outputs.Variables[varName].Type,
TaskExecution: &taskExecID,
TaskExecution: taskExecID,
Execution: execID,
}

Expand Down Expand Up @@ -253,11 +287,6 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str
logger.Debugf(ctx, "Created artifact id [%+v] for key %s", resp.Artifact.ArtifactId, varName)
}
}

return nil
}

func (s *ServiceCallHandler) HandleEventNodeExec(_ context.Context, _ *event.CloudEventNodeExecution) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions flyteidl/gen/pb-cpp/flyteidl/artifact/artifacts.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading