Skip to content

Commit

Permalink
Merge branch 'master' into monorepo-move/flyteadmin/upgrade-coreos/go…
Browse files Browse the repository at this point in the history
…-oidc
  • Loading branch information
EngHabu authored Nov 28, 2023
2 parents 4e4be1f + 4c1d993 commit 34649bc
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 105 deletions.
43 changes: 23 additions & 20 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,26 +845,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
executionParameters.RecoveryExecution = request.Spec.Metadata.ReferenceExecution
}

workflowExecutor := plugins.Get[workflowengineInterfaces.WorkflowExecutor](m.pluginRegistry, plugins.PluginIDWorkflowExecutor)
execInfo, err := workflowExecutor.Execute(ctx, workflowengineInterfaces.ExecutionData{
Namespace: namespace,
ExecutionID: &workflowExecutionID,
ReferenceWorkflowName: workflow.Id.Name,
ReferenceLaunchPlanName: launchPlan.Id.Name,
WorkflowClosure: workflow.Closure.CompiledWorkflow,
WorkflowClosureReference: storage.DataReference(workflowModel.RemoteClosureIdentifier),
ExecutionParameters: executionParameters,
})

if err != nil {
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, executionInputs, err)
return nil, nil, err
}
executionCreatedAt := time.Now()
acceptanceDelay := executionCreatedAt.Sub(requestedAt)
m.systemMetrics.AcceptanceDelay.Observe(acceptanceDelay.Seconds())

// Request notification settings takes precedence over the launch plan settings.
// If there is no notification in the request and DisableAll is not true, use the settings from the launch plan.
Expand All @@ -879,7 +861,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
notificationsSettings = make([]*admin.Notification, 0)
}

executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{
createExecModelInput := transformers.CreateExecutionModelInput{
WorkflowExecutionID: workflowExecutionID,
RequestSpec: requestSpec,
LaunchPlanID: launchPlanModel.ID,
Expand All @@ -891,13 +873,34 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
WorkflowIdentifier: workflow.Id,
ParentNodeExecutionID: parentNodeExecutionID,
SourceExecutionID: sourceExecutionID,
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
SecurityContext: executionConfig.SecurityContext,
LaunchEntity: launchPlan.Id.ResourceType,
Namespace: namespace,
}

workflowExecutor := plugins.Get[workflowengineInterfaces.WorkflowExecutor](m.pluginRegistry, plugins.PluginIDWorkflowExecutor)
execInfo, execErr := workflowExecutor.Execute(ctx, workflowengineInterfaces.ExecutionData{
Namespace: namespace,
ExecutionID: &workflowExecutionID,
ReferenceWorkflowName: workflow.Id.Name,
ReferenceLaunchPlanName: launchPlan.Id.Name,
WorkflowClosure: workflow.Closure.CompiledWorkflow,
WorkflowClosureReference: storage.DataReference(workflowModel.RemoteClosureIdentifier),
ExecutionParameters: executionParameters,
})
if execErr != nil {
createExecModelInput.Error = execErr
m.systemMetrics.PropellerFailures.Inc()
logger.Infof(ctx, "failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
request, workflowExecutionID, executionInputs, execErr)
} else {
m.systemMetrics.AcceptanceDelay.Observe(acceptanceDelay.Seconds())
createExecModelInput.Cluster = execInfo.Cluster
}

executionModel, err := transformers.CreateExecutionModel(createExecModelInput)
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
workflowExecutionID, err)
Expand Down
50 changes: 42 additions & 8 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const (
principal = "principal"
rawOutput = "raw_output"
)

var spec = testutils.GetExecutionRequest().Spec
var specBytes, _ = proto.Marshal(spec)
var phase = core.WorkflowExecution_RUNNING.String()
Expand Down Expand Up @@ -296,8 +301,6 @@ func TestCreateExecution(t *testing.T) {
Labels: &labels}), nil
}

principal := "principal"
rawOutput := "raw_output"
clusterAssignment := admin.ClusterAssignment{ClusterPoolName: "gpu"}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
Expand Down Expand Up @@ -421,7 +424,6 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
},
)

