Skip to content

Commit

Permalink
add message modifying visibility support (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardopericacho authored Dec 9, 2020
1 parent 9aa5970 commit abe8f65
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 9 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ HTSQS is a high throughput golang AWS SQS consumer.
## Features

* **High throughput** - ability multiple consumers that concurrently receive messages from AWS SQS and push them into a single channel for consumption
* **Late ACK** - mechanism for acknowledging messages once they've been processed
* **Late ACK** - mechanism for acknowledging messages once they have been processed
* **Message visibility** modify message visibility
* **Error processing** - error processing to decide whether to stop consuming and exponential backoff setup when errors occur
* **Graceful shutdown**

## License

This project is licensed under [MIT License](./LICENSE).

20 changes: 19 additions & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
)

// SQSMessage is the SQS implementation of `SubscriberMessage`.
// SQSMessage is the implementation of a SQS message
type SQSMessage struct {
sub *Subscriber
RawMessage *sqs.Message
Expand All @@ -24,3 +24,21 @@ func (m *SQSMessage) Done() error {
_, err := m.sub.sqs.DeleteMessage(deleteParams)
return err
}

// ChangeMessageVisibility modifies current message visibility timeout to the one specified in the parameters.
// This is normally useful when the message processing is taking more than the default visibility timeout
func (m *SQSMessage) ChangeMessageVisibility(newVisibilityTimeout *int64) error {
changeVisibilityParams := &sqs.ChangeMessageVisibilityInput{
QueueUrl: &m.sub.cfg.SqsQueueURL,
ReceiptHandle: m.RawMessage.ReceiptHandle,
VisibilityTimeout: newVisibilityTimeout,
}

// Validate values
if err := changeVisibilityParams.Validate(); err != nil {
return err
}

_, err := m.sub.sqs.ChangeMessageVisibility(changeVisibilityParams)
return err
}
6 changes: 5 additions & 1 deletion mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (s *sqsMock) ReceiveMessage(*sqs.ReceiveMessageInput) (*sqs.ReceiveMessageO
return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil
}
stringMessage := string(message.Message())
return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{{Body: &stringMessage}}}, nil
return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{{Body: &stringMessage, ReceiptHandle: &stringMessage}}}, nil
default:
return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil
}
Expand All @@ -25,3 +25,7 @@ func (s *sqsMock) ReceiveMessage(*sqs.ReceiveMessageInput) (*sqs.ReceiveMessageO
func (s *sqsMock) DeleteMessage(*sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) {
return nil, nil
}

func (s *sqsMock) ChangeMessageVisibility(*sqs.ChangeMessageVisibilityInput) (*sqs.ChangeMessageVisibilityOutput, error) {
return nil, nil
}
7 changes: 4 additions & 3 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (b *atomicBool) setTrue() error {
type receiver interface {
ReceiveMessage(*sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error)
DeleteMessage(*sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error)
ChangeMessageVisibility(params *sqs.ChangeMessageVisibilityInput) (*sqs.ChangeMessageVisibilityOutput, error)
}

// Logger interface allows to use other loggers than standard log.Logger
Expand Down Expand Up @@ -103,11 +104,11 @@ type Subscriber struct {
// Returns a channel of SubscriberMessage to consume them and a channel of errors
func (s *Subscriber) Consume() (<-chan *SQSMessage, <-chan error, error) {
if s.stopped.isSet() {
return nil, nil, errors.New("sqs subscriber is already stopped")
return nil, nil, errors.New("SQS subscriber is already stopped")
}

if s.consumed.setTrue() != nil {
return nil, nil, errors.New("sqs subscriber is already running")
return nil, nil, errors.New("SQS subscriber is already running")
}

var wg sync.WaitGroup
Expand Down Expand Up @@ -177,7 +178,7 @@ func (s *Subscriber) Consume() (<-chan *SQSMessage, <-chan error, error) {
// Stop stop gracefully the Subscriber
func (s *Subscriber) Stop() error {
if err := s.stopped.setTrue(); err != nil {
return errors.New("sqs subscriber is already stopped")
return errors.New("SQS subscriber is already stopped")
}
return <-s.stop
}
Expand Down
8 changes: 5 additions & 3 deletions subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -35,22 +36,23 @@ func TestSubscriber(t *testing.T) {

// try to start consuming again while we are still consuming
_, _, err = subs.Consume()
require.EqualError(t, err, "sqs subscriber is already running")
require.EqualError(t, err, "SQS subscriber is already running")

i := 0
for m := range messages {
require.True(t, strings.HasPrefix(string(m.Message()), "Message: "))
require.NoError(t, m.ChangeMessageVisibility(aws.Int64(43200)))
require.NoError(t, m.Done())
i++
}
require.NoError(t, <-stopErrChannel)
require.NoError(t, <-errch)
require.Equal(t, numMessages, i)
require.EqualError(t, subs.Stop(), "sqs subscriber is already stopped")
require.EqualError(t, subs.Stop(), "SQS subscriber is already stopped")

// try to start consuming again when the consumer has already been used
_, _, err = subs.Consume()
require.EqualError(t, err, "sqs subscriber is already stopped")
require.EqualError(t, err, "SQS subscriber is already stopped")
}

func TestSubscriberDefaults(t *testing.T) {
Expand Down

0 comments on commit abe8f65

Please sign in to comment.