From e06add53aa02df72115166dcd65e59561bd7b6f0 Mon Sep 17 00:00:00 2001 From: Pedro Rodrigues Date: Thu, 18 Feb 2021 15:43:02 -0300 Subject: [PATCH] Formatting code --- config/config.go | 90 +++++++++++++++---------------- consumer/consumer.go | 126 +++++++++++++++++++++---------------------- main.go | 64 +++++++++++----------- 3 files changed, 140 insertions(+), 140 deletions(-) diff --git a/config/config.go b/config/config.go index 94d3b41..5643af2 100644 --- a/config/config.go +++ b/config/config.go @@ -1,66 +1,66 @@ package config import ( - "github.com/sirupsen/logrus" - "github.com/spf13/pflag" - "github.com/spf13/viper" + "github.com/sirupsen/logrus" + "github.com/spf13/pflag" + "github.com/spf13/viper" ) const ( - FileName = "config" - FileFormat = "yaml" + FileName = "config" + FileFormat = "yaml" ) type configuration struct { - Slack struct { - WebhookUrl string `mapstructure:"webhook-url"` - } - SQS struct { - DLQName string `mapstructure:"dlq-name"` - } - Internal struct { - WorkerPool int `mapstructure:"worker-pool"` - StructuredLogs bool `mapstructure:"structured-logs"` - } + Slack struct { + WebhookUrl string `mapstructure:"webhook-url"` + } + SQS struct { + DLQName string `mapstructure:"dlq-name"` + } + Internal struct { + WorkerPool int `mapstructure:"worker-pool"` + StructuredLogs bool `mapstructure:"structured-logs"` + } } func LoadConfiguration(log *logrus.Logger) *configuration { - pflag.String("slack.webhook-url", "", "slack webhook url") - pflag.String("sqs.dlq-name", "", "sqs dead-letter queue name") - pflag.Int("internal.worker-pool", 1, "the size of the worker pool") - pflag.Bool("internal.structured-logs", false, "print logs using json format") - pflag.Parse() + pflag.String("slack.webhook-url", "", "slack webhook url") + pflag.String("sqs.dlq-name", "", "sqs dead-letter queue name") + pflag.Int("internal.worker-pool", 1, "the size of the worker pool") + pflag.Bool("internal.structured-logs", false, "print logs using json format") + pflag.Parse() - if err := viper.BindPFlags(pflag.CommandLine); err != nil { - log.Fatalf("Fatal error parsing flags: %v", err) - } + if err := viper.BindPFlags(pflag.CommandLine); err != nil { + log.Fatalf("Fatal error parsing flags: %v", err) + } - viper.SetConfigName(FileName) - viper.SetConfigType(FileFormat) - viper.AddConfigPath(".") + viper.SetConfigName(FileName) + viper.SetConfigType(FileFormat) + viper.AddConfigPath(".") - if err := viper.ReadInConfig(); err != nil { - if _, ok := err.(viper.ConfigFileNotFoundError); !ok { - log.Fatalf("Fatal error reading configuration file: %v", err) - } - } + if err := viper.ReadInConfig(); err != nil { + if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + log.Fatalf("Fatal error reading configuration file: %v", err) + } + } - var config *configuration - if err := viper.Unmarshal(&config); err != nil { - log.Fatalf("Fatal error decoding file: %v", err) - } + var config *configuration + if err := viper.Unmarshal(&config); err != nil { + log.Fatalf("Fatal error decoding file: %v", err) + } - if config.Slack.WebhookUrl == "" { - log.Fatal("Empty webhook URL") - } + if config.Slack.WebhookUrl == "" { + log.Fatal("Empty webhook URL") + } - if config.SQS.DLQName == "" { - log.Fatal("Empty dead-letter queue name") - } + if config.SQS.DLQName == "" { + log.Fatal("Empty dead-letter queue name") + } - if config.Internal.WorkerPool < 1 { - log.Fatal("Invalid worker-pool size") - } + if config.Internal.WorkerPool < 1 { + log.Fatal("Invalid worker-pool size") + } - return config + return config } diff --git a/consumer/consumer.go b/consumer/consumer.go index 54fc4e2..ce9deed 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -1,89 +1,89 @@ package consumer import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/sirupsen/logrus" - "sync" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/sirupsen/logrus" + "sync" ) type consumer struct { - queueName string - workerPool int + queueName string + workerPool int } func New(queueName string, workerPool int) *consumer { - return &consumer{queueName, workerPool} + return &consumer{queueName, workerPool} } func (c *consumer) Consume( - log *logrus.Logger, - fn func(message *sqs.Message, log *logrus.Entry) error, + log *logrus.Logger, + fn func(message *sqs.Message, log *logrus.Entry) error, ) { - for w := 1; w <= c.workerPool; w++ { - go c.worker(fn, log.WithField("worker", w)) - } + for w := 1; w <= c.workerPool; w++ { + go c.worker(fn, log.WithField("worker", w)) + } } func (c *consumer) worker( - fn func(message *sqs.Message, log *logrus.Entry) error, - log *logrus.Entry, + fn func(message *sqs.Message, log *logrus.Entry) error, + log *logrus.Entry, ) { - log.Info("Starting") + log.Info("Starting") - sess := session.Must(session.NewSessionWithOptions(session.Options{ - SharedConfigState: session.SharedConfigEnable, - })) + sess := session.Must(session.NewSessionWithOptions(session.Options{ + SharedConfigState: session.SharedConfigEnable, + })) - svc := sqs.New(sess) + svc := sqs.New(sess) - result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: aws.String(c.queueName), - }) - if err != nil { - log.Fatalf("Fatal error retrieving queue URL: %v", err) - } + result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(c.queueName), + }) + if err != nil { + log.Fatalf("Fatal error retrieving queue URL: %v", err) + } - queueUrl := result.QueueUrl + queueUrl := result.QueueUrl - for { - output, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ - AttributeNames: []*string{ - aws.String(sqs.MessageSystemAttributeNameSentTimestamp), - }, - MessageAttributeNames: []*string{ - aws.String(sqs.QueueAttributeNameAll), - }, - QueueUrl: queueUrl, - MaxNumberOfMessages: aws.Int64(1), - }) - if err != nil { - log.Errorf("Error retrieving messages: %v", err) - continue - } + for { + output, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ + AttributeNames: []*string{ + aws.String(sqs.MessageSystemAttributeNameSentTimestamp), + }, + MessageAttributeNames: []*string{ + aws.String(sqs.QueueAttributeNameAll), + }, + QueueUrl: queueUrl, + MaxNumberOfMessages: aws.Int64(1), + }) + if err != nil { + log.Errorf("Error retrieving messages: %v", err) + continue + } - var wg sync.WaitGroup - for _, message := range output.Messages { - wg.Add(1) - go func(m *sqs.Message) { - defer wg.Done() - if err := fn(m, log); err != nil { - log.Errorf("Error processing message (%s): %v", *m.MessageId, err) - return - } + var wg sync.WaitGroup + for _, message := range output.Messages { + wg.Add(1) + go func(m *sqs.Message) { + defer wg.Done() + if err := fn(m, log); err != nil { + log.Errorf("Error processing message (%s): %v", *m.MessageId, err) + return + } - _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: queueUrl, - ReceiptHandle: message.ReceiptHandle, - }) - if err != nil { - log.Errorf("Error deleting message (%s): %v", *m.MessageId, err) - return - } - }(message) + _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: queueUrl, + ReceiptHandle: message.ReceiptHandle, + }) + if err != nil { + log.Errorf("Error deleting message (%s): %v", *m.MessageId, err) + return + } + }(message) - wg.Wait() - } - } + wg.Wait() + } + } } diff --git a/main.go b/main.go index 1d7b92a..961a61d 100644 --- a/main.go +++ b/main.go @@ -1,46 +1,46 @@ package main import ( - "fmt" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/hpedrorodrigues/dlq-x9/config" - "github.com/hpedrorodrigues/dlq-x9/consumer" - "github.com/sirupsen/logrus" - "github.com/slack-go/slack" - "os" - "os/signal" - "syscall" + "fmt" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/hpedrorodrigues/dlq-x9/config" + "github.com/hpedrorodrigues/dlq-x9/consumer" + "github.com/sirupsen/logrus" + "github.com/slack-go/slack" + "os" + "os/signal" + "syscall" ) func main() { - var logger = logrus.New() + var logger = logrus.New() - conf := config.LoadConfiguration(logger) + conf := config.LoadConfiguration(logger) - if conf.Internal.StructuredLogs { - logger.SetFormatter(&logrus.JSONFormatter{}) - } else { - logger.SetFormatter(&logrus.TextFormatter{ - FullTimestamp: true, - }) - } + if conf.Internal.StructuredLogs { + logger.SetFormatter(&logrus.JSONFormatter{}) + } else { + logger.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + }) + } - cons := consumer.New(conf.SQS.DLQName, conf.Internal.WorkerPool) + cons := consumer.New(conf.SQS.DLQName, conf.Internal.WorkerPool) - logger.Info("DLQ-X9") + logger.Info("DLQ-X9") - cons.Consume(logger, func(message *sqs.Message, log *logrus.Entry) error { - log.Infof("Sending message to Slack channel [Id: %s]", *message.MessageId) + cons.Consume(logger, func(message *sqs.Message, log *logrus.Entry) error { + log.Infof("Sending message to Slack channel [Id: %s]", *message.MessageId) - text := fmt.Sprintf( - "Hey, a new message was pusblished to the DLQ `%s` (Id:`%s`): ```%s```", - conf.SQS.DLQName, *message.MessageId, *message.Body, - ) - return slack.PostWebhook(conf.Slack.WebhookUrl, &slack.WebhookMessage{Text: text}) - }) + text := fmt.Sprintf( + "Hey, a new message was pusblished to the DLQ `%s` (Id:`%s`): ```%s```", + conf.SQS.DLQName, *message.MessageId, *message.Body, + ) + return slack.PostWebhook(conf.Slack.WebhookUrl, &slack.WebhookMessage{Text: text}) + }) - sigterm := make(chan os.Signal, 1) - signal.Notify(sigterm, syscall.SIGTERM) - signal.Notify(sigterm, syscall.SIGINT) - <-sigterm + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGTERM) + signal.Notify(sigterm, syscall.SIGINT) + <-sigterm }