From 66bddc5d1c9ad9cfeac5e94a947e045831ec8bef Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 16 Oct 2024 11:53:01 +0800 Subject: [PATCH 1/7] add recovery mode to pipeline: allow specifying initial L1 block and initial batch from which point the existing local chain will be overridden with the blocks from the pipeline --- rollup/da_syncer/da_queue.go | 29 +++++++++++----- rollup/da_syncer/da_syncer.go | 27 +++++++++++---- rollup/da_syncer/syncing_pipeline.go | 51 ++++++++++++++++++---------- 3 files changed, 75 insertions(+), 32 deletions(-) diff --git a/rollup/da_syncer/da_queue.go b/rollup/da_syncer/da_queue.go index 64673a4a646b..13fd05b4ecbc 100644 --- a/rollup/da_syncer/da_queue.go +++ b/rollup/da_syncer/da_queue.go @@ -10,14 +10,17 @@ import ( // DAQueue is a pipeline stage that reads DA entries from a DataSource and provides them to the next stage. type DAQueue struct { - l1height uint64 + initialBatch uint64 + l1height uint64 + dataSourceFactory *DataSourceFactory dataSource DataSource da da.Entries } -func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue { +func NewDAQueue(l1height uint64, initialBatch uint64, dataSourceFactory *DataSourceFactory) *DAQueue { return &DAQueue{ + initialBatch: initialBatch, l1height: l1height, dataSourceFactory: dataSourceFactory, dataSource: nil, @@ -26,15 +29,23 @@ func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue } func (dq *DAQueue) NextDA(ctx context.Context) (da.Entry, error) { - for len(dq.da) == 0 { - err := dq.getNextData(ctx) - if err != nil { - return nil, err + for { + for len(dq.da) == 0 { + err := dq.getNextData(ctx) + if err != nil { + return nil, err + } + } + + daEntry := dq.da[0] + dq.da = dq.da[1:] + + if daEntry.BatchIndex() < dq.initialBatch { + continue } + + return daEntry, nil } - daEntry := dq.da[0] - dq.da = dq.da[1:] - return daEntry, nil } func (dq *DAQueue) getNextData(ctx context.Context) error { diff --git a/rollup/da_syncer/da_syncer.go b/rollup/da_syncer/da_syncer.go index c3c223ff22a9..b9f5f86890d8 100644 --- a/rollup/da_syncer/da_syncer.go +++ b/rollup/da_syncer/da_syncer.go @@ -24,11 +24,12 @@ func NewDASyncer(blockchain *core.BlockChain) *DASyncer { } // SyncOneBlock receives a PartialBlock, makes sure it's the next block in the chain, executes it and inserts it to the blockchain. -func (s *DASyncer) SyncOneBlock(block *da.PartialBlock) error { +func (s *DASyncer) SyncOneBlock(block *da.PartialBlock, override bool, sign bool) error { currentBlock := s.blockchain.CurrentBlock() // we expect blocks to be consecutive. block.PartialHeader.Number == parentBlock.Number+1. - if block.PartialHeader.Number <= currentBlock.Number.Uint64() { + // if override is true, we allow blocks to be lower than the current block number and replace the blocks. + if !override && block.PartialHeader.Number <= currentBlock.Number.Uint64() { log.Debug("block number is too low", "block number", block.PartialHeader.Number, "parent block number", currentBlock.Number.Uint64()) return ErrBlockTooLow } else if block.PartialHeader.Number > currentBlock.Number.Uint64()+1 { @@ -36,13 +37,27 @@ func (s *DASyncer) SyncOneBlock(block *da.PartialBlock) error { return ErrBlockTooHigh } - parentBlock := s.blockchain.GetBlockByNumber(currentBlock.Number.Uint64()) - if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions); err != nil { + parentBlockNumber := currentBlock.Number.Uint64() + if override { + parentBlockNumber = block.PartialHeader.Number - 1 + } + + parentBlock := s.blockchain.GetBlockByNumber(parentBlockNumber) + if parentBlock == nil { + return fmt.Errorf("failed getting parent block, number: %d", parentBlockNumber) + } + + if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign); err != nil { return fmt.Errorf("failed building and writing block, number: %d, error: %v", block.PartialHeader.Number, err) } - if s.blockchain.CurrentBlock().Number.Uint64()%1000 == 0 { - log.Info("L1 sync progress", "blockhain height", s.blockchain.CurrentBlock().Number.Uint64(), "block hash", s.blockchain.CurrentBlock().Hash(), "root", s.blockchain.CurrentBlock().Root) + currentBlock = s.blockchain.CurrentBlock() + if override && block.PartialHeader.Number != currentBlock.Number.Uint64() && block.PartialHeader.Number%100 == 0 { + newBlock := s.blockchain.GetHeaderByNumber(block.PartialHeader.Number) + log.Info("L1 sync progress", "processed block ", newBlock.Number.Uint64(), "block hash", newBlock.Hash(), "root", newBlock.Root) + log.Info("L1 sync progress", "blockhain height", currentBlock.Number.Uint64(), "block hash", currentBlock.Hash(), "root", currentBlock.Root) + } else if currentBlock.Number.Uint64()%100 == 0 { + log.Info("L1 sync progress", "blockhain height", currentBlock.Number.Uint64(), "block hash", currentBlock.Hash(), "root", currentBlock.Root) } return nil diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index 6795f2608e05..80efec268dd6 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@ -26,6 +26,11 @@ type Config struct { BlobScanAPIEndpoint string // BlobScan blob api endpoint BlockNativeAPIEndpoint string // BlockNative blob api endpoint BeaconNodeAPIEndpoint string // Beacon node api endpoint + + RecoveryMode bool // Recovery mode is used to override existing blocks with the blocks read from the pipeline and start from a specific L1 block and batch + InitialL1Block uint64 // L1 block in which the InitialBatch was committed (or any earlier L1 block but requires more RPC requests) + InitialBatch uint64 // Batch number from which to start syncing and overriding blocks + SignBlocks bool // Whether to sign the blocks after reading them from the pipeline } // SyncingPipeline is a derivation pipeline for syncing data from L1 and DA and transform it into @@ -36,7 +41,7 @@ type SyncingPipeline struct { wg sync.WaitGroup expBackoff *backoff.Exponential - l1DeploymentBlock uint64 + config Config db ethdb.Database blockchain *core.BlockChain @@ -75,28 +80,37 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi } dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Client, blobClientList, db) - syncedL1Height := l1DeploymentBlock - 1 - from := rawdb.ReadDASyncedL1BlockNumber(db) - if from != nil { - syncedL1Height = *from + + var initialL1Block uint64 + if config.RecoveryMode { + initialL1Block = config.InitialL1Block + log.Info("sync from DA: initializing pipeline in recovery mode", "initialL1Block", initialL1Block, "initialBatch", config.InitialBatch) + } else { + initialL1Block = l1DeploymentBlock - 1 + config.InitialL1Block = initialL1Block + from := rawdb.ReadDASyncedL1BlockNumber(db) + if from != nil { + initialL1Block = *from + } + log.Info("sync from DA: initializing pipeline", "initialL1Block", initialL1Block) } - daQueue := NewDAQueue(syncedL1Height, dataSourceFactory) + daQueue := NewDAQueue(initialL1Block, config.InitialBatch, dataSourceFactory) batchQueue := NewBatchQueue(daQueue, db) blockQueue := NewBlockQueue(batchQueue) daSyncer := NewDASyncer(blockchain) ctx, cancel := context.WithCancel(ctx) return &SyncingPipeline{ - ctx: ctx, - cancel: cancel, - expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond), - wg: sync.WaitGroup{}, - l1DeploymentBlock: l1DeploymentBlock, - db: db, - blockchain: blockchain, - blockQueue: blockQueue, - daSyncer: daSyncer, + ctx: ctx, + cancel: cancel, + expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond), + wg: sync.WaitGroup{}, + config: config, + db: db, + blockchain: blockchain, + blockQueue: blockQueue, + daSyncer: daSyncer, }, nil } @@ -105,7 +119,10 @@ func (s *SyncingPipeline) Step() error { if err != nil { return err } - err = s.daSyncer.SyncOneBlock(block) + + // in recovery mode, we override already existing blocks with whatever we read from the pipeline + err = s.daSyncer.SyncOneBlock(block, s.config.RecoveryMode, s.config.SignBlocks) + return err } @@ -222,7 +239,7 @@ func (s *SyncingPipeline) Stop() { func (s *SyncingPipeline) reset(resetCounter int) { amount := 100 * uint64(resetCounter) - syncedL1Height := s.l1DeploymentBlock - 1 + syncedL1Height := s.config.InitialL1Block from := rawdb.ReadDASyncedL1BlockNumber(s.db) if from != nil && *from+amount > syncedL1Height { syncedL1Height = *from - amount From 605da8bf17e507de517164648d6746378b25f6ae Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 16 Oct 2024 11:56:58 +0800 Subject: [PATCH 2/7] add optional signing of blocks --- core/blockchain.go | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 0306f1681b3c..62b23e0b18ad 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2025,7 +2025,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } -func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions) (WriteStatus, error) { +func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (WriteStatus, error) { if !bc.chainmu.TryLock() { return NonStatTy, errInsertionInterrupted } @@ -2044,18 +2044,51 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil) receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig) if err != nil { - return NonStatTy, fmt.Errorf("error processing block: %w", err) + return NonStatTy, fmt.Errorf("error processing block %d: %w", header.Number.Uint64(), err) } // TODO: once we have the extra and difficulty we need to verify the signature of the block with Clique // This should be done with https://github.com/scroll-tech/go-ethereum/pull/913. - // finalize and assemble block as fullBlock + if sign { + // remember the time as Clique will override it + originalTime := header.Time + + err = bc.engine.Prepare(bc, header) + if err != nil { + return NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err) + } + + // we want to re-sign the block: set time to original value again. + header.Time = originalTime + } + + // finalize and assemble block as fullBlock: replicates consensus.FinalizeAndAssemble() header.GasUsed = gasUsed header.Root = statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)) fullBlock := types.NewBlock(header, txs, nil, receipts, trie.NewStackTrie(nil)) + // Sign the block if requested + if sign { + resultCh, stopCh := make(chan *types.Block), make(chan struct{}) + if err = bc.engine.Seal(bc, fullBlock, resultCh, stopCh); err != nil { + return NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err) + } + // Clique.Seal() will only wait for a second before giving up on us. So make sure there is nothing computational heavy + // or a call that blocks between the call to Seal and the line below. Seal might introduce some delay, so we keep track of + // that artificially added delay and subtract it from overall runtime of commit(). + fullBlock = <-resultCh + if fullBlock == nil { + return NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64()) + } + + // verify the generated block with local consensus engine to make sure everything is as expected + if err = bc.engine.VerifyHeader(bc, fullBlock.Header()); err != nil { + return NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err) + } + } + blockHash := fullBlock.Hash() // manually replace the block hash in the receipts for i, receipt := range receipts { From 20caa19e563cc26d2f9dd400b38df31fc2c7b40f Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 16 Oct 2024 12:12:37 +0800 Subject: [PATCH 3/7] add config option for L2EndBlock --- rollup/da_syncer/da_syncer.go | 11 ++++++++++- rollup/da_syncer/serrors/errors.go | 1 + rollup/da_syncer/syncing_pipeline.go | 8 ++++++-- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/rollup/da_syncer/da_syncer.go b/rollup/da_syncer/da_syncer.go index b9f5f86890d8..602c6b04c400 100644 --- a/rollup/da_syncer/da_syncer.go +++ b/rollup/da_syncer/da_syncer.go @@ -6,6 +6,7 @@ import ( "github.com/scroll-tech/go-ethereum/core" "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" ) var ( @@ -14,11 +15,13 @@ var ( ) type DASyncer struct { + l2EndBlock uint64 blockchain *core.BlockChain } -func NewDASyncer(blockchain *core.BlockChain) *DASyncer { +func NewDASyncer(blockchain *core.BlockChain, l2EndBlock uint64) *DASyncer { return &DASyncer{ + l2EndBlock: l2EndBlock, blockchain: blockchain, } } @@ -60,5 +63,11 @@ func (s *DASyncer) SyncOneBlock(block *da.PartialBlock, override bool, sign bool log.Info("L1 sync progress", "blockhain height", currentBlock.Number.Uint64(), "block hash", currentBlock.Hash(), "root", currentBlock.Root) } + if s.l2EndBlock > 0 && s.l2EndBlock == block.PartialHeader.Number { + newBlock := s.blockchain.GetHeaderByNumber(block.PartialHeader.Number) + log.Warn("L1 sync reached L2EndBlock: you can terminate recovery mode now", "L2EndBlock", newBlock.Number.Uint64(), "block hash", newBlock.Hash(), "root", newBlock.Root) + return serrors.Terminated + } + return nil } diff --git a/rollup/da_syncer/serrors/errors.go b/rollup/da_syncer/serrors/errors.go index aa0426f0771d..6dc373f22936 100644 --- a/rollup/da_syncer/serrors/errors.go +++ b/rollup/da_syncer/serrors/errors.go @@ -12,6 +12,7 @@ const ( var ( TemporaryError = NewTemporaryError(nil) EOFError = NewEOFError(nil) + Terminated = fmt.Errorf("terminated") ) type Type uint8 diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index 80efec268dd6..34f6e46f1ffc 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@ -30,7 +30,8 @@ type Config struct { RecoveryMode bool // Recovery mode is used to override existing blocks with the blocks read from the pipeline and start from a specific L1 block and batch InitialL1Block uint64 // L1 block in which the InitialBatch was committed (or any earlier L1 block but requires more RPC requests) InitialBatch uint64 // Batch number from which to start syncing and overriding blocks - SignBlocks bool // Whether to sign the blocks after reading them from the pipeline + SignBlocks bool // Whether to sign the blocks after reading them from the pipeline (requires correct Clique signer key) and history of blocks with Clique signatures + L2EndBlock uint64 // L2 block number to sync until } // SyncingPipeline is a derivation pipeline for syncing data from L1 and DA and transform it into @@ -98,7 +99,7 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi daQueue := NewDAQueue(initialL1Block, config.InitialBatch, dataSourceFactory) batchQueue := NewBatchQueue(daQueue, db) blockQueue := NewBlockQueue(batchQueue) - daSyncer := NewDASyncer(blockchain) + daSyncer := NewDASyncer(blockchain, config.L2EndBlock) ctx, cancel := context.WithCancel(ctx) return &SyncingPipeline{ @@ -222,6 +223,9 @@ func (s *SyncingPipeline) mainLoop() { } else if errors.Is(err, context.Canceled) { log.Info("syncing pipeline stopped due to cancelled context", "err", err) return + } else if errors.Is(err, serrors.Terminated) { + log.Info("syncing pipeline stopped due to terminated state", "err", err) + return } log.Warn("syncing pipeline step failed due to unrecoverable error, stopping pipeline worker", "err", err) From eb6df8a2dcf8e346dd1c3499057a465a6d650310 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 16 Oct 2024 12:32:52 +0800 Subject: [PATCH 4/7] add CLI flags for new config values --- cmd/geth/main.go | 8 +++++++- cmd/utils/flags.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index f3303295adca..e42236407187 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -25,6 +25,8 @@ import ( "strings" "time" + "go.uber.org/automaxprocs/maxprocs" + "github.com/scroll-tech/go-ethereum/accounts" "github.com/scroll-tech/go-ethereum/accounts/keystore" "github.com/scroll-tech/go-ethereum/cmd/utils" @@ -39,7 +41,6 @@ import ( "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/metrics" "github.com/scroll-tech/go-ethereum/node" - "go.uber.org/automaxprocs/maxprocs" // Force-load the tracer engines to trigger registration _ "github.com/scroll-tech/go-ethereum/eth/tracers/js" @@ -160,6 +161,11 @@ var ( utils.DABlockNativeAPIEndpointFlag, utils.DABlobScanAPIEndpointFlag, utils.DABeaconNodeAPIEndpointFlag, + utils.DARecoveryModeFlag, + utils.DARecoveryInitialL1BlockFlag, + utils.DARecoveryInitialBatchFlag, + utils.DARecoverySignBlocksFlag, + utils.DARecoveryL2EndBlockFlag, }, utils.NetworkFlags, utils.DatabaseFlags) rpcFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 25a61c20838e..dc060429805c 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1053,6 +1053,26 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Name: "da.blob.beaconnode", Usage: "Beacon node API endpoint", } + DARecoveryModeFlag = &cli.BoolFlag{ + Name: "da.recovery", + Usage: "Enable recovery mode for DA syncing", + } + DARecoveryInitialL1BlockFlag = &cli.Uint64Flag{ + Name: "da.recovery.initiall1block", + Usage: "Initial L1 block to start recovery from", + } + DARecoveryInitialBatchFlag = &cli.Uint64Flag{ + Name: "da.recovery.initialbatch", + Usage: "Initial batch to start recovery from", + } + DARecoverySignBlocksFlag = &cli.BoolFlag{ + Name: "da.recovery.signblocks", + Usage: "Sign blocks during recovery (requires correct Clique signer key and history of blocks with Clique signatures)", + } + DARecoveryL2EndBlockFlag = &cli.Uint64Flag{ + Name: "da.recovery.l2endblock", + Usage: "End L2 block to recover to", + } ) var ( @@ -1816,6 +1836,21 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) { if ctx.IsSet(DABeaconNodeAPIEndpointFlag.Name) { cfg.DA.BeaconNodeAPIEndpoint = ctx.String(DABeaconNodeAPIEndpointFlag.Name) } + if ctx.IsSet(DARecoveryModeFlag.Name) { + cfg.DA.RecoveryMode = ctx.Bool(DARecoveryModeFlag.Name) + } + if ctx.IsSet(DARecoveryInitialL1BlockFlag.Name) { + cfg.DA.InitialL1Block = ctx.Uint64(DARecoveryInitialL1BlockFlag.Name) + } + if ctx.IsSet(DARecoveryInitialBatchFlag.Name) { + cfg.DA.InitialBatch = ctx.Uint64(DARecoveryInitialBatchFlag.Name) + } + if ctx.IsSet(DARecoverySignBlocksFlag.Name) { + cfg.DA.SignBlocks = ctx.Bool(DARecoverySignBlocksFlag.Name) + } + if ctx.IsSet(DARecoveryL2EndBlockFlag.Name) { + cfg.DA.L2EndBlock = ctx.Uint64(DARecoveryL2EndBlockFlag.Name) + } } } From b23eb932189fd236e497208f2f91b1083815bf84 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 6 Nov 2024 07:12:31 +0800 Subject: [PATCH 5/7] enable CCC in L1 follower node when `--ccc` flag is set to generate row consumption --- core/blockchain.go | 25 ++++++++++++++++--------- eth/backend.go | 3 +++ rollup/da_syncer/da_syncer.go | 23 ++++++++++++++++++----- rollup/da_syncer/syncing_pipeline.go | 5 ++++- 4 files changed, 41 insertions(+), 15 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 62b23e0b18ad..10a32fa5d9b3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2025,15 +2025,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } -func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (WriteStatus, error) { +func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (*types.Block, WriteStatus, error) { if !bc.chainmu.TryLock() { - return NonStatTy, errInsertionInterrupted + return nil, NonStatTy, errInsertionInterrupted } defer bc.chainmu.Unlock() statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps) if err != nil { - return NonStatTy, err + return nil, NonStatTy, err } statedb.StartPrefetcher("l1sync") @@ -2044,7 +2044,7 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil) receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig) if err != nil { - return NonStatTy, fmt.Errorf("error processing block %d: %w", header.Number.Uint64(), err) + return nil, NonStatTy, fmt.Errorf("error processing block %d: %w", header.Number.Uint64(), err) } // TODO: once we have the extra and difficulty we need to verify the signature of the block with Clique @@ -2056,7 +2056,7 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types err = bc.engine.Prepare(bc, header) if err != nil { - return NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err) + return nil, NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err) } // we want to re-sign the block: set time to original value again. @@ -2073,19 +2073,19 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types if sign { resultCh, stopCh := make(chan *types.Block), make(chan struct{}) if err = bc.engine.Seal(bc, fullBlock, resultCh, stopCh); err != nil { - return NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err) + return nil, NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err) } // Clique.Seal() will only wait for a second before giving up on us. So make sure there is nothing computational heavy // or a call that blocks between the call to Seal and the line below. Seal might introduce some delay, so we keep track of // that artificially added delay and subtract it from overall runtime of commit(). fullBlock = <-resultCh if fullBlock == nil { - return NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64()) + return nil, NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64()) } // verify the generated block with local consensus engine to make sure everything is as expected if err = bc.engine.VerifyHeader(bc, fullBlock.Header()); err != nil { - return NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err) + return nil, NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err) } } @@ -2105,7 +2105,14 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types l.BlockHash = blockHash } - return bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false) + // Double check: even though we just built the block, make sure it is valid. + if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil { + bc.reportBlock(fullBlock, receipts, err) + return nil, NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err) + } + + writeStatus, err := bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false) + return fullBlock, writeStatus, err } // insertSideChain is called when an import batch hits upon a pruned ancestor diff --git a/eth/backend.go b/eth/backend.go index 71c1d0ebe822..ee27e064951a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -251,6 +251,9 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl // simply let them run simultaneously. If messages are missing in DA syncing, it will be handled by the syncing pipeline // by waiting and retrying. if config.EnableDASyncing { + // Enable CCC if flag is set so that row consumption can be generated. + config.DA.CCCEnable = config.CheckCircuitCapacity + config.DA.CCCNumWorkers = config.CCCMaxWorkers eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA) if err != nil { return nil, fmt.Errorf("cannot initialize da syncer: %w", err) diff --git a/rollup/da_syncer/da_syncer.go b/rollup/da_syncer/da_syncer.go index 602c6b04c400..825129292bd7 100644 --- a/rollup/da_syncer/da_syncer.go +++ b/rollup/da_syncer/da_syncer.go @@ -5,6 +5,7 @@ import ( "github.com/scroll-tech/go-ethereum/core" "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/ccc" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" ) @@ -15,15 +16,22 @@ var ( ) type DASyncer struct { - l2EndBlock uint64 - blockchain *core.BlockChain + asyncChecker *ccc.AsyncChecker + l2EndBlock uint64 + blockchain *core.BlockChain } -func NewDASyncer(blockchain *core.BlockChain, l2EndBlock uint64) *DASyncer { - return &DASyncer{ +func NewDASyncer(blockchain *core.BlockChain, cccEnable bool, cccNumWorkers int, l2EndBlock uint64) *DASyncer { + s := &DASyncer{ l2EndBlock: l2EndBlock, blockchain: blockchain, } + + if cccEnable { + s.asyncChecker = ccc.NewAsyncChecker(blockchain, cccNumWorkers, false) + } + + return s } // SyncOneBlock receives a PartialBlock, makes sure it's the next block in the chain, executes it and inserts it to the blockchain. @@ -50,10 +58,15 @@ func (s *DASyncer) SyncOneBlock(block *da.PartialBlock, override bool, sign bool return fmt.Errorf("failed getting parent block, number: %d", parentBlockNumber) } - if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign); err != nil { + fullBlock, _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign) + if err != nil { return fmt.Errorf("failed building and writing block, number: %d, error: %v", block.PartialHeader.Number, err) } + if s.asyncChecker != nil { + _ = s.asyncChecker.Check(fullBlock) + } + currentBlock = s.blockchain.CurrentBlock() if override && block.PartialHeader.Number != currentBlock.Number.Uint64() && block.PartialHeader.Number%100 == 0 { newBlock := s.blockchain.GetHeaderByNumber(block.PartialHeader.Number) diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index 34f6e46f1ffc..8135d1ba6cb3 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@ -27,6 +27,9 @@ type Config struct { BlockNativeAPIEndpoint string // BlockNative blob api endpoint BeaconNodeAPIEndpoint string // Beacon node api endpoint + CCCEnable bool // enable CCC verification and generation of row consumption + CCCNumWorkers int // number of workers for CCC verification + RecoveryMode bool // Recovery mode is used to override existing blocks with the blocks read from the pipeline and start from a specific L1 block and batch InitialL1Block uint64 // L1 block in which the InitialBatch was committed (or any earlier L1 block but requires more RPC requests) InitialBatch uint64 // Batch number from which to start syncing and overriding blocks @@ -99,7 +102,7 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi daQueue := NewDAQueue(initialL1Block, config.InitialBatch, dataSourceFactory) batchQueue := NewBatchQueue(daQueue, db) blockQueue := NewBlockQueue(batchQueue) - daSyncer := NewDASyncer(blockchain, config.L2EndBlock) + daSyncer := NewDASyncer(blockchain, config.CCCEnable, config.CCCNumWorkers, config.L2EndBlock) ctx, cancel := context.WithCancel(ctx) return &SyncingPipeline{ From afdc961110e5a5935771f23daddbb065c8ad5d71 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Wed, 6 Nov 2024 11:32:10 +0800 Subject: [PATCH 6/7] sanity check configuration when running in recovery mode --- rollup/da_syncer/syncing_pipeline.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index 8135d1ba6cb3..0ba2285c3448 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@ -88,6 +88,13 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi var initialL1Block uint64 if config.RecoveryMode { initialL1Block = config.InitialL1Block + if initialL1Block == 0 { + return nil, errors.New("sync from DA: initial L1 block must be set in recovery mode") + } + if config.InitialBatch == 0 { + return nil, errors.New("sync from DA: initial batch must be set in recovery mode") + } + log.Info("sync from DA: initializing pipeline in recovery mode", "initialL1Block", initialL1Block, "initialBatch", config.InitialBatch) } else { initialL1Block = l1DeploymentBlock - 1 From 6f14c337e5fa7358a51c69195600f4ff3db949e6 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 21 Nov 2024 11:55:22 +0800 Subject: [PATCH 7/7] address review comments --- cmd/geth/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 9570a9f8af44..54d9b92bbefb 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -25,8 +25,6 @@ import ( "strings" "time" - "go.uber.org/automaxprocs/maxprocs" - "github.com/scroll-tech/go-ethereum/accounts" "github.com/scroll-tech/go-ethereum/accounts/keystore" "github.com/scroll-tech/go-ethereum/cmd/utils" @@ -41,6 +39,7 @@ import ( "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/metrics" "github.com/scroll-tech/go-ethereum/node" + "go.uber.org/automaxprocs/maxprocs" // Force-load the tracer engines to trigger registration _ "github.com/scroll-tech/go-ethereum/eth/tracers/js"