-
Notifications
You must be signed in to change notification settings - Fork 1
/
upsertDb.go
161 lines (143 loc) · 4.89 KB
/
upsertDb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// DynoNotation represents an object in dynamoDB notation: a map from
// attribute name to its typed attribute value.
// Used to represent key value data such as keys, table items...
type DynoNotation map[string]types.AttributeValue
// newclient constructs a new dynamodb client configured entirely from
// environment variables:
//
//   - REGION          region passed to the SDK configuration
//   - URL             endpoint override (e.g. a local dynamodb instance)
//   - ACCESSKEYID     access key ID for the static credentials
//   - SECRETACCESSKEY secret access key for the static credentials
//
// When URL is empty the endpoint override is skipped and the SDK falls back
// to its default endpoint resolution, instead of producing an endpoint with
// an empty URL that only fails once the first request is sent.
//
// It returns the client, or the wrapped error from loading the SDK config.
func newclient() (*dynamodb.Client, error) {
	region := os.Getenv("REGION")
	url := os.Getenv("URL")
	accsKeyID := os.Getenv("ACCESSKEYID")
	secretAccessKey := os.Getenv("SECRETACCESSKEY")
	fmt.Println(url, "URL")

	resolver := aws.EndpointResolverWithOptionsFunc(
		func(service, region string, options ...interface{}) (aws.Endpoint, error) {
			if url == "" {
				// Returning EndpointNotFoundError tells the SDK to use its
				// built-in resolver for this service/region.
				return aws.Endpoint{}, &aws.EndpointNotFoundError{}
			}
			return aws.Endpoint{URL: url}, nil
		})

	cfg, err := config.LoadDefaultConfig(context.TODO(),
		config.WithRegion(region),
		config.WithEndpointResolverWithOptions(resolver),
		config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
			Value: aws.Credentials{
				AccessKeyID: accsKeyID, SecretAccessKey: secretAccessKey, SessionToken: "",
				Source: "Mock credentials used above for local instance",
			},
		}),
	)
	if err != nil {
		return nil, fmt.Errorf("loading aws config: %w", err)
	}
	return dynamodb.NewFromConfig(cfg), nil
}
// createTable creates a table in the client's dynamodb instance
// using the table parameters specified in input, then blocks (up to five
// minutes) until the table named tableName is reported as existing.
//
// tableName is used for log messages and for the existence waiter.
// NOTE(review): it is presumably the same name as input.TableName — confirm
// at the call sites, since nothing here enforces it.
//
// On failure the error is logged and returned, and no success message is
// printed. The "Created table" line is written to stdout only once both the
// create call and the wait have succeeded.
func createTable(c *dynamodb.Client,
	tableName string, input *dynamodb.CreateTableInput,
) error {
	table, err := c.CreateTable(context.TODO(), input)
	if err != nil {
		log.Printf("Failed to create table `%v` with error: %v\n", tableName, err)
		return err
	}

	waiter := dynamodb.NewTableExistsWaiter(c)
	err = waiter.Wait(context.TODO(), &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName)}, 5*time.Minute)
	if err != nil {
		log.Printf("Failed to wait on create table `%v` with error: %v\n", tableName, err)
		return err
	}

	fmt.Printf("Created table `%s` with details: %v\n\n", tableName, table.TableDescription)
	return nil
}
// listTables returns the table names produced by one ListTables call against
// the client's dynamodb instance.
//
// input is forwarded to the call as-is, so any fields the caller sets on it
// are honoured. A nil input is treated as an empty request.
// Only the names from that single response are returned; this function does
// not follow up with further calls.
func listTables(c *dynamodb.Client, input *dynamodb.ListTablesInput) ([]string, error) {
	if input == nil {
		input = &dynamodb.ListTablesInput{}
	}
	tables, err := c.ListTables(context.TODO(), input)
	if err != nil {
		return nil, err
	}
	return tables.TableNames, nil
}
// putItem writes a single item (its key plus any other attributes) to the
// named dynamodb table and returns whatever error the PutItem call reports,
// or nil on success.
func putItem(c *dynamodb.Client, tableName string, item DynoNotation) (err error) {
	request := &dynamodb.PutItemInput{
		Item:      item,
		TableName: aws.String(tableName),
	}
	_, err = c.PutItem(context.TODO(), request)
	return err
}
// // putItems batch inserts multiple items in to a dynamodb table.
// func putItems(c *dynamodb.Client, tableName string, items []DynoNotation) (err error) {
// // dynamodb batch limit is 25
// if len(items) > 25 {
// return fmt.Errorf("Max batch size is 25, attempted `%d`", len(items))
// }
// // create requests
// writeRequests := make([]types.WriteRequest, len(items))
// for i, item := range items {
// writeRequests[i] = types.WriteRequest{PutRequest: &types.PutRequest{Item: item}}
// }
// // write batch
// _, err = c.BatchWriteItem(
// context.TODO(),
// &dynamodb.BatchWriteItemInput{
// RequestItems: map[string][]types.WriteRequest{tableName: writeRequests},
// },
// )
// if err != nil {
// return err
// }
// return nil
// }
// // getItem returns an item if found based on the key provided.
// // the key could be either a primary or composite key and values map.
// func getItem(c *dynamodb.Client, tableName string, key DynoNotation) (item DynoNotation, err error) {
// resp, err := c.GetItem(context.TODO(), &dynamodb.GetItemInput{Key: key, TableName: aws.String(tableName)})
// if err != nil {
// return nil, err
// }
// return resp.Item, nil //
// }
// getCiBuildPayload queries the "TektonCI" table for every item whose
// "origin" key equals "Tekton" and returns them decoded as CiBuildPayload
// values.
//
// The query is paginated: a single Query response may stop early and hand
// back a LastEvaluatedKey, so all pages are fetched before decoding to avoid
// silently returning a truncated result.
//
// Any failure (building the expression, querying, or decoding) terminates
// the program via log.Fatal, so the function only returns on success.
func getCiBuildPayload(client *dynamodb.Client) []CiBuildPayload {
	originAttr, err := attributevalue.Marshal("Tekton")
	if err != nil {
		log.Fatal(err)
	}
	keyExpr := expression.Key("origin").Equal(expression.Value(originAttr))
	expr, err := expression.NewBuilder().WithKeyCondition(keyExpr).Build()
	if err != nil {
		log.Fatal(err)
	}

	paginator := dynamodb.NewQueryPaginator(client, &dynamodb.QueryInput{
		TableName:                 aws.String("TektonCI"),
		ExpressionAttributeNames:  expr.Names(),
		ExpressionAttributeValues: expr.Values(),
		KeyConditionExpression:    expr.KeyCondition(),
	})

	// Collect the raw items from every page first and decode them in one
	// call, so the decoded slice is built exactly as a single-page result
	// would be.
	var items []map[string]types.AttributeValue
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(context.TODO())
		if err != nil {
			log.Fatal(err)
		}
		items = append(items, page.Items...)
	}

	// unmarshal list of items
	var payload []CiBuildPayload
	if err := attributevalue.UnmarshalListOfMaps(items, &payload); err != nil {
		log.Fatal(err)
	}
	return payload
}