Skip to content

Commit

Permalink
handle notifyClose
Browse files Browse the repository at this point in the history
  • Loading branch information
akshar committed Nov 3, 2023
1 parent 0edec90 commit b5d5017
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions datafeed/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const (
mqConsumerConcurrency = 1
)

const (
consumerRetries = 10
)

// MQService
type MQService struct {
logger hclog.Logger
Expand Down Expand Up @@ -61,6 +65,21 @@ func newMQService(logger hclog.Logger, config *MQConfig, datafeedService *DataFe
return mq, nil
}

func (mq *MQService) restartWithRetries(ctx context.Context) (<-chan *proto.DataFeedReport, <-chan error, error) {
for i := 0; i < consumerRetries; i++ {
mq.logger.Debug("Restarting consumer with try", i)
time.Sleep(5 * time.Second)
reports, errors, err := mq.startConsumer(ctx, mqConsumerConcurrency)
if err == nil {
return reports, errors, err
}
}

erroMsg := fmt.Sprintf("failed to restart consumer after %d retries", consumerRetries)
mq.logger.Error(erroMsg)
return nil, nil, fmt.Errorf(erroMsg)
}

// startConsumeLoop
func (mq *MQService) startConsumeLoop() {
mq.logger.Debug("listening for MQ messages...")
Expand All @@ -80,20 +99,10 @@ func (mq *MQService) startConsumeLoop() {
mq.datafeedService.queueReportingTx(ProposeOutcome, report.MarketHash, report.Outcome)
case err = <-errors:
mq.logger.Error("error while consuming from message queue", "err", err)
mq.logger.Debug("Restarting consumer...")
time.Sleep(2 * time.Second)
reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency)
if err != nil {
mq.logger.Error("Got Error during consumer restart", err)
}
reports, errors, _ = mq.restartWithRetries(ctx)
case <-common.GetTerminationSignalCh():
mq.logger.Debug("got sigterm, shuttown down mq consumer")
mq.logger.Debug("Restarting consumer...")
time.Sleep(2 * time.Second)
reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency)
if err != nil {
mq.logger.Error("Got Error during consumer restart", err)
}
reports, errors, _ = mq.restartWithRetries(ctx)

}
}
Expand Down Expand Up @@ -175,6 +184,14 @@ func (mq *MQService) startConsumer(
}()
}

go func() {
notifyCloseError := <-mq.connection.Channel.NotifyClose(make(chan *amqp.Error))
if notifyCloseError != nil {
mq.logger.Debug("Got notifyCloseError error")
errors <- fmt.Errorf("Connection closed: %v", notifyCloseError)
}
}()

// stop the consumer upon sigterm
go func() {
<-ctx.Done()
Expand Down

0 comments on commit b5d5017

Please sign in to comment.