From 4fdc5ad9800a2b16c687862338913778a57fcf4f Mon Sep 17 00:00:00 2001 From: 0xmountaintop <37070449+0xmountaintop@users.noreply.github.com> Date: Thu, 10 Oct 2024 12:31:39 +1100 Subject: [PATCH] feat: allow changing L1 synced height via admin RPC/CLI (#1061) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: allow changing L1 synced height via admin RPC/CLI (#1044) * feat: allow changing L1 synced height via RPC/CLI * chore: auto version bump [bot] * fix typos * add height validity check * change implementation * moved hotfix apis from ScrollAPI to PrivateAdminAPI * change namespace from scroll to admin * fix bugs * add locks * add a lock to protect latestProcessedBlock and db state * revert some changes * remove lock and use atomic in latestProcessedBlock * fix CI * tweak * use cas in updating latestProcessedBlock * renaming * also update sync height in error paths * fix a bug * use mutex lock --------- Co-authored-by: colinlyguo * more --------- Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com> Co-authored-by: colinlyguo --- eth/api_admin.go | 27 +++++++++++++++++++ eth/backend.go | 12 +++++++++ internal/web3ext/web3ext.go | 10 +++++++ .../rollup_sync_service.go | 18 +++++++++++++ rollup/sync_service/sync_service.go | 18 +++++++++++++ 5 files changed, 85 insertions(+) diff --git a/eth/api_admin.go b/eth/api_admin.go index 6b76b95661ef..70720ce16ffe 100644 --- a/eth/api_admin.go +++ b/eth/api_admin.go @@ -26,6 +26,7 @@ import ( "github.com/scroll-tech/go-ethereum/core" "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/rlp" ) @@ -141,3 +142,29 @@ func (api *AdminAPI) ImportChain(file string) (bool, error) { } return true, nil } + +// SetRollupEventSyncedL1Height sets the synced L1 height for rollup event synchronization +func (api *AdminAPI) SetRollupEventSyncedL1Height(height uint64) error { + rollupSyncService := api.eth.GetRollupSyncService() + if rollupSyncService == nil { + return errors.New("RollupSyncService is not available") + } + + log.Info("Setting rollup event synced L1 height", "height", height) + rollupSyncService.ResetStartSyncHeight(height) + + return nil +} + +// SetL1MessageSyncedL1Height sets the synced L1 height for L1 message synchronization +func (api *AdminAPI) SetL1MessageSyncedL1Height(height uint64) error { + syncService := api.eth.GetSyncService() + if syncService == nil { + return errors.New("SyncService is not available") + } + + log.Info("Setting L1 message synced L1 height", "height", height) + syncService.ResetStartSyncHeight(height) + + return nil +} diff --git a/eth/backend.go b/eth/backend.go index 9ef891cdc95b..ce03e7c1c68c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -590,3 +590,15 @@ func (s *Ethereum) Stop() error { return nil } + +// GetRollupSyncService returns the RollupSyncService of the Ethereum instance. +// It returns nil if the service is not initialized. +func (e *Ethereum) GetRollupSyncService() *rollup_sync_service.RollupSyncService { + return e.rollupSyncService +} + +// GetSyncService returns the SyncService of the Ethereum instance. +// It returns nil if the service is not initialized. +func (e *Ethereum) GetSyncService() *sync_service.SyncService { + return e.syncService +} diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index dc0c95ef6c23..fcd982f4ec3f 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -191,6 +191,16 @@ web3._extend({ name: 'stopWS', call: 'admin_stopWS' }), + new web3._extend.Method({ + name: 'setRollupEventSyncedL1Height', + call: 'admin_setRollupEventSyncedL1Height', + params: 1 + }), + new web3._extend.Method({ + name: 'setL1MessageSyncedL1Height', + call: 'admin_setL1MessageSyncedL1Height', + params: 1 + }), ], properties: [ new web3._extend.Property({ diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index e83fc45fd6a9..ec2ca9a85677 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -7,6 +7,7 @@ import ( "math/big" "os" "reflect" + "sync" "time" "github.com/scroll-tech/da-codec/encoding" @@ -62,6 +63,7 @@ type RollupSyncService struct { l1FinalizeBatchEventSignature common.Hash bc *core.BlockChain stack *node.Node + stateMu sync.Mutex } func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) { @@ -156,7 +158,23 @@ func (s *RollupSyncService) Stop() { } } +// ResetStartSyncHeight resets the RollupSyncService to a specific L1 block height +func (s *RollupSyncService) ResetStartSyncHeight(height uint64) { + if s == nil { + return + } + + s.stateMu.Lock() + defer s.stateMu.Unlock() + + s.latestProcessedBlock = height + log.Info("Reset sync service", "height", height) +} + func (s *RollupSyncService) fetchRollupEvents() { + s.stateMu.Lock() + defer s.stateMu.Unlock() + latestConfirmed, err := s.client.getLatestFinalizedBlockNumber() if err != nil { log.Warn("failed to get latest confirmed block number", "err", err) diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 091f2d19691f..9239e210b8ed 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "sync" "time" "github.com/scroll-tech/go-ethereum/core" @@ -50,6 +51,7 @@ type SyncService struct { pollInterval time.Duration latestProcessedBlock uint64 scope event.SubscriptionScope + stateMu sync.Mutex } func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) { @@ -139,6 +141,19 @@ func (s *SyncService) Stop() { } } +// ResetStartSyncHeight resets the SyncService to a specific L1 block height +func (s *SyncService) ResetStartSyncHeight(height uint64) { + if s == nil { + return + } + + s.stateMu.Lock() + defer s.stateMu.Unlock() + + s.latestProcessedBlock = height + log.Info("Reset sync service", "height", height) +} + // SubscribeNewL1MsgsEvent registers a subscription of NewL1MsgsEvent and // starts sending event to the given channel. func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) event.Subscription { @@ -146,6 +161,9 @@ func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) eve } func (s *SyncService) fetchMessages() { + s.stateMu.Lock() + defer s.stateMu.Unlock() + latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx) if err != nil { log.Warn("Failed to get latest confirmed block number", "err", err)