diff --git a/internal/destinations/sqs/sqs.go b/internal/destinations/sqs/sqs.go index be3f6e3..69cd6cd 100644 --- a/internal/destinations/sqs/sqs.go +++ b/internal/destinations/sqs/sqs.go @@ -3,6 +3,7 @@ package sqs import ( "context" "errors" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -104,10 +105,15 @@ func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error var entries = []*awssqs.SendMessageBatchRequestEntry{} for _, msg := range msgs { - entries = append(entries, &awssqs.SendMessageBatchRequestEntry{ + messageRequest := &awssqs.SendMessageBatchRequestEntry{ Id: aws.String(ksuid.New().String()), MessageBody: aws.String(string(msg.Value.RawLog)), - }) + } + if strings.HasSuffix(s.queueURL, ".fifo") { + messageRequest.MessageGroupId = messageRequest.Id + messageRequest.MessageDeduplicationId = messageRequest.Id + } + entries = append(entries, messageRequest) } sendMessageInput := &awssqs.SendMessageBatchInput{