Skip to content

Commit

Permalink
consolidated state (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM authored Feb 9, 2024
1 parent 64b5aad commit c424fc6
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 37 deletions.
9 changes: 4 additions & 5 deletions ethtxmanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"github.com/0xPolygonHermez/zkevm-ethtx-manager/config/types"
"github.com/0xPolygonHermez/zkevm-ethtx-manager/etherman"
"github.com/0xPolygonHermez/zkevm-ethtx-manager/log"
"github.com/ethereum/go-ethereum/common"
)

// Config is configuration for ethereum transaction manager
Expand All @@ -13,8 +12,10 @@ type Config struct {
FrequencyToMonitorTxs types.Duration `mapstructure:"FrequencyToMonitorTxs"`
// WaitTxToBeMined time to wait after transaction was sent to the ethereum
WaitTxToBeMined types.Duration `mapstructure:"WaitTxToBeMined"`
// L1ConfirmationBlocks is the number of blocks to wait for a L1 tx to be confirmed
L1ConfirmationBlocks uint64 `mapstructure:"L1ConfirmationBlocks"`
// ConsolidationL1ConfirmationBlocks is the number of blocks to wait for a L1 tx to be consolidated
ConsolidationL1ConfirmationBlocks uint64 `mapstructure:"L1ConsolidationConfirmationBlocks"`
// FinalizationL1ConfirmationBlocks is the number of blocks to wait for a L1 tx to be finalized
FinalizationL1ConfirmationBlocks uint64 `mapstructure:"L1FinalizationConfirmationBlocks"`

// PrivateKeys defines all the key store files that are going
// to be read in order to provide the private keys to sign the L1 txs
Expand Down Expand Up @@ -55,8 +56,6 @@ type Config struct {
// max gas price limit: 110
// tx gas price = 110
MaxGasPriceLimit uint64 `mapstructure:"MaxGasPriceLimit"`
// Sender Address
From common.Address `mapstructure:"From"`
// PersistenceFilename is the filename to store the memory storage
PersistenceFilename string `mapstructure:"PersistenceFilename"`
// Etherman configuration
Expand Down
76 changes: 55 additions & 21 deletions ethtxmanager/ethtxmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Client struct {
cfg Config
etherman ethermanInterface
storage storageInterface
from common.Address
}

type pending struct {
Expand Down Expand Up @@ -79,6 +80,7 @@ func New(cfg Config) (*Client, error) {
cfg: cfg,
etherman: etherman,
storage: NewMemStorage(cfg.PersistenceFilename),
from: auth.From,
}

log.Init(cfg.Log)
Expand Down Expand Up @@ -147,11 +149,10 @@ func pendingL1Txs(URL string, from common.Address, httpHeaders map[string]string
func (c *Client) Add(ctx context.Context, to *common.Address, forcedNonce *uint64, value *big.Int, data []byte) (common.Hash, error) {
var nonce uint64
var err error
var from = c.cfg.From

if forcedNonce == nil {
// get next nonce
nonce, err = c.etherman.CurrentNonce(ctx, from)
nonce, err = c.etherman.CurrentNonce(ctx, c.from)
if err != nil {
err := fmt.Errorf("failed to get current nonce: %w", err)
log.Errorf(err.Error())
Expand All @@ -162,11 +163,11 @@ func (c *Client) Add(ctx context.Context, to *common.Address, forcedNonce *uint6
}

// get gas
gas, err := c.etherman.EstimateGas(ctx, from, to, value, data)
gas, err := c.etherman.EstimateGas(ctx, c.from, to, value, data)
if err != nil {
err := fmt.Errorf("failed to estimate gas: %w, data: %v", err, common.Bytes2Hex(data))
log.Error(err.Error())
log.Debugf("failed to estimate gas for tx: from: %v, to: %v, value: %v", from.String(), to.String(), value.String())
log.Debugf("failed to estimate gas for tx: from: %v, to: %v, value: %v", c.from.String(), to.String(), value.String())
if c.cfg.ForcedGas > 0 {
gas = c.cfg.ForcedGas
} else {
Expand Down Expand Up @@ -194,7 +195,7 @@ func (c *Client) Add(ctx context.Context, to *common.Address, forcedNonce *uint6

// create monitored tx
mTx := monitoredTx{
ID: id, From: from, To: to,
ID: id, From: c.from, To: to,
Nonce: nonce, Value: value, Data: data,
Gas: gas, GasPrice: gasPrice,
Status: MonitoredTxStatusCreated,
Expand Down Expand Up @@ -251,15 +252,13 @@ func (c *Client) Result(ctx context.Context, id common.Hash) (MonitoredTxResult,
return c.buildResult(ctx, mTx)
}

// SetStatusDone sets the status of a monitored tx to MonitoredStatusDone.
// this method is provided to the callers to decide when a monitored tx should be
// considered done, so they can start to ignore it when querying it by Status.
func (c *Client) setStatusFinalized(ctx context.Context, id common.Hash) error {
// setStatusConsolidated sets the status of a monitored tx to MonitoredStatusConsolidated.
func (c *Client) setStatusConsolidated(ctx context.Context, id common.Hash) error {
mTx, err := c.storage.Get(ctx, id)
if err != nil {
return err
}
mTx.Status = MonitoredTxStatusFinalized
mTx.Status = MonitoredTxStatusConsolidated
return c.storage.Update(ctx, mTx)
}

Expand Down Expand Up @@ -310,7 +309,7 @@ func (c *Client) buildResult(ctx context.Context, mTx monitoredTx) (MonitoredTxR
func (c *Client) Start() {
// If no persistence file is uses check L1 for pending txs
if c.cfg.PersistenceFilename == "" {
pendingTxs, err := pendingL1Txs(c.cfg.Etherman.URL, c.cfg.From, c.cfg.Etherman.HTTPHeaders)
pendingTxs, err := pendingL1Txs(c.cfg.Etherman.URL, c.from, c.cfg.Etherman.HTTPHeaders)
if err != nil {
log.Errorf("failed to get pending txs from L1: %v", err)
}
Expand All @@ -337,9 +336,13 @@ func (c *Client) Start() {
if err != nil {
c.logErrorAndWait("failed to monitor txs: %v", err)
}
err = c.waitMinedTxTobeFinalized(context.Background())
err = c.waitMinedTxToBeConsolidated(context.Background())
if err != nil {
c.logErrorAndWait("failed to wait mined tx to be finalized: %v", err)
c.logErrorAndWait("failed to wait consolidated tx to be finalized: %v", err)
}
err = c.waitConsolidatedTxToBeFinalized(context.Background())
if err != nil {
c.logErrorAndWait("failed to wait consolidated tx to be finalized: %v", err)
}
}
}
Expand Down Expand Up @@ -380,9 +383,9 @@ func (c *Client) monitorTxs(ctx context.Context) error {
return nil
}

// waitMinedTxTobeFinalized checks all mined monitored txs and wait the number of
// l1 blocks configured to confirm the tx
func (c *Client) waitMinedTxTobeFinalized(ctx context.Context) error {
// waitMinedTxToBeConsolidated checks all consolidated monitored txs and wait the number of
// l1 blocks configured to consolidated the tx
func (c *Client) waitMinedTxToBeConsolidated(ctx context.Context) error {
statusesFilter := []MonitoredTxStatus{MonitoredTxStatusMined}
mTxs, err := c.storage.GetByStatus(ctx, statusesFilter)
if err != nil {
Expand All @@ -397,13 +400,44 @@ func (c *Client) waitMinedTxTobeFinalized(ctx context.Context) error {
}

for _, mTx := range mTxs {
if mTx.BlockNumber.Uint64()+c.cfg.L1ConfirmationBlocks <= currentBlockNumber {
if mTx.BlockNumber.Uint64()+c.cfg.ConsolidationL1ConfirmationBlocks <= currentBlockNumber {
mTxLogger := createMonitoredTxLogger(mTx)
mTxLogger.Infof("consolidated")
mTx.Status = MonitoredTxStatusConsolidated
err := c.storage.Update(ctx, mTx)
if err != nil {
return fmt.Errorf("failed to update mined monitored tx: %v", err)
}
}
}

return nil
}

// waitConsolidatedTxToBeFinalized checks all consolidated monitored txs and wait the number of
// l1 blocks configured to finalize the tx
func (c *Client) waitConsolidatedTxToBeFinalized(ctx context.Context) error {
statusesFilter := []MonitoredTxStatus{MonitoredTxStatusConsolidated}
mTxs, err := c.storage.GetByStatus(ctx, statusesFilter)
if err != nil {
return fmt.Errorf("failed to get consolidated monitored txs: %v", err)
}

log.Debugf("found %v consolidated monitored tx to process", len(mTxs))

currentBlockNumber, err := c.etherman.GetLatestBlockNumber(ctx)
if err != nil {
return fmt.Errorf("failed to get latest block number: %v", err)
}

for _, mTx := range mTxs {
if mTx.BlockNumber.Uint64()+c.cfg.FinalizationL1ConfirmationBlocks <= currentBlockNumber {
mTxLogger := createMonitoredTxLogger(mTx)
mTxLogger.Infof("finalized")
mTx.Status = MonitoredTxStatusFinalized
err := c.storage.Update(ctx, mTx)
if err != nil {
return fmt.Errorf("failed to update mined monitored tx: %v", err)
return fmt.Errorf("failed to update consolidated monitored tx: %v", err)
}
}
}
Expand Down Expand Up @@ -746,14 +780,14 @@ func (c *Client) ProcessPendingMonitoredTxs(ctx context.Context, resultHandler R

// if the result is confirmed, we set it as done do stop looking into this monitored tx
if result.Status == MonitoredTxStatusMined {
err := c.setStatusFinalized(ctx, result.ID)
err := c.setStatusConsolidated(ctx, result.ID)
if err != nil {
mTxResultLogger.Errorf("failed to set monitored tx as done, err: %v", err)
mTxResultLogger.Errorf("failed to set monitored tx as consolidated, err: %v", err)
// if something goes wrong at this point, we skip this result and move to the next.
// this result is going to be handled again in the next cycle by the outer loop.
continue
} else {
mTxResultLogger.Info("monitored tx confirmed")
mTxResultLogger.Info("monitored tx consolidated")
}
resultHandler(result)
continue
Expand Down
3 changes: 3 additions & 0 deletions ethtxmanager/memstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// MemStorage hold txs to be managed
type MemStorage struct {
TxsMutex sync.RWMutex
FileMutex sync.RWMutex
Transactions map[common.Hash]monitoredTx
PersistenceFilename string
}
Expand Down Expand Up @@ -51,6 +52,8 @@ func (s *MemStorage) persist() {
if s.PersistenceFilename != "" {
s.TxsMutex.RLock()
defer s.TxsMutex.RUnlock()
s.FileMutex.Lock()
defer s.FileMutex.Unlock()
jsonFile, _ := json.Marshal(s.Transactions)
err := os.WriteFile(s.PersistenceFilename+".tmp", jsonFile, 0644) // nolint:gosec, gomnd
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion ethtxmanager/monitoredtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ const (
// status is Successful
MonitoredTxStatusMined = MonitoredTxStatus("mined")

// MonitoredTxStatusFinalized means the tx was set by the owner as finalized
// MonitoredTxStatusConsolidated means the tx was already mined N blocks ago
MonitoredTxStatusConsolidated = MonitoredTxStatus("consolidated")

// MonitoredTxStatusFinalized means the tx was already mined M (M > N) blocks ago
MonitoredTxStatusFinalized = MonitoredTxStatus("finalized")
)

Expand Down
20 changes: 10 additions & 10 deletions test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ var (

func main() {
config := ethtxmanager.Config{
FrequencyToMonitorTxs: types.Duration{Duration: 1 * time.Second},
WaitTxToBeMined: types.Duration{Duration: 2 * time.Minute},
L1ConfirmationBlocks: 4,
PrivateKeys: []types.KeystoreFileConfig{{Path: "test.keystore", Password: "testonly"}},
ForcedGas: 0,
GasPriceMarginFactor: 1,
MaxGasPriceLimit: 0,
From: common.HexToAddress("0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"),
PersistenceFilename: "ethtxmanager-persistence.json",
FrequencyToMonitorTxs: types.Duration{Duration: 1 * time.Second},
WaitTxToBeMined: types.Duration{Duration: 2 * time.Minute},
ConsolidationL1ConfirmationBlocks: 5,
FinalizationL1ConfirmationBlocks: 10,
PrivateKeys: []types.KeystoreFileConfig{{Path: "test.keystore", Password: "testonly"}},
ForcedGas: 0,
GasPriceMarginFactor: 1,
MaxGasPriceLimit: 0,
PersistenceFilename: "ethtxmanager-persistence.json",
Etherman: etherman.Config{
URL: "http://localhost:8545",
HTTPHeaders: map[string]string{},
Expand All @@ -50,7 +50,7 @@ func main() {
if err != nil {
log.Fatalf("Error creating etherman client: %s", err)
}
nonce, err := testEtherman.CurrentNonce(ctx, config.From)
nonce, err := testEtherman.CurrentNonce(ctx, common.HexToAddress("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"))
if err != nil {
log.Fatalf("Error getting nonce: %s", err)
}
Expand Down

0 comments on commit c424fc6

Please sign in to comment.