From 6d7f649c0081515c8565d93a5488f1cde237cd87 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Thu, 18 Jan 2024 19:20:06 -0700 Subject: [PATCH] Refactorrrrrrrrrr --- README.md | 8 +- {cmd/circle => circle}/attestation.go | 9 +- {cmd/circle => circle}/attestation_test.go | 10 +- cmd/ethereum/broadcast.go | 167 ------- cmd/ethereum/listener.go | 114 ----- cmd/noble/broadcast.go | 274 ----------- cmd/noble/listener.go | 113 ----- cmd/process.go | 154 ++---- cmd/process_test.go | 88 ++-- cmd/root.go | 68 +-- config/config.go | 73 --- {cmd/noble/cosmos => cosmos}/codec.go | 0 .../noble/cosmos => cosmos}/cosmosprovider.go | 0 {cmd/noble/cosmos => cosmos}/grpc_shim.go | 0 {cmd/noble/cosmos => cosmos}/query.go | 0 {cmd/noble/cosmos => cosmos}/query_test.go | 2 +- {cmd/ethereum => ethereum}/abi/ERC20.json | 0 .../abi/MessageTransmitter.json | 0 .../abi/TokenMessenger.json | 0 .../abi/TokenMessengerWithMetadata.json | 0 {cmd/ethereum => ethereum}/broadcast_test.go | 4 +- ethereum/chain.go | 380 +++++++++++++++ ethereum/config.go | 38 ++ .../contract_backend_wrapper.go | 0 .../contracts}/MessageTransmitter.go | 2 +- {cmd => ethereum/contracts}/TokenMessenger.go | 2 +- .../contracts}/TokenMessengerWithMetadata.go | 2 +- {cmd/ethereum => ethereum}/listener_test.go | 22 +- {cmd/ethereum => ethereum}/util.go | 0 {cmd/ethereum => ethereum}/util_test.go | 13 +- noble/chain.go | 459 ++++++++++++++++++ noble/config.go | 37 ++ {cmd/noble => noble}/listener_test.go | 21 +- noble/message_state.go | 77 +++ types/chain.go | 43 ++ types/config.go | 34 ++ types/message_state.go | 87 ---- types/message_state.go.bak | 192 -------- types/message_state_test.go | 13 +- types/sequence_map.go | 8 +- 40 files changed, 1231 insertions(+), 1283 deletions(-) rename {cmd/circle => circle}/attestation.go (62%) rename {cmd/circle => circle}/attestation_test.go (56%) delete mode 100644 cmd/ethereum/broadcast.go delete mode 100644 cmd/ethereum/listener.go delete mode 100644 cmd/noble/broadcast.go delete mode 100644 cmd/noble/listener.go delete mode 100644 config/config.go rename {cmd/noble/cosmos => cosmos}/codec.go (100%) rename {cmd/noble/cosmos => cosmos}/cosmosprovider.go (100%) rename {cmd/noble/cosmos => cosmos}/grpc_shim.go (100%) rename {cmd/noble/cosmos => cosmos}/query.go (100%) rename {cmd/noble/cosmos => cosmos}/query_test.go (86%) rename {cmd/ethereum => ethereum}/abi/ERC20.json (100%) rename {cmd/ethereum => ethereum}/abi/MessageTransmitter.json (100%) rename {cmd/ethereum => ethereum}/abi/TokenMessenger.json (100%) rename {cmd/ethereum => ethereum}/abi/TokenMessengerWithMetadata.json (100%) rename {cmd/ethereum => ethereum}/broadcast_test.go (82%) create mode 100644 ethereum/chain.go create mode 100644 ethereum/config.go rename {cmd/ethereum => ethereum}/contract_backend_wrapper.go (100%) rename {cmd/ethereum => ethereum/contracts}/MessageTransmitter.go (99%) rename {cmd => ethereum/contracts}/TokenMessenger.go (99%) rename {cmd => ethereum/contracts}/TokenMessengerWithMetadata.go (99%) rename {cmd/ethereum => ethereum}/listener_test.go (73%) rename {cmd/ethereum => ethereum}/util.go (100%) rename {cmd/ethereum => ethereum}/util_test.go (58%) create mode 100644 noble/chain.go create mode 100644 noble/config.go rename {cmd/noble => noble}/listener_test.go (70%) create mode 100644 noble/message_state.go create mode 100644 types/chain.go create mode 100644 types/config.go delete mode 100644 types/message_state.go.bak diff --git a/README.md b/README.md index 3012fd84..103698ed 100644 --- a/README.md +++ b/README.md @@ -42,10 +42,10 @@ localhost:8000/tx/?type=forward ### Generating Go ABI bindings ```shell -abigen --abi cmd/ethereum/abi/TokenMessenger.json --pkg cmd --type TokenMessenger --out cmd/TokenMessenger.go -abigen --abi cmd/ethereum/abi/TokenMessengerWithMetadata.json --pkg cmd --type TokenMessengerWithMetadata --out cmd/TokenMessengerWithMetadata.go -abigen --abi cmd/ethereum/abi/ERC20.json --pkg integration_testing --type ERC20 --out integration/ERC20.go -abigen --abi cmd/ethereum/abi/MessageTransmitter.json --pkg cmd --type MessageTransmitter --out cmd/MessageTransmitter.go +abigen --abi ethereum/abi/TokenMessenger.json --pkg contracts --type TokenMessenger --out ethereum/contracts/TokenMessenger.go +abigen --abi ethereum/abi/TokenMessengerWithMetadata.json --pkg contracts --type TokenMessengerWithMetadata --out ethereum/contracts/TokenMessengerWithMetadata.go +abigen --abi ethereum/abi/ERC20.json --pkg integration_testing --type ERC20 --out integration/ERC20.go +abigen --abi ethereum/abi/MessageTransmitter.json --pkg contracts- --type MessageTransmitter --out ethereum/contracts/MessageTransmitter.go ``` ### Useful links diff --git a/cmd/circle/attestation.go b/circle/attestation.go similarity index 62% rename from cmd/circle/attestation.go rename to circle/attestation.go index 0fac048e..c51da8a6 100644 --- a/cmd/circle/attestation.go +++ b/circle/attestation.go @@ -8,17 +8,16 @@ import ( "time" "cosmossdk.io/log" - "github.com/strangelove-ventures/noble-cctp-relayer/config" "github.com/strangelove-ventures/noble-cctp-relayer/types" ) // CheckAttestation checks the iris api for attestation status and returns true if attestation is complete -func CheckAttestation(cfg config.Config, logger log.Logger, irisLookupId string, txHash string, sourceDomain, destDomain types.Domain) *types.AttestationResponse { - logger.Debug(fmt.Sprintf("Checking attestation for %s%s%s for source tx %s from %d to %d", cfg.Circle.AttestationBaseUrl, "0x", irisLookupId, txHash, sourceDomain, destDomain)) +func CheckAttestation(attestationURL string, logger log.Logger, irisLookupId string, txHash string, sourceDomain, destDomain types.Domain) *types.AttestationResponse { + logger.Debug(fmt.Sprintf("Checking attestation for %s%s%s for source tx %s from %d to %d", attestationURL, "0x", irisLookupId, txHash, sourceDomain, destDomain)) client := http.Client{Timeout: 2 * time.Second} - rawResponse, err := client.Get(cfg.Circle.AttestationBaseUrl + "0x" + irisLookupId) + rawResponse, err := client.Get(attestationURL + "0x" + irisLookupId) if err != nil { logger.Debug("error during request: " + err.Error()) return nil @@ -39,7 +38,7 @@ func CheckAttestation(cfg config.Config, logger log.Logger, irisLookupId string, logger.Debug("unable to unmarshal response") return nil } - logger.Info(fmt.Sprintf("Attestation found for %s%s%s", cfg.Circle.AttestationBaseUrl, "0x", irisLookupId)) + logger.Info(fmt.Sprintf("Attestation found for %s%s%s", attestationURL, "0x", irisLookupId)) return &response } diff --git a/cmd/circle/attestation_test.go b/circle/attestation_test.go similarity index 56% rename from cmd/circle/attestation_test.go rename to circle/attestation_test.go index 1e68cc51..a2fa2521 100644 --- a/cmd/circle/attestation_test.go +++ b/circle/attestation_test.go @@ -6,12 +6,12 @@ import ( "cosmossdk.io/log" "github.com/rs/zerolog" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/circle" - "github.com/strangelove-ventures/noble-cctp-relayer/config" + "github.com/strangelove-ventures/noble-cctp-relayer/circle" + "github.com/strangelove-ventures/noble-cctp-relayer/types" "github.com/stretchr/testify/require" ) -var cfg config.Config +var cfg types.Config var logger log.Logger func init() { @@ -20,12 +20,12 @@ func init() { } func TestAttestationIsReady(t *testing.T) { - resp := circle.CheckAttestation(cfg, logger, "85bbf7e65a5992e6317a61f005e06d9972a033d71b514be183b179e1b47723fe", "", 0, 4) + resp := circle.CheckAttestation(cfg.Circle.AttestationBaseUrl, logger, "85bbf7e65a5992e6317a61f005e06d9972a033d71b514be183b179e1b47723fe", "", 0, 4) require.NotNil(t, resp) require.Equal(t, "complete", resp.Status) } func TestAttestationNotFound(t *testing.T) { - resp := circle.CheckAttestation(cfg, logger, "not an attestation", "", 0, 4) + resp := circle.CheckAttestation(cfg.Circle.AttestationBaseUrl, logger, "not an attestation", "", 0, 4) require.Nil(t, resp) } diff --git a/cmd/ethereum/broadcast.go b/cmd/ethereum/broadcast.go deleted file mode 100644 index 31cef849..00000000 --- a/cmd/ethereum/broadcast.go +++ /dev/null @@ -1,167 +0,0 @@ -package ethereum - -import ( - "context" - "encoding/hex" - "errors" - "fmt" - "math/big" - "regexp" - "strconv" - "time" - - "cosmossdk.io/log" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/strangelove-ventures/noble-cctp-relayer/config" - "github.com/strangelove-ventures/noble-cctp-relayer/types" -) - -// Broadcast broadcasts a message to Ethereum -func Broadcast( - ctx context.Context, - cfg config.Config, - logger log.Logger, - msgs []*types.MessageState, - sequenceMap *types.SequenceMap, -) error { - - // set up eth client - client, err := ethclient.Dial(cfg.Networks.Destination.Ethereum.RPC) - if err != nil { - return fmt.Errorf("unable to dial ethereum client: %w", err) - } - defer client.Close() - - backend := NewContractBackendWrapper(client) - - privEcdsaKey, ethereumAddress, err := GetEcdsaKeyAddress(cfg.Networks.Minters[0].MinterPrivateKey) - if err != nil { - return err - } - - auth, err := bind.NewKeyedTransactorWithChainID(privEcdsaKey, big.NewInt(cfg.Networks.Destination.Ethereum.ChainId)) - if err != nil { - return fmt.Errorf("unable to create auth: %w", err) - } - - messageTransmitter, err := NewMessageTransmitter(common.HexToAddress(cfg.Networks.Source.Ethereum.MessageTransmitter), backend) - if err != nil { - return fmt.Errorf("unable to create message transmitter: %w", err) - } - - var broadcastErrors error - for _, msg := range msgs { - - if msg.Status == types.Complete { - continue - } - - attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) - if err != nil { - return errors.New("unable to decode message attestation") - } - - for attempt := 0; attempt <= cfg.Networks.Destination.Ethereum.BroadcastRetries; attempt++ { - logger.Info(fmt.Sprintf( - "Broadcasting message from %d to %d: with source tx hash %s", - msg.SourceDomain, - msg.DestDomain, - msg.SourceTxHash)) - - nonce := sequenceMap.Next(cfg.Networks.Destination.Ethereum.DomainId) - auth.Nonce = big.NewInt(nonce) - - // TODO remove - nextNonce, err := GetEthereumAccountNonce(cfg.Networks.Destination.Ethereum.RPC, ethereumAddress) - if err != nil { - logger.Error("unable to retrieve account number") - } else { - auth.Nonce = big.NewInt(nextNonce) - } - // TODO end remove - - // check if nonce already used - co := &bind.CallOpts{ - Pending: true, - Context: ctx, - } - - logger.Debug("Checking if nonce was used for broadcast to Ethereum", "source_domain", msg.SourceDomain, "nonce", msg.Nonce) - - key := append( - common.LeftPadBytes((big.NewInt(int64(msg.SourceDomain))).Bytes(), 4), - common.LeftPadBytes((big.NewInt(int64(msg.Nonce))).Bytes(), 8)..., - ) - - response, nonceErr := messageTransmitter.UsedNonces(co, [32]byte(crypto.Keccak256(key))) - if nonceErr != nil { - logger.Debug("Error querying whether nonce was used. Continuing...") - } else { - fmt.Printf("received used nonce response: %d\n", response) - if response.Uint64() == uint64(1) { - // nonce has already been used, mark as complete - logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d", - msg.SourceDomain, msg.Nonce)) - msg.Status = types.Complete - return errors.New("receive message was already broadcasted") - } - } - - // broadcast txn - tx, err := messageTransmitter.ReceiveMessage( - auth, - msg.MsgSentBytes, - attestationBytes, - ) - if err == nil { - msg.Status = types.Complete - - fullLog, err := tx.MarshalJSON() - if err != nil { - logger.Error("error marshalling eth tx log", err) - } - - msg.DestTxHash = tx.Hash().Hex() - - logger.Info(fmt.Sprintf("Successfully broadcast %s to Ethereum. Tx hash: %s, FULL LOG: %s", msg.SourceTxHash, msg.DestTxHash, string(fullLog))) - continue - } else { - logger.Error(fmt.Sprintf("error during broadcast: %s", err.Error())) - if parsedErr, ok := err.(JsonError); ok { - if parsedErr.ErrorCode() == 3 && parsedErr.Error() == "execution reverted: Nonce already used" { - msg.Status = types.Complete - return parsedErr - } - - match, _ := regexp.MatchString("nonce too low: next nonce [0-9]+, tx nonce [0-9]+", parsedErr.Error()) - if match { - numberRegex := regexp.MustCompile("[0-9]+") - nextNonce, err := strconv.ParseInt(numberRegex.FindAllString(parsedErr.Error(), 1)[0], 10, 0) - if err != nil { - nextNonce, err = GetEthereumAccountNonce(cfg.Networks.Destination.Ethereum.RPC, ethereumAddress) - if err != nil { - logger.Error("unable to retrieve account number") - } - } - sequenceMap.Put(cfg.Networks.Destination.Ethereum.DomainId, nextNonce) - } - } - - // if it's not the last attempt, retry - // TODO increase the destination.ethereum.broadcast retries (3-5) and retry interval (15s). By checking for used nonces, there is no gas cost for failed mints. - if attempt != cfg.Networks.Destination.Ethereum.BroadcastRetries { - logger.Info(fmt.Sprintf("Retrying in %d seconds", cfg.Networks.Destination.Ethereum.BroadcastRetryInterval)) - time.Sleep(time.Duration(cfg.Networks.Destination.Ethereum.BroadcastRetryInterval) * time.Second) - } - continue - } - } - // retried max times with failure - msg.Status = types.Failed - broadcastErrors = errors.Join(broadcastErrors, errors.New("reached max number of broadcast attempts")) - } - return broadcastErrors -} diff --git a/cmd/ethereum/listener.go b/cmd/ethereum/listener.go deleted file mode 100644 index 1e7088b7..00000000 --- a/cmd/ethereum/listener.go +++ /dev/null @@ -1,114 +0,0 @@ -package ethereum - -import ( - "bytes" - "context" - "embed" - "fmt" - "math/big" - "os" - - "cosmossdk.io/log" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/pascaldekloe/etherstream" - "github.com/strangelove-ventures/noble-cctp-relayer/config" - "github.com/strangelove-ventures/noble-cctp-relayer/types" -) - -//go:embed abi/MessageTransmitter.json -var content embed.FS - -func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *types.TxState) { - // set up client - messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json") - if err != nil { - logger.Error("unable to read MessageTransmitter abi", "err", err) - os.Exit(1) - } - messageTransmitterABI, err := abi.JSON(bytes.NewReader(messageTransmitter)) - if err != nil { - logger.Error("unable to parse MessageTransmitter abi", "err", err) - } - - messageSent := messageTransmitterABI.Events["MessageSent"] - - ethClient, err := ethclient.DialContext(context.Background(), cfg.Networks.Source.Ethereum.RPC) - if err != nil { - logger.Error("unable to initialize ethereum client", "err", err) - os.Exit(1) - } - - messageTransmitterAddress := common.HexToAddress(cfg.Networks.Source.Ethereum.MessageTransmitter) - etherReader := etherstream.Reader{Backend: ethClient} - - query := ethereum.FilterQuery{ - Addresses: []common.Address{messageTransmitterAddress}, - Topics: [][]common.Hash{{messageSent.ID}}, - FromBlock: big.NewInt(int64(cfg.Networks.Source.Ethereum.StartBlock - cfg.Networks.Source.Ethereum.LookbackPeriod)), - } - - logger.Info(fmt.Sprintf( - "Starting Ethereum listener at block %d looking back %d blocks", - cfg.Networks.Source.Ethereum.StartBlock, - cfg.Networks.Source.Ethereum.LookbackPeriod)) - - // websockets do not query history - // https://github.com/ethereum/go-ethereum/issues/15063 - stream, sub, history, err := etherReader.QueryWithHistory(context.Background(), &query) - if err != nil { - logger.Error("unable to subscribe to logs", "err", err) - os.Exit(1) - } - - // process history - for _, historicalLog := range history { - parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &historicalLog) - if err != nil { - logger.Error("Unable to parse history log into MessageState, skipping", "err", err) - continue - } - logger.Info(fmt.Sprintf("New historical msg from source domain %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) - - processingQueue <- &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} - - // It might help to wait a small amount of time between sending messages into the processing queue - // so that account sequences / nonces are set correctly - // time.Sleep(10 * time.Millisecond) - } - - // consume stream - go func() { - var txState *types.TxState - for { - select { - case err := <-sub.Err(): - logger.Error("connection closed", "err", err) - os.Exit(1) - case streamLog := <-stream: - parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog) - if err != nil { - logger.Error("Unable to parse ws log into MessageState, skipping") - continue - } - logger.Info(fmt.Sprintf("New stream msg from %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) - if txState == nil { - txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} - } else if parsedMsg.SourceTxHash != txState.TxHash { - processingQueue <- txState - txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} - } else { - txState.Msgs = append(txState.Msgs, parsedMsg) - - } - default: - if txState != nil { - processingQueue <- txState - txState = nil - } - } - } - }() -} diff --git a/cmd/noble/broadcast.go b/cmd/noble/broadcast.go deleted file mode 100644 index 8cd77b74..00000000 --- a/cmd/noble/broadcast.go +++ /dev/null @@ -1,274 +0,0 @@ -package noble - -import ( - "context" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "io" - "math/rand" - "net/http" - "regexp" - "strconv" - "time" - - "cosmossdk.io/log" - nobletypes "github.com/circlefin/noble-cctp/x/cctp/types" - rpchttp "github.com/cometbft/cometbft/rpc/client/http" - ctypes "github.com/cometbft/cometbft/rpc/core/types" - libclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" - sdkClient "github.com/cosmos/cosmos-sdk/client" - clientTx "github.com/cosmos/cosmos-sdk/client/tx" - "github.com/cosmos/cosmos-sdk/codec" - codectypes "github.com/cosmos/cosmos-sdk/codec/types" - "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/cosmos/cosmos-sdk/types/tx/signing" - xauthsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" - xauthtx "github.com/cosmos/cosmos-sdk/x/auth/tx" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/noble/cosmos" - "github.com/strangelove-ventures/noble-cctp-relayer/config" - "github.com/strangelove-ventures/noble-cctp-relayer/types" -) - -// Broadcast broadcasts a message to Noble -func Broadcast( - ctx context.Context, - cfg config.Config, - logger log.Logger, - msgs []*types.MessageState, - sequenceMap *types.SequenceMap, -) error { - // set up sdk context - interfaceRegistry := codectypes.NewInterfaceRegistry() - nobletypes.RegisterInterfaces(interfaceRegistry) - cdc := codec.NewProtoCodec(interfaceRegistry) - sdkContext := sdkClient.Context{ - TxConfig: xauthtx.NewTxConfig(cdc, xauthtx.DefaultSignModes), - } - - // build txn - txBuilder := sdkContext.TxConfig.NewTxBuilder() - - // get priv key - nobleAddress := cfg.Networks.Minters[4].MinterAddress - keyBz, err := hex.DecodeString(cfg.Networks.Minters[4].MinterPrivateKey) - if err != nil { - return fmt.Errorf("unable to parse Noble private key") - } - privKey := secp256k1.PrivKey{Key: keyBz} - - rpcClient, err := NewRPCClient(cfg.Networks.Destination.Noble.RPC, 10*time.Second) - if err != nil { - return errors.New("failed to set up rpc client") - } - - cc, err := cosmos.NewProvider(cfg.Networks.Source.Noble.RPC) - if err != nil { - return fmt.Errorf("unable to build cosmos provider for noble: %w", err) - } - - // sign and broadcast txn - for attempt := 0; attempt <= cfg.Networks.Destination.Noble.BroadcastRetries; attempt++ { - - var receiveMsgs []sdk.Msg - for _, msg := range msgs { - - used, err := cc.QueryUsedNonce(ctx, types.Domain(msg.SourceDomain), msg.Nonce) - if err != nil { - return fmt.Errorf("unable to query used nonce: %w", err) - } - - if used { - msg.Status = types.Complete - logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used", msg.Nonce)) - continue - } - - attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) - if err != nil { - return fmt.Errorf("unable to decode message attestation") - } - - receiveMsgs = append(receiveMsgs, nobletypes.NewMsgReceiveMessage( - nobleAddress, - msg.MsgSentBytes, - attestationBytes, - )) - - logger.Info(fmt.Sprintf( - "Broadcasting message from %d to %d: with source tx hash %s", - msg.SourceDomain, - msg.DestDomain, - msg.SourceTxHash)) - } - - err = txBuilder.SetMsgs(receiveMsgs...) - if err != nil { - return fmt.Errorf("failed to set messages on tx: %w", err) - } - - txBuilder.SetGasLimit(cfg.Networks.Destination.Noble.GasLimit) - // TODO: make configurable - txBuilder.SetMemo("Thank you for relaying with Strangelove") - - accountSequence := sequenceMap.Next(cfg.Networks.Destination.Noble.DomainId) - //TODO: don't need to fetch this everytime - accountNumber, _, err := GetNobleAccountNumberSequence(cfg.Networks.Destination.Noble.API, nobleAddress) - - if err != nil { - return fmt.Errorf("failed to retrieve account number and sequence: %w", err) - } - - sigV2 := signing.SignatureV2{ - PubKey: privKey.PubKey(), - Data: &signing.SingleSignatureData{ - SignMode: sdkContext.TxConfig.SignModeHandler().DefaultMode(), - Signature: nil, - }, - Sequence: uint64(accountSequence), - } - - signerData := xauthsigning.SignerData{ - ChainID: cfg.Networks.Destination.Noble.ChainId, - AccountNumber: uint64(accountNumber), - Sequence: uint64(accountSequence), - } - - txBuilder.SetSignatures(sigV2) - - sigV2, err = clientTx.SignWithPrivKey( - sdkContext.TxConfig.SignModeHandler().DefaultMode(), - signerData, - txBuilder, - &privKey, - sdkContext.TxConfig, - uint64(accountSequence), - ) - if err != nil { - return fmt.Errorf("failed to sign tx: %w", err) - } - - if err := txBuilder.SetSignatures(sigV2); err != nil { - return fmt.Errorf("failed to set signatures: %w", err) - } - - // Generated Protobuf-encoded bytes. - txBytes, err := sdkContext.TxConfig.TxEncoder()(txBuilder.GetTx()) - if err != nil { - return fmt.Errorf("failed to proto encode tx: %w", err) - } - - rpcResponse, err := rpcClient.BroadcastTxSync(context.Background(), txBytes) - if err != nil || (rpcResponse != nil && rpcResponse.Code != 0) { - // Log the error - logger.Error(fmt.Sprintf("error during broadcast: %s", getErrorString(err, rpcResponse))) - - if err != nil || rpcResponse == nil { - // Log retry information - logger.Info(fmt.Sprintf("Retrying in %d seconds", cfg.Networks.Destination.Noble.BroadcastRetryInterval)) - time.Sleep(time.Duration(cfg.Networks.Destination.Noble.BroadcastRetryInterval) * time.Second) - // wait a random amount of time to lower probability of concurrent message nonce collision - time.Sleep(time.Duration(rand.Intn(5)) * time.Second) - continue - } - - // Log details for non-zero response code - logger.Error(fmt.Sprintf("received non-zero: %d - %s", rpcResponse.Code, rpcResponse.Log)) - - // Handle specific error code (32) - if rpcResponse.Code == 32 { - newAccountSequence := extractAccountSequence(logger, rpcResponse.Log, nobleAddress, cfg.Networks.Destination.Noble.API) - logger.Debug(fmt.Sprintf("retrying with new account sequence: %d", newAccountSequence)) - sequenceMap.Put(cfg.Networks.Destination.Noble.DomainId, newAccountSequence) - } - - // Log retry information - logger.Info(fmt.Sprintf("Retrying in %d seconds", cfg.Networks.Destination.Noble.BroadcastRetryInterval)) - time.Sleep(time.Duration(cfg.Networks.Destination.Noble.BroadcastRetryInterval) * time.Second) - // wait a random amount of time to lower probability of concurrent message nonce collision - time.Sleep(time.Duration(rand.Intn(5)) * time.Second) - continue - } - - // Tx was successfully broadcast - for _, msg := range msgs { - msg.DestTxHash = rpcResponse.Hash.String() - msg.Status = types.Complete - } - logger.Info(fmt.Sprintf("Successfully broadcast %s to Noble. Tx hash: %s", msgs[0].SourceTxHash, msgs[0].DestTxHash)) - - return nil - } - - for _, msg := range msgs { - if msg.Status != types.Complete { - msg.Status = types.Failed - } - } - - return errors.New("reached max number of broadcast attempts") -} - -// getErrorString returns the appropriate value to log when tx broadcast errors are encountered. -func getErrorString(err error, rpcResponse *ctypes.ResultBroadcastTx) string { - if rpcResponse != nil { - return rpcResponse.Log - } - return err.Error() -} - -// extractAccountSequence attempts to extract the account sequence number from the RPC response logs when -// account sequence mismatch errors are encountered. If the account sequence number cannot be extracted from the logs, -// it is retrieved by making a request to the API endpoint. -func extractAccountSequence(logger log.Logger, rpcResponseLog, nobleAddress, nobleAPI string) int64 { - pattern := `expected (\d+), got (\d+)` - re := regexp.MustCompile(pattern) - match := re.FindStringSubmatch(rpcResponseLog) - - if len(match) == 3 { - // Extract the numbers from the match. - newAccountSequence, _ := strconv.ParseInt(match[1], 10, 0) - return newAccountSequence - } - - // Otherwise, just request the account sequence - _, newAccountSequence, err := GetNobleAccountNumberSequence(nobleAPI, nobleAddress) - if err != nil { - logger.Error("unable to retrieve account number") - } - - return newAccountSequence -} - -// NewRPCClient initializes a new tendermint RPC client connected to the specified address. -func NewRPCClient(addr string, timeout time.Duration) (*rpchttp.HTTP, error) { - httpClient, err := libclient.DefaultHTTPClient(addr) - if err != nil { - return nil, err - } - httpClient.Timeout = timeout - rpcClient, err := rpchttp.NewWithClient(addr, "/websocket", httpClient) - if err != nil { - return nil, err - } - return rpcClient, nil -} - -func GetNobleAccountNumberSequence(urlBase string, address string) (int64, int64, error) { - rawResp, err := http.Get(fmt.Sprintf("%s/cosmos/auth/v1beta1/accounts/%s", urlBase, address)) - if err != nil { - return 0, 0, errors.New("unable to fetch account number, sequence") - } - body, _ := io.ReadAll(rawResp.Body) - var resp types.AccountResp - err = json.Unmarshal(body, &resp) - if err != nil { - return 0, 0, fmt.Errorf("unable to parse account number, sequence. Raw HHTP Get response: %s", string(body)) - } - accountNumber, _ := strconv.ParseInt(resp.AccountNumber, 10, 0) - accountSequence, _ := strconv.ParseInt(resp.Sequence, 10, 0) - - return accountNumber, accountSequence, nil -} diff --git a/cmd/noble/listener.go b/cmd/noble/listener.go deleted file mode 100644 index c978f074..00000000 --- a/cmd/noble/listener.go +++ /dev/null @@ -1,113 +0,0 @@ -package noble - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "strconv" - "sync" - "time" - - "cosmossdk.io/log" - "github.com/strangelove-ventures/noble-cctp-relayer/config" - "github.com/strangelove-ventures/noble-cctp-relayer/types" -) - -func StartListener(cfg config.Config, logger log.Logger, processingQueue chan *types.TxState) { - // set up client - - logger.Info(fmt.Sprintf("Starting Noble listener at block %d looking back %d blocks", - cfg.Networks.Source.Noble.StartBlock, - cfg.Networks.Source.Noble.LookbackPeriod)) - - var wg sync.WaitGroup - wg.Add(1) - - // enqueue block heights - currentBlock := cfg.Networks.Source.Noble.StartBlock - lookback := cfg.Networks.Source.Noble.LookbackPeriod - chainTip := GetNobleChainTip(cfg) - blockQueue := make(chan uint64, 1000000) - - // history - currentBlock = currentBlock - lookback - for currentBlock <= chainTip { - blockQueue <- currentBlock - currentBlock++ - } - - // listen for new blocks - go func() { - for { - chainTip = GetNobleChainTip(cfg) - if chainTip >= currentBlock { - for i := currentBlock; i <= chainTip; i++ { - blockQueue <- i - } - currentBlock = chainTip + 1 - } - time.Sleep(6 * time.Second) - } - }() - - // constantly query for blocks - for i := 0; i < int(cfg.Networks.Source.Noble.Workers); i++ { - go func() { - for { - block := <-blockQueue - rawResponse, err := http.Get(fmt.Sprintf("%s/tx_search?query=\"tx.height=%d\"", cfg.Networks.Source.Noble.RPC, block)) - if err != nil { - logger.Debug(fmt.Sprintf("unable to query Noble block %d", block)) - continue - } - if rawResponse.StatusCode != http.StatusOK { - logger.Debug(fmt.Sprintf("non 200 response received for Noble block %d", block)) - time.Sleep(5 * time.Second) - blockQueue <- block - continue - } - - body, err := io.ReadAll(rawResponse.Body) - if err != nil { - logger.Debug(fmt.Sprintf("unable to parse Noble block %d", block)) - continue - } - - response := types.BlockResultsResponse{} - err = json.Unmarshal(body, &response) - if err != nil { - logger.Debug(fmt.Sprintf("unable to unmarshal Noble block %d", block)) - continue - } - - for _, tx := range response.Result.Txs { - parsedMsgs, err := types.NobleLogToMessageState(tx) - if err != nil { - logger.Error("unable to parse Noble log to message state", "err", err.Error()) - continue - } - for _, parsedMsg := range parsedMsgs { - logger.Info(fmt.Sprintf("New stream msg with nonce %d from %d with tx hash %s", parsedMsg.Nonce, parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) - } - processingQueue <- &types.TxState{TxHash: tx.Hash, Msgs: parsedMsgs} - } - } - }() - } - - wg.Wait() -} - -func GetNobleChainTip(cfg config.Config) uint64 { - rawResponse, _ := http.Get(cfg.Networks.Source.Noble.RPC + "/block") - body, _ := io.ReadAll(rawResponse.Body) - - response := types.BlockResponse{} - err := json.Unmarshal(body, &response) - if err != nil { - fmt.Println(err.Error()) - } - res, _ := strconv.ParseInt(response.Result.Block.Header.Height, 10, 0) - return uint64(res) -} diff --git a/cmd/process.go b/cmd/process.go index 9c6cbde3..7c4f91dc 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -1,32 +1,19 @@ package cmd import ( - "bytes" "context" - "encoding/hex" "fmt" "os" - "strings" - "sync" + "os/signal" + "syscall" "time" "cosmossdk.io/log" "github.com/spf13/cobra" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/circle" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/ethereum" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/noble" - "github.com/strangelove-ventures/noble-cctp-relayer/config" + "github.com/strangelove-ventures/noble-cctp-relayer/circle" "github.com/strangelove-ventures/noble-cctp-relayer/types" ) -type Processor struct { - Mu sync.RWMutex -} - -func NewProcessor() *Processor { - return &Processor{} -} - var startCmd = &cobra.Command{ Use: "start", Short: "Start relaying CCTP transactions from Ethereum to Noble", @@ -42,62 +29,53 @@ var State = types.NewStateMap() var sequenceMap = types.NewSequenceMap() func Start(cmd *cobra.Command, args []string) { + // messageState processing queue + var processingQueue = make(chan *types.TxState, 10000) - p := NewProcessor() + sigTerm := make(chan os.Signal, 1) - var wg sync.WaitGroup - wg.Add(1) + registeredDomains := make(map[types.Domain]types.Chain) - // initialize minter account sequences - for key := range Cfg.Networks.Minters { - switch key { - case 0: - ethNonce, err := ethereum.GetEthereumAccountNonce( - Cfg.Networks.Destination.Ethereum.RPC, - Cfg.Networks.Minters[0].MinterAddress) + for name, cfg := range Cfg.Chains { + c, err := cfg.Chain(name) + if err != nil { + Logger.Error("Error creating chain", "err: ", err) + os.Exit(1) + } - if err != nil { - Logger.Error("Error retrieving Ethereum account nonce") - os.Exit(1) - } - sequenceMap.Put(key, ethNonce) - case 4: - _, nextMinterSequence, err := noble.GetNobleAccountNumberSequence( - Cfg.Networks.Destination.Noble.API, - Cfg.Networks.Minters[4].MinterAddress) - - if err != nil { - Logger.Error("Error retrieving Noble account sequence") - os.Exit(1) - } - sequenceMap.Put(key, nextMinterSequence) + if err := c.InitializeBroadcaster(cmd.Context(), Logger, sequenceMap); err != nil { + Logger.Error("Error initializing broadcaster", "err: ", err) + os.Exit(1) } - // ...initialize more here - } + go c.StartListener(cmd.Context(), Logger, processingQueue, sigTerm) - // messageState processing queue - var processingQueue = make(chan *types.TxState, 10000) + if _, ok := registeredDomains[c.Domain()]; ok { + Logger.Error("Duplicate domain found", "domain", c.Domain()) + os.Exit(1) + } - // spin up Processor worker pool - for i := 0; i < int(Cfg.ProcessorWorkerCount); i++ { - go p.StartProcessor(cmd.Context(), Cfg, Logger, processingQueue, sequenceMap) + registeredDomains[c.Domain()] = c } - // listeners listen for events, parse them, and enqueue them to processingQueue - if Cfg.Networks.Source.Ethereum.Enabled { - ethereum.StartListener(Cfg, Logger, processingQueue) - } - if Cfg.Networks.Source.Noble.Enabled { - noble.StartListener(Cfg, Logger, processingQueue) + // spin up Processor worker pool + for i := 0; i < int(Cfg.ProcessorWorkerCount); i++ { + go StartProcessor(cmd.Context(), Cfg, Logger, registeredDomains, processingQueue, sequenceMap) } - // ...register more chain listeners here - wg.Wait() + signal.Notify(sigTerm, os.Interrupt, syscall.SIGTERM) + <-sigTerm } // StartProcessor is the main processing pipeline. -func (p *Processor) StartProcessor(ctx context.Context, cfg config.Config, logger log.Logger, processingQueue chan *types.TxState, sequenceMap *types.SequenceMap) { +func StartProcessor( + ctx context.Context, + cfg types.Config, + logger log.Logger, + registeredDomains map[types.Domain]types.Chain, + processingQueue chan *types.TxState, + sequenceMap *types.SequenceMap, +) { for { dequeuedTx := <-processingQueue @@ -117,13 +95,13 @@ func (p *Processor) StartProcessor(ctx context.Context, cfg config.Config, logge // if a filter's condition is met, mark as filtered if filterDisabledCCTPRoutes(cfg, logger, msg) || - filterInvalidDestinationCallers(cfg, logger, msg) { + filterInvalidDestinationCallers(registeredDomains, logger, msg) { msg.Status = types.Filtered } // if the message is burned or pending, check for an attestation if msg.Status == types.Created || msg.Status == types.Pending { - response := circle.CheckAttestation(cfg, logger, msg.IrisLookupId, msg.SourceTxHash, msg.SourceDomain, msg.DestDomain) + response := circle.CheckAttestation(cfg.Circle.AttestationBaseUrl, logger, msg.IrisLookupId, msg.SourceTxHash, msg.SourceDomain, msg.DestDomain) if response != nil { if msg.Status == types.Created && response.Status == "pending_confirmations" { logger.Debug("Attestation is created but still pending confirmations for 0x" + msg.IrisLookupId + ". Retrying...") @@ -156,21 +134,17 @@ func (p *Processor) StartProcessor(ctx context.Context, cfg config.Config, logge } // if the message is attested to, try to broadcast for domain, msgs := range broadcastMsgs { - var err error - switch domain { - case 0: // ethereum - err = ethereum.Broadcast(ctx, cfg, logger, msgs, sequenceMap) - case 4: // noble - err = noble.Broadcast(ctx, cfg, logger, msgs, sequenceMap) + chain, ok := registeredDomains[domain] + if !ok { + logger.Error("No chain registered for domain", "domain", domain) + continue } - if err != nil { - // TODO: add dest domain to error log - logger.Error("unable to mint one or more transfers", "error(s)", err, "total_transfers", len(msgs)) + if err := chain.Broadcast(ctx, logger, msgs, sequenceMap); err != nil { + logger.Error("unable to mint one or more transfers", "error(s)", err, "total_transfers", len(msgs), "name", chain.Name(), "domain", domain) requeue = true continue } - // ...add minters for different domains here for _, msg := range msgs { msg.Status = types.Complete @@ -185,8 +159,8 @@ func (p *Processor) StartProcessor(ctx context.Context, cfg config.Config, logge } // filterDisabledCCTPRoutes returns true if we haven't enabled relaying from a source domain to a destination domain -func filterDisabledCCTPRoutes(cfg config.Config, logger log.Logger, msg *types.MessageState) bool { - val, ok := cfg.Networks.EnabledRoutes[msg.SourceDomain] +func filterDisabledCCTPRoutes(cfg types.Config, logger log.Logger, msg *types.MessageState) bool { + val, ok := cfg.EnabledRoutes[msg.SourceDomain] result := !(ok && val == msg.DestDomain) if result { logger.Info(fmt.Sprintf("Filtered tx %s because relaying from %d to %d is not enabled", @@ -196,40 +170,14 @@ func filterDisabledCCTPRoutes(cfg config.Config, logger log.Logger, msg *types.M } // filterInvalidDestinationCallers returns true if the minter is not the destination caller for the specified domain -func filterInvalidDestinationCallers(cfg config.Config, logger log.Logger, msg *types.MessageState) bool { - zeroByteArr := make([]byte, 32) - result := false - - switch msg.DestDomain { - case 4: - bech32DestinationCaller, err := types.DecodeDestinationCaller(msg.DestinationCaller) - if err != nil { - result = true - } - if !bytes.Equal(msg.DestinationCaller, zeroByteArr) && - bech32DestinationCaller != cfg.Networks.Minters[msg.DestDomain].MinterAddress { - result = true - } - if result { - logger.Info(fmt.Sprintf("Filtered tx %s because the destination caller %s is specified and it's not the minter %s", - msg.SourceTxHash, msg.DestinationCaller, cfg.Networks.Minters[msg.DestDomain].MinterAddress)) - } - - default: // minting to evm - decodedMinter, err := hex.DecodeString(strings.ReplaceAll(cfg.Networks.Minters[0].MinterAddress, "0x", "")) - if err != nil { - return !bytes.Equal(msg.DestinationCaller, zeroByteArr) - } - - decodedMinterPadded := make([]byte, 32) - copy(decodedMinterPadded[12:], decodedMinter) - - if !bytes.Equal(msg.DestinationCaller, zeroByteArr) && !bytes.Equal(msg.DestinationCaller, decodedMinterPadded) { - result = true - } +func filterInvalidDestinationCallers(registeredDomains map[types.Domain]types.Chain, logger log.Logger, msg *types.MessageState) bool { + chain, ok := registeredDomains[msg.DestDomain] + if !ok { + logger.Error("No chain registered for domain", "domain", msg.DestDomain) + return true } - return result + return !chain.IsDestinationCaller(msg.DestinationCaller) } func LookupKey(sourceTxHash string) string { diff --git a/cmd/process_test.go b/cmd/process_test.go index 43fb3cbf..3f561989 100644 --- a/cmd/process_test.go +++ b/cmd/process_test.go @@ -9,42 +9,50 @@ import ( "cosmossdk.io/log" "github.com/rs/zerolog" "github.com/strangelove-ventures/noble-cctp-relayer/cmd" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/noble" - "github.com/strangelove-ventures/noble-cctp-relayer/config" + "github.com/strangelove-ventures/noble-cctp-relayer/noble" "github.com/strangelove-ventures/noble-cctp-relayer/types" "github.com/stretchr/testify/require" ) -var cfg config.Config +var cfg types.Config var logger log.Logger var processingQueue chan *types.TxState var sequenceMap *types.SequenceMap -func setupTest() { - cfg = config.Parse("../.ignore/unit_tests.yaml") +func setupTest(t *testing.T) map[types.Domain]types.Chain { + var err error + cfg, err = types.Parse("../.ignore/unit_tests.yaml") + require.NoError(t, err, "Error parsing config") + logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.DebugLevel)) processingQueue = make(chan *types.TxState, 10000) - _, nextMinterSequence, err := noble.GetNobleAccountNumberSequence( - cfg.Networks.Destination.Noble.API, - cfg.Networks.Minters[4].MinterAddress) + n, err := cfg.Chains["noble"].(*noble.ChainConfig).Chain("noble") + require.NoError(t, err, "Error creating noble chain") + + _, nextMinterSequence, err := n.(*noble.Noble).AccountInfo(context.TODO()) + require.NoError(t, err, "Error retrieving account sequence") - if err != nil { - logger.Error("Error retrieving account sequence", "err: ", err) - os.Exit(1) - } sequenceMap = types.NewSequenceMap() sequenceMap.Put(types.Domain(4), nextMinterSequence) + registeredDomains := make(map[types.Domain]types.Chain) + for name, cfgg := range cfg.Chains { + c, err := cfgg.Chain(name) + require.NoError(t, err, "Error creating chain") + + registeredDomains[c.Domain()] = c + } + + return registeredDomains + } // new log -> create state entry func TestProcessNewLog(t *testing.T) { - setupTest() - - p := cmd.Processor{} + registeredDomains := setupTest(t) - go p.StartProcessor(context.TODO(), cfg, logger, processingQueue, sequenceMap) + go cmd.StartProcessor(context.TODO(), cfg, logger, registeredDomains, processingQueue, sequenceMap) emptyBz := make([]byte, 32) expectedState := &types.TxState{ @@ -65,20 +73,15 @@ func TestProcessNewLog(t *testing.T) { actualState, _ := cmd.State.Load(expectedState.TxHash) - p.Mu.RLock() require.Equal(t, types.Created, actualState.Msgs[0].Status) - p.Mu.RUnlock() - } // created message -> check attestation -> mark as attested -> mark as complete -> remove from state func TestProcessCreatedLog(t *testing.T) { - setupTest() - cfg.Networks.EnabledRoutes[0] = 5 // skip mint - - p := cmd.NewProcessor() + registeredDomains := setupTest(t) + cfg.EnabledRoutes[0] = 5 // skip mint - go p.StartProcessor(context.TODO(), cfg, logger, processingQueue, sequenceMap) + go cmd.StartProcessor(context.TODO(), cfg, logger, registeredDomains, processingQueue, sequenceMap) emptyBz := make([]byte, 32) @@ -102,20 +105,16 @@ func TestProcessCreatedLog(t *testing.T) { actualState, ok := cmd.State.Load(expectedState.TxHash) require.True(t, ok) - p.Mu.RLock() require.Equal(t, types.Complete, actualState.Msgs[0].Status) - p.Mu.RUnlock() } // created message -> disabled cctp route -> filtered func TestProcessDisabledCctpRoute(t *testing.T) { - setupTest() + registeredDomains := setupTest(t) - delete(cfg.Networks.EnabledRoutes, 0) + delete(cfg.EnabledRoutes, 0) - p := cmd.NewProcessor() - - go p.StartProcessor(context.TODO(), cfg, logger, processingQueue, sequenceMap) + go cmd.StartProcessor(context.TODO(), cfg, logger, registeredDomains, processingQueue, sequenceMap) emptyBz := make([]byte, 32) expectedState := &types.TxState{ @@ -138,18 +137,14 @@ func TestProcessDisabledCctpRoute(t *testing.T) { actualState, ok := cmd.State.Load(expectedState.TxHash) require.True(t, ok) - p.Mu.RLock() require.Equal(t, types.Filtered, actualState.Msgs[0].Status) - p.Mu.RUnlock() } // created message -> different destination caller -> filtered func TestProcessInvalidDestinationCaller(t *testing.T) { - setupTest() - - p := cmd.NewProcessor() + registeredDomains := setupTest(t) - go p.StartProcessor(context.TODO(), cfg, logger, processingQueue, sequenceMap) + go cmd.StartProcessor(context.TODO(), cfg, logger, registeredDomains, processingQueue, sequenceMap) nonEmptyBytes := make([]byte, 31) nonEmptyBytes = append(nonEmptyBytes, 0x1) @@ -174,18 +169,14 @@ func TestProcessInvalidDestinationCaller(t *testing.T) { actualState, ok := cmd.State.Load(expectedState.TxHash) require.True(t, ok) - p.Mu.RLock() require.Equal(t, types.Filtered, actualState.Msgs[0].Status) - p.Mu.RUnlock() } // created message -> not \ -> filtered func TestProcessNonBurnMessageWhenDisabled(t *testing.T) { - setupTest() + registeredDomains := setupTest(t) - p := cmd.NewProcessor() - - go p.StartProcessor(context.TODO(), cfg, logger, processingQueue, sequenceMap) + go cmd.StartProcessor(context.TODO(), cfg, logger, registeredDomains, processingQueue, sequenceMap) emptyBz := make([]byte, 32) expectedState := &types.TxState{ @@ -208,20 +199,15 @@ func TestProcessNonBurnMessageWhenDisabled(t *testing.T) { actualState, ok := cmd.State.Load(expectedState.TxHash) require.True(t, ok) - p.Mu.RLock() require.Equal(t, types.Filtered, actualState.Msgs[0].Status) - p.Mu.RUnlock() - } // test batch transactions where multiple messages can be sent with the same tx hash // MsgSentBytes defer between messages func TestBatchTx(t *testing.T) { - setupTest() - - p := cmd.NewProcessor() + registeredDomains := setupTest(t) - go p.StartProcessor(context.TODO(), cfg, logger, processingQueue, sequenceMap) + go cmd.StartProcessor(context.TODO(), cfg, logger, registeredDomains, processingQueue, sequenceMap) emptyBz := make([]byte, 32) expectedState := &types.TxState{ @@ -251,7 +237,5 @@ func TestBatchTx(t *testing.T) { actualState, ok := cmd.State.Load(expectedState.TxHash) require.True(t, ok) - p.Mu.RLock() require.Equal(t, 2, len(actualState.Msgs)) - p.Mu.RUnlock() } diff --git a/cmd/root.go b/cmd/root.go index 4d7c1732..95385a49 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,30 +1,20 @@ package cmd import ( - "context" - "encoding/hex" - "encoding/json" - "fmt" - "io" "net/http" "os" "strconv" - "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" - "github.com/cosmos/cosmos-sdk/types/bech32" - "github.com/ethereum/go-ethereum/ethclient" "github.com/gin-gonic/gin" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/ethereum" "github.com/strangelove-ventures/noble-cctp-relayer/types" "cosmossdk.io/log" "github.com/rs/zerolog" "github.com/spf13/cobra" - "github.com/strangelove-ventures/noble-cctp-relayer/config" ) var ( - Cfg config.Config + Cfg types.Config cfgFile string verbose bool @@ -56,57 +46,13 @@ func init() { Logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.InfoLevel)) } - Cfg = config.Parse(cfgFile) - Logger.Info("successfully parsed config file", "location", cfgFile) - - Logger.Info(Cfg.Networks.Source.Ethereum.RPC) - // Set minter addresses from priv keys - for i, minter := range Cfg.Networks.Minters { - switch i { - case 0: - _, address, err := ethereum.GetEcdsaKeyAddress(minter.MinterPrivateKey) - if err != nil { - Logger.Error(fmt.Sprintf("Unable to parse ecdsa key from source %d", i)) - os.Exit(1) - } - minter.MinterAddress = address - Cfg.Networks.Minters[0] = minter - case 4: - keyBz, err := hex.DecodeString(minter.MinterPrivateKey) - if err != nil { - Logger.Error(fmt.Sprintf("Unable to parse key from source %d", i)) - os.Exit(1) - } - privKey := secp256k1.PrivKey{Key: keyBz} - address, err := bech32.ConvertAndEncode("noble", privKey.PubKey().Address()) - if err != nil { - Logger.Error(fmt.Sprintf("Unable to parse ecdsa key from source %d", i)) - os.Exit(1) - } - minter.MinterAddress = address - Cfg.Networks.Minters[4] = minter - } - } - - // Set default listener blocks - - // if Ethereum start block not set, default to latest - if Cfg.Networks.Source.Ethereum.Enabled && Cfg.Networks.Source.Ethereum.StartBlock == 0 { - client, _ := ethclient.Dial(Cfg.Networks.Source.Ethereum.RPC) - defer client.Close() - header, _ := client.HeaderByNumber(context.Background(), nil) - Cfg.Networks.Source.Ethereum.StartBlock = header.Number.Uint64() - } - - // if Noble start block not set, default to latest - if Cfg.Networks.Source.Noble.Enabled && Cfg.Networks.Source.Noble.StartBlock == 0 { - rawResponse, _ := http.Get(Cfg.Networks.Source.Noble.RPC + "/block") - body, _ := io.ReadAll(rawResponse.Body) - response := types.BlockResponse{} - _ = json.Unmarshal(body, &response) - height, _ := strconv.ParseInt(response.Result.Block.Header.Height, 10, 0) - Cfg.Networks.Source.Noble.StartBlock = uint64(height) + var err error + Cfg, err = types.Parse(cfgFile) + if err != nil { + Logger.Error("unable to parse config file", "location", cfgFile, "err", err) + os.Exit(1) } + Logger.Info("successfully parsed config file", "location", cfgFile) // start api server go startApi() diff --git a/config/config.go b/config/config.go deleted file mode 100644 index 2e850004..00000000 --- a/config/config.go +++ /dev/null @@ -1,73 +0,0 @@ -package config - -import ( - "os" - - "github.com/strangelove-ventures/noble-cctp-relayer/types" - "gopkg.in/yaml.v3" -) - -type Config struct { - Networks struct { - Source struct { - Ethereum struct { - DomainId types.Domain `yaml:"domain-id"` - RPC string `yaml:"rpc"` - MessageTransmitter string `yaml:"message-transmitter"` - RequestQueueSize uint32 `yaml:"request-queue-size"` - StartBlock uint64 `yaml:"start-block"` - LookbackPeriod uint64 `yaml:"lookback-period"` - Enabled bool `yaml:"enabled"` - } `yaml:"ethereum"` - Noble struct { - DomainId types.Domain `yaml:"domain-id"` - RPC string `yaml:"rpc"` - RequestQueueSize uint32 `yaml:"request-queue-size"` - StartBlock uint64 `yaml:"start-block"` - LookbackPeriod uint64 `yaml:"lookback-period"` - Workers uint32 `yaml:"workers"` - Enabled bool `yaml:"enabled"` - } `yaml:"noble"` - } `yaml:"source"` - Destination struct { - Ethereum struct { - DomainId types.Domain `yaml:"domain-id"` - ChainId int64 `yaml:"chain-id"` - RPC string `yaml:"rpc"` - BroadcastRetries int `yaml:"broadcast-retries"` - BroadcastRetryInterval int `yaml:"broadcast-retry-interval"` - } `yaml:"ethereum"` - Noble struct { - DomainId types.Domain `yaml:"domain-id"` - RPC string `yaml:"rpc"` - API string `yaml:"api"` - ChainId string `yaml:"chain-id"` - GasLimit uint64 `yaml:"gas-limit"` - BroadcastRetries int `yaml:"broadcast-retries"` - BroadcastRetryInterval int `yaml:"broadcast-retry-interval"` - FilterForwardsByIbcChannel bool `yaml:"filter-forwards-by-ibc-channel"` - ForwardingChannelWhitelist []string `yaml:"forwarding-channel-whitelist"` - } `yaml:"noble"` - } `yaml:"destination"` - EnabledRoutes map[types.Domain]types.Domain `yaml:"enabled-routes"` - Minters map[types.Domain]struct { - MinterAddress string `yaml:"minter-address"` - MinterPrivateKey string `yaml:"minter-private-key"` - } `yaml:"minters"` - } `yaml:"networks"` - Circle struct { - AttestationBaseUrl string `yaml:"attestation-base-url"` - FetchRetries int `yaml:"fetch-retries"` - FetchRetryInterval int `yaml:"fetch-retry-interval"` - } `yaml:"circle"` - ProcessorWorkerCount uint32 `yaml:"processor-worker-count"` - Api struct { - TrustedProxies []string `yaml:"trusted-proxies"` - } `yaml:"api"` -} - -func Parse(file string) (cfg Config) { - data, _ := os.ReadFile(file) - _ = yaml.Unmarshal(data, &cfg) - return -} diff --git a/cmd/noble/cosmos/codec.go b/cosmos/codec.go similarity index 100% rename from cmd/noble/cosmos/codec.go rename to cosmos/codec.go diff --git a/cmd/noble/cosmos/cosmosprovider.go b/cosmos/cosmosprovider.go similarity index 100% rename from cmd/noble/cosmos/cosmosprovider.go rename to cosmos/cosmosprovider.go diff --git a/cmd/noble/cosmos/grpc_shim.go b/cosmos/grpc_shim.go similarity index 100% rename from cmd/noble/cosmos/grpc_shim.go rename to cosmos/grpc_shim.go diff --git a/cmd/noble/cosmos/query.go b/cosmos/query.go similarity index 100% rename from cmd/noble/cosmos/query.go rename to cosmos/query.go diff --git a/cmd/noble/cosmos/query_test.go b/cosmos/query_test.go similarity index 86% rename from cmd/noble/cosmos/query_test.go rename to cosmos/query_test.go index d0156c5b..0ecd20f0 100644 --- a/cmd/noble/cosmos/query_test.go +++ b/cosmos/query_test.go @@ -4,7 +4,7 @@ import ( "context" "testing" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/noble/cosmos" + "github.com/strangelove-ventures/noble-cctp-relayer/cosmos" "github.com/stretchr/testify/require" ) diff --git a/cmd/ethereum/abi/ERC20.json b/ethereum/abi/ERC20.json similarity index 100% rename from cmd/ethereum/abi/ERC20.json rename to ethereum/abi/ERC20.json diff --git a/cmd/ethereum/abi/MessageTransmitter.json b/ethereum/abi/MessageTransmitter.json similarity index 100% rename from cmd/ethereum/abi/MessageTransmitter.json rename to ethereum/abi/MessageTransmitter.json diff --git a/cmd/ethereum/abi/TokenMessenger.json b/ethereum/abi/TokenMessenger.json similarity index 100% rename from cmd/ethereum/abi/TokenMessenger.json rename to ethereum/abi/TokenMessenger.json diff --git a/cmd/ethereum/abi/TokenMessengerWithMetadata.json b/ethereum/abi/TokenMessengerWithMetadata.json similarity index 100% rename from cmd/ethereum/abi/TokenMessengerWithMetadata.json rename to ethereum/abi/TokenMessengerWithMetadata.json diff --git a/cmd/ethereum/broadcast_test.go b/ethereum/broadcast_test.go similarity index 82% rename from cmd/ethereum/broadcast_test.go rename to ethereum/broadcast_test.go index 76ad9f6a..40e1fd7d 100644 --- a/cmd/ethereum/broadcast_test.go +++ b/ethereum/broadcast_test.go @@ -9,7 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/ethereum" + "github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts" "github.com/stretchr/testify/require" ) @@ -28,7 +28,7 @@ func TestEthUsedNonce(t *testing.T) { require.NoError(t, err) defer client.Close() - messageTransmitter, err := ethereum.NewMessageTransmitter(common.HexToAddress("0x0a992d191deec32afe36203ad87d7d289a738f81"), client) + messageTransmitter, err := contracts.NewMessageTransmitter(common.HexToAddress("0x0a992d191deec32afe36203ad87d7d289a738f81"), client) require.NoError(t, err) co := &bind.CallOpts{ diff --git a/ethereum/chain.go b/ethereum/chain.go new file mode 100644 index 00000000..8722217f --- /dev/null +++ b/ethereum/chain.go @@ -0,0 +1,380 @@ +package ethereum + +import ( + "bytes" + "context" + "crypto/ecdsa" + "embed" + "encoding/hex" + "errors" + "fmt" + "math/big" + "os" + "regexp" + "strconv" + "strings" + "sync" + "time" + + "cosmossdk.io/log" + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/pascaldekloe/etherstream" + "github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts" + "github.com/strangelove-ventures/noble-cctp-relayer/types" +) + +//go:embed abi/MessageTransmitter.json +var content embed.FS + +var _ types.Chain = (*Ethereum)(nil) + +type Ethereum struct { + name string + chainID int64 + domain types.Domain + rpcURL string + wsURL string + messageTransmitterAddress string + startBlock uint64 + lookbackPeriod uint64 + privateKey *ecdsa.PrivateKey + minterAddress string + maxRetries int + retryIntervalSeconds int + + mu sync.Mutex +} + +func NewChain( + name string, + chainID int64, + domain types.Domain, + rpcURL string, + wsURL string, + messageTransmitterAddress string, + startBlock uint64, + lookbackPeriod uint64, + privateKey string, + maxRetries int, + retryIntervalSeconds int, +) (*Ethereum, error) { + privEcdsaKey, ethereumAddress, err := GetEcdsaKeyAddress(privateKey) + if err != nil { + return nil, err + } + return &Ethereum{ + name: name, + chainID: chainID, + domain: domain, + rpcURL: rpcURL, + wsURL: wsURL, + messageTransmitterAddress: messageTransmitterAddress, + startBlock: startBlock, + lookbackPeriod: lookbackPeriod, + privateKey: privEcdsaKey, + minterAddress: ethereumAddress, + maxRetries: maxRetries, + retryIntervalSeconds: retryIntervalSeconds, + }, nil +} + +func (e *Ethereum) Name() string { + return e.name +} + +func (e *Ethereum) Domain() types.Domain { + return e.domain +} + +func (e *Ethereum) IsDestinationCaller(destinationCaller []byte) bool { + zeroByteArr := make([]byte, 32) + + decodedMinter, err := hex.DecodeString(strings.ReplaceAll(e.minterAddress, "0x", "")) + if err != nil && bytes.Equal(destinationCaller, zeroByteArr) { + return true + } + + decodedMinterPadded := make([]byte, 32) + copy(decodedMinterPadded[12:], decodedMinter) + + return bytes.Equal(destinationCaller, zeroByteArr) || bytes.Equal(destinationCaller, decodedMinterPadded) +} + +func (e *Ethereum) InitializeBroadcaster( + ctx context.Context, + logger log.Logger, + sequenceMap *types.SequenceMap, +) error { + nextNonce, err := GetEthereumAccountNonce(e.rpcURL, e.minterAddress) + if err != nil { + return fmt.Errorf("unable to retrieve evm account nonce: %w", err) + } + sequenceMap.Put(e.Domain(), uint64(nextNonce)) + + return nil +} + +func (e *Ethereum) StartListener( + ctx context.Context, + logger log.Logger, + processingQueue chan *types.TxState, + quit chan os.Signal, +) { + logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain) + + // set up client + messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json") + if err != nil { + logger.Error("unable to read MessageTransmitter abi", "err", err) + os.Exit(1) + } + messageTransmitterABI, err := abi.JSON(bytes.NewReader(messageTransmitter)) + if err != nil { + logger.Error("unable to parse MessageTransmitter abi", "err", err) + } + + messageSent := messageTransmitterABI.Events["MessageSent"] + + ethClient, err := ethclient.DialContext(context.Background(), e.wsURL) + if err != nil { + logger.Error("unable to initialize ethereum client", "err", err) + os.Exit(1) + } + + defer ethClient.Close() + + messageTransmitterAddress := common.HexToAddress(e.messageTransmitterAddress) + etherReader := etherstream.Reader{Backend: ethClient} + + if e.startBlock == 0 { + header, err := ethClient.HeaderByNumber(context.Background(), nil) + if err != nil { + logger.Error("unable to retrieve latest eth block header", "err", err) + os.Exit(1) + } + + e.startBlock = header.Number.Uint64() + } + + query := ethereum.FilterQuery{ + Addresses: []common.Address{messageTransmitterAddress}, + Topics: [][]common.Hash{{messageSent.ID}}, + FromBlock: big.NewInt(int64(e.startBlock - e.lookbackPeriod)), + } + + logger.Info(fmt.Sprintf( + "Starting Ethereum listener at block %d looking back %d blocks", + e.startBlock, + e.lookbackPeriod)) + + // websockets do not query history + // https://github.com/ethereum/go-ethereum/issues/15063 + stream, sub, history, err := etherReader.QueryWithHistory(context.Background(), &query) + if err != nil { + logger.Error("unable to subscribe to logs", "err", err) + os.Exit(1) + } + + // process history + for _, historicalLog := range history { + parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &historicalLog) + if err != nil { + logger.Error("Unable to parse history log into MessageState, skipping", "err", err) + continue + } + logger.Info(fmt.Sprintf("New historical msg from source domain %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) + + processingQueue <- &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} + + // It might help to wait a small amount of time between sending messages into the processing queue + // so that account sequences / nonces are set correctly + // time.Sleep(10 * time.Millisecond) + } + + // consume stream + go func() { + var txState *types.TxState + for { + select { + case <-quit: + return + case err := <-sub.Err(): + logger.Error("connection closed", "err", err) + os.Exit(1) + case streamLog := <-stream: + parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog) + if err != nil { + logger.Error("Unable to parse ws log into MessageState, skipping") + continue + } + logger.Info(fmt.Sprintf("New stream msg from %d with tx hash %s", parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) + if txState == nil { + txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} + } else if parsedMsg.SourceTxHash != txState.TxHash { + processingQueue <- txState + txState = &types.TxState{TxHash: parsedMsg.SourceTxHash, Msgs: []*types.MessageState{parsedMsg}} + } else { + txState.Msgs = append(txState.Msgs, parsedMsg) + + } + default: + if txState != nil { + processingQueue <- txState + txState = nil + } + } + } + }() +} + +func (e *Ethereum) Broadcast( + ctx context.Context, + logger log.Logger, + msgs []*types.MessageState, + sequenceMap *types.SequenceMap, +) error { + + // set up eth client + client, err := ethclient.Dial(e.rpcURL) + if err != nil { + return fmt.Errorf("unable to dial ethereum client: %w", err) + } + defer client.Close() + + backend := NewContractBackendWrapper(client) + + auth, err := bind.NewKeyedTransactorWithChainID(e.privateKey, big.NewInt(e.chainID)) + if err != nil { + return fmt.Errorf("unable to create auth: %w", err) + } + + messageTransmitter, err := contracts.NewMessageTransmitter(common.HexToAddress(e.messageTransmitterAddress), backend) + if err != nil { + return fmt.Errorf("unable to create message transmitter: %w", err) + } + + var broadcastErrors error +MsgLoop: + for _, msg := range msgs { + + if msg.Status == types.Complete { + continue + } + + attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) + if err != nil { + return errors.New("unable to decode message attestation") + } + + for attempt := 0; attempt <= e.maxRetries; attempt++ { + logger.Info(fmt.Sprintf( + "Broadcasting message from %d to %d: with source tx hash %s", + msg.SourceDomain, + msg.DestDomain, + msg.SourceTxHash)) + + nonce := sequenceMap.Next(e.domain) + auth.Nonce = big.NewInt(int64(nonce)) + + e.mu.Lock() + + // TODO remove + nextNonce, err := GetEthereumAccountNonce(e.rpcURL, e.minterAddress) + if err != nil { + logger.Error("unable to retrieve account number") + } else { + auth.Nonce = big.NewInt(nextNonce) + } + // TODO end remove + + // check if nonce already used + co := &bind.CallOpts{ + Pending: true, + Context: ctx, + } + + logger.Debug("Checking if nonce was used for broadcast to Ethereum", "source_domain", msg.SourceDomain, "nonce", msg.Nonce) + + key := append( + common.LeftPadBytes((big.NewInt(int64(msg.SourceDomain))).Bytes(), 4), + common.LeftPadBytes((big.NewInt(int64(msg.Nonce))).Bytes(), 8)..., + ) + + response, nonceErr := messageTransmitter.UsedNonces(co, [32]byte(crypto.Keccak256(key))) + if nonceErr != nil { + logger.Debug("Error querying whether nonce was used. Continuing...") + } else { + fmt.Printf("received used nonce response: %d\n", response) + if response.Uint64() == uint64(1) { + // nonce has already been used, mark as complete + logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d", + msg.SourceDomain, msg.Nonce)) + msg.Status = types.Complete + e.mu.Unlock() + continue MsgLoop + } + } + + // broadcast txn + tx, err := messageTransmitter.ReceiveMessage( + auth, + msg.MsgSentBytes, + attestationBytes, + ) + if err == nil { + msg.Status = types.Complete + + fullLog, err := tx.MarshalJSON() + if err != nil { + logger.Error("error marshalling eth tx log", err) + } + + msg.DestTxHash = tx.Hash().Hex() + + logger.Info(fmt.Sprintf("Successfully broadcast %s to Ethereum. Tx hash: %s, FULL LOG: %s", msg.SourceTxHash, msg.DestTxHash, string(fullLog))) + e.mu.Unlock() + continue MsgLoop + } + + logger.Error(fmt.Sprintf("error during broadcast: %s", err.Error())) + if parsedErr, ok := err.(JsonError); ok { + if parsedErr.ErrorCode() == 3 && parsedErr.Error() == "execution reverted: Nonce already used" { + msg.Status = types.Complete + logger.Error(fmt.Sprintf("This account nonce has already been used: %d", nonce)) + e.mu.Unlock() + continue MsgLoop + } + + match, _ := regexp.MatchString("nonce too low: next nonce [0-9]+, tx nonce [0-9]+", parsedErr.Error()) + if match { + numberRegex := regexp.MustCompile("[0-9]+") + nextNonce, err := strconv.ParseInt(numberRegex.FindAllString(parsedErr.Error(), 1)[0], 10, 0) + if err != nil { + nextNonce, err = GetEthereumAccountNonce(e.rpcURL, e.minterAddress) + if err != nil { + logger.Error("unable to retrieve account number") + } + } + sequenceMap.Put(e.domain, uint64(nextNonce)) + } + } + e.mu.Unlock() + + // if it's not the last attempt, retry + // TODO increase the destination.ethereum.broadcast retries (3-5) and retry interval (15s). By checking for used nonces, there is no gas cost for failed mints. + if attempt != e.maxRetries { + logger.Info(fmt.Sprintf("Retrying in %d seconds", e.retryIntervalSeconds)) + time.Sleep(time.Duration(e.retryIntervalSeconds) * time.Second) + } + } + // retried max times with failure + msg.Status = types.Failed + broadcastErrors = errors.Join(broadcastErrors, errors.New("reached max number of broadcast attempts")) + } + return broadcastErrors +} diff --git a/ethereum/config.go b/ethereum/config.go new file mode 100644 index 00000000..2be09d18 --- /dev/null +++ b/ethereum/config.go @@ -0,0 +1,38 @@ +package ethereum + +import "github.com/strangelove-ventures/noble-cctp-relayer/types" + +var _ types.ChainConfig = (*ChainConfig)(nil) + +type ChainConfig struct { + DomainID types.Domain `yaml:"domain-id"` + ChainID int64 `yaml:"chain-id"` + RPC string `yaml:"rpc"` + WS string `yaml:"ws"` + MessageTransmitter string `yaml:"message-transmitter"` + + StartBlock uint64 `yaml:"start-block"` + LookbackPeriod uint64 `yaml:"lookback-period"` + + BroadcastRetries int `yaml:"broadcast-retries"` + BroadcastRetryInterval int `yaml:"broadcast-retry-interval"` + + // TODO move to keyring + MinterPrivateKey string `yaml:"minter-private-key"` +} + +func (c *ChainConfig) Chain(name string) (types.Chain, error) { + return NewChain( + name, + c.ChainID, + c.DomainID, + c.RPC, + c.WS, + c.MessageTransmitter, + c.StartBlock, + c.LookbackPeriod, + c.MinterPrivateKey, + c.BroadcastRetries, + c.BroadcastRetryInterval, + ) +} diff --git a/cmd/ethereum/contract_backend_wrapper.go b/ethereum/contract_backend_wrapper.go similarity index 100% rename from cmd/ethereum/contract_backend_wrapper.go rename to ethereum/contract_backend_wrapper.go diff --git a/cmd/ethereum/MessageTransmitter.go b/ethereum/contracts/MessageTransmitter.go similarity index 99% rename from cmd/ethereum/MessageTransmitter.go rename to ethereum/contracts/MessageTransmitter.go index 6f448db2..7a491324 100644 --- a/cmd/ethereum/MessageTransmitter.go +++ b/ethereum/contracts/MessageTransmitter.go @@ -1,7 +1,7 @@ // Code generated - DO NOT EDIT. // This file is a generated binding and any manual changes will be lost. -package ethereum +package contracts import ( "errors" diff --git a/cmd/TokenMessenger.go b/ethereum/contracts/TokenMessenger.go similarity index 99% rename from cmd/TokenMessenger.go rename to ethereum/contracts/TokenMessenger.go index a168c20f..92aede51 100644 --- a/cmd/TokenMessenger.go +++ b/ethereum/contracts/TokenMessenger.go @@ -1,7 +1,7 @@ // Code generated - DO NOT EDIT. // This file is a generated binding and any manual changes will be lost. -package cmd +package contracts import ( "errors" diff --git a/cmd/TokenMessengerWithMetadata.go b/ethereum/contracts/TokenMessengerWithMetadata.go similarity index 99% rename from cmd/TokenMessengerWithMetadata.go rename to ethereum/contracts/TokenMessengerWithMetadata.go index 86bfa733..aaa66ab4 100644 --- a/cmd/TokenMessengerWithMetadata.go +++ b/ethereum/contracts/TokenMessengerWithMetadata.go @@ -1,7 +1,7 @@ // Code generated - DO NOT EDIT. // This file is a generated binding and any manual changes will be lost. -package cmd +package contracts import ( "errors" diff --git a/cmd/ethereum/listener_test.go b/ethereum/listener_test.go similarity index 73% rename from cmd/ethereum/listener_test.go rename to ethereum/listener_test.go index 97577d8c..1418e06d 100644 --- a/cmd/ethereum/listener_test.go +++ b/ethereum/listener_test.go @@ -1,24 +1,28 @@ package ethereum_test import ( + "context" "os" "testing" "time" "cosmossdk.io/log" "github.com/rs/zerolog" - eth "github.com/strangelove-ventures/noble-cctp-relayer/cmd/ethereum" - "github.com/strangelove-ventures/noble-cctp-relayer/config" + "github.com/strangelove-ventures/noble-cctp-relayer/ethereum" "github.com/strangelove-ventures/noble-cctp-relayer/types" "github.com/stretchr/testify/require" ) -var cfg config.Config +var cfg types.Config var logger log.Logger var processingQueue chan *types.TxState func init() { - cfg = config.Parse("../../.ignore/unit_tests.yaml") + var err error + cfg, err = types.Parse("../../.ignore/unit_tests.yaml") + if err != nil { + panic(err) + } logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.ErrorLevel)) processingQueue = make(chan *types.TxState, 10000) @@ -26,10 +30,14 @@ func init() { // tests for a historical log func TestStartListener(t *testing.T) { + ethCfg := ethereum.ChainConfig{ + StartBlock: 9702735, + LookbackPeriod: 0, + } + eth, err := ethCfg.Chain("ethereum") + require.NoError(t, err) - cfg.Networks.Source.Ethereum.StartBlock = 9702735 - cfg.Networks.Source.Ethereum.LookbackPeriod = 0 - go eth.StartListener(cfg, logger, processingQueue) + go eth.StartListener(context.TODO(), logger, processingQueue, nil) time.Sleep(5 * time.Second) diff --git a/cmd/ethereum/util.go b/ethereum/util.go similarity index 100% rename from cmd/ethereum/util.go rename to ethereum/util.go diff --git a/cmd/ethereum/util_test.go b/ethereum/util_test.go similarity index 58% rename from cmd/ethereum/util_test.go rename to ethereum/util_test.go index 0856a5aa..4406b821 100644 --- a/cmd/ethereum/util_test.go +++ b/ethereum/util_test.go @@ -6,27 +6,30 @@ import ( "cosmossdk.io/log" "github.com/rs/zerolog" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/ethereum" - "github.com/strangelove-ventures/noble-cctp-relayer/config" + "github.com/strangelove-ventures/noble-cctp-relayer/ethereum" "github.com/strangelove-ventures/noble-cctp-relayer/types" "github.com/stretchr/testify/require" ) func init() { - cfg = config.Parse("../../.ignore/unit_tests.yaml") + var err error + cfg, err = types.Parse("../../.ignore/unit_tests.yaml") + if err != nil { + panic(err) + } logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.ErrorLevel)) processingQueue = make(chan *types.TxState, 10000) } func TestGetEthereumAccountNonce(t *testing.T) { - _, err := ethereum.GetEthereumAccountNonce(cfg.Networks.Destination.Ethereum.RPC, "0x4996f29b254c77972fff8f25e6f7797b3c9a0eb6") + _, err := ethereum.GetEthereumAccountNonce(cfg.Chains["ethereum"].(*ethereum.ChainConfig).RPC, "0x4996f29b254c77972fff8f25e6f7797b3c9a0eb6") require.Nil(t, err) } // Return public ecdsa key and address given the private key func TestGetEcdsaKeyAddress(t *testing.T) { - key, addr, err := ethereum.GetEcdsaKeyAddress(cfg.Networks.Minters[0].MinterPrivateKey) + key, addr, err := ethereum.GetEcdsaKeyAddress(cfg.Chains["ethereum"].(*ethereum.ChainConfig).MinterPrivateKey) require.NotNil(t, key) require.NotNil(t, addr) require.Nil(t, err) diff --git a/noble/chain.go b/noble/chain.go new file mode 100644 index 00000000..cebe3a2e --- /dev/null +++ b/noble/chain.go @@ -0,0 +1,459 @@ +package noble + +import ( + "bytes" + "context" + "encoding/hex" + "errors" + "fmt" + "math/rand" + "os" + "regexp" + "strconv" + "sync" + "time" + + "cosmossdk.io/log" + nobletypes "github.com/circlefin/noble-cctp/x/cctp/types" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + sdkClient "github.com/cosmos/cosmos-sdk/client" + clientTx "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/bech32" + "github.com/cosmos/cosmos-sdk/types/tx/signing" + xauthsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" + xauthtx "github.com/cosmos/cosmos-sdk/x/auth/tx" + authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" + "github.com/strangelove-ventures/noble-cctp-relayer/cosmos" + "github.com/strangelove-ventures/noble-cctp-relayer/types" +) + +var _ types.Chain = (*Noble)(nil) + +type Noble struct { + cc *cosmos.CosmosProvider + chainID string + + privateKey *secp256k1.PrivKey + minterAddress string + accountNumber uint64 + + startBlock uint64 + lookbackPeriod uint64 + workers uint32 + + gasLimit uint64 + txMemo string + maxRetries int + retryIntervalSeconds int + + mu sync.Mutex +} + +func NewChain( + rpcURL string, + chainID string, + privateKey string, + startBlock uint64, + lookbackPeriod uint64, + workers uint32, + gasLimit uint64, + txMemo string, + maxRetries int, + retryIntervalSeconds int, +) (*Noble, error) { + cc, err := cosmos.NewProvider(rpcURL) + if err != nil { + return nil, fmt.Errorf("unable to build cosmos provider for noble: %w", err) + } + + keyBz, err := hex.DecodeString(privateKey) + if err != nil { + return nil, fmt.Errorf("unable to parse noble private key: %w", err) + } + + privKey := secp256k1.PrivKey{Key: keyBz} + + address := privKey.PubKey().Address() + minterAddress := sdk.MustBech32ifyAddressBytes("noble", address) + + return &Noble{ + cc: cc, + chainID: chainID, + startBlock: startBlock, + lookbackPeriod: lookbackPeriod, + workers: workers, + privateKey: &privKey, + minterAddress: minterAddress, + gasLimit: gasLimit, + txMemo: txMemo, + maxRetries: maxRetries, + retryIntervalSeconds: retryIntervalSeconds, + }, nil +} + +func (n *Noble) AccountInfo(ctx context.Context) (uint64, uint64, error) { + res, err := authtypes.NewQueryClient(n.cc).Account(ctx, &authtypes.QueryAccountRequest{ + Address: n.minterAddress, + }) + if err != nil { + return 0, 0, fmt.Errorf("unable to query account for noble: %w", err) + } + var acc authtypes.AccountI + if err := n.cc.Cdc.InterfaceRegistry.UnpackAny(res.Account, &acc); err != nil { + return 0, 0, fmt.Errorf("unable to unpack account for noble: %w", err) + } + + return acc.GetAccountNumber(), acc.GetSequence(), nil +} + +func (n *Noble) Name() string { + return "Noble" +} + +func (n *Noble) Domain() types.Domain { + return 4 +} + +func (n *Noble) IsDestinationCaller(destinationCaller []byte) bool { + zeroByteArr := make([]byte, 32) + + if bytes.Equal(destinationCaller, zeroByteArr) { + return true + } + + bech32DestinationCaller, err := decodeDestinationCaller(destinationCaller) + if err != nil { + return false + } + + return bech32DestinationCaller == n.minterAddress +} + +// DecodeDestinationCaller transforms an encoded Noble cctp address into a noble bech32 address +// left padded input -> bech32 output +func decodeDestinationCaller(input []byte) (string, error) { + if len(input) <= 12 { + return "", errors.New("destinationCaller is too short") + } + output, err := bech32.ConvertAndEncode("noble", input[12:]) + if err != nil { + return "", errors.New("unable to encode destination caller") + } + return output, nil +} + +func (n *Noble) InitializeBroadcaster( + ctx context.Context, + logger log.Logger, + sequenceMap *types.SequenceMap, +) error { + accountNumber, accountSequence, err := n.AccountInfo(ctx) + if err != nil { + return fmt.Errorf("unable to get account info for noble: %w", err) + } + + n.accountNumber = accountNumber + sequenceMap.Put(n.Domain(), accountSequence) + + return nil +} + +func (n *Noble) StartListener( + ctx context.Context, + logger log.Logger, + processingQueue chan *types.TxState, + quit chan os.Signal, +) { + logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain()) + + if n.startBlock == 0 { + // get the latest block + chainTip, err := n.chainTip(ctx) + if err != nil { + panic(fmt.Errorf("unable to get chain tip for noble: %w", err)) + } + n.startBlock = chainTip + } + + logger.Info(fmt.Sprintf("Starting Noble listener at block %d looking back %d blocks", + n.startBlock, + n.lookbackPeriod)) + + accountNumber, _, err := n.AccountInfo(ctx) + if err != nil { + panic(fmt.Errorf("unable to get account info for noble: %w", err)) + } + + n.accountNumber = accountNumber + + // enqueue block heights + currentBlock := n.startBlock + lookback := n.lookbackPeriod + chainTip, err := n.chainTip(ctx) + blockQueue := make(chan uint64, 1000000) + + // history + currentBlock = currentBlock - lookback + for currentBlock <= chainTip { + blockQueue <- currentBlock + currentBlock++ + } + + // listen for new blocks + go func() { + for { + select { + case <-quit: + return + default: + chainTip, err = n.chainTip(ctx) + if err == nil { + if chainTip >= currentBlock { + for i := currentBlock; i <= chainTip; i++ { + blockQueue <- i + } + currentBlock = chainTip + 1 + } + } + time.Sleep(6 * time.Second) + } + } + }() + + // constantly query for blocks + for i := 0; i < int(n.workers); i++ { + go func() { + for { + select { + case <-quit: + return + default: + block := <-blockQueue + res, err := n.cc.RPCClient.TxSearch(ctx, fmt.Sprintf("tx.height=%d", block), false, nil, nil, "") + if err != nil { + logger.Debug(fmt.Sprintf("unable to query Noble block %d", block)) + blockQueue <- block + } + + for _, tx := range res.Txs { + parsedMsgs, err := txToMessageState(tx) + if err != nil { + logger.Error("unable to parse Noble log to message state", "err", err.Error()) + continue + } + for _, parsedMsg := range parsedMsgs { + logger.Info(fmt.Sprintf("New stream msg with nonce %d from %d with tx hash %s", parsedMsg.Nonce, parsedMsg.SourceDomain, parsedMsg.SourceTxHash)) + } + processingQueue <- &types.TxState{TxHash: tx.Hash.String(), Msgs: parsedMsgs} + } + } + } + }() + } + + <-quit +} + +func (n *Noble) chainTip(ctx context.Context) (uint64, error) { + res, err := n.cc.RPCClient.Status(ctx) + if err != nil { + return 0, fmt.Errorf("unable to query status for noble: %w", err) + } + return uint64(res.SyncInfo.LatestBlockHeight), nil +} + +func (n *Noble) Broadcast( + ctx context.Context, + logger log.Logger, + msgs []*types.MessageState, + sequenceMap *types.SequenceMap, +) error { + // set up sdk context + interfaceRegistry := codectypes.NewInterfaceRegistry() + nobletypes.RegisterInterfaces(interfaceRegistry) + cdc := codec.NewProtoCodec(interfaceRegistry) + sdkContext := sdkClient.Context{ + TxConfig: xauthtx.NewTxConfig(cdc, xauthtx.DefaultSignModes), + } + + // build txn + txBuilder := sdkContext.TxConfig.NewTxBuilder() + + // sign and broadcast txn + for attempt := 0; attempt <= n.maxRetries; attempt++ { + + var receiveMsgs []sdk.Msg + for _, msg := range msgs { + + used, err := n.cc.QueryUsedNonce(ctx, types.Domain(msg.SourceDomain), msg.Nonce) + if err != nil { + return fmt.Errorf("unable to query used nonce: %w", err) + } + + if used { + msg.Status = types.Complete + logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used", msg.Nonce)) + continue + } + + attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) + if err != nil { + return fmt.Errorf("unable to decode message attestation") + } + + receiveMsgs = append(receiveMsgs, nobletypes.NewMsgReceiveMessage( + n.minterAddress, + msg.MsgSentBytes, + attestationBytes, + )) + + logger.Info(fmt.Sprintf( + "Broadcasting message from %d to %d: with source tx hash %s", + msg.SourceDomain, + msg.DestDomain, + msg.SourceTxHash)) + } + + if err := txBuilder.SetMsgs(receiveMsgs...); err != nil { + return fmt.Errorf("failed to set messages on tx: %w", err) + } + + txBuilder.SetGasLimit(n.gasLimit) + + txBuilder.SetMemo(n.txMemo) + + n.mu.Lock() + + accountSequence := sequenceMap.Next(n.Domain()) + + sigV2 := signing.SignatureV2{ + PubKey: n.privateKey.PubKey(), + Data: &signing.SingleSignatureData{ + SignMode: sdkContext.TxConfig.SignModeHandler().DefaultMode(), + Signature: nil, + }, + Sequence: uint64(accountSequence), + } + + signerData := xauthsigning.SignerData{ + ChainID: n.chainID, + AccountNumber: uint64(n.accountNumber), + Sequence: uint64(accountSequence), + } + + txBuilder.SetSignatures(sigV2) + + sigV2, err := clientTx.SignWithPrivKey( + sdkContext.TxConfig.SignModeHandler().DefaultMode(), + signerData, + txBuilder, + n.privateKey, + sdkContext.TxConfig, + uint64(accountSequence), + ) + if err != nil { + n.mu.Unlock() + return fmt.Errorf("failed to sign tx: %w", err) + } + + if err := txBuilder.SetSignatures(sigV2); err != nil { + n.mu.Unlock() + return fmt.Errorf("failed to set signatures: %w", err) + } + + // Generated Protobuf-encoded bytes. + txBytes, err := sdkContext.TxConfig.TxEncoder()(txBuilder.GetTx()) + if err != nil { + n.mu.Unlock() + return fmt.Errorf("failed to proto encode tx: %w", err) + } + + rpcResponse, err := n.cc.RPCClient.BroadcastTxSync(context.Background(), txBytes) + if err != nil || (rpcResponse != nil && rpcResponse.Code != 0) { + // Log the error + logger.Error(fmt.Sprintf("error during broadcast: %s", getErrorString(err, rpcResponse))) + + if err != nil || rpcResponse == nil { + // Log retry information + logger.Info(fmt.Sprintf("Retrying in %d seconds", n.retryIntervalSeconds)) + time.Sleep(time.Duration(n.retryIntervalSeconds) * time.Second) + // wait a random amount of time to lower probability of concurrent message nonce collision + time.Sleep(time.Duration(rand.Intn(5)) * time.Second) + n.mu.Unlock() + continue + } + + // Log details for non-zero response code + logger.Error(fmt.Sprintf("received non-zero: %d - %s", rpcResponse.Code, rpcResponse.Log)) + + // Handle specific error code (32) + if rpcResponse.Code == 32 { + newAccountSequence := n.extractAccountSequence(ctx, logger, rpcResponse.Log) + logger.Debug(fmt.Sprintf("retrying with new account sequence: %d", newAccountSequence)) + sequenceMap.Put(n.Domain(), newAccountSequence) + } + + // Log retry information + logger.Info(fmt.Sprintf("Retrying in %d seconds", n.retryIntervalSeconds)) + time.Sleep(time.Duration(n.retryIntervalSeconds) * time.Second) + // wait a random amount of time to lower probability of concurrent message nonce collision + time.Sleep(time.Duration(rand.Intn(5)) * time.Second) + n.mu.Unlock() + continue + } + + n.mu.Unlock() + + // Tx was successfully broadcast + for _, msg := range msgs { + msg.DestTxHash = rpcResponse.Hash.String() + msg.Status = types.Complete + } + logger.Info(fmt.Sprintf("Successfully broadcast %s to Noble. Tx hash: %s", msgs[0].SourceTxHash, msgs[0].DestTxHash)) + + return nil + } + + for _, msg := range msgs { + if msg.Status != types.Complete { + msg.Status = types.Failed + } + } + + return errors.New("reached max number of broadcast attempts") +} + +// getErrorString returns the appropriate value to log when tx broadcast errors are encountered. +func getErrorString(err error, rpcResponse *ctypes.ResultBroadcastTx) string { + if rpcResponse != nil { + return rpcResponse.Log + } + return err.Error() +} + +// extractAccountSequence attempts to extract the account sequence number from the RPC response logs when +// account sequence mismatch errors are encountered. If the account sequence number cannot be extracted from the logs, +// it is retrieved by making a request to the API endpoint. +func (n *Noble) extractAccountSequence(ctx context.Context, logger log.Logger, rpcResponseLog string) uint64 { + pattern := `expected (\d+), got (\d+)` + re := regexp.MustCompile(pattern) + match := re.FindStringSubmatch(rpcResponseLog) + + if len(match) == 3 { + // Extract the numbers from the match. + newAccountSequence, _ := strconv.ParseUint(match[1], 10, 64) + return newAccountSequence + } + + // Otherwise, just request the account sequence + _, newAccountSequence, err := n.AccountInfo(ctx) + if err != nil { + logger.Error("unable to retrieve account sequence") + } + + return newAccountSequence +} diff --git a/noble/config.go b/noble/config.go new file mode 100644 index 00000000..a9085a16 --- /dev/null +++ b/noble/config.go @@ -0,0 +1,37 @@ +package noble + +import "github.com/strangelove-ventures/noble-cctp-relayer/types" + +var _ types.ChainConfig = (*ChainConfig)(nil) + +type ChainConfig struct { + RPC string `yaml:"rpc"` + ChainID string `yaml:"chain-id"` + + StartBlock uint64 `yaml:"start-block"` + LookbackPeriod uint64 `yaml:"lookback-period"` + Workers uint32 `yaml:"workers"` + + TxMemo string `yaml:"tx-memo"` + GasLimit uint64 `yaml:"gas-limit"` + BroadcastRetries int `yaml:"broadcast-retries"` + BroadcastRetryInterval int `yaml:"broadcast-retry-interval"` + + // TODO move to keyring + MinterPrivateKey string `yaml:"minter-private-key"` +} + +func (c *ChainConfig) Chain(name string) (types.Chain, error) { + return NewChain( + c.RPC, + c.ChainID, + c.MinterPrivateKey, + c.StartBlock, + c.LookbackPeriod, + c.Workers, + c.GasLimit, + c.TxMemo, + c.BroadcastRetries, + c.BroadcastRetryInterval, + ) +} diff --git a/cmd/noble/listener_test.go b/noble/listener_test.go similarity index 70% rename from cmd/noble/listener_test.go rename to noble/listener_test.go index a1d668a8..108f9829 100644 --- a/cmd/noble/listener_test.go +++ b/noble/listener_test.go @@ -1,33 +1,40 @@ package noble_test import ( + "context" "os" "testing" "time" "cosmossdk.io/log" "github.com/rs/zerolog" - "github.com/strangelove-ventures/noble-cctp-relayer/cmd/noble" - "github.com/strangelove-ventures/noble-cctp-relayer/config" + "github.com/strangelove-ventures/noble-cctp-relayer/noble" "github.com/strangelove-ventures/noble-cctp-relayer/types" "github.com/stretchr/testify/require" ) -var cfg config.Config +var cfg types.Config var logger log.Logger var processingQueue chan *types.TxState func init() { - cfg = config.Parse("../../.ignore/unit_tests.yaml") + var err error + cfg, err = types.Parse("../../.ignore/unit_tests.yaml") + if err != nil { + panic(err) + } logger = log.NewLogger(os.Stdout, log.LevelOption(zerolog.DebugLevel)) processingQueue = make(chan *types.TxState, 10000) - cfg.Networks.Source.Noble.Workers = 1 + cfg.Chains["noble"].(*noble.ChainConfig).Workers = 1 } func TestStartListener(t *testing.T) { - cfg.Networks.Source.Noble.StartBlock = 3273557 - go noble.StartListener(cfg, logger, processingQueue) + cfg.Chains["noble"].(*noble.ChainConfig).StartBlock = 3273557 + n, err := cfg.Chains["noble"].(*noble.ChainConfig).Chain("noble") + require.NoError(t, err) + + go n.StartListener(context.TODO(), logger, processingQueue, nil) time.Sleep(20 * time.Second) diff --git a/noble/message_state.go b/noble/message_state.go new file mode 100644 index 00000000..bf995552 --- /dev/null +++ b/noble/message_state.go @@ -0,0 +1,77 @@ +package noble + +import ( + "encoding/base64" + "encoding/hex" + "errors" + "fmt" + "time" + + ctypes "github.com/cometbft/cometbft/rpc/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/strangelove-ventures/noble-cctp-relayer/types" +) + +// NobleLogToMessageState transforms a Noble log into a messageState +func txToMessageState(tx *ctypes.ResultTx) ([]*types.MessageState, error) { + if tx.TxResult.Code != 0 { + return nil, nil + } + + var messageStates []*types.MessageState + + for i, event := range tx.TxResult.Events { + if event.Type == "circle.cctp.v1.MessageSent" { + //fmt.Printf("Saw cctp message %s - %d:%d\n", tx., i, j) + var parsed bool + var parseErrs error + for _, attr := range event.Attributes { + if attr.Key == "message" { + fmt.Printf("Saw message attribute %s - %d\n", tx.Hash, i) + encoded := attr.Value[1 : len(attr.Value)-1] + rawMessageSentBytes, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + parseErrs = errors.Join(parseErrs, fmt.Errorf("failed to decode message: %w", err)) + continue + } + + hashed := crypto.Keccak256(rawMessageSentBytes) + hashedHexStr := hex.EncodeToString(hashed) + + msg, err := new(types.Message).Parse(rawMessageSentBytes) + if err != nil { + parseErrs = errors.Join(parseErrs, fmt.Errorf("failed to parse message: %w", err)) + continue + } + + parsed = true + + now := time.Now() + + messageState := &types.MessageState{ + IrisLookupId: hashedHexStr, + Status: types.Created, + SourceDomain: types.Domain(msg.SourceDomain), + DestDomain: types.Domain(msg.DestinationDomain), + Nonce: msg.Nonce, + SourceTxHash: tx.Hash.String(), + MsgSentBytes: rawMessageSentBytes, + DestinationCaller: msg.DestinationCaller, + Created: now, + Updated: now, + } + + messageStates = append(messageStates, messageState) + + fmt.Printf("Appended transfer from 4 to %d\n", msg.DestinationDomain) + } + } + if !parsed { + return nil, fmt.Errorf("unable to parse cctp message. tx hash %s: %w", tx.Hash, parseErrs) + } + } + } + + return messageStates, nil + +} diff --git a/types/chain.go b/types/chain.go new file mode 100644 index 00000000..967b7b8f --- /dev/null +++ b/types/chain.go @@ -0,0 +1,43 @@ +package types + +import ( + "context" + "os" + + "cosmossdk.io/log" +) + +// Chain is an interface for common CCTP source and destination chain operations. +type Chain interface { + // Name returns the name of the chain. + Name() string + + // Domain returns the domain ID of the chain. + Domain() Domain + + // IsDestinationCaller returns true if the specified destination caller is the minter for the specified domain. + IsDestinationCaller(destinationCaller []byte) bool + + // InitializeBroadcaster initializes the minter account info for the chain. + InitializeBroadcaster( + ctx context.Context, + logger log.Logger, + sequenceMap *SequenceMap, + ) error + + // StartListener starts a listener for observing new CCTP burn messages. + StartListener( + ctx context.Context, + logger log.Logger, + processingQueue chan *TxState, + quit chan os.Signal, + ) + + // Broadcast broadcasts CCTP mint messages to the chain. + Broadcast( + ctx context.Context, + logger log.Logger, + msgs []*MessageState, + sequenceMap *SequenceMap, + ) error +} diff --git a/types/config.go b/types/config.go new file mode 100644 index 00000000..2da8dcb8 --- /dev/null +++ b/types/config.go @@ -0,0 +1,34 @@ +package types + +import ( + "os" + + "gopkg.in/yaml.v3" +) + +type Config struct { + Chains map[string]ChainConfig `yaml:"chains"` + EnabledRoutes map[Domain]Domain `yaml:"enabled-routes"` + Circle struct { + AttestationBaseUrl string `yaml:"attestation-base-url"` + FetchRetries int `yaml:"fetch-retries"` + FetchRetryInterval int `yaml:"fetch-retry-interval"` + } `yaml:"circle"` + ProcessorWorkerCount uint32 `yaml:"processor-worker-count"` + Api struct { + TrustedProxies []string `yaml:"trusted-proxies"` + } `yaml:"api"` +} + +func Parse(file string) (cfg Config, err error) { + data, err := os.ReadFile(file) + if err != nil { + return + } + err = yaml.Unmarshal(data, &cfg) + return cfg, err +} + +type ChainConfig interface { + Chain(name string) (Chain, error) +} diff --git a/types/message_state.go b/types/message_state.go index 4d0630db..3bfcd0dc 100644 --- a/types/message_state.go +++ b/types/message_state.go @@ -2,15 +2,11 @@ package types import ( "bytes" - "encoding/base64" "encoding/hex" - "encoding/json" - "errors" "fmt" "time" "github.com/circlefin/noble-cctp/x/cctp/types" - "github.com/cosmos/cosmos-sdk/types/bech32" "github.com/ethereum/go-ethereum/accounts/abi" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -83,89 +79,6 @@ func EvmLogToMessageState(abi abi.ABI, messageSent abi.Event, log *ethtypes.Log) return nil, fmt.Errorf("unable to parse tx into message, tx hash %s", log.TxHash.Hex()) } -// NobleLogToMessageState transforms a Noble log into a messageState -func NobleLogToMessageState(tx Tx) ([]*MessageState, error) { - var eventsList []struct { - Events []Event `json:"events"` - } - if tx.TxResult.Code != 0 { - return nil, nil - } - if err := json.Unmarshal([]byte(tx.TxResult.Log), &eventsList); err != nil { - return nil, fmt.Errorf("unable to parse log events: %s", tx.TxResult.Log) - } - - var messageStates []*MessageState - - for i, log := range eventsList { - for j, event := range log.Events { - if event.Type == "circle.cctp.v1.MessageSent" { - fmt.Printf("Saw cctp message %s - %d:%d\n", tx.Hash, i, j) - var parsed bool - var parseErrs error - for _, attr := range event.Attributes { - if attr.Key == "message" { - fmt.Printf("Saw message attribute %s - %d:%d\n", tx.Hash, i, j) - encoded := attr.Value[1 : len(attr.Value)-1] - rawMessageSentBytes, err := base64.StdEncoding.DecodeString(encoded) - if err != nil { - parseErrs = errors.Join(parseErrs, fmt.Errorf("failed to decode message: %w", err)) - continue - } - - hashed := crypto.Keccak256(rawMessageSentBytes) - hashedHexStr := hex.EncodeToString(hashed) - - msg, err := new(types.Message).Parse(rawMessageSentBytes) - if err != nil { - parseErrs = errors.Join(parseErrs, fmt.Errorf("failed to parse message: %w", err)) - continue - } - - parsed = true - - messageState := &MessageState{ - IrisLookupId: hashedHexStr, - Status: Created, - SourceDomain: Domain(msg.SourceDomain), - DestDomain: Domain(msg.DestinationDomain), - Nonce: msg.Nonce, - SourceTxHash: tx.Hash, - MsgSentBytes: rawMessageSentBytes, - DestinationCaller: msg.DestinationCaller, - Created: time.Now(), - Updated: time.Now(), - } - - messageStates = append(messageStates, messageState) - - fmt.Printf("Appended transfer from 4 to %d\n", msg.DestinationDomain) - } - } - if !parsed { - return nil, fmt.Errorf("unable to parse cctp message. tx hash %s: %w", tx.Hash, parseErrs) - } - } - } - } - - return messageStates, nil - -} - -// DecodeDestinationCaller transforms an encoded Noble cctp address into a noble bech32 address -// left padded input -> bech32 output -func DecodeDestinationCaller(input []byte) (string, error) { - if len(input) <= 12 { - return "", errors.New("destinationCaller is too short") - } - output, err := bech32.ConvertAndEncode("noble", input[12:]) - if err != nil { - return "", errors.New("unable to encode destination caller") - } - return output, nil -} - // Equal checks if two MessageState instances are equal func (m *MessageState) Equal(other *MessageState) bool { return (m.IrisLookupId == other.IrisLookupId && diff --git a/types/message_state.go.bak b/types/message_state.go.bak deleted file mode 100644 index f7d106c3..00000000 --- a/types/message_state.go.bak +++ /dev/null @@ -1,192 +0,0 @@ -package types - -import ( - "bytes" - "encoding/base64" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "strconv" - "time" - - "github.com/circlefin/noble-cctp/x/cctp/types" - "github.com/cosmos/cosmos-sdk/types/bech32" - "github.com/ethereum/go-ethereum/accounts/abi" - ethtypes "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" -) - -const ( - Created string = "created" - Pending string = "pending" - Attested string = "attested" - Complete string = "complete" - Failed string = "failed" - Filtered string = "filtered" - - Mint string = "mint" - Forward string = "forward" -) - -type TxState struct { - TxHash string - Msgs []*MessageState -} - -type MessageState struct { - IrisLookupId string // hex encoded MessageSent bytes - // Type string // 'mint' or 'forward' - Status string // created, pending, attested, complete, failed, filtered - Attestation string // hex encoded attestation - SourceDomain uint32 // source domain id - DestDomain uint32 // destination domain id - SourceTxHash string - DestTxHash string - MsgSentBytes []byte // bytes of the MessageSent message transmitter event - DestinationCaller []byte // address authorized to call transaction - Channel string // "channel-%d" if a forward, empty if not a forward - Created time.Time - Updated time.Time - Nonce uint64 -} - -// EvmLogToMessageState transforms an evm log into a messageState given an ABI -func EvmLogToMessageState(abi abi.ABI, messageSent abi.Event, log *ethtypes.Log) (messageState *MessageState, err error) { - event := make(map[string]interface{}) - _ = abi.UnpackIntoMap(event, messageSent.Name, log.Data) - - rawMessageSentBytes := event["message"].([]byte) - message, _ := new(types.Message).Parse(rawMessageSentBytes) - - hashed := crypto.Keccak256(rawMessageSentBytes) - hashedHexStr := hex.EncodeToString(hashed) - - messageState = &MessageState{ - IrisLookupId: hashedHexStr, - Status: Created, - SourceDomain: message.SourceDomain, - DestDomain: message.DestinationDomain, - SourceTxHash: log.TxHash.Hex(), - MsgSentBytes: rawMessageSentBytes, - DestinationCaller: message.DestinationCaller, - Nonce: message.Nonce, - Created: time.Now(), - Updated: time.Now(), - } - - if _, err := new(BurnMessage).Parse(message.MessageBody); err == nil { - messageState.Type = Mint - return messageState, nil - } - - if forward, err := new(MetadataMessage).Parse(message.MessageBody); err == nil { - messageState.Type = Forward - // add forward channel to object so we can filter later - messageState.Channel = "channel-" + strconv.Itoa(int(forward.Channel)) - return messageState, nil - } - - return nil, fmt.Errorf("unable to parse tx into message, tx hash %s", log.TxHash.Hex()) -} - -// NobleLogToMessageState transforms a Noble log into a messageState -func NobleLogToMessageState(tx Tx) ([]*MessageState, error) { - var eventsList []struct { - Events []Event `json:"events"` - } - if tx.TxResult.Code != 0 { - return nil, nil - } - if err := json.Unmarshal([]byte(tx.TxResult.Log), &eventsList); err != nil { - return nil, fmt.Errorf("unable to parse log events: %s", tx.TxResult.Log) - } - - var messageStates []*MessageState - - for i, log := range eventsList { - for j, event := range log.Events { - if event.Type == "circle.cctp.v1.MessageSent" { - fmt.Printf("Saw cctp message %s - %d:%d\n", tx.Hash, i, j) - var parsed bool - var parseErrs error - for _, attr := range event.Attributes { - if attr.Key == "message" { - fmt.Printf("Saw message attribute %s - %d:%d\n", tx.Hash, i, j) - encoded := attr.Value[1 : len(attr.Value)-1] - rawMessageSentBytes, err := base64.StdEncoding.DecodeString(encoded) - if err != nil { - parseErrs = errors.Join(parseErrs, fmt.Errorf("failed to decode message: %w", err)) - continue - } - - hashed := crypto.Keccak256(rawMessageSentBytes) - hashedHexStr := hex.EncodeToString(hashed) - - msg, err := new(types.Message).Parse(rawMessageSentBytes) - if err != nil { - parseErrs = errors.Join(parseErrs, fmt.Errorf("failed to parse message: %w", err)) - continue - } - - parsed = true - - messageState := &MessageState{ - IrisLookupId: hashedHexStr, - Type: Mint, - Status: Created, - SourceDomain: msg.SourceDomain, - DestDomain: msg.DestinationDomain, - Nonce: msg.Nonce, - SourceTxHash: tx.Hash, - MsgSentBytes: rawMessageSentBytes, - DestinationCaller: msg.DestinationCaller, - Created: time.Now(), - Updated: time.Now(), - } - - messageStates = append(messageStates, messageState) - - fmt.Printf("Appended transfer from 4 to %d\n", msg.DestinationDomain) - } - } - if !parsed { - return nil, fmt.Errorf("unable to parse cctp message. tx hash %s: %w", tx.Hash, parseErrs) - } - } - } - } - - return messageStates, nil - -} - -// DecodeDestinationCaller transforms an encoded Noble cctp address into a noble bech32 address -// left padded input -> bech32 output -func DecodeDestinationCaller(input []byte) (string, error) { - if len(input) <= 12 { - return "", errors.New("destinationCaller is too short") - } - output, err := bech32.ConvertAndEncode("noble", input[12:]) - if err != nil { - return "", errors.New("unable to encode destination caller") - } - return output, nil -} - -// Equal checks if two MessageState instances are equal -func (m *MessageState) Equal(other *MessageState) bool { - return (m.IrisLookupId == other.IrisLookupId && - m.Type == other.Type && - m.Status == other.Status && - m.Attestation == other.Attestation && - m.SourceDomain == other.SourceDomain && - m.DestDomain == other.DestDomain && - m.SourceTxHash == other.SourceTxHash && - m.DestTxHash == other.DestTxHash && - bytes.Equal(m.MsgSentBytes, other.MsgSentBytes) && - bytes.Equal(m.DestinationCaller, other.DestinationCaller) && - m.Channel == other.Channel && - m.Created == other.Created && - m.Updated == other.Updated) -} diff --git a/types/message_state_test.go b/types/message_state_test.go index 971e4f19..2036c3b4 100644 --- a/types/message_state_test.go +++ b/types/message_state_test.go @@ -12,15 +12,19 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/pascaldekloe/etherstream" - "github.com/strangelove-ventures/noble-cctp-relayer/config" + ethinternal "github.com/strangelove-ventures/noble-cctp-relayer/ethereum" "github.com/strangelove-ventures/noble-cctp-relayer/types" "github.com/stretchr/testify/require" ) -var cfg config.Config +var cfg types.Config func init() { - cfg = config.Parse("../.ignore/unit_tests.yaml") + var err error + cfg, err = types.Parse("../../.ignore/unit_tests.yaml") + if err != nil { + panic(err) + } } func TestToMessageStateSuccess(t *testing.T) { @@ -33,7 +37,7 @@ func TestToMessageStateSuccess(t *testing.T) { messageSent := messageTransmitterABI.Events["MessageSent"] - ethClient, err := ethclient.DialContext(context.Background(), cfg.Networks.Source.Ethereum.RPC) + ethClient, err := ethclient.DialContext(context.Background(), cfg.Chains["ethereum"].(*ethinternal.ChainConfig).RPC) require.Nil(t, err) // changed to mainnet address @@ -53,6 +57,7 @@ func TestToMessageStateSuccess(t *testing.T) { require.Nil(t, err) messageState, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &history[0]) + require.NoError(t, err) fmt.Println(messageState) diff --git a/types/sequence_map.go b/types/sequence_map.go index 917dd5e3..e997b4b1 100644 --- a/types/sequence_map.go +++ b/types/sequence_map.go @@ -8,22 +8,22 @@ import ( type SequenceMap struct { mu sync.Mutex // map destination domain -> minter account sequence - sequenceMap map[Domain]int64 + sequenceMap map[Domain]uint64 } func NewSequenceMap() *SequenceMap { return &SequenceMap{ - sequenceMap: map[Domain]int64{}, + sequenceMap: map[Domain]uint64{}, } } -func (m *SequenceMap) Put(destDomain Domain, val int64) { +func (m *SequenceMap) Put(destDomain Domain, val uint64) { m.mu.Lock() defer m.mu.Unlock() m.sequenceMap[destDomain] = val } -func (m *SequenceMap) Next(destDomain Domain) int64 { +func (m *SequenceMap) Next(destDomain Domain) uint64 { m.mu.Lock() defer m.mu.Unlock() result := m.sequenceMap[destDomain]