principal := "feeny"
getExecutionCalled := false
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(
func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
Expand Down Expand Up @@ -618,6 +620,7 @@ func TestCreateExecutionInCompatibleInputs(t *testing.T) {
}

func TestCreateExecutionPropellerFailure(t *testing.T) {
clusterAssignment := admin.ClusterAssignment{ClusterPoolName: "gpu"}
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
expectedErr := flyteAdminErrors.NewFlyteAdminErrorf(codes.Internal, "ABC")
Expand All @@ -626,13 +629,45 @@ func TestCreateExecutionPropellerFailure(t *testing.T) {
mockExecutor.OnID().Return("customMockExecutor")
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})

qosProvider := &runtimeIFaceMocks.QualityOfServiceConfiguration{}
qosProvider.OnGetTierExecutionValues().Return(map[core.QualityOfService_Tier]core.QualityOfServiceSpec{
core.QualityOfService_HIGH: {
QueueingBudget: ptypes.DurationProto(10 * time.Minute),
},
core.QualityOfService_MEDIUM: {
QueueingBudget: ptypes.DurationProto(20 * time.Minute),
},
core.QualityOfService_LOW: {
QueueingBudget: ptypes.DurationProto(30 * time.Minute),
},
})

qosProvider.OnGetDefaultTiers().Return(map[string]core.QualityOfService_Tier{
"domain": core.QualityOfService_HIGH,
})

mockConfig := getMockExecutionsConfigProvider()
mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(qosProvider)

request := testutils.GetExecutionRequest()
request.Spec.Metadata = &admin.ExecutionMetadata{
Principal: "unused - populated from authenticated context",
}
request.Spec.RawOutputDataConfig = &admin.RawOutputDataConfig{OutputLocationPrefix: rawOutput}
request.Spec.ClusterAssignment = &clusterAssignment

response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.EqualError(t, err, expectedErr.Error())
assert.Nil(t, response)
identity, err := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil, nil)
assert.NoError(t, err)
ctx := identity.WithContext(context.Background())
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})

expectedResponse := &admin.ExecutionCreateResponse{Id: &executionIdentifier}

response, err := execManager.CreateExecution(ctx, request, requestedAt)

assert.NoError(t, err)
assert.Equal(t, expectedResponse, response)
}

