forked from guregu/dynamo
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathput.go
147 lines (130 loc) · 3.82 KB
/
put.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package helixddb
import (
"context"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// Put is a request to create or replace an item.
// See: http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
type Put struct {
table Table
returnType string
item map[string]types.AttributeValue
subber
condition string
err error
cc *ConsumedCapacity
}
// Put creates a new request to create or replace an item.
func (table Table) Put(item interface{}) *Put {
encoded, err := marshalItem(item)
return &Put{
table: table,
item: encoded,
err: err,
}
}
// If specifies a conditional expression for this put to succeed.
// Use single quotes to specificy reserved names inline (like 'Count').
// Use the placeholder ? within the expression to substitute values, and use $ for names.
// You need to use quoted or placeholder names when the name is a reserved word in DynamoDB.
// Multiple calls to If will be combined with AND.
func (p *Put) If(expr string, args ...interface{}) *Put {
expr, err := p.subExprN(expr, args...)
p.setError(err)
if p.condition != "" {
p.condition += " AND "
}
p.condition += wrapExpr(expr)
return p
}
// ConsumedCapacity will measure the throughput capacity consumed by this operation and add it to cc.
func (p *Put) ConsumedCapacity(cc *ConsumedCapacity) *Put {
p.cc = cc
return p
}
// Run executes this put.
func (p *Put) Run() error {
ctx, cancel := defaultContext()
defer cancel()
return p.RunWithContext(ctx)
}
// Run executes this put.
func (p *Put) RunWithContext(ctx context.Context) error {
p.returnType = "NONE"
_, err := p.run(ctx)
return err
}
// OldValue executes this put, unmarshaling the previous value into out.
// Returns ErrNotFound is there was no previous value.
func (p *Put) OldValue(out interface{}) error {
ctx, cancel := defaultContext()
defer cancel()
return p.OldValueWithContext(ctx, out)
}
// OldValueWithContext executes this put, unmarshaling the previous value into out.
// Returns ErrNotFound is there was no previous value.
func (p *Put) OldValueWithContext(ctx context.Context, out interface{}) error {
p.returnType = "ALL_OLD"
output, err := p.run(ctx)
switch {
case err != nil:
return err
case output.Attributes == nil:
return ErrNotFound
}
return unmarshalItem(output.Attributes, out)
}
func (p *Put) run(ctx context.Context) (output *dynamodb.PutItemOutput, err error) {
if p.err != nil {
return nil, p.err
}
req := p.input()
retry(ctx, func() error {
output, err = p.table.db.client.PutItem(ctx, req)
return err
})
if p.cc != nil {
addConsumedCapacity(p.cc, output.ConsumedCapacity)
}
return
}
func (p *Put) input() *dynamodb.PutItemInput {
input := &dynamodb.PutItemInput{
TableName: &p.table.name,
Item: p.item,
ReturnValues: types.ReturnValue(p.returnType),
ExpressionAttributeNames: p.nameExpr,
ExpressionAttributeValues: p.valueExpr,
}
if p.condition != "" {
input.ConditionExpression = &p.condition
}
if p.cc != nil {
input.ReturnConsumedCapacity = types.ReturnConsumedCapacityIndexes
}
return input
}
func (p *Put) writeTxItem() (*types.TransactWriteItem, error) {
if p.err != nil {
return nil, p.err
}
input := p.input()
item := &types.TransactWriteItem{
Put: &types.Put{
TableName: input.TableName,
Item: input.Item,
ExpressionAttributeNames: input.ExpressionAttributeNames,
ExpressionAttributeValues: input.ExpressionAttributeValues,
ConditionExpression: input.ConditionExpression,
// TODO: add support when aws-sdk-go updates
// ReturnValuesOnConditionCheckFailure: aws.String(dynamodb.ReturnValuesOnConditionCheckFailureAllOld),
},
}
return item, nil
}
func (p *Put) setError(err error) {
if p.err == nil {
p.err = err
}
}