Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
darshan-sj committed Oct 12, 2023
1 parent 87018df commit b24045c
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 137 deletions.
13 changes: 10 additions & 3 deletions conversion/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,12 @@ func dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile,
if err != nil {
return nil, err
}
err = infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo)
dfJobId, gcloudCmd, err := infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo)
if err != nil {
return nil, err
}
streamingCfg, _ := streamInfo["streamingCfg"].(streaming.StreamingCfg)
streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, targetProfile.Conn.Sp.Project, "")
return bw, nil
}
return performSnapshotMigration(config, conv, client, infoSchema, internal.AdditionalDataAttributes{ShardId: ""}), nil
Expand Down Expand Up @@ -349,16 +351,21 @@ func dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile,
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId)
err = streaming.CreatePubsubResources(ctx, targetProfile.Conn.Sp.Project, streamingCfg, streamingCfg.DatastreamCfg, conv)
pubsubCfg, err := streaming.CreatePubsubResources(ctx, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.PubsubCfg = *pubsubCfg
err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.DataflowCfg.DbNameToShardIdMap = dbNameToShardIdMap
err = streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv)
dfJobId, gcloudCmd, err := streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, targetProfile.Conn.Sp.Project, p.DataShardId)
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
_, err = common.RunParallelTasks(sourceProfile.Config.ShardConfigurationDataflow.DataShards, 20, asyncProcessShards, true)
Expand Down
2 changes: 1 addition & 1 deletion sources/common/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type InfoSchema interface {
GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error)
ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error)
StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) error
StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) (string, string, error)
}

// SchemaAndName contains the schema and name for a table
Expand Down
4 changes: 2 additions & 2 deletions sources/dynamodb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
// StartStreamingMigration starts the streaming migration process by creating a seperate
// worker thread/goroutine for each table's DynamoDB Stream. It catches Ctrl+C signal if
// customer wants to stop the process.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (string, string, error) {

Check warning on line 222 in sources/dynamodb/schema.go

View check run for this annotation

Codecov / codecov/patch

sources/dynamodb/schema.go#L222

Added line #L222 was not covered by tests
fmt.Println("Processing of DynamoDB Streams started...")
fmt.Println("Use Ctrl+C to stop the process.")

Expand All @@ -243,7 +243,7 @@ func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *s
fillConvWithStreamingStats(streamInfo, conv)

fmt.Println("DynamoDB Streams processed successfully.")
return nil
return "", "", nil

Check warning on line 246 in sources/dynamodb/schema.go

View check run for this annotation

Codecov / codecov/patch

sources/dynamodb/schema.go#L246

Added line #L246 was not covered by tests
}

func getSchemaIndexStruct(indexName string, keySchema []*dynamodb.KeySchemaElement, colNameIdMap map[string]string) schema.Index {
Expand Down
19 changes: 14 additions & 5 deletions sources/mysql/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,16 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
err error
)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList, conv)
streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Mysql.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
if err != nil {
return nil, fmt.Errorf("error reading streaming config: %v", err)
}
pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Mysql.Db)
if err != nil {
return nil, fmt.Errorf("error creating pubsub resources: %v", err)
}
streamingCfg.PubsubCfg = *pubsubCfg
streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)

Check warning on line 379 in sources/mysql/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/mysql/infoschema.go#L370-L379

Added lines #L370 - L379 were not covered by tests
if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
Expand All @@ -378,15 +387,15 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) {

Check warning on line 390 in sources/mysql/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/mysql/infoschema.go#L390

Added line #L390 was not covered by tests
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)

err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
jobId, gcloudCmd, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)

Check warning on line 393 in sources/mysql/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/mysql/infoschema.go#L393

Added line #L393 was not covered by tests
if err != nil {
err = fmt.Errorf("error starting dataflow: %v", err)
return err
return "", "", err

Check warning on line 396 in sources/mysql/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/mysql/infoschema.go#L396

Added line #L396 was not covered by tests
}
return nil
return jobId, gcloudCmd, nil

