From a7c082f92337ea61e67e84f2510cc55bc9c29afb Mon Sep 17 00:00:00 2001
From: "David n. Jumani"
Date: Sun, 12 May 2019 22:26:37 +0530
Subject: [PATCH] Adding concurrent batch writes

---
 README.md     |   8 ++-
 batch_test.go |  18 ++++-
 batchwrite.go | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++
 db_test.go    |   9 ++-
 run_tests.sh  |  27 ++++++++
 5 files changed, 223 insertions(+), 5 deletions(-)
 create mode 100755 run_tests.sh

diff --git a/README.md b/README.md
index ebf7691..5a30d80 100644
--- a/README.md
+++ b/README.md
@@ -119,8 +119,14 @@ err := db.Table("Books").Get("ID", 555).One(dynamo.AWSEncoding(&someBook))
 
 By default, tests are run in offline mode. Create a table called `TestDB`, with a Number Parition Key called `UserID` and a String Sort Key called `Time`. Change the table name with the environment variable `DYNAMO_TEST_TABLE`. You must specify `DYNAMO_TEST_REGION`, setting it to the AWS region where your test table is.
 
- ```bash
+```bash
 DYNAMO_TEST_REGION=us-west-2 go test github.com/guregu/dynamo/... -cover
+```
+
+Or simply run the following script to test against a local DynamoDB instance (requires Docker and the AWS CLI):
+
+```bash
+./run_tests.sh
 ```
 
 ### License
diff --git a/batch_test.go b/batch_test.go
index 44d1bd4..1aa4460 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -7,7 +7,7 @@ import (
 
 const batchSize = 101
 
-func TestBatchGetWrite(t *testing.T) {
+func testBatchGetWrite(t *testing.T, isSequential bool) {
     if testDB == nil {
         t.Skip(offlineSkipMsg)
     }
@@ -29,7 +29,13 @@
     }
 
     var wcc ConsumedCapacity
-    wrote, err := table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).Run()
+    var wrote int
+    var err error
+    if isSequential {
+        wrote, err = table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).Run()
+    } else {
+        wrote, err = table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).RunConcurrently()
+    }
     if wrote != batchSize {
         t.Error("unexpected wrote:", wrote, "≠", batchSize)
     }
@@ -90,3 +96,11 @@
         t.Error("expected 0 results, got", len(results))
     }
 }
+
+func TestSequentialBatchGetWrite(t *testing.T) {
+    testBatchGetWrite(t, true)
+}
+
+func TestConcurrentBatchGetWrite(t *testing.T) {
+    testBatchGetWrite(t, false)
+}
diff --git a/batchwrite.go b/batchwrite.go
index 76514c8..a6d26e8 100644
--- a/batchwrite.go
+++ b/batchwrite.go
@@ -1,6 +1,8 @@
 package dynamo
 
 import (
+    "errors"
     "math"
+    "sync"
 
     "github.com/aws/aws-sdk-go/aws"
@@ -61,6 +63,169 @@ func (bw *BatchWrite) ConsumedCapacity(cc *ConsumedCapacity) *BatchWrite {
     return bw
 }
 
+// BatchResponse is the result of one concurrently written batch.
+type BatchResponse struct {
+    Result       *dynamodb.BatchWriteItemOutput
+    Error        error
+    BatchCounter int
+    Wrote        int
+}
+
+// batchWriteConfig holds the configuration used by RunConcurrently.
+type batchWriteConfig struct {
+    threads int
+}
+
+// BatchWriteOption is an option that can be passed to RunConcurrently.
+type BatchWriteOption func(*batchWriteConfig)
+
+// defaults applies the default configuration.
+func defaults(cfg *batchWriteConfig) {
+    cfg.threads = int(^uint(0) >> 1)
+}
+
+// WithThreads sets the number of goroutines used to process the request.
+func WithThreads(threads int) BatchWriteOption {
+    return func(cfg *batchWriteConfig) {
+        cfg.threads = threads
+    }
+}
+
+func (bw *BatchWrite) writeBatch(ctx aws.Context, ops []*dynamodb.WriteRequest, batchCounter int, channel chan<- BatchResponse, ccMutex *sync.Mutex) {
+    boff := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
+    wrote := 0
+
+    for {
+        var res *dynamodb.BatchWriteItemOutput
+        req := bw.input(ops)
+        err := retry(ctx, func() error {
+            var err error
+            res, err = bw.batch.table.db.client.BatchWriteItemWithContext(ctx, req)
+            return err
+        })
+        if err != nil {
+            channel <- BatchResponse{
+                Result:       res,
+                Error:        err,
+                BatchCounter: batchCounter,
+                Wrote:        0,
+            }
+            return
+        }
+        if bw.cc != nil {
+            // bw.cc is shared by every goroutine, so guard it with the mutex.
+            ccMutex.Lock()
+            for _, cc := range res.ConsumedCapacity {
+                addConsumedCapacity(bw.cc, cc)
+            }
+            ccMutex.Unlock()
+        }
+
+        unprocessed := res.UnprocessedItems[bw.batch.table.Name()]
+        wrote = len(ops) - len(unprocessed)
+        if len(unprocessed) == 0 {
+            channel <- BatchResponse{
+                Result:       res,
+                Error:        err,
+                BatchCounter: batchCounter,
+                Wrote:        wrote,
+            }
+            return
+        }
+        ops = unprocessed
+
+        // need to sleep when re-requesting, per spec
+        if err := aws.SleepWithContext(ctx, boff.NextBackOff()); err != nil {
+            channel <- BatchResponse{
+                Result:       nil,
+                Error:        err,
+                BatchCounter: batchCounter,
+                Wrote:        wrote,
+            }
+            return
+        }
+    }
+}
+
+func splitBatches(requests []*dynamodb.WriteRequest) (batches [][]*dynamodb.WriteRequest) {
+    batches = [][]*dynamodb.WriteRequest{}
+    requestsLength := len(requests)
+    for i := 0; i < requestsLength; i += maxWriteOps {
+        end := i + maxWriteOps
+        if end > requestsLength {
+            end = requestsLength
+        }
+        batches = append(batches, requests[i:end])
+    }
+    return batches
+}
+
+func min(a int, b int) int {
+    if a < b {
+        return a
+    }
+    return b
+}
+
+// RunConcurrently executes this batch with the specified number of concurrent goroutines.
+// By default the limit is effectively unbounded, so every batch is dispatched in parallel.
+// If several batches fail, only the last error is returned.
+func (bw *BatchWrite) RunConcurrently(opts ...BatchWriteOption) (wrote int, err error) {
+    ctx, cancel := defaultContext()
+    defer cancel()
+    return bw.RunConcurrentlyWithContext(ctx, opts...)
+}
+
+// RunConcurrentlyWithContext is like RunConcurrently, but it uses the given context.
+func (bw *BatchWrite) RunConcurrentlyWithContext(ctx aws.Context, opts ...BatchWriteOption) (wrote int, err error) {
+    if bw.err != nil {
+        return 0, bw.err
+    }
+
+    cfg := new(batchWriteConfig)
+    defaults(cfg)
+    for _, fn := range opts {
+        fn(cfg)
+    }
+
+    // Split the operations into batches of at most maxWriteOps items.
+    batches := splitBatches(bw.ops)
+    totalBatches := len(batches)
+    iterations := int(math.Ceil(float64(totalBatches) / float64(cfg.threads)))
+
+    channel := make(chan BatchResponse)
+    var ccMutex sync.Mutex
+
+    batchCounter := 0
+    wrote = 0
+    err = nil
+    for i := 0; i < iterations; i++ {
+        // Dispatch up to cfg.threads batches.
+        end := min(batchCounter+cfg.threads, totalBatches) - batchCounter
+        for j := 0; j < end; j++ {
+            go bw.writeBatch(ctx, batches[batchCounter], batchCounter, channel, &ccMutex)
+            batchCounter++
+        }
+
+        // Receive one response per dispatched batch.
+        for j := 0; j < end; j++ {
+            batchResponse, ok := <-channel
+            if !ok {
+                return wrote, errors.New("channel unexpectedly closed")
+            }
+
+            if batchResponse.Error != nil {
+                err = batchResponse.Error
+            }
+
+            wrote += batchResponse.Wrote
+        }
+    }
+
+    close(channel)
+    return wrote, err
+}
+
 // Run executes this batch.
 // For batches with more than 25 operations, an error could indicate that
 // some records have been written and some have not. Consult the wrote
@@ -68,6 +233,7 @@ func (bw *BatchWrite) ConsumedCapacity(cc *ConsumedCapacity) *BatchWrite {
 func (bw *BatchWrite) Run() (wrote int, err error) {
     ctx, cancel := defaultContext()
     defer cancel()
+    // TODO: Perhaps use RunConcurrentlyWithContext(ctx, WithThreads(1)) here instead?
     return bw.RunWithContext(ctx)
 }
 
diff --git a/db_test.go b/db_test.go
index 210363e..9e33b22 100644
--- a/db_test.go
+++ b/db_test.go
@@ -18,8 +18,13 @@ var (
 const offlineSkipMsg = "DYNAMO_TEST_REGION not set"
 
 func init() {
-    if region := os.Getenv("DYNAMO_TEST_REGION"); region != "" {
-        testDB = New(session.New(), &aws.Config{Region: aws.String(region)})
+    // DYNAMO_ENDPOINT is optional; set it to run the tests against DynamoDB Local.
+    if region := os.Getenv("DYNAMO_TEST_REGION"); region != "" {
+        cfg := &aws.Config{Region: aws.String(region)}
+        if endpoint := os.Getenv("DYNAMO_ENDPOINT"); endpoint != "" {
+            cfg.Endpoint = aws.String(endpoint)
+        }
+        testDB = New(session.New(), cfg)
     }
     if table := os.Getenv("DYNAMO_TEST_TABLE"); table != "" {
         testTable = table
diff --git a/run_tests.sh b/run_tests.sh
new file mode 100755
index 0000000..5a8113c
--- /dev/null
+++ b/run_tests.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+docker rm -f dynamodb > /dev/null 2>&1
+docker run --name dynamodb -p 8000:8000 amazon/dynamodb-local > /dev/null &
+
+sleep 5 # give DynamoDB Local a moment to start accepting connections
+export DYNAMO_ENDPOINT="http://localhost:8000"
+export DYNAMO_TEST_REGION="us-west-2"
+export DYNAMO_TEST_TABLE="TestDB"
+
+aws dynamodb delete-table \
+  --table-name "$DYNAMO_TEST_TABLE" \
+  --endpoint-url "$DYNAMO_ENDPOINT" > /dev/null 2>&1
+
+aws dynamodb create-table \
+  --table-name "$DYNAMO_TEST_TABLE" \
+  --attribute-definitions \
+    AttributeName=UserID,AttributeType=N \
+    AttributeName=Time,AttributeType=S \
+  --key-schema \
+    AttributeName=UserID,KeyType=HASH \
+    AttributeName=Time,KeyType=RANGE \
+  --provisioned-throughput ReadCapacityUnits=1000,WriteCapacityUnits=1000 \
+  --region "$DYNAMO_TEST_REGION" \
+  --endpoint-url "$DYNAMO_ENDPOINT" > /dev/null
+
+go test . -cover
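
For reviewers, here is a minimal, hypothetical sketch of how the API added by this patch (`RunConcurrently` / `WithThreads`) might be called from application code once the patch is applied. The `widget` struct, table name, and region below are illustrative stand-ins that mirror the test setup; they are not part of the patch:

```go
package main

import (
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/guregu/dynamo"
)

// widget mirrors the item shape used by the tests:
// a Number partition key and a String sort key.
type widget struct {
	UserID int       `dynamo:"UserID"`
	Time   time.Time `dynamo:"Time"`
	Msg    string    `dynamo:"Msg"`
}

func main() {
	db := dynamo.New(session.New(), &aws.Config{Region: aws.String("us-west-2")})
	table := db.Table("TestDB") // assumed to exist, e.g. created by run_tests.sh

	// Build more than 25 items so the write is split into several batches.
	items := make([]interface{}, 101)
	for i := range items {
		items[i] = widget{UserID: i, Time: time.Now().UTC(), Msg: "batch test"}
	}

	var cc dynamo.ConsumedCapacity
	// RunConcurrently writes the 25-item batches in parallel goroutines;
	// WithThreads(4) caps how many batches are in flight at once.
	wrote, err := table.Batch().Write().Put(items...).ConsumedCapacity(&cc).RunConcurrently(dynamo.WithThreads(4))
	if err != nil {
		fmt.Println("batch write error:", err)
	}
	fmt.Printf("wrote %d items; consumed capacity: %+v\n", wrote, cc)
}
```

Calling `RunConcurrently()` with no options uses the default, effectively unbounded goroutine limit, which is what the new `TestConcurrentBatchGetWrite` test exercises.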