diff --git a/datafeed/mq.go b/datafeed/mq.go
index 52f1cdc884..672ccef92a 100644
--- a/datafeed/mq.go
+++ b/datafeed/mq.go
@@ -17,6 +17,14 @@ const (
 	mqConsumerConcurrency = 1
 )
 
+const (
+	// consumerRetries is the maximum number of consumer restart attempts.
+	consumerRetries = 10
+
+	// consumerRetryDelay is the pause taken before each restart attempt.
+	consumerRetryDelay = 5 * time.Second
+)
+
 // MQService
 type MQService struct {
 	logger hclog.Logger
@@ -61,6 +69,39 @@ func newMQService(logger hclog.Logger, config *MQConfig, datafeedService *DataFe
 	return mq, nil
 }
 
+// restartWithRetries tries to restart the MQ consumer up to consumerRetries
+// times, waiting consumerRetryDelay before each attempt. It returns the report
+// and error channels of the first successful start. If ctx is cancelled while
+// waiting, or every attempt fails, both channels are nil and the error is non-nil.
+func (mq *MQService) restartWithRetries(ctx context.Context) (<-chan *proto.DataFeedReport, <-chan error, error) {
+	var lastErr error
+
+	for try := 1; try <= consumerRetries; try++ {
+		mq.logger.Debug("Restarting consumer", "try", try)
+
+		// Wait before the attempt, but give up as soon as ctx is cancelled.
+		select {
+		case <-ctx.Done():
+			mq.logger.Error("consumer restart aborted", "err", ctx.Err())
+			return nil, nil, ctx.Err()
+		case <-time.After(consumerRetryDelay):
+		}
+
+		reports, errors, err := mq.startConsumer(ctx, mqConsumerConcurrency)
+		if err == nil {
+			return reports, errors, nil
+		}
+
+		lastErr = err
+	}
+
+	// The visible callers discard the returned error, so log the failure here too.
+	err := fmt.Errorf("failed to restart consumer after %d retries: %w", consumerRetries, lastErr)
+	mq.logger.Error(err.Error())
+
+	return nil, nil, err
+}
+
 // startConsumeLoop
 func (mq *MQService) startConsumeLoop() {
 	mq.logger.Debug("listening for MQ messages...")
@@ -80,20 +121,12 @@ func (mq *MQService) startConsumeLoop() {
 			mq.datafeedService.queueReportingTx(ProposeOutcome, report.MarketHash, report.Outcome)
 		case err = <-errors:
 			mq.logger.Error("error while consuming from message queue", "err", err)
-			mq.logger.Debug("Restarting consumer...")
-			time.Sleep(2 * time.Second)
-			reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency)
-			if err != nil {
-				mq.logger.Error("Got Error during consumer restart", err)
-			}
+			// restartWithRetries logs its own failure; both channels are nil in that case.
+			reports, errors, _ = mq.restartWithRetries(ctx)
 		case <-common.GetTerminationSignalCh():
 			mq.logger.Debug("got sigterm, shuttown down mq consumer")
-			mq.logger.Debug("Restarting consumer...")
-			time.Sleep(2 * time.Second)
-			reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency)
-			if err != nil {
-				mq.logger.Error("Got Error during consumer restart", err)
-			}
+			// restartWithRetries logs its own failure; both channels are nil in that case.
+			reports, errors, _ = mq.restartWithRetries(ctx)
 		}
 	}
 
@@ -175,6 +208,17 @@ func (mq *MQService) startConsumer(
 		}()
 	}
 
+	// Wait for the AMQP channel's close notification and, when it carries an
+	// error, forward it on errors. A nil notification is ignored, so nothing is
+	// sent in that case.
+	go func() {
+		notifyCloseError := <-mq.connection.Channel.NotifyClose(make(chan *amqp.Error))
+		if notifyCloseError != nil {
+			mq.logger.Debug("Got notifyCloseError error")
+			errors <- fmt.Errorf("Connection closed: %v", notifyCloseError)
+		}
+	}()
+
 	// stop the consumer upon sigterm
 	go func() {
 		<-ctx.Done()