From 8a895b9fda926cd5fc3f225c3ea70a48faaede90 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 23 Nov 2023 15:22:27 +0800 Subject: [PATCH] add a feature gate (#4472) Signed-off-by: Yee Hing Tong --- .../pkg/artifacts/artifact_client_test.go | 12 ++ flyteadmin/pkg/artifacts/config.go | 1 - flyteadmin/pkg/artifacts/registry.go | 15 ++- flyteadmin/pkg/artifacts/registry_test.go | 28 +++++ .../manager/impl/exec_manager_other_test.go | 63 ++++++++++ .../pkg/manager/impl/execution_manager.go | 113 ++++++++++-------- .../pkg/manager/impl/launch_plan_manager.go | 28 +++-- flyteadmin/pkg/manager/impl/task_manager.go | 25 ++-- .../impl/validation/launch_plan_validator.go | 10 +- .../pkg/manager/impl/validation/validation.go | 16 +++ .../pkg/manager/impl/workflow_manager.go | 24 ++-- flyteadmin/pkg/rpc/adminservice/base.go | 9 +- .../interfaces/application_configuration.go | 24 +--- flyteartifacts/pkg/db/gorm_transformers.go | 2 - 14 files changed, 255 insertions(+), 115 deletions(-) create mode 100644 flyteadmin/pkg/artifacts/artifact_client_test.go create mode 100644 flyteadmin/pkg/artifacts/registry_test.go create mode 100644 flyteadmin/pkg/manager/impl/exec_manager_other_test.go diff --git a/flyteadmin/pkg/artifacts/artifact_client_test.go b/flyteadmin/pkg/artifacts/artifact_client_test.go new file mode 100644 index 0000000000..64a886c5f5 --- /dev/null +++ b/flyteadmin/pkg/artifacts/artifact_client_test.go @@ -0,0 +1,12 @@ +package artifacts + +import ( + "context" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestEmpty(t *testing.T) { + c := InitializeArtifactClient(context.Background(), nil) + assert.Nil(t, c) +} diff --git a/flyteadmin/pkg/artifacts/config.go b/flyteadmin/pkg/artifacts/config.go index 2cb5be9cc8..c639666ffe 100644 --- a/flyteadmin/pkg/artifacts/config.go +++ b/flyteadmin/pkg/artifacts/config.go @@ -1,7 +1,6 @@ package artifacts // gatepr: add proper config bits for this -// eduardo to consider moving to idl clients. type Config struct { Host string `json:"host"` Port int `json:"port"` diff --git a/flyteadmin/pkg/artifacts/registry.go b/flyteadmin/pkg/artifacts/registry.go index cee436c1d6..bcc6f35d1c 100644 --- a/flyteadmin/pkg/artifacts/registry.go +++ b/flyteadmin/pkg/artifacts/registry.go @@ -3,7 +3,6 @@ package artifacts import ( "context" "fmt" - "google.golang.org/grpc" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" @@ -18,10 +17,11 @@ type ArtifactRegistry struct { } func (a *ArtifactRegistry) RegisterArtifactProducer(ctx context.Context, id *core.Identifier, ti core.TypedInterface) { - if a.client == nil { + if a == nil || a.client == nil { logger.Debugf(ctx, "Artifact client not configured, skipping registration for task [%+v]", id) return } + ap := &artifact.ArtifactProducer{ EntityId: id, Outputs: ti.Outputs, @@ -36,7 +36,7 @@ func (a *ArtifactRegistry) RegisterArtifactProducer(ctx context.Context, id *cor } func (a *ArtifactRegistry) RegisterArtifactConsumer(ctx context.Context, id *core.Identifier, pm core.ParameterMap) { - if a.client == nil { + if a == nil || a.client == nil { logger.Debugf(ctx, "Artifact client not configured, skipping registration for consumer [%+v]", id) return } @@ -54,7 +54,7 @@ func (a *ArtifactRegistry) RegisterArtifactConsumer(ctx context.Context, id *cor } func (a *ArtifactRegistry) RegisterTrigger(ctx context.Context, plan *admin.LaunchPlan) error { - if a.client == nil { + if a == nil || a.client == nil { logger.Debugf(ctx, "Artifact client not configured, skipping trigger [%+v]", plan) return fmt.Errorf("artifact client not configured") } @@ -70,11 +70,14 @@ func (a *ArtifactRegistry) RegisterTrigger(ctx context.Context, plan *admin.Laun } func (a *ArtifactRegistry) GetClient() artifact.ArtifactRegistryClient { + if a == nil { + return nil + } return a.client } -func NewArtifactRegistry(ctx context.Context, config *Config, opts ...grpc.DialOption) ArtifactRegistry { - return ArtifactRegistry{ +func NewArtifactRegistry(ctx context.Context, config *Config, opts ...grpc.DialOption) *ArtifactRegistry { + return &ArtifactRegistry{ client: InitializeArtifactClient(ctx, config, opts...), } } diff --git a/flyteadmin/pkg/artifacts/registry_test.go b/flyteadmin/pkg/artifacts/registry_test.go new file mode 100644 index 0000000000..49b5de52e8 --- /dev/null +++ b/flyteadmin/pkg/artifacts/registry_test.go @@ -0,0 +1,28 @@ +package artifacts + +import ( + "context" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestRegistryNoClient(t *testing.T) { + r := NewArtifactRegistry(context.Background(), nil) + assert.Nil(t, r.GetClient()) +} + +type Parent struct { + R *ArtifactRegistry +} + +func TestPointerReceivers(t *testing.T) { + p := Parent{} + nilClient := p.R.GetClient() + assert.Nil(t, nilClient) +} + +func TestNilCheck(t *testing.T) { + r := NewArtifactRegistry(context.Background(), nil) + err := r.RegisterTrigger(context.Background(), nil) + assert.NotNil(t, err) +} diff --git a/flyteadmin/pkg/manager/impl/exec_manager_other_test.go b/flyteadmin/pkg/manager/impl/exec_manager_other_test.go new file mode 100644 index 0000000000..84f86419d9 --- /dev/null +++ b/flyteadmin/pkg/manager/impl/exec_manager_other_test.go @@ -0,0 +1,63 @@ +package impl + +import ( + "context" + "fmt" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" + eventWriterMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/mocks" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + mockScope "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestResolveNotWorking(t *testing.T) { + mockConfig := getMockExecutionsConfigProvider() + + execManager := NewExecutionManager(nil, nil, mockConfig, nil, mockScope.NewTestScope(), mockScope.NewTestScope(), nil, nil, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}, artifacts.NewArtifactRegistry(context.Background(), nil)).(*ExecutionManager) + + pm, artifactIDs, err := execManager.ResolveParameterMapArtifacts(context.Background(), nil, nil) + assert.Nil(t, err) + fmt.Println(pm, artifactIDs) + +} + +func TestTrackingBitExtract(t *testing.T) { + mockConfig := getMockExecutionsConfigProvider() + + execManager := NewExecutionManager(nil, nil, mockConfig, nil, mockScope.NewTestScope(), mockScope.NewTestScope(), nil, nil, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}, artifacts.NewArtifactRegistry(context.Background(), nil)).(*ExecutionManager) + + lit := core.Literal{ + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 1, + }, + }, + }, + }, + }, + Metadata: map[string]string{"_ua": "proj/domain/name@version"}, + } + inputMap := core.LiteralMap{ + Literals: map[string]*core.Literal{ + "a": &lit, + }, + } + inputColl := core.LiteralCollection{ + Literals: []*core.Literal{ + &lit, + }, + } + + trackers := execManager.ExtractArtifactKeys(&lit) + assert.Equal(t, 1, len(trackers)) + + trackers = execManager.ExtractArtifactKeys(&core.Literal{Value: &core.Literal_Map{Map: &inputMap}}) + assert.Equal(t, 1, len(trackers)) + trackers = execManager.ExtractArtifactKeys(&core.Literal{Value: &core.Literal_Collection{Collection: &inputColl}}) + assert.Equal(t, 1, len(trackers)) + assert.Equal(t, "proj/domain/name@version", trackers[0]) +} diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index b21e762cdc..415fe2c21a 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -94,7 +94,7 @@ type ExecutionManager struct { cloudEventPublisher notificationInterfaces.Publisher dbEventWriter eventWriter.WorkflowExecutionEventWriter pluginRegistry *plugins.Registry - artifactRegistry artifacts.ArtifactRegistry + artifactRegistry *artifacts.ArtifactRegistry } func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context { @@ -872,10 +872,10 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar } // ResolveParameterMapArtifacts will go through the parameter map, and resolve any artifact queries. -func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inputs *core.ParameterMap, inputsForQueryTemplating map[string]*core.Literal) (*core.ParameterMap, map[string]*core.ArtifactID, error) { +func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inputs *core.ParameterMap, inputsForQueryTemplating map[string]*core.Literal) (*core.ParameterMap, []*core.ArtifactID, error) { // only top level replace for now. Need to make this recursive - var artifactIDs = make(map[string]*core.ArtifactID) + var artifactIDs []*core.ArtifactID if inputs == nil { return nil, artifactIDs, nil } @@ -911,7 +911,7 @@ func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inp if err != nil { return nil, nil, err } - artifactIDs[k] = resp.Artifact.GetArtifactId() + artifactIDs = append(artifactIDs, resp.Artifact.GetArtifactId()) logger.Debugf(ctx, "Resolved query for [%s] to [%+v]", k, resp.Artifact.ArtifactId) outputs[k] = &core.Parameter{ Var: v.Var, @@ -931,7 +931,7 @@ func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inp if err != nil { return nil, nil, err } - artifactIDs[k] = v.GetArtifactId() + artifactIDs = append(artifactIDs, v.GetArtifactId()) logger.Debugf(ctx, "Using specified artifactID for [%+v] for [%s]", v.GetArtifactId(), k) outputs[k] = &core.Parameter{ Var: v.Var, @@ -974,60 +974,69 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( return nil, nil, err } - // Literals may have an artifact key in the metadata field. This is something the artifact service should have - // added. Pull these back out so we can keep track of them for lineage purposes. Use a dummy wrapper object for - // easier recursion. - requestInputMap := &core.Literal{ - Value: &core.Literal_Map{Map: request.Inputs}, - } - fixedInputMap := &core.Literal{ - Value: &core.Literal_Map{Map: launchPlan.Spec.FixedInputs}, - } - requestInputArtifactKeys := m.ExtractArtifactKeys(requestInputMap) - fixedInputArtifactKeys := m.ExtractArtifactKeys(fixedInputMap) - requestInputArtifactKeys = append(requestInputArtifactKeys, fixedInputArtifactKeys...) - - // Put together the inputs that we've already resolved so that the artifact querying bit can fill them in. - // This is to support artifact queries that depend on other inputs using the {{ .inputs.var }} construct. - var inputsForQueryTemplating = make(map[string]*core.Literal) - if request.Inputs != nil { - for k, v := range request.Inputs.Literals { + // TODO: Artifact feature gate, remove when ready + var lpExpectedInputs *core.ParameterMap + var artifactTrackers []string + var usedArtifactIDs []*core.ArtifactID + if m.artifactRegistry.GetClient() != nil { + // Literals may have an artifact key in the metadata field. This is something the artifact service should have + // added. Pull these back out so we can keep track of them for lineage purposes. Use a dummy wrapper object for + // easier recursion. + requestInputMap := &core.Literal{ + Value: &core.Literal_Map{Map: request.Inputs}, + } + fixedInputMap := &core.Literal{ + Value: &core.Literal_Map{Map: launchPlan.Spec.FixedInputs}, + } + artifactTrackers = m.ExtractArtifactKeys(requestInputMap) + fixedInputArtifactKeys := m.ExtractArtifactKeys(fixedInputMap) + artifactTrackers = append(artifactTrackers, fixedInputArtifactKeys...) + + // Put together the inputs that we've already resolved so that the artifact querying bit can fill them in. + // This is to support artifact queries that depend on other inputs using the {{ .inputs.var }} construct. + var inputsForQueryTemplating = make(map[string]*core.Literal) + if request.Inputs != nil { + for k, v := range request.Inputs.Literals { + inputsForQueryTemplating[k] = v + } + } + for k, v := range launchPlan.Spec.FixedInputs.Literals { inputsForQueryTemplating[k] = v } - } - for k, v := range launchPlan.Spec.FixedInputs.Literals { - inputsForQueryTemplating[k] = v - } - logger.Debugf(ctx, "Inputs for query templating: [%+v]", inputsForQueryTemplating) + logger.Debugf(ctx, "Inputs for query templating: [%+v]", inputsForQueryTemplating) + + // Resolve artifact queries + // Within the launch plan, the artifact will be in the Parameter map, and can come in form of an ArtifactID, + // or as an ArtifactQuery. + // Also send in the inputsForQueryTemplating for two reasons, so we don't run queries for things we don't need to + // and so we can fill in template args. + // ArtifactIDs are also returned for lineage purposes. + resolvedExpectedInputs, usedArtifactIDs, err := m.ResolveParameterMapArtifacts(ctxPD, launchPlan.Closure.ExpectedInputs, inputsForQueryTemplating) + if err != nil { + logger.Errorf(ctx, "Error looking up launch plan closure parameter map: %v", err) + return nil, nil, err + } - // Resolve artifact queries - // Within the launch plan, the artifact will be in the Parameter map, and can come in form of an ArtifactID, - // or as an ArtifactQuery. - // Also send in the inputsForQueryTemplating for two reasons, so we don't run queries for things we don't need to - // and so we can fill in template args. - // ArtifactIDs are also returned for lineage purposes. - resolvedExpectedInputs, usedArtifactIDs, err := m.ResolveParameterMapArtifacts(ctxPD, launchPlan.Closure.ExpectedInputs, inputsForQueryTemplating) - if err != nil { - logger.Errorf(ctx, "Error looking up launch plan closure parameter map: %v", err) - return nil, nil, err - } + logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, resolvedExpectedInputs) + logger.Debugf(ctx, "Found artifact keys: %v", artifactTrackers) + logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs) - logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, resolvedExpectedInputs) - logger.Debugf(ctx, "Found artifact keys: %v", requestInputArtifactKeys) - logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs) + } else { + lpExpectedInputs = launchPlan.Closure.ExpectedInputs + } // Artifacts retrieved will need to be stored somewhere to ensure that we can re-emit events if necessary // in the future, and also to make sure that relaunch and recover can use it if necessary. executionInputs, err := validation.CheckAndFetchInputsForExecution( request.Inputs, launchPlan.Spec.FixedInputs, - resolvedExpectedInputs, + lpExpectedInputs, ) if err != nil { logger.Debugf(ctx, "Failed to CheckAndFetchInputsForExecution with request.Inputs: %+v"+ "fixed inputs: %+v and expected inputs: %+v with err %v", - request.Inputs, launchPlan.Spec.FixedInputs, resolvedExpectedInputs, err) + request.Inputs, launchPlan.Spec.FixedInputs, lpExpectedInputs, err) return nil, nil, err } @@ -1061,13 +1070,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( requestSpec.Metadata = &admin.ExecutionMetadata{} } requestSpec.Metadata.Principal = getUser(ctx) - // Construct a list of the values and save to request spec metadata. - // Avoids going through the model creation step. - artifactIDs := make([]*core.ArtifactID, 0, len(usedArtifactIDs)) - for _, value := range usedArtifactIDs { - artifactIDs = append(artifactIDs, value) - } - requestSpec.Metadata.ArtifactIds = artifactIDs + requestSpec.Metadata.ArtifactIds = usedArtifactIDs // Get the node and parent execution (if any) that launched this execution var parentNodeExecutionID uint @@ -1185,7 +1188,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( notificationsSettings = make([]*admin.Notification, 0) } - m.publishExecutionStart(ctx, workflowExecutionID, request.Spec.LaunchPlan, workflow.Id, requestInputArtifactKeys, requestSpec.Metadata.ArtifactIds) + // Publish of event is also gated on the artifact client being available, even though it's not directly required. + // TODO: Artifact feature gate, remove when ready + if m.artifactRegistry.GetClient() != nil { + m.publishExecutionStart(ctx, workflowExecutionID, request.Spec.LaunchPlan, workflow.Id, artifactTrackers, usedArtifactIDs) + } executionModel, err := transformers.CreateExecutionModel(transformers.CreateExecutionModelInput{ WorkflowExecutionID: workflowExecutionID, @@ -1958,7 +1965,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface, workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface, eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher, - eventWriter eventWriter.WorkflowExecutionEventWriter, artifactRegistry artifacts.ArtifactRegistry) interfaces.ExecutionInterface { + eventWriter eventWriter.WorkflowExecutionEventWriter, artifactRegistry *artifacts.ArtifactRegistry) interfaces.ExecutionInterface { queueAllocator := executions.NewQueueAllocator(config, db) systemMetrics := newExecutionSystemMetrics(systemScope) diff --git a/flyteadmin/pkg/manager/impl/launch_plan_manager.go b/flyteadmin/pkg/manager/impl/launch_plan_manager.go index dcb8484559..48441905a1 100644 --- a/flyteadmin/pkg/manager/impl/launch_plan_manager.go +++ b/flyteadmin/pkg/manager/impl/launch_plan_manager.go @@ -39,7 +39,7 @@ type LaunchPlanManager struct { config runtimeInterfaces.Configuration scheduler scheduleInterfaces.EventScheduler metrics launchPlanMetrics - artifactRegistry artifacts.ArtifactRegistry + artifactRegistry *artifacts.ArtifactRegistry } func getLaunchPlanContext(ctx context.Context, identifier *core.Identifier) context.Context { @@ -90,6 +90,11 @@ func (m *LaunchPlanManager) CreateLaunchPlan( // The presence of this field indicates that this is a trigger launch plan // Return true and send this request over to the artifact registry instead if launchPlan.Spec.GetEntityMetadata() != nil && launchPlan.Spec.GetEntityMetadata().GetLaunchConditions() != nil { + // TODO: Artifact feature gate, remove when ready + if m.artifactRegistry.GetClient() == nil { + logger.Debugf(ctx, "artifact feature not enabled, skipping launch plan %v", launchPlan.Id) + return &admin.LaunchPlanCreateResponse{}, nil + } err := m.artifactRegistry.RegisterTrigger(ctx, &launchPlan) if err != nil { return nil, err @@ -123,14 +128,17 @@ func (m *LaunchPlanManager) CreateLaunchPlan( } m.metrics.SpecSizeBytes.Observe(float64(len(launchPlanModel.Spec))) m.metrics.ClosureSizeBytes.Observe(float64(len(launchPlanModel.Closure))) - go func() { - ceCtx := context.TODO() - if launchPlan.Spec.DefaultInputs == nil { - logger.Debugf(ceCtx, "Insufficient fields to submit launchplan interface %v", launchPlan.Id) - return - } - m.artifactRegistry.RegisterArtifactConsumer(ceCtx, launchPlan.Id, *launchPlan.Spec.DefaultInputs) - }() + // TODO: Artifact feature gate, remove when ready + if m.artifactRegistry.GetClient() != nil { + go func() { + ceCtx := context.TODO() + if launchPlan.Spec.DefaultInputs == nil { + logger.Debugf(ceCtx, "Insufficient fields to submit launchplan interface %v", launchPlan.Id) + return + } + m.artifactRegistry.RegisterArtifactConsumer(ceCtx, launchPlan.Id, *launchPlan.Spec.DefaultInputs) + }() + } return &admin.LaunchPlanCreateResponse{}, nil } @@ -566,7 +574,7 @@ func NewLaunchPlanManager( config runtimeInterfaces.Configuration, scheduler scheduleInterfaces.EventScheduler, scope promutils.Scope, - artifactRegistry artifacts.ArtifactRegistry) interfaces.LaunchPlanInterface { + artifactRegistry *artifacts.ArtifactRegistry) interfaces.LaunchPlanInterface { metrics := launchPlanMetrics{ Scope: scope, diff --git a/flyteadmin/pkg/manager/impl/task_manager.go b/flyteadmin/pkg/manager/impl/task_manager.go index 87107d4eb4..2d6bc2a91e 100644 --- a/flyteadmin/pkg/manager/impl/task_manager.go +++ b/flyteadmin/pkg/manager/impl/task_manager.go @@ -43,7 +43,7 @@ type TaskManager struct { compiler workflowengine.Compiler metrics taskMetrics resourceManager interfaces.ResourceInterface - artifactRegistry artifacts.ArtifactRegistry + artifactRegistry *artifacts.ArtifactRegistry } func getTaskContext(ctx context.Context, identifier *core.Identifier) context.Context { @@ -133,15 +133,18 @@ func (t *TaskManager) CreateTask( contextWithRuntimeMeta, common.RuntimeVersionKey, finalizedRequest.Spec.Template.Metadata.Runtime.Version) t.metrics.Registered.Inc(contextWithRuntimeMeta) } - tIfaceCopy := proto.Clone(finalizedRequest.Spec.Template.Interface).(*core.TypedInterface) - go func() { - ceCtx := context.TODO() - if finalizedRequest.Spec.Template.Interface == nil { - logger.Debugf(ceCtx, "Task [%+v] has no interface, skipping registration", finalizedRequest.Id) - return - } - t.artifactRegistry.RegisterArtifactProducer(ceCtx, finalizedRequest.Id, *tIfaceCopy) - }() + // TODO: Artifact feature gate, remove when ready + if t.artifactRegistry.GetClient() != nil { + tIfaceCopy := proto.Clone(finalizedRequest.Spec.Template.Interface).(*core.TypedInterface) + go func() { + ceCtx := context.TODO() + if finalizedRequest.Spec.Template.Interface == nil { + logger.Debugf(ceCtx, "Task [%+v] has no interface, skipping registration", finalizedRequest.Id) + return + } + t.artifactRegistry.RegisterArtifactProducer(ceCtx, finalizedRequest.Id, *tIfaceCopy) + }() + } return &admin.TaskCreateResponse{}, nil } @@ -274,7 +277,7 @@ func NewTaskManager( db repoInterfaces.Repository, config runtimeInterfaces.Configuration, compiler workflowengine.Compiler, scope promutils.Scope, - artifactRegistry artifacts.ArtifactRegistry) interfaces.TaskInterface { + artifactRegistry *artifacts.ArtifactRegistry) interfaces.TaskInterface { metrics := taskMetrics{ Scope: scope, diff --git a/flyteadmin/pkg/manager/impl/validation/launch_plan_validator.go b/flyteadmin/pkg/manager/impl/validation/launch_plan_validator.go index 7b569fae06..aa41265f2b 100644 --- a/flyteadmin/pkg/manager/impl/validation/launch_plan_validator.go +++ b/flyteadmin/pkg/manager/impl/validation/launch_plan_validator.go @@ -38,8 +38,14 @@ func ValidateLaunchPlan(ctx context.Context, if err := validateLiteralMap(request.Spec.FixedInputs, shared.FixedInputs); err != nil { return err } - if err := validateParameterMap(request.Spec.DefaultInputs, shared.DefaultInputs); err != nil { - return err + if config.GetTopLevelConfig().FeatureGates.EnableArtifacts { + if err := validateParameterMapAllowArtifacts(request.Spec.DefaultInputs, shared.DefaultInputs); err != nil { + return err + } + } else { + if err := validateParameterMapDisableArtifacts(request.Spec.DefaultInputs, shared.DefaultInputs); err != nil { + return err + } } expectedInputs, err := checkAndFetchExpectedInputForLaunchPlan(workflowInterface.GetInputs(), request.Spec.FixedInputs, request.Spec.DefaultInputs) if err != nil { diff --git a/flyteadmin/pkg/manager/impl/validation/validation.go b/flyteadmin/pkg/manager/impl/validation/validation.go index 3d5e931047..5546a8ae8f 100644 --- a/flyteadmin/pkg/manager/impl/validation/validation.go +++ b/flyteadmin/pkg/manager/impl/validation/validation.go @@ -245,6 +245,20 @@ func validateLiteralMap(inputMap *core.LiteralMap, fieldName string) error { return nil } +// TODO: Artifact feature gate, remove when ready +func validateParameterMapAllowArtifacts(inputMap *core.ParameterMap, fieldName string) error { + return validateParameterMap(inputMap, fieldName) +} + +func validateParameterMapDisableArtifacts(inputMap *core.ParameterMap, fieldName string) error { + for name, defaultInput := range inputMap.Parameters { + if defaultInput.GetArtifactQuery() != nil { + return errors.NewFlyteAdminErrorf(codes.InvalidArgument, "artifact mode not enabled but query found %s %s", fieldName, name) + } + } + return validateParameterMap(inputMap, fieldName) +} + func validateParameterMap(inputMap *core.ParameterMap, fieldName string) error { if inputMap != nil && len(inputMap.Parameters) > 0 { for name, defaultInput := range inputMap.Parameters { @@ -256,6 +270,8 @@ func validateParameterMap(inputMap *core.ParameterMap, fieldName string) error { "The Variable component of the Parameter %s in %s either is missing, or has a missing Type", name, fieldName) } + // if artifacts not enabled, then we know all GetArtifactQuery()s are nil, so the middle condition disappears + // if artifacts are enabled, then this is the valid check that we want to do if defaultInput.GetDefault() == nil && defaultInput.GetArtifactQuery() == nil && !defaultInput.GetRequired() { return errors.NewFlyteAdminErrorf(codes.InvalidArgument, "Invalid variable %s in %s - variable has neither default, nor is required. "+ diff --git a/flyteadmin/pkg/manager/impl/workflow_manager.go b/flyteadmin/pkg/manager/impl/workflow_manager.go index aeb2e82fe7..4777b529de 100644 --- a/flyteadmin/pkg/manager/impl/workflow_manager.go +++ b/flyteadmin/pkg/manager/impl/workflow_manager.go @@ -47,7 +47,7 @@ type WorkflowManager struct { storageClient *storage.DataStore storagePrefix []string metrics workflowMetrics - artifactRegistry artifacts.ArtifactRegistry + artifactRegistry *artifacts.ArtifactRegistry } func getWorkflowContext(ctx context.Context, identifier *core.Identifier) context.Context { @@ -223,14 +223,18 @@ func (w *WorkflowManager) CreateWorkflow( // Send the interface definition to Artifact service, this is so that it can statically pick up one dimension of // lineage information tIfaceCopy := proto.Clone(workflowClosure.CompiledWorkflow.Primary.Template.Interface).(*core.TypedInterface) - go func() { - ceCtx := context.TODO() - if workflowClosure.CompiledWorkflow == nil || workflowClosure.CompiledWorkflow.Primary == nil { - logger.Debugf(ceCtx, "Insufficient fields to submit workflow interface %v", finalizedRequest.Id) - return - } - w.artifactRegistry.RegisterArtifactProducer(ceCtx, finalizedRequest.Id, *tIfaceCopy) - }() + // TODO: Artifact feature gate, remove when ready + if w.artifactRegistry.GetClient() != nil { + go func() { + ceCtx := context.TODO() + if workflowClosure.CompiledWorkflow == nil || workflowClosure.CompiledWorkflow.Primary == nil { + logger.Debugf(ceCtx, "Insufficient fields to submit workflow interface %v", finalizedRequest.Id) + return + } + + w.artifactRegistry.RegisterArtifactProducer(ceCtx, finalizedRequest.Id, *tIfaceCopy) + }() + } return &admin.WorkflowCreateResponse{}, nil } @@ -364,7 +368,7 @@ func NewWorkflowManager( storageClient *storage.DataStore, storagePrefix []string, scope promutils.Scope, - artifactRegistry artifacts.ArtifactRegistry) interfaces.WorkflowInterface { + artifactRegistry *artifacts.ArtifactRegistry) interfaces.WorkflowInterface { metrics := workflowMetrics{ Scope: scope, diff --git a/flyteadmin/pkg/rpc/adminservice/base.go b/flyteadmin/pkg/rpc/adminservice/base.go index e578ab464f..46db18d878 100644 --- a/flyteadmin/pkg/rpc/adminservice/base.go +++ b/flyteadmin/pkg/rpc/adminservice/base.go @@ -43,7 +43,7 @@ type AdminService struct { DescriptionEntityManager interfaces.DescriptionEntityInterface MetricsManager interfaces.MetricsInterface Metrics AdminMetrics - Artifacts artifacts.ArtifactRegistry + Artifacts *artifacts.ArtifactRegistry } // Intercepts all admin requests to handle panics during execution. @@ -113,7 +113,12 @@ func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, confi eventScheduler := workflowScheduler.GetEventScheduler() - artifactRegistry := artifacts.NewArtifactRegistry(ctx, configuration.ApplicationConfiguration().GetArtifactsConfig()) + var artifactRegistry *artifacts.ArtifactRegistry + if configuration.ApplicationConfiguration().GetTopLevelConfig().FeatureGates.EnableArtifacts { + artifactRegistry = artifacts.NewArtifactRegistry(ctx, configuration.ApplicationConfiguration().GetArtifactsConfig()) + } else { + artifactRegistry = nil + } launchPlanManager := manager.NewLaunchPlanManager( repo, configuration, eventScheduler, adminScope.NewSubScope("launch_plan_manager"), artifactRegistry) diff --git a/flyteadmin/pkg/runtime/interfaces/application_configuration.go b/flyteadmin/pkg/runtime/interfaces/application_configuration.go index 68227ce224..dfc558a13b 100644 --- a/flyteadmin/pkg/runtime/interfaces/application_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/application_configuration.go @@ -49,6 +49,10 @@ type PostgresConfig struct { Debug bool `json:"debug" pflag:" Whether or not to start the database connection with debug mode enabled."` } +type FeatureGates struct { + EnableArtifacts bool `json:"enableArtifacts" pflag:",Enable artifacts feature."` +} + // ApplicationConfig is the base configuration to start admin type ApplicationConfig struct { // The RoleName key inserted as an annotation (https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) @@ -98,6 +102,8 @@ type ApplicationConfig struct { // Environment variables to be set for the execution. Envs map[string]string `json:"envs,omitempty"` + + FeatureGates FeatureGates `json:"featureGates" pflag:",Enable experimental features."` } func (a *ApplicationConfig) GetRoleNameKey() string { @@ -227,23 +233,6 @@ type KafkaConfig struct { Brokers []string `json:"brokers"` } -// RedisConfig is basically a subset of the client options in the Redis library -type RedisConfig struct { - // host:port address. - Addr string `json:"addr"` - // Use the specified Username to authenticate the current connection - // with one of the connections defined in the ACL list when connecting - // to a Redis 6.0 instance, or greater, that is using the Redis ACL system. - Username string `json:"username"` - // Optional password. Must match the password specified in the - // requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower), - // or the User Password when connecting to a Redis 6.0 instance, or greater, - // that is using the Redis ACL system. - Password string `json:"password"` - // Database to be selected after connecting to the server. - DB int `json:"db"` -} - // This section holds configuration for the event scheduler used to schedule workflow executions. type EventSchedulerConfig struct { // Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local' @@ -555,7 +544,6 @@ type CloudEventsConfig struct { AWSConfig AWSConfig `json:"aws"` GCPConfig GCPConfig `json:"gcp"` KafkaConfig KafkaConfig `json:"kafka"` - RedisConfig RedisConfig `json:"redis"` // Publish events to a pubsub tops EventsPublisherConfig EventsPublisherConfig `json:"eventsPublisher"` // Number of times to attempt recreating a notifications processor client should there be any disruptions. diff --git a/flyteartifacts/pkg/db/gorm_transformers.go b/flyteartifacts/pkg/db/gorm_transformers.go index 776028a0f1..9569896f78 100644 --- a/flyteartifacts/pkg/db/gorm_transformers.go +++ b/flyteartifacts/pkg/db/gorm_transformers.go @@ -105,8 +105,6 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) { return models.Artifact{}, err } - // gatepr: principal is missing still - can be added following discussion on source object. - // taskexecution and additional source information to be added when resolved. a := artifact.Artifact{ ArtifactId: &core.ArtifactID{ ArtifactKey: &core.ArtifactKey{