diff --git a/component/mqtt/component.go b/component/mqtt/component.go new file mode 100644 index 000000000..2bf978ac6 --- /dev/null +++ b/component/mqtt/component.go @@ -0,0 +1,222 @@ +// Package mqtt provides an instrumented subscriber for MQTT v5. +package mqtt + +import ( + "context" + "errors" + "net/url" + "time" + + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/beatlabs/patron/correlation" + "github.com/beatlabs/patron/log" + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/google/uuid" +) + +const ( + defaultRetries = 10 + defaultRetryWait = time.Second +) + +type retry struct { + count uint + wait time.Duration +} + +type config struct { + maxMessages *int64 +} + +// DefaultConfig provides a config with sane default and logging enabled on the callbacks. +func DefaultConfig(brokerURLs []*url.URL, clientID string, router paho.Router) (autopaho.ClientConfig, error) { + if len(brokerURLs) == 0 { + return autopaho.ClientConfig{}, errors.New("no broker URLs provided") + } + + if clientID == "" { + return autopaho.ClientConfig{}, errors.New("no client id provided") + } + + if router == nil { + return autopaho.ClientConfig{}, errors.New("no router provided") + } + + return autopaho.ClientConfig{ + BrokerUrls: brokerURLs, + KeepAlive: 30, + ConnectRetryDelay: 5 * time.Second, + ConnectTimeout: 1 * time.Second, + OnConnectionUp: func(_ *autopaho.ConnectionManager, conAck *paho.Connack) { + log.Infof("connection is up with reason code: %d\n", conAck.ReasonCode) + }, + OnConnectError: func(err error) { + log.Errorf("failed to connect: %v\n", err) + }, + ClientConfig: paho.ClientConfig{ + ClientID: clientID, + Router: router, + OnServerDisconnect: func(disconnect *paho.Disconnect) { + log.Warnf("server disconnect received with reason code: %d\n", disconnect.ReasonCode) + }, + OnClientError: func(err error) { + log.Errorf("client error occurred: %v\n", err) + }, + PublishHook: func(publish *paho.Publish) { + log.Debugf("message published to topic: %s\n", publish.Topic) + }, + }, + }, nil +} + +// Component implementation of an async component. +type Component struct { + name string + cm *autopaho.ConnectionManager + cfg autopaho.ClientConfig + sub *paho.Subscribe + retry retry +} + +type Subscription struct { + name string + handler paho.MessageHandler + options paho.SubscribeOptions +} + +// New creates a new component with support for functional configuration. +func New(name string, cfg autopaho.ClientConfig, subs []Subscription, oo ...OptionFunc) (*Component, error) { + if name == "" { + return nil, errors.New("component name is empty") + } + + if len(subs) == 0 { + return nil, errors.New("subscriptions is empty") + } + + rt := &paho.StandardRouter{} + subscribe := &paho.Subscribe{ + Subscriptions: make(map[string]paho.SubscribeOptions, len(subs)), + } + + for _, sub := range subs { + // TODO: middleware for observability + rt.RegisterHandler(sub.name, sub.handler) + subscribe.Subscriptions[sub.name] = sub.options + } + + cfg.Router = rt + + cmp := &Component{ + name: name, + cfg: cfg, + sub: subscribe, + retry: retry{ + count: defaultRetries, + wait: defaultRetryWait, + }, + } + + for _, optionFunc := range oo { + err := optionFunc(cmp) + if err != nil { + return nil, err + } + } + + return cmp, nil +} + +// Run starts the consumer processing loop messages. +func (c *Component) Run(ctx context.Context) error { + chErr := make(chan error) + + go c.consume(ctx, chErr) + + for { + select { + case err := <-chErr: + return err + case <-ctx.Done(): + log.FromContext(ctx).Info("context cancellation received. exiting...") + return nil + } + } +} + +func (c *Component) consume(ctx context.Context, chErr chan error) { + logger := log.FromContext(ctx) + + retries := c.retry.count + + for { + if ctx.Err() != nil { + return + } + + logger.Debug("consume: connecting to broker") + + cm, err := autopaho.NewConnection(ctx, c.cfg) + if err != nil { + logger.Errorf("failed to create new connection: %v, sleeping for %v", err, c.retry.wait) + time.Sleep(c.retry.wait) + retries-- + if retries > 0 { + continue + } + chErr <- err + return + } + + sa, err := cm.Subscribe(ctx, c.sub) + if err != nil { + logger.Errorf("failed to subscribe: %v, sleeping for %v", err, c.retry.wait) + time.Sleep(c.retry.wait) + retries-- + if retries > 0 { + continue + } + chErr <- err + return + } + logger.Debugf("subscription acknowledgment: %v", sa) + + retries = c.retry.count + + if ctx.Err() != nil { + return + } + } +} + +func messageCountInc(queue string, state messageState, hasError bool, count int) { + hasErrorString := "false" + if hasError { + hasErrorString = "true" + } + + messageCounterVec.WithLabelValues(queue, string(state), hasErrorString).Add(float64(count)) +} + +func getCorrelationID(ma map[string]*sqs.MessageAttributeValue) string { + for key, value := range ma { + if key == correlation.HeaderID { + if value.StringValue != nil { + return *value.StringValue + } + break + } + } + return uuid.New().String() +} + +func mapHeader(ma map[string]*sqs.MessageAttributeValue) map[string]string { + mp := make(map[string]string) + for key, value := range ma { + if value.StringValue != nil { + mp[key] = *value.StringValue + } + } + return mp +} diff --git a/component/mqtt/option.go b/component/mqtt/option.go new file mode 100644 index 000000000..a791b0da8 --- /dev/null +++ b/component/mqtt/option.go @@ -0,0 +1,22 @@ +package mqtt + +import ( + "errors" +) + +// OptionFunc definition for configuring the component in a functional way. +type OptionFunc func(*Component) error + +// MaxMessages option for setting the max number of messages fetched. +// Allowed values are between 1 and 10. +// If messages can be processed very quickly, maxing out this value is fine, otherwise having a high value is risky as it might trigger the visibility timeout. +// Having a value too small isn't recommended either, as it increases the number of SQS API requests, thus AWS costs. +func MaxMessages(maxMessages int64) OptionFunc { + return func(c *Component) error { + if maxMessages <= 0 || maxMessages > 10 { + return errors.New("max messages should be between 1 and 10") + } + c.cfg.maxMessages = &maxMessages + return nil + } +} diff --git a/component/mqtt/option_test.go b/component/mqtt/option_test.go new file mode 100644 index 000000000..4d3e6583a --- /dev/null +++ b/component/mqtt/option_test.go @@ -0,0 +1,45 @@ +package mqtt + +import ( + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/stretchr/testify/assert" +) + +func TestMaxMessages(t *testing.T) { + t.Parallel() + type args struct { + maxMessages *int64 + } + tests := map[string]struct { + args args + expectedErr string + }{ + "success": { + args: args{maxMessages: aws.Int64(5)}, + }, + "zero message size": { + args: args{maxMessages: aws.Int64(0)}, + expectedErr: "max messages should be between 1 and 10", + }, + "over max message size": { + args: args{maxMessages: aws.Int64(11)}, + expectedErr: "max messages should be between 1 and 10", + }, + } + for name, tt := range tests { + tt := tt + t.Run(name, func(t *testing.T) { + t.Parallel() + c := &Component{} + err := MaxMessages(*tt.args.maxMessages)(c) + if tt.expectedErr != "" { + assert.EqualError(t, err, tt.expectedErr) + } else { + assert.NoError(t, err) + assert.Equal(t, c.cfg.maxMessages, tt.args.maxMessages) + } + }) + } +} diff --git a/docker-compose.yml b/docker-compose.yml index 3bd06c80c..d5d4f4394 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -60,8 +60,8 @@ services: image: localstack/localstack:0.14 network_mode: bridge ports: - - "127.0.0.1:4510-4559:4510-4559" # external service port range - - "127.0.0.1:4566:4566" # LocalStack Edge Proxy + - "127.0.0.1:4510-4559:4510-4559" # external service port range + - "127.0.0.1:4566:4566" # LocalStack Edge Proxy environment: - SERVICES=sns,sqs - DEBUG=1 @@ -87,8 +87,6 @@ services: protocol: tcp mode: host environment: - HIVEMQ_CONTROL_CENTER_USER: 'admin' - HIVEMQ_CONTROL_CENTER_PASSWORD: '123456' HIVEMQ_CLUSTER_TRANSPORT_TYPE: 'TCP' mongo: image: mongo:5 diff --git a/vendor/modules.txt b/vendor/modules.txt index 31cd271f5..d01696d2d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -109,12 +109,20 @@ github.com/golang/snappy # github.com/google/uuid v1.3.0 ## explicit github.com/google/uuid +<<<<<<< HEAD # github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket +======= +<<<<<<< Updated upstream +>>>>>>> 12900c42 (Temp) # github.com/hashicorp/errwrap v1.0.0 github.com/hashicorp/errwrap # github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-multierror +======= +# github.com/gorilla/websocket v1.4.2 +github.com/gorilla/websocket +>>>>>>> Stashed changes # github.com/hashicorp/go-uuid v1.0.2 github.com/hashicorp/go-uuid # github.com/hashicorp/golang-lru v0.5.4