Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate with batch transactions #24

Merged
merged 14 commits into from
Jan 26, 2024
2 changes: 1 addition & 1 deletion cmd/circle/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// CheckAttestation checks the iris api for attestation status and returns true if attestation is complete
func CheckAttestation(cfg config.Config, logger log.Logger, irisLookupId string, txHash string, sourceDomain, destDomain uint32) *types.AttestationResponse {
func CheckAttestation(cfg config.Config, logger log.Logger, irisLookupId string, txHash string, sourceDomain, destDomain types.Domain) *types.AttestationResponse {
logger.Debug(fmt.Sprintf("Checking attestation for %s%s%s for source tx %s from %d to %d", cfg.Circle.AttestationBaseUrl, "0x", irisLookupId, txHash, sourceDomain, destDomain))

client := http.Client{Timeout: 2 * time.Second}
Expand Down
182 changes: 99 additions & 83 deletions cmd/ethereum/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"cosmossdk.io/log"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/strangelove-ventures/noble-cctp-relayer/config"
Expand All @@ -25,127 +24,144 @@ func Broadcast(
ctx context.Context,
cfg config.Config,
logger log.Logger,
msg *types.MessageState,
msgs []*types.MessageState,
sequenceMap *types.SequenceMap,
) (*ethtypes.Transaction, error) {
) error {

// set up eth client
client, err := ethclient.Dial(cfg.Networks.Destination.Ethereum.RPC)
if err != nil {
return nil, fmt.Errorf("unable to dial ethereum client: %w", err)
return fmt.Errorf("unable to dial ethereum client: %w", err)
}
defer client.Close()

backend := NewContractBackendWrapper(client)

privEcdsaKey, ethereumAddress, err := GetEcdsaKeyAddress(cfg.Networks.Minters[0].MinterPrivateKey)
if err != nil {
return nil, err
return err
}

auth, err := bind.NewKeyedTransactorWithChainID(privEcdsaKey, big.NewInt(cfg.Networks.Destination.Ethereum.ChainId))
if err != nil {
return nil, fmt.Errorf("unable to create auth: %w", err)
return fmt.Errorf("unable to create auth: %w", err)
}

messageTransmitter, err := NewMessageTransmitter(common.HexToAddress(cfg.Networks.Source.Ethereum.MessageTransmitter), backend)
if err != nil {
return nil, fmt.Errorf("unable to create message transmitter: %w", err)
return fmt.Errorf("unable to create message transmitter: %w", err)
}

attestationBytes, err := hex.DecodeString(msg.Attestation[2:])
if err != nil {
return nil, errors.New("unable to decode message attestation")
}

for attempt := 0; attempt <= cfg.Networks.Destination.Ethereum.BroadcastRetries; attempt++ {
logger.Info(fmt.Sprintf(
"Broadcasting %s message from %d to %d: with source tx hash %s",
msg.Type,
msg.SourceDomain,
msg.DestDomain,
msg.SourceTxHash))
var broadcastErrors error
for _, msg := range msgs {

nonce := sequenceMap.Next(cfg.Networks.Destination.Ethereum.DomainId)
auth.Nonce = big.NewInt(nonce)
if msg.Status == types.Complete {
continue
}

// TODO remove
nextNonce, err := GetEthereumAccountNonce(cfg.Networks.Destination.Ethereum.RPC, ethereumAddress)
attestationBytes, err := hex.DecodeString(msg.Attestation[2:])
if err != nil {
logger.Error("unable to retrieve account number")
} else {
auth.Nonce = big.NewInt(nextNonce)
return errors.New("unable to decode message attestation")
}
// TODO end remove

// check if nonce already used
co := &bind.CallOpts{
Pending: true,
Context: ctx,
}
for attempt := 0; attempt <= cfg.Networks.Destination.Ethereum.BroadcastRetries; attempt++ {
logger.Info(fmt.Sprintf(
"Broadcasting message from %d to %d: with source tx hash %s",
msg.SourceDomain,
msg.DestDomain,
msg.SourceTxHash))

nonce := sequenceMap.Next(cfg.Networks.Destination.Ethereum.DomainId)
auth.Nonce = big.NewInt(nonce)

// TODO remove
nextNonce, err := GetEthereumAccountNonce(cfg.Networks.Destination.Ethereum.RPC, ethereumAddress)
if err != nil {
logger.Error("unable to retrieve account number")
} else {
auth.Nonce = big.NewInt(nextNonce)
}
// TODO end remove

logger.Debug("Checking if nonce was used for broadcast to Ethereum", "source_domain", msg.SourceDomain, "nonce", msg.Nonce)

key := append(
common.LeftPadBytes((big.NewInt(int64(msg.SourceDomain))).Bytes(), 4),
common.LeftPadBytes((big.NewInt(int64(msg.Nonce))).Bytes(), 8)...,
)

response, nonceErr := messageTransmitter.UsedNonces(co, [32]byte(crypto.Keccak256(key)))
if nonceErr != nil {
logger.Debug("Error querying whether nonce was used. Continuing...")
} else {
fmt.Printf("received used nonce response: %d\n", response)
if response.Uint64() == uint64(1) {
// nonce has already been used, mark as complete
logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d",
msg.SourceDomain, msg.Nonce))
msg.Status = types.Complete
return nil, errors.New("receive message was already broadcasted")
// check if nonce already used
co := &bind.CallOpts{
Pending: true,
Context: ctx,
}
}

// broadcast txn
tx, err := messageTransmitter.ReceiveMessage(
auth,
msg.MsgSentBytes,
attestationBytes,
)
if err == nil {
msg.Status = types.Complete
return tx, nil
} else {
logger.Error(fmt.Sprintf("error during broadcast: %s", err.Error()))
if parsedErr, ok := err.(JsonError); ok {
if parsedErr.ErrorCode() == 3 && parsedErr.Error() == "execution reverted: Nonce already used" {
logger.Debug("Checking if nonce was used for broadcast to Ethereum", "source_domain", msg.SourceDomain, "nonce", msg.Nonce)

key := append(
common.LeftPadBytes((big.NewInt(int64(msg.SourceDomain))).Bytes(), 4),
common.LeftPadBytes((big.NewInt(int64(msg.Nonce))).Bytes(), 8)...,
)

response, nonceErr := messageTransmitter.UsedNonces(co, [32]byte(crypto.Keccak256(key)))
if nonceErr != nil {
logger.Debug("Error querying whether nonce was used. Continuing...")
} else {
fmt.Printf("received used nonce response: %d\n", response)
if response.Uint64() == uint64(1) {
// nonce has already been used, mark as complete
logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d",
msg.SourceDomain, msg.Nonce))
msg.Status = types.Complete
return nil, parsedErr
return errors.New("receive message was already broadcasted")
}
}

match, _ := regexp.MatchString("nonce too low: next nonce [0-9]+, tx nonce [0-9]+", parsedErr.Error())
if match {
numberRegex := regexp.MustCompile("[0-9]+")
nextNonce, err := strconv.ParseInt(numberRegex.FindAllString(parsedErr.Error(), 1)[0], 10, 0)
if err != nil {
nextNonce, err = GetEthereumAccountNonce(cfg.Networks.Destination.Ethereum.RPC, ethereumAddress)
// broadcast txn
tx, err := messageTransmitter.ReceiveMessage(
auth,
msg.MsgSentBytes,
attestationBytes,
)
if err == nil {
msg.Status = types.Complete

fullLog, err := tx.MarshalJSON()
if err != nil {
logger.Error("error marshalling eth tx log", err)
}

msg.DestTxHash = tx.Hash().Hex()

logger.Info(fmt.Sprintf("Successfully broadcast %s to Ethereum. Tx hash: %s, FULL LOG: %s", msg.SourceTxHash, msg.DestTxHash, string(fullLog)))
continue
} else {
logger.Error(fmt.Sprintf("error during broadcast: %s", err.Error()))
if parsedErr, ok := err.(JsonError); ok {
if parsedErr.ErrorCode() == 3 && parsedErr.Error() == "execution reverted: Nonce already used" {
msg.Status = types.Complete
return parsedErr
}

match, _ := regexp.MatchString("nonce too low: next nonce [0-9]+, tx nonce [0-9]+", parsedErr.Error())
if match {
numberRegex := regexp.MustCompile("[0-9]+")
nextNonce, err := strconv.ParseInt(numberRegex.FindAllString(parsedErr.Error(), 1)[0], 10, 0)
if err != nil {
logger.Error("unable to retrieve account number")
nextNonce, err = GetEthereumAccountNonce(cfg.Networks.Destination.Ethereum.RPC, ethereumAddress)
if err != nil {
logger.Error("unable to retrieve account number")
}
}
sequenceMap.Put(cfg.Networks.Destination.Ethereum.DomainId, nextNonce)
}
sequenceMap.Put(cfg.Networks.Destination.Ethereum.DomainId, nextNonce)
}
}

// if it's not the last attempt, retry
// TODO increase the destination.ethereum.broadcast retries (3-5) and retry interval (15s). By checking for used nonces, there is no gas cost for failed mints.
if attempt != cfg.Networks.Destination.Ethereum.BroadcastRetries {
logger.Info(fmt.Sprintf("Retrying in %d seconds", cfg.Networks.Destination.Ethereum.BroadcastRetryInterval))
time.Sleep(time.Duration(cfg.Networks.Destination.Ethereum.BroadcastRetryInterval) * time.Second)
// if it's not the last attempt, retry
// TODO increase the destination.ethereum.broadcast retries (3-5) and retry interval (15s). By checking for used nonces, there is no gas cost for failed mints.
if attempt != cfg.Networks.Destination.Ethereum.BroadcastRetries {
logger.Info(fmt.Sprintf("Retrying in %d seconds", cfg.Networks.Destination.Ethereum.BroadcastRetryInterval))
time.Sleep(time.Duration(cfg.Networks.Destination.Ethereum.BroadcastRetryInterval) * time.Second)
}
continue
}
continue
}
// retried max times with failure
msg.Status = types.Failed
broadcastErrors = errors.Join(broadcastErrors, errors.New("reached max number of broadcast attempts"))
}
msg.Status = types.Failed

return nil, errors.New("reached max number of broadcast attempts")
return broadcastErrors
}
23 changes: 16 additions & 7 deletions cmd/ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//go:embed abi/MessageTransmitter.json
var content embed.FS

func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *types.MessageState) {
func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *types.TxState) {
// set up client
messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json")
if err != nil {
Expand Down Expand Up @@ -72,7 +72,7 @@ func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *t
}
logger.Info(fmt.Sprintf("New historical msg from source domain %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash))

processingQueue <- parsedMsg
processingQueue <- &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}}

// It might help to wait a small amount of time between sending messages into the processing queue
// so that account sequences / nonces are set correctly
Expand All @@ -81,6 +81,7 @@ func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *t

// consume stream
go func() {
var txState *types.TxState
for {
select {
case err := <-sub.Err():
Expand All @@ -93,12 +94,20 @@ func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *t
continue
}
logger.Info(fmt.Sprintf("New stream msg from %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash))
if txState == nil {
txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}}
} else if parsedMsg.SourceTxHash != txState.TxHash {
processingQueue <- txState
txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}}
} else {
txState.Msgs = append(txState.Msgs, parsedMsg)

processingQueue <- parsedMsg

// It might help to wait a small amount of time between sending messages into the processing queue
// so that account sequences / nonces are set correctly
// time.Sleep(10 * time.Millisecond)
}
default:
if txState != nil {
processingQueue <- txState
txState = nil
}
}
}
}()
Expand Down
25 changes: 12 additions & 13 deletions cmd/ethereum/listener_test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
package ethereum_test

import (
"os"
"testing"
"time"

"cosmossdk.io/log"
"github.com/rs/zerolog"
eth "github.com/strangelove-ventures/noble-cctp-relayer/cmd/ethereum"
"github.com/strangelove-ventures/noble-cctp-relayer/config"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
"github.com/stretchr/testify/require"
"os"
"testing"
"time"
)

var cfg config.Config
var logger log.Logger
var processingQueue chan *types.MessageState
var processingQueue chan *types.TxState

func init() {
cfg = config.Parse("../../.ignore/unit_tests.yaml")

logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.ErrorLevel))
processingQueue = make(chan *types.MessageState, 10000)
processingQueue = make(chan *types.TxState, 10000)
}

