Skip to content

Commit

Permalink
add integrations tests + docs (#23)
Browse files Browse the repository at this point in the history
* add integrations tests + docs

* Update README.md

Co-authored-by: Lovro Mažgon <[email protected]>

* Update README.md

Co-authored-by: Lovro Mažgon <[email protected]>

* address reviews

---------

Co-authored-by: Lovro Mažgon <[email protected]>
  • Loading branch information
maha-hajja and lovromazgon authored Oct 24, 2024
1 parent ac17a8c commit bb67173
Show file tree
Hide file tree
Showing 8 changed files with 486 additions and 30 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ jobs:
with:
go-version-file: 'go.mod'

- name: Test
run: make test GOTEST_FLAGS="-v -count=1"
- name: Tests
run: make test-integration GOTEST_FLAGS="-v -count=1"
14 changes: 10 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@ test:
go test $(GOTEST_FLAGS) -race ./...

.PHONY: test-integration
test-integration:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose.yml up -d
test-integration: up
go test $(GOTEST_FLAGS) -v -race ./...; ret=$$?; \
docker compose -f test/docker-compose.yml down; \
docker compose -f test/docker-compose.yml down -v; \
exit $$ret

.PHONY: up
up:
docker compose -f test/docker-compose.yml up --quiet-pull -d --wait

.PHONY: down
down:
docker compose -f test/docker-compose.yml down -v --remove-orphans

.PHONY: generate
generate:
go generate ./...
Expand Down
37 changes: 25 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
# Conduit Connector for DynamoDB

[Conduit](https://conduit.io) connector for [DynamoDB](https://aws.amazon.com/dynamodb/).
The DynamoDB connector is one of [Conduit](https://github.com/ConduitIO/conduit) standalone plugins. It provides a source
connector for [DynamoDB](https://aws.amazon.com/dynamodb/).

## How to build?

Run `make build` to build the connector.

## Testing

Run `make test` to run all the unit tests.
Run `make test` to run all the unit tests.

Run `make test-integration` to run all the integration tests. Tests require Docker to be installed and running.
The command will handle starting and stopping docker containers for you.

## Source
A source connector that pulls data from a DynamoDB table to downstream resources via Conduit.

A source connector pulls data from an external resource and pushes it to downstream resources via Conduit.
The connector starts with a snapshot of the data currently existent in the table, sends these records to the
destination, then starts the CDC (Change Data Capture) mode which will listen to events happening on the table
in real-time, and sends these event records to the destination (these events include: `updates`, `deletes`, and `inserts`).

The source connector uses [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) to get CDC events,
so you need to enable the stream before running the connector. Check out the documentation for [how to enable a DynamoDB Stream](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling).

### Configuration

| name | description | required | default | example |
|-----------------------|-----------------------------------------------------------------------|----------|---------|----------------------|
| `table` | Table is the DynamoDB table name to pull data from. | true | | Employees |
| `aws.region` | AWS region. | true | | us-east-1 |
| `aws.accessKeyId` | AWS access key id. | true | | MY_ACCESS_KEY_ID |
| `aws.secretAccessKey` | AWS secret access key. | true | | MY_SECRET_ACCESS_KEY |
| `pollingPeriod` | Polling period for the CDC mode, formatted as a time.Duration string. | false | 1s | 100ms, 1m, 10m, 1h |
| `skipSnapshot` | Determines weather to skip the snapshot or not. | false | false | true |
| name | description | required | default | example |
|-----------------------|-----------------------------------------------------------------------|----------|---------|-----------------------|
| `table` | Table is the DynamoDB table name to pull data from. | true | | Employees |
| `aws.region` | AWS region. | true | | us-east-1 |
| `aws.accessKeyId` | AWS access key id. | true | | MY_ACCESS_KEY_ID |
| `aws.secretAccessKey` | AWS secret access key. | true | | MY_SECRET_ACCESS_KEY |
| `aws.url` | The URL for AWS (useful when testing the connector with localstack). | false | | http://localhost:4566 |
| `pollingPeriod` | Polling period for the CDC mode, formatted as a time.Duration string. | false | 1s | 100ms, 1m, 10m, 1h |
| `skipSnapshot` | Determines weather to skip the snapshot or not. | false | false | true |

<!-- Todo: working on adding some implementation details -->

![scarf pixel connector-dynamodb-readme](https://static.scarf.sh/a.png?x-pxid=cbb3901b-e502-4106-aa10-0b0726532dd6)
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ require (
github.com/aws/aws-sdk-go-v2/credentials v1.17.41
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.2
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.2
github.com/aws/smithy-go v1.22.0
github.com/conduitio/conduit-commons v0.4.0
github.com/conduitio/conduit-connector-sdk v0.11.0
github.com/golangci/golangci-lint v1.61.0
github.com/google/uuid v1.6.0
github.com/matryer/is v1.4.1
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
mvdan.cc/gofumpt v0.7.0
Expand Down Expand Up @@ -49,7 +51,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bkielbasa/cyclop v1.2.1 // indirect
github.com/blizzy78/varnamelen v0.8.0 // indirect
Expand Down Expand Up @@ -98,7 +99,6 @@ require (
github.com/golangci/revgrep v0.5.3 // indirect
github.com/golangci/unconvert v0.0.0-20240309020433-c5143eacb3ed // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gordonklaus/ineffassign v0.1.0 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.4.2 // indirect
Expand Down
7 changes: 7 additions & 0 deletions paramgen_src.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 55 additions & 6 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"net/url"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams"
smithyendpoints "github.com/aws/smithy-go/endpoints"
"github.com/conduitio-labs/conduit-connector-dynamodb/iterator"
"github.com/conduitio-labs/conduit-connector-dynamodb/position"
cconfig "github.com/conduitio/conduit-commons/config"
Expand All @@ -54,6 +56,8 @@ type SourceConfig struct {
AWSAccessKeyID string `json:"aws.accessKeyId" validate:"required"`
// AWS secret access key.
AWSSecretAccessKey string `json:"aws.secretAccessKey" validate:"required"`
// AWSURL The URL for AWS (useful when testing the connector with localstack).
AWSURL string `json:"aws.url"`
// polling period for the CDC mode, formatted as a time.Duration string.
PollingPeriod time.Duration `json:"pollingPeriod" default:"1s"`
// skipSnapshot determines weather to skip the snapshot or not.
Expand Down Expand Up @@ -91,11 +95,25 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {
config.WithCredentialsProvider(aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(s.config.AWSAccessKeyID, s.config.AWSSecretAccessKey, ""))),
)
if err != nil {
return fmt.Errorf("error creating AWS session: %w", err)
return fmt.Errorf("could not load AWS config: %w", err)
}

s.dynamoDBClient = dynamodb.NewFromConfig(cfg)
s.streamsClient = dynamodbstreams.NewFromConfig(cfg)
// Set the endpoint if provided for testing
if s.config.AWSURL != "" {
s.dynamoDBClient = dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
o.EndpointResolverV2 = staticResolver{
BaseURL: s.config.AWSURL,
}
})
s.streamsClient = dynamodbstreams.NewFromConfig(cfg, func(o *dynamodbstreams.Options) {
o.EndpointResolverV2 = staticStreamResolver{
BaseURL: s.config.AWSURL,
}
})
} else {
s.dynamoDBClient = dynamodb.NewFromConfig(cfg)
s.streamsClient = dynamodbstreams.NewFromConfig(cfg)
}

partitionKey, sortKey, err := s.getKeyNamesFromTable(ctx)
if err != nil {
Expand All @@ -108,10 +126,10 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {

p, err := position.ParseRecordPosition(pos)
if err != nil {
return fmt.Errorf("error parssing position: %w", err)
return fmt.Errorf("error parsing position: %w", err)
}

// create the needed iterator
// Create the needed iterator
var itr Iterator
if s.config.SkipSnapshot {
itr, err = iterator.NewCDCIterator(ctx, s.config.Table, partitionKey, sortKey, s.config.PollingPeriod, s.streamsClient, s.streamArn, p)
Expand Down Expand Up @@ -171,7 +189,10 @@ func enableStream(ctx context.Context, client *dynamodb.Client, tableName string
},
}
_, err := client.UpdateTable(ctx, updateTableInput)
return fmt.Errorf("failed to enable stream on DynamoDB table: %w", err)
if err != nil {
return fmt.Errorf("failed to enable stream on DynamoDB table: %w", err)
}
return nil
}

func (s *Source) prepareStream(ctx context.Context) error {
Expand Down Expand Up @@ -250,3 +271,31 @@ func (s *Source) getKeyNamesFromTable(ctx context.Context) (partitionKey string,

return partitionKey, sortKey, nil
}

// staticResolver used to connect to a DynamoDB URL, for tests.
type staticResolver struct {
BaseURL string
}

func (s staticResolver) ResolveEndpoint(_ context.Context, _ dynamodb.EndpointParameters) (smithyendpoints.Endpoint, error) {
u, err := url.Parse(s.BaseURL)
if err != nil {
return smithyendpoints.Endpoint{}, fmt.Errorf("invalid URL: %w", err)
}

return smithyendpoints.Endpoint{URI: *u}, nil
}

// staticStreamResolver used to connect to a DynamoDB URL, for tests.
type staticStreamResolver struct {
BaseURL string
}

func (s staticStreamResolver) ResolveEndpoint(_ context.Context, _ dynamodbstreams.EndpointParameters) (smithyendpoints.Endpoint, error) {
u, err := url.Parse(s.BaseURL)
if err != nil {
return smithyendpoints.Endpoint{}, fmt.Errorf("invalid URL: %w", err)
}

return smithyendpoints.Endpoint{URI: *u}, nil
}
Loading

0 comments on commit bb67173

Please sign in to comment.