Check warning on line 398 in sources/mysql/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/mysql/infoschema.go#L398

Added line #L398 was not covered by tests
}

func toType(dataType string, columnType string, charLen sql.NullInt64, numericPrecision, numericScale sql.NullInt64) schema.Type {
Expand Down
19 changes: 14 additions & 5 deletions sources/oracle/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,16 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
err error
)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList, conv)
streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Oracle.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
if err != nil {
return nil, fmt.Errorf("error reading streaming config: %v", err)
}
pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User)
if err != nil {
return nil, fmt.Errorf("error creating pubsub resources: %v", err)
}
streamingCfg.PubsubCfg = *pubsubCfg
streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)

Check warning on line 443 in sources/oracle/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/oracle/infoschema.go#L434-L443

Added lines #L434 - L443 were not covered by tests
if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
Expand All @@ -442,13 +451,13 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) {

Check warning on line 454 in sources/oracle/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/oracle/infoschema.go#L454

Added line #L454 was not covered by tests
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)
err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
jobId, gcloudCmd, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)

Check warning on line 456 in sources/oracle/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/oracle/infoschema.go#L456

Added line #L456 was not covered by tests
if err != nil {
return err
return "", "", err

Check warning on line 458 in sources/oracle/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/oracle/infoschema.go#L458

Added line #L458 was not covered by tests
}
return nil
return jobId, gcloudCmd, nil

Check warning on line 460 in sources/oracle/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/oracle/infoschema.go#L460

Added line #L460 was not covered by tests
}

func toType(dataType string, typecode, elementDataType sql.NullString, charLen sql.NullInt64, numericPrecision, numericScale, elementCharMaxLen, elementNumericPrecision, elementNumericScale sql.NullInt64) schema.Type {
Expand Down
19 changes: 14 additions & 5 deletions sources/postgres/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,16 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
if err != nil {
err = fmt.Errorf("error fetching the tableList to setup datastream migration, defaulting to all tables: %v", err)
}
streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList, conv)
streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Pg.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
if err != nil {
return nil, fmt.Errorf("error reading streaming config: %v", err)
}
pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname)
if err != nil {
return nil, fmt.Errorf("error creating pubsub resources: %v", err)
}
streamingCfg.PubsubCfg = *pubsubCfg
streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)

Check warning on line 81 in sources/postgres/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/postgres/infoschema.go#L72-L81

Added lines #L72 - L81 were not covered by tests
if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
Expand All @@ -82,15 +91,15 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) {

Check warning on line 94 in sources/postgres/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/postgres/infoschema.go#L94

Added line #L94 was not covered by tests
streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)

err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
jobId, gcloudCmd, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)

Check warning on line 97 in sources/postgres/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/postgres/infoschema.go#L97

Added line #L97 was not covered by tests
if err != nil {
err = fmt.Errorf("error starting dataflow: %v", err)
return err
return "", "", err

Check warning on line 100 in sources/postgres/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/postgres/infoschema.go#L100

Added line #L100 was not covered by tests
}
return nil
return jobId, gcloudCmd, nil

Check warning on line 102 in sources/postgres/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/postgres/infoschema.go#L102

Added line #L102 was not covered by tests
}

// GetToDdl function below implement the common.InfoSchema interface.
Expand Down
4 changes: 2 additions & 2 deletions sources/spanner/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
return nil, nil
}

func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
return nil
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) {
return "", "", nil

Check warning on line 83 in sources/spanner/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/spanner/infoschema.go#L82-L83

Added lines #L82 - L83 were not covered by tests
}

// GetTableName returns table name.
Expand Down
4 changes: 2 additions & 2 deletions sources/sqlserver/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
return nil, nil
}

func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
return nil
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) {
return "", "", nil

Check warning on line 62 in sources/sqlserver/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/sqlserver/infoschema.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}

// GetTableName returns table name.
Expand Down
Loading

0 comments on commit b24045c

Please sign in to comment.