Pub/Sub notification setup for low downtime migration #656

Merged · 19 commits · Oct 18, 2023
5 changes: 5 additions & 0 deletions README.md
@@ -2,6 +2,11 @@

[![integration-tests-against-emulator](https://github.com/GoogleCloudPlatform/spanner-migration-tool/actions/workflows/integration-tests-against-emulator.yaml/badge.svg)](https://github.com/GoogleCloudPlatform/spanner-migration-tool/actions/workflows/integration-tests-against-emulator.yaml) [![code-coverage-check](https://github.com/GoogleCloudPlatform/spanner-migration-tool/actions/workflows/test-coverage.yaml/badge.svg)](https://github.com/GoogleCloudPlatform/spanner-migration-tool/actions/workflows/test-coverage.yaml) [![codecov](https://codecov.io/gh/GoogleCloudPlatform/spanner-migration-tool/graph/badge.svg?token=HY9RCUlxzm)](https://codecov.io/gh/GoogleCloudPlatform/spanner-migration-tool)


> [!IMPORTANT]
> We have changed the architecture of the minimal downtime migration and added a Pub/Sub notifications component. The new component changes the permissions required to run a migration. Please go through the [Permissions page](https://googlecloudplatform.github.io/spanner-migration-tool/permissions.html) and the [design page](https://googlecloudplatform.github.io/spanner-migration-tool/minimal) of the documentation.


## Overview

Spanner migration tool is a stand-alone open source tool for Cloud Spanner evaluation and
19 changes: 16 additions & 3 deletions conversion/conversion.go
@@ -298,10 +298,14 @@ func dataFromDatabase(ctx context.Context, sourceProfile profiles.SourceProfile,
if err != nil {
return nil, err
}
err = infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo)
dfOutput, err := infoSchema.StartStreamingMigration(ctx, client, conv, streamInfo)
if err != nil {
return nil, err
}
dfJobId := dfOutput.JobID
gcloudCmd := dfOutput.GCloudCmd
streamingCfg, _ := streamInfo["streamingCfg"].(streaming.StreamingCfg)
streaming.StoreGeneratedResources(conv, streamingCfg, dfJobId, gcloudCmd, targetProfile.Conn.Sp.Project, "")
return bw, nil
}
return performSnapshotMigration(config, conv, client, infoSchema, internal.AdditionalDataAttributes{ShardId: ""}), nil
@@ -321,6 +325,7 @@ func dataFromDatabaseForDMSMigration() (*writer.BatchWriter, error) {
func dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv) (*writer.BatchWriter, error) {
updateShardsWithDataflowConfig(sourceProfile.Config.ShardConfigurationDataflow)
conv.Audit.StreamingStats.ShardToDataStreamNameMap = make(map[string]string)
conv.Audit.StreamingStats.ShardToPubsubIdMap = make(map[string]internal.PubsubCfg)
conv.Audit.StreamingStats.ShardToDataflowInfoMap = make(map[string]internal.ShardedDataflowJobResources)
tableList, err := common.GetIncludedSrcTablesFromConv(conv)
if err != nil {
@@ -348,13 +353,21 @@ func dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile,
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId)

pubsubCfg, err := streaming.CreatePubsubResources(ctx, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.PubsubCfg = *pubsubCfg
err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, targetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streamingCfg.DataflowCfg.DbNameToShardIdMap = dbNameToShardIdMap
err = streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, targetProfile, streamingCfg, conv)
if err != nil {
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
streaming.StoreGeneratedResources(conv, streamingCfg, dfOutput.JobID, dfOutput.GCloudCmd, targetProfile.Conn.Sp.Project, p.DataShardId)
return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
}
_, err = common.RunParallelTasks(sourceProfile.Config.ShardConfigurationDataflow.DataShards, 20, asyncProcessShards, true)
3 changes: 3 additions & 0 deletions docs/index.md
@@ -17,6 +17,9 @@ Spanner migration tool (SMT) is a stand-alone open source tool for Cloud Spanner

---

{: .highlight }
We have changed the architecture of the minimal downtime migration and added a Pub/Sub notifications component. The new component changes the permissions required to run migrations. Please go through the [Permissions page](./permissions.md) and the [design page](./minimal/minimal.md) of the documentation.

Spanner migration tool is a stand-alone open source tool for Cloud Spanner evaluation and
migration, using data from an existing PostgreSQL, MySQL, SQL Server, Oracle or DynamoDB database.
The tool ingests schema and data from either a pg_dump/mysqldump file or directly
4 changes: 2 additions & 2 deletions docs/minimal/minimal.md
@@ -12,7 +12,7 @@ permalink: /minimal
{: .note }
Minimal downtime migrations are only supported for MySQL, Postgres and Oracle source databases.

A minimal downtime migration consists of two components, migration of existing data from the database and the stream of changes (writes and updates) that are made to the source database during migration, referred to as change database capture (CDC). Using Spanner migration tool, the entire process where Datastream reads data from the source database and writes to a GCS bucket and data flow reads data from GCS bucket and writes to spanner database can be orchestrated using a unified interface. Performing schema changes on the source database during the migration is not supported. This is the suggested mode of migration for most databases.
A minimal downtime migration consists of two components: migration of the existing data in the database, and the stream of changes (writes and updates) made to the source database during the migration, referred to as change data capture (CDC). The migration proceeds as follows: Datastream reads data from the source database and writes it to a GCS bucket; GCS publishes a notification to a Pub/Sub topic for each new file; and a Dataflow job, notified of each new file via the Pub/Sub subscription, reads the data from the GCS bucket and writes it to the Spanner database. With Spanner migration tool, this entire process can be orchestrated from a unified interface. Performing schema changes on the source database during the migration is not supported. This is the suggested mode of migration for most databases.
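To make the hand-off concrete, here is a minimal, illustrative Go sketch (not part of SMT; the project and subscription names are hypothetical) of what a consumer of the notification stream sees: each new file in the GCS bucket arrives as a Pub/Sub message whose attributes identify the object for the Dataflow job to read.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // hypothetical project ID
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Hypothetical subscription; SMT creates its own topic/subscription pair.
	sub := client.Subscription("my-smt-subscription")
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// GCS notifications carry the bucket and object name as message attributes.
		fmt.Printf("new file: gs://%s/%s (event: %s)\n",
			m.Attributes["bucketId"], m.Attributes["objectId"], m.Attributes["eventType"])
		m.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
}
```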

![](https://services.google.com/fh/files/helpcenter/asset-ripjb7eowf.png)

@@ -23,7 +23,7 @@ Sharded migrations are currently only supported for MySQL.

Spanner migration tool supports sharded migrations for MySQL. It does this by multiplexing a minimal downtime migration across multiple shards. It automatically applies the user-configured schema while transforming the data read from each shard at the time of writing to Spanner. This provides an integrated experience for performing an end-to-end sharded migration. Below is the architecture of how sharded migrations work:

![](https://services.google.com/fh/files/misc/smt_shard_arch.png)
![](https://services.google.com/fh/files/misc/hb_sharded_migrations_with_pubsub_1.png)

### Terminology

22 changes: 21 additions & 1 deletion docs/permissions.md
@@ -39,6 +39,11 @@ Ensure that Datastream and Dataflow apis are enabled on your project.
```sh
gcloud services enable storage.googleapis.com
```
5. Enable the Pub/Sub API by using:

```sh
gcloud services enable pubsub.googleapis.com
```

### Configuring connectivity for `spanner-migration-tool`

@@ -127,6 +132,20 @@ Grant the user **Editor role** to create buckets in the project.

Enable access to Datastream, Dataflow and Spanner using [service accounts](https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances).

### Pub/Sub

Grant the user the [**Pub/Sub Editor**](https://cloud.google.com/pubsub/docs/access-control#pubsub.editor) role to create the Pub/Sub topic and subscription for low downtime migrations.

Additionally, the Pub/Sub Publisher role must be granted to the GCS service agent. This enables GCS to push a notification to a Pub/Sub topic whenever a new file is created. Refer to [this](https://cloud.google.com/storage/docs/reporting-changes#before-you-begin) page for more details; a programmatic sketch of the same grant follows the steps below.
1. Get the GCS service agent ID using the following command:
```sh
gcloud storage service-agent --project=<PROJECT_ID>
```
2. Grant the Pub/Sub Publisher role to the service agent using the following command:
```sh
gcloud projects add-iam-policy-binding <PROJECT_ID> --member=serviceAccount:<GCS_SERVICE_ACCOUNT_ID> --role=roles/pubsub.publisher
```
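For reference, the same grant can be made programmatically. Below is a minimal Go sketch (project, topic, and service-agent values are placeholders) that attaches the Pub/Sub Publisher role directly to the notification topic, which also suffices for GCS to publish to it:

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // hypothetical project ID
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topic := client.Topic("my-smt-topic") // hypothetical topic name
	policy, err := topic.IAM().Policy(ctx)
	if err != nil {
		log.Fatal(err)
	}
	// <GCS_SERVICE_ACCOUNT_ID> is the agent returned by `gcloud storage service-agent`.
	policy.Add("serviceAccount:<GCS_SERVICE_ACCOUNT_ID>", "roles/pubsub.publisher")
	if err := topic.IAM().SetPolicy(ctx, policy); err != nil {
		log.Fatal(err)
	}
}
```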

### Other Permissions

In addition to these, the `DatastreamToSpanner` pipeline created by SMT requires
@@ -142,4 +161,5 @@ the following roles as well:
- Cloud Spanner Database user
- Cloud Spanner Restore Admin
- Cloud Spanner Viewer
- Dataflow Worker
- Dataflow Worker
- Pub/Sub Subscriber
14 changes: 14 additions & 0 deletions internal/convert.go
@@ -205,6 +205,20 @@ type streamingStats struct {
DataflowGcloudCmd string
ShardToDataStreamNameMap map[string]string
ShardToDataflowInfoMap map[string]ShardedDataflowJobResources
PubsubCfg PubsubCfg
ShardToPubsubIdMap map[string]PubsubCfg
}

type PubsubCfg struct {
TopicId string
SubscriptionId string
NotificationId string
BucketName string
}

type DataflowOutput struct {
JobID string
GCloudCmd string
}
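For orientation, here is a rough sketch (not the actual `streaming.CreatePubsubResources` implementation; naming and placement are illustrative) of how the `PubsubCfg` fields map onto resource creation: a topic, a subscription on that topic, and a GCS bucket notification that publishes `OBJECT_FINALIZE` events to the topic, whose ID is kept for later cleanup.

```go
// Sketch only: assumes cloud.google.com/go/pubsub and cloud.google.com/go/storage.
func createPubsubResourcesSketch(ctx context.Context, project, bucket, dbName string) (*PubsubCfg, error) {
	psClient, err := pubsub.NewClient(ctx, project)
	if err != nil {
		return nil, err
	}
	defer psClient.Close()

	cfg := PubsubCfg{
		TopicId:        dbName + "-topic", // naming scheme is hypothetical
		SubscriptionId: dbName + "-sub",
		BucketName:     bucket,
	}
	topic, err := psClient.CreateTopic(ctx, cfg.TopicId)
	if err != nil {
		return nil, err
	}
	if _, err := psClient.CreateSubscription(ctx, cfg.SubscriptionId,
		pubsub.SubscriptionConfig{Topic: topic}); err != nil {
		return nil, err
	}

	gcsClient, err := storage.NewClient(ctx)
	if err != nil {
		return nil, err
	}
	defer gcsClient.Close()
	// Ask GCS to publish a message to the topic for every new object in the bucket.
	n, err := gcsClient.Bucket(bucket).AddNotification(ctx, &storage.Notification{
		TopicID:        cfg.TopicId,
		TopicProjectID: project,
		PayloadFormat:  storage.JSONPayload,
		EventTypes:     []string{storage.ObjectFinalizeEvent},
	})
	if err != nil {
		return nil, err
	}
	cfg.NotificationId = n.ID
	return &cfg, nil
}
```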

// Stores information related to rules during schema conversion
2 changes: 1 addition & 1 deletion sources/common/infoschema.go
@@ -41,7 +41,7 @@ type InfoSchema interface {
GetIndexes(conv *internal.Conv, table SchemaAndName, colNameIdMp map[string]string) ([]schema.Index, error)
ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, spCols []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error)
StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) error
StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamInfo map[string]interface{}) (internal.DataflowOutput, error)
}

// SchemaAndName contains the schema and name for a table
4 changes: 2 additions & 2 deletions sources/dynamodb/schema.go
@@ -219,7 +219,7 @@
// StartStreamingMigration starts the streaming migration process by creating a separate
// worker thread/goroutine for each table's DynamoDB Stream. It catches Ctrl+C signal if
// customer wants to stop the process.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (internal.DataflowOutput, error) {

fmt.Println("Processing of DynamoDB Streams started...")
fmt.Println("Use Ctrl+C to stop the process.")

@@ -243,7 +243,7 @@
fillConvWithStreamingStats(streamInfo, conv)

fmt.Println("DynamoDB Streams processed successfully.")
return nil
return internal.DataflowOutput{}, nil

}

func getSchemaIndexStruct(indexName string, keySchema []*dynamodb.KeySchemaElement, colNameIdMap map[string]string) schema.Index {
21 changes: 15 additions & 6 deletions sources/mysql/infoschema.go
@@ -364,10 +364,19 @@
mp := make(map[string]interface{})
var (
tableList []string
err error
err error

)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList)
streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Mysql.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
if err != nil {
return nil, fmt.Errorf("error reading streaming config: %v", err)
}
pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Mysql.Db)
if err != nil {
return nil, fmt.Errorf("error creating pubsub resources: %v", err)
}
streamingCfg.PubsubCfg = *pubsubCfg
streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)

if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
@@ -378,15 +387,15 @@

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {

streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)

err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)

Check warning on line 393 in sources/mysql/infoschema.go

View check run for this annotation

Codecov / codecov/patch

sources/mysql/infoschema.go#L393

Added line #L393 was not covered by tests
if err != nil {
err = fmt.Errorf("error starting dataflow: %v", err)
return err
return internal.DataflowOutput{}, err

}
return nil
return dfOutput, nil

}

func toType(dataType string, columnType string, charLen sql.NullInt64, numericPrecision, numericScale sql.NullInt64) schema.Type {
21 changes: 15 additions & 6 deletions sources/oracle/infoschema.go
@@ -428,10 +428,19 @@
mp := make(map[string]interface{})
var (
tableList []string
err error
err error

)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList)
streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Oracle.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
if err != nil {
return nil, fmt.Errorf("error reading streaming config: %v", err)
}
pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User)
if err != nil {
return nil, fmt.Errorf("error creating pubsub resources: %v", err)
}
streamingCfg.PubsubCfg = *pubsubCfg
streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)

if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
@@ -442,13 +451,13 @@

// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {

streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)
err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)

if err != nil {
return err
return internal.DataflowOutput{}, err

}
return nil
return dfOutput, nil

}

func toType(dataType string, typecode, elementDataType sql.NullString, charLen sql.NullInt64, numericPrecision, numericScale, elementCharMaxLen, elementNumericPrecision, elementNumericScale sql.NullInt64) schema.Type {
23 changes: 15 additions & 8 deletions sources/postgres/infoschema.go
@@ -63,13 +63,22 @@
mp := make(map[string]interface{})
var (
tableList []string
err error
err error

)
tableList, err = common.GetIncludedSrcTablesFromConv(conv)
if err != nil {
err = fmt.Errorf("error fetching the tableList to setup datastream migration, defaulting to all tables: %v", err)
}
streamingCfg, err := streaming.StartDatastream(ctx, isi.SourceProfile, isi.TargetProfile, tableList)
streamingCfg, err := streaming.ReadStreamingConfig(isi.SourceProfile.Conn.Pg.StreamingConfig, isi.TargetProfile.Conn.Sp.Dbname, tableList)
if err != nil {
return nil, fmt.Errorf("error reading streaming config: %v", err)
}
pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.TargetProfile.Conn.Sp.Project, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname)
if err != nil {
return nil, fmt.Errorf("error creating pubsub resources: %v", err)
}
streamingCfg.PubsubCfg = *pubsubCfg
streamingCfg, err = streaming.StartDatastream(ctx, streamingCfg, isi.SourceProfile, isi.TargetProfile, tableList)

if err != nil {
err = fmt.Errorf("error starting datastream: %v", err)
return nil, err
@@ -78,19 +87,17 @@
return mp, err
}



// StartStreamingMigration is used for automatic triggering of Dataflow job when
// performing a streaming migration.
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {

streamingCfg, _ := streamingInfo["streamingCfg"].(streaming.StreamingCfg)

err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)
dfOutput, err := streaming.StartDataflow(ctx, isi.TargetProfile, streamingCfg, conv)

if err != nil {
err = fmt.Errorf("error starting dataflow: %v", err)
return err
return internal.DataflowOutput{}, err

}
return nil
return dfOutput, nil

}

// GetToDdl function below implement the common.InfoSchema interface.
4 changes: 2 additions & 2 deletions sources/spanner/infoschema.go
@@ -79,8 +79,8 @@
return nil, nil
}

func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
return nil
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *spanner.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
return internal.DataflowOutput{}, nil

}

// GetTableName returns table name.
4 changes: 2 additions & 2 deletions sources/sqlserver/infoschema.go
@@ -58,8 +58,8 @@
return nil, nil
}

func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) error {
return nil
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, streamingInfo map[string]interface{}) (internal.DataflowOutput, error) {
return internal.DataflowOutput{}, nil

}

// GetTableName returns table name.