Skip to content

Commit

Permalink
Artf/triggers (#4394)
Browse files Browse the repository at this point in the history
This PR adds the trigger concept.

Other changes:
* Forgot to make the index on artifactkey unique.
* Fix to create artifact, it wasn't pulling back out the artifact key.
* Updated the Artifact service to handle the passing of artifacts created by the events processor to the trigger handling components. Currently, and this is potentially a design flaw, the events processor has no way of contacting the trigger component directly.

Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Nov 10, 2023
1 parent 6792f4e commit ee03489
Show file tree
Hide file tree
Showing 20 changed files with 1,048 additions and 44 deletions.
6 changes: 4 additions & 2 deletions flyteartifacts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/NYTimes/gizmo v1.3.6
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytestdlib v1.9.5
github.com/go-gormigrate/gormigrate/v2 v2.1.1
Expand All @@ -19,6 +20,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.30.0
gorm.io/gorm v1.25.5
)

Expand All @@ -43,7 +45,6 @@ require (
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudevents/sdk-go/v2 v2.14.0 // indirect
github.com/coocood/freecache v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.13.0 // indirect
Expand Down Expand Up @@ -78,6 +79,7 @@ require (
github.com/ncw/swift v1.0.53 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
Expand All @@ -87,6 +89,7 @@ require (
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.11.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
Expand All @@ -99,7 +102,6 @@ require (
google.golang.org/api v0.114.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions flyteartifacts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZ
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
Expand Down Expand Up @@ -414,6 +415,7 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -473,6 +475,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down Expand Up @@ -963,6 +966,7 @@ k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY=
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
Expand Down
35 changes: 32 additions & 3 deletions flyteartifacts/pkg/db/gorm_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

type ArtifactKey struct {
gorm.Model
Project string `gorm:"index:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"index:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"index:idx_pdn;index:idx_name;type:varchar(255)"`
Project string `gorm:"uniqueIndex:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"uniqueIndex:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"uniqueIndex:idx_pdn;index:idx_name;type:varchar(255)"`
}

type Artifact struct {
Expand Down Expand Up @@ -41,3 +41,32 @@ type Artifact struct {
// Here nullable in the case of workflow output.
RetryAttempt *uint32
}

type TriggerKey struct {
gorm.Model
Project string `gorm:"index:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"index:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"index:idx_pdn;index:idx_name;type:varchar(255)"`
RunsOn []ArtifactKey `gorm:"many2many:active_trigger_artifact_keys;"`
}

type LaunchPlanID struct {
Name string `gorm:"not null;index:idx_lp_id;type:varchar(255)"`
Version string `gorm:"not null;type:varchar(255);index:idx_launch_plan_version"`
}

type Trigger struct {
gorm.Model
TriggerKeyID uint `gorm:"uniqueIndex:idx_trigger_pdnv"`
TriggerKey TriggerKey `gorm:"foreignKey:TriggerKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);index:idx_trigger_version;uniqueIndex:idx_trigger_pdnv"`

// Unlike the one in the TriggerKey table, these are the list of artifact keys as specified by the user
// for this specific version. Currently just the key but can add additional fields in the future.
RunsOn []ArtifactKey `gorm:"many2many:trigger_ids_artifact_keys;"`

Active bool `gorm:"index:idx_active"`
LaunchPlanID LaunchPlanID `gorm:"embedded"`
LaunchPlanSpec []byte `gorm:"not null"`
LaunchPlanClosure []byte `gorm:"not null"`
}
98 changes: 95 additions & 3 deletions flyteartifacts/pkg/db/gorm_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package db

import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/jackc/pgx/v5/pgtype"
)

Expand Down Expand Up @@ -89,7 +92,6 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) {

// gatepr: principal is missing still - can be added following discussion on source object.
// taskexecution and additional source information to be added when resolved.
// gatepr: implement tags
a := artifact.Artifact{
ArtifactId: &core.ArtifactID{
ArtifactKey: &core.ArtifactKey{
Expand Down Expand Up @@ -123,7 +125,97 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) {
return models.Artifact{
Artifact: a,
OffloadedMetadata: "",
LiteralTypeBytes: nil,
LiteralValueBytes: nil,
LiteralTypeBytes: ga.LiteralType,
LiteralValueBytes: ga.LiteralValue,
}, nil
}

func ServiceToGormTrigger(serviceTrigger models.Trigger) Trigger {

t := Trigger{
TriggerKey: TriggerKey{
Project: serviceTrigger.Project,
Domain: serviceTrigger.Domain,
Name: serviceTrigger.Name,
},
Version: serviceTrigger.Version,
Active: serviceTrigger.Active,
LaunchPlanID: LaunchPlanID{
Name: serviceTrigger.LaunchPlan.Id.Name,
Version: serviceTrigger.LaunchPlan.Id.Version,
},
LaunchPlanSpec: serviceTrigger.SpecBytes,
LaunchPlanClosure: serviceTrigger.ClosureBytes,
}

var runsOn = make([]ArtifactKey, len(serviceTrigger.RunsOn))
for i, a := range serviceTrigger.RunsOn {
runsOn[i] = ArtifactKey{
Project: a.ArtifactKey.Project,
Domain: a.ArtifactKey.Domain,
Name: a.ArtifactKey.Name,
}
}
t.RunsOn = runsOn

return t
}

func GormToServiceTrigger(gormTrigger Trigger) (models.Trigger, error) {
spec := &admin.LaunchPlanSpec{}
closure := &admin.LaunchPlanClosure{}
if err := proto.Unmarshal(gormTrigger.LaunchPlanSpec, spec); err != nil {
return models.Trigger{}, err
}
if err := proto.Unmarshal(gormTrigger.LaunchPlanClosure, closure); err != nil {
return models.Trigger{}, err
}
lpID := core.Identifier{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Project: gormTrigger.TriggerKey.Project,
Domain: gormTrigger.TriggerKey.Domain,
Name: gormTrigger.LaunchPlanID.Name,
Version: gormTrigger.Version, // gormTrigger.LaunchPlanID.Version,
}
t := models.Trigger{
Project: gormTrigger.TriggerKey.Project,
Domain: gormTrigger.TriggerKey.Domain,
Name: gormTrigger.TriggerKey.Name,
Version: gormTrigger.Version,
Active: gormTrigger.Active,
LaunchPlanID: lpID,
LaunchPlan: &admin.LaunchPlan{
Id: &lpID,
Spec: spec,
Closure: closure,
},
SpecBytes: gormTrigger.LaunchPlanSpec,
ClosureBytes: gormTrigger.LaunchPlanClosure,
}

// TODO: This is a copy/paste of the code in transformers.go. Refactor.
// Basically the DB model only has artifact keys, not whole artifact IDs including partitions
// so pull the artifact IDs again from the spec.
lc := spec.GetEntityMetadata().GetLaunchConditions()

var err error
idlTrigger := core.Trigger{}
err = ptypes.UnmarshalAny(lc, &idlTrigger)
if err != nil {
logger.Errorf(context.TODO(), "Failed to unmarshal launch conditions to idl, metadata: [%+v]", spec.GetEntityMetadata())
return models.Trigger{}, err
}
if len(idlTrigger.Triggers) == 0 {
return models.Trigger{}, fmt.Errorf("invalid request to CreateTrigger, launch conditions cannot be empty")
}
var runsOnArtifactIDs = make([]core.ArtifactID, len(idlTrigger.Triggers))
for i, t := range idlTrigger.Triggers {
runsOnArtifactIDs[i] = *t
runsOnArtifactIDs[i].ArtifactKey.Project = lpID.Project
runsOnArtifactIDs[i].ArtifactKey.Domain = lpID.Domain
}

t.RunsOn = runsOnArtifactIDs

return t, nil
}
4 changes: 4 additions & 0 deletions flyteartifacts/pkg/db/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type gormMetrics struct {
GetDuration promutils.StopWatch
UpdateDuration promutils.StopWatch
SearchDuration promutils.StopWatch

CreateTriggerDuration promutils.StopWatch
}

func newMetrics(scope promutils.Scope) gormMetrics {
Expand All @@ -26,5 +28,7 @@ func newMetrics(scope promutils.Scope) gormMetrics {
"update", "time taken to update an entry", time.Millisecond),
SearchDuration: scope.MustNewStopWatch(
"search", "time taken for searching", time.Millisecond),
CreateTriggerDuration: scope.MustNewStopWatch(
"createT", "time taken to create a new trigger", time.Millisecond),
}
}
43 changes: 42 additions & 1 deletion flyteartifacts/pkg/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,48 @@ var Migrations = []*gormigrate.Migration{
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable(
"artifact_keys", "artifacts",
"artifacts", "artifact_keys",
)
},
},
{
ID: "2023-10-22-trigger",
Migrate: func(tx *gorm.DB) error {
type TriggerKey struct {
gorm.Model
Project string `gorm:"index:idx_t_pdn;index:idx_t_proj;type:varchar(64)"`
Domain string `gorm:"index:idx_t_pdn;index:idx_t_dom;type:varchar(64)"`
Name string `gorm:"index:idx_t_pdn;index:idx_t_name;type:varchar(255)"`
RunsOn []ArtifactKey `gorm:"many2many:active_trigger_artifact_keys;"`
}

type LaunchPlanID struct {
Name string `gorm:"not null;index:idx_lp_id;type:varchar(255)"`
Version string `gorm:"not null;type:varchar(255);index:idx_launch_plan_version"`
}

type Trigger struct {
gorm.Model
TriggerKeyID uint `gorm:"uniqueIndex:idx_trigger_pdnv"`
TriggerKey TriggerKey `gorm:"foreignKey:TriggerKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);index:idx_trigger_version;uniqueIndex:idx_trigger_pdnv"`

// Unlike the one in the TriggerKey table, these are the list of artifact keys as specified by the user
// for this specific version. Currently just the key but can add additional fields in the future.
RunsOn []ArtifactKey `gorm:"many2many:trigger_ids_artifact_keys;"`

Active bool `gorm:"index:idx_t_active"`
LaunchPlanID LaunchPlanID `gorm:"embedded"`
LaunchPlanSpec []byte `gorm:"not null"`
LaunchPlanClosure []byte `gorm:"not null"`
}
return tx.AutoMigrate(
&TriggerKey{}, &Trigger{},
)
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable(
"triggers", "trigger_keys",
)
},
},
Expand Down
Loading

0 comments on commit ee03489

Please sign in to comment.