Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow changing L1 synced height via admin RPC/CLI #1044

Merged
merged 20 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,32 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
return true, nil
}

// SetRollupEventSyncedL1Height sets the synced L1 height for rollup event synchronization
func (api *PrivateAdminAPI) SetRollupEventSyncedL1Height(height uint64) error {
rollupSyncService := api.eth.GetRollupSyncService()
if rollupSyncService == nil {
return errors.New("RollupSyncService is not available")
}

log.Info("Setting rollup event synced L1 height", "height", height)
rollupSyncService.ResetStartSyncHeight(height)

return nil
}

// SetL1MessageSyncedL1Height sets the synced L1 height for L1 message synchronization
func (api *PrivateAdminAPI) SetL1MessageSyncedL1Height(height uint64) error {
syncService := api.eth.GetSyncService()
if syncService == nil {
return errors.New("SyncService is not available")
}

log.Info("Setting L1 message synced L1 height", "height", height)
syncService.ResetStartSyncHeight(height)

return nil
}

// PublicDebugAPI is the collection of Ethereum full node APIs exposed
// over the public debugging endpoint.
type PublicDebugAPI struct {
Expand Down
12 changes: 12 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,3 +609,15 @@ func (s *Ethereum) Stop() error {

return nil
}

// GetRollupSyncService returns the RollupSyncService of the Ethereum instance.
// It returns nil if the service is not initialized.
func (e *Ethereum) GetRollupSyncService() *rollup_sync_service.RollupSyncService {
return e.rollupSyncService
}

// GetSyncService returns the SyncService of the Ethereum instance.
// It returns nil if the service is not initialized.
func (e *Ethereum) GetSyncService() *sync_service.SyncService {
return e.syncService
}
10 changes: 10 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ web3._extend({
name: 'stopWS',
call: 'admin_stopWS'
}),
new web3._extend.Method({
name: 'setRollupEventSyncedL1Height',
call: 'admin_setRollupEventSyncedL1Height',
params: 1
}),
new web3._extend.Method({
name: 'setL1MessageSyncedL1Height',
call: 'admin_setL1MessageSyncedL1Height',
params: 1
}),
],
properties: [
new web3._extend.Property({
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 7 // Minor version component of the current release
VersionPatch = 21 // Patch version component of the current release
VersionPatch = 22 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down
26 changes: 19 additions & 7 deletions rollup/rollup_sync_service/rollup_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"os"
"reflect"
"sync/atomic"
"time"

"github.com/scroll-tech/da-codec/encoding"
Expand Down Expand Up @@ -56,7 +57,7 @@ type RollupSyncService struct {
cancel context.CancelFunc
client *L1Client
db ethdb.Database
latestProcessedBlock uint64
latestProcessedBlock atomic.Uint64
scrollChainABI *abi.ABI
l1CommitBatchEventSignature common.Hash
l1RevertBatchEventSignature common.Hash
Expand Down Expand Up @@ -106,7 +107,6 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig
cancel: cancel,
client: client,
db: db,
latestProcessedBlock: latestProcessedBlock,
scrollChainABI: scrollChainABI,
l1CommitBatchEventSignature: scrollChainABI.Events["CommitBatch"].ID,
l1RevertBatchEventSignature: scrollChainABI.Events["RevertBatch"].ID,
Expand All @@ -115,6 +115,8 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig
stack: stack,
}

service.latestProcessedBlock.Store(latestProcessedBlock)

return &service, nil
}

Expand All @@ -123,7 +125,7 @@ func (s *RollupSyncService) Start() {
return
}

log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock)
log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock.Load())

go func() {
syncTicker := time.NewTicker(defaultSyncInterval)
Expand All @@ -139,7 +141,7 @@ func (s *RollupSyncService) Start() {
case <-syncTicker.C:
s.fetchRollupEvents()
case <-logTicker.C:
log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock)
log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock.Load())
}
}
}()
Expand All @@ -157,17 +159,27 @@ func (s *RollupSyncService) Stop() {
}
}

// ResetStartSyncHeight resets the RollupSyncService to a specific L1 block height
func (s *RollupSyncService) ResetStartSyncHeight(height uint64) {
if s == nil {
return
}

s.latestProcessedBlock.Store(height)
log.Info("Reset sync service", "height", height)
}

