From c424fc6c12cb6e3e569ad200c979892952b71602 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Fri, 9 Feb 2024 09:22:53 +0100 Subject: [PATCH] consolidated state (#20) --- ethtxmanager/config.go | 9 ++--- ethtxmanager/ethtxmanager.go | 76 ++++++++++++++++++++++++++---------- ethtxmanager/memstorage.go | 3 ++ ethtxmanager/monitoredtx.go | 5 ++- test/main.go | 20 +++++----- 5 files changed, 76 insertions(+), 37 deletions(-) diff --git a/ethtxmanager/config.go b/ethtxmanager/config.go index aef85a9..1f8b97e 100644 --- a/ethtxmanager/config.go +++ b/ethtxmanager/config.go @@ -4,7 +4,6 @@ import ( "github.com/0xPolygonHermez/zkevm-ethtx-manager/config/types" "github.com/0xPolygonHermez/zkevm-ethtx-manager/etherman" "github.com/0xPolygonHermez/zkevm-ethtx-manager/log" - "github.com/ethereum/go-ethereum/common" ) // Config is configuration for ethereum transaction manager @@ -13,8 +12,10 @@ type Config struct { FrequencyToMonitorTxs types.Duration `mapstructure:"FrequencyToMonitorTxs"` // WaitTxToBeMined time to wait after transaction was sent to the ethereum WaitTxToBeMined types.Duration `mapstructure:"WaitTxToBeMined"` - // L1ConfirmationBlocks is the number of blocks to wait for a L1 tx to be confirmed - L1ConfirmationBlocks uint64 `mapstructure:"L1ConfirmationBlocks"` + // ConsolidationL1ConfirmationBlocks is the number of blocks to wait for a L1 tx to be consolidated + ConsolidationL1ConfirmationBlocks uint64 `mapstructure:"L1ConsolidationConfirmationBlocks"` + // FinalizationL1ConfirmationBlocks is the number of blocks to wait for a L1 tx to be finalized + FinalizationL1ConfirmationBlocks uint64 `mapstructure:"L1FinalizationConfirmationBlocks"` // PrivateKeys defines all the key store files that are going // to be read in order to provide the private keys to sign the L1 txs @@ -55,8 +56,6 @@ type Config struct { // max gas price limit: 110 // tx gas price = 110 MaxGasPriceLimit uint64 `mapstructure:"MaxGasPriceLimit"` - // Sender Address - From common.Address `mapstructure:"From"` // PersistenceFilename is the filename to store the memory storage PersistenceFilename string `mapstructure:"PersistenceFilename"` // Etherman configuration diff --git a/ethtxmanager/ethtxmanager.go b/ethtxmanager/ethtxmanager.go index 231266a..13f53c2 100644 --- a/ethtxmanager/ethtxmanager.go +++ b/ethtxmanager/ethtxmanager.go @@ -41,6 +41,7 @@ type Client struct { cfg Config etherman ethermanInterface storage storageInterface + from common.Address } type pending struct { @@ -79,6 +80,7 @@ func New(cfg Config) (*Client, error) { cfg: cfg, etherman: etherman, storage: NewMemStorage(cfg.PersistenceFilename), + from: auth.From, } log.Init(cfg.Log) @@ -147,11 +149,10 @@ func pendingL1Txs(URL string, from common.Address, httpHeaders map[string]string func (c *Client) Add(ctx context.Context, to *common.Address, forcedNonce *uint64, value *big.Int, data []byte) (common.Hash, error) { var nonce uint64 var err error - var from = c.cfg.From if forcedNonce == nil { // get next nonce - nonce, err = c.etherman.CurrentNonce(ctx, from) + nonce, err = c.etherman.CurrentNonce(ctx, c.from) if err != nil { err := fmt.Errorf("failed to get current nonce: %w", err) log.Errorf(err.Error()) @@ -162,11 +163,11 @@ func (c *Client) Add(ctx context.Context, to *common.Address, forcedNonce *uint6 } // get gas - gas, err := c.etherman.EstimateGas(ctx, from, to, value, data) + gas, err := c.etherman.EstimateGas(ctx, c.from, to, value, data) if err != nil { err := fmt.Errorf("failed to estimate gas: %w, data: %v", err, common.Bytes2Hex(data)) log.Error(err.Error()) - log.Debugf("failed to estimate gas for tx: from: %v, to: %v, value: %v", from.String(), to.String(), value.String()) + log.Debugf("failed to estimate gas for tx: from: %v, to: %v, value: %v", c.from.String(), to.String(), value.String()) if c.cfg.ForcedGas > 0 { gas = c.cfg.ForcedGas } else { @@ -194,7 +195,7 @@ func (c *Client) Add(ctx context.Context, to *common.Address, forcedNonce *uint6 // create monitored tx mTx := monitoredTx{ - ID: id, From: from, To: to, + ID: id, From: c.from, To: to, Nonce: nonce, Value: value, Data: data, Gas: gas, GasPrice: gasPrice, Status: MonitoredTxStatusCreated, @@ -251,15 +252,13 @@ func (c *Client) Result(ctx context.Context, id common.Hash) (MonitoredTxResult, return c.buildResult(ctx, mTx) } -// SetStatusDone sets the status of a monitored tx to MonitoredStatusDone. -// this method is provided to the callers to decide when a monitored tx should be -// considered done, so they can start to ignore it when querying it by Status. -func (c *Client) setStatusFinalized(ctx context.Context, id common.Hash) error { +// setStatusConsolidated sets the status of a monitored tx to MonitoredStatusConsolidated. +func (c *Client) setStatusConsolidated(ctx context.Context, id common.Hash) error { mTx, err := c.storage.Get(ctx, id) if err != nil { return err } - mTx.Status = MonitoredTxStatusFinalized + mTx.Status = MonitoredTxStatusConsolidated return c.storage.Update(ctx, mTx) } @@ -310,7 +309,7 @@ func (c *Client) buildResult(ctx context.Context, mTx monitoredTx) (MonitoredTxR func (c *Client) Start() { // If no persistence file is uses check L1 for pending txs if c.cfg.PersistenceFilename == "" { - pendingTxs, err := pendingL1Txs(c.cfg.Etherman.URL, c.cfg.From, c.cfg.Etherman.HTTPHeaders) + pendingTxs, err := pendingL1Txs(c.cfg.Etherman.URL, c.from, c.cfg.Etherman.HTTPHeaders) if err != nil { log.Errorf("failed to get pending txs from L1: %v", err) } @@ -337,9 +336,13 @@ func (c *Client) Start() { if err != nil { c.logErrorAndWait("failed to monitor txs: %v", err) } - err = c.waitMinedTxTobeFinalized(context.Background()) + err = c.waitMinedTxToBeConsolidated(context.Background()) if err != nil { - c.logErrorAndWait("failed to wait mined tx to be finalized: %v", err) + c.logErrorAndWait("failed to wait consolidated tx to be finalized: %v", err) + } + err = c.waitConsolidatedTxToBeFinalized(context.Background()) + if err != nil { + c.logErrorAndWait("failed to wait consolidated tx to be finalized: %v", err) } } } @@ -380,9 +383,9 @@ func (c *Client) monitorTxs(ctx context.Context) error { return nil } -// waitMinedTxTobeFinalized checks all mined monitored txs and wait the number of -// l1 blocks configured to confirm the tx -func (c *Client) waitMinedTxTobeFinalized(ctx context.Context) error { +// waitMinedTxToBeConsolidated checks all consolidated monitored txs and wait the number of +// l1 blocks configured to consolidated the tx +func (c *Client) waitMinedTxToBeConsolidated(ctx context.Context) error { statusesFilter := []MonitoredTxStatus{MonitoredTxStatusMined} mTxs, err := c.storage.GetByStatus(ctx, statusesFilter) if err != nil { @@ -397,13 +400,44 @@ func (c *Client) waitMinedTxTobeFinalized(ctx context.Context) error { } for _, mTx := range mTxs { - if mTx.BlockNumber.Uint64()+c.cfg.L1ConfirmationBlocks <= currentBlockNumber { + if mTx.BlockNumber.Uint64()+c.cfg.ConsolidationL1ConfirmationBlocks <= currentBlockNumber { + mTxLogger := createMonitoredTxLogger(mTx) + mTxLogger.Infof("consolidated") + mTx.Status = MonitoredTxStatusConsolidated + err := c.storage.Update(ctx, mTx) + if err != nil { + return fmt.Errorf("failed to update mined monitored tx: %v", err) + } + } + } + + return nil +} + +// waitConsolidatedTxToBeFinalized checks all consolidated monitored txs and wait the number of +// l1 blocks configured to finalize the tx +func (c *Client) waitConsolidatedTxToBeFinalized(ctx context.Context) error { + statusesFilter := []MonitoredTxStatus{MonitoredTxStatusConsolidated} + mTxs, err := c.storage.GetByStatus(ctx, statusesFilter) + if err != nil { + return fmt.Errorf("failed to get consolidated monitored txs: %v", err) + } + + log.Debugf("found %v consolidated monitored tx to process", len(mTxs)) + + currentBlockNumber, err := c.etherman.GetLatestBlockNumber(ctx) + if err != nil { + return fmt.Errorf("failed to get latest block number: %v", err) + } + + for _, mTx := range mTxs { + if mTx.BlockNumber.Uint64()+c.cfg.FinalizationL1ConfirmationBlocks <= currentBlockNumber { mTxLogger := createMonitoredTxLogger(mTx) mTxLogger.Infof("finalized") mTx.Status = MonitoredTxStatusFinalized err := c.storage.Update(ctx, mTx) if err != nil { - return fmt.Errorf("failed to update mined monitored tx: %v", err) + return fmt.Errorf("failed to update consolidated monitored tx: %v", err) } } } @@ -746,14 +780,14 @@ func (c *Client) ProcessPendingMonitoredTxs(ctx context.Context, resultHandler R // if the result is confirmed, we set it as done do stop looking into this monitored tx if result.Status == MonitoredTxStatusMined { - err := c.setStatusFinalized(ctx, result.ID) + err := c.setStatusConsolidated(ctx, result.ID) if err != nil { - mTxResultLogger.Errorf("failed to set monitored tx as done, err: %v", err) + mTxResultLogger.Errorf("failed to set monitored tx as consolidated, err: %v", err) // if something goes wrong at this point, we skip this result and move to the next. // this result is going to be handled again in the next cycle by the outer loop. continue } else { - mTxResultLogger.Info("monitored tx confirmed") + mTxResultLogger.Info("monitored tx consolidated") } resultHandler(result) continue diff --git a/ethtxmanager/memstorage.go b/ethtxmanager/memstorage.go index 0a9249c..8a50326 100644 --- a/ethtxmanager/memstorage.go +++ b/ethtxmanager/memstorage.go @@ -14,6 +14,7 @@ import ( // MemStorage hold txs to be managed type MemStorage struct { TxsMutex sync.RWMutex + FileMutex sync.RWMutex Transactions map[common.Hash]monitoredTx PersistenceFilename string } @@ -51,6 +52,8 @@ func (s *MemStorage) persist() { if s.PersistenceFilename != "" { s.TxsMutex.RLock() defer s.TxsMutex.RUnlock() + s.FileMutex.Lock() + defer s.FileMutex.Unlock() jsonFile, _ := json.Marshal(s.Transactions) err := os.WriteFile(s.PersistenceFilename+".tmp", jsonFile, 0644) // nolint:gosec, gomnd if err != nil { diff --git a/ethtxmanager/monitoredtx.go b/ethtxmanager/monitoredtx.go index 2161555..aa8d381 100644 --- a/ethtxmanager/monitoredtx.go +++ b/ethtxmanager/monitoredtx.go @@ -24,7 +24,10 @@ const ( // status is Successful MonitoredTxStatusMined = MonitoredTxStatus("mined") - // MonitoredTxStatusFinalized means the tx was set by the owner as finalized + // MonitoredTxStatusConsolidated means the tx was already mined N blocks ago + MonitoredTxStatusConsolidated = MonitoredTxStatus("consolidated") + + // MonitoredTxStatusFinalized means the tx was already mined M (M > N) blocks ago MonitoredTxStatusFinalized = MonitoredTxStatus("finalized") ) diff --git a/test/main.go b/test/main.go index 1829726..595ce6f 100644 --- a/test/main.go +++ b/test/main.go @@ -18,15 +18,15 @@ var ( func main() { config := ethtxmanager.Config{ - FrequencyToMonitorTxs: types.Duration{Duration: 1 * time.Second}, - WaitTxToBeMined: types.Duration{Duration: 2 * time.Minute}, - L1ConfirmationBlocks: 4, - PrivateKeys: []types.KeystoreFileConfig{{Path: "test.keystore", Password: "testonly"}}, - ForcedGas: 0, - GasPriceMarginFactor: 1, - MaxGasPriceLimit: 0, - From: common.HexToAddress("0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"), - PersistenceFilename: "ethtxmanager-persistence.json", + FrequencyToMonitorTxs: types.Duration{Duration: 1 * time.Second}, + WaitTxToBeMined: types.Duration{Duration: 2 * time.Minute}, + ConsolidationL1ConfirmationBlocks: 5, + FinalizationL1ConfirmationBlocks: 10, + PrivateKeys: []types.KeystoreFileConfig{{Path: "test.keystore", Password: "testonly"}}, + ForcedGas: 0, + GasPriceMarginFactor: 1, + MaxGasPriceLimit: 0, + PersistenceFilename: "ethtxmanager-persistence.json", Etherman: etherman.Config{ URL: "http://localhost:8545", HTTPHeaders: map[string]string{}, @@ -50,7 +50,7 @@ func main() { if err != nil { log.Fatalf("Error creating etherman client: %s", err) } - nonce, err := testEtherman.CurrentNonce(ctx, config.From) + nonce, err := testEtherman.CurrentNonce(ctx, common.HexToAddress("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266")) if err != nil { log.Fatalf("Error getting nonce: %s", err) }