From daf4eaad70169e87398098d87d4fbd70d0186382 Mon Sep 17 00:00:00 2001 From: Akshar Dave Date: Fri, 3 Nov 2023 15:24:13 -0400 Subject: [PATCH] handle notifyClose --- datafeed/mq.go | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/datafeed/mq.go b/datafeed/mq.go index 52f1cdc884..4809b7a31e 100644 --- a/datafeed/mq.go +++ b/datafeed/mq.go @@ -17,6 +17,10 @@ const ( mqConsumerConcurrency = 1 ) +const ( + consumerRetries = 10 +) + // MQService type MQService struct { logger hclog.Logger @@ -61,6 +65,19 @@ func newMQService(logger hclog.Logger, config *MQConfig, datafeedService *DataFe return mq, nil } +func (mq *MQService) restartWithRetries(ctx context.Context) (<-chan *proto.DataFeedReport, <-chan error, error) { + for i := 0; i < consumerRetries; i++ { + mq.logger.Debug("Restarting consumer with try", i) + time.Sleep(5 * time.Second) + reports, errors, err := mq.startConsumer(ctx, mqConsumerConcurrency) + if err == nil { + return reports, errors, err + } + } + + return nil, nil, fmt.Errorf("failed to restart consumer after %d retries", consumerRetries) +} + // startConsumeLoop func (mq *MQService) startConsumeLoop() { mq.logger.Debug("listening for MQ messages...") @@ -80,19 +97,15 @@ func (mq *MQService) startConsumeLoop() { mq.datafeedService.queueReportingTx(ProposeOutcome, report.MarketHash, report.Outcome) case err = <-errors: mq.logger.Error("error while consuming from message queue", "err", err) - mq.logger.Debug("Restarting consumer...") - time.Sleep(2 * time.Second) - reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency) + reports, errors, err = mq.restartWithRetries(ctx) if err != nil { - mq.logger.Error("Got Error during consumer restart", err) + mq.logger.Error("failed to start consumer - errors chan", err) } case <-common.GetTerminationSignalCh(): mq.logger.Debug("got sigterm, shuttown down mq consumer") - mq.logger.Debug("Restarting consumer...") - time.Sleep(2 * time.Second) - reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency) + reports, errors, err = mq.restartWithRetries(ctx) if err != nil { - mq.logger.Error("Got Error during consumer restart", err) + mq.logger.Error("failed to start consumer - sigterm", err) } } @@ -175,6 +188,14 @@ func (mq *MQService) startConsumer( }() } + go func() { + notifyCloseError := <-mq.connection.Channel.NotifyClose(make(chan *amqp.Error)) + if notifyCloseError != nil { + mq.logger.Debug("Got notifyCloseError error") + errors <- fmt.Errorf("Connection closed: %v", notifyCloseError) + } + }() + // stop the consumer upon sigterm go func() { <-ctx.Done()