Skip to content

Commit

Permalink
retry only if processing completed for a packet (#1393)
Browse files Browse the repository at this point in the history
* retry only if processing completed for a packet

* nil safety

* keep assembled and lastProcessedHeight

* use counterparty key for remove packet retention

* Use channel for finishedProcessing
  • Loading branch information
agouin authored Feb 1, 2024
1 parent 4c41650 commit d821ab7
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 131 deletions.
47 changes: 30 additions & 17 deletions relayer/processor/message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func (mp *messageProcessor) trackAndSendMessages(

retries := dst.trackProcessingMessage(t)
if t.assembledMsg() == nil {
dst.trackFinishedProcessingMessage(t)
continue
}

Expand Down Expand Up @@ -429,6 +430,9 @@ func (mp *messageProcessor) sendBatchMessages(
dst.log.Debug("Will relay messages", fields...)

callback := func(_ *provider.RelayerTxResponse, err error) {
for _, t := range batch {
dst.finishedProcessing <- t
}
// only increment metrics counts for successful packets
if err != nil || mp.metrics == nil {
return
Expand Down Expand Up @@ -466,6 +470,9 @@ func (mp *messageProcessor) sendBatchMessages(
}

if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callbacks); err != nil {
for _, t := range batch {
dst.finishedProcessing <- t
}
errFields := []zapcore.Field{
zap.String("path_name", src.info.PathName),
zap.String("src_chain_id", src.info.ChainID),
Expand Down Expand Up @@ -510,26 +517,31 @@ func (mp *messageProcessor) sendSingleMessage(

// Set callback for packet messages so that we increment prometheus metrics on successful relays.
callbacks := []func(rtr *provider.RelayerTxResponse, err error){}
if t, ok := tracker.(packetMessageToTrack); ok {
callback := func(_ *provider.RelayerTxResponse, err error) {
// only increment metrics counts for successful packets
if err != nil || mp.metrics == nil {
return
}
var channel, port string
if t.msg.eventType == chantypes.EventTypeRecvPacket {
channel = t.msg.info.DestChannel
port = t.msg.info.DestPort
} else {
channel = t.msg.info.SourceChannel
port = t.msg.info.SourcePort
}
mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType)
}

callbacks = append(callbacks, callback)
callback := func(_ *provider.RelayerTxResponse, err error) {
dst.finishedProcessing <- tracker

t, ok := tracker.(packetMessageToTrack)
if !ok {
return
}
// only increment metrics counts for successful packets
if err != nil || mp.metrics == nil {
return
}
var channel, port string
if t.msg.eventType == chantypes.EventTypeRecvPacket {
channel = t.msg.info.DestChannel
port = t.msg.info.DestPort
} else {
channel = t.msg.info.SourceChannel
port = t.msg.info.SourcePort
}
mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType)
}

callbacks = append(callbacks, callback)

//During testing, this adds a callback so our test case can inspect the TX results
if PathProcMessageCollector != nil {
testCallback := func(rtr *provider.RelayerTxResponse, err error) {
Expand All @@ -546,6 +558,7 @@ func (mp *messageProcessor) sendSingleMessage(

err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callbacks)
if err != nil {
dst.finishedProcessing <- tracker
errFields := []zapcore.Field{
zap.String("path_name", src.info.PathName),
zap.String("src_chain_id", src.info.ChainID),
Expand Down
Loading

0 comments on commit d821ab7

Please sign in to comment.