Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to AWS SDK v2 #206

Closed
wants to merge 13 commits into from
17 changes: 11 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## dynamo [![GoDoc](https://godoc.org/github.com/guregu/dynamo?status.svg)](https://godoc.org/github.com/guregu/dynamo)
`import "github.com/guregu/dynamo"`

dynamo is an expressive [DynamoDB](https://aws.amazon.com/dynamodb/) client for Go, with an easy but powerful API. dynamo integrates with the official [AWS SDK](https://github.com/aws/aws-sdk-go/).
dynamo is an expressive [DynamoDB](https://aws.amazon.com/dynamodb/) client for Go, with an easy but powerful API. dynamo integrates with the official [AWS SDK v2](https://github.com/aws/aws-sdk-go-v2/).

This library is stable and versioned with Go modules.

Expand All @@ -12,9 +12,11 @@ package dynamo

import (
"time"
"context"
"log"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/guregu/dynamo"
)

Expand All @@ -34,13 +36,16 @@ type widget struct {


func main() {
sess := session.Must(session.NewSession())
db := dynamo.New(sess, &aws.Config{Region: aws.String("us-west-2")})
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
db := dynamo.New(cfg)
table := db.Table("Widgets")

// put item
w := widget{UserID: 613, Time: time.Now(), Msg: "hello"}
err := table.Put(w).Run()
err = table.Put(w).Run()

// get the same item
var result widget
Expand Down
32 changes: 19 additions & 13 deletions batchget.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package dynamo

import (
"context"
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/smithy-go/time"
"github.com/cenkalti/backoff/v4"
)

Expand Down Expand Up @@ -106,7 +108,7 @@ func (bg *BatchGet) All(out interface{}) error {
}

// AllWithContext executes this request and unmarshals all results to out, which must be a pointer to a slice.
func (bg *BatchGet) AllWithContext(ctx aws.Context, out interface{}) error {
func (bg *BatchGet) AllWithContext(ctx context.Context, out interface{}) error {
iter := newBGIter(bg, unmarshalAppend, bg.err)
for iter.NextWithContext(ctx, out) {
}
Expand All @@ -128,7 +130,7 @@ func (bg *BatchGet) input(start int) *dynamodb.BatchGetItemInput {
}

in := &dynamodb.BatchGetItemInput{
RequestItems: make(map[string]*dynamodb.KeysAndAttributes, 1),
RequestItems: make(map[string]types.KeysAndAttributes, 1),
}

if bg.projection != "" {
Expand All @@ -138,10 +140,10 @@ func (bg *BatchGet) input(start int) *dynamodb.BatchGetItemInput {
}
}
if bg.cc != nil {
in.ReturnConsumedCapacity = aws.String(dynamodb.ReturnConsumedCapacityIndexes)
in.ReturnConsumedCapacity = types.ReturnConsumedCapacityIndexes
}

var kas *dynamodb.KeysAndAttributes
var kas *types.KeysAndAttributes
for _, get := range bg.reqs[start:end] {
if kas == nil {
kas = get.keysAndAttribs()
Expand All @@ -155,7 +157,7 @@ func (bg *BatchGet) input(start int) *dynamodb.BatchGetItemInput {
if bg.consistent {
kas.ConsistentRead = &bg.consistent
}
in.RequestItems[bg.batch.table.Name()] = kas
in.RequestItems[bg.batch.table.Name()] = *kas
return in
}

Expand Down Expand Up @@ -201,7 +203,7 @@ func (itr *bgIter) Next(out interface{}) bool {
return itr.NextWithContext(ctx, out)
}

func (itr *bgIter) NextWithContext(ctx aws.Context, out interface{}) bool {
func (itr *bgIter) NextWithContext(ctx context.Context, out interface{}) bool {
// stop if we have an error
if ctx.Err() != nil {
itr.err = ctx.Err()
Expand Down Expand Up @@ -230,8 +232,12 @@ redo:

if itr.output != nil && itr.idx >= len(itr.output.Responses[tableName]) {
var unprocessed int
if itr.output.UnprocessedKeys != nil && itr.output.UnprocessedKeys[tableName] != nil {
unprocessed = len(itr.output.UnprocessedKeys[tableName].Keys)

if itr.output.UnprocessedKeys != nil {
_, ok := itr.output.UnprocessedKeys[tableName]
if ok {
unprocessed = len(itr.output.UnprocessedKeys[tableName].Keys)
}
}
itr.processed += len(itr.input.RequestItems[tableName].Keys) - unprocessed
// have we exhausted all results?
Expand All @@ -248,7 +254,7 @@ redo:
// no, prepare a new request with the remaining keys
itr.input.RequestItems = itr.output.UnprocessedKeys
// we need to sleep here a bit as per the official docs
if err := aws.SleepWithContext(ctx, itr.backoff.NextBackOff()); err != nil {
if err := time.SleepWithContext(ctx, itr.backoff.NextBackOff()); err != nil {
// timed out
itr.err = err
return false
Expand All @@ -259,15 +265,15 @@ redo:

itr.err = retry(ctx, func() error {
var err error
itr.output, err = itr.bg.batch.table.db.client.BatchGetItemWithContext(ctx, itr.input)
itr.output, err = itr.bg.batch.table.db.client.BatchGetItem(ctx, itr.input)
return err
})
if itr.err != nil {
return false
}
if itr.bg.cc != nil {
for _, cc := range itr.output.ConsumedCapacity {
addConsumedCapacity(itr.bg.cc, cc)
addConsumedCapacity(itr.bg.cc, &cc)
}
}

Expand Down
26 changes: 14 additions & 12 deletions batchwrite.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package dynamo

import (
"context"
"math"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/smithy-go/time"
"github.com/cenkalti/backoff/v4"
)

Expand All @@ -14,7 +16,7 @@ const maxWriteOps = 25
// BatchWrite is a BatchWriteItem operation.
type BatchWrite struct {
batch Batch
ops []*dynamodb.WriteRequest
ops []types.WriteRequest
err error
cc *ConsumedCapacity
}
Expand All @@ -33,7 +35,7 @@ func (bw *BatchWrite) Put(items ...interface{}) *BatchWrite {
for _, item := range items {
encoded, err := marshalItem(item)
bw.setError(err)
bw.ops = append(bw.ops, &dynamodb.WriteRequest{PutRequest: &dynamodb.PutRequest{
bw.ops = append(bw.ops, types.WriteRequest{PutRequest: &types.PutRequest{
Item: encoded,
}})
}
Expand All @@ -48,7 +50,7 @@ func (bw *BatchWrite) Delete(keys ...Keyed) *BatchWrite {
del.Range(bw.batch.rangeKey, rk)
bw.setError(del.err)
}
bw.ops = append(bw.ops, &dynamodb.WriteRequest{DeleteRequest: &dynamodb.DeleteRequest{
bw.ops = append(bw.ops, types.WriteRequest{DeleteRequest: &types.DeleteRequest{
Key: del.key(),
}})
}
Expand All @@ -71,7 +73,7 @@ func (bw *BatchWrite) Run() (wrote int, err error) {
return bw.RunWithContext(ctx)
}

func (bw *BatchWrite) RunWithContext(ctx aws.Context) (wrote int, err error) {
func (bw *BatchWrite) RunWithContext(ctx context.Context) (wrote int, err error) {
if bw.err != nil {
return 0, bw.err
}
Expand All @@ -95,15 +97,15 @@ func (bw *BatchWrite) RunWithContext(ctx aws.Context) (wrote int, err error) {
req := bw.input(ops)
err := retry(ctx, func() error {
var err error
res, err = bw.batch.table.db.client.BatchWriteItemWithContext(ctx, req)
res, err = bw.batch.table.db.client.BatchWriteItem(ctx, req)
return err
})
if err != nil {
return wrote, err
}
if bw.cc != nil {
for _, cc := range res.ConsumedCapacity {
addConsumedCapacity(bw.cc, cc)
addConsumedCapacity(bw.cc, &cc)
}
}

Expand All @@ -115,7 +117,7 @@ func (bw *BatchWrite) RunWithContext(ctx aws.Context) (wrote int, err error) {
ops = unprocessed

// need to sleep when re-requesting, per spec
if err := aws.SleepWithContext(ctx, boff.NextBackOff()); err != nil {
if err := time.SleepWithContext(ctx, boff.NextBackOff()); err != nil {
// timed out
return wrote, err
}
Expand All @@ -125,14 +127,14 @@ func (bw *BatchWrite) RunWithContext(ctx aws.Context) (wrote int, err error) {
return wrote, nil
}

func (bw *BatchWrite) input(ops []*dynamodb.WriteRequest) *dynamodb.BatchWriteItemInput {
func (bw *BatchWrite) input(ops []types.WriteRequest) *dynamodb.BatchWriteItemInput {
input := &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
RequestItems: map[string][]types.WriteRequest{
bw.batch.table.Name(): ops,
},
}
if bw.cc != nil {
input.ReturnConsumedCapacity = aws.String(dynamodb.ReturnConsumedCapacityIndexes)
input.ReturnConsumedCapacity = types.ReturnConsumedCapacityIndexes
}
return input
}
Expand Down
18 changes: 9 additions & 9 deletions conditioncheck.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package dynamo

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// ConditionCheck represents a condition for a write transaction to succeed.
// It is used along with WriteTx.Check.
type ConditionCheck struct {
table Table
hashKey string
hashValue *dynamodb.AttributeValue
hashValue types.AttributeValue
rangeKey string
rangeValue *dynamodb.AttributeValue
rangeValue types.AttributeValue

condition string
subber
Expand Down Expand Up @@ -66,11 +66,11 @@ func (check *ConditionCheck) IfNotExists() *ConditionCheck {
return check.If("attribute_not_exists($)", check.hashKey)
}

func (check *ConditionCheck) writeTxItem() (*dynamodb.TransactWriteItem, error) {
func (check *ConditionCheck) writeTxItem() (*types.TransactWriteItem, error) {
if check.err != nil {
return nil, check.err
}
item := &dynamodb.ConditionCheck{
item := &types.ConditionCheck{
TableName: aws.String(check.table.name),
Key: check.keys(),
ExpressionAttributeNames: check.nameExpr,
Expand All @@ -79,13 +79,13 @@ func (check *ConditionCheck) writeTxItem() (*dynamodb.TransactWriteItem, error)
if check.condition != "" {
item.ConditionExpression = aws.String(check.condition)
}
return &dynamodb.TransactWriteItem{
return &types.TransactWriteItem{
ConditionCheck: item,
}, nil
}

func (check *ConditionCheck) keys() map[string]*dynamodb.AttributeValue {
keys := map[string]*dynamodb.AttributeValue{check.hashKey: check.hashValue}
func (check *ConditionCheck) keys() map[string]types.AttributeValue {
keys := map[string]types.AttributeValue{check.hashKey: check.hashValue}
if check.rangeKey != "" {
keys[check.rangeKey] = check.rangeValue
}
Expand Down
Loading