Skip to content

Commit

Permalink
fix: only drop databases in ftl dev when running with --recreate (#3596)
Browse files Browse the repository at this point in the history
Closes #3515
  • Loading branch information
jvmakine authored Dec 3, 2024
1 parent 5d47ef4 commit d476a24
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 20 deletions.
43 changes: 25 additions & 18 deletions backend/provisioner/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ var redPandaBrokers = []string{"127.0.0.1:19092"}
var pubSubNameLimit = 249 // 255 (filename limit) - 6 (partition id)

// NewDevProvisioner creates a new provisioner that provisions resources locally when running FTL in dev mode
func NewDevProvisioner(postgresPort int, mysqlPort int) *InMemProvisioner {
func NewDevProvisioner(postgresPort int, mysqlPort int, recreate bool) *InMemProvisioner {
return NewEmbeddedProvisioner(map[ResourceType]InMemResourceProvisionerFn{
ResourceTypePostgres: provisionPostgres(postgresPort),
ResourceTypeMysql: provisionMysql(mysqlPort),
ResourceTypePostgres: provisionPostgres(postgresPort, recreate),
ResourceTypeMysql: provisionMysql(mysqlPort, recreate),
ResourceTypeTopic: provisionTopic(),
ResourceTypeSubscription: provisionSubscription(),
})
}
func provisionMysql(mysqlPort int) InMemResourceProvisionerFn {
func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
return func(ctx context.Context, rc *provisioner.ResourceContext, module, id string, previous *provisioner.Resource) (*provisioner.Resource, error) {
mysql, ok := rc.Resource.Resource.(*provisioner.Resource_Mysql)
if !ok {
Expand All @@ -56,7 +56,7 @@ func provisionMysql(mysqlPort int) InMemResourceProvisionerFn {
return nil, fmt.Errorf("failed to query database: %w", err)
case <-retry.C:
var ret *provisioner.Resource
ret, err = establishMySQLDB(ctx, rc, mysqlDSN, dbName, mysql, mysqlPort)
ret, err = establishMySQLDB(ctx, rc, mysqlDSN, dbName, mysql, mysqlPort, recreate)
if err != nil {
logger.Debugf("failed to establish mysql database: %s", err.Error())
continue
Expand All @@ -67,7 +67,7 @@ func provisionMysql(mysqlPort int) InMemResourceProvisionerFn {
}
}

func establishMySQLDB(ctx context.Context, rc *provisioner.ResourceContext, mysqlDSN string, dbName string, mysql *provisioner.Resource_Mysql, mysqlPort int) (*provisioner.Resource, error) {
func establishMySQLDB(ctx context.Context, rc *provisioner.ResourceContext, mysqlDSN string, dbName string, mysql *provisioner.Resource_Mysql, mysqlPort int, recreate bool) (*provisioner.Resource, error) {
conn, err := otelsql.Open("mysql", mysqlDSN)
if err != nil {
return nil, fmt.Errorf("failed to connect to mysql: %w", err)
Expand All @@ -79,16 +79,19 @@ func establishMySQLDB(ctx context.Context, rc *provisioner.ResourceContext, mysq
return nil, fmt.Errorf("failed to query database: %w", err)
}
defer res.Close()
if res.Next() {

exists := res.Next()
if exists && recreate {
_, err = conn.ExecContext(ctx, "DROP DATABASE "+dbName)
if err != nil {
return nil, fmt.Errorf("failed to drop database %q: %w", dbName, err)
}
}

_, err = conn.ExecContext(ctx, "CREATE DATABASE "+dbName)
if err != nil {
return nil, fmt.Errorf("failed to create database %q: %w", dbName, err)
if !exists || recreate {
_, err = conn.ExecContext(ctx, "CREATE DATABASE "+dbName)
if err != nil {
return nil, fmt.Errorf("failed to create database %q: %w", dbName, err)
}
}

if mysql.Mysql == nil {
Expand Down Expand Up @@ -119,7 +122,7 @@ func ProvisionPostgresForTest(ctx context.Context, module string, id string) (st
rc.Resource = &provisioner.Resource{
Resource: &provisioner.Resource_Postgres{},
}
res, err := provisionPostgres(15432)(ctx, rc, module, id+"_test", nil)
res, err := provisionPostgres(15432, true)(ctx, rc, module, id+"_test", nil)
if err != nil {
return "", err
}
Expand All @@ -132,14 +135,14 @@ func ProvisionMySQLForTest(ctx context.Context, module string, id string) (strin
rc.Resource = &provisioner.Resource{
Resource: &provisioner.Resource_Mysql{},
}
res, err := provisionMysql(13306)(ctx, rc, module, id+"_test", nil)
res, err := provisionMysql(13306, true)(ctx, rc, module, id+"_test", nil)
if err != nil {
return "", err
}
return res.GetMysql().GetOutput().WriteConnector.GetDsnDatabaseConnector().GetDsn(), nil
}

func provisionPostgres(postgresPort int) func(ctx context.Context, rc *provisioner.ResourceContext, module string, id string, previous *provisioner.Resource) (*provisioner.Resource, error) {
func provisionPostgres(postgresPort int, recreate bool) func(ctx context.Context, rc *provisioner.ResourceContext, module string, id string, previous *provisioner.Resource) (*provisioner.Resource, error) {
return func(ctx context.Context, rc *provisioner.ResourceContext, module, id string, previous *provisioner.Resource) (*provisioner.Resource, error) {
pg, ok := rc.Resource.Resource.(*provisioner.Resource_Postgres)
if !ok {
Expand All @@ -164,7 +167,9 @@ func provisionPostgres(postgresPort int) func(ctx context.Context, rc *provision
return nil, fmt.Errorf("failed to query database: %w", err)
}
defer res.Close()
if res.Next() {

exists := res.Next()
if exists && recreate {
// Terminate any dangling connections.
_, err = conn.ExecContext(ctx, `
SELECT pid, pg_terminate_backend(pid)
Expand All @@ -179,9 +184,11 @@ func provisionPostgres(postgresPort int) func(ctx context.Context, rc *provision
return nil, fmt.Errorf("failed to drop database %q: %w", dbName, err)
}
}
_, err = conn.ExecContext(ctx, "CREATE DATABASE "+dbName)
if err != nil {
return nil, fmt.Errorf("failed to create database %q: %w", dbName, err)
if !exists || recreate {
_, err = conn.ExecContext(ctx, "CREATE DATABASE "+dbName)
if err != nil {
return nil, fmt.Errorf("failed to create database %q: %w", dbName, err)
}
}

if pg.Postgres == nil {
Expand Down
4 changes: 2 additions & 2 deletions frontend/cli/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
)

type serveCmd struct {
Recreate bool `help:"Recreate the database even if it already exists." default:"false"`
serveCommonConfig
}

Expand All @@ -62,6 +61,7 @@ type serveCommonConfig struct {
GrafanaImage string `help:"The container image to start for the automatic Grafana instance" default:"grafana/otel-lgtm" env:"FTL_GRAFANA_IMAGE" hidden:""`
DisableGrafana bool `help:"Disable the automatic Grafana that is started if no telemetry collector is specified." default:"false"`
Ingress ingress.Config `embed:"" prefix:"ingress-"`
Recreate bool `help:"Recreate any stateful resources if they already exist." default:"false"`
controller.CommonConfig
provisioner.CommonProvisionerConfig
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func (s *serveCommonConfig) run(
provisionerRegistry := &provisioner.ProvisionerRegistry{
Bindings: []*provisioner.ProvisionerBinding{
{
Provisioner: provisioner.NewDevProvisioner(s.DBPort, s.MysqlPort),
Provisioner: provisioner.NewDevProvisioner(s.DBPort, s.MysqlPort, s.Recreate),
Types: []provisioner.ResourceType{
provisioner.ResourceTypeMysql,
provisioner.ResourceTypePostgres,
Expand Down

0 comments on commit d476a24

Please sign in to comment.