Skip to content

Commit

Permalink
Update metadata db schema if table not exists (#738)
Browse files Browse the repository at this point in the history
* Update metadata db schema if table not exists

* Address comments

* Address comments
  • Loading branch information
manitgupta authored Jan 3, 2024
1 parent 9d850e7 commit 2e124dd
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 74 deletions.
148 changes: 76 additions & 72 deletions webv2/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,54 @@ const (
GOOGLE_SQL_DIALECT = "Google Standard SQL"
)

var TABLE_STATEMENTS = []string{
`CREATE TABLE IF NOT EXISTS SchemaConversionSession (
VersionId STRING(36) NOT NULL,
PreviousVersionId ARRAY<STRING(36)>,
SessionName STRING(50) NOT NULL,
EditorName STRING(100) NOT NULL,
DatabaseType STRING(50) NOT NULL,
DatabaseName STRING(50) NOT NULL,
Dialect STRING(50) NOT NULL,
Notes ARRAY<STRING(MAX)> NOT NULL,
Tags ARRAY<STRING(20)>,
SchemaChanges STRING(MAX),
SchemaConversionObject JSON NOT NULL,
CreateTimestamp TIMESTAMP NOT NULL,
) PRIMARY KEY(VersionId)`,
`CREATE TABLE IF NOT EXISTS SMT_JOBS (
JobId STRING(100) NOT NULL,
JobName STRING(100) NOT NULL,
JobType STRING(100) NOT NULL,
JobData JSON,
Dialect STRING(50) NOT NULL,
SpannerDatabaseName STRING(100) NOT NULL,
CreatedAt TIMESTAMP NOT NULL,
) PRIMARY KEY(JobId)`,
`CREATE TABLE IF NOT EXISTS SMT_RESOURCES (
ResourceId STRING(100) NOT NULL,
JobId STRING(100) NOT NULL,
ExternalId STRING(100) NOT NULL,
ResourceName STRING(100) NOT NULL,
ResourceType STRING(100) NOT NULL,
ResourceData JSON,
CreatedAt TIMESTAMP NOT NULL,
) PRIMARY KEY(ResourceId)`,
`CREATE TABLE IF NOT EXISTS SMT_STATES (
StateId STRING(100) NOT NULL,
StateVersion INT64 NOT NULL,
ResourceId STRING(100) NOT NULL,
StateData JSON,
CreatedAt TIMESTAMP NOT NULL,
) PRIMARY KEY(StateId, StateVersion)`,
}

func GetSpannerUri(projectId string, instanceId string) string {
return fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, instanceId, constants.METADATA_DB)
}

