Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow changing L1 synced height via admin RPC/CLI #1044

Merged
merged 20 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,34 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
return true, nil
}

// SetRollupEventSyncedL1Height sets the synced L1 height for rollup event synchronization
func (api *PrivateAdminAPI) SetRollupEventSyncedL1Height(height uint64) error {
rollupSyncService := api.eth.GetRollupSyncService()
if rollupSyncService == nil {
return errors.New("RollupSyncService is not available")
}

log.Info("Setting rollup event synced L1 height", "height", height)

rollupSyncService.ResetToHeight(height)

return nil
}

// SetL1MessageSyncedL1Height sets the synced L1 height for L1 message synchronization
func (api *PrivateAdminAPI) SetL1MessageSyncedL1Height(height uint64) error {
syncService := api.eth.GetSyncService()
if syncService == nil {
return errors.New("SyncService is not available")
}

log.Info("Setting L1 message synced L1 height", "height", height)

syncService.ResetToHeight(height)

return nil
}

// PublicDebugAPI is the collection of Ethereum full node APIs exposed
// over the public debugging endpoint.
type PublicDebugAPI struct {
Expand Down
12 changes: 12 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,3 +609,15 @@ func (s *Ethereum) Stop() error {

return nil
}

// GetRollupSyncService returns the RollupSyncService of the Ethereum instance.
// It returns nil if the service is not initialized.
func (e *Ethereum) GetRollupSyncService() *rollup_sync_service.RollupSyncService {
return e.rollupSyncService
}

// GetSyncService returns the SyncService of the Ethereum instance.
// It returns nil if the service is not initialized.
func (e *Ethereum) GetSyncService() *sync_service.SyncService {
return e.syncService
}
10 changes: 10 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ web3._extend({
name: 'stopWS',
call: 'admin_stopWS'
}),
new web3._extend.Method({
name: 'setRollupEventSyncedL1Height',
call: 'admin_setRollupEventSyncedL1Height',
params: 1
}),
new web3._extend.Method({
name: 'setL1MessageSyncedL1Height',
call: 'admin_setL1MessageSyncedL1Height',
params: 1
}),
],
properties: [
new web3._extend.Property({
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 7 // Minor version component of the current release
VersionPatch = 17 // Patch version component of the current release
VersionPatch = 18 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down
39 changes: 37 additions & 2 deletions rollup/rollup_sync_service/rollup_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"os"
"reflect"
"sync"
"time"

"github.com/scroll-tech/da-codec/encoding"
Expand Down Expand Up @@ -52,6 +53,7 @@ const (

// RollupSyncService collects ScrollChain batch commit/revert/finalize events and stores metadata into db.
type RollupSyncService struct {
originalCtx context.Context
ctx context.Context
cancel context.CancelFunc
client *L1Client
Expand All @@ -63,6 +65,9 @@ type RollupSyncService struct {
l1FinalizeBatchEventSignature common.Hash
bc *core.BlockChain
stack *node.Node

stateMu sync.Mutex // protects the service state
resetMu sync.Mutex // protects critical sections during reset operation
}

func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) {
Expand Down Expand Up @@ -99,10 +104,11 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig
latestProcessedBlock = *block
}

ctx, cancel := context.WithCancel(ctx)
serviceCtx, cancel := context.WithCancel(ctx)

service := RollupSyncService{
ctx: ctx,
originalCtx: ctx,
ctx: serviceCtx,
cancel: cancel,
client: client,
db: db,
Expand All @@ -123,6 +129,9 @@ func (s *RollupSyncService) Start() {
return
}

s.stateMu.Lock()
defer s.stateMu.Unlock()

log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock)

go func() {
Expand Down Expand Up @@ -150,13 +159,39 @@ func (s *RollupSyncService) Stop() {
return
}

s.stateMu.Lock()
defer s.stateMu.Unlock()

log.Info("Stopping rollup event sync background service")

if s.cancel != nil {
s.cancel()
}
}

// ResetToHeight resets the RollupSyncService to a specific L1 block height
func (s *RollupSyncService) ResetToHeight(height uint64) {
if s == nil {
return
}

s.resetMu.Lock()
defer s.resetMu.Unlock()

s.Stop()
Thegaram marked this conversation as resolved.
Show resolved Hide resolved

newCtx, newCancel := context.WithCancel(s.originalCtx)
s.ctx = newCtx
s.cancel = newCancel
s.latestProcessedBlock = height

rawdb.WriteRollupEventSyncedL1BlockNumber(s.db, height)

log.Info("Reset rollup sync service", "height", height)

go s.Start()
}

func (s *RollupSyncService) fetchRollupEvents() {
latestConfirmed, err := s.client.getLatestFinalizedBlockNumber()
if err != nil {
Expand Down
39 changes: 37 additions & 2 deletions rollup/sync_service/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/scroll-tech/go-ethereum/core"
Expand Down Expand Up @@ -42,6 +43,7 @@ var (

// SyncService collects all L1 messages and stores them in a local database.
type SyncService struct {
originalCtx context.Context
ctx context.Context
cancel context.CancelFunc
client *BridgeClient
Expand All @@ -50,6 +52,9 @@ type SyncService struct {
pollInterval time.Duration
latestProcessedBlock uint64
scope event.SubscriptionScope

stateMu sync.Mutex // protects the service state
resetMu sync.Mutex // protects critical sections during reset operation
}

func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) {
Expand All @@ -76,10 +81,11 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node
latestProcessedBlock = *block
}

ctx, cancel := context.WithCancel(ctx)
serviceCtx, cancel := context.WithCancel(ctx)

service := SyncService{
ctx: ctx,
originalCtx: ctx,
ctx: serviceCtx,
cancel: cancel,
client: client,
db: db,
Expand All @@ -95,6 +101,9 @@ func (s *SyncService) Start() {
return
}

s.stateMu.Lock()
defer s.stateMu.Unlock()

// wait for initial sync before starting node
log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock)

Expand Down Expand Up @@ -129,6 +138,9 @@ func (s *SyncService) Stop() {
return
}

s.stateMu.Lock()
defer s.stateMu.Unlock()

log.Info("Stopping sync service")

// Unsubscribe all subscriptions registered
Expand All @@ -139,6 +151,29 @@ func (s *SyncService) Stop() {
}
}

// ResetToHeight resets the SyncService to a specific L1 block height
func (s *SyncService) ResetToHeight(height uint64) {
if s == nil {
return
}

s.resetMu.Lock()
defer s.resetMu.Unlock()

s.Stop()
colinlyguo marked this conversation as resolved.
Show resolved Hide resolved

newCtx, newCancel := context.WithCancel(s.originalCtx)
s.ctx = newCtx
s.cancel = newCancel
s.latestProcessedBlock = height

rawdb.WriteSyncedL1BlockNumber(s.db, height)

log.Info("Reset sync service", "height", height)

go s.Start()
}

// SubscribeNewL1MsgsEvent registers a subscription of NewL1MsgsEvent and
// starts sending event to the given channel.
func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) event.Subscription {
Expand Down
Loading