Skip to content

Commit

Permalink
Move sqs API call out of concurrent loop
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Jan 14, 2025
1 parent 14358e0 commit b5759aa
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions pkg/source/sqs/sqs_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,23 @@ func (ss *sqsSource) Read(sf *sourceiface.SourceFunctions) error {

ProcessLoop:
for {
msgRes, err := ss.client.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(ss.queueURL),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(10),
WaitTimeSeconds: aws.Int64(1),
})
if err != nil {
return errors.Wrap(err, "Failed to get message from SQS queue")
}
timePulled := time.Now().UTC()

select {
case <-ss.exitSignal:
break ProcessLoop
Expand All @@ -164,7 +181,7 @@ ProcessLoop:
wg.Add(1)
go func() {
defer wg.Done()
err := ss.process(sf)
err := ss.process(sf, msgRes, timePulled)
if err != nil {
ss.processErrorSignal <- err
}
Expand All @@ -177,23 +194,7 @@ ProcessLoop:
return processErr
}

func (ss *sqsSource) process(sf *sourceiface.SourceFunctions) error {
msgRes, err := ss.client.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(ss.queueURL),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(10),
WaitTimeSeconds: aws.Int64(1),
})
if err != nil {
return errors.Wrap(err, "Failed to get message from SQS queue")
}
timePulled := time.Now().UTC()
func (ss *sqsSource) process(sf *sourceiface.SourceFunctions, msgRes *sqs.ReceiveMessageOutput, timePulled time.Time) error {

var messages []*models.Message
for _, msg := range msgRes.Messages {
Expand Down Expand Up @@ -237,7 +238,7 @@ func (ss *sqsSource) process(sf *sourceiface.SourceFunctions) error {
})
}

err = sf.WriteToTarget(messages)
err := sf.WriteToTarget(messages)
if err != nil {
ss.log.WithFields(log.Fields{"error": err}).Error(err)
}
Expand Down

0 comments on commit b5759aa

Please sign in to comment.