Skip to content

Commit

Permalink
add start of boilerplate for single binary, renamed the cloud event c…
Browse files Browse the repository at this point in the history
…onfig option to v2 instead of a 'transform', add launch plan id to the cloud events, moved the database option for artifact service into its own application configuration instead of relying on a shared database because i think we should use a different postgres db name, drop the Omit(id) bit in the still non-working Create call, updated the cloud event rst a bit (#4256)

Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Oct 18, 2023
1 parent f718e6c commit 4674a08
Show file tree
Hide file tree
Showing 37 changed files with 1,317 additions and 3,175 deletions.
5 changes: 5 additions & 0 deletions cmd/single/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ type Config struct {
Propeller Propeller `json:"propeller" pflag:",Configuration to disable propeller or any of its components."`
Admin Admin `json:"admin" pflag:",Configuration to disable FlyteAdmin or any of its components"`
DataCatalog DataCatalog `json:"dataCatalog" pflag:",Configuration to disable DataCatalog or any of its components"`
Artifact Artifacts `json:"artifact" pflag:",Configuration to disable Artifact or any of its components"`
}

type Propeller struct {
Disabled bool `json:"disabled" pflag:",Disables flytepropeller in the single binary mode"`
DisableWebhook bool `json:"disableWebhook" pflag:",Disables webhook only"`
}

type Artifacts struct {
Disabled bool `json:"disabled" pflag:",Disables flyteartifacts in the single binary mode"`
}

type Admin struct {
Disabled bool `json:"disabled" pflag:",Disables flyteadmin in the single binary mode"`
DisableScheduler bool `json:"disableScheduler" pflag:",Disables Native scheduler in the single binary mode"`
Expand Down
14 changes: 14 additions & 0 deletions cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func startClusterResourceController(ctx context.Context) error {
return nil
}

func startArtifact(ctx context.Context, cfg Artifacts) error {
return nil
}

func startAdmin(ctx context.Context, cfg Admin) error {
logger.Infof(ctx, "Running Database Migrations...")
if err := adminServer.Migrate(ctx); err != nil {
Expand Down Expand Up @@ -192,6 +196,16 @@ var startCmd = &cobra.Command{
})
}

if !cfg.Artifact.Disabled {
g.Go(func() error {
err := startArtifact(childCtx, cfg.Artifact)
if err != nil {
logger.Panicf(childCtx, "Failed to start Artifacts server, err: %v", err)
}
return nil
})
}

if !cfg.Propeller.Disabled {
g.Go(func() error {
err := startPropeller(childCtx, cfg.Propeller)
Expand Down
2 changes: 1 addition & 1 deletion flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ database:
# options: "sslmode=disable"
cloudEvents:
enable: true
transformToCloudEvents: true
cloudEventVersion: v2
type: redis
redis:
addr: "localhost:30004"
Expand Down
5 changes: 3 additions & 2 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi
return implementations.NewNoopPublish()
}

if !cloudEventsConfig.TransformToCloudEvents {
if cloudEventsConfig.CloudEventVersion == runtimeInterfaces.CloudEventVersionv2 {
return cloudEventImplementations.NewCloudEventsWrappedPublisher(db, sender, scope, storageClient, urlData, remoteDataConfig)
} else {
return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes)
}
return cloudEventImplementations.NewCloudEventsWrappedPublisher(db, sender, scope, storageClient, urlData, remoteDataConfig)
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,12 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri())
if err != nil {
// TODO: metric this
// gatepr: metric this
logger.Warningf(ctx, "Error fetching output literal map %v", rawEvent)
return nil, err
}
}

if outputs == nil {
// todo: remove after testing
logger.Debugf(ctx, "Output data was nil for %v", rawEvent)
}

return &event.CloudEventWorkflowExecution{
RawEvent: rawEvent,
OutputData: outputs,
Expand All @@ -217,6 +212,7 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
}, nil
}

Expand Down Expand Up @@ -312,6 +308,7 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con
ArtifactIds: spec.GetMetadata().GetArtifactIds(),
ParentNodeExecution: spec.GetMetadata().GetParentNodeExecution(),
ReferenceExecution: spec.GetMetadata().GetReferenceExecution(),
LaunchPlanId: spec.LaunchPlan,
}, nil
}

Expand Down Expand Up @@ -403,7 +400,8 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
cloudEvt.SetSource(eventSource)
cloudEvt.SetID(eventID)
cloudEvt.SetTime(eventTime)
cloudEvt.SetExtension(jsonSchemaURLKey, jsonSchemaURL)
// TODO: Fill this in after we can get auto-generation in buf.
cloudEvt.SetExtension(jsonSchemaURLKey, "")

