Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: awsdynamo #223

Merged
merged 4 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions .github/workflows/test-aws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ on:
env:
testdir : ./aws

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
test:
strategy:
Expand All @@ -25,30 +29,46 @@ jobs:
runs-on: ${{ matrix.os }}
timeout-minutes: 5
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Install Go
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}

- name: Checkout code
uses: actions/checkout@v3

- name: Setup Docker
- name: Cache Go modules
id: cache-go
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Download Go modules
working-directory: ${{ env.testdir }}
run: docker-compose up -d
shell: bash
if: ${{ steps.cache-go.outputs.cache-hit != 'true' }}
run: go mod download

- name: Go Module Download
- name: Setup Docker
working-directory: ${{ env.testdir }}
env:
DOCKER_BUILDKIT: 1
run: |
go install gotest.tools/gotestsum@latest
go mod download
# Create the directory for the volume of dynamodb in advance, otherwise permission error will occur.
# https://stackoverflow.com/questions/45850688/unable-to-open-local-dynamodb-database-file-after-power-outage
mkdir -p ./docker/dynamodb/data
sudo chmod 777 ./docker/dynamodb/data
docker compose up -d

- name: Test
working-directory: ${{ env.testdir }}
timeout-minutes: 3
run: |
# shellcheck disable=SC2046
gotestsum --junitfile unit-tests.xml -- -v ./... -race -coverprofile="coverage.txt" -covermode=atomic -coverpkg=./...
go test -p 4 -parallel 4 -v ./... -race -coverprofile="coverage.txt" -covermode=atomic -coverpkg=./...

- uses: codecov/codecov-action@v3
with:
Expand Down
2 changes: 2 additions & 0 deletions aws/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
# Minio Data
docker/minio/config
docker/minio/data
# DynamoDB Data
docker/dynamodb/data
250 changes: 250 additions & 0 deletions aws/awsdynamo/awsdynamo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
// nolint:typecheck
package awsdynamo

import (
"context"
"errors"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
awstime "github.com/aws/smithy-go/time"

"github.com/88labs/go-utils/aws/awsconfig"
"github.com/88labs/go-utils/aws/awsdynamo/dynamooptions"
)

var (
ErrNotFound = errors.New("record not found")
)

// PutItem Put the item in DynamoDB Upsert if it does not exist
func PutItem(ctx context.Context, region awsconfig.Region, tableName string, item any, opts ...dynamooptions.OptionDynamo) error {
c := dynamooptions.GetDynamoConf(opts...)
client, err := GetClient(ctx, region, c.MaxAttempts, c.MaxBackoffDelay)
if err != nil {
return err
}
putItem, err := attributevalue.MarshalMap(item)
if err != nil {
return err
}
putItemInput := &dynamodb.PutItemInput{
Item: putItem,
TableName: aws.String(tableName),
}
if _, err := client.PutItem(ctx, putItemInput); err != nil {
return err
}
return nil
}

// UpdateItem Update the attributes of the item in DynamoDB Upsert if it does not exist
// expression: https://docs.aws.amazon.com/sdk-for-go/api/service/dynamodb/expression/#example_Builder_WithUpdate
func UpdateItem(
ctx context.Context,
region awsconfig.Region,
tableName, keyFieldName, key string,
update expression.UpdateBuilder,
out any,
opts ...dynamooptions.OptionDynamo,
) error {
c := dynamooptions.GetDynamoConf(opts...)
client, err := GetClient(ctx, region, c.MaxAttempts, c.MaxBackoffDelay)
if err != nil {
return err
}
expr, err := expression.NewBuilder().WithUpdate(update).Build()
if err != nil {
return err
}
putItemInput := &dynamodb.UpdateItemInput{
Key: map[string]types.AttributeValue{keyFieldName: &types.AttributeValueMemberS{Value: key}},
TableName: aws.String(tableName),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
ReturnConsumedCapacity: types.ReturnConsumedCapacityNone,
ReturnItemCollectionMetrics: types.ReturnItemCollectionMetricsNone,
ReturnValues: types.ReturnValueAllNew,
UpdateExpression: expr.Update(),
}
updatedItem, err := client.UpdateItem(ctx, putItemInput)
if err != nil {
return err
}
if updatedItem.Attributes == nil {
return ErrNotFound
}
if out != nil {
if err := attributevalue.UnmarshalMap(updatedItem.Attributes, &out); err != nil {
return err
}
}
return nil
}

// DeleteItem Delete DynamoDB item
// expression: https://docs.aws.amazon.com/sdk-for-go/api/service/dynamodb/expression/#example_Builder_WithUpdate
// Mapping the retrieved item to `out`, must be a pointer to the `out`.
func DeleteItem(ctx context.Context, region awsconfig.Region, tableName, keyFieldName, key string, out any, opts ...dynamooptions.OptionDynamo) error {
c := dynamooptions.GetDynamoConf(opts...)
client, err := GetClient(ctx, region, c.MaxAttempts, c.MaxBackoffDelay)
if err != nil {
return err
}
deleteItemInput := &dynamodb.DeleteItemInput{
Key: map[string]types.AttributeValue{keyFieldName: &types.AttributeValueMemberS{Value: key}},
TableName: aws.String(tableName),
ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
ReturnItemCollectionMetrics: types.ReturnItemCollectionMetricsSize,
ReturnValues: types.ReturnValueAllOld,
}
deletedItem, err := client.DeleteItem(ctx, deleteItemInput)
if err != nil {
return err
}
if deletedItem.Attributes == nil {
return ErrNotFound
}
if out != nil {
if err := attributevalue.UnmarshalMap(deletedItem.Attributes, &out); err != nil {
return err
}
}
return nil
}

