-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
e1e7d83
commit e06add5
Showing
3 changed files
with
140 additions
and
140 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,66 +1,66 @@ | ||
package config | ||
|
||
import ( | ||
"github.com/sirupsen/logrus" | ||
"github.com/spf13/pflag" | ||
"github.com/spf13/viper" | ||
"github.com/sirupsen/logrus" | ||
"github.com/spf13/pflag" | ||
"github.com/spf13/viper" | ||
) | ||
|
||
const ( | ||
FileName = "config" | ||
FileFormat = "yaml" | ||
FileName = "config" | ||
FileFormat = "yaml" | ||
) | ||
|
||
type configuration struct { | ||
Slack struct { | ||
WebhookUrl string `mapstructure:"webhook-url"` | ||
} | ||
SQS struct { | ||
DLQName string `mapstructure:"dlq-name"` | ||
} | ||
Internal struct { | ||
WorkerPool int `mapstructure:"worker-pool"` | ||
StructuredLogs bool `mapstructure:"structured-logs"` | ||
} | ||
Slack struct { | ||
WebhookUrl string `mapstructure:"webhook-url"` | ||
} | ||
SQS struct { | ||
DLQName string `mapstructure:"dlq-name"` | ||
} | ||
Internal struct { | ||
WorkerPool int `mapstructure:"worker-pool"` | ||
StructuredLogs bool `mapstructure:"structured-logs"` | ||
} | ||
} | ||
|
||
func LoadConfiguration(log *logrus.Logger) *configuration { | ||
pflag.String("slack.webhook-url", "", "slack webhook url") | ||
pflag.String("sqs.dlq-name", "", "sqs dead-letter queue name") | ||
pflag.Int("internal.worker-pool", 1, "the size of the worker pool") | ||
pflag.Bool("internal.structured-logs", false, "print logs using json format") | ||
pflag.Parse() | ||
pflag.String("slack.webhook-url", "", "slack webhook url") | ||
pflag.String("sqs.dlq-name", "", "sqs dead-letter queue name") | ||
pflag.Int("internal.worker-pool", 1, "the size of the worker pool") | ||
pflag.Bool("internal.structured-logs", false, "print logs using json format") | ||
pflag.Parse() | ||
|
||
if err := viper.BindPFlags(pflag.CommandLine); err != nil { | ||
log.Fatalf("Fatal error parsing flags: %v", err) | ||
} | ||
if err := viper.BindPFlags(pflag.CommandLine); err != nil { | ||
log.Fatalf("Fatal error parsing flags: %v", err) | ||
} | ||
|
||
viper.SetConfigName(FileName) | ||
viper.SetConfigType(FileFormat) | ||
viper.AddConfigPath(".") | ||
viper.SetConfigName(FileName) | ||
viper.SetConfigType(FileFormat) | ||
viper.AddConfigPath(".") | ||
|
||
if err := viper.ReadInConfig(); err != nil { | ||
if _, ok := err.(viper.ConfigFileNotFoundError); !ok { | ||
log.Fatalf("Fatal error reading configuration file: %v", err) | ||
} | ||
} | ||
if err := viper.ReadInConfig(); err != nil { | ||
if _, ok := err.(viper.ConfigFileNotFoundError); !ok { | ||
log.Fatalf("Fatal error reading configuration file: %v", err) | ||
} | ||
} | ||
|
||
var config *configuration | ||
if err := viper.Unmarshal(&config); err != nil { | ||
log.Fatalf("Fatal error decoding file: %v", err) | ||
} | ||
var config *configuration | ||
if err := viper.Unmarshal(&config); err != nil { | ||
log.Fatalf("Fatal error decoding file: %v", err) | ||
} | ||
|
||
if config.Slack.WebhookUrl == "" { | ||
log.Fatal("Empty webhook URL") | ||
} | ||
if config.Slack.WebhookUrl == "" { | ||
log.Fatal("Empty webhook URL") | ||
} | ||
|
||
if config.SQS.DLQName == "" { | ||
log.Fatal("Empty dead-letter queue name") | ||
} | ||
if config.SQS.DLQName == "" { | ||
log.Fatal("Empty dead-letter queue name") | ||
} | ||
|
||
if config.Internal.WorkerPool < 1 { | ||
log.Fatal("Invalid worker-pool size") | ||
} | ||
if config.Internal.WorkerPool < 1 { | ||
log.Fatal("Invalid worker-pool size") | ||
} | ||
|
||
return config | ||
return config | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,89 +1,89 @@ | ||
package consumer | ||
|
||
import ( | ||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/aws/aws-sdk-go/service/sqs" | ||
"github.com/sirupsen/logrus" | ||
"sync" | ||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/aws/aws-sdk-go/service/sqs" | ||
"github.com/sirupsen/logrus" | ||
"sync" | ||
) | ||
|
||
type consumer struct { | ||
queueName string | ||
workerPool int | ||
queueName string | ||
workerPool int | ||
} | ||
|
||
func New(queueName string, workerPool int) *consumer { | ||
return &consumer{queueName, workerPool} | ||
return &consumer{queueName, workerPool} | ||
} | ||
|
||
func (c *consumer) Consume( | ||
log *logrus.Logger, | ||
fn func(message *sqs.Message, log *logrus.Entry) error, | ||
log *logrus.Logger, | ||
fn func(message *sqs.Message, log *logrus.Entry) error, | ||
) { | ||
for w := 1; w <= c.workerPool; w++ { | ||
go c.worker(fn, log.WithField("worker", w)) | ||
} | ||
for w := 1; w <= c.workerPool; w++ { | ||
go c.worker(fn, log.WithField("worker", w)) | ||
} | ||
} | ||
|
||
func (c *consumer) worker( | ||
fn func(message *sqs.Message, log *logrus.Entry) error, | ||
log *logrus.Entry, | ||
fn func(message *sqs.Message, log *logrus.Entry) error, | ||
log *logrus.Entry, | ||
) { | ||
log.Info("Starting") | ||
log.Info("Starting") | ||
|
||
sess := session.Must(session.NewSessionWithOptions(session.Options{ | ||
SharedConfigState: session.SharedConfigEnable, | ||
})) | ||
sess := session.Must(session.NewSessionWithOptions(session.Options{ | ||
SharedConfigState: session.SharedConfigEnable, | ||
})) | ||
|
||
svc := sqs.New(sess) | ||
svc := sqs.New(sess) | ||
|
||
result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ | ||
QueueName: aws.String(c.queueName), | ||
}) | ||
if err != nil { | ||
log.Fatalf("Fatal error retrieving queue URL: %v", err) | ||
} | ||
result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ | ||
QueueName: aws.String(c.queueName), | ||
}) | ||
if err != nil { | ||
log.Fatalf("Fatal error retrieving queue URL: %v", err) | ||
} | ||
|
||
queueUrl := result.QueueUrl | ||
queueUrl := result.QueueUrl | ||
|
||
for { | ||
output, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ | ||
AttributeNames: []*string{ | ||
aws.String(sqs.MessageSystemAttributeNameSentTimestamp), | ||
}, | ||
MessageAttributeNames: []*string{ | ||
aws.String(sqs.QueueAttributeNameAll), | ||
}, | ||
QueueUrl: queueUrl, | ||
MaxNumberOfMessages: aws.Int64(1), | ||
}) | ||
if err != nil { | ||
log.Errorf("Error retrieving messages: %v", err) | ||
continue | ||
} | ||
for { | ||
output, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ | ||
AttributeNames: []*string{ | ||
aws.String(sqs.MessageSystemAttributeNameSentTimestamp), | ||
}, | ||
MessageAttributeNames: []*string{ | ||
aws.String(sqs.QueueAttributeNameAll), | ||
}, | ||
QueueUrl: queueUrl, | ||
MaxNumberOfMessages: aws.Int64(1), | ||
}) | ||
if err != nil { | ||
log.Errorf("Error retrieving messages: %v", err) | ||
continue | ||
} | ||
|
||
var wg sync.WaitGroup | ||
for _, message := range output.Messages { | ||
wg.Add(1) | ||
go func(m *sqs.Message) { | ||
defer wg.Done() | ||
if err := fn(m, log); err != nil { | ||
log.Errorf("Error processing message (%s): %v", *m.MessageId, err) | ||
return | ||
} | ||
var wg sync.WaitGroup | ||
for _, message := range output.Messages { | ||
wg.Add(1) | ||
go func(m *sqs.Message) { | ||
defer wg.Done() | ||
if err := fn(m, log); err != nil { | ||
log.Errorf("Error processing message (%s): %v", *m.MessageId, err) | ||
return | ||
} | ||
|
||
_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ | ||
QueueUrl: queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
}) | ||
if err != nil { | ||
log.Errorf("Error deleting message (%s): %v", *m.MessageId, err) | ||
return | ||
} | ||
}(message) | ||
_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ | ||
QueueUrl: queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
}) | ||
if err != nil { | ||
log.Errorf("Error deleting message (%s): %v", *m.MessageId, err) | ||
return | ||
} | ||
}(message) | ||
|
||
wg.Wait() | ||
} | ||
} | ||
wg.Wait() | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,46 +1,46 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"github.com/aws/aws-sdk-go/service/sqs" | ||
"github.com/hpedrorodrigues/dlq-x9/config" | ||
"github.com/hpedrorodrigues/dlq-x9/consumer" | ||
"github.com/sirupsen/logrus" | ||
"github.com/slack-go/slack" | ||
"os" | ||
"os/signal" | ||
"syscall" | ||
"fmt" | ||
"github.com/aws/aws-sdk-go/service/sqs" | ||
"github.com/hpedrorodrigues/dlq-x9/config" | ||
"github.com/hpedrorodrigues/dlq-x9/consumer" | ||
"github.com/sirupsen/logrus" | ||
"github.com/slack-go/slack" | ||
"os" | ||
"os/signal" | ||
"syscall" | ||
) | ||
|
||
func main() { | ||
var logger = logrus.New() | ||
var logger = logrus.New() | ||
|
||
conf := config.LoadConfiguration(logger) | ||
conf := config.LoadConfiguration(logger) | ||
|
||
if conf.Internal.StructuredLogs { | ||
logger.SetFormatter(&logrus.JSONFormatter{}) | ||
} else { | ||
logger.SetFormatter(&logrus.TextFormatter{ | ||
FullTimestamp: true, | ||
}) | ||
} | ||
if conf.Internal.StructuredLogs { | ||
logger.SetFormatter(&logrus.JSONFormatter{}) | ||
} else { | ||
logger.SetFormatter(&logrus.TextFormatter{ | ||
FullTimestamp: true, | ||
}) | ||
} | ||
|
||
cons := consumer.New(conf.SQS.DLQName, conf.Internal.WorkerPool) | ||
cons := consumer.New(conf.SQS.DLQName, conf.Internal.WorkerPool) | ||
|
||
logger.Info("DLQ-X9") | ||
logger.Info("DLQ-X9") | ||
|
||
cons.Consume(logger, func(message *sqs.Message, log *logrus.Entry) error { | ||
log.Infof("Sending message to Slack channel [Id: %s]", *message.MessageId) | ||
cons.Consume(logger, func(message *sqs.Message, log *logrus.Entry) error { | ||
log.Infof("Sending message to Slack channel [Id: %s]", *message.MessageId) | ||
|
||
text := fmt.Sprintf( | ||
"Hey, a new message was pusblished to the DLQ `%s` (Id:`%s`): ```%s```", | ||
conf.SQS.DLQName, *message.MessageId, *message.Body, | ||
) | ||
return slack.PostWebhook(conf.Slack.WebhookUrl, &slack.WebhookMessage{Text: text}) | ||
}) | ||
text := fmt.Sprintf( | ||
"Hey, a new message was pusblished to the DLQ `%s` (Id:`%s`): ```%s```", | ||
conf.SQS.DLQName, *message.MessageId, *message.Body, | ||
) | ||
return slack.PostWebhook(conf.Slack.WebhookUrl, &slack.WebhookMessage{Text: text}) | ||
}) | ||
|
||
sigterm := make(chan os.Signal, 1) | ||
signal.Notify(sigterm, syscall.SIGTERM) | ||
signal.Notify(sigterm, syscall.SIGINT) | ||
<-sigterm | ||
sigterm := make(chan os.Signal, 1) | ||
signal.Notify(sigterm, syscall.SIGTERM) | ||
signal.Notify(sigterm, syscall.SIGINT) | ||
<-sigterm | ||
} |