Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/rollup exit tree #29

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM golang:1.22.5-alpine3.20 AS build

WORKDIR $GOPATH/src/github.com/0xPolygon/cdk

RUN apk update && apk add --no-cache make build-base git
RUN apk update && apk add --no-cache make build-base git
# INSTALL DEPENDENCIES
COPY go.mod go.sum /src/
RUN cd /src && go mod download
Expand Down
19 changes: 10 additions & 9 deletions aggoracle/chaingersender/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewEVMChainGERSender(
func (c *EVMChainGERSender) IsGERAlreadyInjected(ger common.Hash) (bool, error) {
timestamp, err := c.gerContract.GlobalExitRootMap(&bind.CallOpts{Pending: false}, ger)
if err != nil {
return false, err
return false, fmt.Errorf("error calling gerContract.GlobalExitRootMap: %w", err)
}
return timestamp.Cmp(big.NewInt(0)) != 0, nil
}
Expand All @@ -86,28 +86,29 @@ func (c *EVMChainGERSender) UpdateGERWaitUntilMined(ctx context.Context, ger com
return err
}
data, err := abi.Pack("updateGlobalExitRoot", ger)
if err != nil {
return err
}
id, err := c.ethTxMan.Add(ctx, &c.gerAddr, nil, big.NewInt(0), data, c.gasOffset, nil)
if err != nil {
return err
}
for {
time.Sleep(c.waitPeriodMonitorTx)
log.Debugf("waiting for tx %s to be mined", id.Hex())
res, err := c.ethTxMan.Result(ctx, id)
if err != nil {
log.Error("error calling ethTxMan.Result: ", err)
}
switch res.Status {
case ethtxmanager.MonitoredTxStatusCreated:
continue
case ethtxmanager.MonitoredTxStatusSent:
case ethtxmanager.MonitoredTxStatusCreated,
ethtxmanager.MonitoredTxStatusSent:
continue
case ethtxmanager.MonitoredTxStatusFailed:
return fmt.Errorf("tx %s failed", res.ID)
case ethtxmanager.MonitoredTxStatusMined:
return nil
case ethtxmanager.MonitoredTxStatusSafe:
return nil
case ethtxmanager.MonitoredTxStatusFinalized:
case ethtxmanager.MonitoredTxStatusMined,
ethtxmanager.MonitoredTxStatusSafe,
ethtxmanager.MonitoredTxStatusFinalized:
return nil
default:
log.Error("unexpected tx status: ", res.Status)
Expand Down
5 changes: 3 additions & 2 deletions aggoracle/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package aggoracle_test
import (
"context"
"errors"
"fmt"
"math/big"
"strconv"
"testing"
Expand Down Expand Up @@ -59,7 +60,7 @@ func commonSetup(t *testing.T) (
require.NoError(t, err)
// Syncer
dbPathSyncer := t.TempDir()
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, 10, etherman.LatestBlock, reorg, l1Client.Client(), time.Millisecond, 0)
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, common.Address{}, 10, etherman.LatestBlock, reorg, l1Client.Client(), time.Millisecond, 0)
require.NoError(t, err)
go syncer.Start(ctx)

Expand Down Expand Up @@ -208,6 +209,6 @@ func runTest(
require.NoError(t, err)
isInjected, err := sender.IsGERAlreadyInjected(expectedGER)
require.NoError(t, err)
require.True(t, isInjected)
require.True(t, isInjected, fmt.Sprintf("iteration %d, GER: %s", i, common.Bytes2Hex(expectedGER[:])))
}
}
53 changes: 30 additions & 23 deletions aggoracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,9 @@ import (
"github.com/0xPolygon/cdk/l1infotreesync"
"github.com/0xPolygon/cdk/log"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)

type EthClienter interface {
ethereum.LogFilterer
ethereum.BlockNumberReader
ethereum.ChainReader
bind.ContractBackend
}

type L1InfoTreer interface {
GetLatestInfoUntilBlock(ctx context.Context, blockNum uint64) (*l1infotreesync.L1InfoTreeLeaf, error)
}
Expand All @@ -31,15 +23,15 @@ type ChainSender interface {

type AggOracle struct {
ticker *time.Ticker
l1Client EthClienter
l1Client ethereum.ChainReader
l1Info L1InfoTreer
chainSender ChainSender
blockFinality *big.Int
}

func New(
chainSender ChainSender,
l1Client EthClienter,
l1Client ethereum.ChainReader,
l1InfoTreeSyncer L1InfoTreer,
blockFinalityType etherman.BlockNumberFinality,
waitPeriodNextGER time.Duration,
Expand All @@ -59,22 +51,30 @@ func New(
}

func (a *AggOracle) Start(ctx context.Context) {
var (
blockNumToFetch uint64
gerToInject common.Hash
err error
)
for {
select {
case <-a.ticker.C:
gerToInject, err := a.getLastFinalisedGER(ctx)
blockNumToFetch, gerToInject, err = a.getLastFinalisedGER(ctx, blockNumToFetch)
if err != nil {
if err == l1infotreesync.ErrBlockNotProcessed || err == l1infotreesync.ErrNotFound {
log.Debugf("syncer is not ready: %v", err)
if err == l1infotreesync.ErrBlockNotProcessed {
log.Debugf("syncer is not ready for the block %d", blockNumToFetch)
} else if err == l1infotreesync.ErrNotFound {
blockNumToFetch = 0
log.Debugf("syncer has not found any GER until block %d", blockNumToFetch)
} else {
log.Error("error calling isGERAlreadyInjected: ", err)
log.Error("error calling getLastFinalisedGER: ", err)
}
continue
}
if alreadyInjectd, err := a.chainSender.IsGERAlreadyInjected(gerToInject); err != nil {
if alreadyInjected, err := a.chainSender.IsGERAlreadyInjected(gerToInject); err != nil {
log.Error("error calling isGERAlreadyInjected: ", err)
continue
} else if alreadyInjectd {
} else if alreadyInjected {
log.Debugf("GER %s already injected", gerToInject.Hex())
continue
}
Expand All @@ -90,14 +90,21 @@ func (a *AggOracle) Start(ctx context.Context) {
}
}

func (a *AggOracle) getLastFinalisedGER(ctx context.Context) (common.Hash, error) {
header, err := a.l1Client.HeaderByNumber(ctx, a.blockFinality)
if err != nil {
return common.Hash{}, err
// getLastFinalisedGER tries to return a finalised GER:
// If blockNumToFetch != 0: it will try to fetch it until the given block
// Else it will ask the L1 client for the latest finalised block and use that
// If it fails to get the GER from the syncer, it will retunr the block number that used to query
func (a *AggOracle) getLastFinalisedGER(ctx context.Context, blockNumToFetch uint64) (uint64, common.Hash, error) {
if blockNumToFetch == 0 {
header, err := a.l1Client.HeaderByNumber(ctx, a.blockFinality)
if err != nil {
return 0, common.Hash{}, err
}
blockNumToFetch = header.Number.Uint64()
}
info, err := a.l1Info.GetLatestInfoUntilBlock(ctx, header.Number.Uint64())
info, err := a.l1Info.GetLatestInfoUntilBlock(ctx, blockNumToFetch)
if err != nil {
return common.Hash{}, err
return blockNumToFetch, common.Hash{}, err
}
return info.GlobalExitRoot, nil
return 0, info.GlobalExitRoot, nil
}
2 changes: 1 addition & 1 deletion bridgesync/bridgesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func new(
return nil, err
}
if lastProcessedBlock < initialBlock {
err = processor.ProcessBlock(sync.Block{
err = processor.ProcessBlock(ctx, sync.Block{
Num: initialBlock,
})
if err != nil {
Expand Down
63 changes: 24 additions & 39 deletions bridgesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"encoding/binary"
"encoding/json"
"errors"
"log"
"math/big"
"path"

dbCommon "github.com/0xPolygon/cdk/common"
"github.com/0xPolygon/cdk/sync"
"github.com/0xPolygon/cdk/tree"
"github.com/ethereum/go-ethereum/common"
"github.com/iden3/go-iden3-crypto/keccak256"
"github.com/ledgerwatch/erigon-lib/kv"
Expand All @@ -19,8 +20,6 @@ import (
const (
eventsTableSufix = "-events"
lastBlockTableSufix = "-lastBlock"
rootTableSufix = "-root"
rhtTableSufix = "-rht"
)

var (
Expand Down Expand Up @@ -84,38 +83,34 @@ type processor struct {
db kv.RwDB
eventsTable string
lastBlockTable string
tree *tree
exitTree *tree.AppendOnlyTree
}

func newProcessor(ctx context.Context, dbPath, dbPrefix string) (*processor, error) {
eventsTable := dbPrefix + eventsTableSufix
lastBlockTable := dbPrefix + lastBlockTableSufix
rootTable := dbPrefix + rootTableSufix
rhtTable := dbPrefix + rhtTableSufix
db, err := mdbx.NewMDBX(nil).
Path(dbPath).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
return kv.TableCfg{
eventsTable: {},
lastBlockTable: {},
rootTable: {},
rhtTable: {},
}
}).
Open()
if err != nil {
return nil, err
}

tree, err := newTree(ctx, rhtTable, rootTable, db)
exitTreeDBPath := path.Join(dbPath, "exittree")
exitTree, err := tree.NewAppendOnly(ctx, exitTreeDBPath, dbPrefix)
if err != nil {
return nil, err
}
return &processor{
db: db,
eventsTable: eventsTable,
lastBlockTable: lastBlockTable,
tree: tree,
exitTree: exitTree,
}, nil
}

Expand Down Expand Up @@ -144,7 +139,7 @@ func (p *processor) GetClaimsAndBridges(
}
defer c.Close()

for k, v, err := c.Seek(dbCommon.Uint64To2Bytes(fromBlock)); k != nil; k, v, err = c.Next() {
for k, v, err := c.Seek(dbCommon.Uint64ToBytes(fromBlock)); k != nil; k, v, err = c.Next() {
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,8 +176,8 @@ func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) {
}
}

func (p *processor) Reorg(firstReorgedBlock uint64) error {
tx, err := p.db.BeginRw(context.Background())
func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
tx, err := p.db.BeginRw(ctx)
if err != nil {
return err
}
Expand All @@ -191,7 +186,7 @@ func (p *processor) Reorg(firstReorgedBlock uint64) error {
return err
}
defer c.Close()
firstKey := dbCommon.Uint64To2Bytes(firstReorgedBlock)
firstKey := dbCommon.Uint64ToBytes(firstReorgedBlock)
firstDepositCountReorged := int64(-1)
for k, v, err := c.Seek(firstKey); k != nil; k, _, err = c.Next() {
if err != nil {
Expand Down Expand Up @@ -221,22 +216,15 @@ func (p *processor) Reorg(firstReorgedBlock uint64) error {
return err
}
if firstDepositCountReorged != -1 {
var lastValidDepositCount uint32
if firstDepositCountReorged == 0 {
lastValidDepositCount = 0
} else {
lastValidDepositCount = uint32(firstDepositCountReorged) - 1
}
if err := p.tree.reorg(tx, lastValidDepositCount); err != nil {
if err := p.exitTree.Reorg(ctx, uint32(firstDepositCountReorged)); err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}

func (p *processor) ProcessBlock(block sync.Block) error {
ctx := context.Background()
func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
tx, err := p.db.BeginRw(ctx)
if err != nil {
return err
Expand All @@ -256,7 +244,7 @@ func (p *processor) ProcessBlock(block sync.Block) error {
tx.Rollback()
return err
}
if err := tx.Put(p.eventsTable, dbCommon.Uint64To2Bytes(block.Num), value); err != nil {
if err := tx.Put(p.eventsTable, dbCommon.Uint64ToBytes(block.Num), value); err != nil {
tx.Rollback()
return err
}
Expand All @@ -267,24 +255,21 @@ func (p *processor) ProcessBlock(block sync.Block) error {
return err
}

for i, bridge := range bridges {
if err := p.tree.addLeaf(tx, bridge.DepositCount, bridge.Hash()); err != nil {
if i != 0 {
tx.Rollback()
if err2 := p.tree.initLastLeftCacheAndLastDepositCount(ctx); err2 != nil {
log.Fatalf(
"after failing to add a leaf to the tree with error: %v, error initializing the cache with error: %v",
err, err2,
)
}
return err
}
}
leaves := []tree.Leaf{}
for _, bridge := range bridges {
leaves = append(leaves, tree.Leaf{
Index: bridge.DepositCount,
Hash: bridge.Hash(),
})
}
if err := p.exitTree.AddLeaves(ctx, leaves); err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}

func (p *processor) updateLastProcessedBlock(tx kv.RwTx, blockNum uint64) error {
blockNumBytes := dbCommon.Uint64To2Bytes(blockNum)
blockNumBytes := dbCommon.Uint64ToBytes(blockNum)
return tx.Put(p.lastBlockTable, lastBlokcKey, blockNumBytes)
}
Loading
Loading