Skip to content

Commit

Permalink
Fixes to work with fifo queues.
Browse files Browse the repository at this point in the history
  • Loading branch information
ejcx committed Aug 14, 2023
1 parent 9caa7b9 commit 0a4d740
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions internal/destinations/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqs
import (
"context"
"errors"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -104,10 +105,15 @@ func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error

var entries = []*awssqs.SendMessageBatchRequestEntry{}
for _, msg := range msgs {
entries = append(entries, &awssqs.SendMessageBatchRequestEntry{
messageRequest := &awssqs.SendMessageBatchRequestEntry{
Id: aws.String(ksuid.New().String()),
MessageBody: aws.String(string(msg.Value.RawLog)),
})
}
if strings.HasSuffix(s.queueURL, ".fifo") {
messageRequest.MessageGroupId = messageRequest.Id
messageRequest.MessageDeduplicationId = messageRequest.Id
}
entries = append(entries, messageRequest)
}

sendMessageInput := &awssqs.SendMessageBatchInput{
Expand Down

0 comments on commit 0a4d740

Please sign in to comment.