func TestCreateExecutionDatabaseFailure(t *testing.T) {
Expand Down Expand Up @@ -3379,7 +3414,6 @@ func TestTerminateExecution(t *testing.T) {
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)

abortCause := "abort cause"
principal := "principal"
updateExecutionFunc := func(
context context.Context, execution models.Execution) error {
assert.Equal(t, "project", execution.Project)
Expand Down
55 changes: 36 additions & 19 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transformers

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -13,7 +14,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
flyteErrs "github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -45,6 +46,7 @@ type CreateExecutionModelInput struct {
SecurityContext *core.SecurityContext
LaunchEntity core.ResourceType
Namespace string
Error error
}

type ExecutionTransformerOptions struct {
Expand All @@ -70,12 +72,9 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
requestSpec.SecurityContext = input.SecurityContext
spec, err := proto.Marshal(requestSpec)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "Failed to serialize execution spec: %v", err)
}
createdAt, err := ptypes.TimestampProto(input.CreatedAt)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to serialize execution created at time")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to serialize execution spec: %v", err)
}
createdAt := timestamppb.New(input.CreatedAt)
closure := admin.ExecutionClosure{
Phase: input.Phase,
CreatedAt: createdAt,
Expand All @@ -91,11 +90,29 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
if input.Phase == core.WorkflowExecution_RUNNING {
closure.StartedAt = createdAt
}
if input.Error != nil {
closure.Phase = core.WorkflowExecution_FAILED
execErr := &core.ExecutionError{
Code: "Unknown",
Message: input.Error.Error(),
Kind: core.ExecutionError_SYSTEM,
}

var adminErr flyteErrs.FlyteAdminError
if errors.As(input.Error, &adminErr) {
execErr.Code = adminErr.Code().String()
execErr.Message = adminErr.Error()
if adminErr.Code() == codes.InvalidArgument {
execErr.Kind = core.ExecutionError_USER
}
}
closure.OutputResult = &admin.ExecutionClosure_Error{Error: execErr}
}

closureBytes, err := proto.Marshal(&closure)

if err != nil {
return nil, errors.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan status")
return nil, flyteErrs.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan status")
}

activeExecution := int32(admin.ExecutionState_EXECUTION_ACTIVE)
Expand Down Expand Up @@ -147,7 +164,7 @@ func reassignCluster(ctx context.Context, cluster string, executionID *core.Work
var executionSpec admin.ExecutionSpec
err := proto.Unmarshal(execution.Spec, &executionSpec)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution spec: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution spec: %v", err)
}
if executionSpec.Metadata == nil {
executionSpec.Metadata = &admin.ExecutionMetadata{}
Expand All @@ -158,7 +175,7 @@ func reassignCluster(ctx context.Context, cluster string, executionID *core.Work
executionSpec.Metadata.SystemMetadata.ExecutionCluster = cluster
marshaledSpec, err := proto.Marshal(&executionSpec)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err)
}
execution.Spec = marshaledSpec
return nil
Expand All @@ -172,15 +189,15 @@ func UpdateExecutionModelState(
var executionClosure admin.ExecutionClosure
err := proto.Unmarshal(execution.Closure, &executionClosure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
}
executionClosure.Phase = request.Event.Phase
executionClosure.UpdatedAt = request.Event.OccurredAt
execution.Phase = request.Event.Phase.String()

occurredAtTimestamp, err := ptypes.Timestamp(request.Event.OccurredAt)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to parse OccurredAt: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to parse OccurredAt: %v", err)
}
execution.ExecutionUpdatedAt = &occurredAtTimestamp

Expand Down Expand Up @@ -210,7 +227,7 @@ func UpdateExecutionModelState(
errorMsg := fmt.Sprintf("Cannot accept events for running/terminated execution [%v] from cluster [%s],"+
"expected events to originate from [%s]",
request.Event.ExecutionId, request.Event.ProducerId, execution.Cluster)
return errors.NewIncompatibleClusterError(ctx, errorMsg, execution.Cluster)
return flyteErrs.NewIncompatibleClusterError(ctx, errorMsg, execution.Cluster)
}
}

Expand Down Expand Up @@ -253,7 +270,7 @@ func UpdateExecutionModelState(
}
marshaledClosure, err := proto.Marshal(&executionClosure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
}
execution.Closure = marshaledClosure
return nil
Expand All @@ -267,7 +284,7 @@ func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, st
var closure admin.ExecutionClosure
err := proto.Unmarshal(executionModel.Closure, &closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
}
// Update the indexed columns
stateInt := int32(stateUpdatedTo)
Expand All @@ -286,7 +303,7 @@ func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, st
}
marshaledClosure, err := proto.Marshal(&closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
}
executionModel.Closure = marshaledClosure
return nil
Expand All @@ -298,7 +315,7 @@ func SetExecutionAborting(execution *models.Execution, cause, principal string)
var closure admin.ExecutionClosure
err := proto.Unmarshal(execution.Closure, &closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err)
}
closure.OutputResult = &admin.ExecutionClosure_AbortMetadata{
AbortMetadata: &admin.AbortMetadata{
Expand All @@ -309,7 +326,7 @@ func SetExecutionAborting(execution *models.Execution, cause, principal string)
closure.Phase = core.WorkflowExecution_ABORTING
marshaledClosure, err := proto.Marshal(&closure)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
return flyteErrs.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err)
}
execution.Closure = marshaledClosure
execution.AbortCause = cause
Expand All @@ -329,7 +346,7 @@ func FromExecutionModel(ctx context.Context, executionModel models.Execution, op
var spec admin.ExecutionSpec
var err error
if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
}
if len(opts.DefaultNamespace) > 0 {
if spec.Metadata == nil {
Expand All @@ -346,7 +363,7 @@ func FromExecutionModel(ctx context.Context, executionModel models.Execution, op

var closure admin.ExecutionClosure
if err = proto.Unmarshal(executionModel.Closure, &closure); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
return nil, flyteErrs.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
}
if closure.GetError() != nil && opts != nil && opts.TrimErrorMessage && len(closure.GetError().Message) > 0 {
trimmedErrOutputResult := closure.GetError()
Expand Down
Loading

0 comments on commit 34649bc

Please sign in to comment.