forked from italolelis/outboxer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sqs.go
107 lines (83 loc) · 2.48 KB
/
sqs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Package SQS is the AWS SQS implementation of an event stream.
package sqs
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/italolelis/outboxer"
)
const (
// QueueNameOption is the queue name option
QueueNameOption = "queue_name"
// ExplicitHashKeyOption is the explicit hash key option
DelaySecondsOption = "delay_seconds"
// MessageGroupIdOption is the grouping id sequence option
MessageGroupIdOption = "message_group_id"
// MessageDedupIdOption is the deduplication id option
MessageDedupIdOption = "message_dedup_id"
)
// SQS is the wrapper for the SQS library
type SQS struct {
conn sqsiface.SQSAPI
}
type options struct {
queueName *string
delaySeconds *int64
msgGroupId *string
msgDedupId *string
}
// New creates a new instance of SQS
func New(conn sqsiface.SQSAPI) *SQS {
return &SQS{conn: conn}
}
// Send sends the message to the event stream
func (r *SQS) Send(ctx context.Context, evt *outboxer.OutboxMessage) error {
opts := r.parseOptions(evt.Options)
input := &sqs.SendMessageInput{
QueueUrl: opts.queueName,
MessageBody: aws.String(string((evt.Payload))),
DelaySeconds: opts.delaySeconds,
MessageGroupId: opts.msgGroupId,
MessageDeduplicationId: opts.msgDedupId,
}
msgAttributes := r.parseHeaders(evt.Headers)
if len(msgAttributes) > 0 {
input.MessageAttributes = msgAttributes
}
_, err := r.conn.SendMessageWithContext(ctx, input)
if err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}
return nil
}
func (r *SQS) parseOptions(opts outboxer.DynamicValues) *options {
opt := options{}
if data, ok := opts[QueueNameOption]; ok {
opt.queueName = aws.String(data.(string))
}
if data, ok := opts[DelaySecondsOption]; ok {
opt.delaySeconds = aws.Int64(data.(int64))
}
if data, ok := opts[MessageGroupIdOption]; ok {
opt.msgGroupId = aws.String(data.(string))
}
if data, ok := opts[MessageDedupIdOption]; ok {
opt.msgDedupId = aws.String(data.(string))
}
return &opt
}
func (r *SQS) parseHeaders(headers outboxer.DynamicValues) (response map[string]*sqs.MessageAttributeValue) {
if len(headers) == 0 {
return
}
response = make(map[string]*sqs.MessageAttributeValue)
for key, value := range headers {
response[key] = &sqs.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(value.(string)),
}
}
return response
}