
Pubsub notification setup for low downtime migration #656

Merged · 19 commits · Oct 18, 2023
Changes from 1 commit
3 changes: 2 additions & 1 deletion conversion/conversion.go
@@ -322,6 +322,7 @@ func dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile,
updateShardsWithDataflowConfig(sourceProfile.Config.ShardConfigurationDataflow)
conv.Audit.StreamingStats.ShardToDataStreamNameMap = make(map[string]string)
conv.Audit.StreamingStats.ShardToDataflowJobMap = make(map[string]string)
conv.Audit.StreamingStats.ShardToPubsubIdMap = make(map[string]internal.PubsubCfg)
tableList, err := common.GetIncludedSrcTablesFromConv(conv)
if err != nil {
fmt.Printf("unable to determine tableList from schema, falling back to full database")
@@ -349,7 +350,7 @@ func dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile,
}
fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId)

- err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg)
+ err = streaming.CreatePubsubNotificationAndLaunchStream(ctx, sourceProfile, p.LogicalShards, targetProfile.Conn.Sp.Project, streamingCfg, conv)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
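An aside on why this file pre-initializes ShardToPubsubIdMap with make before any shard task runs: in Go, writing to a nil map panics at runtime, so the map must exist before storeGeneratedPubsubResources (later in this diff) stores per-shard entries into it. A minimal standalone illustration, not code from this PR:

```go
package main

import "fmt"

func main() {
	var m map[string]string  // zero value: nil map
	fmt.Println(m["shard-1"]) // reading a nil map is safe: prints ""

	// m["shard-1"] = "topic" // would panic: assignment to entry in nil map

	m = make(map[string]string) // initialize once, up front
	m["shard-1"] = "smt-topic-mydb-1a2b3c"
	fmt.Println(m["shard-1"])
}
```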
11 changes: 10 additions & 1 deletion internal/convert.go
@@ -195,8 +195,17 @@ type streamingStats struct {
SampleBadWrites []string // Records that faced errors while writing to Cloud Spanner.
DataStreamName string
DataflowJobId string
PubsubCfg PubsubCfg
ShardToDataStreamNameMap map[string]string
ShardToDataflowJobMap map[string]string
ShardToPubsubIdMap map[string]PubsubCfg
}

type PubsubCfg struct {
TopicId string
SubscriptionId string
NotificationId string
BucketName string
}

// Stores information related to rules during schema conversion
@@ -426,7 +435,7 @@ func addMissingPrimaryKeyWarning(tableId string, colId string, conv *Conv) {
columnLevelIssues = tableIssues.ColumnLevelIssues
} else {
columnLevelIssues = make(map[string][]SchemaIssue)
- }
+ }
issues := columnLevelIssues[colId]
issues = append(issues, MissingPrimaryKey)
columnLevelIssues[colId] = issues
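For orientation, the four PubsubCfg fields introduced above identify everything needed to tear the notification pipeline down later: the topic, the subscription, the GCS notification, and the bucket the notification is attached to. A rough sketch of how they relate to the fully qualified names used later in this diff; the project and resource IDs below are illustrative, not from the PR:

```go
package main

import "fmt"

// Mirrors internal.PubsubCfg from this diff; the values below are made up.
type PubsubCfg struct {
	TopicId        string
	SubscriptionId string
	NotificationId string
	BucketName     string
}

func main() {
	cfg := PubsubCfg{
		TopicId:        "smt-topic-mydb-1a2b3c",
		SubscriptionId: "smt-sub-mydb-1a2b3c",
		NotificationId: "42", // returned by GCS when the notification is created
		BucketName:     "datastream-output-bucket",
	}
	project := "my-project"
	// LaunchDataflowJob (later in this diff) passes the subscription to the
	// Dataflow template as a fully qualified resource name:
	fmt.Printf("projects/%s/subscriptions/%s\n", project, cfg.SubscriptionId)
	// The notification is addressed per bucket, by ID:
	fmt.Printf("bucket=%s notification=%s topic=%s\n", cfg.BucketName, cfg.NotificationId, cfg.TopicId)
}
```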
2 changes: 1 addition & 1 deletion sources/mysql/infoschema.go
@@ -367,7 +367,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
err error
)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
- streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList)
+ streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList, conv)
if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
2 changes: 1 addition & 1 deletion sources/oracle/infoschema.go
@@ -431,7 +431,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
err error
)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
- streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList)
+ streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList, conv)
if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
2 changes: 1 addition & 1 deletion sources/postgres/infoschema.go
@@ -69,7 +69,7 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
if err != nil {
err = fmt.Errorf("error fetching the tableList to setup datastream migration, defaulting to all tables: %v", err)
}
- streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList)
+ streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList, conv)
if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
170 changes: 166 additions & 4 deletions streaming/streaming.go
@@ -25,6 +25,7 @@ import (

dataflow "cloud.google.com/go/dataflow/apiv1beta3"
datastream "cloud.google.com/go/datastream/apiv1"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
datastreampb "google.golang.org/genproto/googleapis/cloud/datastream/v1"
dataflowpb "google.golang.org/genproto/googleapis/dataflow/v1beta3"
@@ -84,6 +85,7 @@ type StreamingCfg struct {
DatastreamCfg DatastreamCfg
DataflowCfg DataflowCfg
TmpDir string
PubsubCfg internal.PubsubCfg
DataShardId string
}

@@ -138,6 +140,22 @@ func VerifyAndUpdateCfg(streamingCfg *StreamingCfg, dbName string, tableList []s
streamingCfg.DataflowCfg.JobName = jobName
}

if streamingCfg.PubsubCfg.SubscriptionId == "" {
subscriptionId, err := utils.GenerateName("smt-sub-" + dbName)
if err != nil {
return fmt.Errorf("error generating pubsub subscription ID: %v", err)
}
streamingCfg.PubsubCfg.SubscriptionId = subscriptionId
}

if streamingCfg.PubsubCfg.TopicId == "" {
topicId, err := utils.GenerateName("smt-topic-" + dbName)
if err != nil {
return fmt.Errorf("error generating pubsub topic ID: %v", err)
}
streamingCfg.PubsubCfg.TopicId = topicId
}

filePath := streamingCfg.TmpDir
u, err := utils.ParseGCSFilePath(filePath)
if err != nil {
@@ -274,6 +292,98 @@ func getSourceStreamConfig(srcCfg *datastreampb.SourceConfig, sourceProfile prof
}
}

func CreatePubsubNotificationAndLaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbList []profiles.LogicalShard, projectID string, streamingCfg StreamingCfg, conv *internal.Conv) error {
err := CreatePubsubNotification(ctx, projectID, streamingCfg, streamingCfg.DatastreamCfg, conv)
if err != nil {
return err
}
return LaunchStream(ctx, sourceProfile, dbList, projectID, streamingCfg.DatastreamCfg)
}

func CreatePubsubNotification(ctx context.Context, projectID string, streamingCfg StreamingCfg, datastreamCfg DatastreamCfg, conv *internal.Conv) error {
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub client can not be created: %v", err)
}
defer pubsubClient.Close()
fmt.Println("Created pubsub client...")

topicObj, err := pubsubClient.CreateTopic(ctx, streamingCfg.PubsubCfg.TopicId)
if err != nil {
return fmt.Errorf("pubsub topic could not be created: %v", err)
}

_, err = pubsubClient.CreateSubscription(ctx, streamingCfg.PubsubCfg.SubscriptionId, pubsub.SubscriptionConfig{
Topic: topicObj,
AckDeadline: time.Minute * 10,
RetentionDuration: time.Hour * 24 * 7,
})
if err != nil {
return fmt.Errorf("pubsub subscription could not be created: %v", err)
}
fmt.Printf("Successfully created pubsub topic and subscription")

// Create a datastream client to fetch the GCS bucket from the destination connection profile.
dsClient, err := datastream.NewClient(ctx)
if err != nil {
return fmt.Errorf("datastream client can not be created: %v", err)
}
defer dsClient.Close()

// Fetch the GCS path from the destination connection profile.
dstProf := fmt.Sprintf("projects/%s/locations/%s/connectionProfiles/%s", projectID, datastreamCfg.DestinationConnectionConfig.Location, datastreamCfg.DestinationConnectionConfig.Name)
res, err := dsClient.GetConnectionProfile(ctx, &datastreampb.GetConnectionProfileRequest{Name: dstProf})
if err != nil {
return fmt.Errorf("could not get connection profiles: %v", err)
}
gcsProfile := res.Profile.(*datastreampb.ConnectionProfile_GcsProfile).GcsProfile
bucketName := gcsProfile.Bucket
prefix := gcsProfile.RootPath + datastreamCfg.DestinationConnectionConfig.Prefix
if prefix != "" && prefix[len(prefix)-1] != '/' {
prefix = prefix + "/"
}
prefix = prefix + "data/"
if prefix[0] == '/' {
prefix = prefix[1:]
}

storageClient, err := storage.NewClient(ctx)
if err != nil {
return fmt.Errorf("GCS storage client can not be created: %v", err)
}
defer storageClient.Close()

notification := storage.Notification{
TopicID: streamingCfg.PubsubCfg.TopicId,
TopicProjectID: projectID,
PayloadFormat: storage.JSONPayload,
ObjectNamePrefix: prefix,
}

createdNotification, err := storageClient.Bucket(bucketName).AddNotification(ctx, &notification)
if err != nil {
return fmt.Errorf("GCS Notification could not be created: %v", err)
}
fmt.Printf("Successfully created pubsub topic, subscription and notification for bucket %s.\n", bucketName)
streamingCfg.PubsubCfg.NotificationId = createdNotification.ID
streamingCfg.PubsubCfg.BucketName = bucketName
storeGeneratedPubsubResources(conv, streamingCfg.PubsubCfg, projectID, streamingCfg.DataShardId)
return nil
}

// resourceMutex guards ShardToPubsubIdMap writes from parallel shard tasks; it
// must live at package scope, since a mutex declared inside the function would
// be local to each call and would not synchronize anything.
var resourceMutex sync.Mutex

func storeGeneratedPubsubResources(conv *internal.Conv, pubsubCfg internal.PubsubCfg, project string, dataShardId string) {
conv.Audit.StreamingStats.PubsubCfg = pubsubCfg
if dataShardId != "" {
resourceMutex.Lock()
conv.Audit.StreamingStats.ShardToPubsubIdMap[dataShardId] = pubsubCfg
resourceMutex.Unlock()
}
fmt.Println("\n------------------------------------------\n" +
"The Pubsub topic: " + pubsubCfg.TopicId + " and the subscription: " + pubsubCfg.SubscriptionId +
" will have to be manually cleaned up via the UI. Spanner migration tool will not delete them post completion of the migration.")
}

// LaunchStream populates the parameters from the streaming config and triggers a stream on Cloud Datastream.
func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbList []profiles.LogicalShard, projectID string, datastreamCfg DatastreamCfg) error {
fmt.Println("Launching stream ", fmt.Sprintf("projects/%s/locations/%s", projectID, datastreamCfg.StreamLocation))
@@ -283,9 +393,14 @@ func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbL
}
defer dsClient.Close()
fmt.Println("Created client...")
prefix := datastreamCfg.DestinationConnectionConfig.Prefix
if prefix != "" && prefix[len(prefix)-1] != '/' {
prefix = prefix + "/"
}
prefix = prefix + "data/"

gcsDstCfg := &datastreampb.GcsDestinationConfig{
- Path: datastreamCfg.DestinationConnectionConfig.Prefix,
+ Path: prefix,
FileFormat: &datastreampb.GcsDestinationConfig_AvroFileFormat{},
}
srcCfg := &datastreampb.SourceConfig{
@@ -360,6 +475,18 @@ func CleanUpStreamingJobs(ctx context.Context, conv *internal.Conv, projectID, r
return fmt.Errorf("datastream client can not be created: %v", err)
}
defer dsClient.Close()
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub client cannot be created: %v", err)
}
defer pubsubClient.Close()
fmt.Println("Created pubsub client...")

storageClient, err := storage.NewClient(ctx)
if err != nil {
return fmt.Errorf("storage client cannot be created: %v", err)
}
defer storageClient.Close()

//clean up for single instance migrations
if conv.Audit.StreamingStats.DataflowJobId != "" {
@@ -368,6 +495,9 @@
if conv.Audit.StreamingStats.DataStreamName != "" {
CleanupDatastream(ctx, dsClient, conv.Audit.StreamingStats.DataStreamName, projectID, region)
}
if conv.Audit.StreamingStats.PubsubCfg.TopicId != "" && !conv.IsSharded {
CleanupPubsubResources(ctx, pubsubClient, storageClient, conv.Audit.StreamingStats.PubsubCfg, projectID)
}
// clean up jobs for sharded migrations (with error handling)
for _, dfId := range conv.Audit.StreamingStats.ShardToDataflowJobMap {
err := CleanupDataflowJob(ctx, c, dfId, projectID, region)
@@ -381,10 +511,41 @@ func CleanUpStreamingJobs(ctx context.Context, conv *internal.Conv, projectID, r
fmt.Printf("Cleanup of the datastream: %s was unsuccessful, please clean up the stream manually", dsName)
}
}
for _, pubsubCfg := range conv.Audit.StreamingStats.ShardToPubsubIdMap {
CleanupPubsubResources(ctx, pubsubClient, storageClient, pubsubCfg, projectID)
}
fmt.Println("Clean up complete")
return nil
}

func CleanupPubsubResources(ctx context.Context, pubsubClient *pubsub.Client, storageClient *storage.Client, pubsubCfg internal.PubsubCfg, projectID string) {
subscription := pubsubClient.Subscription(pubsubCfg.SubscriptionId)

err := subscription.Delete(ctx)
if err != nil {
fmt.Printf("Cleanup of the pubsub subscription: %s Failed, please clean up the pubsub subscription manually\n error=%v\n", pubsubCfg.SubscriptionId, err)
} else {
fmt.Printf("Successfully deleted subscription: %s\n\n", pubsubCfg.SubscriptionId)
}

topic := pubsubClient.Topic(pubsubCfg.TopicId)

err = topic.Delete(ctx)
if err != nil {
fmt.Printf("Cleanup of the pubsub topic: %s Failed, please clean up the pubsub topic manually\n error=%v\n", pubsubCfg.TopicId, err)
} else {
fmt.Printf("Successfully deleted topic: %s\n\n", pubsubCfg.TopicId)
}

bucket := storageClient.Bucket(pubsubCfg.BucketName)

if err := bucket.DeleteNotification(ctx, pubsubCfg.NotificationId); err != nil {
fmt.Printf("Cleanup of GCS pubsub notification: %s failed.\n error=%v\n", pubsubCfg.NotificationId, err)
} else {
fmt.Printf("Successfully deleted GCS pubsub notification: %s\n\n", pubsubCfg.NotificationId)
}
}

func CleanupDatastream(ctx context.Context, client *datastream.Client, dsName string, projectID, region string) error {
req := &datastreampb.DeleteStreamRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/streams/%s", projectID, region, dsName),
@@ -495,13 +656,14 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile
JobName: dataflowCfg.JobName,
Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: "gs://dataflow-templates-southamerica-west1/2023-09-12-00_RC00/flex/Cloud_Datastream_to_Spanner"},
Parameters: map[string]string{
"inputFilePattern": inputFilePattern,
"inputFilePattern": inputFilePattern + "data/",
"streamName": fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId),
"instanceId": instance,
"databaseId": dbName,
"sessionFilePath": streamingCfg.TmpDir + "session.json",
"deadLetterQueueDirectory": inputFilePattern + "dlq",
"transformationContextFilePath": streamingCfg.TmpDir + "transformationContext.json",
"gcsPubSubSubscription": fmt.Sprintf("projects/%s/subscriptions/%s", project, streamingCfg.PubsubCfg.SubscriptionId),
},
Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
MaxWorkers: maxWorkers,
@@ -586,7 +748,7 @@ func CreateStreamingConfig(pl profiles.DataShard) StreamingCfg {
return streamingCfg
}

- func StartDatastream(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, tableList []string) (StreamingCfg, error) {
+ func StartDatastream(ctx context.Context, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, tableList []string, conv *internal.Conv) (StreamingCfg, error) {
streamingCfg, err := getStreamingConfig(sourceProfile, targetProfile, tableList)
if err != nil {
return streamingCfg, fmt.Errorf("error reading streaming config: %v", err)
@@ -601,7 +763,7 @@ func StartDatastream(ctx context.Context, sourceProfile profiles.SourceProfile,
case constants.POSTGRES:
dbList = append(dbList, profiles.LogicalShard{DbName: streamingCfg.DatastreamCfg.Properties})
}
- err = LaunchStream(ctx, sourceProfile, dbList, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg)
+ err = CreatePubsubNotificationAndLaunchStream(ctx, sourceProfile, dbList, targetProfile.Conn.Sp.Project, streamingCfg, conv)
if err != nil {
return streamingCfg, fmt.Errorf("error launching stream: %v", err)
}
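One subtlety in CreatePubsubNotification above: the notification's ObjectNamePrefix is built from the connection profile's root path plus the configured prefix, normalized so it ends in data/ (the subdirectory Datastream writes to) and carries no leading slash, since GCS object names never start with /. A standalone sketch of that normalization with worked inputs; the helper name is mine, the PR inlines this logic:

```go
package main

import (
	"fmt"
	"strings"
)

// normalizePrefix sketches the inline prefix handling in CreatePubsubNotification:
// join the root path and prefix, ensure a trailing slash, append "data/", and
// drop a leading slash so the result matches GCS object names.
func normalizePrefix(rootPath, prefix string) string {
	p := rootPath + prefix
	if p != "" && !strings.HasSuffix(p, "/") {
		p += "/"
	}
	p += "data/"
	return strings.TrimPrefix(p, "/")
}

func main() {
	fmt.Println(normalizePrefix("/", "migration")) // migration/data/
	fmt.Println(normalizePrefix("/root/", ""))     // root/data/
	fmt.Println(normalizePrefix("", "shard1"))     // shard1/data/
}
```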
2 changes: 1 addition & 1 deletion ui/dist/ui/index.html
@@ -10,6 +10,6 @@
<style>.mat-typography{font:400 14px/20px Roboto,Helvetica Neue,sans-serif;letter-spacing:normal}html,body{height:100%;box-sizing:border-box}body{margin:0;font-family:Roboto,Helvetica Neue,sans-serif}</style><link rel="stylesheet" href="styles.77cd407a0323ef69.css" media="print" onload="this.media='all'"><noscript><link rel="stylesheet" href="styles.77cd407a0323ef69.css"></noscript></head>
<body class="mat-typography">
<app-root></app-root>
<script src="runtime.7cb62255c16cf7ce.js" type="module"></script><script src="polyfills.b6adefa6020709e6.js" type="module"></script><script src="main.011490a25382400a.js" type="module"></script>
<script src="runtime.7cb62255c16cf7ce.js" type="module"></script><script src="polyfills.b6adefa6020709e6.js" type="module"></script><script src="main.e88ee20f7da6ab64.js" type="module"></script>

</body></html>

Large diffs are not rendered by default.

@@ -326,6 +326,15 @@ <h3> Generated Resources:</h3>
target="_blank">{{resourcesGenerated.DataflowJobName}}</a>
</li>
</span>
<span *ngIf="resourcesGenerated.PubsubTopicName!=='' && (selectedMigrationType ==='lowdt') && !isSharded">
<li> Pubsub topic: <a [href]="resourcesGenerated.PubsubTopicUrl"
target="_blank">{{resourcesGenerated.PubsubTopicName}}</a></li>
</span>
<span *ngIf="resourcesGenerated.PubsubSubscriptionName!=='' && (selectedMigrationType ==='lowdt') && !isSharded">
<li> Pubsub subscription: <a [href]="resourcesGenerated.PubsubSubscriptionUrl"
target="_blank">{{resourcesGenerated.PubsubSubscriptionName}}</a>
</li>
</span>
<span *ngIf="resourcesGenerated.DataStreamJobName!=='' && (selectedMigrationType ==='lowdt') && isSharded">
<li *ngFor="let stream of resourcesGenerated.ShardToDatastreamMap | keyvalue">
Datastream job for shardId: {{ stream.key }} - <a [href]="stream.value.JobUrl"
@@ -337,6 +346,17 @@ <h3> Generated Resources:</h3>
target="_blank">{{dfJob.value.JobName}}</a>
</li>
</span>
<span *ngIf="resourcesGenerated.PubsubTopicName!=='' && (selectedMigrationType ==='lowdt') && isSharded">
<li *ngFor="let topic of resourcesGenerated.ShardToPubsubTopicMap | keyvalue">
Pubsub topic for shardId: {{ topic.key }} - <a [href]="topic.value.JobUrl"
target="_blank">{{topic.value.JobName}}</a></li>
</span>
<span *ngIf="resourcesGenerated.PubsubSubscriptionName!=='' && (selectedMigrationType ==='lowdt') && isSharded">
<li *ngFor="let subscription of resourcesGenerated.ShardToPubsubSubscriptionMap | keyvalue">
Pubsub subscription for shardId: {{ subscription.key }} - <a [href]="subscription.value.JobUrl"
target="_blank">{{subscription.value.JobName}}</a>
</li>
</span>
<br><br>
<b>Note: </b>Spanner migration tool has orchestrated the migration successfully. For minimal downtime
migrations, it is safe to close Spanner migration tool now without affecting the progress of the migration.