Skip to content

Commit

Permalink
Adding concurrent batch writes
Browse files Browse the repository at this point in the history
  • Loading branch information
David n. Jumani committed May 12, 2019
1 parent 41f0249 commit a7c082f
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 5 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,14 @@ err := db.Table("Books").Get("ID", 555).One(dynamo.AWSEncoding(&someBook))

By default, tests are run in offline mode. Create a table called `TestDB`, with a Number Parition Key called `UserID` and a String Sort Key called `Time`. Change the table name with the environment variable `DYNAMO_TEST_TABLE`. You must specify `DYNAMO_TEST_REGION`, setting it to the AWS region where your test table is.

```bash
```bash
DYNAMO_TEST_REGION=us-west-2 go test github.com/guregu/dynamo/... -cover
```

Or simply run the following command to test it locally :

```bash
./run_tests.sh
```

### License
Expand Down
18 changes: 16 additions & 2 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

const batchSize = 101

func TestBatchGetWrite(t *testing.T) {
func testBatchGetWrite(t *testing.T, isSequential bool) {
if testDB == nil {
t.Skip(offlineSkipMsg)
}
Expand All @@ -29,7 +29,13 @@ func TestBatchGetWrite(t *testing.T) {
}

var wcc ConsumedCapacity
wrote, err := table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).Run()
var wrote int
var err error
if isSequential {
wrote, err = table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).Run()
} else {
wrote, err = table.Batch().Write().Put(items...).ConsumedCapacity(&wcc).RunConcurrently()
}
if wrote != batchSize {
t.Error("unexpected wrote:", wrote, "≠", batchSize)
}
Expand Down Expand Up @@ -90,3 +96,11 @@ func TestBatchGetWrite(t *testing.T) {
t.Error("expected 0 results, got", len(results))
}
}

func TestSequentialBatchGetWrite(t *testing.T) {
testBatchGetWrite(t, true)
}

func TestConcurrentBatchGetWrite(t *testing.T) {
testBatchGetWrite(t, false)
}
162 changes: 162 additions & 0 deletions batchwrite.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dynamo

import (
"errors"
"math"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -61,13 +62,174 @@ func (bw *BatchWrite) ConsumedCapacity(cc *ConsumedCapacity) *BatchWrite {
return bw
}

// Structure returned after a concurrent batch operation
type BatchResponse struct {
Result *dynamodb.BatchWriteItemOutput
Error error
BatchCounter int
Wrote int
}

// Config used when calling RunConcurrently
type batchWriteConfig struct {
threads int
}

// Parameter type to be passed to RunConcurrently
type BatchWriteOption func(*batchWriteConfig)

// Sets the default config
func defaults(cfg *batchWriteConfig) {
cfg.threads = int(^uint(0) >> 1)
}

// Sets the number of threads to process the request
func WithThreards(threads int) BatchWriteOption {
return func(cfg *batchWriteConfig) {
cfg.threads = threads
}
}

func (bw *BatchWrite) writeBatch(ctx aws.Context, ops []*dynamodb.WriteRequest, batchCounter int, channel chan<- BatchResponse) {

boff := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
wrote := 0

for {
var res *dynamodb.BatchWriteItemOutput
req := bw.input(ops)
err := retry(ctx, func() error {
var err error
res, err = bw.batch.table.db.client.BatchWriteItemWithContext(ctx, req)
return err
})
if err != nil {
channel <- BatchResponse{
Result: res,
Error: err,
BatchCounter: batchCounter,
Wrote: 0,
}
return
}
if bw.cc != nil {
for _, cc := range res.ConsumedCapacity {
addConsumedCapacity(bw.cc, cc)
}
}

unprocessed := res.UnprocessedItems[bw.batch.table.Name()]
wrote = len(ops) - len(unprocessed)
if len(unprocessed) == 0 {
channel <- BatchResponse{
Result: res,
Error: err,
BatchCounter: batchCounter,
Wrote: wrote,
}
return
}
ops = unprocessed

// need to sleep when re-requesting, per spec
if err := aws.SleepWithContext(ctx, boff.NextBackOff()); err != nil {
channel <- BatchResponse{
Result: nil,
Error: err,
BatchCounter: batchCounter,
Wrote: wrote,
}
return
}
}
}

func splitBatches(requests []*dynamodb.WriteRequest) (batches [][]*dynamodb.WriteRequest) {
batches = [][]*dynamodb.WriteRequest{}
requestsLength := len(requests)
for i := 0; i < requestsLength; i += maxWriteOps {
end := i + maxWriteOps
if end > requestsLength {
end = requestsLength
}
batches = append(batches, requests[i:end])
}
return batches
}

func min(a int, b int) int {

if a < b {
return a
}
return b
}

// RunConcurrently executes this batch concurrently with the number of threads specified.
// By default, it uses 2147483647 theads (if even possible) in an attempt to run each batch in parallel
// In case of multiple errors, it will return only the last one.
func (bw *BatchWrite) RunConcurrently(opts ...BatchWriteOption) (wrote int, err error) {
ctx, cancel := defaultContext()
defer cancel()
return bw.RunConcurrentlyWithContext(ctx, opts...)
}

func (bw *BatchWrite) RunConcurrentlyWithContext(ctx aws.Context, opts ...BatchWriteOption) (wrote int, err error) {
if bw.err != nil {
return 0, bw.err
}

cfg := new(batchWriteConfig)
defaults(cfg)
for _, fn := range opts {
fn(cfg)
}

// TODO : Can split the batches and run them concurrently ?
batches := splitBatches(bw.ops)
totalBatches := len(batches)
iterations := int(math.Ceil(float64(totalBatches) / float64(cfg.threads)))

channel := make(chan BatchResponse)

batchCounter := 0
wrote = 0
err = nil
for i := 0; i < iterations; i++ {
// Dispatch
end := min(batchCounter+cfg.threads, totalBatches) - batchCounter
for j := 0; j < end; j++ {
go bw.writeBatch(ctx, batches[batchCounter], batchCounter, channel)
batchCounter++
}

// Receive
for j := 0; j < end; j++ {
batchResponse, ok := <-channel
if ok == false {
return wrote, errors.New("Channel unexpectedly closed")
}

if batchResponse.Error != nil {
err = batchResponse.Error
}

wrote += batchResponse.Wrote
}
}

close(channel)
return wrote, err
}

// Run executes this batch.
// For batches with more than 25 operations, an error could indicate that
// some records have been written and some have not. Consult the wrote
// return amount to figure out which operations have succeeded.
func (bw *BatchWrite) Run() (wrote int, err error) {
ctx, cancel := defaultContext()
defer cancel()
// TODO : Perhaps use RunConcurrentlyWithContext(dynamo.WithThreards(1)) instead ?
return bw.RunWithContext(ctx)
}

Expand Down
9 changes: 7 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ var (
const offlineSkipMsg = "DYNAMO_TEST_REGION not set"

func init() {
if region := os.Getenv("DYNAMO_TEST_REGION"); region != "" {
testDB = New(session.New(), &aws.Config{Region: aws.String(region)})
region := os.Getenv("DYNAMO_TEST_REGION")
endpoint := os.Getenv("DYNAMO_ENDPOINT")
if region != "" && endpoint != "" {
testDB = New(session.New(), &aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
})
}
if table := os.Getenv("DYNAMO_TEST_TABLE"); table != "" {
testTable = table
Expand Down
27 changes: 27 additions & 0 deletions run_tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env bash

docker rm -f dynamodb > /dev/null
docker run --name dynamodb -p 8000:8000 amazon/dynamodb-local > /dev/null &


export DYNAMO_ENDPOINT="http://localhost:8000"
export DYNAMO_TEST_REGION="us-west-2"
export DYNAMO_TEST_TABLE="TestDB"

aws dynamodb delete-table \
--table-name $DYNAMO_TEST_TABLE \
--endpoint-url $DYNAMO_ENDPOINT > /dev/null 2>&1

aws dynamodb create-table \
--table-name $DYNAMO_TEST_TABLE \
--attribute-definitions \
AttributeName=UserID,AttributeType=N \
AttributeName=Time,AttributeType=S \
--key-schema \
AttributeName=UserID,KeyType=HASH \
AttributeName=Time,KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=1000,WriteCapacityUnits=1000 \
--region $DYNAMO_TEST_REGION \
--endpoint-url $DYNAMO_ENDPOINT > /dev/null

go test . -cover

0 comments on commit a7c082f

Please sign in to comment.