From b24045cb49e37f8d8d71392c069ae2826a62dd89 Mon Sep 17 00:00:00 2001 From: djagaluru Date: Thu, 12 Oct 2023 17:23:56 +0000 Subject: [PATCH] addressing comments --- conversion/conversion.go | 13 +- sources/common/infoschema.go | 2 +- sources/dynamodb/schema.go | 4 +- sources/mysql/infoschema.go | 19 ++- sources/oracle/infoschema.go | 19 ++- sources/postgres/infoschema.go | 19 ++- sources/spanner/infoschema.go | 4 +- sources/sqlserver/infoschema.go | 4 +- streaming/streaming.go | 221 ++++++++++++++++---------------- 9 files changed, 168 insertions(+), 137 deletions(-) diff --git a/conversion/conversion.go b/conversion/conversion.go index ce4b52285..f468c8b20 100644 --- a/conversion/conversion.go +++ b/conversion/conversion.go @@ -298,10 +298,12 @@ func dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile, if err != nil { return nil, err } - err = infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo) + dfJobId, gcloudCmd, err := infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo) if err != nil { return nil, err } + streamingCfg, _ := streamInfo["streamingCfg"].(streaming.StreamingCfg) + streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, targetProfile.Conn.Sp.Project, "") return bw, nil } return performSnapshotMigration(config, conv, client, infoSchema, internal.AdditionalDataAttributes{ShardId: ""}), nil @@ -349,16 +351,21 @@ func dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile, return common.TaskResult[*profiles.DataShard]{Result: p, Err: err} } fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId) - err = streaming.CreatePubsubResources(ctx, targetProfile.Conn.Sp.Project, streamingCfg, streamingCfg.DatastreamCfg, conv) + pubsubCfg, err := streaming.CreatePubsubResources(ctx, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname) if err != nil { return common.TaskResult[*profiles.DataShard]{Result: p, Err: err} } + streamingCfg.PubsubCfg = *pubsubCfg err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg) if err != nil { return common.TaskResult[*profiles.DataShard]{Result: p, Err: err} } streamingCfg.DataflowCfg.DbNameToShardIdMap = dbNameToShardIdMap - err = streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv) + dfJobId, gcloudCmd, err := streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv) + if err != nil { + return common.TaskResult[*profiles.DataShard]{Result: p, Err: err} + } + streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, targetProfile.Conn.Sp.Project, p.DataShardId) return common.TaskResult[*profiles.DataShard]{Result: p, Err: err} } _, err = common.RunParallelTasks(sourceProfile.Config.ShardConfigurationDataflow.DataShards, 20, asyncProcessShards, true) diff --git a/sources/common/infoschema.go b/sources/common/infoschema.go index 2495fa0ee..af4733817 100644 --- a/sources/common/infoschema.go +++ b/sources/common/infoschema.go @@ -41,7 +41,7 @@ type InfoSchema interface { GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error) ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error) - StartStreamingMigration(ctx context.Context, 
client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) error
+	StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) (string, string, error)
 }
 
 // SchemaAndName contains the schema and name for a table
diff --git a/sources/dynamodb/schema.go b/sources/dynamodb/schema.go
index 581a7af76..82900ae25 100644
--- a/sources/dynamodb/schema.go
+++ b/sources/dynamodb/schema.go
@@ -219,7 +219,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
 // StartStreamingMigration starts the streaming migration process by creating a separate
 // worker thread/goroutine for each table's DynamoDB Stream. It catches the Ctrl+C signal if
 // the customer wants to stop the process.
-func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) error {
+func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (string, string, error) {
 	fmt.Println("Processing of DynamoDB Streams started...")
 	fmt.Println("Use Ctrl+C to stop the process.")
 
@@ -243,7 +243,7 @@ func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *s
 	fillConvWithStreamingStats(streamInfo, conv)
 
 	fmt.Println("DynamoDB Streams processed successfully.")
-	return nil
+	return "", "", nil
 }
 
 func getSchemaIndexStruct(indexName string, keySchema []*dynamodb.KeySchemaElement, colNameIdMap map[string]string) schema.Index {
diff --git a/sources/mysql/infoschema.go b/sources/mysql/infoschema.go
index 38248ed5f..32c161bbb 100644
--- a/sources/mysql/infoschema.go
+++ b/sources/mysql/infoschema.go
@@ -367,7 +367,16 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
 		err       error
 	)
 	tableList, err = common.GetIncludedSrcTablesFromConv(conv)
-	streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList, conv)
+	streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Mysql.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
+	if err != nil {
+		return nil, fmt.Errorf("error reading streaming config: %v", err)
+	}
+	pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Mysql.Db)
+	if err != nil {
+		return nil, fmt.Errorf("error creating pubsub resources: %v", err)
+	}
+	streamingCfg.PubsubCfg = *pubsubCfg
+	streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)
 	if err != nil {
 		err = fmt.Errorf("error starting datastream: %v", err)
 		return nil, err
@@ -378,15 +387,15 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
 
 // StartStreamingMigration is used for automatic triggering of Dataflow job when
 // performing a streaming migration.
-func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error { +func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) { streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg) - err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv) + jobId, gcloudCmd, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv) if err != nil { err = fmt.Errorf("error starting dataflow: %v", err) - return err + return "", "", err } - return nil + return jobId, gcloudCmd, nil } func toType(dataType string, columnType string, charLen sql.NullInt64, numericPrecision, numericScale sql.NullInt64) schema.Type { diff --git a/sources/oracle/infoschema.go b/sources/oracle/infoschema.go index 4d4933a2f..88bf404df 100644 --- a/sources/oracle/infoschema.go +++ b/sources/oracle/infoschema.go @@ -431,7 +431,16 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte err error ) tableList, err = common.GetIncludedSrcTablesFromConv(conv) - streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList, conv) + streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Oracle.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList) + if err != nil { + return nil, fmt.Errorf("error reading streaming config: %v", err) + } + pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User) + if err != nil { + return nil, fmt.Errorf("error creating pubsub resources: %v", err) + } + streamingCfg.PubsubCfg = *pubsubCfg + streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList) if err != nil { err = fmt.Errorf("error starting datastream: %v", err) return nil, err @@ -442,13 +451,13 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte // StartStreamingMigration is used for automatic triggering of Dataflow job when // performing a streaming migration. 
-func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
+func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) {
 	streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)
-	err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
+	jobId, gcloudCmd, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
 	if err != nil {
-		return err
+		return "", "", err
 	}
-	return nil
+	return jobId, gcloudCmd, nil
 }
 
 func toType(dataType string, typecode, elementDataType sql.NullString, charLen sql.NullInt64, numericPrecision, numericScale, elementCharMaxLen, elementNumericPrecision, elementNumericScale sql.NullInt64) schema.Type {
diff --git a/sources/postgres/infoschema.go b/sources/postgres/infoschema.go
index a73786ebb..780dc7e08 100644
--- a/sources/postgres/infoschema.go
+++ b/sources/postgres/infoschema.go
@@ -69,7 +69,16 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
 	if err != nil {
 		err = fmt.Errorf("error fetching the tableList to set up datastream migration, defaulting to all tables: %v", err)
 	}
-	streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList, conv)
+	streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Pg.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
+	if err != nil {
+		return nil, fmt.Errorf("error reading streaming config: %v", err)
+	}
+	pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname)
+	if err != nil {
+		return nil, fmt.Errorf("error creating pubsub resources: %v", err)
+	}
+	streamingCfg.PubsubCfg = *pubsubCfg
+	streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)
 	if err != nil {
 		err = fmt.Errorf("error starting datastream: %v", err)
 		return nil, err
@@ -82,15 +91,15 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
 
 // StartStreamingMigration is used for automatic triggering of Dataflow job when
 // performing a streaming migration.
-func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
+func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) {
 	streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)
-	err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
+	jobId, gcloudCmd, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
 	if err != nil {
 		err = fmt.Errorf("error starting dataflow: %v", err)
-		return err
+		return "", "", err
 	}
-	return nil
+	return jobId, gcloudCmd, nil
 }
 
 // GetToDdl function below implements the common.InfoSchema interface.
diff --git a/sources/spanner/infoschema.go b/sources/spanner/infoschema.go index 0dd13e36c..d052e4bd0 100644 --- a/sources/spanner/infoschema.go +++ b/sources/spanner/infoschema.go @@ -79,8 +79,8 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte return nil, nil } -func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error { - return nil +func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) { + return "", "", nil } // GetTableName returns table name. diff --git a/sources/sqlserver/infoschema.go b/sources/sqlserver/infoschema.go index 9605c13a5..79230e184 100644 --- a/sources/sqlserver/infoschema.go +++ b/sources/sqlserver/infoschema.go @@ -58,8 +58,8 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte return nil, nil } -func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error { - return nil +func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (string, string, error) { + return "", "", nil } // GetTableName returns table name. diff --git a/streaming/streaming.go b/streaming/streaming.go index 2a49a7cc7..498137377 100644 --- a/streaming/streaming.go +++ b/streaming/streaming.go @@ -141,22 +141,6 @@ func VerifyAndUpdateCfg(streamingCfg *StreamingCfg, dbName string, tableList []s streamingCfg.DataflowCfg.JobName = jobName } - if streamingCfg.PubsubCfg.SubscriptionId == "" { - subscriptionId, err := utils.GenerateName("smt-sub-" + dbName) - if err != nil { - return fmt.Errorf("error generating pubsub subscription ID: %v", err) - } - streamingCfg.PubsubCfg.SubscriptionId = subscriptionId - } - - if streamingCfg.PubsubCfg.TopicId == "" { - topicId, err := utils.GenerateName("smt-topic-" + dbName) - if err != nil { - return fmt.Errorf("error generating pubsub topic ID: %v", err) - } - streamingCfg.PubsubCfg.TopicId = topicId - } - filePath := streamingCfg.TmpDir u, err := utils.ParseGCSFilePath(filePath) if err != nil { @@ -293,27 +277,17 @@ func getSourceStreamConfig(srcCfg *datastreampb.SourceConfig, sourceProfile prof } } -func CreatePubsubResources(ctx context.Context, projectID string, streamingCfg StreamingCfg, datastreamCfg DatastreamCfg, conv *internal.Conv) error { +func CreatePubsubResources(ctx context.Context, projectID string, datastreamDestinationConnCfg DstConnCfg, dbName string) (*internal.PubsubCfg, error) { pubsubClient, err := pubsub.NewClient(ctx, projectID) if err != nil { - return fmt.Errorf("pubsub client can not be created: %v", err) + return nil, fmt.Errorf("pubsub client can not be created: %v", err) } defer pubsubClient.Close() - fmt.Println("Created pubsub client...") // Create pubsub topic and subscription - topicObj, err := pubsubClient.CreateTopic(ctx, streamingCfg.PubsubCfg.TopicId) + pubsubCfg, err := createPubsubTopicAndSubscription(ctx, pubsubClient, dbName) if err != nil { - return fmt.Errorf("pubsub topic could not be created: %v", err) - } - - _, err = pubsubClient.CreateSubscription(ctx, streamingCfg.PubsubCfg.SubscriptionId, pubsub.SubscriptionConfig{ - Topic: topicObj, - AckDeadline: time.Minute * 10, - RetentionDuration: time.Hour * 24 * 7, - }) - if err != nil { - return 
fmt.Errorf("pubsub subscription could not be created: %v", err) + return nil, err } // Fetch the created target profile and get the target gcs bucket name and path. @@ -321,36 +295,81 @@ func CreatePubsubResources(ctx context.Context, projectID string, streamingCfg S // Creating datastream client to fetch target profile. dsClient, err := datastream.NewClient(ctx) if err != nil { - return fmt.Errorf("datastream client can not be created: %v", err) + return nil, fmt.Errorf("datastream client can not be created: %v", err) } defer dsClient.Close() - dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", projectID, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name) - res, err := dsClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf}) + bucketName, prefix, err := fetchTargetBucketAndPath(ctx, dsClient, projectID, datastreamDestinationConnCfg) if err != nil { - return fmt.Errorf("could not get connection profiles: %v", err) - } - // Fetch the GCS path from the target connection profile. - gcsProfile := res.Profile.(*datastreampb.ConnectionProfile_GcsProfile).GcsProfile - bucketName := gcsProfile.Bucket - prefix := gcsProfile.RootPath + datastreamCfg.DestinationConnectionConfig.Prefix - if prefix != "" && prefix[len(prefix)-1] != '/' { - prefix = prefix + "/" - } - prefix = prefix + "data/" - if prefix[0] == '/' { - prefix = prefix[1:] + return nil, err } // Create pubsub notification on the target gcs path storageClient, err := storage.NewClient(ctx) if err != nil { - return fmt.Errorf("GCS client can not be created: %v", err) + return nil, fmt.Errorf("GCS client can not be created: %v", err) } defer storageClient.Close() + notificationID, err := createNotificationOnBucket(ctx, storageClient, projectID, pubsubCfg.TopicId, bucketName, prefix) + if err != nil { + return nil, err + } + pubsubCfg.BucketName = bucketName + pubsubCfg.NotificationId = notificationID + logger.Log.Info(fmt.Sprintf("Successfully created pubsub topic id=%s, subscription id=%s, notification for bucket=%s with id=%s.\n", pubsubCfg.TopicId, pubsubCfg.SubscriptionId, bucketName, notificationID)) + return &pubsubCfg, nil +} + +func createPubsubTopicAndSubscription(ctx context.Context, pubsubClient *pubsub.Client, dbName string) (internal.PubsubCfg, error) { + pubsubCfg := internal.PubsubCfg{} + // Generate ID + subscriptionId, err := utils.GenerateName("smt-sub-" + dbName) + if err != nil { + return pubsubCfg, fmt.Errorf("error generating pubsub subscription ID: %v", err) + } + pubsubCfg.SubscriptionId = subscriptionId + + topicId, err := utils.GenerateName("smt-topic-" + dbName) + if err != nil { + return pubsubCfg, fmt.Errorf("error generating pubsub topic ID: %v", err) + } + pubsubCfg.TopicId = topicId + + // Create Topic and Subscription + topicObj, err := pubsubClient.CreateTopic(ctx, pubsubCfg.TopicId) + if err != nil { + return pubsubCfg, fmt.Errorf("pubsub topic could not be created: %v", err) + } + + _, err = pubsubClient.CreateSubscription(ctx, pubsubCfg.SubscriptionId, pubsub.SubscriptionConfig{ + Topic: topicObj, + AckDeadline: time.Minute * 10, + RetentionDuration: time.Hour * 24 * 7, + }) + if err != nil { + return pubsubCfg, fmt.Errorf("pubsub subscription could not be created: %v", err) + } + return pubsubCfg, nil +} + +func fetchTargetBucketAndPath(ctx context.Context, datastreamClient *datastream.Client, projectID string, datastreamDestinationConnCfg DstConnCfg) (string, string, error) { + dstProf := 
fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", projectID, datastreamDestinationConnCfg.Location, datastreamDestinationConnCfg.Name) + res, err := datastreamClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf}) + if err != nil { + return "", "", fmt.Errorf("could not get connection profiles: %v", err) + } + // Fetch the GCS path from the target connection profile. + gcsProfile := res.Profile.(*datastreampb.ConnectionProfile_GcsProfile).GcsProfile + bucketName := gcsProfile.Bucket + prefix := gcsProfile.RootPath + datastreamDestinationConnCfg.Prefix + prefix = concatDirectoryPath(prefix, "data/") + return bucketName, prefix, nil +} + +func createNotificationOnBucket(ctx context.Context, storageClient *storage.Client, projectID, topicID, bucketName, prefix string) (string, error) { notification := storage.Notification{ - TopicID: streamingCfg.PubsubCfg.TopicId, + TopicID: topicID, TopicProjectID: projectID, PayloadFormat: storage.JSONPayload, ObjectNamePrefix: prefix, @@ -358,26 +377,19 @@ func CreatePubsubResources(ctx context.Context, projectID string, streamingCfg S createdNotification, err := storageClient.Bucket(bucketName).AddNotification(ctx, ¬ification) if err != nil { - return fmt.Errorf("GCS Notification could not be created: %v", err) + return "", fmt.Errorf("GCS Notification could not be created: %v", err) } - logger.Log.Info(fmt.Sprintf("Successfully created pubsub topic id=%s, subscription id=%s, notification for bucket=%s with id=%s.\n", streamingCfg.PubsubCfg.TopicId, streamingCfg.PubsubCfg.SubscriptionId, bucketName, createdNotification.ID)) - streamingCfg.PubsubCfg.NotificationId = createdNotification.ID - streamingCfg.PubsubCfg.BucketName = bucketName - storeGeneratedPubsubResources(conv, streamingCfg.PubsubCfg, projectID, streamingCfg.DataShardId) - return nil + return createdNotification.ID, nil } -func storeGeneratedPubsubResources(conv *internal.Conv, pubsubCfg internal.PubsubCfg, project string, dataShardId string) { - conv.Audit.StreamingStats.PubsubCfg = pubsubCfg - if dataShardId != "" { - var resourceMutex sync.Mutex - resourceMutex.Lock() - conv.Audit.StreamingStats.ShardToPubsubIdMap[dataShardId] = pubsubCfg - resourceMutex.Unlock() +func concatDirectoryPath(basePath, subPath string) string { + if basePath == "" && subPath == "" { + return "" } - fmt.Println("\n------------------------------------------\n" + - "The Pubsub topic: " + pubsubCfg.TopicId + "and the subscription: " + pubsubCfg.SubscriptionId + - " will have to be manually cleaned up via the UI. Spanner migration tool will not delete them post completion of the migration.") + path := fmt.Sprintf("%s/%s/", basePath, subPath) + path = strings.ReplaceAll(path, "///", "/") + path = strings.ReplaceAll(path, "//", "/") + return path } // LaunchStream populates the parameters from the streaming config and triggers a stream on Cloud Datastream. @@ -576,7 +588,7 @@ func CleanupDataflowJob(ctx context.Context, client *dataflow.JobsV1Beta3Client, } // LaunchDataflowJob populates the parameters from the streaming config and triggers a Dataflow job. 
-func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) error { +func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (string, string, error) { project, instance, dbName, _ := targetProfile.GetResourceIds(ctx, time.Now(), "", nil) dataflowCfg := streamingCfg.DataflowCfg datastreamCfg := streamingCfg.DatastreamCfg @@ -584,7 +596,7 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile c, err := dataflow.NewFlexTemplatesClient(ctx) if err != nil { - return fmt.Errorf("could not create flex template client: %v", err) + return "", "", fmt.Errorf("could not create flex template client: %v", err) } defer c.Close() fmt.Println("Created flex template client...") @@ -592,7 +604,7 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile //Creating datastream client to fetch the gcs bucket using target profile. dsClient, err := datastream.NewClient(ctx) if err != nil { - return fmt.Errorf("datastream client can not be created: %v", err) + return "", "", fmt.Errorf("datastream client can not be created: %v", err) } defer dsClient.Close() @@ -600,7 +612,7 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", project, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name) res, err := dsClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf}) if err != nil { - return fmt.Errorf("could not get connection profiles: %v", err) + return "", "", fmt.Errorf("could not get connection profiles: %v", err) } gcsProfile := res.Profile.(*datastreampb.ConnectionProfile_GcsProfile).GcsProfile inputFilePattern := "gs://" + gcsProfile.Bucket + gcsProfile.RootPath + datastreamCfg.DestinationConnectionConfig.Prefix @@ -623,7 +635,7 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile if dataflowCfg.Network != "" { workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE if dataflowCfg.Subnetwork == "" { - return fmt.Errorf("if network is specified, subnetwork cannot be empty") + return "", "", fmt.Errorf("if network is specified, subnetwork cannot be empty") } else { dataflowSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", dataflowHostProjectId, dataflowCfg.Location, dataflowCfg.Subnetwork) } @@ -632,21 +644,21 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile if dataflowCfg.MaxWorkers != "" { intVal, err := strconv.ParseInt(dataflowCfg.MaxWorkers, 10, 64) if err != nil { - return fmt.Errorf("could not parse MaxWorkers parameter %s, please provide a positive integer as input", dataflowCfg.MaxWorkers) + return "", "", fmt.Errorf("could not parse MaxWorkers parameter %s, please provide a positive integer as input", dataflowCfg.MaxWorkers) } maxWorkers = int32(intVal) if maxWorkers < MIN_WORKER_LIMIT || maxWorkers > MAX_WORKER_LIMIT { - return fmt.Errorf("maxWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT) + return "", "", fmt.Errorf("maxWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT) } } if dataflowCfg.NumWorkers != "" { intVal, err := strconv.ParseInt(dataflowCfg.NumWorkers, 10, 64) if err != nil { - return fmt.Errorf("could not parse 
NumWorkers parameter %s, please provide a positive integer as input", dataflowCfg.NumWorkers)
 		}
 		numWorkers = int32(intVal)
 		if numWorkers < MIN_WORKER_LIMIT || numWorkers > MAX_WORKER_LIMIT {
-			return fmt.Errorf("numWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT)
+			return "", "", fmt.Errorf("numWorkers should lie in the range [%d, %d]", MIN_WORKER_LIMIT, MAX_WORKER_LIMIT)
 		}
 	}
 	launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
@@ -683,45 +695,37 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile
 	respDf, err := c.LaunchFlexTemplate(ctx, req)
 	if err != nil {
 		fmt.Printf("flexTemplateRequest: %+v\n", req)
-		return fmt.Errorf("unable to launch template: %v", err)
+		return "", "", fmt.Errorf("unable to launch template: %v", err)
 	}
 	gcloudDfCmd := utils.GetGcloudDataflowCommand(req)
 	logger.Log.Debug(fmt.Sprintf("\nEquivalent gCloud command for job %s:\n%s\n\n", req.LaunchParameter.JobName, gcloudDfCmd))
-	storeGeneratedResources(conv, datastreamCfg, respDf, gcloudDfCmd, project, streamingCfg.DataShardId)
-	return nil
+	return respDf.Job.Id, gcloudDfCmd, nil
 }
 
-func storeGeneratedResources(conv *internal.Conv, datastreamCfg DatastreamCfg, respDf *dataflowpb.LaunchFlexTemplateResponse, gcloudDataflowCmd string, project string, dataShardId string) {
+func StoreGeneratedResources(conv *internal.Conv, streamingCfg StreamingCfg, dfJobId, gcloudDataflowCmd, project, dataShardId string) {
+	datastreamCfg := streamingCfg.DatastreamCfg
+	dataflowCfg := streamingCfg.DataflowCfg
 	conv.Audit.StreamingStats.DataStreamName = datastreamCfg.StreamId
-	conv.Audit.StreamingStats.DataflowJobId = respDf.Job.Id
+	conv.Audit.StreamingStats.DataflowJobId = dfJobId
 	conv.Audit.StreamingStats.DataflowGcloudCmd = gcloudDataflowCmd
+	conv.Audit.StreamingStats.PubsubCfg = streamingCfg.PubsubCfg
 	if dataShardId != "" {
 		var resourceMutex sync.Mutex
 		resourceMutex.Lock()
 		conv.Audit.StreamingStats.ShardToDataStreamNameMap[dataShardId] = datastreamCfg.StreamId
-		conv.Audit.StreamingStats.ShardToDataflowInfoMap[dataShardId] = internal.ShardedDataflowJobResources{JobId: respDf.Job.Id, GcloudCmd: gcloudDataflowCmd}
+		conv.Audit.StreamingStats.ShardToDataflowInfoMap[dataShardId] = internal.ShardedDataflowJobResources{JobId: dfJobId, GcloudCmd: gcloudDataflowCmd}
+		conv.Audit.StreamingStats.ShardToPubsubIdMap[dataShardId] = streamingCfg.PubsubCfg
 		resourceMutex.Unlock()
 	}
 	fullStreamName := fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId)
-	dfJobDetails := fmt.Sprintf("project: %s, location: %s, name: %s, id: %s", project, respDf.Job.Location, respDf.Job.Name, respDf.Job.Id)
-	fmt.Println("\n------------------------------------------\n" +
-		"The Datastream job: " + fullStreamName + "and the Dataflow job: " + dfJobDetails +
+	dfJobDetails := fmt.Sprintf("project: %s, location: %s, name: %s, id: %s", project, dataflowCfg.Location, dataflowCfg.JobName, dfJobId)
+	logger.Log.Info("\n------------------------------------------\n")
+	logger.Log.Info("The Datastream job: " + fullStreamName + ", the Dataflow job: " + dfJobDetails +
+		"\nThe Pubsub topic: " + streamingCfg.PubsubCfg.TopicId + ", the subscription: " + streamingCfg.PubsubCfg.SubscriptionId +
+		" and the pubsub notification id: " + streamingCfg.PubsubCfg.NotificationId + " on bucket: " + streamingCfg.PubsubCfg.BucketName +
 		" will have to be manually cleaned up via the UI. Spanner migration tool will not delete them post completion of the migration.")
 }
 
-func getStreamingConfig(sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, tableList []string) (StreamingCfg, error) {
-	switch sourceProfile.Conn.Ty {
-	case profiles.SourceProfileConnectionTypeMySQL:
-		return ReadStreamingConfig(sourceProfile.Conn.Mysql.StreamingConfig, targetProfile.Conn.Sp.Dbname, tableList)
-	case profiles.SourceProfileConnectionTypeOracle:
-		return ReadStreamingConfig(sourceProfile.Conn.Oracle.StreamingConfig, targetProfile.Conn.Sp.Dbname, tableList)
-	case profiles.SourceProfileConnectionTypePostgreSQL:
-		return ReadStreamingConfig(sourceProfile.Conn.Pg.StreamingConfig, targetProfile.Conn.Sp.Dbname, tableList)
-	default:
-		return StreamingCfg{}, fmt.Errorf("only MySQL, Oracle and PostgreSQL are supported as source streams")
-	}
-}
-
 func CreateStreamingConfig(pl profiles.DataShard) StreamingCfg {
 	//create dataflowcfg from pl receiver object
 	inputDataflowConfig := pl.DataflowConfig
@@ -748,11 +752,7 @@ func CreateStreamingConfig(pl profiles.DataShard) StreamingCfg {
 	return streamingCfg
 }
 
-func StartDatastream(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, tableList []string, conv *internal.Conv) (StreamingCfg, error) {
-	streamingCfg, err := getStreamingConfig(sourceProfile, targetProfile, tableList)
-	if err != nil {
-		return streamingCfg, fmt.Errorf("error reading streaming config: %v", err)
-	}
+func StartDatastream(ctx context.Context, streamingCfg StreamingCfg, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, tableList []string) (StreamingCfg, error) {
 	driver := sourceProfile.Driver
 	var dbList []profiles.LogicalShard
 	switch driver {
@@ -763,41 +763,37 @@ func StartDatastream(ctx context.Context, sourceProfile profiles.SourceProfile,
 	case constants.POSTGRES:
 		dbList = append(dbList, profiles.LogicalShard{DbName: streamingCfg.DatastreamCfg.Properties})
 	}
-	err = CreatePubsubResources(ctx, targetProfile.Conn.Sp.Project, streamingCfg, streamingCfg.DatastreamCfg, conv)
-	if err != nil {
-		return streamingCfg, fmt.Errorf("Error creating pubsub resources: %v", err)
-	}
-	err = LaunchStream(ctx, sourceProfile, dbList, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg)
+	err := LaunchStream(ctx, sourceProfile, dbList, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg)
 	if err != nil {
 		return streamingCfg, fmt.Errorf("error launching stream: %v", err)
 	}
 	return streamingCfg, nil
 }
 
-func StartDataflow(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) error {
+func StartDataflow(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (string, string, error) {
 	convJSON, err := json.MarshalIndent(conv, "", " ")
 	if err != nil {
-		return fmt.Errorf("can't encode session state to JSON: %v", err)
+		return "", "", fmt.Errorf("can't encode session state to JSON: %v", err)
 	}
 	err = utils.WriteToGCS(streamingCfg.TmpDir, "session.json", string(convJSON))
 	if err != nil {
-		return fmt.Errorf("error while writing to GCS: %v", err)
+		return "", "", fmt.Errorf("error while writing to GCS: %v", err)
 	}
 	transformationContextMap := map[string]interface{}{
 		"SchemaToShardId": streamingCfg.DataflowCfg.DbNameToShardIdMap,
 	}
transformationContext, err := json.Marshal(transformationContextMap) if err != nil { - return fmt.Errorf("failed to compute transformation context: %s", err.Error()) + return "", "", fmt.Errorf("failed to compute transformation context: %s", err.Error()) } err = utils.WriteToGCS(streamingCfg.TmpDir, "transformationContext.json", string(transformationContext)) if err != nil { - return fmt.Errorf("error while writing to GCS: %v", err) + return "", "", fmt.Errorf("error while writing to GCS: %v", err) } - err = LaunchDataflowJob(ctx, targetProfile, streamingCfg, conv) + dfJobId, gcloudCmd, err := LaunchDataflowJob(ctx, targetProfile, streamingCfg, conv) if err != nil { - return fmt.Errorf("error launching dataflow: %v", err) + return "", "", fmt.Errorf("error launching dataflow: %v", err) } - return nil + return dfJobId, gcloudCmd, nil }
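
Taken together, this patch changes the orchestration contract: Pub/Sub resources are created explicitly before the stream is launched, StartDataflow and LaunchDataflowJob return the Dataflow job ID and the equivalent gcloud command instead of persisting them, and the caller records everything through the new exported StoreGeneratedResources. Below is a minimal sketch of the resulting call order, mirroring dataFromDatabaseForDataflowMigration above. It is illustrative only: the import paths are assumed from the repository layout, and migrateShard, project, dbName, logicalShards and dataShardId are placeholder names, not part of this patch.

package main

import (
	"context"

	// Import paths assumed from the repository layout; adjust to the module path in go.mod.
	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/streaming"
)

// migrateShard sketches the post-refactor call order for one shard (or for the
// single unsharded database, in which case dataShardId is "").
func migrateShard(ctx context.Context, project, dbName, dataShardId string,
	sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile,
	streamingCfg streaming.StreamingCfg, logicalShards []profiles.LogicalShard,
	conv *internal.Conv) error {
	// 1. Create the Pub/Sub topic, subscription and GCS bucket notification up
	// front; CreatePubsubResources now takes only the destination connection
	// config and a database name, and returns the generated IDs.
	pubsubCfg, err := streaming.CreatePubsubResources(ctx, project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, dbName)
	if err != nil {
		return err
	}
	streamingCfg.PubsubCfg = *pubsubCfg

	// 2. Launch the Datastream stream.
	if err := streaming.LaunchStream(ctx, sourceProfile, logicalShards, project, streamingCfg.DatastreamCfg); err != nil {
		return err
	}

	// 3. Launch the Dataflow job; the job ID and equivalent gcloud command are
	// now surfaced to the caller rather than written into conv internally.
	dfJobId, gcloudCmd, err := streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv)
	if err != nil {
		return err
	}

	// 4. Record every generated resource (stream name, Dataflow job, Pub/Sub
	// topic/subscription/notification) in conv.Audit.StreamingStats so cleanup
	// instructions can be shown to the user after the migration.
	streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, project, dataShardId)
	return nil
}

Passing an empty dataShardId (as the non-sharded dataFromDatabase path does) records the resources only at the top level of conv.Audit.StreamingStats; a non-empty shard ID additionally populates the per-shard maps under a mutex, matching the sharded Dataflow path in conversion.go.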