// tests for a historical log
Expand All @@ -32,21 +33,19 @@ func TestStartListener(t *testing.T) {

time.Sleep(5 * time.Second)

msg := <-processingQueue
tx := <-processingQueue

expectedMsg := &types.MessageState{
IrisLookupId: "a404f4155166a1fc7ffee145b5cac6d0f798333745289ab1db171344e226ef0c",
Type: "mint",
Status: "created",
SourceDomain: 0,
DestDomain: 4,
SourceTxHash: "0xe1d7729de300274ee3a2fd20ba179b14a8e3ffcd9d847c506b06760f0dad7802",
}
require.Equal(t, expectedMsg.IrisLookupId, msg.IrisLookupId)
require.Equal(t, expectedMsg.Type, msg.Type)
require.Equal(t, expectedMsg.Status, msg.Status)
require.Equal(t, expectedMsg.SourceDomain, msg.SourceDomain)
require.Equal(t, expectedMsg.DestDomain, msg.DestDomain)
require.Equal(t, expectedMsg.SourceTxHash, msg.SourceTxHash)
require.Equal(t, expectedMsg.IrisLookupId, tx.Msgs[0].IrisLookupId)
require.Equal(t, expectedMsg.Status, tx.Msgs[0].Status)
require.Equal(t, expectedMsg.SourceDomain, tx.Msgs[0].SourceDomain)
require.Equal(t, expectedMsg.DestDomain, tx.Msgs[0].DestDomain)
require.Equal(t, expectedMsg.SourceTxHash, tx.Msgs[0].SourceTxHash)

}
7 changes: 4 additions & 3 deletions cmd/ethereum/util_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package ethereum_test

import (
"os"
"testing"

"cosmossdk.io/log"
"github.com/rs/zerolog"
"github.com/strangelove-ventures/noble-cctp-relayer/cmd/ethereum"
"github.com/strangelove-ventures/noble-cctp-relayer/config"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
"github.com/stretchr/testify/require"
"os"
"testing"
)

func init() {
cfg = config.Parse("../../.ignore/unit_tests.yaml")

logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.ErrorLevel))
processingQueue = make(chan *types.MessageState, 10000)
processingQueue = make(chan *types.TxState, 10000)
}

func TestGetEthereumAccountNonce(t *testing.T) {
Expand Down
Loading