Skip to content
This repository has been archived by the owner on May 17, 2021. It is now read-only.

Commit

Permalink
Merge pull request #46 from dtan4/sync-more-25-items
Browse files Browse the repository at this point in the history
Divide BatchWriteItem API requests for many items
  • Loading branch information
dtan4 authored Jan 17, 2017
2 parents c26bbc8 + 3982237 commit 146595a
Show file tree
Hide file tree
Showing 2 changed files with 305 additions and 37 deletions.
106 changes: 69 additions & 37 deletions aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
"github.com/pkg/errors"
)

const (
// http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
batchWriteItemMax = 25
)

// Client represents the wrapper of DynamoDB API client
type Client struct {
api dynamodbiface.DynamoDBAPI
Expand Down Expand Up @@ -64,34 +69,18 @@ func (c *Client) Delete(table, namespace string, secrets []*secret.Secret) error
return nil
}

writeRequests := []*dynamodb.WriteRequest{}

var writeRequest *dynamodb.WriteRequest
for i := 0; i < (len(secrets)-1)/batchWriteItemMax+1; i++ {
var max int

for _, secret := range secrets {
writeRequest = &dynamodb.WriteRequest{
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
"namespace": &dynamodb.AttributeValue{
S: aws.String(namespace),
},
"key": &dynamodb.AttributeValue{
S: aws.String(secret.Key),
},
},
},
if len(secrets[i*batchWriteItemMax:]) >= batchWriteItemMax {
max = (i + 1) * batchWriteItemMax
} else {
max = i*batchWriteItemMax + len(secrets[i*batchWriteItemMax:])
}
writeRequests = append(writeRequests, writeRequest)
}

requestItems := make(map[string][]*dynamodb.WriteRequest)
requestItems[table] = writeRequests

_, err := c.api.BatchWriteItem(&dynamodb.BatchWriteItemInput{
RequestItems: requestItems,
})
if err != nil {
return errors.Wrap(err, "Failed to delete items.")
if err := c.doBatchDelete(table, namespace, secrets[i*batchWriteItemMax:max]); err != nil {
return errors.Wrap(err, "Failed to delete items.")
}
}

return nil
Expand Down Expand Up @@ -119,32 +108,60 @@ func (c *Client) DeleteNamespace(table, namespace string) error {
return errors.Wrapf(err, "Failed to list up secrets. namespace=%s", namespace)
}

writeRequests := []*dynamodb.WriteRequest{}
secrets := []*secret.Secret{}

for _, item := range resp.Items {
writeRequest := &dynamodb.WriteRequest{
secret := &secret.Secret{
Key: *item["key"].S,
Value: *item["value"].S,
}

secrets = append(secrets, secret)
}

for i := 0; i < (len(secrets)-1)/batchWriteItemMax+1; i++ {
var max int

if len(secrets[i*batchWriteItemMax:]) >= batchWriteItemMax {
max = (i + 1) * batchWriteItemMax
} else {
max = i*batchWriteItemMax + len(secrets[i*batchWriteItemMax:])
}

if err := c.doBatchDelete(table, namespace, secrets[i*batchWriteItemMax:max]); err != nil {
return errors.Wrap(err, "Failed to delete items.")
}
}

return nil
}

func (c *Client) doBatchDelete(table, namespace string, secrets []*secret.Secret) error {
writeRequests := []*dynamodb.WriteRequest{}

for _, secret := range secrets {
writeRequests = append(writeRequests, &dynamodb.WriteRequest{
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
"namespace": &dynamodb.AttributeValue{
S: aws.String(namespace),
},
"key": &dynamodb.AttributeValue{
S: item["key"].S,
S: aws.String(secret.Key),
},
},
},
}
writeRequests = append(writeRequests, writeRequest)
})
}

requestItems := make(map[string][]*dynamodb.WriteRequest)
requestItems[table] = writeRequests

_, err = c.api.BatchWriteItem(&dynamodb.BatchWriteItemInput{
_, err := c.api.BatchWriteItem(&dynamodb.BatchWriteItemInput{
RequestItems: requestItems,
})
if err != nil {
return errors.Wrap(err, "Failed to delete items.")
return errors.Wrap(err, "Failed to insert items.")
}

return nil
Expand All @@ -156,12 +173,28 @@ func (c *Client) Insert(table, namespace string, secrets []*secret.Secret) error
return nil
}

writeRequests := []*dynamodb.WriteRequest{}
for i := 0; i < (len(secrets)-1)/batchWriteItemMax+1; i++ {
var max int

var writeRequest *dynamodb.WriteRequest
if len(secrets[i*batchWriteItemMax:]) >= batchWriteItemMax {
max = (i + 1) * batchWriteItemMax
} else {
max = i*batchWriteItemMax + len(secrets[i*batchWriteItemMax:])
}

if err := c.doBatchInsert(table, namespace, secrets[i*batchWriteItemMax:max]); err != nil {
return errors.Wrap(err, "Failed to insert items.")
}
}

return nil
}

func (c *Client) doBatchInsert(table, namespace string, secrets []*secret.Secret) error {
writeRequests := []*dynamodb.WriteRequest{}

for _, secret := range secrets {
writeRequest = &dynamodb.WriteRequest{
writeRequests = append(writeRequests, &dynamodb.WriteRequest{
PutRequest: &dynamodb.PutRequest{
Item: map[string]*dynamodb.AttributeValue{
"namespace": &dynamodb.AttributeValue{
Expand All @@ -175,8 +208,7 @@ func (c *Client) Insert(table, namespace string, secrets []*secret.Secret) error
},
},
},
}
writeRequests = append(writeRequests, writeRequest)
})
}

requestItems := make(map[string][]*dynamodb.WriteRequest)
Expand Down
Loading

0 comments on commit 146595a

Please sign in to comment.