Skip to content

Commit

Permalink
Write abort event inside k8s executor
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Gomez Ferrer <[email protected]>
  • Loading branch information
andresgomezfrr committed Sep 30, 2024
1 parent 66ff152 commit bcba038
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 19 deletions.
13 changes: 7 additions & 6 deletions flyteadmin/pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi
repo)
workflowBuilder := workflowengineImpl.NewFlyteWorkflowBuilder(
adminScope.NewSubScope("builder").NewSubScope("flytepropeller"))
workflowExecutor := workflowengineImpl.NewK8sWorkflowExecutor(configuration, execCluster, workflowBuilder, dataStorageClient)

executionEventWriter := eventWriter.NewWorkflowExecutionEventWriter(repo, applicationConfiguration.GetAsyncEventsBufferSize())
go func() {
executionEventWriter.Run()
}()

workflowExecutor := workflowengineImpl.NewK8sWorkflowExecutor(configuration, execCluster, workflowBuilder, dataStorageClient, executionEventWriter)
logger.Info(ctx, "Successfully created a workflow executor engine")
pluginRegistry.RegisterDefault(plugins.PluginIDWorkflowExecutor, workflowExecutor)

Expand Down Expand Up @@ -121,11 +127,6 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi
namedEntityManager := manager.NewNamedEntityManager(repo, configuration, adminScope.NewSubScope("named_entity_manager"))
descriptionEntityManager := manager.NewDescriptionEntityManager(repo, configuration, adminScope.NewSubScope("description_entity_manager"))

executionEventWriter := eventWriter.NewWorkflowExecutionEventWriter(repo, applicationConfiguration.GetAsyncEventsBufferSize())
go func() {
executionEventWriter.Run()
}()

executionManager := manager.NewExecutionManager(repo, pluginRegistry, configuration, dataStorageClient,
adminScope.NewSubScope("execution_manager"), adminScope.NewSubScope("user_execution_metrics"),
publisher, urlData, workflowManager, namedEntityManager, eventPublisher, cloudEventPublisher, executionEventWriter)
Expand Down
51 changes: 38 additions & 13 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package impl
import (
"context"

"google.golang.org/grpc/codes"
k8_api_err "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventsInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster"
execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
event "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/storage"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/timestamppb"
k8_api_err "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var deletePropagationBackground = v1.DeletePropagationBackground
Expand All @@ -23,10 +27,11 @@ const defaultIdentifier = "DefaultK8sExecutor"
// K8sWorkflowExecutor directly creates and delete Flyte workflow execution CRD objects using the configured execution
// cluster interface.
type K8sWorkflowExecutor struct {
config runtimeInterfaces.Configuration
executionCluster execClusterInterfaces.ClusterInterface
workflowBuilder interfaces.FlyteWorkflowBuilder
storageClient *storage.DataStore
config runtimeInterfaces.Configuration
executionCluster execClusterInterfaces.ClusterInterface
workflowBuilder interfaces.FlyteWorkflowBuilder
storageClient *storage.DataStore
executionEventWriter eventsInterfaces.WorkflowExecutionEventWriter
}

func (e K8sWorkflowExecutor) ID() string {
Expand Down Expand Up @@ -101,15 +106,35 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat
if err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
}

e.executionEventWriter.Write(&admin.WorkflowExecutionEventRequest{
Event: &event.WorkflowExecutionEvent{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: data.ExecutionID.Project,
Domain: data.ExecutionID.Domain,
Name: data.ExecutionID.Name,
},
Phase: core.WorkflowExecution_ABORTED,
ProducerId: "k8s_executor",
OccurredAt: timestamppb.Now(),
OutputResult: &event.WorkflowExecutionEvent_Error{
Error: &core.ExecutionError{
Code: "ExecutionAborted",
Message: "Execution aborted",
},
},
},
})
return nil
}

func NewK8sWorkflowExecutor(config runtimeInterfaces.Configuration, executionCluster execClusterInterfaces.ClusterInterface, workflowBuilder interfaces.FlyteWorkflowBuilder, client *storage.DataStore) *K8sWorkflowExecutor {
func NewK8sWorkflowExecutor(config runtimeInterfaces.Configuration, executionCluster execClusterInterfaces.ClusterInterface, workflowBuilder interfaces.FlyteWorkflowBuilder, client *storage.DataStore, executionEventWriter eventsInterfaces.WorkflowExecutionEventWriter) *K8sWorkflowExecutor {

return &K8sWorkflowExecutor{
config: config,
executionCluster: executionCluster,
workflowBuilder: workflowBuilder,
storageClient: client,
config: config,
executionCluster: executionCluster,
workflowBuilder: workflowBuilder,
storageClient: client,
executionEventWriter: executionEventWriter,
}
}

0 comments on commit bcba038

Please sign in to comment.