func (s *RollupSyncService) fetchRollupEvents() {
latestConfirmed, err := s.client.getLatestFinalizedBlockNumber()
if err != nil {
log.Warn("failed to get latest confirmed block number", "err", err)
return
}

log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock, "latest confirmed", latestConfirmed)
log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock.Load(), "latest confirmed", latestConfirmed)
Thegaram marked this conversation as resolved.
Show resolved Hide resolved

// query in batches
for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange {
for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += defaultFetchBlockRange {
if s.ctx.Err() != nil {
log.Info("Context canceled", "reason", s.ctx.Err())
return
Expand All @@ -189,7 +201,7 @@ func (s *RollupSyncService) fetchRollupEvents() {
return
}

s.latestProcessedBlock = to
s.latestProcessedBlock.Store(to)
}
}

Expand Down
42 changes: 27 additions & 15 deletions rollup/sync_service/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"sync/atomic"
"time"

"github.com/scroll-tech/go-ethereum/core"
Expand Down Expand Up @@ -48,7 +49,7 @@ type SyncService struct {
db ethdb.Database
msgCountFeed event.Feed
pollInterval time.Duration
latestProcessedBlock uint64
latestProcessedBlock atomic.Uint64
scope event.SubscriptionScope
}

Expand Down Expand Up @@ -79,14 +80,15 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node
ctx, cancel := context.WithCancel(ctx)

service := SyncService{
ctx: ctx,
cancel: cancel,
client: client,
db: db,
pollInterval: DefaultPollInterval,
latestProcessedBlock: latestProcessedBlock,
ctx: ctx,
cancel: cancel,
client: client,
db: db,
pollInterval: DefaultPollInterval,
}

service.latestProcessedBlock.Store(latestProcessedBlock)

return &service, nil
}

Expand All @@ -96,14 +98,14 @@ func (s *SyncService) Start() {
}

// wait for initial sync before starting node
log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock)
log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock.Load())

// block node startup during initial sync and print some helpful logs
latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx)
if err == nil && latestConfirmed > s.latestProcessedBlock+1000 {
if err == nil && latestConfirmed > s.latestProcessedBlock.Load()+1000 {
log.Warn("Running initial sync of L1 messages before starting l2geth, this might take a while...")
s.fetchMessages()
log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock)
log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock.Load())
}

go func() {
Expand Down Expand Up @@ -139,6 +141,16 @@ func (s *SyncService) Stop() {
}
}

// ResetStartSyncHeight resets the SyncService to a specific L1 block height
func (s *SyncService) ResetStartSyncHeight(height uint64) {
if s == nil {
return
}

s.latestProcessedBlock.Store(height)
Thegaram marked this conversation as resolved.
Show resolved Hide resolved
log.Info("Reset sync service", "height", height)
}

// SubscribeNewL1MsgsEvent registers a subscription of NewL1MsgsEvent and
// starts sending event to the given channel.
func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) event.Subscription {
Expand All @@ -152,7 +164,7 @@ func (s *SyncService) fetchMessages() {
return
}

log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock, "latestConfirmed", latestConfirmed)
log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock.Load(), "latestConfirmed", latestConfirmed)

// keep track of next queue index we're expecting to see
queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db)
Expand Down Expand Up @@ -182,15 +194,15 @@ func (s *SyncService) fetchMessages() {
numMessagesPendingDbWrite = 0
}

s.latestProcessedBlock = lastBlock
s.latestProcessedBlock.Store(lastBlock)
}

// ticker for logging progress
t := time.NewTicker(LogProgressInterval)
numMsgsCollected := 0

// query in batches
for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange {
for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += DefaultFetchBlockRange {
omerfirmak marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-s.ctx.Done():
// flush pending writes to database
Expand All @@ -199,8 +211,8 @@ func (s *SyncService) fetchMessages() {
}
return
case <-t.C:
progress := 100 * float64(s.latestProcessedBlock) / float64(latestConfirmed)
log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress)
progress := 100 * float64(s.latestProcessedBlock.Load()) / float64(latestConfirmed)
log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock.Load(), "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress)
default:
}

Expand Down
Loading