Merge pull request #29 from 0xPolygon/feature/rollupExitTree
Feature/rollup exit tree
arnaubennassar authored Jul 31, 2024
2 parents bfdc806 + 4869094 commit e971132
Showing 33 changed files with 1,713 additions and 684 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -3,7 +3,7 @@ FROM golang:1.22.5-alpine3.20 AS build

WORKDIR $GOPATH/src/github.com/0xPolygon/cdk

RUN apk update && apk add --no-cache make build-base git
RUN apk update && apk add --no-cache make build-base git
# INSTALL DEPENDENCIES
COPY go.mod go.sum /src/
RUN cd /src && go mod download
19 changes: 10 additions & 9 deletions aggoracle/chaingersender/evm.go
@@ -75,7 +75,7 @@ func NewEVMChainGERSender(
func (c *EVMChainGERSender) IsGERAlreadyInjected(ger common.Hash) (bool, error) {
timestamp, err := c.gerContract.GlobalExitRootMap(&bind.CallOpts{Pending: false}, ger)
if err != nil {
return false, err
return false, fmt.Errorf("error calling gerContract.GlobalExitRootMap: %w", err)
}
return timestamp.Cmp(big.NewInt(0)) != 0, nil
}
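A note on the error handling above: wrapping the contract-call failure with %w adds context while keeping the underlying error available to errors.Is and errors.As. A minimal, self-contained sketch of that pattern; the errNotSynced sentinel and callContract helper are invented for illustration and are not part of this codebase:

package main

import (
	"errors"
	"fmt"
)

// errNotSynced is a hypothetical sentinel, standing in for whatever the contract call returns.
var errNotSynced = errors.New("contract not synced")

// callContract stands in for gerContract.GlobalExitRootMap failing.
func callContract() error {
	return errNotSynced
}

func isGERAlreadyInjected() (bool, error) {
	if err := callContract(); err != nil {
		// %w preserves the wrapped error for errors.Is / errors.As.
		return false, fmt.Errorf("error calling gerContract.GlobalExitRootMap: %w", err)
	}
	return true, nil
}

func main() {
	_, err := isGERAlreadyInjected()
	fmt.Println(errors.Is(err, errNotSynced)) // true: the sentinel survives the wrapping
}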
@@ -86,28 +86,29 @@ func (c *EVMChainGERSender) UpdateGERWaitUntilMined(ctx context.Context, ger com
return err
}
data, err := abi.Pack("updateGlobalExitRoot", ger)
if err != nil {
return err
}
id, err := c.ethTxMan.Add(ctx, &c.gerAddr, nil, big.NewInt(0), data, c.gasOffset, nil)
if err != nil {
return err
}
for {
time.Sleep(c.waitPeriodMonitorTx)
log.Debugf("waiting for tx %s to be mined", id.Hex())
res, err := c.ethTxMan.Result(ctx, id)
if err != nil {
log.Error("error calling ethTxMan.Result: ", err)
}
switch res.Status {
case ethtxmanager.MonitoredTxStatusCreated:
continue
case ethtxmanager.MonitoredTxStatusSent:
case ethtxmanager.MonitoredTxStatusCreated,
ethtxmanager.MonitoredTxStatusSent:
continue
case ethtxmanager.MonitoredTxStatusFailed:
return fmt.Errorf("tx %s failed", res.ID)
case ethtxmanager.MonitoredTxStatusMined:
return nil
case ethtxmanager.MonitoredTxStatusSafe:
return nil
case ethtxmanager.MonitoredTxStatusFinalized:
case ethtxmanager.MonitoredTxStatusMined,
ethtxmanager.MonitoredTxStatusSafe,
ethtxmanager.MonitoredTxStatusFinalized:
return nil
default:
log.Error("unexpected tx status: ", res.Status)
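The reworked switch above folds the pending statuses (created, sent) into a single case that keeps polling, and the successful terminal statuses (mined, safe, finalized) into a single case that returns. A minimal sketch of that grouped-case polling loop, using stand-in status constants rather than the real ethtxmanager ones:

package main

import (
	"errors"
	"fmt"
	"time"
)

// txStatus and its constants are stand-ins for ethtxmanager.MonitoredTxStatus*.
type txStatus int

const (
	statusCreated txStatus = iota
	statusSent
	statusFailed
	statusMined
	statusSafe
	statusFinalized
)

// waitUntilMined polls poll() until the tx reaches a terminal status.
func waitUntilMined(poll func() txStatus, interval time.Duration) error {
	for {
		time.Sleep(interval)
		switch status := poll(); status {
		case statusCreated, statusSent:
			// still pending: keep polling
			continue
		case statusFailed:
			return errors.New("tx failed")
		case statusMined, statusSafe, statusFinalized:
			// any of these means the update landed on chain
			return nil
		default:
			fmt.Println("unexpected tx status:", status)
		}
	}
}

func main() {
	calls := 0
	err := waitUntilMined(func() txStatus {
		calls++
		if calls < 3 {
			return statusSent
		}
		return statusMined
	}, 10*time.Millisecond)
	fmt.Println("mined, err =", err)
}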
5 changes: 3 additions & 2 deletions aggoracle/e2e_test.go
@@ -3,6 +3,7 @@ package aggoracle_test
import (
"context"
"errors"
"fmt"
"math/big"
"strconv"
"testing"
@@ -59,7 +60,7 @@ func commonSetup(t *testing.T) (
require.NoError(t, err)
// Syncer
dbPathSyncer := t.TempDir()
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, 10, etherman.LatestBlock, reorg, l1Client.Client(), time.Millisecond, 0)
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, common.Address{}, 10, etherman.LatestBlock, reorg, l1Client.Client(), time.Millisecond, 0)
require.NoError(t, err)
go syncer.Start(ctx)

@@ -208,6 +209,6 @@ func runTest(
require.NoError(t, err)
isInjected, err := sender.IsGERAlreadyInjected(expectedGER)
require.NoError(t, err)
require.True(t, isInjected)
require.True(t, isInjected, fmt.Sprintf("iteration %d, GER: %s", i, common.Bytes2Hex(expectedGER[:])))
}
}
53 changes: 30 additions & 23 deletions aggoracle/oracle.go
@@ -9,17 +9,9 @@ import (
"github.com/0xPolygon/cdk/l1infotreesync"
"github.com/0xPolygon/cdk/log"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)

type EthClienter interface {
ethereum.LogFilterer
ethereum.BlockNumberReader
ethereum.ChainReader
bind.ContractBackend
}

type L1InfoTreer interface {
GetLatestInfoUntilBlock(ctx context.Context, blockNum uint64) (*l1infotreesync.L1InfoTreeLeaf, error)
}
@@ -31,15 +23,15 @@ type ChainSender interface {

type AggOracle struct {
ticker *time.Ticker
l1Client EthClienter
l1Client ethereum.ChainReader
l1Info L1InfoTreer
chainSender ChainSender
blockFinality *big.Int
}

func New(
chainSender ChainSender,
l1Client EthClienter,
l1Client ethereum.ChainReader,
l1InfoTreeSyncer L1InfoTreer,
blockFinalityType etherman.BlockNumberFinality,
waitPeriodNextGER time.Duration,
@@ -59,22 +51,30 @@ func New(
}

func (a *AggOracle) Start(ctx context.Context) {
var (
blockNumToFetch uint64
gerToInject common.Hash
err error
)
for {
select {
case <-a.ticker.C:
gerToInject, err := a.getLastFinalisedGER(ctx)
blockNumToFetch, gerToInject, err = a.getLastFinalisedGER(ctx, blockNumToFetch)
if err != nil {
if err == l1infotreesync.ErrBlockNotProcessed || err == l1infotreesync.ErrNotFound {
log.Debugf("syncer is not ready: %v", err)
if err == l1infotreesync.ErrBlockNotProcessed {
log.Debugf("syncer is not ready for the block %d", blockNumToFetch)
} else if err == l1infotreesync.ErrNotFound {
blockNumToFetch = 0
log.Debugf("syncer has not found any GER until block %d", blockNumToFetch)
} else {
log.Error("error calling isGERAlreadyInjected: ", err)
log.Error("error calling getLastFinalisedGER: ", err)
}
continue
}
if alreadyInjectd, err := a.chainSender.IsGERAlreadyInjected(gerToInject); err != nil {
if alreadyInjected, err := a.chainSender.IsGERAlreadyInjected(gerToInject); err != nil {
log.Error("error calling isGERAlreadyInjected: ", err)
continue
} else if alreadyInjectd {
} else if alreadyInjected {
log.Debugf("GER %s already injected", gerToInject.Hex())
continue
}
@@ -90,14 +90,21 @@ func (a *AggOracle) Start(ctx context.Context) {
}
}

func (a *AggOracle) getLastFinalisedGER(ctx context.Context) (common.Hash, error) {
header, err := a.l1Client.HeaderByNumber(ctx, a.blockFinality)
if err != nil {
return common.Hash{}, err
// getLastFinalisedGER tries to return a finalised GER:
// If blockNumToFetch != 0: it will try to fetch it until the given block
// Else it will ask the L1 client for the latest finalised block and use that
// If it fails to get the GER from the syncer, it will return the block number that was used to query it
func (a *AggOracle) getLastFinalisedGER(ctx context.Context, blockNumToFetch uint64) (uint64, common.Hash, error) {
if blockNumToFetch == 0 {
header, err := a.l1Client.HeaderByNumber(ctx, a.blockFinality)
if err != nil {
return 0, common.Hash{}, err
}
blockNumToFetch = header.Number.Uint64()
}
info, err := a.l1Info.GetLatestInfoUntilBlock(ctx, header.Number.Uint64())
info, err := a.l1Info.GetLatestInfoUntilBlock(ctx, blockNumToFetch)
if err != nil {
return common.Hash{}, err
return blockNumToFetch, common.Hash{}, err
}
return info.GlobalExitRoot, nil
return 0, info.GlobalExitRoot, nil
}
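The new doc comment describes how the caller caches the block number between ticks: it keeps querying the same finalised block while the syncer catches up, and resets to 0 (meaning "resolve the latest finalised block again") once a GER is found or the syncer reports nothing. A rough sketch of that decision only, with hypothetical sentinels standing in for l1infotreesync.ErrBlockNotProcessed and ErrNotFound:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for l1infotreesync.ErrBlockNotProcessed / ErrNotFound.
var (
	errBlockNotProcessed = errors.New("block not processed")
	errNotFound          = errors.New("not found")
)

// nextBlockToFetch roughly mirrors the caching in AggOracle.Start: keep the block
// while the syncer is behind, reset it when nothing is found or a GER was returned.
func nextBlockToFetch(current uint64, err error) uint64 {
	switch {
	case errors.Is(err, errBlockNotProcessed):
		return current // retry the same finalised block next tick
	case errors.Is(err, errNotFound):
		return 0 // re-resolve the latest finalised block next tick
	case err == nil:
		return 0 // GER obtained; start fresh next tick
	default:
		return current
	}
}

func main() {
	fmt.Println(nextBlockToFetch(120, errBlockNotProcessed)) // 120
	fmt.Println(nextBlockToFetch(120, errNotFound))          // 0
	fmt.Println(nextBlockToFetch(120, nil))                  // 0
}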
2 changes: 1 addition & 1 deletion bridgesync/bridgesync.go
@@ -95,7 +95,7 @@ func new(
return nil, err
}
if lastProcessedBlock < initialBlock {
err = processor.ProcessBlock(sync.Block{
err = processor.ProcessBlock(ctx, sync.Block{
Num: initialBlock,
})
if err != nil {
63 changes: 24 additions & 39 deletions bridgesync/processor.go
@@ -5,11 +5,12 @@ import (
"encoding/binary"
"encoding/json"
"errors"
"log"
"math/big"
"path"

dbCommon "github.com/0xPolygon/cdk/common"
"github.com/0xPolygon/cdk/sync"
"github.com/0xPolygon/cdk/tree"
"github.com/ethereum/go-ethereum/common"
"github.com/iden3/go-iden3-crypto/keccak256"
"github.com/ledgerwatch/erigon-lib/kv"
@@ -19,8 +20,6 @@ const (
const (
eventsTableSufix = "-events"
lastBlockTableSufix = "-lastBlock"
rootTableSufix = "-root"
rhtTableSufix = "-rht"
)

var (
@@ -84,38 +83,34 @@ type processor struct {
db kv.RwDB
eventsTable string
lastBlockTable string
tree *tree
exitTree *tree.AppendOnlyTree
}

func newProcessor(ctx context.Context, dbPath, dbPrefix string) (*processor, error) {
eventsTable := dbPrefix + eventsTableSufix
lastBlockTable := dbPrefix + lastBlockTableSufix
rootTable := dbPrefix + rootTableSufix
rhtTable := dbPrefix + rhtTableSufix
db, err := mdbx.NewMDBX(nil).
Path(dbPath).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
return kv.TableCfg{
eventsTable: {},
lastBlockTable: {},
rootTable: {},
rhtTable: {},
}
}).
Open()
if err != nil {
return nil, err
}

tree, err := newTree(ctx, rhtTable, rootTable, db)
exitTreeDBPath := path.Join(dbPath, "exittree")
exitTree, err := tree.NewAppendOnly(ctx, exitTreeDBPath, dbPrefix)
if err != nil {
return nil, err
}
return &processor{
db: db,
eventsTable: eventsTable,
lastBlockTable: lastBlockTable,
tree: tree,
exitTree: exitTree,
}, nil
}

@@ -144,7 +139,7 @@ func (p *processor) GetClaimsAndBridges(
}
defer c.Close()

for k, v, err := c.Seek(dbCommon.Uint64To2Bytes(fromBlock)); k != nil; k, v, err = c.Next() {
for k, v, err := c.Seek(dbCommon.Uint64ToBytes(fromBlock)); k != nil; k, v, err = c.Next() {
if err != nil {
return nil, err
}
@@ -181,8 +176,8 @@ func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) {
}
}

func (p *processor) Reorg(firstReorgedBlock uint64) error {
tx, err := p.db.BeginRw(context.Background())
func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
tx, err := p.db.BeginRw(ctx)
if err != nil {
return err
}
@@ -191,7 +186,7 @@ func (p *processor) Reorg(firstReorgedBlock uint64) error {
return err
}
defer c.Close()
firstKey := dbCommon.Uint64To2Bytes(firstReorgedBlock)
firstKey := dbCommon.Uint64ToBytes(firstReorgedBlock)
firstDepositCountReorged := int64(-1)
for k, v, err := c.Seek(firstKey); k != nil; k, _, err = c.Next() {
if err != nil {
@@ -221,22 +216,15 @@ func (p *processor) Reorg(firstReorgedBlock uint64) error {
return err
}
if firstDepositCountReorged != -1 {
var lastValidDepositCount uint32
if firstDepositCountReorged == 0 {
lastValidDepositCount = 0
} else {
lastValidDepositCount = uint32(firstDepositCountReorged) - 1
}
if err := p.tree.reorg(tx, lastValidDepositCount); err != nil {
if err := p.exitTree.Reorg(ctx, uint32(firstDepositCountReorged)); err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}

func (p *processor) ProcessBlock(block sync.Block) error {
ctx := context.Background()
func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
tx, err := p.db.BeginRw(ctx)
if err != nil {
return err
@@ -256,7 +244,7 @@ func (p *processor) ProcessBlock(block sync.Block) error {
tx.Rollback()
return err
}
if err := tx.Put(p.eventsTable, dbCommon.Uint64To2Bytes(block.Num), value); err != nil {
if err := tx.Put(p.eventsTable, dbCommon.Uint64ToBytes(block.Num), value); err != nil {
tx.Rollback()
return err
}
@@ -267,24 +255,21 @@ func (p *processor) ProcessBlock(block sync.Block) error {
return err
}

for i, bridge := range bridges {
if err := p.tree.addLeaf(tx, bridge.DepositCount, bridge.Hash()); err != nil {
if i != 0 {
tx.Rollback()
if err2 := p.tree.initLastLeftCacheAndLastDepositCount(ctx); err2 != nil {
log.Fatalf(
"after failing to add a leaf to the tree with error: %v, error initializing the cache with error: %v",
err, err2,
)
}
return err
}
}
leaves := []tree.Leaf{}
for _, bridge := range bridges {
leaves = append(leaves, tree.Leaf{
Index: bridge.DepositCount,
Hash: bridge.Hash(),
})
}
if err := p.exitTree.AddLeaves(ctx, leaves); err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}

func (p *processor) updateLastProcessedBlock(tx kv.RwTx, blockNum uint64) error {
blockNumBytes := dbCommon.Uint64To2Bytes(blockNum)
blockNumBytes := dbCommon.Uint64ToBytes(blockNum)
return tx.Put(p.lastBlockTable, lastBlokcKey, blockNumBytes)
}
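The processor no longer maintains its own root and RHT tables; it delegates to the shared tree.AppendOnlyTree, batching one leaf per bridge event and passing the first reorged deposit count straight to Reorg. Based only on the calls visible in this diff, a hedged usage sketch: the paths, prefix, and bridgeEvent type are illustrative, and the signatures are assumed from the diff rather than verified against the tree package.

// Sketch only: tree.NewAppendOnly, AppendOnlyTree.AddLeaves, AppendOnlyTree.Reorg and
// the tree.Leaf fields are taken from this diff; everything else (the bridgeEvent type,
// paths, prefix, and the common.Hash leaf type) is assumed for illustration.
package exitexample

import (
	"context"
	"path"

	"github.com/0xPolygon/cdk/tree"
	"github.com/ethereum/go-ethereum/common"
)

// bridgeEvent is a hypothetical stand-in for the bridge events the processor stores.
type bridgeEvent struct {
	DepositCount uint32
	Hash         common.Hash
}

// appendBridges opens the append-only exit tree in its own sub-database and adds
// one leaf per bridge event in a single batched call.
func appendBridges(ctx context.Context, dbPath, dbPrefix string, bridges []bridgeEvent) (*tree.AppendOnlyTree, error) {
	exitTree, err := tree.NewAppendOnly(ctx, path.Join(dbPath, "exittree"), dbPrefix)
	if err != nil {
		return nil, err
	}
	leaves := make([]tree.Leaf, 0, len(bridges))
	for _, b := range bridges {
		leaves = append(leaves, tree.Leaf{Index: b.DepositCount, Hash: b.Hash})
	}
	if err := exitTree.AddLeaves(ctx, leaves); err != nil {
		return nil, err
	}
	return exitTree, nil
}

// rollback mirrors the reorg handling above: Reorg now receives the first reorged
// deposit count directly, instead of the last valid one as the old internal tree did.
func rollback(ctx context.Context, exitTree *tree.AppendOnlyTree, firstReorgedIndex uint32) error {
	return exitTree.Reorg(ctx, firstReorgedIndex)
}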