// GetItem Get the item in DynamoDB
// Mapping the retrieved item to `out`, must be a pointer to the `out`.
func GetItem(ctx context.Context, region awsconfig.Region, tableName, keyFieldName, key string, out any, opts ...dynamooptions.OptionDynamo) error {
c := dynamooptions.GetDynamoConf(opts...)
client, err := GetClient(ctx, region, c.MaxAttempts, c.MaxBackoffDelay)
if err != nil {
return err
}
getItemInput := &dynamodb.GetItemInput{
Key: map[string]types.AttributeValue{
keyFieldName: &types.AttributeValueMemberS{Value: key},
},
TableName: aws.String(tableName),
// https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html
ConsistentRead: aws.Bool(true),
}
getItem, err := client.GetItem(ctx, getItemInput)
if err != nil {
return err
}
if getItem.Item == nil {
return ErrNotFound
}
if err := attributevalue.UnmarshalMap(getItem.Item, &out); err != nil {
return err
}
return nil
}

// BatchGetItem Retrieve Dynamodb items in a batch process
// Return the retrieved item as a slice of type `T`.
// Note that the order of retrieval is not the order in which the keys are specified.
func BatchGetItem[T any, Key ~string](ctx context.Context, region awsconfig.Region, tableName, keyFieldName string, keys []Key, _ T, opts ...dynamooptions.OptionDynamo) ([]T, error) {
// DynamoDB allows a maximum batch size of 100 items.
// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchGetItem.html
const MaxBatchSize = 100

c := dynamooptions.GetDynamoConf(opts...)
client, err := GetClient(ctx, region, c.MaxAttempts, c.MaxBackoffDelay)
if err != nil {
return nil, err
}

reqKeys := make([]map[string]types.AttributeValue, len(keys))
for i, key := range keys {
reqKeys[i] = map[string]types.AttributeValue{
keyFieldName: &types.AttributeValueMemberS{Value: string(key)},
}
}

resultItems := make([]T, 0, len(keys))

start := 0
end := start + MaxBatchSize
for start < len(reqKeys) {
getReqs := make([]map[string]types.AttributeValue, 0, MaxBatchSize)
if end > len(reqKeys) {
end = len(reqKeys)
}
for _, v := range reqKeys[start:end] {
getReqs = append(getReqs, v)
}
getItems, err := client.BatchGetItem(ctx, &dynamodb.BatchGetItemInput{
tomtwinkle marked this conversation as resolved.
Show resolved Hide resolved
RequestItems: map[string]types.KeysAndAttributes{
tableName: {Keys: getReqs},
},
})
if err != nil {
return nil, fmt.Errorf("received batch error %+#v for batch getting. %v\n", getItems, err)
}

for _, v := range getItems.Responses[tableName] {
var ret T
if err := attributevalue.UnmarshalMap(v, &ret); err != nil {
return nil, fmt.Errorf("Couldn't unmarshal item %+#v for batch getting. %v\n", v, err)
}
resultItems = append(resultItems, ret)
}
start = end
end += MaxBatchSize
}

return resultItems, nil
}

// BatchWriteItem Write Dynamodb items in a batch process
func BatchWriteItem[T any](ctx context.Context, region awsconfig.Region, tableName string, items []T, opts ...dynamooptions.OptionDynamo) error {
// DynamoDB allows a maximum batch size of 25 items.
// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
const MaxBatchSize = 25

c := dynamooptions.GetDynamoConf(opts...)
client, err := GetClient(ctx, region, c.MaxAttempts, c.MaxBackoffDelay)
if err != nil {
return err
}

start := 0
end := start + MaxBatchSize
for start < len(items) {
writeReqs := make([]types.WriteRequest, 0, MaxBatchSize)
if end > len(items) {
end = len(items)
}
for _, v := range items[start:end] {
item, err := attributevalue.MarshalMap(v)
if err != nil {
return fmt.Errorf("Couldn't marshal item %+#v for batch writing. %v\n", v, err)
} else {
writeReqs = append(
writeReqs,
types.WriteRequest{PutRequest: &types.PutRequest{Item: item}},
)
}
}
if _, err := client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]types.WriteRequest{tableName: writeReqs},
},
); err != nil {
return fmt.Errorf("received batch error %+#v for batch writing. %v\n", writeReqs, err)
}
if err := awstime.SleepWithContext(ctx, 10*time.Millisecond); err != nil {
return err
}
start = end
end += MaxBatchSize
}

return err
}
Loading
Loading