Skip to content

Commit

Permalink
handle notifyClose
Browse files Browse the repository at this point in the history
  • Loading branch information
akshar committed Nov 3, 2023
1 parent 0edec90 commit daf4eaa
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions datafeed/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ const (
mqConsumerConcurrency = 1
)

const (
consumerRetries = 10
)

// MQService
type MQService struct {
logger hclog.Logger
Expand Down Expand Up @@ -61,6 +65,19 @@ func newMQService(logger hclog.Logger, config *MQConfig, datafeedService *DataFe
return mq, nil
}

func (mq *MQService) restartWithRetries(ctx context.Context) (<-chan *proto.DataFeedReport, <-chan error, error) {
for i := 0; i < consumerRetries; i++ {
mq.logger.Debug("Restarting consumer with try", i)
time.Sleep(5 * time.Second)
reports, errors, err := mq.startConsumer(ctx, mqConsumerConcurrency)
if err == nil {
return reports, errors, err
}
}

return nil, nil, fmt.Errorf("failed to restart consumer after %d retries", consumerRetries)
}

// startConsumeLoop
func (mq *MQService) startConsumeLoop() {
mq.logger.Debug("listening for MQ messages...")
Expand All @@ -80,19 +97,15 @@ func (mq *MQService) startConsumeLoop() {
mq.datafeedService.queueReportingTx(ProposeOutcome, report.MarketHash, report.Outcome)
case err = <-errors:
mq.logger.Error("error while consuming from message queue", "err", err)
mq.logger.Debug("Restarting consumer...")
time.Sleep(2 * time.Second)
reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency)
reports, errors, err = mq.restartWithRetries(ctx)
if err != nil {
mq.logger.Error("Got Error during consumer restart", err)
mq.logger.Error("failed to start consumer - errors chan", err)
}
case <-common.GetTerminationSignalCh():
mq.logger.Debug("got sigterm, shuttown down mq consumer")
mq.logger.Debug("Restarting consumer...")
time.Sleep(2 * time.Second)
reports, errors, err = mq.startConsumer(ctx, mqConsumerConcurrency)
reports, errors, err = mq.restartWithRetries(ctx)
if err != nil {
mq.logger.Error("Got Error during consumer restart", err)
mq.logger.Error("failed to start consumer - sigterm", err)
}

}
Expand Down Expand Up @@ -175,6 +188,14 @@ func (mq *MQService) startConsumer(
}()
}

go func() {
notifyCloseError := <-mq.connection.Channel.NotifyClose(make(chan *amqp.Error))
if notifyCloseError != nil {
mq.logger.Debug("Got notifyCloseError error")
errors <- fmt.Errorf("Connection closed: %v", notifyCloseError)
}
}()

// stop the consumer upon sigterm
go func() {
<-ctx.Done()
Expand Down

0 comments on commit daf4eaa

Please sign in to comment.