Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: continue message processing after error #441

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions relayer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ func handleProcessError(ctx context.Context, err error) error {
logger.WithError(err).Error("error returned in process loop")
return nil
}

// isFatal returns true if the error is unrecoverable and the caller should
// return it immediately
func isFatal(err error) bool {
return goerrors.Is(err, context.Canceled) ||
goerrors.Is(err, context.DeadlineExceeded) ||
errors.IsUnrecoverable(err)
}
17 changes: 12 additions & 5 deletions relayer/message_attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Process
})

messagesInQueue, err := r.palomaClient.QueryMessagesForAttesting(ctx, queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to attest")
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

logger = logger.WithFields(log.Fields{
"message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 {
Expand All @@ -52,10 +60,6 @@ func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Process
})

logger.Debug("got ", len(messagesInQueue), " messages from ", queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to attest")
return err
}

if len(messagesInQueue) > 0 {
logger := logger.WithFields(log.Fields{
Expand All @@ -75,7 +79,10 @@ func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Process
Error(ctx); err != nil {
logger.WithError(err).Error("failed to send Paloma status update")
}
return err

if isFatal(err) {
return err
}
}
}
}
Expand Down
22 changes: 16 additions & 6 deletions relayer/message_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce
})

messagesInQueue, err := r.palomaClient.QueryMessagesForEstimating(ctx, queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to estimate")
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

logger = logger.WithFields(log.Fields{
"message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 {
Expand All @@ -51,10 +59,6 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce
})

logger.Debug("got ", len(messagesInQueue), " messages from ", queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to estimate")
return err
}

if len(messagesInQueue) > 0 {
logger := logger.WithFields(log.Fields{
Expand All @@ -66,7 +70,11 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce
estimates, err := p.EstimateMessages(ctx, queue.FromString(queueName), messagesInQueue)
if err != nil {
logger.WithError(err).Error("error estimating messages")
return err
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

filteredEstimates := make([]chain.MessageWithEstimate, 0, len(estimates))
Expand All @@ -93,7 +101,9 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce
err = r.palomaClient.AddMessagesGasEstimate(ctx, queueName, filteredEstimates...)
if err != nil {
logger.WithError(err).Error("failed to send estimates to Paloma")
return err
if isFatal(err) {
return err
}
}
}

Expand Down
16 changes: 11 additions & 5 deletions relayer/message_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processo
})

messagesInQueue, err := r.palomaClient.QueryMessagesForRelaying(ctx, queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to relay")
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

logger = logger.WithFields(log.Fields{
"message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 {
Expand All @@ -78,10 +86,6 @@ func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processo
})

logger.Debug("got ", len(messagesInQueue), " messages from ", queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to relay")
return err
}

for _, v := range messagesInQueue {
r.msgCache.records[v.ID] = struct{}{}
Expand All @@ -103,7 +107,9 @@ func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processo
return msg.ID
}),
}).Error("error relaying messages")
return err
if isFatal(err) {
return err
}
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions relayer/message_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor
messagesForSigning, err := r.palomaClient.QueryMessagesForSigning(ctx, queueName)
if err != nil {
logger.Error("failed getting messages to sign")
return err
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

logger = logger.WithFields(log.Fields{
Expand All @@ -65,6 +69,8 @@ func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor
signedMessages, err := p.SignMessages(ctx, messagesForSigning...)
if err != nil {
logger.WithError(err).Error("unable to sign messages")
// If we fail to sign this batch, we will fail to sign them
// all, so might as well return now
return err
}
logger = logger.WithFields(log.Fields{
Expand All @@ -78,7 +84,9 @@ func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor

if err = r.broadcastSignatures(ctx, queueName, signedMessages); err != nil {
logger.WithError(err).Error("couldn't broadcast signatures and process attestation")
return err
if isFatal(err) {
return err
}
}
}

Expand Down
Loading