// Creates the schema for the internal metadata database
func createDatabase(ctx context.Context, uri string) error {
func createDatabase(ctx context.Context, uri string, dbExists bool) error {

// Spanner uri will be in this format 'projects/project-id/instances/spanner-instance-id/databases/db-name'
matches := regexp.MustCompile("^(.*)/databases/(.*)$").FindStringSubmatch(uri)
Expand All @@ -54,100 +96,62 @@ func createDatabase(ctx context.Context, uri string) error {
return err
}
defer adminClient.Close()
fmt.Println("Creating database to store session metadata...")

// JobDetails contains the commit log of the jobs created using Spanner migration tool.
// JobResources contains the resources generated as part of a migration job (dataflow, datastream, pubsub etc.)
op, err := adminClient.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
Parent: spInstance,
CreateStatement: "CREATE DATABASE `" + dbName + "`",
ExtraStatements: []string{
`CREATE TABLE SchemaConversionSession (
VersionId STRING(36) NOT NULL,
PreviousVersionId ARRAY<STRING(36)>,
SessionName STRING(50) NOT NULL,
EditorName STRING(100) NOT NULL,
DatabaseType STRING(50) NOT NULL,
DatabaseName STRING(50) NOT NULL,
Dialect STRING(50) NOT NULL,
Notes ARRAY<STRING(MAX)> NOT NULL,
Tags ARRAY<STRING(20)>,
SchemaChanges STRING(MAX),
SchemaConversionObject JSON NOT NULL,
CreateTimestamp TIMESTAMP NOT NULL,
) PRIMARY KEY(VersionId)`,
`CREATE TABLE SMT_JOBS (
JobId STRING(100) NOT NULL,
JobName STRING(100) NOT NULL,
JobType STRING(100) NOT NULL,
JobData JSON,
Dialect STRING(50) NOT NULL,
SpannerDatabaseName STRING(100) NOT NULL,
CreatedAt TIMESTAMP NOT NULL,
) PRIMARY KEY(JobId)`,
`CREATE TABLE SMT_RESOURCES (
ResourceId STRING(100) NOT NULL,
JobId STRING(100) NOT NULL,
ExternalId STRING(100) NOT NULL,
ResourceName STRING(100) NOT NULL,
ResourceType STRING(100) NOT NULL,
ResourceData JSON,
CreatedAt TIMESTAMP NOT NULL,
) PRIMARY KEY(ResourceId)`,
`CREATE TABLE SMT_STATES (
StateId STRING(100) NOT NULL,
StateVersion INT64 NOT NULL,
ResourceId STRING(100) NOT NULL,
StateData JSON,
CreatedAt TIMESTAMP NOT NULL,
) PRIMARY KEY(StateId, StateVersion)`,
},
})
if err != nil {
return err
}
if _, err := op.Wait(ctx); err != nil {
return err
fmt.Println("Creating/Updating database to store session metadata...")
if dbExists {
op, err := adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: uri,
Statements: TABLE_STATEMENTS,
})
if err != nil {
return err
}
if err := op.Wait(ctx); err != nil {
return err
}
fmt.Printf("Updated database [%s]\n", matches[2])
} else {
op, err := adminClient.CreateDatabase(ctx, &adminpb.CreateDatabaseRequest{
Parent: spInstance,
CreateStatement: "CREATE DATABASE`" + dbName + "`",
ExtraStatements: TABLE_STATEMENTS,
})
if err != nil {
return err
}
if _, err := op.Wait(ctx); err != nil {
return err
}
fmt.Printf("Created database [%s]\n", matches[2])
}

fmt.Printf("Created database [%s]\n", matches[2])
return nil
}

func CheckOrCreateMetadataDb(projectId string, instanceId string) (isExist bool, isDbCreated bool) {
func CheckOrCreateMetadataDb(projectId string, instanceId string) bool {
uri := GetSpannerUri(projectId, instanceId)
if uri == "" {
fmt.Println("Invalid spanner uri")
return
return false
}

ctx := context.Background()
adminClient, err := database.NewDatabaseAdminClient(ctx)
if err != nil {
fmt.Println(err)
return
return false
}
defer adminClient.Close()

dbExists, err := conversion.CheckExistingDb(ctx, adminClient, uri)
if err != nil {
fmt.Println(err)
return
return false
}
if dbExists {
isExist = true
return
}

err = createDatabase(ctx, uri)
err = createDatabase(ctx, uri, dbExists)
if err != nil {
fmt.Println(err)
return
return false
}
fmt.Println("No existing database found to store session metadata.")
isDbCreated = true
isExist = true
return
return true
}

func GetSourceDatabaseFromDriver(driver string) (string, error) {
Expand Down
4 changes: 2 additions & 2 deletions webv2/session/session_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func SetSessionStorageConnectionState(projectId string, spInstanceId string) (bo
sessionState.IsOffline = true
return false, false
} else {
if isExist, isDbCreated := helpers.CheckOrCreateMetadataDb(projectId, spInstanceId); isExist {
if isDbCreated := helpers.CheckOrCreateMetadataDb(projectId, spInstanceId); isDbCreated {
sessionState.IsOffline = false
isConfigValid := isExist || isDbCreated
isConfigValid := isDbCreated
migrateMetadataDb(projectId, spInstanceId)
return isDbCreated, isConfigValid
} else {
Expand Down

0 comments on commit 2e124dd

Please sign in to comment.