if err := cloudEvt.SetData(cloudevents.ApplicationJSON, buf.Bytes()); err != nil {
c.systemMetrics.PublishError.Inc()
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/pkg/async/cloudevent/implementations/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type PubSubSender struct {
}

func (s *PubSubSender) Send(ctx context.Context, notificationType string, event cloudevents.Event) error {
// gatepr: investigate why the previous statement didn't work.
// eventByte, err := pbcloudevents.Protobuf.Marshal(&event)
eventByte, err := json.Marshal(&event)
if err != nil {
logger.Errorf(ctx, "Failed to marshal cloudevent with error: %v", err)
Expand Down
13 changes: 12 additions & 1 deletion flyteadmin/pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,17 @@ type ExternalEventsConfig struct {
ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}

//go:generate enumer -type=CloudEventVersion -trimprefix=CloudEventVersion
type CloudEventVersion int

const (
// This is the initial version of the cloud events
CloudEventVersionv1 CloudEventVersion = iota

// Version 2 of the cloud events add a lot more information into the event
CloudEventVersionv2
)

type CloudEventsConfig struct {
Enable bool `json:"enable"`
// Defines the cloud provider that backs the scheduler. In the absence of a specification the no-op, 'local'
Expand All @@ -552,7 +563,7 @@ type CloudEventsConfig struct {
// Specifies the time interval to wait before attempting to reconnect the notifications processor client.
ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
// Transform the raw events into the fuller cloudevent events before publishing
TransformToCloudEvents bool `json:"transformToCloudEvents"`
CloudEventVersion CloudEventVersion `json:"cloudEventVersion"`
}

// Configuration specific to notifications handling
Expand Down
49 changes: 49 additions & 0 deletions flyteadmin/pkg/runtime/interfaces/cloudeventversion_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flyteartifacts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/lib/pq v1.1.1 // indirect
github.com/lib/pq v1.10.2 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
Expand Down
3 changes: 2 additions & 1 deletion flyteartifacts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,9 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down
22 changes: 20 additions & 2 deletions flyteartifacts/pkg/configuration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,32 @@ package configuration

import (
"github.com/flyteorg/flyte/flytestdlib/config"
stdLibDb "github.com/flyteorg/flyte/flytestdlib/database"
"time"
)

const artifactsServer = "artifactsServer"

type ApplicationConfiguration struct {
MyTestVal string `json:"myTestValue" pflag:",Database configuration"`
ArtifactDatabaseConfig stdLibDb.DbConfig `json:"artifactDatabaseConfig" pflag:",Database configuration"`
}

var defaultApplicationConfiguration = ApplicationConfiguration{}
var defaultApplicationConfiguration = ApplicationConfiguration{
ArtifactDatabaseConfig: stdLibDb.DbConfig{
EnableForeignKeyConstraintWhenMigrating: true,
MaxIdleConnections: 10,
MaxOpenConnections: 100,
ConnMaxLifeTime: config.Duration{Duration: time.Hour},
Postgres: stdLibDb.PostgresConfig{
// These values are suitable for local sandbox development
Host: "localhost",
Port: 30001,
DbName: "artifacts",
User: "postgres",
Password: "postgres",
ExtraOptions: "sslmode=disable",
},
},
}

var ApplicationConfig = config.MustRegisterSection(artifactsServer, &defaultApplicationConfiguration)
9 changes: 5 additions & 4 deletions flyteartifacts/pkg/db/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"github.com/flyteorg/flyte/flyteartifacts/pkg/configuration"
"github.com/flyteorg/flyte/flyteartifacts/pkg/models"
"github.com/flyteorg/flyte/flytestdlib/database"
"github.com/flyteorg/flyte/flytestdlib/logger"
Expand All @@ -20,7 +21,7 @@ type RDSStorage struct {
func (r *RDSStorage) WriteOne(ctx context.Context, gormModel Artifact) (models.Artifact, error) {
timer := r.metrics.CreateDuration.Start()
logger.Debugf(ctx, "Attempt create artifact %s", gormModel.Version)
tx := r.db.Omit("id").Create(&gormModel)
tx := r.db.Create(&gormModel)
timer.Stop()
if tx.Error != nil {
logger.Errorf(ctx, "Failed to create artifact %+v", tx.Error)
Expand All @@ -39,15 +40,15 @@ func (r *RDSStorage) GetArtifact(ctx context.Context) (models.Artifact, error) {
}

func NewStorage(ctx context.Context, scope promutils.Scope) *RDSStorage {
dbCfg := database.GetConfig()
dbCfg := configuration.ApplicationConfig.GetConfig().(*configuration.ApplicationConfiguration).ArtifactDatabaseConfig
logConfig := logger.GetConfig()

db, err := database.GetDB(ctx, dbCfg, logConfig)
db, err := database.GetDB(ctx, &dbCfg, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
return &RDSStorage{
config: *dbCfg,
config: dbCfg,
db: db,
metrics: newMetrics(scope),
}
Expand Down
6 changes: 3 additions & 3 deletions flyteartifacts/pkg/db/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestWriteOne(t *testing.T) {
rds := NewStorage(ctx, scope)

one := uint32(1)
pval1 := "f51f"
pval1 := "51"
p := postgres.Hstore{"area": &pval1}

lt := &core.LiteralType{
Expand All @@ -54,9 +54,9 @@ func TestWriteOne(t *testing.T) {
ArtifactKey: ArtifactKey{
Project: "demotst",
Domain: "unit",
Name: "testname 1",
Name: "testname 2",
},
Version: "abc123/1/n0/7",
Version: "abc123/1/n0/8",
Partitions: &p,
LiteralType: ltBytes,
LiteralValue: litBytes,
Expand Down
5 changes: 0 additions & 5 deletions flyteartifacts/sandbox.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
artifactsServer:
myTestValue: "test from file"
database:
postgres:
dbname: artifacts
logger:
level: 5
show-source: true
42 changes: 0 additions & 42 deletions flyteidl/gen/pb-cpp/flyteidl/artifact/artifacts.grpc.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4674a08

Please sign in to comment.