Skip to content

Commit

Permalink
Adding more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
hpedrorodrigues committed Feb 18, 2021
1 parent de50079 commit e1e7d83
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ type configuration struct {
DLQName string `mapstructure:"dlq-name"`
}
Internal struct {
WorkerPool int `mapstructure:"worker-pool"`
WorkerPool int `mapstructure:"worker-pool"`
StructuredLogs bool `mapstructure:"structured-logs"`
}
}

func LoadConfiguration(log *logrus.Logger) *configuration {
pflag.String("slack.webhook-url", "", "slack webhook url")
pflag.String("sqs.dlq-name", "", "sqs dead-letter queue name")
pflag.Int("internal.worker-pool", 1, "the size of the worker pool")
pflag.Bool("internal.structured-logs", false, "print logs using json format")
pflag.Parse()

if err := viper.BindPFlags(pflag.CommandLine); err != nil {
Expand Down
28 changes: 17 additions & 11 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,26 @@ import (
type consumer struct {
queueName string
workerPool int
log *logrus.Logger
}

func New(queueName string, workerPool int, log *logrus.Logger) *consumer {
return &consumer{queueName, workerPool, log}
func New(queueName string, workerPool int) *consumer {
return &consumer{queueName, workerPool}
}

func (c *consumer) Consume(fn func(message *sqs.Message) error) {
func (c *consumer) Consume(
log *logrus.Logger,
fn func(message *sqs.Message, log *logrus.Entry) error,
) {
for w := 1; w <= c.workerPool; w++ {
go c.worker(w, fn)
go c.worker(fn, log.WithField("worker", w))
}
}

func (c *consumer) worker(id int, fn func(message *sqs.Message) error) {
c.log.Infof("Starting worker: %d", id)
func (c *consumer) worker(
fn func(message *sqs.Message, log *logrus.Entry) error,
log *logrus.Entry,
) {
log.Info("Starting")

sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
Expand All @@ -37,7 +42,7 @@ func (c *consumer) worker(id int, fn func(message *sqs.Message) error) {
QueueName: aws.String(c.queueName),
})
if err != nil {
c.log.Fatalf("Fatal error retrieving queue URL: %v", err)
log.Fatalf("Fatal error retrieving queue URL: %v", err)
}

queueUrl := result.QueueUrl
Expand All @@ -54,7 +59,7 @@ func (c *consumer) worker(id int, fn func(message *sqs.Message) error) {
MaxNumberOfMessages: aws.Int64(1),
})
if err != nil {
c.log.Errorf("Error retrieving messages: %v", err)
log.Errorf("Error retrieving messages: %v", err)
continue
}

Expand All @@ -63,7 +68,8 @@ func (c *consumer) worker(id int, fn func(message *sqs.Message) error) {
wg.Add(1)
go func(m *sqs.Message) {
defer wg.Done()
if err := fn(m); err != nil {
if err := fn(m, log); err != nil {
log.Errorf("Error processing message (%s): %v", *m.MessageId, err)
return
}

Expand All @@ -72,7 +78,7 @@ func (c *consumer) worker(id int, fn func(message *sqs.Message) error) {
ReceiptHandle: message.ReceiptHandle,
})
if err != nil {
c.log.Errorf("Error deleting message (%s): %v", *m.MessageId, err)
log.Errorf("Error deleting message (%s): %v", *m.MessageId, err)
return
}
}(message)
Expand Down
21 changes: 16 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,25 @@ import (
)

func main() {
var log = logrus.New()
var logger = logrus.New()

conf := config.LoadConfiguration(log)
cons := consumer.New(conf.SQS.DLQName, conf.Internal.WorkerPool, log)
conf := config.LoadConfiguration(logger)

log.Info("DLQ-X9")
if conf.Internal.StructuredLogs {
logger.SetFormatter(&logrus.JSONFormatter{})
} else {
logger.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
})
}

cons := consumer.New(conf.SQS.DLQName, conf.Internal.WorkerPool)

logger.Info("DLQ-X9")

cons.Consume(logger, func(message *sqs.Message, log *logrus.Entry) error {
log.Infof("Sending message to Slack channel [Id: %s]", *message.MessageId)

cons.Consume(func(message *sqs.Message) error {
text := fmt.Sprintf(
"Hey, a new message was pusblished to the DLQ `%s` (Id:`%s`): ```%s```",
conf.SQS.DLQName, *message.MessageId, *message.Body,
Expand Down

0 comments on commit e1e7d83

Please sign in to comment.