feat: cherry pick
goran-ethernal committed Jan 24, 2025
1 parent 0017ded commit 6b0e1b2
Showing 14 changed files with 88 additions and 40 deletions.
7 changes: 4 additions & 3 deletions cmd/run.go
@@ -480,8 +480,9 @@ func waitSignal(cancelFuncs []context.CancelFunc) {
func newReorgDetector(
cfg *reorgdetector.Config,
client *ethclient.Client,
network reorgdetector.Network,
) *reorgdetector.ReorgDetector {
rd, err := reorgdetector.New(client, *cfg)
rd, err := reorgdetector.New(client, *cfg, network)
if err != nil {
log.Fatal(err)
}
@@ -595,7 +596,7 @@ func runReorgDetectorL1IfNeeded(
components) {
return nil, nil
}
rd := newReorgDetector(cfg, l1Client)
rd := newReorgDetector(cfg, l1Client, reorgdetector.L1)

errChan := make(chan error)
go func() {
@@ -617,7 +618,7 @@ func runReorgDetectorL2IfNeeded(
if !isNeeded([]string{cdkcommon.AGGORACLE, cdkcommon.BRIDGE, cdkcommon.AGGSENDER}, components) {
return nil, nil
}
rd := newReorgDetector(cfg, l2Client)
rd := newReorgDetector(cfg, l2Client, reorgdetector.L2)

errChan := make(chan error)
go func() {
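Taken together, the two hunks above make the newReorgDetector helper take a reorgdetector.Network value, and the L1/L2 runners now tag their detector explicitly. A minimal sketch of the resulting helper and its call sites, reconstructed from this diff (the trailing return after the error check is assumed, as it falls outside the shown context):

func newReorgDetector(
	cfg *reorgdetector.Config,
	client *ethclient.Client,
	network reorgdetector.Network,
) *reorgdetector.ReorgDetector {
	// The network tag is forwarded so the detector can scope its logger to "l1" or "l2".
	rd, err := reorgdetector.New(client, *cfg, network)
	if err != nil {
		log.Fatal(err)
	}
	return rd // assumed: returns the detector once construction succeeds
}

// Updated call sites:
//	rd := newReorgDetector(cfg, l1Client, reorgdetector.L1) // runReorgDetectorL1IfNeeded
//	rd := newReorgDetector(cfg, l2Client, reorgdetector.L2) // runReorgDetectorL2IfNeeded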
4 changes: 2 additions & 2 deletions l1infotreesync/e2e_test.go
@@ -154,7 +154,7 @@ func TestWithReorgs(t *testing.T) {

client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)

rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)})
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 30)}, reorgdetector.L1)
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))

@@ -267,7 +267,7 @@ func TestStressAndReorgs(t *testing.T) {

client, auth, gerAddr, verifyAddr, gerSc, verifySC := newSimulatedClient(t)

rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, reorgdetector.L1)
require.NoError(t, err)
require.NoError(t, rd.Start(ctx))

5 changes: 5 additions & 0 deletions l1infotreesync/migrations/l1infotreesync0003.sql
@@ -0,0 +1,5 @@
-- +migrate Down
ALTER TABLE block DROP COLUMN hash;

-- +migrate Up
ALTER TABLE block ADD COLUMN hash VARCHAR;
7 changes: 7 additions & 0 deletions l1infotreesync/migrations/migrations.go
@@ -19,6 +19,9 @@ var mig001 string
//go:embed l1infotreesync0002.sql
var mig002 string

//go:embed l1infotreesync0003.sql
var mig003 string

func RunMigrations(dbPath string) error {
migrations := []types.Migration{
{
@@ -29,6 +32,10 @@ func RunMigrations(dbPath string) error {
ID: "l1infotreesync0002",
SQL: mig002,
},
{
ID: "l1infotreesync0003",
SQL: mig003,
},
}
for _, tm := range treeMigrations.Migrations {
migrations = append(migrations, types.Migration{
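Assembled from the two hunks above, the migration registration in RunMigrations ends up as below; the embed for l1infotreesync0001.sql and the first entry's ID are assumed from the naming pattern, since they sit outside the shown context:

//go:embed l1infotreesync0001.sql
var mig001 string

//go:embed l1infotreesync0002.sql
var mig002 string

//go:embed l1infotreesync0003.sql
var mig003 string

// Inside RunMigrations:
migrations := []types.Migration{
	{ID: "l1infotreesync0001", SQL: mig001}, // assumed ID; entry truncated in the diff
	{ID: "l1infotreesync0002", SQL: mig002},
	{ID: "l1infotreesync0003", SQL: mig003}, // new: adds the block.hash column (see l1infotreesync0003.sql above)
}
// ...followed by the tree migrations appended in the loop shown above.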
45 changes: 27 additions & 18 deletions l1infotreesync/processor.go
@@ -32,6 +32,7 @@ type processor struct {
mu mutex.RWMutex
halted bool
haltedReason string
log *log.Logger
}

// UpdateL1InfoTree representation of the UpdateL1InfoTree event
@@ -151,6 +152,7 @@ func newProcessor(dbPath string) (*processor, error) {
db: db,
l1InfoTree: tree.NewAppendOnlyTree(db, migrations.L1InfoTreePrefix),
rollupExitTree: tree.NewUpdatableTree(db, migrations.RollupExitTreePrefix),
log: log.WithFields("processor", "l1infotreesync"),
}, nil
}

@@ -178,7 +180,7 @@ func (p *processor) GetLatestInfoUntilBlock(ctx context.Context, blockNum uint64
}
defer func() {
if err := tx.Rollback(); err != nil {
log.Warnf("error rolling back tx: %v", err)
p.log.Warnf("error rolling back tx: %v", err)
}
}()

@@ -235,6 +237,8 @@ func (p *processor) getLastProcessedBlockWithTx(tx db.Querier) (uint64, error) {
// Reorg triggers a purge and reset process on the processor to leaf it on a state
// as if the last block processed was firstReorgedBlock-1
func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
p.log.Infof("reorging to block %d", firstReorgedBlock)

tx, err := db.NewTx(ctx, p.db)
if err != nil {
return err
@@ -243,7 +247,7 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
defer func() {
if shouldRollback {
if errRllbck := tx.Rollback(); errRllbck != nil {
log.Errorf("error while rolling back tx %v", errRllbck)
p.log.Errorf("error while rolling back tx %v", errRllbck)
}
}
}()
@@ -269,6 +273,8 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
return err
}

p.log.Infof("reorged to block %d, %d rows affected", firstReorgedBlock, rowsAffected)

shouldRollback = false

sync.UnhaltIfAffectedRows(&p.halted, &p.haltedReason, &p.mu, rowsAffected)
@@ -279,25 +285,25 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
// and updates the last processed block (can be called without events for that purpose)
func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
if p.isHalted() {
log.Errorf("processor is halted due to: %s", p.haltedReason)
p.log.Errorf("processor is halted due to: %s", p.haltedReason)
return sync.ErrInconsistentState
}
tx, err := db.NewTx(ctx, p.db)
if err != nil {
return err
}
log.Debugf("init block processing for block %d", block.Num)
p.log.Debugf("init block processing for block %d", block.Num)
shouldRollback := true
defer func() {
if shouldRollback {
log.Debugf("rolling back block processing for block %d", block.Num)
p.log.Debugf("rolling back block processing for block %d", block.Num)
if errRllbck := tx.Rollback(); errRllbck != nil {
log.Errorf("error while rolling back tx %v", errRllbck)
p.log.Errorf("error while rolling back tx %v", errRllbck)
}
}
}()

if _, err := tx.Exec(`INSERT INTO block (num) VALUES ($1)`, block.Num); err != nil {
if _, err := tx.Exec(`INSERT INTO block (num, hash) VALUES ($1, $2)`, block.Num, block.Hash.String()); err != nil {
return fmt.Errorf("insert Block. err: %w", err)
}

@@ -343,10 +349,13 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
if err != nil {
return fmt.Errorf("AddLeaf(%s). err: %w", info.String(), err)
}
log.Infof("inserted L1InfoTreeLeaf %s", info.String())
p.log.Infof("inserted L1InfoTreeLeaf %s", info.String())
l1InfoLeavesAdded++
}
if event.UpdateL1InfoTreeV2 != nil {
p.log.Infof("handle UpdateL1InfoTreeV2 event. Block: %d, block hash: %s. Event root: %s. Event leaf count: %d.",
block.Num, block.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(), event.UpdateL1InfoTreeV2.LeafCount)

root, err := p.l1InfoTree.GetLastRoot(tx)
if err != nil {
return fmt.Errorf("GetLastRoot(). err: %w", err)
Expand All @@ -357,13 +366,13 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
// compared to the contracts, and this will need manual intervention.
if root.Hash != event.UpdateL1InfoTreeV2.CurrentL1InfoRoot || root.Index+1 != event.UpdateL1InfoTreeV2.LeafCount {
errStr := fmt.Sprintf(
"failed to check UpdateL1InfoTreeV2. Root: %s vs event:%s. "+
"Index: : %d vs event.LeafCount:%d. Happened on block %d",
root.Hash, common.Bytes2Hex(event.UpdateL1InfoTreeV2.CurrentL1InfoRoot[:]),
"failed to check UpdateL1InfoTreeV2. Root: %s vs event: %s. "+
"Index: %d vs event.LeafCount: %d. Happened on block %d",
root.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(),
root.Index, event.UpdateL1InfoTreeV2.LeafCount,
block.Num,
)
log.Error(errStr)
p.log.Error(errStr)
p.mu.Lock()
p.haltedReason = errStr
p.halted = true
@@ -372,21 +381,21 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
}
}
if event.VerifyBatches != nil {
log.Debugf("handle VerifyBatches event %s", event.VerifyBatches.String())
p.log.Debugf("handle VerifyBatches event %s", event.VerifyBatches.String())
err = p.processVerifyBatches(tx, block.Num, event.VerifyBatches)
if err != nil {
err = fmt.Errorf("processVerifyBatches. err: %w", err)
log.Errorf("error processing VerifyBatches: %v", err)
p.log.Errorf("error processing VerifyBatches: %v", err)
return err
}
}

if event.InitL1InfoRootMap != nil {
log.Debugf("handle InitL1InfoRootMap event %s", event.InitL1InfoRootMap.String())
p.log.Debugf("handle InitL1InfoRootMap event %s", event.InitL1InfoRootMap.String())
err = processEventInitL1InfoRootMap(tx, block.Num, event.InitL1InfoRootMap)
if err != nil {
err = fmt.Errorf("initL1InfoRootMap. Err: %w", err)
log.Errorf("error processing InitL1InfoRootMap: %v", err)
p.log.Errorf("error processing InitL1InfoRootMap: %v", err)
return err
}
}
@@ -396,9 +405,9 @@ func (p *processor) ProcessBlock(ctx context.Context, block sync.Block) error {
return fmt.Errorf("err: %w", err)
}
shouldRollback = false
logFunc := log.Debugf
logFunc := p.log.Debugf
if len(block.Events) > 0 {
logFunc = log.Infof
logFunc = p.log.Infof
}
logFunc("block %d processed with %d events", block.Num, len(block.Events))
return nil
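Functionally the processor is unchanged here: the commit scopes every log call to a per-component logger created with log.WithFields("processor", "l1infotreesync"), stores the block hash alongside the number, and logs the UpdateL1InfoTreeV2 event before validating it. For readability, the validation branch those hunks touch is reassembled below from the diff (only the branch body; the surrounding ProcessBlock plumbing and the exact post-halt return are elided):

// After the block's leaves are appended, the locally computed root must match
// the root emitted by the contract. A mismatch means the local tree diverged
// from the contracts, so the processor records the reason and halts; ProcessBlock
// then refuses further blocks with sync.ErrInconsistentState until a reorg
// (or manual intervention) clears the halt.
root, err := p.l1InfoTree.GetLastRoot(tx)
if err != nil {
	return fmt.Errorf("GetLastRoot(). err: %w", err)
}
if root.Hash != event.UpdateL1InfoTreeV2.CurrentL1InfoRoot || root.Index+1 != event.UpdateL1InfoTreeV2.LeafCount {
	errStr := fmt.Sprintf(
		"failed to check UpdateL1InfoTreeV2. Root: %s vs event: %s. "+
			"Index: %d vs event.LeafCount: %d. Happened on block %d",
		root.Hash, event.UpdateL1InfoTreeV2.CurrentL1InfoRoot.String(),
		root.Index, event.UpdateL1InfoTreeV2.LeafCount,
		block.Num,
	)
	p.log.Error(errStr)
	p.mu.Lock()
	p.haltedReason = errStr
	p.halted = true
	p.mu.Unlock()
	// (the lines that follow in the original file, elided from this diff, return an error)
}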
18 changes: 17 additions & 1 deletion reorgdetector/reorgdetector.go
@@ -18,6 +18,17 @@ import (
"golang.org/x/sync/errgroup"
)

type Network string

const (
L1 Network = "l1"
L2 Network = "l2"
)

func (n Network) String() string {
return string(n)
}

type EthClient interface {
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
@@ -34,9 +45,11 @@ type ReorgDetector struct {

subscriptionsLock sync.RWMutex
subscriptions map[string]*Subscription

log *log.Logger
}

func New(client EthClient, cfg Config) (*ReorgDetector, error) {
func New(client EthClient, cfg Config, network Network) (*ReorgDetector, error) {
err := migrations.RunMigrations(cfg.DBPath)
if err != nil {
return nil, err
@@ -52,6 +65,7 @@ func New(client EthClient, cfg Config) (*ReorgDetector, error) {
checkReorgInterval: cfg.GetCheckReorgsInterval(),
trackedBlocks: make(map[string]*headersList),
subscriptions: make(map[string]*Subscription),
log: log.WithFields("reorg-detector", network.String()),
}, nil
}

@@ -137,6 +151,8 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
continue
}

rd.log.Debugf("Checking reorgs in tracked blocks up to block %d", lastFinalisedBlock.Number.Uint64())

errGroup.Go(func() error {
headers := hdrs.getSorted()
for _, hdr := range headers {
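A minimal usage sketch of the new constructor signature, mirroring the updated tests further down; the RPC endpoint is a placeholder and the import paths are assumed from the repository layout:

package main

import (
	"context"
	"log"
	"time"

	cdktypes "github.com/0xPolygon/cdk/config/types"
	"github.com/0xPolygon/cdk/reorgdetector"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("wss://l1.example.org") // placeholder endpoint
	if err != nil {
		log.Fatal(err)
	}

	// The new third argument tags this instance as the L1 detector; internally it
	// becomes the "reorg-detector"="l1" field on every log line.
	rd, err := reorgdetector.New(client, reorgdetector.Config{
		DBPath:              "reorgdetector_l1.sqlite",
		CheckReorgsInterval: cdktypes.NewDuration(100 * time.Millisecond),
	}, reorgdetector.L1)
	if err != nil {
		log.Fatal(err)
	}
	if err := rd.Start(context.Background()); err != nil {
		log.Fatal(err)
	}
}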
3 changes: 3 additions & 0 deletions reorgdetector/reorgdetector_db.go
@@ -59,6 +59,9 @@ func (rd *ReorgDetector) saveTrackedBlock(id string, b header) error {
} else {
hdrs.add(b)
}

rd.log.Debugf("Tracking block %d for subscriber %s", b.Num, id)

rd.trackedBlocksLock.Unlock()
return meddler.Insert(rd.db, "tracked_block", &headerWithSubscriberID{
SubscriberID: id,
1 change: 1 addition & 0 deletions reorgdetector/reorgdetector_sub.go
@@ -38,6 +38,7 @@ func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) {
rd.subscriptionsLock.RUnlock()

if ok {
rd.log.Infof("Reorg detected for subscriber %s at block %d", id, startingBlock.Num)
sub.ReorgedBlock <- startingBlock.Num
<-sub.ReorgProcessed
}
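The new log line sits right before the existing notification handshake: the detector sends the first reorged block number to the subscriber and then blocks on ReorgProcessed until the subscriber acknowledges. A hedged sketch of the consuming side, assuming rd is a started detector, processor implements the Reorg method shown earlier, and ReorgProcessed is a plain bool signalling channel:

sub, err := rd.Subscribe("l1infotreesync")
if err != nil {
	log.Fatal(err)
}
go func() {
	for firstReorgedBlock := range sub.ReorgedBlock {
		// Roll local state back so the last processed block is firstReorgedBlock-1.
		if err := processor.Reorg(ctx, firstReorgedBlock); err != nil {
			log.Printf("reorg handling failed: %v", err) // illustrative error handling
		}
		// Acknowledge so notifySubscriber can return and block tracking resumes.
		sub.ReorgProcessed <- true
	}
}()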
12 changes: 6 additions & 6 deletions reorgdetector/reorgdetector_test.go
@@ -28,7 +28,7 @@ func Test_ReorgDetector(t *testing.T) {
// Create test DB dir
testDir := path.Join(t.TempDir(), "reorgdetectorTest_ReorgDetector.sqlite")

reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1)
require.NoError(t, err)

err = reorgDetector.Start(ctx)
@@ -103,7 +103,7 @@ func Test_ReorgDetector(t *testing.T) {
func TestGetTrackedBlocks(t *testing.T) {
clientL1 := simulated.NewBackend(nil, simulated.WithBlockGasLimit(10000000))
testDir := path.Join(t.TempDir(), "reorgdetector_TestGetTrackedBlocks.sqlite")
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1)
require.NoError(t, err)
list, err := reorgDetector.getTrackedBlocks()
require.NoError(t, err)
@@ -157,7 +157,7 @@ func TestGetTrackedBlocks(t *testing.T) {
func TestNotSubscribed(t *testing.T) {
clientL1 := simulated.NewBackend(nil, simulated.WithBlockGasLimit(10000000))
testDir := path.Join(t.TempDir(), "reorgdetectorTestNotSubscribed.sqlite")
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1)
require.NoError(t, err)
err = reorgDetector.AddBlockToTrack(context.Background(), "foo", 1, common.Hash{})
require.True(t, strings.Contains(err.Error(), "is not subscribed"))
@@ -179,7 +179,7 @@ func TestDetectReorgs(t *testing.T) {
client.On("HeaderByNumber", ctx, trackedBlock.Number).Return(trackedBlock, nil)

testDir := path.Join(t.TempDir(), "reorgdetectorTestDetectReorgs.sqlite")
reorgDetector, err := New(client, Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
reorgDetector, err := New(client, Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1)
require.NoError(t, err)

_, err = reorgDetector.Subscribe(syncerID)
@@ -205,7 +205,7 @@ func TestDetectReorgs(t *testing.T) {
client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return(lastFinalizedBlock, nil)

testDir := path.Join(t.TempDir(), "reorgdetectorTestDetectReorgs.sqlite")
reorgDetector, err := New(client, Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
reorgDetector, err := New(client, Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1)
require.NoError(t, err)

_, err = reorgDetector.Subscribe(syncerID)
@@ -230,7 +230,7 @@ func TestDetectReorgs(t *testing.T) {
client.On("HeaderByNumber", ctx, trackedBlock.Number).Return(reorgedTrackedBlock, nil)

testDir := path.Join(t.TempDir(), "reorgdetectorTestDetectReorgs.sqlite")
reorgDetector, err := New(client, Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
reorgDetector, err := New(client, Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}, L1)
require.NoError(t, err)

subscription, err := reorgDetector.Subscribe(syncerID)
3 changes: 3 additions & 0 deletions sync/driver.go
@@ -3,13 +3,16 @@ package sync
import (
"context"
"errors"

"github.com/ethereum/go-ethereum/common"
)

var ErrInconsistentState = errors.New("state is inconsistent, try again later once the state is consolidated")

type Block struct {
Num uint64
Events []interface{}
Hash common.Hash
}

type ProcessorInterface interface {
2 changes: 1 addition & 1 deletion sync/evmdownloader.go
@@ -107,7 +107,7 @@ func (d *EVMDownloader) Download(ctx context.Context, fromBlock uint64, download
for {
select {
case <-ctx.Done():
d.log.Debug("closing channel")
d.log.Info("closing evm downloader channel")
close(downloadedCh)
return
default:
1 change: 1 addition & 0 deletions sync/evmdriver.go
@@ -146,6 +146,7 @@ func (d *EVMDriver) handleNewBlock(ctx context.Context, cancel context.CancelFun
blockToProcess := Block{
Num: b.Num,
Events: b.Events,
Hash: b.Hash,
}
err := d.processor.ProcessBlock(ctx, blockToProcess)
if err != nil {
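Read together with the sync/driver.go and l1infotreesync changes above, the commit threads the block hash end to end: the downloaded header's hash is copied into sync.Block by the driver, and the processor persists it next to the block number via the new migration. A compact sketch of that flow, with the struct taken verbatim from the diff and the surrounding values illustrative:

// sync.Block as extended by this commit.
type Block struct {
	Num    uint64
	Events []interface{}
	Hash   common.Hash
}

// Driver side (handleNewBlock): the downloaded block's hash travels with it.
blockToProcess := Block{
	Num:    b.Num,
	Events: b.Events,
	Hash:   b.Hash,
}

// Processor side (ProcessBlock): the hash is stored alongside the number,
// matching the block table column added by l1infotreesync0003.sql.
_, err := tx.Exec(`INSERT INTO block (num, hash) VALUES ($1, $2)`,
	blockToProcess.Num, blockToProcess.Hash.String())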