Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

artf/updates to source #4443

Merged
merged 5 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
Principal: spec.GetMetadata().Principal,

Check warning on line 219 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L219

Added line #L219 was not covered by tests
LaunchPlanId: spec.LaunchPlan,
}, nil
}
Expand Down Expand Up @@ -371,6 +372,7 @@
OutputInterface: typedInterface,
InputData: inputs,
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
Principal: spec.GetMetadata().Principal,

Check warning on line 375 in flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go#L375

Added line #L375 was not covered by tests
LaunchPlanId: spec.LaunchPlan,
}, nil
}
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se
}

// ExtractArtifactKeys pulls out artifact keys from Literals for lineage
// todo: rename this function to be less confusing
func (m *ExecutionManager) ExtractArtifactKeys(input *core.Literal) []string {
var artifactKeys []string

Expand All @@ -713,7 +714,6 @@ func (m *ExecutionManager) ExtractArtifactKeys(input *core.Literal) []string {
}
}
if input.GetCollection() != nil {
// TODO: Make recursive
for _, v := range input.GetCollection().Literals {
mapKeys := m.ExtractArtifactKeys(v)
artifactKeys = append(artifactKeys, mapKeys...)
Expand Down Expand Up @@ -1218,6 +1218,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
// publishExecutionStart is an event that Admin publishes for artifact lineage.
func (m *ExecutionManager) publishExecutionStart(ctx context.Context, executionID core.WorkflowExecutionIdentifier,
launchPlanID *core.Identifier, workflowID *core.Identifier, inputArtifactKeys []string, usedArtifactIDs []*core.ArtifactID) {

if len(inputArtifactKeys) > 0 || len(usedArtifactIDs) > 0 {
logger.Debugf(ctx, "Sending execution start event for execution [%+v] with input artifact keys [%+v] and used artifact ids [%+v]", executionID, inputArtifactKeys, usedArtifactIDs)

Expand Down
7 changes: 6 additions & 1 deletion flyteartifacts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/flyteorg/flyte/flyteartifacts
go 1.19

require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/NYTimes/gizmo v1.3.6
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0
github.com/cloudevents/sdk-go/v2 v2.14.0
Expand All @@ -21,6 +22,7 @@ require (
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.30.0
gorm.io/driver/postgres v1.5.3
gorm.io/gorm v1.25.5
)

Expand Down Expand Up @@ -61,7 +63,11 @@ require (
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
Expand Down Expand Up @@ -105,7 +111,6 @@ require (
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/postgres v1.5.3 // indirect
gorm.io/driver/sqlite v1.5.4 // indirect
k8s.io/apimachinery v0.24.1 // indirect
k8s.io/client-go v0.24.1 // indirect
Expand Down
82 changes: 82 additions & 0 deletions flyteartifacts/go.sum

Large diffs are not rendered by default.

23 changes: 18 additions & 5 deletions flyteartifacts/pkg/db/gorm_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@ type ArtifactKey struct {
Name string `gorm:"uniqueIndex:idx_pdn;index:idx_name;type:varchar(255)"`
}

// WorkflowExecution is the gorm model for a workflow execution that artifacts refer to.
// The execution's Project/Domain is assumed to always be the same as the Artifact's.
// The (project, domain, name) columns together form the composite unique index idx_we_pdn,
// and each column additionally carries its own single-column index.
type WorkflowExecution struct {
gorm.Model
ExecutionProject string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_proj;type:varchar(64)"`
ExecutionDomain string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_dom;type:varchar(64)"`
ExecutionName string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_name;type:varchar(255)"`
// InputArtifacts is a many-to-many association stored in the execution_inputs join table.
// NOTE(review): presumably the artifacts consumed as inputs by this execution — confirm against the writer.
InputArtifacts []Artifact `gorm:"many2many:execution_inputs;"`
}

type Artifact struct {
gorm.Model
ArtifactKeyID uint
ArtifactKeyID uint `gorm:"not null;uniqueIndex:idx_artifact_version"`
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);index:idx_artifact_version"`
Version string `gorm:"not null;type:varchar(255);uniqueIndex:idx_artifact_version"`
Partitions pgtype.Hstore `gorm:"type:hstore;index:idx_artifact_partitions"`

LiteralType []byte `gorm:"not null"`
Expand All @@ -26,8 +36,10 @@ type Artifact struct {
MetadataType string `gorm:"type:varchar(64)"`
OffloadedUserMetadata string `gorm:"type:varchar(255)"`

// Project/Domain assumed to always be the same as the Artifact
ExecutionName string `gorm:"type:varchar(255)"`
WorkflowExecutionID uint `gorm:"index:idx_artifact_wf_exec_id"`
WorkflowExecution WorkflowExecution `gorm:"foreignKey:WorkflowExecutionID;references:ID"`
NodeID string `gorm:"type:varchar(128)"`

WorkflowProject string `gorm:"type:varchar(64)"`
WorkflowDomain string `gorm:"type:varchar(64)"`
WorkflowName string `gorm:"type:varchar(255)"`
Expand All @@ -36,10 +48,11 @@ type Artifact struct {
TaskDomain string `gorm:"type:varchar(64)"`
TaskName string `gorm:"type:varchar(255)"`
TaskVersion string `gorm:"type:varchar(255)"`
NodeID string `gorm:"type:varchar(64)"`
// See Admin migration for note.
// Here nullable in the case of workflow output.
RetryAttempt *uint32

Principal string `gorm:"type:varchar(256)"`
}

type TriggerKey struct {
Expand Down
65 changes: 49 additions & 16 deletions flyteartifacts/pkg/db/gorm_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,31 @@ func ServiceToGormModel(serviceModel models.Artifact) (Artifact, error) {
Description: serviceModel.Artifact.Spec.ShortDescription,
MetadataType: serviceModel.Artifact.Spec.MetadataType,
OffloadedUserMetadata: serviceModel.OffloadedMetadata,
}

ExecutionName: serviceModel.Artifact.Spec.Execution.Name,
if serviceModel.Artifact.GetSource().GetWorkflowExecution() != nil {
// artifact and execution project/domains are always the same.
// Note the service model will not have workflow execution if it was an upload
wfExec := WorkflowExecution{
ExecutionProject: serviceModel.Artifact.ArtifactId.ArtifactKey.Project,
ExecutionDomain: serviceModel.Artifact.ArtifactId.ArtifactKey.Domain,
ExecutionName: serviceModel.Source.WorkflowExecution.Name,
}
ga.WorkflowExecution = wfExec
ga.NodeID = serviceModel.Source.NodeId
}
if serviceModel.GetSource() != nil {
ga.Principal = serviceModel.GetSource().GetPrincipal()
}

if serviceModel.Artifact.Spec.TaskExecution != nil {
ga.TaskProject = serviceModel.Artifact.Spec.TaskExecution.TaskId.Project
ga.TaskDomain = serviceModel.Artifact.Spec.TaskExecution.TaskId.Domain
ga.TaskName = serviceModel.Artifact.Spec.TaskExecution.TaskId.Name
ga.TaskVersion = serviceModel.Artifact.Spec.TaskExecution.TaskId.Version
ga.RetryAttempt = &serviceModel.Artifact.Spec.TaskExecution.RetryAttempt
if serviceModel.GetSource().GetTaskId() != nil {
// If task id is there, so should the retry attempt
retry := serviceModel.GetSource().GetRetryAttempt()
ga.RetryAttempt = &retry
ga.TaskProject = serviceModel.GetSource().GetTaskId().Project
ga.TaskDomain = serviceModel.GetSource().GetTaskId().Domain
ga.TaskName = serviceModel.GetSource().GetTaskId().Name
ga.TaskVersion = serviceModel.GetSource().GetTaskId().Version
}

return ga, nil
Expand Down Expand Up @@ -102,15 +117,8 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) {
Version: ga.Version,
},
Spec: &artifact.ArtifactSpec{
Value: lit,
Type: lt,
TaskExecution: nil,
Execution: &core.WorkflowExecutionIdentifier{
Project: ga.ArtifactKey.Project,
Domain: ga.ArtifactKey.Domain,
Name: ga.ExecutionName,
},
Principal: "",
Value: lit,
Type: lt,
ShortDescription: ga.Description,
UserMetadata: nil,
MetadataType: ga.MetadataType,
Expand All @@ -121,6 +129,31 @@ func GormToServiceModel(ga Artifact) (models.Artifact, error) {
if p != nil {
a.ArtifactId.Dimensions = &core.ArtifactID_Partitions{Partitions: p}
}
aSrc := artifact.ArtifactSource{
NodeId: ga.NodeID,
Principal: ga.Principal,
}
if ga.RetryAttempt != nil {
aSrc.RetryAttempt = *ga.RetryAttempt
}
if ga.WorkflowExecutionID != 0 {
execID := &core.WorkflowExecutionIdentifier{
Project: ga.ArtifactKey.Project,
Domain: ga.ArtifactKey.Domain,
Name: ga.WorkflowExecution.ExecutionName,
}
aSrc.WorkflowExecution = execID
}
if ga.TaskProject != "" {
aSrc.TaskId = &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: ga.TaskProject,
Domain: ga.TaskDomain,
Name: ga.TaskName,
Version: ga.TaskVersion,
}
}
a.Source = &aSrc

return models.Artifact{
Artifact: a,
Expand Down
36 changes: 27 additions & 9 deletions flyteartifacts/pkg/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@ var Migrations = []*gormigrate.Migration{
Domain string `gorm:"uniqueIndex:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"uniqueIndex:idx_pdn;index:idx_name;type:varchar(255)"`
}
type WorkflowExecution struct {
gorm.Model
ExecutionProject string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_proj;type:varchar(64)"`
ExecutionDomain string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_dom;type:varchar(64)"`
ExecutionName string `gorm:"uniqueIndex:idx_we_pdn;index:idx_we_name;type:varchar(255)"`
InputArtifacts []Artifact `gorm:"many2many:execution_inputs;"`
}

type Artifact struct {
gorm.Model
ArtifactKeyID uint `gorm:"uniqueIndex:idx_pdnv"`
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"type:varchar(255);index:idx_artifact_version;uniqueIndex:idx_pdnv"`
Partitions *pgtype.Hstore `gorm:"type:hstore;index:idx_artifact_partitions"`
ArtifactKeyID uint `gorm:"not null;uniqueIndex:idx_artifact_version"`
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);uniqueIndex:idx_artifact_version"`
Partitions pgtype.Hstore `gorm:"type:hstore;index:idx_artifact_partitions"`

LiteralType []byte `gorm:"not null"`
LiteralValue []byte `gorm:"not null"`
Expand All @@ -30,8 +38,10 @@ var Migrations = []*gormigrate.Migration{
MetadataType string `gorm:"type:varchar(64)"`
OffloadedUserMetadata string `gorm:"type:varchar(255)"`

// Project/Domain assumed to always be the same as the Artifact
ExecutionName string `gorm:"type:varchar(255)"`
WorkflowExecutionID uint `gorm:"index:idx_artifact_wf_exec_id"`
WorkflowExecution WorkflowExecution `gorm:"foreignKey:WorkflowExecutionID;references:ID"`
NodeID string `gorm:"type:varchar(128)"`

WorkflowProject string `gorm:"type:varchar(64)"`
WorkflowDomain string `gorm:"type:varchar(64)"`
WorkflowName string `gorm:"type:varchar(255)"`
Expand All @@ -40,14 +50,22 @@ var Migrations = []*gormigrate.Migration{
TaskDomain string `gorm:"type:varchar(64)"`
TaskName string `gorm:"type:varchar(255)"`
TaskVersion string `gorm:"type:varchar(255)"`
NodeID string `gorm:"type:varchar(64)"`
// See Admin migration for note.
// Here nullable in the case of workflow output.
RetryAttempt *uint32

Principal string `gorm:"type:varchar(256)"`
}
return tx.AutoMigrate(
&ArtifactKey{}, &Artifact{},
err := tx.AutoMigrate(
&ArtifactKey{}, &Artifact{}, &WorkflowExecution{},
)
if err != nil {
return err
}

tx.Exec("CREATE INDEX idx_gin_artifact_partitions ON artifacts USING GIN (partitions)")
tx.Exec("CREATE INDEX idx_created_at ON artifacts (created_at desc)")
return tx.Error
},
Rollback: func(tx *gorm.DB) error {
return tx.Migrator().DropTable(
Expand Down
Loading
Loading