From ee034895802185b504fffed8f0297f54518e070a Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 10 Nov 2023 16:41:35 +0800 Subject: [PATCH] Artf/triggers (#4394) This PR adds the trigger concept. Other changes: * Forgot to make the index on artifactkey unique. * Fix to create artifact, it wasn't pulling back out the artifact key. * Updated the Artifact service to handle the passing of artifacts created by the events processor to the trigger handling components. Currently, and this is potentially a design flaw, the events processor has no way of contacting the trigger component directly. Signed-off-by: Yee Hing Tong --- flyteartifacts/go.mod | 6 +- flyteartifacts/go.sum | 4 + flyteartifacts/pkg/db/gorm_models.go | 35 +- flyteartifacts/pkg/db/gorm_transformers.go | 98 +++- flyteartifacts/pkg/db/metrics.go | 4 + flyteartifacts/pkg/db/migrations.go | 43 +- flyteartifacts/pkg/db/storage.go | 183 ++++++- flyteartifacts/pkg/lib/string_converter.go | 4 +- flyteartifacts/pkg/models/service_models.go | 27 +- flyteartifacts/pkg/models/transformers.go | 73 ++- flyteartifacts/pkg/server/interfaces.go | 13 + .../pkg/server/processor/events_handler.go | 20 +- .../pkg/server/processor/processor.go | 8 +- flyteartifacts/pkg/server/server.go | 65 ++- flyteartifacts/pkg/server/service.go | 35 +- flyteartifacts/pkg/server/trigger_engine.go | 445 ++++++++++++++++++ .../pkg/server/trigger_engine_test.go | 13 + flytestdlib/database/gorm.go | 4 +- flytestdlib/database/postgres.go | 8 +- flytestdlib/database/postgres_test.go | 4 +- 20 files changed, 1048 insertions(+), 44 deletions(-) create mode 100644 flyteartifacts/pkg/server/trigger_engine.go create mode 100644 flyteartifacts/pkg/server/trigger_engine_test.go diff --git a/flyteartifacts/go.mod b/flyteartifacts/go.mod index ab515168e2..ec499332ac 100644 --- a/flyteartifacts/go.mod +++ b/flyteartifacts/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/NYTimes/gizmo v1.3.6 github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 + github.com/cloudevents/sdk-go/v2 v2.14.0 github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000 github.com/flyteorg/flyte/flytestdlib v1.9.5 github.com/go-gormigrate/gormigrate/v2 v2.1.1 @@ -19,6 +20,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 google.golang.org/grpc v1.56.1 + google.golang.org/protobuf v1.30.0 gorm.io/gorm v1.25.5 ) @@ -43,7 +45,6 @@ require ( github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/cloudevents/sdk-go/v2 v2.14.0 // indirect github.com/coocood/freecache v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/color v1.13.0 // indirect @@ -78,6 +79,7 @@ require ( github.com/ncw/swift v1.0.53 // indirect github.com/pelletier/go-toml v1.9.4 // indirect github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect + github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect @@ -87,6 +89,7 @@ require ( github.com/spf13/cast v1.4.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.11.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect go.opencensus.io v0.24.0 // indirect golang.org/x/crypto v0.13.0 // indirect @@ -99,7 +102,6 @@ require ( 
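// Illustrative sketch (not part of this patch). The PR description above explains how artifacts
// created by the events processor reach the trigger handling code: the processor has no handle on
// the trigger engine, so it pushes created artifacts onto a buffered channel owned by the artifact
// service, and a service goroutine drains that channel and calls the trigger engine. The sketch
// below shows that shape in isolation; CreatedArtifact, notify and runLoop are hypothetical
// stand-ins for artifact.Artifact, the events handler's send, and ArtifactService.runProcessor.
package main

import (
	"context"
	"fmt"
	"time"
)

// CreatedArtifact is a simplified stand-in for artifact.Artifact.
type CreatedArtifact struct{ Project, Domain, Name, Version string }

// notify mirrors the events-handler side: a non-blocking send, so a slow or stuck trigger
// engine can never back-pressure event processing (extra artifacts are simply dropped).
func notify(created chan<- CreatedArtifact, a CreatedArtifact) {
	select {
	case created <- a:
	default:
		fmt.Println("channel full, dropping", a.Name)
	}
}

// runLoop mirrors the service side: drain the channel and hand each artifact to the trigger
// evaluator (EvaluateNewArtifact in the real code) until the context is cancelled.
func runLoop(ctx context.Context, created <-chan CreatedArtifact, evaluate func(context.Context, CreatedArtifact) error) {
	for {
		select {
		case a := <-created:
			if err := evaluate(ctx, a); err != nil {
				fmt.Println("trigger evaluation failed:", err)
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	created := make(chan CreatedArtifact, 1000) // same buffer size the PR uses
	go runLoop(ctx, created, func(_ context.Context, a CreatedArtifact) error {
		fmt.Println("evaluating triggers for", a.Name)
		return nil
	})

	notify(created, CreatedArtifact{Project: "flytesnacks", Domain: "development", Name: "my_artifact", Version: "abc123"})
	time.Sleep(100 * time.Millisecond) // give the consumer a moment in this toy example
}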
google.golang.org/api v0.114.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect - google.golang.org/protobuf v1.30.0 // indirect gopkg.in/ini.v1 v1.66.4 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/flyteartifacts/go.sum b/flyteartifacts/go.sum index b2788b31e9..619a567139 100644 --- a/flyteartifacts/go.sum +++ b/flyteartifacts/go.sum @@ -321,6 +321,7 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZ github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY= github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= @@ -414,6 +415,7 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= +github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -473,6 +475,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -963,6 +966,7 @@ k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= +sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y= sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY= sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= diff --git a/flyteartifacts/pkg/db/gorm_models.go b/flyteartifacts/pkg/db/gorm_models.go index 4bd7c28848..101435a843 100644 --- a/flyteartifacts/pkg/db/gorm_models.go +++ 
b/flyteartifacts/pkg/db/gorm_models.go @@ -7,9 +7,9 @@ import ( type ArtifactKey struct { gorm.Model - Project string `gorm:"index:idx_pdn;index:idx_proj;type:varchar(64)"` - Domain string `gorm:"index:idx_pdn;index:idx_dom;type:varchar(64)"` - Name string `gorm:"index:idx_pdn;index:idx_name;type:varchar(255)"` + Project string `gorm:"uniqueIndex:idx_pdn;index:idx_proj;type:varchar(64)"` + Domain string `gorm:"uniqueIndex:idx_pdn;index:idx_dom;type:varchar(64)"` + Name string `gorm:"uniqueIndex:idx_pdn;index:idx_name;type:varchar(255)"` } type Artifact struct { @@ -41,3 +41,32 @@ type Artifact struct { // Here nullable in the case of workflow output. RetryAttempt *uint32 } + +type TriggerKey struct { + gorm.Model + Project string `gorm:"index:idx_pdn;index:idx_proj;type:varchar(64)"` + Domain string `gorm:"index:idx_pdn;index:idx_dom;type:varchar(64)"` + Name string `gorm:"index:idx_pdn;index:idx_name;type:varchar(255)"` + RunsOn []ArtifactKey `gorm:"many2many:active_trigger_artifact_keys;"` +} + +type LaunchPlanID struct { + Name string `gorm:"not null;index:idx_lp_id;type:varchar(255)"` + Version string `gorm:"not null;type:varchar(255);index:idx_launch_plan_version"` +} + +type Trigger struct { + gorm.Model + TriggerKeyID uint `gorm:"uniqueIndex:idx_trigger_pdnv"` + TriggerKey TriggerKey `gorm:"foreignKey:TriggerKeyID;references:ID"` + Version string `gorm:"not null;type:varchar(255);index:idx_trigger_version;uniqueIndex:idx_trigger_pdnv"` + + // Unlike the one in the TriggerKey table, these are the list of artifact keys as specified by the user + // for this specific version. Currently just the key but can add additional fields in the future. + RunsOn []ArtifactKey `gorm:"many2many:trigger_ids_artifact_keys;"` + + Active bool `gorm:"index:idx_active"` + LaunchPlanID LaunchPlanID `gorm:"embedded"` + LaunchPlanSpec []byte `gorm:"not null"` + LaunchPlanClosure []byte `gorm:"not null"` +} diff --git a/flyteartifacts/pkg/db/gorm_transformers.go b/flyteartifacts/pkg/db/gorm_transformers.go index 2eeec630c7..18457bc013 100644 --- a/flyteartifacts/pkg/db/gorm_transformers.go +++ b/flyteartifacts/pkg/db/gorm_transformers.go @@ -2,11 +2,14 @@ package db import ( "context" + "fmt" "github.com/flyteorg/flyte/flyteartifacts/pkg/models" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" "github.com/jackc/pgx/v5/pgtype" ) @@ -89,7 +92,6 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) { // gatepr: principal is missing still - can be added following discussion on source object. // taskexecution and additional source information to be added when resolved. 
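// Illustrative sketch (not part of this patch). A note on the ArtifactKey tag change above: when
// several fields share the same name in a `uniqueIndex:` tag, GORM builds one composite unique
// index over all of them, so switching idx_pdn from `index` to `uniqueIndex` enforces uniqueness
// of the (project, domain, name) triple rather than of any single column. The model and helper
// below are hypothetical, just to show the pattern:
package sketch

import "gorm.io/gorm"

type ArtifactKeySketch struct {
	gorm.Model
	Project string `gorm:"uniqueIndex:idx_pdn;type:varchar(64)"`
	Domain  string `gorm:"uniqueIndex:idx_pdn;type:varchar(64)"`
	Name    string `gorm:"uniqueIndex:idx_pdn;type:varchar(255)"`
}

// Migrate produces roughly:
//   CREATE UNIQUE INDEX idx_pdn ON artifact_key_sketches (project, domain, name);
// so inserting the same project/domain/name twice fails with a duplicate-key error
// (SQLSTATE 23505 on Postgres, which CreateTrigger checks for further down in this patch).
func Migrate(db *gorm.DB) error {
	return db.AutoMigrate(&ArtifactKeySketch{})
}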
- // gatepr: implement tags a := artifact.Artifact{ ArtifactId: &core.ArtifactID{ ArtifactKey: &core.ArtifactKey{ @@ -123,7 +125,97 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) { return models.Artifact{ Artifact: a, OffloadedMetadata: "", - LiteralTypeBytes: nil, - LiteralValueBytes: nil, + LiteralTypeBytes: ga.LiteralType, + LiteralValueBytes: ga.LiteralValue, }, nil } + +func ServiceToGormTrigger(serviceTrigger models.Trigger) Trigger { + + t := Trigger{ + TriggerKey: TriggerKey{ + Project: serviceTrigger.Project, + Domain: serviceTrigger.Domain, + Name: serviceTrigger.Name, + }, + Version: serviceTrigger.Version, + Active: serviceTrigger.Active, + LaunchPlanID: LaunchPlanID{ + Name: serviceTrigger.LaunchPlan.Id.Name, + Version: serviceTrigger.LaunchPlan.Id.Version, + }, + LaunchPlanSpec: serviceTrigger.SpecBytes, + LaunchPlanClosure: serviceTrigger.ClosureBytes, + } + + var runsOn = make([]ArtifactKey, len(serviceTrigger.RunsOn)) + for i, a := range serviceTrigger.RunsOn { + runsOn[i] = ArtifactKey{ + Project: a.ArtifactKey.Project, + Domain: a.ArtifactKey.Domain, + Name: a.ArtifactKey.Name, + } + } + t.RunsOn = runsOn + + return t +} + +func GormToServiceTrigger(gormTrigger Trigger) (models.Trigger, error) { + spec := &admin.LaunchPlanSpec{} + closure := &admin.LaunchPlanClosure{} + if err := proto.Unmarshal(gormTrigger.LaunchPlanSpec, spec); err != nil { + return models.Trigger{}, err + } + if err := proto.Unmarshal(gormTrigger.LaunchPlanClosure, closure); err != nil { + return models.Trigger{}, err + } + lpID := core.Identifier{ + ResourceType: core.ResourceType_LAUNCH_PLAN, + Project: gormTrigger.TriggerKey.Project, + Domain: gormTrigger.TriggerKey.Domain, + Name: gormTrigger.LaunchPlanID.Name, + Version: gormTrigger.Version, // gormTrigger.LaunchPlanID.Version, + } + t := models.Trigger{ + Project: gormTrigger.TriggerKey.Project, + Domain: gormTrigger.TriggerKey.Domain, + Name: gormTrigger.TriggerKey.Name, + Version: gormTrigger.Version, + Active: gormTrigger.Active, + LaunchPlanID: lpID, + LaunchPlan: &admin.LaunchPlan{ + Id: &lpID, + Spec: spec, + Closure: closure, + }, + SpecBytes: gormTrigger.LaunchPlanSpec, + ClosureBytes: gormTrigger.LaunchPlanClosure, + } + + // TODO: This is a copy/paste of the code in transformers.go. Refactor. + // Basically the DB model only has artifact keys, not whole artifact IDs including partitions + // so pull the artifact IDs again from the spec. 
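// Illustrative sketch (not part of this patch). For context on the comment above: the user's
// trigger definition travels inside the launch plan spec, packed into
// EntityMetadata.LaunchConditions as a google.protobuf.Any wrapping a core.Trigger, which is why
// both this transformer and CreateTriggerModelFromRequest unpack it to recover the full artifact
// IDs (keys plus partitions). A minimal round trip, with made-up identifiers:
package sketch

import (
	"fmt"

	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/golang/protobuf/ptypes"
)

func RoundTripLaunchConditions() error {
	original := &core.Trigger{
		TriggerId: &core.Identifier{Name: "my_trigger"},
		Triggers: []*core.ArtifactID{
			{ArtifactKey: &core.ArtifactKey{Name: "upstream_artifact"}},
		},
	}

	// Registration packs the trigger into the launch plan's launch conditions.
	packed, err := ptypes.MarshalAny(original)
	if err != nil {
		return err
	}

	// The service unpacks it again to get back the artifact IDs the trigger runs on.
	unpacked := &core.Trigger{}
	if err := ptypes.UnmarshalAny(packed, unpacked); err != nil {
		return err
	}
	fmt.Println("trigger runs on", len(unpacked.Triggers), "artifact(s)")
	return nil
}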
+ lc := spec.GetEntityMetadata().GetLaunchConditions() + + var err error + idlTrigger := core.Trigger{} + err = ptypes.UnmarshalAny(lc, &idlTrigger) + if err != nil { + logger.Errorf(context.TODO(), "Failed to unmarshal launch conditions to idl, metadata: [%+v]", spec.GetEntityMetadata()) + return models.Trigger{}, err + } + if len(idlTrigger.Triggers) == 0 { + return models.Trigger{}, fmt.Errorf("invalid request to CreateTrigger, launch conditions cannot be empty") + } + var runsOnArtifactIDs = make([]core.ArtifactID, len(idlTrigger.Triggers)) + for i, t := range idlTrigger.Triggers { + runsOnArtifactIDs[i] = *t + runsOnArtifactIDs[i].ArtifactKey.Project = lpID.Project + runsOnArtifactIDs[i].ArtifactKey.Domain = lpID.Domain + } + + t.RunsOn = runsOnArtifactIDs + + return t, nil +} diff --git a/flyteartifacts/pkg/db/metrics.go b/flyteartifacts/pkg/db/metrics.go index 07225174e3..b275548cd4 100644 --- a/flyteartifacts/pkg/db/metrics.go +++ b/flyteartifacts/pkg/db/metrics.go @@ -13,6 +13,8 @@ type gormMetrics struct { GetDuration promutils.StopWatch UpdateDuration promutils.StopWatch SearchDuration promutils.StopWatch + + CreateTriggerDuration promutils.StopWatch } func newMetrics(scope promutils.Scope) gormMetrics { @@ -26,5 +28,7 @@ func newMetrics(scope promutils.Scope) gormMetrics { "update", "time taken to update an entry", time.Millisecond), SearchDuration: scope.MustNewStopWatch( "search", "time taken for searching", time.Millisecond), + CreateTriggerDuration: scope.MustNewStopWatch( + "createT", "time taken to create a new trigger", time.Millisecond), } } diff --git a/flyteartifacts/pkg/db/migrations.go b/flyteartifacts/pkg/db/migrations.go index 773dcb99c8..e91552f3e6 100644 --- a/flyteartifacts/pkg/db/migrations.go +++ b/flyteartifacts/pkg/db/migrations.go @@ -51,7 +51,48 @@ var Migrations = []*gormigrate.Migration{ }, Rollback: func(tx *gorm.DB) error { return tx.Migrator().DropTable( - "artifact_keys", "artifacts", + "artifacts", "artifact_keys", + ) + }, + }, + { + ID: "2023-10-22-trigger", + Migrate: func(tx *gorm.DB) error { + type TriggerKey struct { + gorm.Model + Project string `gorm:"index:idx_t_pdn;index:idx_t_proj;type:varchar(64)"` + Domain string `gorm:"index:idx_t_pdn;index:idx_t_dom;type:varchar(64)"` + Name string `gorm:"index:idx_t_pdn;index:idx_t_name;type:varchar(255)"` + RunsOn []ArtifactKey `gorm:"many2many:active_trigger_artifact_keys;"` + } + + type LaunchPlanID struct { + Name string `gorm:"not null;index:idx_lp_id;type:varchar(255)"` + Version string `gorm:"not null;type:varchar(255);index:idx_launch_plan_version"` + } + + type Trigger struct { + gorm.Model + TriggerKeyID uint `gorm:"uniqueIndex:idx_trigger_pdnv"` + TriggerKey TriggerKey `gorm:"foreignKey:TriggerKeyID;references:ID"` + Version string `gorm:"not null;type:varchar(255);index:idx_trigger_version;uniqueIndex:idx_trigger_pdnv"` + + // Unlike the one in the TriggerKey table, these are the list of artifact keys as specified by the user + // for this specific version. Currently just the key but can add additional fields in the future. 
+ RunsOn []ArtifactKey `gorm:"many2many:trigger_ids_artifact_keys;"` + + Active bool `gorm:"index:idx_t_active"` + LaunchPlanID LaunchPlanID `gorm:"embedded"` + LaunchPlanSpec []byte `gorm:"not null"` + LaunchPlanClosure []byte `gorm:"not null"` + } + return tx.AutoMigrate( + &TriggerKey{}, &Trigger{}, + ) + }, + Rollback: func(tx *gorm.DB) error { + return tx.Migrator().DropTable( + "triggers", "trigger_keys", ) }, }, diff --git a/flyteartifacts/pkg/db/storage.go b/flyteartifacts/pkg/db/storage.go index 40bc6c1e6b..fac2e06680 100644 --- a/flyteartifacts/pkg/db/storage.go +++ b/flyteartifacts/pkg/db/storage.go @@ -11,6 +11,8 @@ import ( "github.com/flyteorg/flyte/flytestdlib/database" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "gorm.io/gorm" ) @@ -53,6 +55,11 @@ func (r *RDSStorage) CreateArtifact(ctx context.Context, serviceModel models.Art logger.Errorf(ctx, "Failed to create artifact %+v", tx.Error) return tx.Error } + getSaved := tx.Preload("ArtifactKey").First(&gormModel, "id = ?", gormModel.ID) + if getSaved.Error != nil { + logger.Errorf(ctx, "Failed to find artifact that was just saved: %+v", getSaved.Error) + return getSaved.Error + } return nil }) if err != nil { @@ -60,8 +67,14 @@ func (r *RDSStorage) CreateArtifact(ctx context.Context, serviceModel models.Art return models.Artifact{}, err } timer.Stop() + svcModel, err := GormToServiceModel(gormModel) + if err != nil { + // metric + logger.Errorf(ctx, "Failed to convert gorm model to service model: %+v", err) + return models.Artifact{}, err + } - return models.Artifact{}, nil + return svcModel, nil } func (r *RDSStorage) handleUriGet(ctx context.Context, uri string) (models.Artifact, error) { @@ -102,7 +115,8 @@ func (r *RDSStorage) handleArtifactIdGet(ctx context.Context, artifactID core.Ar if err := db.Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logger.Infof(ctx, "Artifact not found: %+v", artifactID) - return models.Artifact{}, fmt.Errorf("artifact [%v] not found", artifactID) + // todo: return grpc errors at the service layer not here. + return models.Artifact{}, status.Errorf(codes.NotFound, "artifact [%v] not found", artifactID) } logger.Errorf(ctx, "Failed to query for artifact: %+v", err) return models.Artifact{}, err @@ -136,6 +150,171 @@ func (r *RDSStorage) GetArtifact(ctx context.Context, query core.ArtifactQuery) return resp, err } +func (r *RDSStorage) CreateTrigger(ctx context.Context, trigger models.Trigger) (models.Trigger, error) { + + timer := r.metrics.CreateTriggerDuration.Start() + logger.Debugf(ctx, "Attempt create trigger [%s:%s]", trigger.Name, trigger.Version) + dbTrigger := ServiceToGormTrigger(trigger) + // TODO: Add a check to ensure that the artifact IDs that the trigger is triggering on are unique if more than one. + + // Check to see if the trigger key already exists. 
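// Illustrative sketch (not part of this patch). The 2023-10-22-trigger migration above re-declares
// TriggerKey, LaunchPlanID and Trigger locally rather than reusing the package-level models; that
// is the usual gormigrate pattern, so the migration keeps describing the schema as it was written
// even if the live models change in later PRs, and its rollback drops dependent tables first
// ("triggers" before "trigger_keys"). A minimal, hypothetical example of the same shape:
package sketch

import (
	"github.com/go-gormigrate/gormigrate/v2"
	"gorm.io/gorm"
)

func RunMigrations(db *gorm.DB) error {
	migrations := []*gormigrate.Migration{
		{
			ID: "2023-10-22-example",
			Migrate: func(tx *gorm.DB) error {
				// Local snapshot of the schema as of this migration.
				type Example struct {
					gorm.Model
					Name string `gorm:"type:varchar(255)"`
				}
				return tx.AutoMigrate(&Example{})
			},
			Rollback: func(tx *gorm.DB) error {
				return tx.Migrator().DropTable("examples")
			},
		},
	}
	return gormigrate.New(db, gormigrate.DefaultOptions, migrations).Migrate()
}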
Create if not + err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var extantKey TriggerKey + + tx.FirstOrCreate(&extantKey, dbTrigger.TriggerKey) + if err := tx.Error; err != nil { + logger.Errorf(ctx, "Failed to firstorcreate key: %+v", err) + return err + } + + // Look for all earlier versions of the trigger and mark them inactive + setFalse := tx.Model(&Trigger{}).Where("trigger_key_id = ?", extantKey.ID).Update("active", false) + if tx.Error != nil { + logger.Errorf(ctx, "transaction error marking earlier versions inactive for %s: %+v", dbTrigger.TriggerKey.Name, tx.Error) + return tx.Error + } + if setFalse.Error != nil { + logger.Errorf(ctx, "Failed to mark earlier versions inactive for %s: %+v", dbTrigger.TriggerKey.Name, setFalse.Error) + return setFalse.Error + } + + var artifactKeys []ArtifactKey + // should we use tx here? + db := r.db.Model(&ArtifactKey{}).Where(&dbTrigger.RunsOn).Find(&artifactKeys) + if db.Error != nil { + logger.Errorf(ctx, "Error %v", db.Error) + return db.Error + } + if len(artifactKeys) != len(dbTrigger.RunsOn) { + logger.Errorf(ctx, "Could not find all artifact keys for trigger: %+v, only found %v", dbTrigger.RunsOn, artifactKeys) + return fmt.Errorf("could not find all artifact keys for trigger") + } + dbTrigger.RunsOn = artifactKeys + + dbTrigger.TriggerKeyID = extantKey.ID + dbTrigger.TriggerKey = TriggerKey{} // zero out the artifact key + // This create should update the join table between individual triggers and artifact keys + tt := tx.Save(&dbTrigger) + if tx.Error != nil || tt.Error != nil { + if tx.Error != nil { + logger.Errorf(ctx, "Transaction error: %v", tx.Error) + return tx.Error + } + logger.Errorf(ctx, "Save query failed with: %v", tt.Error) + return tt.Error + } + var savedTrigger Trigger + tt = tx.Preload("TriggerKey").Preload("RunsOn").First(&savedTrigger, "id = ?", dbTrigger.ID) + if tx.Error != nil || tt.Error != nil { + if tx.Error != nil { + logger.Errorf(ctx, "Transaction error: %v", tx.Error) + return tx.Error + } + logger.Errorf(ctx, "Failed to find trigger that was just saved: %+v", tx.Error) + return tt.Error + } + + // Next update the active_trigger_artifact_keys join table that keeps track of active key relationships + // That is, if you have a trigger_on=[artifactA, artifactB], this table links the trigger's name to those + // artifact names. If you register a new version of the trigger that is just trigger_on=[artifactC], then + // this table should just hold the reference to artifactC. + err := tx.Model(&savedTrigger.TriggerKey).Association("RunsOn").Replace(savedTrigger.RunsOn) + if err != nil { + logger.Errorf(ctx, "Failed to update active_trigger_artifact_keys: %+v", err) + return err + } + + return nil + }) + if err != nil { + if database.IsPgErrorWithCode(err, database.PgDuplicatedKey) { + logger.Infof(ctx, "Duplicate key detected, the current transaction will be cancelled: %s %s", trigger.Name, trigger.Version) + // TODO: Replace with the retrieved Trigger object maybe + // TODO: Add an error handling layer that translates from pg errors to a general service error. 
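// Illustrative sketch (not part of this patch). The TODO above asks for an error handling layer
// that translates Postgres errors into a general service error; one possible shape, using the
// IsPgErrorWithCode helper and the error-code constants this PR exports from flytestdlib/database,
// is a small hypothetical mapping function like the one below:
package sketch

import (
	"github.com/flyteorg/flyte/flytestdlib/database"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// ToServiceError converts low-level Postgres errors into gRPC statuses the artifact service
// could return to callers; anything unrecognized becomes codes.Internal.
func ToServiceError(err error, entity string) error {
	if err == nil {
		return nil
	}
	if database.IsPgErrorWithCode(err, database.PgDuplicatedKey) {
		return status.Errorf(codes.AlreadyExists, "%s already exists", entity)
	}
	if database.IsPgErrorWithCode(err, database.PgDuplicatedForeignKey) {
		return status.Errorf(codes.FailedPrecondition, "%s violates a foreign key constraint", entity)
	}
	return status.Errorf(codes.Internal, "unexpected database error for %s: %v", entity, err)
}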
+ return models.Trigger{}, err + } else { + logger.Errorf(ctx, "Failed transaction upsert on key [%s]: %+v", trigger.Name, err) + return models.Trigger{}, err + } + } + timer.Stop() + + return models.Trigger{}, nil +} + +func (r *RDSStorage) GetLatestTrigger(ctx context.Context, project, domain, name string) (models.Trigger, error) { + var gotTrigger Trigger + tk := TriggerKey{ + Project: project, + Domain: domain, + Name: name, + } + db := r.db.Model(&Trigger{}).InnerJoins("TriggerKey", r.db.Where(&tk)) + db = db.Where("active = true").Order("created_at desc").Limit(1).First(&gotTrigger) + if err := db.Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + logger.Infof(ctx, "Trigger not found: %+v", tk) + return models.Trigger{}, fmt.Errorf("could not find a latest trigger") + } + logger.Errorf(ctx, "Failed to query for triggers: %+v", err) + return models.Trigger{}, err + } + logger.Debugf(ctx, "Found and returning trigger obj %v", gotTrigger) + + m, err := GormToServiceTrigger(gotTrigger) + if err != nil { + logger.Errorf(ctx, "Failed to convert gorm model to service model: %+v", err) + return models.Trigger{}, err + } + return m, nil +} + +// GetTriggersByArtifactKey - Given an artifact key, presumably of a newly created artifact, find active triggers that +// trigger on that key (and potentially others). +func (r *RDSStorage) GetTriggersByArtifactKey(ctx context.Context, key core.ArtifactKey) ([]models.Trigger, error) { + // First find the trigger keys relevant to the artifact key + // then find the most recent, active, version of each trigger key. + + var triggerKey []TriggerKey + db := r.db.Preload("RunsOn"). + Joins("inner join active_trigger_artifact_keys ug on ug.trigger_key_id = trigger_keys.id "). + Joins("inner join artifact_keys g on g.id= ug.artifact_key_id "). + Where("g.project = ? and g.domain = ? and g.name = ?", key.Project, key.Domain, key.Name). 
+ Find(&triggerKey) + + err := db.Error + if err != nil { + logger.Errorf(ctx, "Failed to find triggers for artifact key %v: %+v", key, err) + return nil, err + } + logger.Debugf(ctx, "Found trigger keys: %+v for artifact key %v", triggerKey, key) + if triggerKey == nil || len(triggerKey) == 0 { + logger.Infof(ctx, "No triggers found for artifact key %v", key) + return nil, nil + } + + ts := make([]Trigger, len(triggerKey)) + + db = r.db.Preload("RunsOn").Model(&Trigger{}).InnerJoins("TriggerKey", r.db.Where(&triggerKey)).Where("active = true").Find(&ts) + if err := db.Error; err != nil { + logger.Errorf(ctx, "Failed to query for triggers: %+v", err) + return nil, err + } + logger.Debugf(ctx, "Found (%d) triggers %v", len(ts), ts) + + modelTriggers := make([]models.Trigger, len(ts)) + for i, t := range ts { + st, err := GormToServiceTrigger(t) + if err != nil { + logger.Errorf(ctx, "Failed to convert gorm model to service model: %+v", err) + return nil, err + } + modelTriggers[i] = st + } + + return modelTriggers, nil +} + func NewStorage(ctx context.Context, scope promutils.Scope) *RDSStorage { dbCfg := configuration.ApplicationConfig.GetConfig().(*configuration.ApplicationConfiguration).ArtifactDatabaseConfig logConfig := logger.GetConfig() diff --git a/flyteartifacts/pkg/lib/string_converter.go b/flyteartifacts/pkg/lib/string_converter.go index dd3f2133d1..b85e56ee2a 100644 --- a/flyteartifacts/pkg/lib/string_converter.go +++ b/flyteartifacts/pkg/lib/string_converter.go @@ -7,6 +7,8 @@ import ( "time" ) +const DateFormat string = "2006-01-02" + func RenderLiteral(lit *core.Literal) (string, error) { if lit == nil { return "", fmt.Errorf("can't RenderLiteral, input is nil") @@ -35,7 +37,7 @@ func RenderLiteral(lit *core.Literal) (string, error) { case *core.Primitive_Datetime: // just date for now, not sure if we should support time... dt := scalar.GetPrimitive().GetDatetime().AsTime() - txt := dt.Format("2006-01-02") + txt := dt.Format(DateFormat) return txt, nil case *core.Primitive_Duration: dur := scalar.GetPrimitive().GetDuration().AsDuration() diff --git a/flyteartifacts/pkg/models/service_models.go b/flyteartifacts/pkg/models/service_models.go index c1a1df03c9..ce811b5f8e 100644 --- a/flyteartifacts/pkg/models/service_models.go +++ b/flyteartifacts/pkg/models/service_models.go @@ -1,6 +1,10 @@ package models -import "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" +import ( + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" +) // Artifact is a wrapper object for easier handling of additional fields type Artifact struct { @@ -9,3 +13,24 @@ type Artifact struct { LiteralTypeBytes []byte LiteralValueBytes []byte } + +// Trigger - A trigger is nothing more than a launch plan, so wrap that. +type Trigger struct { + // The launch plan below doesn't have an ID field for the trigger directly (it's nested), so add one here. + Project string + Domain string + Name string + Version string + + LaunchPlanID core.Identifier + + // The trigger as defined in the user code becomes a launch plan as it is the most similar. + *admin.LaunchPlan + + RunsOn []core.ArtifactID + + // Additional meta fields relevant to the trigger. 
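// Illustrative sketch (not part of this patch). The two GORM queries in GetTriggersByArtifactKey
// above amount to roughly the SQL below: from an artifact key, walk the active_trigger_artifact_keys
// join table to the linked trigger keys, then pick the active trigger versions for those keys.
// Table and column names follow GORM's defaults and the many2many tags; treat the exact SQL as an
// approximation for reading purposes only.
package sketch

import "gorm.io/gorm"

type triggerRow struct {
	ID uint
}

func activeTriggerIDsForArtifactKey(db *gorm.DB, project, domain, name string) ([]triggerRow, error) {
	var rows []triggerRow
	err := db.Raw(`
		SELECT t.id
		FROM triggers t
		INNER JOIN trigger_keys tk ON tk.id = t.trigger_key_id
		INNER JOIN active_trigger_artifact_keys ug ON ug.trigger_key_id = tk.id
		INNER JOIN artifact_keys g ON g.id = ug.artifact_key_id
		WHERE g.project = ? AND g.domain = ? AND g.name = ?
		  AND t.active = true`,
		project, domain, name).Scan(&rows).Error
	return rows, err
}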
+ Active bool + SpecBytes []byte + ClosureBytes []byte +} diff --git a/flyteartifacts/pkg/models/transformers.go b/flyteartifacts/pkg/models/transformers.go index bb59f72fc2..6b8d3eace4 100644 --- a/flyteartifacts/pkg/models/transformers.go +++ b/flyteartifacts/pkg/models/transformers.go @@ -3,11 +3,11 @@ package models import ( "context" "fmt" - "github.com/flyteorg/flyte/flytestdlib/logger" - "github.com/golang/protobuf/proto" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" ) func CreateArtifactModelFromRequest(ctx context.Context, key *core.ArtifactKey, spec *artifact.ArtifactSpec, version string, partitions map[string]string, tag string, principal string) (Artifact, error) { @@ -105,3 +105,70 @@ func PartitionsFromIdl(ctx context.Context, partitions *core.Partitions) map[str return p } + +func CreateTriggerModelFromRequest(ctx context.Context, request *artifact.CreateTriggerRequest) (Trigger, error) { + if request.GetTriggerLaunchPlan().GetSpec() == nil || request.GetTriggerLaunchPlan().GetId() == nil || request.GetTriggerLaunchPlan().GetClosure() == nil { + logger.Errorf(ctx, "Something nil in CreateTrigger, [%+v]", request) + return Trigger{}, fmt.Errorf("invalid request to CreateTrigger, something is nil") + } + + spec := request.GetTriggerLaunchPlan().GetSpec() + if spec.GetEntityMetadata().GetLaunchConditions() == nil { + logger.Errorf(ctx, "Launch conditions cannot be nil in CreateTrigger, [%+v]", request) + return Trigger{}, fmt.Errorf("invalid request to CreateTrigger, launch conditions cannot be nil") + } + + lpID := request.GetTriggerLaunchPlan().GetId() + + lc := spec.GetEntityMetadata().GetLaunchConditions() + + var err error + idlTrigger := core.Trigger{} + err = ptypes.UnmarshalAny(lc, &idlTrigger) + if err != nil { + logger.Errorf(ctx, "Failed to unmarshal launch conditions to idl, metadata: [%+v]", spec.GetEntityMetadata()) + return Trigger{}, err + } + if len(idlTrigger.Triggers) == 0 { + return Trigger{}, fmt.Errorf("invalid request to CreateTrigger, launch conditions cannot be empty") + } + // Create a list of the Artifact IDs referenced by the trigger definition. + // Keep in mind: these are not real IDs, they just contain partial information like the name. + // Always set the referenced artifacts to the project/domain of the launch plan + var runsOnArtifactIDs = make([]core.ArtifactID, len(idlTrigger.Triggers)) + for i, t := range idlTrigger.Triggers { + runsOnArtifactIDs[i] = *t + runsOnArtifactIDs[i].ArtifactKey.Project = lpID.Project + runsOnArtifactIDs[i].ArtifactKey.Domain = lpID.Domain + } + + specBytes, err := proto.Marshal(request.GetTriggerLaunchPlan().GetSpec()) + if err != nil { + logger.Errorf(ctx, "Failed to marshal lp spec for CreateTrigger err: %v", err) + return Trigger{}, err + } + closureBytes, err := proto.Marshal(request.GetTriggerLaunchPlan().GetClosure()) + if err != nil { + logger.Errorf(ctx, "Failed to marshal lp closure for CreateTrigger err: %v", err) + return Trigger{}, err + } + + // Always set the project/domain of the trigger equal to the underlying launch plan + t := Trigger{ + Project: lpID.Project, + Domain: lpID.Domain, + Name: idlTrigger.TriggerId.Name, + // Use LP id for the version because the trigger doesn't come with its own + // version for now... 
too difficult to update the version of the trigger + // inside the launch conditions object during registration. + Version: lpID.Version, + LaunchPlanID: *lpID, + LaunchPlan: request.GetTriggerLaunchPlan(), + RunsOn: runsOnArtifactIDs, + Active: true, + SpecBytes: specBytes, + ClosureBytes: closureBytes, + } + + return t, nil +} diff --git a/flyteartifacts/pkg/server/interfaces.go b/flyteartifacts/pkg/server/interfaces.go index 8b73cce847..9c63d75080 100644 --- a/flyteartifacts/pkg/server/interfaces.go +++ b/flyteartifacts/pkg/server/interfaces.go @@ -3,6 +3,7 @@ package server import ( "context" "github.com/flyteorg/flyte/flyteartifacts/pkg/models" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" stdLibStorage "github.com/flyteorg/flyte/flytestdlib/storage" "github.com/golang/protobuf/ptypes/any" @@ -12,6 +13,14 @@ type StorageInterface interface { CreateArtifact(context.Context, models.Artifact) (models.Artifact, error) GetArtifact(context.Context, core.ArtifactQuery) (models.Artifact, error) + + CreateTrigger(context.Context, models.Trigger) (models.Trigger, error) + + GetLatestTrigger(ctx context.Context, project, domain, name string) (models.Trigger, error) + + GetTriggersByArtifactKey(ctx context.Context, key core.ArtifactKey) ([]models.Trigger, error) + + // DeleteTrigger(context.Context, models.Trigger) error } type BlobStoreInterface interface { @@ -19,3 +28,7 @@ type BlobStoreInterface interface { RetrieveArtifactCard(context.Context, stdLibStorage.DataReference) (*any.Any, error) } + +type TriggerHandlerInterface interface { + EvaluateNewArtifact(context.Context, *artifact.Artifact) ([]core.WorkflowExecutionIdentifier, error) +} diff --git a/flyteartifacts/pkg/server/processor/events_handler.go b/flyteartifacts/pkg/server/processor/events_handler.go index b8d4127c67..e7b54d3d32 100644 --- a/flyteartifacts/pkg/server/processor/events_handler.go +++ b/flyteartifacts/pkg/server/processor/events_handler.go @@ -15,6 +15,7 @@ import ( // ServiceCallHandler will take events and call the grpc endpoints directly. The service should most likely be local. type ServiceCallHandler struct { service artifact.ArtifactRegistryServer + created chan<- artifact.Artifact } func (s *ServiceCallHandler) HandleEvent(ctx context.Context, cloudEvent *event2.Event, msg proto.Message) error { @@ -104,6 +105,14 @@ func (s *ServiceCallHandler) HandleEventWorkflowExec(ctx context.Context, source logger.Errorf(ctx, "failed to create artifact for [%s] with error: %v", varName, err) return err } + // metric + select { + case s.created <- *resp.Artifact: + logger.Debugf(ctx, "Sent %v from handle workflow", resp.Artifact.ArtifactId) + default: + // metric + logger.Debugf(ctx, "Channel is full. didn't send %v", resp.Artifact.ArtifactId) + } logger.Debugf(ctx, "Created wf artifact id [%+v] for key %s", resp.Artifact.ArtifactId, varName) } } @@ -233,6 +242,14 @@ func (s *ServiceCallHandler) HandleEventTaskExec(ctx context.Context, source str return err } // metric + select { + case s.created <- *resp.Artifact: + logger.Debugf(ctx, "Sent %v from handle task", resp.Artifact.ArtifactId) + default: + // metric + logger.Debugf(ctx, "Channel is full. 
task handler didn't send %v", resp.Artifact.ArtifactId) + } + logger.Debugf(ctx, "Created artifact id [%+v] for key %s", resp.Artifact.ArtifactId, varName) } } @@ -244,9 +261,10 @@ func (s *ServiceCallHandler) HandleEventNodeExec(_ context.Context, _ *event.Clo return nil } -func NewServiceCallHandler(ctx context.Context, svc artifact.ArtifactRegistryServer) EventsHandlerInterface { +func NewServiceCallHandler(ctx context.Context, svc artifact.ArtifactRegistryServer, created chan<- artifact.Artifact) EventsHandlerInterface { logger.Infof(ctx, "Creating new service call handler") return &ServiceCallHandler{ service: svc, + created: created, } } diff --git a/flyteartifacts/pkg/server/processor/processor.go b/flyteartifacts/pkg/server/processor/processor.go index e3a2e67039..64aec5c149 100644 --- a/flyteartifacts/pkg/server/processor/processor.go +++ b/flyteartifacts/pkg/server/processor/processor.go @@ -11,13 +11,13 @@ import ( "github.com/flyteorg/flyte/flytestdlib/promutils" ) -func NewBackgroundProcessor(ctx context.Context, processorConfiguration configuration.EventProcessorConfiguration, service artifact.ArtifactRegistryServer, scope promutils.Scope) EventsProcessorInterface { +func NewBackgroundProcessor(ctx context.Context, processorConfiguration configuration.EventProcessorConfiguration, service artifact.ArtifactRegistryServer, created chan<- artifact.Artifact, scope promutils.Scope) EventsProcessorInterface { + // TODO: Add retry logic - //reconnectAttempts := processorConfiguration.ReconnectAttempts - //reconnectDelay := time.Duration(processorConfiguration.ReconnectDelaySeconds) * time.Second var sub pubsub.Subscriber switch processorConfiguration.CloudProvider { case configCommon.CloudDeploymentAWS: + // TODO: When we start using this, the created channel will also need to be added to the pubsubprocessor sqsConfig := gizmoAWS.SQSConfig{ QueueName: processorConfiguration.Subscriber.QueueName, QueueOwnerAccountID: processorConfiguration.Subscriber.AccountID, @@ -43,7 +43,7 @@ func NewBackgroundProcessor(ctx context.Context, processorConfiguration configur case configCommon.CloudDeploymentGCP: panic("Artifacts not implemented for GCP") case configCommon.CloudDeploymentSandbox: - handler := NewServiceCallHandler(ctx, service) + handler := NewServiceCallHandler(ctx, service, created) return NewSandboxCloudEventProcessor(handler) case configCommon.CloudDeploymentLocal: fallthrough diff --git a/flyteartifacts/pkg/server/server.go b/flyteartifacts/pkg/server/server.go index 853a81728a..f1c2ec9f52 100644 --- a/flyteartifacts/pkg/server/server.go +++ b/flyteartifacts/pkg/server/server.go @@ -20,13 +20,27 @@ import ( type ArtifactService struct { artifact.UnimplementedArtifactRegistryServer - Metrics ServiceMetrics - Service CoreService - EventConsumer processor.EventsProcessorInterface + Metrics ServiceMetrics + Service CoreService + EventConsumer processor.EventsProcessorInterface + TriggerEngine TriggerHandlerInterface + eventsToTrigger chan artifact.Artifact } func (a *ArtifactService) CreateArtifact(ctx context.Context, req *artifact.CreateArtifactRequest) (*artifact.CreateArtifactResponse, error) { - return a.Service.CreateArtifact(ctx, req) + resp, err := a.Service.CreateArtifact(ctx, req) + if err != nil { + return resp, err + } + + // gatepr add go func + execIDs, err := a.TriggerEngine.EvaluateNewArtifact(ctx, resp.Artifact) + if err != nil { + logger.Warnf(ctx, "Failed to evaluate triggers for artifact: %v, err: %v", resp.Artifact, err) + } else { + logger.Infof(ctx, "Triggered 
%v executions", len(execIDs)) + } + return resp, err } func (a *ArtifactService) GetArtifact(ctx context.Context, req *artifact.GetArtifactRequest) (*artifact.GetArtifactResponse, error) { @@ -57,15 +71,42 @@ func (a *ArtifactService) RegisterConsumer(ctx context.Context, req *artifact.Re return a.Service.RegisterConsumer(ctx, req) } +func (a *ArtifactService) runProcessor(ctx context.Context) { + for { + select { + case art := <-a.eventsToTrigger: + logger.Infof(ctx, "Received artifact: %v", art) + execIDs, err := a.TriggerEngine.EvaluateNewArtifact(ctx, &art) + if err != nil { + logger.Warnf(ctx, "Failed to evaluate triggers for artifact: %v, err: %v", art, err) + } else { + logger.Infof(ctx, "Triggered %v executions", len(execIDs)) + } + case <-ctx.Done(): + logger.Infof(ctx, "Stopping artifact processor") + return + } + } +} + func NewArtifactService(ctx context.Context, scope promutils.Scope) *ArtifactService { cfg := configuration.GetApplicationConfig() fmt.Println(cfg) eventsCfg := configuration.GetEventsProcessorConfig() + // channel that the event processor should use to pass created artifacts to, which this artifact server will + // then call the trigger engine with. + createdArtifacts := make(chan artifact.Artifact, 1000) + storage := db.NewStorage(ctx, scope.NewSubScope("storage:rds")) blobStore := blob.NewArtifactBlobStore(ctx, scope.NewSubScope("storage:s3")) coreService := NewCoreService(storage, &blobStore, scope.NewSubScope("server")) - eventsReceiverAndHandler := processor.NewBackgroundProcessor(ctx, *eventsCfg, &coreService, scope.NewSubScope("events")) + triggerHandler, err := NewTriggerEngine(ctx, storage, &coreService, scope.NewSubScope("triggers")) + if err != nil { + logger.Errorf(ctx, "Failed to create Admin client, stopping server. Error: %v", err) + panic(err) + } + eventsReceiverAndHandler := processor.NewBackgroundProcessor(ctx, *eventsCfg, &coreService, createdArtifacts, scope.NewSubScope("events")) if eventsReceiverAndHandler != nil { go func() { logger.Info(ctx, "Starting Artifact service background processing...") @@ -73,11 +114,17 @@ func NewArtifactService(ctx context.Context, scope promutils.Scope) *ArtifactSer }() } - return &ArtifactService{ - Metrics: InitMetrics(scope), - Service: coreService, - EventConsumer: eventsReceiverAndHandler, + as := &ArtifactService{ + Metrics: InitMetrics(scope), + Service: coreService, + EventConsumer: eventsReceiverAndHandler, + TriggerEngine: &triggerHandler, + eventsToTrigger: createdArtifacts, } + + go as.runProcessor(ctx) + + return as } func HttpRegistrationHook(ctx context.Context, gwmux *runtime.ServeMux, grpcAddress string, grpcConnectionOpts []grpc.DialOption, _ promutils.Scope) error { diff --git a/flyteartifacts/pkg/server/service.go b/flyteartifacts/pkg/server/service.go index cf1fa8ccb2..216d0d4995 100644 --- a/flyteartifacts/pkg/server/service.go +++ b/flyteartifacts/pkg/server/service.go @@ -13,7 +13,6 @@ import ( type CoreService struct { Storage StorageInterface BlobStore BlobStoreInterface - // TriggerHandler TriggerHandlerInterface // SearchHandler SearchHandlerInterface } @@ -71,35 +70,57 @@ func (c *CoreService) GetArtifact(ctx context.Context, request *artifact.GetArti } func (c *CoreService) CreateTrigger(ctx context.Context, request *artifact.CreateTriggerRequest) (*artifact.CreateTriggerResponse, error) { + // Create the new trigger object. + // Mark all older versions of the trigger as inactive. 
+ + // trigger handler create trigger(storage layer) + serviceTrigger, err := models.CreateTriggerModelFromRequest(ctx, request) + if err != nil { + logger.Errorf(ctx, "Failed to create a valid Trigger from create request: %v with err %v", request, err) + return nil, err + } + + createdTrigger, err := c.Storage.CreateTrigger(ctx, serviceTrigger) + if err != nil { + logger.Errorf(ctx, "Failed to create trigger: %v", err) + } + logger.Infof(ctx, "Created trigger: %+v", createdTrigger) + return &artifact.CreateTriggerResponse{}, nil } func (c *CoreService) DeleteTrigger(ctx context.Context, request *artifact.DeleteTriggerRequest) (*artifact.DeleteTriggerResponse, error) { + // Todo: gatepr - This needs to be implemented before merging. return &artifact.DeleteTriggerResponse{}, nil } func (c *CoreService) AddTag(ctx context.Context, request *artifact.AddTagRequest) (*artifact.AddTagResponse, error) { + // Holding off on implementing for a while. return &artifact.AddTagResponse{}, nil } func (c *CoreService) RegisterProducer(ctx context.Context, request *artifact.RegisterProducerRequest) (*artifact.RegisterResponse, error) { + // These are lineage endpoints slated for future work return &artifact.RegisterResponse{}, nil } func (c *CoreService) RegisterConsumer(ctx context.Context, request *artifact.RegisterConsumerRequest) (*artifact.RegisterResponse, error) { + // These are lineage endpoints slated for future work return &artifact.RegisterResponse{}, nil } func (c *CoreService) SearchArtifacts(ctx context.Context, request *artifact.SearchArtifactsRequest) (*artifact.SearchArtifactsResponse, error) { + // This is demo test code, will be deleted. + logger.Infof(ctx, "SearchArtifactsRequest: %+v", request) + triggers, err := c.Storage.GetTriggersByArtifactKey(ctx, *request.ArtifactKey) + if err != nil { + logger.Errorf(ctx, "delete me Failed to get triggers by artifact key [%+v]: %v", request.ArtifactKey, err) + return nil, err + } + fmt.Println(triggers) return &artifact.SearchArtifactsResponse{}, nil } -// HandleCloudEvent is the stand-in for simple open-source handling of the event stream, rather than using -// a real -func (c *CoreService) HandleCloudEvent(ctx context.Context, request *artifact.CloudEventRequest) (*artifact.CloudEventResponse, error) { - return &artifact.CloudEventResponse{}, nil -} - func NewCoreService(storage StorageInterface, blobStore BlobStoreInterface, _ promutils.Scope) CoreService { return CoreService{ Storage: storage, diff --git a/flyteartifacts/pkg/server/trigger_engine.go b/flyteartifacts/pkg/server/trigger_engine.go new file mode 100644 index 0000000000..68da0645b0 --- /dev/null +++ b/flyteartifacts/pkg/server/trigger_engine.go @@ -0,0 +1,445 @@ +package server + +import ( + "context" + "fmt" + "github.com/flyteorg/flyte/flyteartifacts/pkg/lib" + "github.com/flyteorg/flyte/flyteartifacts/pkg/models" + admin2 "github.com/flyteorg/flyte/flyteidl/clients/go/admin" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" + "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/promutils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" + "math/rand" + "regexp" + "strconv" + "time" + "unicode/utf8" +) + +type TriggerEngine struct { + service 
artifact.ArtifactRegistryServer + adminClient service.AdminServiceClient + store StorageInterface + + // needs to be used + scope promutils.Scope +} + +// Evaluate the trigger and launch the workflow if it's true +func (e *TriggerEngine) evaluateAndHandleTrigger(ctx context.Context, trigger models.Trigger, incoming *artifact.Artifact) error { + incomingKey := core.ArtifactKey{ + Project: incoming.GetArtifactId().GetArtifactKey().GetProject(), + Domain: incoming.GetArtifactId().GetArtifactKey().GetDomain(), + Name: incoming.GetArtifactId().GetArtifactKey().GetName(), + } + + var triggeringArtifacts = make([]artifact.Artifact, 0) + + incomingPartitions := map[string]*core.LabelValue{} + + if incoming.GetArtifactId().GetPartitions().GetValue() != nil && len(incoming.GetArtifactId().GetPartitions().GetValue()) > 0 { + for k, p := range incoming.GetArtifactId().GetPartitions().GetValue() { + if len(p.GetStaticValue()) == 0 { + logger.Warningf(ctx, "Trigger %s has non-static partition [%+v]", trigger.Name, incoming.GetArtifactId().GetPartitions().GetValue()) + return fmt.Errorf("trigger %s has non-static partition %s [%+v]", trigger.Name, k, p) + } + incomingPartitions[k] = p + } + } + + // Note the order of this is important. It needs to be the same order as the trigger.RunsOn + // This is because binding data references an index in this array. + for _, triggeringArtifactID := range trigger.RunsOn { + // First check partitions. + // They must either both have no partitions, or both have the same partitions. + var thisIDPartitions = make(map[string]string) + if triggeringArtifactID.GetPartitions().GetValue() != nil { + for k, _ := range triggeringArtifactID.GetPartitions().GetValue() { + thisIDPartitions[k] = "placeholder" + } + } + if len(thisIDPartitions) != len(incomingPartitions) { + return fmt.Errorf("trigger %s has different number of partitions [%v] [%+v]", trigger.Name, incoming.GetArtifactId(), triggeringArtifactID) + } + + // If the lengths match, they must still also have the same keys. + // Build a query map of partitions for this triggering artifact while at it. + queryPartitions := map[string]*core.LabelValue{} + if len(thisIDPartitions) > 0 { + for k, _ := range thisIDPartitions { + if incomingValue, ok := incomingPartitions[k]; !ok { + return fmt.Errorf("trigger %s has different partitions [%v] [%+v]", trigger.Name, incoming.GetArtifactId(), triggeringArtifactID) + } else { + queryPartitions[k] = incomingValue + } + } + } + + // See if it's the same one as incoming. 
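// Illustrative sketch (not part of this patch). The partition handling in evaluateAndHandleTrigger
// boils down to: the incoming artifact and each artifact the trigger runs_on must use the same
// partition keys, and the incoming artifact's partition values are reused as the lookup query for
// the other upstream artifacts. Simplified here with plain string maps; queryPartitionsFor is a
// hypothetical helper, not the PR's code.
package main

import "fmt"

func queryPartitionsFor(incoming map[string]string, runsOnKeys []string) (map[string]string, error) {
	if len(runsOnKeys) != len(incoming) {
		return nil, fmt.Errorf("different number of partitions: trigger wants %d, artifact has %d",
			len(runsOnKeys), len(incoming))
	}
	query := map[string]string{}
	for _, k := range runsOnKeys {
		v, ok := incoming[k]
		if !ok {
			return nil, fmt.Errorf("partition key %q not present on the incoming artifact", k)
		}
		query[k] = v
	}
	return query, nil
}

func main() {
	// e.g. a daily artifact partitioned by ds and region arriving for a trigger that runs on
	// another artifact partitioned the same way
	q, err := queryPartitionsFor(
		map[string]string{"ds": "2023-11-10", "region": "us-east-1"},
		[]string{"ds", "region"},
	)
	fmt.Println(q, err) // map[ds:2023-11-10 region:us-east-1] <nil>
}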
+ if triggeringArtifactID.GetArtifactKey().Project == incomingKey.Project && + triggeringArtifactID.GetArtifactKey().Domain == incomingKey.Domain && + triggeringArtifactID.GetArtifactKey().Name == incomingKey.Name { + triggeringArtifacts = append(triggeringArtifacts, *incoming) + continue + } + + // Otherwise, assume it's a different one + // Construct a query and fetch it + var lookupID = core.ArtifactID{ + ArtifactKey: triggeringArtifactID.ArtifactKey, + } + if len(queryPartitions) > 0 { + lookupID.Dimensions = &core.ArtifactID_Partitions{ + Partitions: &core.Partitions{Value: queryPartitions}, + } + } + query := core.ArtifactQuery{ + Identifier: &core.ArtifactQuery_ArtifactId{ + ArtifactId: &lookupID, + }, + } + + resp, err := e.service.GetArtifact(ctx, &artifact.GetArtifactRequest{ + Query: &query, + }) + if err != nil { + return fmt.Errorf("failed to get artifact [%+v]: %w", lookupID, err) + } + + triggeringArtifacts = append(triggeringArtifacts, *resp.Artifact) + + } + + err := e.createExecution( + ctx, + triggeringArtifacts, + trigger.LaunchPlan.Id, + trigger.LaunchPlan.Spec.DefaultInputs, + ) + + return err +} + +func (e *TriggerEngine) generateRandomString(length int) string { + rand.Seed(time.Now().UnixNano()) + charset := "abcdefghijklmnopqrstuvwxyz" + var result string + for i := 0; i < length; i++ { + randomIndex := rand.Intn(len(charset)) + result += string(charset[randomIndex]) + } + + return result +} + +func (e *TriggerEngine) getSpec(_ context.Context, launchPlanID *core.Identifier) admin.ExecutionSpec { + + var spec = admin.ExecutionSpec{ + LaunchPlan: launchPlanID, + Metadata: nil, + NotificationOverrides: nil, + Labels: nil, + Annotations: nil, + SecurityContext: nil, + } + return spec +} + +func (e *TriggerEngine) createExecution(ctx context.Context, triggeringArtifacts []artifact.Artifact, launchPlanID *core.Identifier, defaultInputs *core.ParameterMap) error { + + resolvedInputs, err := e.resolveInputs(ctx, triggeringArtifacts, defaultInputs, launchPlanID) + if err != nil { + return fmt.Errorf("failed to resolve inputs: %w", err) + } + + spec := e.getSpec(ctx, launchPlanID) + + resp, err := e.adminClient.CreateExecution(ctx, &admin.ExecutionCreateRequest{ + Project: launchPlanID.Project, + Domain: launchPlanID.Domain, + Name: e.generateRandomString(12), + Spec: &spec, + Inputs: &core.LiteralMap{Literals: resolvedInputs}, + }) + if err != nil { + return fmt.Errorf("failed to create execution: %w", err) + } + + logger.Infof(ctx, "Created execution %v", resp) + return nil +} + +func (e *TriggerEngine) resolveInputs(ctx context.Context, triggeringArtifacts []artifact.Artifact, defaultInputs *core.ParameterMap, launchPlanID *core.Identifier) (map[string]*core.Literal, error) { + // Process inputs that have defaults separately as these may be used to fill in other inputs + var defaults = map[string]*core.Literal{} + for k, v := range defaultInputs.Parameters { + if v.GetDefault() != nil { + defaults[k] = v.GetDefault() + } + } + + var inputs = map[string]*core.Literal{} + for k, v := range defaultInputs.Parameters { + if v.GetDefault() != nil { + continue + } + if v == nil { + return nil, fmt.Errorf("parameter [%s] is nil", k) + } + + convertedLiteral, err := e.convertParameterToLiteral(ctx, triggeringArtifacts, *v, defaults, launchPlanID) + if err != nil { + logger.Errorf(ctx, "Error converting parameter [%s] [%v] to literal: %v", k, v, err) + return nil, err + } + inputs[k] = &convertedLiteral + } + + for k, v := range inputs { + defaults[k] = v + } + return defaults, 
nil +} + +func (e *TriggerEngine) convertParameterToLiteral(ctx context.Context, triggeringArtifacts []artifact.Artifact, + value core.Parameter, otherInputs map[string]*core.Literal, launchPlanID *core.Identifier) (core.Literal, error) { + + if value.GetArtifactQuery() != nil { + return e.convertArtifactQueryToLiteral(ctx, triggeringArtifacts, *value.GetArtifactQuery(), otherInputs, value, launchPlanID) + + } else if value.GetArtifactId() != nil { + return core.Literal{}, fmt.Errorf("artifact id not supported yet") + } + return core.Literal{}, fmt.Errorf("trying to convert non artifact Parameter to Literal") + +} + +var DurationRegex = regexp.MustCompile(`P(?P\d+Y)?(?P\d+M)?(?P\d+D)?T?(?P\d+H)?(?P\d+M)?(?P\d+S)?`) + +func ParseDuration(str string) time.Duration { + matches := DurationRegex.FindStringSubmatch(str) + + years := ParseInt64(matches[1]) + months := ParseInt64(matches[2]) + days := ParseInt64(matches[3]) + hours := ParseInt64(matches[4]) + minutes := ParseInt64(matches[5]) + seconds := ParseInt64(matches[6]) + + hour := int64(time.Hour) + minute := int64(time.Minute) + second := int64(time.Second) + return time.Duration(years*24*365*hour + months*30*24*hour + days*24*hour + hours*hour + minutes*minute + seconds*second) +} + +func ParseInt64(value string) int64 { + if len(value) == 0 { + return 0 + } + parsed, err := strconv.Atoi(value[:len(value)-1]) + if err != nil { + return 0 + } + return int64(parsed) +} + +func (e *TriggerEngine) parseStringAsDateAndApplyTransform(ctx context.Context, dateFormat string, dateStr string, transform string) (time.Time, error) { + t, err := time.Parse(dateFormat, dateStr) + if err != nil { + return time.Time{}, fmt.Errorf("failed to parse date [%s] as date : %w", dateStr, err) + } + + if transform != "" { + firstRune, runeSize := utf8.DecodeRuneInString(transform) + op := string(firstRune) + transformStr := transform[runeSize:] + + if op == "-" { + td := ParseDuration(transformStr) + t = t.Add(-td) + logger.Infof(ctx, "Applying transform [%s] new date is [%s] from %s", transform, t, transformStr) + } else { + logger.Warningf(ctx, "Ignoring transform [%s]", transformStr) + } + } + return t, nil +} + +func (e *TriggerEngine) convertArtifactQueryToLiteral(ctx context.Context, triggeringArtifacts []artifact.Artifact, + aq core.ArtifactQuery, otherInputs map[string]*core.Literal, p core.Parameter, launchPlanID *core.Identifier) (core.Literal, error) { + + // if it's a binding and it has a partition key + // this is the only time we need to transform the type. 
+ // and it's only ever going to be a datetime + if bnd := aq.GetBinding(); bnd != nil { + a := triggeringArtifacts[bnd.GetIndex()] + if bnd.GetPartitionKey() != "" { + if partitions := a.GetArtifactId().GetPartitions().GetValue(); partitions != nil { + + if sv, ok := partitions[bnd.GetPartitionKey()]; ok { + dateStr := sv.GetStaticValue() + + t, err := e.parseStringAsDateAndApplyTransform(ctx, lib.DateFormat, dateStr, bnd.GetTransform()) + if err != nil { + return core.Literal{}, fmt.Errorf("failed to parse [%s] transform %s] for [%+v]: %w", dateStr, bnd.GetTransform(), a.GetArtifactId(), err) + } + + // convert time to timestamp + ts := timestamppb.New(t) + + return core.Literal{ + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Datetime{ + Datetime: ts, + }, + }, + }, + }, + }, + }, nil + } + } + return core.Literal{}, fmt.Errorf("partition key [%s] not found in artifact [%+v]", bnd.GetPartitionKey(), a.GetArtifactId()) + } + + // this means it's bound to the whole triggering artifact, not a partition. Just pull out the literal and use it + idlLit := triggeringArtifacts[bnd.GetIndex()].GetSpec().GetValue() + return *idlLit, nil + + // this is a real query - will need to first iterate through the partitions to see if we need to fill in any + } else if queryID := aq.GetArtifactId(); queryID != nil { + searchPartitions := map[string]string{} + if len(queryID.GetPartitions().GetValue()) > 0 { + for partitionKey, lv := range queryID.GetPartitions().GetValue() { + if lv.GetStaticValue() != "" { + searchPartitions[partitionKey] = lv.GetStaticValue() + + } else if lv.GetInputBinding() != nil { + inputVar := lv.GetInputBinding().GetVar() + if val, ok := otherInputs[inputVar]; ok { + strVal, err := lib.RenderLiteral(val) + if err != nil { + return core.Literal{}, fmt.Errorf("failed to render input [%s] for partition [%s] with error: %w", inputVar, partitionKey, err) + } + searchPartitions[partitionKey] = strVal + } else { + return core.Literal{}, fmt.Errorf("input binding [%s] not found in input data", inputVar) + } + + // This is like AnArtifact.query(time_partition=Upstream.time_partition - timedelta(days=1)) or + // AnArtifact.query(region=Upstream.region) + } else if triggerBinding := lv.GetTriggeredBinding(); triggerBinding != nil { + a := triggeringArtifacts[triggerBinding.GetIndex()] + aP := a.GetArtifactId().GetPartitions().GetValue() + var searchValue = aP[triggerBinding.GetPartitionKey()].GetStaticValue() + + if triggerBinding.GetTransform() != "" { + logger.Infof(ctx, "Transform detected [%s] value [%s] with transform [%s], assuming datetime", + triggerBinding.GetPartitionKey(), searchValue, triggerBinding.GetTransform()) + t, err := e.parseStringAsDateAndApplyTransform(ctx, lib.DateFormat, searchValue, triggerBinding.GetTransform()) + if err != nil { + return core.Literal{}, fmt.Errorf("failed to parse [%s] transform %s] for [%+v]: %w", searchValue, triggerBinding.GetTransform(), a.GetArtifactId(), err) + } + logger.Debugf(ctx, "Transformed [%s] to [%s]", aP[triggerBinding.GetPartitionKey()].GetStaticValue(), searchValue) + searchValue = t.Format(lib.DateFormat) + } + searchPartitions[partitionKey] = searchValue + } + } + } + + artifactID := core.ArtifactID{ + ArtifactKey: &core.ArtifactKey{ + Project: launchPlanID.Project, + Domain: launchPlanID.Domain, + Name: queryID.ArtifactKey.Name, + }, + } + if len(searchPartitions) > 0 { + artifactID.Dimensions = &core.ArtifactID_Partitions{ + 
Partitions: models.PartitionsToIdl(searchPartitions), + } + } + + resp, err := e.service.GetArtifact(ctx, &artifact.GetArtifactRequest{ + Query: &core.ArtifactQuery{ + Identifier: &core.ArtifactQuery_ArtifactId{ + ArtifactId: &artifactID, + }, + }, + }) + if err != nil { + st, ok := status.FromError(err) + if ok && st.Code() == codes.NotFound { + if len(p.GetVar().GetType().GetUnionType().GetVariants()) > 0 { + for _, v := range p.GetVar().GetType().GetUnionType().GetVariants() { + if v.GetSimple() == core.SimpleType_NONE { + return core.Literal{ + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_NoneType{ + NoneType: &core.Void{}, + }, + }, + }, + }, nil + } + } + } + } + } + + return *resp.Artifact.Spec.Value, nil + } + + return core.Literal{}, fmt.Errorf("query was neither binding nor artifact id [%v]", aq) +} + +func (e *TriggerEngine) EvaluateNewArtifact(ctx context.Context, artifact *artifact.Artifact) ([]core.WorkflowExecutionIdentifier, error) { + + if artifact.GetArtifactId().GetArtifactKey() == nil { + // metric + return nil, fmt.Errorf("artifact or its key cannot be nil") + } + + triggers, err := e.store.GetTriggersByArtifactKey(ctx, *artifact.ArtifactId.ArtifactKey) + if err != nil { + logger.Errorf(ctx, "Failed to get triggers for artifact [%+v]: %v", artifact.ArtifactId.ArtifactKey, err) + return nil, err + } + + for _, trigger := range triggers { + err := e.evaluateAndHandleTrigger(ctx, trigger, artifact) + if err != nil { + logger.Errorf(ctx, "Failed to evaluate trigger [%s]: %v", trigger.Name, err) + return nil, err + } + } + + // todo: capture return IDs + return nil, nil +} + +func NewTriggerEngine(ctx context.Context, storage StorageInterface, service artifact.ArtifactRegistryServer, scope promutils.Scope) (TriggerEngine, error) { + cfg := admin2.GetConfig(ctx) + clients, err := admin2.NewClientsetBuilder().WithConfig(cfg).Build(ctx) + if err != nil { + return TriggerEngine{}, fmt.Errorf("failed to initialize clientset. 
Error: %w", err) + } + + return TriggerEngine{ + service: service, + adminClient: clients.AdminClient(), + store: storage, + scope: scope, + }, nil +} diff --git a/flyteartifacts/pkg/server/trigger_engine_test.go b/flyteartifacts/pkg/server/trigger_engine_test.go new file mode 100644 index 0000000000..3c61d6cc46 --- /dev/null +++ b/flyteartifacts/pkg/server/trigger_engine_test.go @@ -0,0 +1,13 @@ +package server + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestA(t *testing.T) { + dur := ParseDuration("P1D") + assert.Equal(t, time.Hour*24, dur) + +} diff --git a/flytestdlib/database/gorm.go b/flytestdlib/database/gorm.go index 246f47a872..03eb28f098 100644 --- a/flytestdlib/database/gorm.go +++ b/flytestdlib/database/gorm.go @@ -28,11 +28,11 @@ func GetGormLogger(ctx context.Context, logConfig *logger.Config) gormLogger.Int case logger.FatalLevel: fallthrough case logger.ErrorLevel: - logLevel = gormLogger.Error + logLevel = gormLogger.Info case logger.WarnLevel: fallthrough case logger.InfoLevel: - logLevel = gormLogger.Warn + logLevel = gormLogger.Info case logger.DebugLevel: logLevel = gormLogger.Info ignoreRecordNotFoundError = false diff --git a/flytestdlib/database/postgres.go b/flytestdlib/database/postgres.go index 404f691168..23d78a8c90 100644 --- a/flytestdlib/database/postgres.go +++ b/flytestdlib/database/postgres.go @@ -16,6 +16,8 @@ import ( const pqInvalidDBCode = "3D000" const pqDbAlreadyExistsCode = "42P04" +const PgDuplicatedForeignKey = "23503" +const PgDuplicatedKey = "23505" const defaultDB = "postgres" // Resolves a password value from either a user-provided inline value or a filepath whose contents contain a password. @@ -57,7 +59,7 @@ func CreatePostgresDbIfNotExists(ctx context.Context, gormConfig *gorm.Config, p if err == nil { return gormDb, nil } - if !isPgErrorWithCode(err, pqInvalidDBCode) { + if !IsPgErrorWithCode(err, pqInvalidDBCode) { return nil, err } logger.Warningf(ctx, "Database [%v] does not exist", pgConfig.DbName) @@ -80,7 +82,7 @@ func CreatePostgresDbIfNotExists(ctx context.Context, gormConfig *gorm.Config, p result := gormDb.Exec(createDBStatement) if result.Error != nil { - if !isPgErrorWithCode(result.Error, pqDbAlreadyExistsCode) { + if !IsPgErrorWithCode(result.Error, pqDbAlreadyExistsCode) { return nil, result.Error } logger.Warningf(ctx, "Got DB already exists error for [%s], skipping...", pgConfig.DbName) @@ -89,7 +91,7 @@ func CreatePostgresDbIfNotExists(ctx context.Context, gormConfig *gorm.Config, p return gorm.Open(dialector, gormConfig) } -func isPgErrorWithCode(err error, code string) bool { +func IsPgErrorWithCode(err error, code string) bool { // Make sure the pgconn you're using is "github.com/jackc/pgx/v5/pgconn" // See https://github.com/go-gorm/gorm/issues/4135 pgErr := &pgconn.PgError{} diff --git a/flytestdlib/database/postgres_test.go b/flytestdlib/database/postgres_test.go index fab6822726..b84698291c 100644 --- a/flytestdlib/database/postgres_test.go +++ b/flytestdlib/database/postgres_test.go @@ -112,7 +112,7 @@ func TestIsInvalidDBPgError(t *testing.T) { tc := tc t.Run(tc.Name, func(t *testing.T) { - assert.Equal(t, tc.ExpectedResult, isPgErrorWithCode(tc.Err, pqInvalidDBCode)) + assert.Equal(t, tc.ExpectedResult, IsPgErrorWithCode(tc.Err, pqInvalidDBCode)) }) } } @@ -150,7 +150,7 @@ func TestIsPgDbAlreadyExistsError(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.Name, func(t *testing.T) { - assert.Equal(t, tc.ExpectedResult, isPgErrorWithCode(tc.Err, 
pqDbAlreadyExistsCode)) + assert.Equal(t, tc.ExpectedResult, IsPgErrorWithCode(tc.Err, pqDbAlreadyExistsCode)) }) } }
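// Illustrative sketch (not part of this patch). For the duration handling added in
// trigger_engine.go above: a transform string such as "-P1D" is a sign rune followed by an
// ISO-8601-style duration, ParseDuration converts the duration part with the fixed
// 365-day-year / 30-day-month arithmetic shown there, and a leading "-" makes
// parseStringAsDateAndApplyTransform shift the parsed partition date backwards. A hypothetical
// test-style example (same package as trigger_engine.go, alongside the existing TestA):
package server

import (
	"testing"
	"time"
)

func TestTransformShiftsDateBackwards(t *testing.T) {
	// "P1DT2H" -> 26 hours with ParseDuration's arithmetic.
	if got := ParseDuration("P1DT2H"); got != 26*time.Hour {
		t.Fatalf("expected 26h, got %v", got)
	}

	// Applying a "-P1D" transform to a "2006-01-02"-formatted partition value moves it back one
	// day, which is what bindings like Upstream.time_partition - timedelta(days=1) rely on.
	day, _ := time.Parse("2006-01-02", "2023-11-10")
	shifted := day.Add(-ParseDuration("P1D"))
	if got := shifted.Format("2006-01-02"); got != "2023-11-09" {
		t.Fatalf("expected 2023-11-09, got %s", got)
	}
}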