Skip to content

Commit

Permalink
feat: default values final PR (GoogleCloudPlatform#976)
Browse files Browse the repository at this point in the history
* changes

* changes

* changes

* changes
  • Loading branch information
asthamohta authored Dec 31, 2024
1 parent 1a4e61b commit e18a9a0
Show file tree
Hide file tree
Showing 50 changed files with 594 additions and 196 deletions.
12 changes: 11 additions & 1 deletion cmd/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/conversion"
"github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/proto/migration"
Expand Down Expand Up @@ -134,7 +135,16 @@ func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interfa
return subcommands.ExitFailure
}
} else {
conv, err = convImpl.SchemaConv(cmd.project, sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{})
ctx := context.Background()
ddlVerifier, err := expressions_api.NewDDLVerifierImpl(ctx, "", "")
if err != nil {
logger.Log.Error(fmt.Sprintf("error trying create ddl verifier: %v", err))
return subcommands.ExitFailure
}
sfs := &conversion.SchemaFromSourceImpl{
DdlVerifier: ddlVerifier,
}
conv, err = convImpl.SchemaConv(cmd.project, sourceProfile, targetProfile, &ioHelper, sfs)
if err != nil {
return subcommands.ExitFailure
}
Expand Down
11 changes: 10 additions & 1 deletion cmd/schema_and_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/conversion"
"github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
Expand Down Expand Up @@ -136,7 +137,15 @@ func (cmd *SchemaAndDataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...
dbURI string
)
convImpl := &conversion.ConvImpl{}
conv, err = convImpl.SchemaConv(cmd.project, sourceProfile, targetProfile, &ioHelper, &conversion.SchemaFromSourceImpl{})
ddlVerifier, err := expressions_api.NewDDLVerifierImpl(ctx, "", "")
if err != nil {
logger.Log.Error(fmt.Sprintf("error trying create ddl verifier: %v", err))
return subcommands.ExitFailure
}
sfs := &conversion.SchemaFromSourceImpl{
DdlVerifier: ddlVerifier,
}
conv, err = convImpl.SchemaConv(cmd.project, sourceProfile, targetProfile, &ioHelper, sfs)
if err != nil {
panic(err)
}
Expand Down
9 changes: 5 additions & 4 deletions common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ const (
DLQ_GCS string = "dlq"

// VerifyExpresions API
CHECK_EXPRESSION = "CHECK"
DEFAUT_EXPRESSION = "DEFAULT"
DEFAULT_GENERATED = "DEFAULT_GENERATED"
TEMP_DB = "smt-staging-db"
CHECK_EXPRESSION = "CHECK"
DEFAULT_EXPRESSION = "DEFAULT"
DEFAULT_GENERATED = "DEFAULT_GENERATED"
TEMP_DB = "smt-staging-db"
DB_URI = "projects/%s/instances/%s/databases/%s"

// Regex for matching database collation
DB_COLLATION_REGEX = `(_[a-zA-Z0-9]+\\|\\)`
Expand Down
10 changes: 9 additions & 1 deletion common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/parse"
"github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/spanner"
Expand Down Expand Up @@ -445,7 +446,14 @@ func GetLegacyModeSupportedDrivers() []string {
func ReadSpannerSchema(ctx context.Context, conv *internal.Conv, client *sp.Client) error {
infoSchema := spanner.InfoSchemaImpl{Client: client, Ctx: ctx, SpDialect: conv.SpDialect}
processSchema := common.ProcessSchemaImpl{}
err := processSchema.ProcessSchema(conv, infoSchema, common.DefaultWorkers, internal.AdditionalSchemaAttributes{IsSharded: false}, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{})
ddlVerifier, err := expressions_api.NewDDLVerifierImpl(ctx, conv.SpProjectId, conv.SpInstanceId)
if err != nil {
return fmt.Errorf("error trying create ddl verifier: %v", err)
}
schemaToSpanner := common.SchemaToSpannerImpl{
DdlV: ddlVerifier,
}
err = processSchema.ProcessSchema(conv, infoSchema, common.DefaultWorkers, internal.AdditionalSchemaAttributes{IsSharded: false}, &schemaToSpanner, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{})
if err != nil {
return fmt.Errorf("error trying to read and convert spanner schema: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion conversion/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (ci *ConvImpl) SchemaConv(migrationProjectId string, sourceProfile profiles
case constants.POSTGRES, constants.MYSQL, constants.DYNAMODB, constants.SQLSERVER, constants.ORACLE:
return schemaFromSource.schemaFromDatabase(migrationProjectId, sourceProfile, targetProfile, &GetInfoImpl{}, &common.ProcessSchemaImpl{})
case constants.PGDUMP, constants.MYSQLDUMP:
return schemaFromSource.SchemaFromDump(sourceProfile.Driver, targetProfile.Conn.Sp.Dialect, ioHelper, &ProcessDumpByDialectImpl{})
return schemaFromSource.SchemaFromDump(targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance, sourceProfile.Driver, targetProfile.Conn.Sp.Dialect, ioHelper, &ProcessDumpByDialectImpl{})
default:
return nil, fmt.Errorf("schema conversion for driver %s not supported", sourceProfile.Driver)
}
Expand Down
14 changes: 10 additions & 4 deletions conversion/conversion_from_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/metrics"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
Expand All @@ -38,10 +39,12 @@ import (

type SchemaFromSourceInterface interface {
schemaFromDatabase(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, getInfo GetInfoInterface, processSchema common.ProcessSchemaInterface) (*internal.Conv, error)
SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error)
SchemaFromDump(SpProjectId string, SpInstanceId string, driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error)
}

type SchemaFromSourceImpl struct{}
type SchemaFromSourceImpl struct {
DdlVerifier expressions_api.DDLVerifier
}

type DataFromSourceInterface interface {
dataFromDatabase(ctx context.Context, migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, config writer.BatchWriterConfig, conv *internal.Conv, client *sp.Client, getInfo GetInfoInterface, dataFromDb DataFromDatabaseInterface, snapshotMigration SnapshotMigrationInterface) (*writer.BatchWriter, error)
Expand Down Expand Up @@ -99,10 +102,13 @@ func (sads *SchemaFromSourceImpl) schemaFromDatabase(migrationProjectId string,
additionalSchemaAttributes := internal.AdditionalSchemaAttributes{
IsSharded: isSharded,
}
return conv, processSchema.ProcessSchema(conv, infoSchema, common.DefaultWorkers, additionalSchemaAttributes, &common.SchemaToSpannerImpl{}, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{})
schemaToSpanner := common.SchemaToSpannerImpl{
DdlV: sads.DdlVerifier,
}
return conv, processSchema.ProcessSchema(conv, infoSchema, common.DefaultWorkers, additionalSchemaAttributes, &schemaToSpanner, &common.UtilsOrderImpl{}, &common.InfoSchemaImpl{})
}

func (sads *SchemaFromSourceImpl) SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error) {
func (sads *SchemaFromSourceImpl) SchemaFromDump(SpProjectId string, SpInstanceId string, driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error) {
f, n, err := getSeekable(ioHelper.In)
if err != nil {
utils.PrintSeekError(driver, err, ioHelper.Out)
Expand Down
106 changes: 54 additions & 52 deletions conversion/conversion_from_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
"fmt"
"testing"

"github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/mysql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)


func TestSchemaFromDatabase(t *testing.T) {
targetProfile := profiles.TargetProfile{
Conn: profiles.TargetProfileConnection{
Expand Down Expand Up @@ -65,88 +65,88 @@ func TestSchemaFromDatabase(t *testing.T) {
sourceProfileCloudDefault := profiles.SourceProfile{}
// Avoid getting/setting env variables in the unit tests.
testCases := []struct {
name string
sourceProfile profiles.SourceProfile
getInfoError error
processSchemaError error
errorExpected bool
name string
sourceProfile profiles.SourceProfile
getInfoError error
processSchemaError error
errorExpected bool
}{
{
name: "successful source profile config for bulk migration",
sourceProfile: sourceProfileConfigBulk,
getInfoError: nil,
name: "successful source profile config for bulk migration",
sourceProfile: sourceProfileConfigBulk,
getInfoError: nil,
processSchemaError: nil,
errorExpected: false,
errorExpected: false,
},
{
name: "source profile config for bulk migration: get info error",
sourceProfile: sourceProfileConfigBulk,
getInfoError: fmt.Errorf("error"),
name: "source profile config for bulk migration: get info error",
sourceProfile: sourceProfileConfigBulk,
getInfoError: fmt.Errorf("error"),
processSchemaError: nil,
errorExpected: true,
errorExpected: true,
},
{
name: "source profile config for bulk migration: process schema error",
sourceProfile: sourceProfileConfigBulk,
getInfoError: nil,
name: "source profile config for bulk migration: process schema error",
sourceProfile: sourceProfileConfigBulk,
getInfoError: nil,
processSchemaError: fmt.Errorf("error"),
errorExpected: true,
errorExpected: true,
},
{
name: "successful source profile config for dataflow migration",
sourceProfile: sourceProfileConfigDataflow,
getInfoError: nil,
name: "successful source profile config for dataflow migration",
sourceProfile: sourceProfileConfigDataflow,
getInfoError: nil,
processSchemaError: nil,
errorExpected: false,
errorExpected: false,
},
{
name: "source profile config for dataflow migration: get info error",
sourceProfile: sourceProfileConfigDataflow,
getInfoError: fmt.Errorf("error"),
name: "source profile config for dataflow migration: get info error",
sourceProfile: sourceProfileConfigDataflow,
getInfoError: fmt.Errorf("error"),
processSchemaError: nil,
errorExpected: true,
errorExpected: true,
},
{
name: "source profile config for dms migration",
sourceProfile: sourceProfileConfigDms,
getInfoError: nil,
name: "source profile config for dms migration",
sourceProfile: sourceProfileConfigDms,
getInfoError: nil,
processSchemaError: nil,
errorExpected: true,
errorExpected: true,
},
{
name: "invalid source profile config",
sourceProfile: sourceProfileConfigInvalid,
getInfoError: nil,
name: "invalid source profile config",
sourceProfile: sourceProfileConfigInvalid,
getInfoError: nil,
processSchemaError: nil,
errorExpected: true,
errorExpected: true,
},
{
name: "successful source profile cloud sql",
sourceProfile: sourceProfileCloudSql,
getInfoError: nil,
name: "successful source profile cloud sql",
sourceProfile: sourceProfileCloudSql,
getInfoError: nil,
processSchemaError: nil,
errorExpected: false,
errorExpected: false,
},
{
name: "source profile cloud sql: get info error",
sourceProfile: sourceProfileCloudSql,
getInfoError: fmt.Errorf("error"),
name: "source profile cloud sql: get info error",
sourceProfile: sourceProfileCloudSql,
getInfoError: fmt.Errorf("error"),
processSchemaError: nil,
errorExpected: true,
errorExpected: true,
},
{
name: "successful source profile default",
sourceProfile: sourceProfileCloudDefault,
getInfoError: nil,
name: "successful source profile default",
sourceProfile: sourceProfileCloudDefault,
getInfoError: nil,
processSchemaError: nil,
errorExpected: false,
errorExpected: false,
},
{
name: "source profile default: get info error",
sourceProfile: sourceProfileCloudDefault,
getInfoError: fmt.Errorf("error"),
name: "source profile default: get info error",
sourceProfile: sourceProfileCloudDefault,
getInfoError: fmt.Errorf("error"),
processSchemaError: nil,
errorExpected: true,
errorExpected: true,
},
}

Expand All @@ -159,8 +159,10 @@ func TestSchemaFromDatabase(t *testing.T) {
gim.On("GetInfoSchema", "migration-project-id", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(mysql.InfoSchemaImpl{}, tc.getInfoError)
ps.On("ProcessSchema", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.processSchemaError)

s := SchemaFromSourceImpl{}
s := SchemaFromSourceImpl{
DdlVerifier: &expressions_api.MockDDLVerifier{},
}
_, err := s.schemaFromDatabase("migration-project-id", tc.sourceProfile, targetProfile, &gim, &ps)
assert.Equal(t, tc.errorExpected, err != nil, tc.name)
}
}
}
18 changes: 10 additions & 8 deletions conversion/conversion_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/metrics"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
Expand All @@ -41,17 +42,20 @@ import (
"google.golang.org/protobuf/proto"
)

type ProcessDumpByDialectInterface interface{
type ProcessDumpByDialectInterface interface {
ProcessDump(driver string, conv *internal.Conv, r *internal.Reader) error
}

type ProcessDumpByDialectImpl struct{}
type ProcessDumpByDialectImpl struct {
DdlVerifier expressions_api.DDLVerifier
}

type PopulateDataConvInterface interface{
type PopulateDataConvInterface interface {
populateDataConv(conv *internal.Conv, config writer.BatchWriterConfig, client *sp.Client) *writer.BatchWriter
}

type PopulateDataConvImpl struct{}

// getSeekable returns a seekable file (with same content as f) and the size of the content (in bytes).
func getSeekable(f *os.File) (*os.File, int64, error) {
_, err := f.Seek(0, 0)
Expand Down Expand Up @@ -88,15 +92,14 @@ func getSeekable(f *os.File) (*os.File, int64, error) {
func (pdd *ProcessDumpByDialectImpl) ProcessDump(driver string, conv *internal.Conv, r *internal.Reader) error {
switch driver {
case constants.MYSQLDUMP:
return common.ProcessDbDump(conv, r, mysql.DbDumpImpl{})
return common.ProcessDbDump(conv, r, mysql.DbDumpImpl{}, pdd.DdlVerifier)
case constants.PGDUMP:
return common.ProcessDbDump(conv, r, postgres.DbDumpImpl{})
return common.ProcessDbDump(conv, r, postgres.DbDumpImpl{}, pdd.DdlVerifier)
default:
return fmt.Errorf("process dump for driver %s not supported", driver)
}
}


func (pdc *PopulateDataConvImpl) populateDataConv(conv *internal.Conv, config writer.BatchWriterConfig, client *sp.Client) *writer.BatchWriter {
rows := int64(0)
config.Write = func(m []*sp.Mutation) error {
Expand Down Expand Up @@ -130,7 +133,6 @@ func (pdc *PopulateDataConvImpl) populateDataConv(conv *internal.Conv, config wr
return batchWriter
}


func connectionConfig(sourceProfile profiles.SourceProfile) (interface{}, error) {
switch sourceProfile.Driver {
// For PG and MYSQL, When called as part of the subcommand flow, host/user/db etc will
Expand Down Expand Up @@ -199,4 +201,4 @@ func getDynamoDBClientConfig() (*aws.Config, error) {
cfg.Endpoint = aws.String(endpointOverride)
}
return &cfg, nil
}
}
2 changes: 1 addition & 1 deletion conversion/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (msads *MockSchemaFromSource) schemaFromDatabase(migrationProjectId string,
args := msads.Called(migrationProjectId, sourceProfile, targetProfile, getInfo, processSchema)
return args.Get(0).(*internal.Conv), args.Error(1)
}
func (msads *MockSchemaFromSource) SchemaFromDump(driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error) {
func (msads *MockSchemaFromSource) SchemaFromDump(SpProjectId string, SpInstanceId string, driver string, spDialect string, ioHelper *utils.IOStreams, processDump ProcessDumpByDialectInterface) (*internal.Conv, error) {
args := msads.Called(driver, spDialect, ioHelper, processDump)
return args.Get(0).(*internal.Conv), args.Error(1)
}
Expand Down
Loading

0 comments on commit e18a9a0

Please sign in to comment.