Skip to content

Commit

Permalink
[wip] artf/impl create get (#4258)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Oct 20, 2023
1 parent 5c659aa commit 729b300
Show file tree
Hide file tree
Showing 23 changed files with 767 additions and 256 deletions.
16 changes: 8 additions & 8 deletions flyteartifacts/artifact_config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
artifactsServer:
myTestValue: "test from file"
database:
postgres:
dbname: artifacts
logger:
level: 5
show-source: true
# This is an (incomplete) configure file for the artifact service, here just as an example.
#artifactsServer:
# database:
# postgres:
# dbname: your pg db
#logger:
# level: 5
# show-source: true
3 changes: 2 additions & 1 deletion flyteartifacts/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ func main() {
rootCmd := sharedCmd.NewRootCmd("artifacts", server.GrpcRegistrationHook, server.HttpRegistrationHook)
migs := server.GetMigrations(ctx)
initializationSql := "create extension if not exists hstore;"
rootCmd.AddCommand(sharedCmd.NewMigrateCmd(migs, initializationSql))
dbConfig := server.GetDbConfig()
rootCmd.AddCommand(sharedCmd.NewMigrateCmd(migs, dbConfig, initializationSql))
err := rootCmd.ExecuteContext(ctx)
if err != nil {
panic(err)
Expand Down
4 changes: 2 additions & 2 deletions flyteartifacts/cmd/shared/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
)

// NewMigrateCmd represents the migrate command
func NewMigrateCmd(migs []*gormigrate.Migration, initializationSql string) *cobra.Command {
func NewMigrateCmd(migs []*gormigrate.Migration, databaseConfig *database.DbConfig, initializationSql string) *cobra.Command {
return &cobra.Command{
Use: "migrate",
Short: "This command will run all the migrations for the database",
RunE: func(cmd *cobra.Command, args []string) error {
return database.Migrate(context.Background(), migs, initializationSql)
return database.Migrate(context.Background(), databaseConfig, migs, initializationSql)
},
}
}
51 changes: 51 additions & 0 deletions flyteartifacts/pkg/blob/artifact_blob_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package blob

import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteartifacts/pkg/configuration"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/golang/protobuf/ptypes/any"
)

type ArtifactBlobStore struct {
store *storage.DataStore
}

// OffloadArtifactCard stores the artifact card in the blob store
func (a *ArtifactBlobStore) OffloadArtifactCard(ctx context.Context, name, version string, card *any.Any) (storage.DataReference, error) {
uri, err := a.store.ConstructReference(ctx, a.store.GetBaseContainerFQN(ctx), name, version)
if err != nil {
return "", fmt.Errorf("failed to construct data reference for [%s/%s] with err: %v", name, version, err)
}
err = a.store.WriteProtobuf(ctx, uri, storage.Options{}, card)
if err != nil {
return "", fmt.Errorf("failed to write protobuf to %s with err: %v", uri, err)
}
return uri, nil
}

func (a *ArtifactBlobStore) RetrieveArtifactCard(ctx context.Context, uri storage.DataReference) (*any.Any, error) {
card := &any.Any{}
err := a.store.ReadProtobuf(ctx, uri, card)
if err != nil {
return nil, fmt.Errorf("failed to read protobuf from %s with err: %v", uri, err)
}
return nil, nil
}

func NewArtifactBlobStore(ctx context.Context, scope promutils.Scope) ArtifactBlobStore {
storageCfg := configuration.ApplicationConfig.GetConfig().(*configuration.ApplicationConfiguration).ArtifactBlobStoreConfig
logger.Infof(ctx, "Initializing storage client with config [%+v]", storageCfg)

dataStorageClient, err := storage.NewDataStore(&storageCfg, scope)
if err != nil {
logger.Error(ctx, "Failed to initialize storage config")
panic(err)
}
return ArtifactBlobStore{
store: dataStorageClient,
}
}
7 changes: 6 additions & 1 deletion flyteartifacts/pkg/configuration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package configuration
import (
"github.com/flyteorg/flyte/flytestdlib/config"
stdLibDb "github.com/flyteorg/flyte/flytestdlib/database"
stdLibStorage "github.com/flyteorg/flyte/flytestdlib/storage"
"time"
)

const artifactsServer = "artifactsServer"

type ApplicationConfiguration struct {
ArtifactDatabaseConfig stdLibDb.DbConfig `json:"artifactDatabaseConfig" pflag:",Database configuration"`
ArtifactDatabaseConfig stdLibDb.DbConfig `json:"artifactDatabaseConfig" pflag:",Database configuration"`
ArtifactBlobStoreConfig stdLibStorage.Config `json:"artifactBlobStoreConfig" pflag:",Blob store configuration"`
}

var defaultApplicationConfiguration = ApplicationConfiguration{
Expand All @@ -28,6 +30,9 @@ var defaultApplicationConfiguration = ApplicationConfiguration{
ExtraOptions: "sslmode=disable",
},
},
ArtifactBlobStoreConfig: stdLibStorage.Config{
InitContainer: "flyte-artifacts",
},
}

var ApplicationConfig = config.MustRegisterSection(artifactsServer, &defaultApplicationConfiguration)
1 change: 0 additions & 1 deletion flyteartifacts/pkg/db/gorm_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type ArtifactKey struct {

type Artifact struct {
gorm.Model
// gatepr: this doesn't actually create a foreign key...
ArtifactKeyID uint
ArtifactKey ArtifactKey `gorm:"foreignKey:ArtifactKeyID;references:ID"`
Version string `gorm:"not null;type:varchar(255);index:idx_artifact_version"`
Expand Down
129 changes: 129 additions & 0 deletions flyteartifacts/pkg/db/gorm_transformers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package db

import (
"context"
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/golang/protobuf/proto"
"github.com/jackc/pgx/v5/pgtype"
)

func PartitionsIdlToHstore(idlPartitions *core.Partitions) pgtype.Hstore {
ctx := context.Background()
if idlPartitions == nil || idlPartitions.GetValue() == nil {
return nil
}
var hstore = make(pgtype.Hstore)

for k, v := range idlPartitions.GetValue() {
if len(v.GetStaticValue()) == 0 {
logger.Warningf(ctx, "Partition key [%s] missing static value, [%+v]", k, v.GetValue())
continue
}
sv := v.GetStaticValue()
hstore[k] = &sv
}
return hstore
}

func HstoreToIdlPartitions(hs pgtype.Hstore) *core.Partitions {
if hs == nil || len(hs) == 0 {
return nil
}
m := make(map[string]*core.LabelValue, len(hs))
for k, v := range hs {
m[k] = &core.LabelValue{
Value: &core.LabelValue_StaticValue{
StaticValue: *v,
},
}
}
return &core.Partitions{
Value: m,
}
}

func ServiceToGormModel(serviceModel models.Artifact) (Artifact, error) {
partitions := PartitionsIdlToHstore(serviceModel.Artifact.GetArtifactId().GetPartitions())

ga := Artifact{
ArtifactKey: ArtifactKey{
Project: serviceModel.Artifact.ArtifactId.ArtifactKey.Project,
Domain: serviceModel.Artifact.ArtifactId.ArtifactKey.Domain,
Name: serviceModel.Artifact.ArtifactId.ArtifactKey.Name,
},
Version: serviceModel.Artifact.ArtifactId.Version,
Partitions: partitions,

LiteralType: serviceModel.LiteralTypeBytes,
LiteralValue: serviceModel.LiteralValueBytes,
Description: serviceModel.Artifact.Spec.ShortDescription,
MetadataType: serviceModel.Artifact.Spec.MetadataType,
OffloadedUserMetadata: serviceModel.OffloadedMetadata,

ExecutionName: serviceModel.Artifact.Spec.Execution.Name,
}

if serviceModel.Artifact.Spec.TaskExecution != nil {
ga.TaskProject = serviceModel.Artifact.Spec.TaskExecution.TaskId.Project
ga.TaskDomain = serviceModel.Artifact.Spec.TaskExecution.TaskId.Domain
ga.TaskName = serviceModel.Artifact.Spec.TaskExecution.TaskId.Name
ga.TaskVersion = serviceModel.Artifact.Spec.TaskExecution.TaskId.Version
ga.RetryAttempt = &serviceModel.Artifact.Spec.TaskExecution.RetryAttempt
}

return ga, nil
}

func GormToServiceModel(ga Artifact) (models.Artifact, error) {
lt := &core.LiteralType{}
lit := &core.Literal{}
if err := proto.Unmarshal(ga.LiteralType, lt); err != nil {
return models.Artifact{}, err
}
if err := proto.Unmarshal(ga.LiteralValue, lit); err != nil {
return models.Artifact{}, err
}

// gatepr: principal is missing still - can be added following discussion on source object.
// taskexecution and additional source information to be added when resolved.
// gatepr: implement tags
a := artifact.Artifact{
ArtifactId: &core.ArtifactID{
ArtifactKey: &core.ArtifactKey{
Project: ga.ArtifactKey.Project,
Domain: ga.ArtifactKey.Domain,
Name: ga.ArtifactKey.Name,
},
Version: ga.Version,
},
Spec: &artifact.ArtifactSpec{
Value: lit,
Type: lt,
TaskExecution: nil,
Execution: &core.WorkflowExecutionIdentifier{
Project: ga.ArtifactKey.Project,
Domain: ga.ArtifactKey.Domain,
Name: ga.ExecutionName,
},
Principal: "",
ShortDescription: ga.Description,
UserMetadata: nil,
MetadataType: ga.MetadataType,
},
Tags: nil,
}
p := HstoreToIdlPartitions(ga.Partitions)
if p != nil {
a.ArtifactId.Dimensions = &core.ArtifactID_Partitions{Partitions: p}
}

return models.Artifact{
Artifact: a,
OffloadedMetadata: "",
LiteralTypeBytes: nil,
LiteralValueBytes: nil,
}, nil
}
6 changes: 3 additions & 3 deletions flyteartifacts/pkg/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ var Migrations = []*gormigrate.Migration{
Migrate: func(tx *gorm.DB) error {
type ArtifactKey struct {
gorm.Model
Project string `gorm:"index:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"index:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"index:idx_pdn;index:idx_name;type:varchar(255)"`
Project string `gorm:"uniqueIndex:idx_pdn;index:idx_proj;type:varchar(64)"`
Domain string `gorm:"uniqueIndex:idx_pdn;index:idx_dom;type:varchar(64)"`
Name string `gorm:"uniqueIndex:idx_pdn;index:idx_name;type:varchar(255)"`
}
type Artifact struct {
gorm.Model
Expand Down
Loading

0 comments on commit 729b300

Please sign in to comment.