From abe8f654edced6305646a7a817023db13f01bc32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bernardo=20Pericacho=20S=C3=A1nchez?= Date: Wed, 9 Dec 2020 18:44:21 +0100 Subject: [PATCH] add message modifying visibility support (#6) --- README.md | 4 +++- message.go | 20 +++++++++++++++++++- mock_test.go | 6 +++++- subscriber.go | 7 ++++--- subscriber_test.go | 8 +++++--- 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 8e5b070..90a9f4a 100644 --- a/README.md +++ b/README.md @@ -11,10 +11,12 @@ HTSQS is a high throughput golang AWS SQS consumer. ## Features * **High throughput** - ability multiple consumers that concurrently receive messages from AWS SQS and push them into a single channel for consumption -* **Late ACK** - mechanism for acknowledging messages once they've been processed +* **Late ACK** - mechanism for acknowledging messages once they have been processed +* **Message visibility** modify message visibility * **Error processing** - error processing to decide whether to stop consuming and exponential backoff setup when errors occur * **Graceful shutdown** ## License + This project is licensed under [MIT License](./LICENSE). diff --git a/message.go b/message.go index 70d1950..f69f237 100644 --- a/message.go +++ b/message.go @@ -4,7 +4,7 @@ import ( "github.com/aws/aws-sdk-go/service/sqs" ) -// SQSMessage is the SQS implementation of `SubscriberMessage`. +// SQSMessage is the implementation of a SQS message type SQSMessage struct { sub *Subscriber RawMessage *sqs.Message @@ -24,3 +24,21 @@ func (m *SQSMessage) Done() error { _, err := m.sub.sqs.DeleteMessage(deleteParams) return err } + +// ChangeMessageVisibility modifies current message visibility timeout to the one specified in the parameters. +// This is normally useful when the message processing is taking more than the default visibility timeout +func (m *SQSMessage) ChangeMessageVisibility(newVisibilityTimeout *int64) error { + changeVisibilityParams := &sqs.ChangeMessageVisibilityInput{ + QueueUrl: &m.sub.cfg.SqsQueueURL, + ReceiptHandle: m.RawMessage.ReceiptHandle, + VisibilityTimeout: newVisibilityTimeout, + } + + // Validate values + if err := changeVisibilityParams.Validate(); err != nil { + return err + } + + _, err := m.sub.sqs.ChangeMessageVisibility(changeVisibilityParams) + return err +} diff --git a/mock_test.go b/mock_test.go index 6f41540..b779720 100644 --- a/mock_test.go +++ b/mock_test.go @@ -16,7 +16,7 @@ func (s *sqsMock) ReceiveMessage(*sqs.ReceiveMessageInput) (*sqs.ReceiveMessageO return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil } stringMessage := string(message.Message()) - return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{{Body: &stringMessage}}}, nil + return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{{Body: &stringMessage, ReceiptHandle: &stringMessage}}}, nil default: return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil } @@ -25,3 +25,7 @@ func (s *sqsMock) ReceiveMessage(*sqs.ReceiveMessageInput) (*sqs.ReceiveMessageO func (s *sqsMock) DeleteMessage(*sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) { return nil, nil } + +func (s *sqsMock) ChangeMessageVisibility(*sqs.ChangeMessageVisibilityInput) (*sqs.ChangeMessageVisibilityOutput, error) { + return nil, nil +} diff --git a/subscriber.go b/subscriber.go index 1b1f6aa..f7cf554 100644 --- a/subscriber.go +++ b/subscriber.go @@ -50,6 +50,7 @@ func (b *atomicBool) setTrue() error { type receiver interface { ReceiveMessage(*sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) DeleteMessage(*sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) + ChangeMessageVisibility(params *sqs.ChangeMessageVisibilityInput) (*sqs.ChangeMessageVisibilityOutput, error) } // Logger interface allows to use other loggers than standard log.Logger @@ -103,11 +104,11 @@ type Subscriber struct { // Returns a channel of SubscriberMessage to consume them and a channel of errors func (s *Subscriber) Consume() (<-chan *SQSMessage, <-chan error, error) { if s.stopped.isSet() { - return nil, nil, errors.New("sqs subscriber is already stopped") + return nil, nil, errors.New("SQS subscriber is already stopped") } if s.consumed.setTrue() != nil { - return nil, nil, errors.New("sqs subscriber is already running") + return nil, nil, errors.New("SQS subscriber is already running") } var wg sync.WaitGroup @@ -177,7 +178,7 @@ func (s *Subscriber) Consume() (<-chan *SQSMessage, <-chan error, error) { // Stop stop gracefully the Subscriber func (s *Subscriber) Stop() error { if err := s.stopped.setTrue(); err != nil { - return errors.New("sqs subscriber is already stopped") + return errors.New("SQS subscriber is already stopped") } return <-s.stop } diff --git a/subscriber_test.go b/subscriber_test.go index f0d295d..6cdbffd 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -7,6 +7,7 @@ import ( "strings" "testing" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "github.com/stretchr/testify/require" @@ -35,22 +36,23 @@ func TestSubscriber(t *testing.T) { // try to start consuming again while we are still consuming _, _, err = subs.Consume() - require.EqualError(t, err, "sqs subscriber is already running") + require.EqualError(t, err, "SQS subscriber is already running") i := 0 for m := range messages { require.True(t, strings.HasPrefix(string(m.Message()), "Message: ")) + require.NoError(t, m.ChangeMessageVisibility(aws.Int64(43200))) require.NoError(t, m.Done()) i++ } require.NoError(t, <-stopErrChannel) require.NoError(t, <-errch) require.Equal(t, numMessages, i) - require.EqualError(t, subs.Stop(), "sqs subscriber is already stopped") + require.EqualError(t, subs.Stop(), "SQS subscriber is already stopped") // try to start consuming again when the consumer has already been used _, _, err = subs.Consume() - require.EqualError(t, err, "sqs subscriber is already stopped") + require.EqualError(t, err, "SQS subscriber is already stopped") } func TestSubscriberDefaults(t *testing.T) {