Skip to content

Commit

Permalink
lastgersync to sqlite
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar committed Sep 12, 2024
1 parent d17db59 commit fb289c9
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 279 deletions.
2 changes: 1 addition & 1 deletion bridgesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
}
}

if err := tx.Commit(); err != nil {
if err = tx.Commit(); err != nil {
return err
}
p.log.Debugf("processed %d events until block %d", len(block.Events), block.Num)
Expand Down
10 changes: 5 additions & 5 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/0xPolygon/cdk/etherman"
ethermanconfig "github.com/0xPolygon/cdk/etherman/config"
"github.com/0xPolygon/cdk/etherman/contracts"
"github.com/0xPolygon/cdk/injectedgersync"
"github.com/0xPolygon/cdk/l1bridge2infoindexsync"
"github.com/0xPolygon/cdk/l1infotreesync"
"github.com/0xPolygon/cdk/lastgersync"
"github.com/0xPolygon/cdk/log"
"github.com/0xPolygon/cdk/reorgdetector"
"github.com/0xPolygon/cdk/rpc"
Expand Down Expand Up @@ -568,15 +568,15 @@ func runL1Bridge2InfoIndexSyncIfNeeded(
func runLastGERSyncIfNeeded(
ctx context.Context,
components []string,
cfg lastgersync.Config,
cfg injectedgersync.Config,
reorgDetectorL2 *reorgdetector.ReorgDetector,
l2Client *ethclient.Client,
l1InfoTreeSync *l1infotreesync.L1InfoTreeSync,
) *lastgersync.LastGERSync {
) *injectedgersync.LastGERSync {
if !isNeeded([]string{RPC}, components) {
return nil
}
lastGERSync, err := lastgersync.New(
lastGERSync, err := injectedgersync.New(
ctx,
cfg.DBPath,
reorgDetectorL2,
Expand Down Expand Up @@ -663,7 +663,7 @@ func createRPC(
sponsor *claimsponsor.ClaimSponsor,
l1InfoTree *l1infotreesync.L1InfoTreeSync,
l1Bridge2Index *l1bridge2infoindexsync.L1Bridge2InfoIndexSync,
injectedGERs *lastgersync.LastGERSync,
injectedGERs *injectedgersync.LastGERSync,
bridgeL1 *bridgesync.BridgeSync,
bridgeL2 *bridgesync.BridgeSync,
) *jRPC.Server {
Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/0xPolygon/cdk/claimsponsor"
"github.com/0xPolygon/cdk/common"
ethermanconfig "github.com/0xPolygon/cdk/etherman/config"
"github.com/0xPolygon/cdk/injectedgersync"
"github.com/0xPolygon/cdk/l1bridge2infoindexsync"
"github.com/0xPolygon/cdk/l1infotreesync"
"github.com/0xPolygon/cdk/lastgersync"
"github.com/0xPolygon/cdk/log"
"github.com/0xPolygon/cdk/reorgdetector"
"github.com/0xPolygon/cdk/sequencesender"
Expand Down Expand Up @@ -106,7 +106,7 @@ type Config struct {

// LastGERSync is the config for the synchronizer in charge of syncing the last GER injected on L2.
// Needed for the bridge service (RPC)
LastGERSync lastgersync.Config
LastGERSync injectedgersync.Config
}

// Default parses the default configuration values.
Expand Down
2 changes: 1 addition & 1 deletion lastgersync/config.go → injectedgersync/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package lastgersync
package injectedgersync

import (
"github.com/0xPolygon/cdk/config/types"
Expand Down
10 changes: 5 additions & 5 deletions lastgersync/e2e_test.go → injectedgersync/e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package lastgersync_test
package injectedgersync_test

import (
"context"
Expand All @@ -8,7 +8,7 @@ import (
"time"

"github.com/0xPolygon/cdk/etherman"
"github.com/0xPolygon/cdk/lastgersync"
"github.com/0xPolygon/cdk/injectedgersync"
"github.com/0xPolygon/cdk/test/helpers"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -19,7 +19,7 @@ func TestE2E(t *testing.T) {
ctx := context.Background()
env := helpers.SetupAggoracleWithEVMChain(t)
dbPathSyncer := t.TempDir()
syncer, err := lastgersync.New(
syncer, err := injectedgersync.New(
ctx,
dbPathSyncer,
env.ReorgDetector,
Expand Down Expand Up @@ -64,8 +64,8 @@ func TestE2E(t *testing.T) {
}
require.True(t, syncerUpToDate, errMsg)

_, actualGER, err := syncer.GetFirstGERAfterL1InfoTreeIndex(ctx, uint32(i))
injected, err := syncer.GetFirstGERAfterL1InfoTreeIndex(ctx, uint32(i))
require.NoError(t, err)
require.Equal(t, common.Hash(expectedGER), actualGER)
require.Equal(t, common.Hash(expectedGER), injected.GlobalExitRoot)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package lastgersync
package injectedgersync

import (
"context"
Expand Down Expand Up @@ -64,7 +64,7 @@ func (d *downloader) Download(ctx context.Context, fromBlock uint64, downloadedC
err error
)
for {
lastIndex, err = d.processor.getLastIndex(ctx)
lastIndex, err = d.processor.getLastIndex()
if err == ErrNotFound {
lastIndex = 0
} else if err != nil {
Expand All @@ -86,7 +86,7 @@ func (d *downloader) Download(ctx context.Context, fromBlock uint64, downloadedC
lastBlock := d.WaitForNewBlocks(ctx, fromBlock)

attempts = 0
var gers []Event
var gers []InjectedGER
for {
gers, err = d.getGERsFromIndex(ctx, lastIndex)
if err != nil {
Expand All @@ -112,12 +112,12 @@ func (d *downloader) Download(ctx context.Context, fromBlock uint64, downloadedC

downloadedCh <- *block
if block.Events != nil {
lastIndex = block.Events[0].(Event).L1InfoTreeIndex
lastIndex = block.Events[0].(InjectedGER).L1InfoTreeIndex
}
}
}

func (d *downloader) getGERsFromIndex(ctx context.Context, fromL1InfoTreeIndex uint32) ([]Event, error) {
func (d *downloader) getGERsFromIndex(ctx context.Context, fromL1InfoTreeIndex uint32) ([]InjectedGER, error) {
lastRoot, err := d.l1InfoTreesync.GetLastL1InfoTreeRoot(ctx)
if err == tree.ErrNotFound {
return nil, nil
Expand All @@ -126,13 +126,13 @@ func (d *downloader) getGERsFromIndex(ctx context.Context, fromL1InfoTreeIndex u
return nil, fmt.Errorf("error calling GetLastL1InfoTreeRoot: %v", err)
}

gers := []Event{}
gers := []InjectedGER{}
for i := fromL1InfoTreeIndex; i <= lastRoot.Index; i++ {
info, err := d.l1InfoTreesync.GetInfoByIndex(ctx, i)
if err != nil {
return nil, fmt.Errorf("error calling GetInfoByIndex: %v", err)
}
gers = append(gers, Event{
gers = append(gers, InjectedGER{
L1InfoTreeIndex: i,
GlobalExitRoot: info.GlobalExitRoot,
})
Expand All @@ -141,7 +141,7 @@ func (d *downloader) getGERsFromIndex(ctx context.Context, fromL1InfoTreeIndex u
return gers, nil
}

func (d *downloader) setGreatestGERInjectedFromList(b *sync.EVMBlock, list []Event) {
func (d *downloader) setGreatestGERInjectedFromList(b *sync.EVMBlock, list []InjectedGER) {
for _, event := range list {
var attempts int
for {
Expand Down
4 changes: 2 additions & 2 deletions lastgersync/lastgersync.go → injectedgersync/lastgersync.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package lastgersync
package injectedgersync

import (
"context"
Expand Down Expand Up @@ -75,7 +75,7 @@ func (s *LastGERSync) Start(ctx context.Context) {
s.driver.Sync(ctx)
}

func (s *LastGERSync) GetFirstGERAfterL1InfoTreeIndex(ctx context.Context, atOrAfterL1InfoTreeIndex uint32) (injectedL1InfoTreeIndex uint32, ger common.Hash, err error) {
func (s *LastGERSync) GetFirstGERAfterL1InfoTreeIndex(ctx context.Context, atOrAfterL1InfoTreeIndex uint32) (*InjectedGER, error) {
return s.processor.GetFirstGERAfterL1InfoTreeIndex(ctx, atOrAfterL1InfoTreeIndex)
}

Expand Down
17 changes: 17 additions & 0 deletions injectedgersync/migrations/lastgersync0001.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- +migrate Down
DROP TABLE IF EXISTS block;
DROP TABLE IF EXISTS claim;
DROP TABLE IF EXISTS bridge;

-- +migrate Up
CREATE TABLE block (
num BIGINT PRIMARY KEY
);

CREATE TABLE injected_ger (
block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE,
block_pos INTEGER NOT NULL,
l1_info_tree_index INTEGER NOT NULL,
global_exit_root VARCHAR NOT NULL,
PRIMARY KEY (block_num, block_pos)
);
29 changes: 29 additions & 0 deletions injectedgersync/migrations/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package migrations

import (
_ "embed"
"strings"

"github.com/0xPolygon/cdk/db"
migrate "github.com/rubenv/sql-migrate"
)

const upDownSeparator = "-- +migrate Up"

//go:embed lastgersync0001.sql
var mig001 string
var mig001splitted = strings.Split(mig001, upDownSeparator)

var lastgerMigrations = &migrate.MemoryMigrationSource{
Migrations: []*migrate.Migration{
{
Id: "bridgesync001",
Up: []string{mig001splitted[1]},
Down: []string{mig001splitted[0]},
},
},
}

func RunMigrations(dbPath string) error {
return db.RunMigrations(dbPath, lastgerMigrations)
}
129 changes: 129 additions & 0 deletions injectedgersync/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package injectedgersync

import (
"context"
"database/sql"
"errors"

"github.com/0xPolygon/cdk/db"
"github.com/0xPolygon/cdk/injectedgersync/migrations"
"github.com/0xPolygon/cdk/log"
"github.com/0xPolygon/cdk/sync"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/russross/meddler"
)

var (
// TODO: use db.ErrNotFound instead
ErrNotFound = errors.New("not found")
)

type InjectedGER struct {
BlockNum uint64 `meddler:"block_num"`
BlockPos uint64 `meddler:"block_pos"`
L1InfoTreeIndex uint32 `meddler:"l1_info_tree_index"`
GlobalExitRoot ethCommon.Hash `meddler:"global_exit_root,hash"`
}

type blockWithGERs struct {
// inclusive
FirstIndex uint32
// not inclusive
LastIndex uint32
}

type processor struct {
db *sql.DB
}

func newProcessor(dbPath string) (*processor, error) {
err := migrations.RunMigrations(dbPath)
if err != nil {
return nil, err
}
db, err := db.NewSQLiteDB(dbPath)
if err != nil {
return nil, err
}
return &processor{
db: db,
}, nil
}

// GetLastProcessedBlockAndL1InfoTreeIndex returns the last processed block oby the processor, including blocks
// that don't have events
func (p *processor) GetLastProcessedBlock(ctx context.Context) (uint64, error) {
return p.getLastProcessedBlockWithTx(p.db)
}

func (p *processor) getLastIndex() (uint32, error) {
var lastIndex uint32
row := p.db.QueryRow("SELECT l1_info_tree_index FROM injected_ger ORDER BY l1_info_tree_index DESC LIMIT 1;")
err := row.Scan(&lastIndex)
if errors.Is(err, sql.ErrNoRows) {
return 0, ErrNotFound
}
return lastIndex, err
}

func (p *processor) getLastProcessedBlockWithTx(tx db.DBer) (uint64, error) {
var lastProcessedBlock uint64
row := tx.QueryRow("SELECT num FROM BLOCK ORDER BY num DESC LIMIT 1;")
err := row.Scan(&lastProcessedBlock)
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
return lastProcessedBlock, err
}

func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
tx, err := db.NewTx(ctx, p.db)
if err != nil {
return err
}
defer func() {
if err != nil {
if errRllbck := tx.Rollback(); errRllbck != nil {
log.Errorf("error while rolling back tx %v", errRllbck)
}
}
}()

if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, block.Num); err != nil {
return err
}

for _, e := range block.Events {
event := e.(InjectedGER)
if err = meddler.Insert(tx, "injected_ger", event); err != nil {
return err
}
}

err = tx.Commit()
return err
}

func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
_, err := p.db.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock)
return err
}

// GetFirstGERAfterL1InfoTreeIndex returns the first GER injected on the chain that is related to l1InfoTreeIndex
// or greater
func (p *processor) GetFirstGERAfterL1InfoTreeIndex(ctx context.Context, l1InfoTreeIndex uint32) (*InjectedGER, error) {
injectedGER := &InjectedGER{}
if err := meddler.QueryRow(p.db, injectedGER, `
SELECT * FROM injected_ger
WHERE l1_info_tree_index >= $1
ORDER BY l1_info_tree_index ASC
LIMIT 1;
`, l1InfoTreeIndex); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
}
return nil, err
}

return injectedGER, nil
}
Loading

0 comments on commit fb289c9

Please sign in to comment.