Skip to content

Commit

Permalink
re-apply v10 fixes and wait for tx receipt before reporting outTx tra…
Browse files Browse the repository at this point in the history
…cker
  • Loading branch information
ws4charlie committed Dec 22, 2023
1 parent 81fd4c5 commit 96c29dd
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 34 deletions.
58 changes: 36 additions & 22 deletions zetaclient/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol"
"github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.non-eth.sol"
"github.com/zeta-chain/zetacore/common"
crosschainkeeper "github.com/zeta-chain/zetacore/x/crosschain/keeper"
"github.com/zeta-chain/zetacore/x/crosschain/types"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/config"
Expand Down Expand Up @@ -559,6 +560,13 @@ var lowestOutTxNonceToObserve = map[int64]uint64{
// FIXME: there's a chance that a txhash in OutTxChan may not deliver when Stop() is called
// observeOutTx periodically checks all the txhash in potential outbound txs
func (ob *EVMChainClient) observeOutTx() {
// The garbage trackers associated with finalized cctxs
finalizedCctxTrackers := map[int64]map[uint64]bool{
5: make(map[uint64]bool), // Goerli
97: make(map[uint64]bool), // BSC testnet
80001: make(map[uint64]bool), // Mumbai
}

// read env variables if set
timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE"))
if err != nil || timeoutNonce <= 0 {
Expand Down Expand Up @@ -593,6 +601,21 @@ func (ob *EVMChainClient) observeOutTx() {
if nonceInt < lowestOutTxNonceToObserve[ob.chain.ChainId] {
continue
}
if finalizedCctxTrackers[ob.chain.ChainId][nonceInt] {
continue
}
// Skip those problematic trackers whose associated cctxs are no longer pending (Reverted/Outboundminted/Aborted)
cctx, err := ob.zetaClient.GetCctxByNonce(ob.chain.ChainId, nonceInt)
if err != nil || cctx == nil {
ob.logger.ObserveOutTx.Error().Err(err).Msgf("garbage tracker, error GetCctxByNonce for chain %d nonce %d", ob.chain.ChainId, nonceInt)
continue
}
if !crosschainkeeper.IsPending(*cctx) {
finalizedCctxTrackers[ob.chain.ChainId][nonceInt] = true
ob.logger.ObserveOutTx.Info().Msgf("garbage tracker chain %s nonce %d is not pending", ob.chain.String(), nonceInt)
continue
}

ob.Mu.Lock()
_, found := ob.outTXConfirmedReceipts[ob.GetTxID(nonceInt)]
ob.Mu.Unlock()
Expand Down Expand Up @@ -645,27 +668,32 @@ func (ob *EVMChainClient) queryTxByHash(txHash string, nonce uint64) (*ethtypes.
receipt, err := ob.evmClient.TransactionReceipt(ctxt, ethcommon.HexToHash(txHash))
if err != nil {
if err != ethereum.NotFound {
logger.Warn().Err(err).Msgf("TransactionReceipt/TransactionByHash error, txHash %s", txHash)
logger.Warn().Err(err).Msgf("queryTxByHash: TransactionReceipt/TransactionByHash error, txHash %s", txHash)
}
return nil, nil, err
}
transaction, _, err := ob.evmClient.TransactionByHash(ctxt, ethcommon.HexToHash(txHash))
transaction, isPending, err := ob.evmClient.TransactionByHash(ctxt, ethcommon.HexToHash(txHash))
if err != nil {
return nil, nil, err
}
if transaction.Nonce() != nonce {
return nil, nil, fmt.Errorf("queryTxByHash: txHash %s nonce mismatch: wanted %d, got tx nonce %d", txHash, nonce, transaction.Nonce())
}
confHeight := receipt.BlockNumber.Uint64() + ob.GetCoreParams().ConfirmationCount
if confHeight < 0 || confHeight >= math.MaxInt64 {
return nil, nil, fmt.Errorf("confHeight is out of range")
if confHeight >= math.MaxInt64 {
return nil, nil, fmt.Errorf("queryTxByHash: confHeight is out of range")
}

// #nosec G701 checked in range
if int64(confHeight) > ob.GetLastBlockHeight() {
log.Warn().Msgf("included but not confirmed: receipt block %d, current block %d", receipt.BlockNumber, ob.GetLastBlockHeight())
log.Warn().Msgf("queryTxByHash: included but not confirmed: receipt block %d, current block %d", receipt.BlockNumber, ob.GetLastBlockHeight())
return nil, nil, fmt.Errorf("included but not confirmed")
}
// transaction must NOT be pending
if isPending {
log.Error().Msgf("queryTxByHash: confirmed but still pending: txHash %s nonce %d receipt block %d", txHash, nonce, receipt.BlockNumber)
return nil, nil, fmt.Errorf("confirmed but still pending")
}
return receipt, transaction, nil
}

Expand Down Expand Up @@ -921,7 +949,9 @@ func (ob *EVMChainClient) observeInTX() error {

// query incoming gas asset
for bn := startBlock; bn <= toBlock; bn++ {
if common.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains
if crosschainFlags.BlockHeaderVerificationFlags != nil &&
crosschainFlags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled &&
common.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains
err = ob.postBlockHeader(toBlock)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting block header")
Expand All @@ -932,22 +962,6 @@ func (ob *EVMChainClient) observeInTX() error {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("error getting block: %d", bn)
continue
}
headerRLP, err := rlp.EncodeToBytes(block.Header())
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("error encoding block header: %d", bn)
continue
}

_, err = ob.zetaClient.PostAddBlockHeader(
ob.chain.ChainId,
block.Hash().Bytes(),
block.Number().Int64(),
common.NewEthereumHeader(headerRLP),
)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("error posting block header: %d", bn)
continue
}

for _, tx := range block.Transactions() {
if tx.To() == nil {
Expand Down
66 changes: 55 additions & 11 deletions zetaclient/evm_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"strconv"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/accounts/abi"
Expand Down Expand Up @@ -36,6 +37,10 @@ type EVMSigner struct {
erc20CustodyContractAddress ethcommon.Address
logger zerolog.Logger
ts *TelemetryServer

// for outTx tracker reporting
mu *sync.Mutex
outTxHashBeingReported map[string]bool
}

var _ ChainSigner = &EVMSigner{}
Expand Down Expand Up @@ -83,7 +88,9 @@ func NewEVMSigner(
logger: logger.With().
Str("chain", chain.ChainName.String()).
Str("module", "EVMSigner").Logger(),
ts: ts,
ts: ts,
mu: &sync.Mutex{},
outTxHashBeingReported: make(map[string]bool),
}, nil
}

Expand Down Expand Up @@ -569,11 +576,7 @@ func (signer *EVMSigner) TryProcessOutTx(
log.Warn().Err(err).Msgf("OutTx Broadcast error")
retry, report := HandleBroadcastError(err, strconv.FormatUint(send.GetCurrentOutTxParam().OutboundTxTssNonce, 10), toChain.String(), outTxHash)
if report {
zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(toChain.ChainId, tx.Nonce(), outTxHash, nil, "", -1)
if err != nil {
logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash)
}
logger.Info().Msgf("Broadcast to core successful %s", zetaHash)
signer.ReportToOutTxTracker(zetaBridge, toChain.ChainId, tx.Nonce(), outTxHash, logger)
}
if !retry {
break
Expand All @@ -582,15 +585,56 @@ func (signer *EVMSigner) TryProcessOutTx(
continue
}
logger.Info().Msgf("Broadcast success: nonce %d to chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash)
zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(toChain.ChainId, tx.Nonce(), outTxHash, nil, "", -1)
if err != nil {
logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash)
}
logger.Info().Msgf("Broadcast to core successful %s", zetaHash)
signer.ReportToOutTxTracker(zetaBridge, toChain.ChainId, tx.Nonce(), outTxHash, logger)
break // successful broadcast; no need to retry
}
}
}

func (signer *EVMSigner) ReportToOutTxTracker(zetaBridge ZetaCoreBridger, chainID int64, nonce uint64, outTxHash string, logger zerolog.Logger) {
// skip if already being reported
signer.mu.Lock()
defer signer.mu.Unlock()
if _, found := signer.outTxHashBeingReported[outTxHash]; found {
logger.Info().Msgf("ReportToOutTxTracker: outTxHash %s for chain %d nonce %d is being reported", outTxHash, chainID, nonce)
return
}
signer.outTxHashBeingReported[outTxHash] = true // mark as being reported

// report to outTxTracker with goroutine
go func() {
defer func() {
signer.mu.Lock()
delete(signer.outTxHashBeingReported, outTxHash)
signer.mu.Unlock()
}()

// try fetching tx receipt for 10 minutes
tStart := time.Now()
for {
if time.Since(tStart) > 10*time.Minute { // give up after 10 minutes
logger.Info().Msgf("ReportToOutTxTracker: outTxHash report timeout for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
return
}
receipt, err := signer.client.TransactionReceipt(context.TODO(), ethcommon.HexToHash(outTxHash))
if err != nil {
logger.Info().Err(err).Msgf("ReportToOutTxTracker: receipt not available for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
time.Sleep(10 * time.Second)
continue
}
if receipt != nil {
logger.Info().Msgf("ReportToOutTxTracker: receipt available for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
break
}
}

// report to outTxTracker
zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(chainID, nonce, outTxHash, nil, "", -1)
if err != nil {
logger.Err(err).Msgf("ReportToOutTxTracker: unable to add to tracker on ZetaCore for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
}
logger.Info().Msgf("ReportToOutTxTracker: reported outTxHash to core successful %s, chain %d nonce %d outTxHash %s", zetaHash, chainID, nonce, outTxHash)
}()
}

// SignERC20WithdrawTx
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (b *ZetaCoreBridge) GetAllOutTxTrackerByChain(chain common.Chain, order Ord
Pagination: &query.PageRequest{
Key: nil,
Offset: 0,
Limit: 2000,
Limit: 3000,
CountTotal: false,
Reverse: false,
},
Expand Down

1 comment on commit 96c29dd

@brewmaster012
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

Please sign in to comment.