From 93710fe7462b2bf6d1e8bc1e08757cd7f47c90b4 Mon Sep 17 00:00:00 2001 From: Luis Carvalho Date: Mon, 23 Sep 2024 17:49:57 +0100 Subject: [PATCH] chore: continue message processing after error --- relayer/errors.go | 8 ++++++++ relayer/message_attester.go | 17 ++++++++++++----- relayer/message_estimator.go | 22 ++++++++++++++++------ relayer/message_relayer.go | 16 +++++++++++----- relayer/message_signer.go | 12 ++++++++++-- 5 files changed, 57 insertions(+), 18 deletions(-) diff --git a/relayer/errors.go b/relayer/errors.go index cc05f53b..6b9fbadd 100644 --- a/relayer/errors.go +++ b/relayer/errors.go @@ -39,3 +39,11 @@ func handleProcessError(ctx context.Context, err error) error { logger.WithError(err).Error("error returned in process loop") return nil } + +// isFatal returns true if the error is unrecoverable and the caller should +// return it immediately +func isFatal(err error) bool { + return goerrors.Is(err, context.Canceled) || + goerrors.Is(err, context.DeadlineExceeded) || + errors.IsUnrecoverable(err) +} diff --git a/relayer/message_attester.go b/relayer/message_attester.go index 2699f23b..614edec8 100644 --- a/relayer/message_attester.go +++ b/relayer/message_attester.go @@ -44,6 +44,14 @@ func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Process }) messagesInQueue, err := r.palomaClient.QueryMessagesForAttesting(ctx, queueName) + if err != nil { + logger.WithError(err).Error("couldn't get messages to attest") + if isFatal(err) { + return err + } + // Move on to the next queue on the same chain + continue + } logger = logger.WithFields(log.Fields{ "message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 { @@ -52,10 +60,6 @@ func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Process }) logger.Debug("got ", len(messagesInQueue), " messages from ", queueName) - if err != nil { - logger.WithError(err).Error("couldn't get messages to attest") - return err - } if len(messagesInQueue) > 0 { logger := logger.WithFields(log.Fields{ @@ -75,7 +79,10 @@ func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Process Error(ctx); err != nil { logger.WithError(err).Error("failed to send Paloma status update") } - return err + + if isFatal(err) { + return err + } } } } diff --git a/relayer/message_estimator.go b/relayer/message_estimator.go index 49e0bd45..4a0f2057 100644 --- a/relayer/message_estimator.go +++ b/relayer/message_estimator.go @@ -43,6 +43,14 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce }) messagesInQueue, err := r.palomaClient.QueryMessagesForEstimating(ctx, queueName) + if err != nil { + logger.WithError(err).Error("couldn't get messages to estimate") + if isFatal(err) { + return err + } + // Move on to the next queue on the same chain + continue + } logger = logger.WithFields(log.Fields{ "message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 { @@ -51,10 +59,6 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce }) logger.Debug("got ", len(messagesInQueue), " messages from ", queueName) - if err != nil { - logger.WithError(err).Error("couldn't get messages to estimate") - return err - } if len(messagesInQueue) > 0 { logger := logger.WithFields(log.Fields{ @@ -66,7 +70,11 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce estimates, err := p.EstimateMessages(ctx, queue.FromString(queueName), messagesInQueue) if err != nil { logger.WithError(err).Error("error estimating messages") - return err + if isFatal(err) { + return err + } + // Move on to the next queue on the same chain + continue } filteredEstimates := make([]chain.MessageWithEstimate, 0, len(estimates)) @@ -93,7 +101,9 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce err = r.palomaClient.AddMessagesGasEstimate(ctx, queueName, filteredEstimates...) if err != nil { logger.WithError(err).Error("failed to send estimates to Paloma") - return err + if isFatal(err) { + return err + } } } diff --git a/relayer/message_relayer.go b/relayer/message_relayer.go index da94834b..17c0d8b5 100644 --- a/relayer/message_relayer.go +++ b/relayer/message_relayer.go @@ -70,6 +70,14 @@ func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processo }) messagesInQueue, err := r.palomaClient.QueryMessagesForRelaying(ctx, queueName) + if err != nil { + logger.WithError(err).Error("couldn't get messages to relay") + if isFatal(err) { + return err + } + // Move on to the next queue on the same chain + continue + } logger = logger.WithFields(log.Fields{ "message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 { @@ -78,10 +86,6 @@ func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processo }) logger.Debug("got ", len(messagesInQueue), " messages from ", queueName) - if err != nil { - logger.WithError(err).Error("couldn't get messages to relay") - return err - } for _, v := range messagesInQueue { r.msgCache.records[v.ID] = struct{}{} @@ -103,7 +107,9 @@ func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processo return msg.ID }), }).Error("error relaying messages") - return err + if isFatal(err) { + return err + } } } } diff --git a/relayer/message_signer.go b/relayer/message_signer.go index b1e812d9..e53327dd 100644 --- a/relayer/message_signer.go +++ b/relayer/message_signer.go @@ -45,7 +45,11 @@ func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor messagesForSigning, err := r.palomaClient.QueryMessagesForSigning(ctx, queueName) if err != nil { logger.Error("failed getting messages to sign") - return err + if isFatal(err) { + return err + } + // Move on to the next queue on the same chain + continue } logger = logger.WithFields(log.Fields{ @@ -65,6 +69,8 @@ func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor signedMessages, err := p.SignMessages(ctx, messagesForSigning...) if err != nil { logger.WithError(err).Error("unable to sign messages") + // If we fail to sign this batch, we will fail to sign them + // all, so might as well return now return err } logger = logger.WithFields(log.Fields{ @@ -78,7 +84,9 @@ func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor if err = r.broadcastSignatures(ctx, queueName, signedMessages); err != nil { logger.WithError(err).Error("couldn't broadcast signatures and process attestation") - return err + if isFatal(err) { + return err + } } }