diff --git a/flyteadmin/pkg/rpc/adminservice/base.go b/flyteadmin/pkg/rpc/adminservice/base.go index 491a24a1f03..651f170fd66 100644 --- a/flyteadmin/pkg/rpc/adminservice/base.go +++ b/flyteadmin/pkg/rpc/adminservice/base.go @@ -77,7 +77,13 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi repo) workflowBuilder := workflowengineImpl.NewFlyteWorkflowBuilder( adminScope.NewSubScope("builder").NewSubScope("flytepropeller")) - workflowExecutor := workflowengineImpl.NewK8sWorkflowExecutor(configuration, execCluster, workflowBuilder, dataStorageClient) + + executionEventWriter := eventWriter.NewWorkflowExecutionEventWriter(repo, applicationConfiguration.GetAsyncEventsBufferSize()) + go func() { + executionEventWriter.Run() + }() + + workflowExecutor := workflowengineImpl.NewK8sWorkflowExecutor(configuration, execCluster, workflowBuilder, dataStorageClient, executionEventWriter) logger.Info(ctx, "Successfully created a workflow executor engine") pluginRegistry.RegisterDefault(plugins.PluginIDWorkflowExecutor, workflowExecutor) @@ -121,11 +127,6 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi namedEntityManager := manager.NewNamedEntityManager(repo, configuration, adminScope.NewSubScope("named_entity_manager")) descriptionEntityManager := manager.NewDescriptionEntityManager(repo, configuration, adminScope.NewSubScope("description_entity_manager")) - executionEventWriter := eventWriter.NewWorkflowExecutionEventWriter(repo, applicationConfiguration.GetAsyncEventsBufferSize()) - go func() { - executionEventWriter.Run() - }() - executionManager := manager.NewExecutionManager(repo, pluginRegistry, configuration, dataStorageClient, adminScope.NewSubScope("execution_manager"), adminScope.NewSubScope("user_execution_metrics"), publisher, urlData, workflowManager, namedEntityManager, eventPublisher, cloudEventPublisher, executionEventWriter) diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go index d941cc8309d..2837296e758 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go @@ -3,17 +3,21 @@ package impl import ( "context" - "google.golang.org/grpc/codes" - k8_api_err "k8s.io/apimachinery/pkg/api/errors" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - + eventsInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster" execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + event "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/storage" + "google.golang.org/grpc/codes" + "google.golang.org/protobuf/types/known/timestamppb" + k8_api_err "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var deletePropagationBackground = v1.DeletePropagationBackground @@ -23,10 +27,11 @@ const defaultIdentifier = "DefaultK8sExecutor" // K8sWorkflowExecutor directly creates and delete Flyte workflow execution CRD objects using the configured execution // cluster interface. type K8sWorkflowExecutor struct { - config runtimeInterfaces.Configuration - executionCluster execClusterInterfaces.ClusterInterface - workflowBuilder interfaces.FlyteWorkflowBuilder - storageClient *storage.DataStore + config runtimeInterfaces.Configuration + executionCluster execClusterInterfaces.ClusterInterface + workflowBuilder interfaces.FlyteWorkflowBuilder + storageClient *storage.DataStore + executionEventWriter eventsInterfaces.WorkflowExecutionEventWriter } func (e K8sWorkflowExecutor) ID() string { @@ -101,15 +106,35 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat if err != nil && !k8_api_err.IsNotFound(err) { return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err) } + + e.executionEventWriter.Write(&admin.WorkflowExecutionEventRequest{ + Event: &event.WorkflowExecutionEvent{ + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: data.ExecutionID.Project, + Domain: data.ExecutionID.Domain, + Name: data.ExecutionID.Name, + }, + Phase: core.WorkflowExecution_ABORTED, + ProducerId: "k8s_executor", + OccurredAt: timestamppb.Now(), + OutputResult: &event.WorkflowExecutionEvent_Error{ + Error: &core.ExecutionError{ + Code: "ExecutionAborted", + Message: "Execution aborted", + }, + }, + }, + }) return nil } -func NewK8sWorkflowExecutor(config runtimeInterfaces.Configuration, executionCluster execClusterInterfaces.ClusterInterface, workflowBuilder interfaces.FlyteWorkflowBuilder, client *storage.DataStore) *K8sWorkflowExecutor { +func NewK8sWorkflowExecutor(config runtimeInterfaces.Configuration, executionCluster execClusterInterfaces.ClusterInterface, workflowBuilder interfaces.FlyteWorkflowBuilder, client *storage.DataStore, executionEventWriter eventsInterfaces.WorkflowExecutionEventWriter) *K8sWorkflowExecutor { return &K8sWorkflowExecutor{ - config: config, - executionCluster: executionCluster, - workflowBuilder: workflowBuilder, - storageClient: client, + config: config, + executionCluster: executionCluster, + workflowBuilder: workflowBuilder, + storageClient: client, + executionEventWriter: executionEventWriter, } }