diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 885d3c2..eff8b5b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,5 +16,5 @@ jobs: with: go-version-file: 'go.mod' - - name: Test - run: make test GOTEST_FLAGS="-v -count=1" + - name: Tests + run: make test-integration GOTEST_FLAGS="-v -count=1" diff --git a/Makefile b/Makefile index 8249449..41b3266 100644 --- a/Makefile +++ b/Makefile @@ -9,13 +9,19 @@ test: go test $(GOTEST_FLAGS) -race ./... .PHONY: test-integration -test-integration: - # run required docker containers, execute integration tests, stop containers after tests - docker compose -f test/docker-compose.yml up -d +test-integration: up go test $(GOTEST_FLAGS) -v -race ./...; ret=$$?; \ - docker compose -f test/docker-compose.yml down; \ + docker compose -f test/docker-compose.yml down -v; \ exit $$ret +.PHONY: up +up: + docker compose -f test/docker-compose.yml up --quiet-pull -d --wait + +.PHONY: down +down: + docker compose -f test/docker-compose.yml down -v --remove-orphans + .PHONY: generate generate: go generate ./... diff --git a/README.md b/README.md index f330e72..1f6d8f9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Conduit Connector for DynamoDB - -[Conduit](https://conduit.io) connector for [DynamoDB](https://aws.amazon.com/dynamodb/). +The DynamoDB connector is one of [Conduit](https://github.com/ConduitIO/conduit) standalone plugins. It provides a source +connector for [DynamoDB](https://aws.amazon.com/dynamodb/). ## How to build? @@ -8,20 +8,33 @@ Run `make build` to build the connector. ## Testing -Run `make test` to run all the unit tests. +Run `make test` to run all the unit tests. + +Run `make test-integration` to run all the integration tests. Tests require Docker to be installed and running. +The command will handle starting and stopping docker containers for you. ## Source +A source connector that pulls data from a DynamoDB table to downstream resources via Conduit. 
-A source connector pulls data from an external resource and pushes it to downstream resources via Conduit. +The connector starts with a snapshot of the data currently in the table and sends these records to the +destination. It then switches to CDC (Change Data Capture) mode, which listens to events happening on the table +in real time and sends these event records to the destination (these events include `updates`, `deletes`, and `inserts`). + +The source connector uses [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) to get CDC events, +so you need to enable the stream before running the connector. Check out the documentation for [how to enable a DynamoDB Stream](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling). ### Configuration -| name | description | required | default | example | -|-----------------------|-----------------------------------------------------------------------|----------|---------|----------------------| -| `table` | Table is the DynamoDB table name to pull data from. | true | | Employees | -| `aws.region` | AWS region. | true | | us-east-1 | -| `aws.accessKeyId` | AWS access key id. | true | | MY_ACCESS_KEY_ID | -| `aws.secretAccessKey` | AWS secret access key. | true | | MY_SECRET_ACCESS_KEY | -| `pollingPeriod` | Polling period for the CDC mode, formatted as a time.Duration string. | false | 1s | 100ms, 1m, 10m, 1h | -| `skipSnapshot` | Determines weather to skip the snapshot or not. | false | false | true | +| name | description | required | default | example | +|-----------------------|-----------------------------------------------------------------------|----------|---------|-----------------------| +| `table` | Table is the DynamoDB table name to pull data from. | true | | Employees | +| `aws.region` | AWS region. | true | | us-east-1 | +| `aws.accessKeyId` | AWS access key id. 
| true | | MY_ACCESS_KEY_ID | +| `aws.secretAccessKey` | AWS secret access key. | true | | MY_SECRET_ACCESS_KEY | +| `aws.url` | The URL for AWS (useful when testing the connector with localstack). | false | | http://localhost:4566 | +| `pollingPeriod` | Polling period for the CDC mode, formatted as a time.Duration string. | false | 1s | 100ms, 1m, 10m, 1h | +| `skipSnapshot` | Determines whether to skip the snapshot or not. | false | false | true | + + +![scarf pixel connector-dynamodb-readme](https://static.scarf.sh/a.png?x-pxid=cbb3901b-e502-4106-aa10-0b0726532dd6) \ No newline at end of file diff --git a/go.mod b/go.mod index a2e785c..c9ab1e6 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,11 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.17.41 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.2 github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.2 + github.com/aws/smithy-go v1.22.0 github.com/conduitio/conduit-commons v0.4.0 github.com/conduitio/conduit-connector-sdk v0.11.0 github.com/golangci/golangci-lint v1.61.0 + github.com/google/uuid v1.6.0 github.com/matryer/is v1.4.1 gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 mvdan.cc/gofumpt v0.7.0 @@ -49,7 +51,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect - github.com/aws/smithy-go v1.22.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bkielbasa/cyclop v1.2.1 // indirect github.com/blizzy78/varnamelen v0.8.0 // indirect @@ -98,7 +99,6 @@ require ( github.com/golangci/revgrep v0.5.3 // indirect github.com/golangci/unconvert v0.0.0-20240309020433-c5143eacb3ed // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gordonklaus/ineffassign v0.1.0 // indirect github.com/gostaticanalysis/analysisutil v0.7.1 // indirect github.com/gostaticanalysis/comment v1.4.2 // indirect 
diff --git a/paramgen_src.go b/paramgen_src.go index b5eb723..4080f01 100644 --- a/paramgen_src.go +++ b/paramgen_src.go @@ -11,6 +11,7 @@ const ( SourceConfigAwsAccessKeyId = "aws.accessKeyId" SourceConfigAwsRegion = "aws.region" SourceConfigAwsSecretAccessKey = "aws.secretAccessKey" + SourceConfigAwsUrl = "aws.url" SourceConfigPollingPeriod = "pollingPeriod" SourceConfigSkipSnapshot = "skipSnapshot" SourceConfigTable = "table" @@ -42,6 +43,12 @@ func (SourceConfig) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, + SourceConfigAwsUrl: { + Default: "", + Description: "AWSURL The URL for AWS (useful when testing the connector with localstack).", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, SourceConfigPollingPeriod: { Default: "1s", Description: "polling period for the CDC mode, formatted as a time.Duration string.", diff --git a/source.go b/source.go index 6216130..393a4b6 100644 --- a/source.go +++ b/source.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "net/url" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -28,6 +29,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams" + smithyendpoints "github.com/aws/smithy-go/endpoints" "github.com/conduitio-labs/conduit-connector-dynamodb/iterator" "github.com/conduitio-labs/conduit-connector-dynamodb/position" cconfig "github.com/conduitio/conduit-commons/config" @@ -54,6 +56,8 @@ type SourceConfig struct { AWSAccessKeyID string `json:"aws.accessKeyId" validate:"required"` // AWS secret access key. AWSSecretAccessKey string `json:"aws.secretAccessKey" validate:"required"` + // AWSURL The URL for AWS (useful when testing the connector with localstack). + AWSURL string `json:"aws.url"` // polling period for the CDC mode, formatted as a time.Duration string. 
PollingPeriod time.Duration `json:"pollingPeriod" default:"1s"` // skipSnapshot determines weather to skip the snapshot or not. @@ -91,11 +95,25 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error { config.WithCredentialsProvider(aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(s.config.AWSAccessKeyID, s.config.AWSSecretAccessKey, ""))), ) if err != nil { - return fmt.Errorf("error creating AWS session: %w", err) + return fmt.Errorf("could not load AWS config: %w", err) } - s.dynamoDBClient = dynamodb.NewFromConfig(cfg) - s.streamsClient = dynamodbstreams.NewFromConfig(cfg) + // Set the endpoint if provided for testing + if s.config.AWSURL != "" { + s.dynamoDBClient = dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.EndpointResolverV2 = staticResolver{ + BaseURL: s.config.AWSURL, + } + }) + s.streamsClient = dynamodbstreams.NewFromConfig(cfg, func(o *dynamodbstreams.Options) { + o.EndpointResolverV2 = staticStreamResolver{ + BaseURL: s.config.AWSURL, + } + }) + } else { + s.dynamoDBClient = dynamodb.NewFromConfig(cfg) + s.streamsClient = dynamodbstreams.NewFromConfig(cfg) + } partitionKey, sortKey, err := s.getKeyNamesFromTable(ctx) if err != nil { @@ -108,10 +126,10 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error { p, err := position.ParseRecordPosition(pos) if err != nil { - return fmt.Errorf("error parssing position: %w", err) + return fmt.Errorf("error parsing position: %w", err) } - // create the needed iterator + // Create the needed iterator var itr Iterator if s.config.SkipSnapshot { itr, err = iterator.NewCDCIterator(ctx, s.config.Table, partitionKey, sortKey, s.config.PollingPeriod, s.streamsClient, s.streamArn, p) @@ -171,7 +189,10 @@ func enableStream(ctx context.Context, client *dynamodb.Client, tableName string }, } _, err := client.UpdateTable(ctx, updateTableInput) - return fmt.Errorf("failed to enable stream on DynamoDB table: %w", err) + if err != nil { + return 
fmt.Errorf("failed to enable stream on DynamoDB table: %w", err) + } + return nil } func (s *Source) prepareStream(ctx context.Context) error { @@ -250,3 +271,31 @@ func (s *Source) getKeyNamesFromTable(ctx context.Context) (partitionKey string, return partitionKey, sortKey, nil } + +// staticResolver used to connect to a DynamoDB URL, for tests. +type staticResolver struct { + BaseURL string +} + +func (s staticResolver) ResolveEndpoint(_ context.Context, _ dynamodb.EndpointParameters) (smithyendpoints.Endpoint, error) { + u, err := url.Parse(s.BaseURL) + if err != nil { + return smithyendpoints.Endpoint{}, fmt.Errorf("invalid URL: %w", err) + } + + return smithyendpoints.Endpoint{URI: *u}, nil +} + +// staticStreamResolver used to connect to a DynamoDB URL, for tests. +type staticStreamResolver struct { + BaseURL string +} + +func (s staticStreamResolver) ResolveEndpoint(_ context.Context, _ dynamodbstreams.EndpointParameters) (smithyendpoints.Endpoint, error) { + u, err := url.Parse(s.BaseURL) + if err != nil { + return smithyendpoints.Endpoint{}, fmt.Errorf("invalid URL: %w", err) + } + + return smithyendpoints.Endpoint{URI: *u}, nil +} diff --git a/source_integration_test.go b/source_integration_test.go new file mode 100644 index 0000000..a3b49f3 --- /dev/null +++ b/source_integration_test.go @@ -0,0 +1,376 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package dynamodb
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sort"
+	"strings"
+	"testing"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
+	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
+	"github.com/conduitio-labs/conduit-connector-dynamodb/position"
+	"github.com/conduitio/conduit-commons/opencdc"
+	sdk "github.com/conduitio/conduit-connector-sdk"
+	"github.com/google/uuid"
+	"github.com/matryer/is"
+)
+
+var (
+	PartitionKey = "pkey"
+	SortKey      = "skey"
+)
+
+// Records is a slice of opencdc.Record, that can be sorted by the sort key under record.Key["skey"].
+type Records []opencdc.Record
+
+// Len returns the number of records (sort.Interface).
+func (r Records) Len() int {
+	return len(r)
+}
+
+// Less reports whether the sort key of record i orders before the sort key of record j (sort.Interface).
+// The sort keys are compared as strings. It returns false when either record's key is not
+// opencdc.StructuredData or does not hold a string under SortKey.
+func (r Records) Less(i, j int) bool {
+	// Comma-ok assertions: a key of another type yields a nil map instead of a panic,
+	// and indexing a nil map is safe.
+	keyI, _ := r[i].Key.(opencdc.StructuredData)
+	keyJ, _ := r[j].Key.(opencdc.StructuredData)
+	sortKeyI, okI := keyI[SortKey].(string)
+	sortKeyJ, okJ := keyJ[SortKey].(string)
+
+	// If either key is not present or not a string
+	if !okI || !okJ {
+		return false
+	}
+
+	// Compare the sort key values
+	return sortKeyI < sortKeyJ
+}
+
+// Swap exchanges records i and j (sort.Interface).
+func (r Records) Swap(i, j int) {
+	r[i], r[j] = r[j], r[i]
+}
+
+// TestSource_SuccessfulSnapshot inserts five rows, reads until the source reports
+// sdk.ErrBackoffRetry, and checks that exactly those five rows were returned.
+func TestSource_SuccessfulSnapshot(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	client, cfg := prepareIntegrationTest(ctx, t)
+
+	testTable := cfg[SourceConfigTable]
+	source := &Source{}
+	err := source.Configure(ctx, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// insert 5 rows
+	err = insertRecord(ctx, client, testTable, 0, 5)
+	is.NoErr(err)
+
+	err = source.Open(ctx, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var got Records
+	for {
+		rec, err := source.Read(ctx)
+		if errors.Is(err, sdk.ErrBackoffRetry) {
+			// no record is available right now, stop reading
+			break
+		}
+		// fail on any other error instead of appending an empty record and looping forever
+		is.NoErr(err)
+		got = append(got, rec)
+	}
+	is.True(got != nil)
+	is.Equal(5, got.Len())
+	// sort the records then assert the values.
+	sort.Sort(got)
+	for i, rec := range got {
+		is.Equal(rec.Payload.After, opencdc.StructuredData{PartitionKey: fmt.Sprintf("pkey%d", i), SortKey: fmt.Sprintf("%d", i)})
+	}
+	_ = source.Teardown(ctx)
+}
+
+// TestSource_SnapshotRestart opens the source with a non-nil snapshot position and
+// checks that all six inserted rows are still read.
+func TestSource_SnapshotRestart(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	client, cfg := prepareIntegrationTest(ctx, t)
+	testTable := cfg[SourceConfigTable]
+	source := &Source{}
+	err := source.Configure(ctx, cfg)
+	is.NoErr(err)
+
+	// add rows
+	err = insertRecord(ctx, client, testTable, 0, 6)
+	is.NoErr(err)
+
+	// set a non nil position
+	pos := position.Position{
+		IteratorType: position.TypeSnapshot,
+		PartitionKey: "pkey2",
+		SortKey:      "2",
+	}
+	recPos, err := pos.ToRecordPosition()
+	is.NoErr(err)
+	err = source.Open(ctx, recPos)
+	is.NoErr(err)
+
+	var got Records
+	for i := 0; i < 6; i++ {
+		rec, err := source.Read(ctx)
+		is.NoErr(err)
+		got = append(got, rec)
+	}
+	// if six records were read, then the snapshot started again successfully, from nil position
+	is.True(got != nil)
+	is.Equal(6, got.Len())
+
+	// tear the source down like the other tests do
+	_ = source.Teardown(ctx)
+}
+
+// TestSource_EmptyTable checks that reading from an empty table returns sdk.ErrBackoffRetry.
+func TestSource_EmptyTable(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	_, cfg := prepareIntegrationTest(ctx, t)
+
+	source := &Source{}
+	err := source.Configure(ctx, cfg)
+	is.NoErr(err)
+	err = source.Open(ctx, nil)
+	is.NoErr(err)
+
+	_, err = source.Read(ctx)
+	is.True(errors.Is(err, sdk.ErrBackoffRetry))
+
+	_ = source.Teardown(ctx)
+}
+
+func TestSource_NonExistentTable(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	_, cfg := prepareIntegrationTest(ctx, t)
+
+	source := &Source{}
+
+	// set the table name to a unique uuid, so it doesn't exist.
+	cfg[SourceConfigTable] = uuid.NewString()
+
+	err := source.Configure(ctx, cfg)
+	is.NoErr(err)
+
+	// table existence check at "Open"
+	err = source.Open(ctx, nil)
+	is.True(err != nil)
+	is.True(strings.Contains(err.Error(), "Cannot do operations on a non-existent table"))
+}
+
+// TestSource_CDC reads one snapshot record, then inserts, updates and deletes rows and
+// checks that the three changes are read back, in that order, as create, update and delete records.
+func TestSource_CDC(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	client, cfg := prepareIntegrationTest(ctx, t)
+
+	testTable := cfg[SourceConfigTable]
+	source := &Source{}
+	err := source.Configure(ctx, cfg)
+	is.NoErr(err)
+
+	// add rows
+	err = insertRecord(ctx, client, testTable, 1, 2)
+	is.NoErr(err)
+
+	err = source.Open(ctx, nil)
+	is.NoErr(err)
+
+	// snapshot, one record
+	rec, err := source.Read(ctx)
+	is.NoErr(err)
+	is.Equal(rec.Payload.After, opencdc.StructuredData{PartitionKey: "pkey1", SortKey: "1"})
+
+	// add a row, will be captured by CDC
+	err = insertRecord(ctx, client, testTable, 2, 3)
+	is.NoErr(err)
+
+	// update the row read during the snapshot (pkey1), will be captured by CDC
+	_, err = client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
+		TableName: aws.String(testTable),
+		Key: map[string]types.AttributeValue{
+			PartitionKey: &types.AttributeValueMemberS{Value: "pkey1"}, // partition key
+			SortKey:      &types.AttributeValueMemberN{Value: "1"},     // sort key
+		},
+		UpdateExpression: aws.String("SET #attr = :newValue"),
+		ExpressionAttributeNames: map[string]string{
+			"#attr": "data", // alias 'data' to avoid reserved keyword error
+		},
+		ExpressionAttributeValues: map[string]types.AttributeValue{
+			":newValue": &types.AttributeValueMemberS{Value: "newValue"}, // the new value
+		},
+	})
+	is.NoErr(err)
+
+	// delete the row inserted above (pkey2), will be captured by CDC
+	_, err = client.DeleteItem(ctx, &dynamodb.DeleteItemInput{
+		TableName: aws.String(testTable),
+		Key: map[string]types.AttributeValue{
+			PartitionKey: &types.AttributeValueMemberS{Value: "pkey2"},
+			SortKey:      &types.AttributeValueMemberN{Value: "2"},
+		},
+	})
+	is.NoErr(err)
+
+	// cdc: expect exactly three records, in order
+	i := 0
+	for i < 3 {
+		rec2, err := source.Read(ctx)
+		if errors.Is(err, sdk.ErrBackoffRetry) {
+			// no CDC record available yet, poll again
+			continue
+		}
+		// any other error is a real failure; ignoring it would make this loop spin forever
+		is.NoErr(err)
+		switch i {
+		case 0:
+			is.True(rec2.Operation == opencdc.OperationCreate)
+			is.Equal(rec2.Payload.After, opencdc.StructuredData{PartitionKey: "pkey2", SortKey: "2"})
+		case 1:
+			is.True(rec2.Operation == opencdc.OperationUpdate)
+			is.Equal(rec2.Payload.After, opencdc.StructuredData{"data": "newValue", PartitionKey: "pkey1", SortKey: "1"})
+		case 2:
+			is.True(rec2.Operation == opencdc.OperationDelete)
+			is.Equal(rec2.Payload.Before, opencdc.StructuredData{PartitionKey: "pkey2", SortKey: "2"})
+		}
+		i++
+	}
+
+	_ = source.Teardown(ctx)
+}
+
+// prepareIntegrationTest creates a uniquely named table on the endpoint configured under
+// SourceConfigAwsUrl, registers a cleanup that deletes it, and returns the DynamoDB client
+// together with a source configuration pointing at that table.
+func prepareIntegrationTest(ctx context.Context, t *testing.T) (*dynamodb.Client, map[string]string) {
+	t.Helper()
+	cfg := map[string]string{
+		SourceConfigAwsAccessKeyId:     "test",
+		SourceConfigAwsSecretAccessKey: "test",
+		SourceConfigAwsRegion:          "us-east-1",
+		SourceConfigPollingPeriod:      "10ms",
+		SourceConfigAwsUrl:             "http://localhost:4566", // docker url
+	}
+
+	client, err := newDynamoClients(ctx, cfg)
+	if err != nil {
+		t.Fatalf("could not create dynamoDB clients: %v", err)
+	}
+
+	table := "conduit-dynamodb-source-test-" + uuid.NewString()
+	cfg[SourceConfigTable] = table
+
+	// create table
+	err = createTable(ctx, client, table, PartitionKey, SortKey)
+	if err != nil {
+		t.Fatalf("could not create dynamoDB table: %v", err)
+	}
+
+	t.Cleanup(func() {
+		err := deleteTable(ctx, client, table)
+		if err != nil {
+			t.Logf("failed to delete the table: %v", err)
+		}
+	})
+
+	return client, cfg
+}
+
+func newDynamoClients(ctx context.Context, cfg map[string]string) (*dynamodb.Client, error) {
+	clientCfg, err := config.LoadDefaultConfig(ctx,
+		config.WithRegion(cfg[SourceConfigAwsRegion]),
+		config.WithCredentialsProvider(aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(cfg[SourceConfigAwsAccessKeyId], cfg[SourceConfigAwsSecretAccessKey], ""))),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("error creating AWS session: %w", err)
+	}
+
+
dynamoDBClient := dynamodb.NewFromConfig(clientCfg, func(o *dynamodb.Options) { + o.EndpointResolverV2 = staticResolver{ + BaseURL: cfg[SourceConfigAwsUrl], + } + }) + return dynamoDBClient, nil +} + +func insertRecord(ctx context.Context, client *dynamodb.Client, tableName string, from int, to int) error { + for i := 0; i < to-from; i++ { + _, err := client.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(tableName), + Item: map[string]types.AttributeValue{ + PartitionKey: &types.AttributeValueMemberS{Value: fmt.Sprintf("pkey%d", from+i)}, + SortKey: &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", from+i)}, + }, + }) + if err != nil { + return fmt.Errorf("error inserting record: %w", err) + } + } + return nil +} + +func createTable(ctx context.Context, client *dynamodb.Client, tableName string, partitionKey string, sortKey string) error { + // Define the table schema with additional attributes + input := &dynamodb.CreateTableInput{ + TableName: aws.String(tableName), + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String(partitionKey), // Partition key + AttributeType: types.ScalarAttributeTypeS, // String + }, + { + AttributeName: aws.String(sortKey), // Sort key + AttributeType: types.ScalarAttributeTypeN, // number + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String(partitionKey), // Partition key + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String(sortKey), // Sort key + KeyType: types.KeyTypeRange, + }, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(5), + WriteCapacityUnits: aws.Int64(5), + }, + } + + // Create the table + _, err := client.CreateTable(ctx, input) + if err != nil { + return fmt.Errorf("failed to create table: %w", err) + } + + return nil +} + +func deleteTable(ctx context.Context, client *dynamodb.Client, tableName string) error { + // Create the input for the DeleteTable call + input := 
&dynamodb.DeleteTableInput{ + TableName: aws.String(tableName), + } + + // Delete the table + _, err := client.DeleteTable(ctx, input) + if err != nil { + return fmt.Errorf("failed to delete table: %w", err) + } + + return nil +} diff --git a/test/docker-compose.yml b/test/docker-compose.yml index a7cdebb..054b0eb 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -1,5 +1,10 @@ -# Configure the services needed for the integration tests. -# More information at https://docs.docker.com/compose/ +version: '3.8' services: - hello-world: - image: "hello-world:latest" + localstack: + image: localstack/localstack:3.6.0 + ports: + - "4566:4566" + environment: + - SERVICES=dynamodb + - EDGE_PORT=4566 + - DEFAULT_REGION=us-east-1