diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c0cb3c982a71..b8b930c3d13f 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -153,6 +153,12 @@ var ( utils.CircuitCapacityCheckEnabledFlag, utils.RollupVerifyEnabledFlag, utils.ShadowforkPeersFlag, + utils.DASyncEnabledFlag, + utils.DAModeFlag, + utils.DASnapshotFileFlag, + utils.DABlockNativeAPIEndpointFlag, + utils.DABlobScanAPIEndpointFlag, + utils.DABeaconNodeAPIEndpointFlag, }, utils.NetworkFlags, utils.DatabaseFlags) rpcFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 9c5ddc95a92b..c5a456936241 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -72,6 +72,7 @@ import ( "github.com/scroll-tech/go-ethereum/p2p/nat" "github.com/scroll-tech/go-ethereum/p2p/netutil" "github.com/scroll-tech/go-ethereum/params" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer" "github.com/scroll-tech/go-ethereum/rollup/tracing" "github.com/scroll-tech/go-ethereum/rpc" "github.com/scroll-tech/go-ethereum/trie" @@ -1017,6 +1018,34 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Name: "net.shadowforkpeers", Usage: "peer ids of shadow fork peers", } + + // DA syncing settings + DASyncEnabledFlag = cli.BoolFlag{ + Name: "da.sync", + Usage: "Enable node syncing from DA", + } + defaultDA = ethconfig.Defaults.DA.FetcherMode + DAModeFlag = TextMarshalerFlag{ + Name: "da.mode", + Usage: `DA sync mode ("l1rpc" or "snapshot")`, + Value: &defaultDA, + } + DASnapshotFileFlag = cli.StringFlag{ + Name: "da.snapshot.file", + Usage: "Snapshot file to sync from DA", + } + DABlobScanAPIEndpointFlag = cli.StringFlag{ + Name: "da.blob.blobscan", + Usage: "BlobScan blob API endpoint", + } + DABlockNativeAPIEndpointFlag = cli.StringFlag{ + Name: "da.blob.blocknative", + Usage: "BlockNative blob API endpoint", + } + DABeaconNodeAPIEndpointFlag = cli.StringFlag{ + Name: "da.blob.beaconnode", + Usage: "Beacon node API endpoint", + } ) var ( @@ -1505,6 +1534,9 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) { SetDataDir(ctx, cfg) setSmartCard(ctx, cfg) setL1(ctx, cfg) + if ctx.GlobalIsSet(DASyncEnabledFlag.Name) { + cfg.DaSyncingEnabled = ctx.GlobalBool(DASyncEnabledFlag.Name) + } if ctx.IsSet(JWTSecretFlag.Name) { cfg.JWTSecret = ctx.String(JWTSecretFlag.Name) @@ -1751,6 +1783,27 @@ func setEnableRollupVerify(ctx *cli.Context, cfg *ethconfig.Config) { } } +func setDA(ctx *cli.Context, cfg *ethconfig.Config) { + if ctx.GlobalIsSet(DASyncEnabledFlag.Name) { + cfg.EnableDASyncing = ctx.GlobalBool(DASyncEnabledFlag.Name) + if ctx.GlobalIsSet(DAModeFlag.Name) { + cfg.DA.FetcherMode = *GlobalTextMarshaler(ctx, DAModeFlag.Name).(*da_syncer.FetcherMode) + } + if ctx.GlobalIsSet(DASnapshotFileFlag.Name) { + cfg.DA.SnapshotFilePath = ctx.GlobalString(DASnapshotFileFlag.Name) + } + if ctx.GlobalIsSet(DABlobScanAPIEndpointFlag.Name) { + cfg.DA.BlobScanAPIEndpoint = ctx.GlobalString(DABlobScanAPIEndpointFlag.Name) + } + if ctx.GlobalIsSet(DABlockNativeAPIEndpointFlag.Name) { + cfg.DA.BlockNativeAPIEndpoint = ctx.GlobalString(DABlockNativeAPIEndpointFlag.Name) + } + if ctx.GlobalIsSet(DABeaconNodeAPIEndpointFlag.Name) { + cfg.DA.BeaconNodeAPIEndpoint = ctx.GlobalString(DABeaconNodeAPIEndpointFlag.Name) + } + } +} + func setMaxBlockRange(ctx *cli.Context, cfg *ethconfig.Config) { if ctx.IsSet(MaxBlockRangeFlag.Name) { cfg.MaxBlockRange = ctx.Int64(MaxBlockRangeFlag.Name) @@ -1816,6 +1869,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { setLes(ctx, cfg) setCircuitCapacityCheck(ctx, cfg) setEnableRollupVerify(ctx, cfg) + setDA(ctx, cfg) setMaxBlockRange(ctx, cfg) if ctx.IsSet(ShadowforkPeersFlag.Name) { cfg.ShadowForkPeerIDs = ctx.StringSlice(ShadowforkPeersFlag.Name) diff --git a/common/backoff/exponential.go b/common/backoff/exponential.go new file mode 100644 index 000000000000..e1f9b53a350e --- /dev/null +++ b/common/backoff/exponential.go @@ -0,0 +1,51 @@ +package backoff + +import ( + "math" + "math/rand" + "time" +) + +// Exponential is a backoff strategy that increases the delay between retries exponentially. +type Exponential struct { + attempt int + + maxJitter time.Duration + + min time.Duration + max time.Duration +} + +func NewExponential(minimum, maximum, maxJitter time.Duration) *Exponential { + return &Exponential{ + min: minimum, + max: maximum, + maxJitter: maxJitter, + } +} + +func (e *Exponential) NextDuration() time.Duration { + var jitter time.Duration + if e.maxJitter > 0 { + jitter = time.Duration(rand.Int63n(e.maxJitter.Nanoseconds())) + } + + minFloat := float64(e.min) + duration := math.Pow(2, float64(e.attempt)) * minFloat + + // limit at configured maximum + if duration > float64(e.max) { + duration = float64(e.max) + } + + e.attempt++ + return time.Duration(duration) + jitter +} + +func (e *Exponential) Reset() { + e.attempt = 0 +} + +func (e *Exponential) Attempt() int { + return e.attempt +} diff --git a/common/backoff/exponential_test.go b/common/backoff/exponential_test.go new file mode 100644 index 000000000000..ff659337a2b0 --- /dev/null +++ b/common/backoff/exponential_test.go @@ -0,0 +1,39 @@ +package backoff + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestExponentialBackoff(t *testing.T) { + t.Run("Multiple attempts", func(t *testing.T) { + e := NewExponential(100*time.Millisecond, 10*time.Second, 0) + expectedDurations := []time.Duration{ + 100 * time.Millisecond, + 200 * time.Millisecond, + 400 * time.Millisecond, + 800 * time.Millisecond, + 1600 * time.Millisecond, + 3200 * time.Millisecond, + 6400 * time.Millisecond, + 10 * time.Second, // capped at max + } + for i, expected := range expectedDurations { + require.Equal(t, expected, e.NextDuration(), "attempt %d", i) + } + }) + + t.Run("Jitter added", func(t *testing.T) { + e := NewExponential(1*time.Second, 10*time.Second, 1*time.Second) + duration := e.NextDuration() + require.GreaterOrEqual(t, duration, 1*time.Second) + require.Less(t, duration, 2*time.Second) + }) + + t.Run("Edge case: min > max", func(t *testing.T) { + e := NewExponential(10*time.Second, 5*time.Second, 0) + require.Equal(t, 5*time.Second, e.NextDuration()) + }) +} diff --git a/common/heap.go b/common/heap.go new file mode 100644 index 000000000000..67b79a1136d1 --- /dev/null +++ b/common/heap.go @@ -0,0 +1,109 @@ +package common + +import ( + "container/heap" +) + +// Heap is a generic min-heap (or max-heap, depending on Comparable behavior) implementation. +type Heap[T Comparable[T]] struct { + heap innerHeap[T] +} + +func NewHeap[T Comparable[T]]() *Heap[T] { + return &Heap[T]{ + heap: make(innerHeap[T], 0), + } +} + +func (h *Heap[T]) Len() int { + return len(h.heap) +} + +func (h *Heap[T]) Push(element T) *HeapElement[T] { + heapElement := NewHeapElement(element) + heap.Push(&h.heap, heapElement) + + return heapElement +} + +func (h *Heap[T]) Pop() *HeapElement[T] { + return heap.Pop(&h.heap).(*HeapElement[T]) +} + +func (h *Heap[T]) Peek() *HeapElement[T] { + if h.Len() == 0 { + return nil + } + + return h.heap[0] +} + +func (h *Heap[T]) Remove(element *HeapElement[T]) { + heap.Remove(&h.heap, element.index) +} + +func (h *Heap[T]) Clear() { + h.heap = make(innerHeap[T], 0) +} + +type innerHeap[T Comparable[T]] []*HeapElement[T] + +func (h innerHeap[T]) Len() int { + return len(h) +} + +func (h innerHeap[T]) Less(i, j int) bool { + return h[i].Value().CompareTo(h[j].Value()) < 0 +} + +func (h innerHeap[T]) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index, h[j].index = i, j +} + +func (h *innerHeap[T]) Push(x interface{}) { + data := x.(*HeapElement[T]) + *h = append(*h, data) + data.index = len(*h) - 1 +} + +func (h *innerHeap[T]) Pop() interface{} { + n := len(*h) + element := (*h)[n-1] + (*h)[n-1] = nil // avoid memory leak + *h = (*h)[:n-1] + element.index = -1 + + return element +} + +// Comparable is an interface for types that can be compared. +type Comparable[T any] interface { + // CompareTo compares x with other. + // To create a min heap, return: + // -1 if x < other + // 0 if x == other + // +1 if x > other + // To create a max heap, return the opposite. + CompareTo(other T) int +} + +// HeapElement is a wrapper around the value stored in the heap. +type HeapElement[T Comparable[T]] struct { + value T + index int +} + +func NewHeapElement[T Comparable[T]](value T) *HeapElement[T] { + return &HeapElement[T]{ + value: value, + } +} + +func (h *HeapElement[T]) Value() T { + return h.value +} + +func (h *HeapElement[T]) Index() int { + return h.index +} diff --git a/common/heap_test.go b/common/heap_test.go new file mode 100644 index 000000000000..ac927c375de4 --- /dev/null +++ b/common/heap_test.go @@ -0,0 +1,40 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +type Int int + +func (i Int) CompareTo(other Int) int { + if i < other { + return -1 + } else if i > other { + return 1 + } else { + return 0 + } +} + +func TestHeap(t *testing.T) { + h := NewHeap[Int]() + + require.Equal(t, 0, h.Len(), "Heap should be empty initially") + + h.Push(Int(3)) + h.Push(Int(1)) + h.Push(Int(2)) + + require.Equal(t, 3, h.Len(), "Heap should have three elements after pushing") + + require.EqualValues(t, 1, h.Pop(), "Pop should return the smallest element") + require.Equal(t, 2, h.Len(), "Heap should have two elements after popping") + + require.EqualValues(t, 2, h.Pop(), "Pop should return the next smallest element") + require.Equal(t, 1, h.Len(), "Heap should have one element after popping") + + require.EqualValues(t, 3, h.Pop(), "Pop should return the last element") + require.Equal(t, 0, h.Len(), "Heap should be empty after popping all elements") +} diff --git a/common/shrinkingmap.go b/common/shrinkingmap.go new file mode 100644 index 000000000000..4bf98f87c2da --- /dev/null +++ b/common/shrinkingmap.go @@ -0,0 +1,71 @@ +package common + +// ShrinkingMap is a map that shrinks itself (by allocating a new map) after a certain number of deletions have been performed. +// If shrinkAfterDeletionsCount is set to <=0, the map will never shrink. +// This is useful to prevent memory leaks in long-running processes that delete a lot of keys from a map. +// See here for more details: https://github.com/golang/go/issues/20135 +type ShrinkingMap[K comparable, V any] struct { + m map[K]V + deletedKeys int + + shrinkAfterDeletionsCount int +} + +func NewShrinkingMap[K comparable, V any](shrinkAfterDeletionsCount int) *ShrinkingMap[K, V] { + return &ShrinkingMap[K, V]{ + m: make(map[K]V), + shrinkAfterDeletionsCount: shrinkAfterDeletionsCount, + } +} + +func (s *ShrinkingMap[K, V]) Set(key K, value V) { + s.m[key] = value +} + +func (s *ShrinkingMap[K, V]) Get(key K) (value V, exists bool) { + value, exists = s.m[key] + return value, exists +} + +func (s *ShrinkingMap[K, V]) Has(key K) bool { + _, exists := s.m[key] + return exists +} + +func (s *ShrinkingMap[K, V]) Delete(key K) (deleted bool) { + if _, exists := s.m[key]; !exists { + return false + } + + delete(s.m, key) + s.deletedKeys++ + + if s.shouldShrink() { + s.shrink() + } + + return true +} + +func (s *ShrinkingMap[K, V]) Size() (size int) { + return len(s.m) +} + +func (s *ShrinkingMap[K, V]) Clear() { + s.m = make(map[K]V) + s.deletedKeys = 0 +} + +func (s *ShrinkingMap[K, V]) shouldShrink() bool { + return s.shrinkAfterDeletionsCount > 0 && s.deletedKeys >= s.shrinkAfterDeletionsCount +} + +func (s *ShrinkingMap[K, V]) shrink() { + newMap := make(map[K]V, len(s.m)) + for k, v := range s.m { + newMap[k] = v + } + + s.m = newMap + s.deletedKeys = 0 +} diff --git a/common/shrinkingmap_test.go b/common/shrinkingmap_test.go new file mode 100644 index 000000000000..c94a917ee140 --- /dev/null +++ b/common/shrinkingmap_test.go @@ -0,0 +1,135 @@ +package common + +import ( + "fmt" + "runtime" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestShrinkingMap_Shrink(t *testing.T) { + m := NewShrinkingMap[int, int](10) + + for i := 0; i < 100; i++ { + m.Set(i, i) + } + + for i := 0; i < 100; i++ { + val, exists := m.Get(i) + require.Equal(t, true, exists) + require.Equal(t, i, val) + + has := m.Has(i) + require.Equal(t, true, has) + } + + for i := 0; i < 9; i++ { + m.Delete(i) + } + require.Equal(t, 9, m.deletedKeys) + + // Delete the 10th key -> shrinks the map + m.Delete(9) + require.Equal(t, 0, m.deletedKeys) + + for i := 0; i < 100; i++ { + if i < 10 { + val, exists := m.Get(i) + require.Equal(t, false, exists) + require.Equal(t, 0, val) + + has := m.Has(i) + require.Equal(t, false, has) + } else { + val, exists := m.Get(i) + require.Equal(t, true, exists) + require.Equal(t, i, val) + + has := m.Has(i) + require.Equal(t, true, has) + } + } + + require.Equal(t, 90, m.Size()) +} + +func TestNewShrinkingMap_NoShrinking(t *testing.T) { + m := NewShrinkingMap[int, int](0) + for i := 0; i < 10000; i++ { + m.Set(i, i) + } + + for i := 0; i < 10000; i++ { + val, exists := m.Get(i) + require.Equal(t, true, exists) + require.Equal(t, i, val) + + m.Delete(i) + } + + require.Equal(t, 0, m.Size()) + require.Equal(t, 10000, m.deletedKeys) +} + +func TestShrinkingMap_MemoryShrinking(t *testing.T) { + t.Skip("Only for manual testing and memory profiling") + + gcAndPrintAlloc("start") + m := NewShrinkingMap[int, int](10000) + + const mapSize = 1_000_000 + + for i := 0; i < mapSize; i++ { + m.Set(i, i) + } + + gcAndPrintAlloc("after map creation") + + for i := 0; i < mapSize/2; i++ { + m.Delete(i) + } + + gcAndPrintAlloc("after removing half of the elements") + + val, exist := m.Get(mapSize - 1) + require.Equal(t, true, exist) + require.Equal(t, mapSize-1, val) + + gcAndPrintAlloc("end") +} + +func TestShrinkingMap_MemoryNoShrinking(t *testing.T) { + t.Skip("Only for manual testing and memory profiling") + + gcAndPrintAlloc("start") + m := NewShrinkingMap[int, int](0) + + const mapSize = 1_000_000 + + for i := 0; i < mapSize; i++ { + m.Set(i, i) + } + + gcAndPrintAlloc("after map creation") + + for i := 0; i < mapSize/2; i++ { + m.Delete(i) + } + + gcAndPrintAlloc("after removing half of the elements") + + val, exist := m.Get(mapSize - 1) + require.Equal(t, true, exist) + require.Equal(t, mapSize-1, val) + + gcAndPrintAlloc("end") +} + +func gcAndPrintAlloc(prefix string) { + runtime.GC() + + var stats runtime.MemStats + runtime.ReadMemStats(&stats) + fmt.Printf(prefix+", Allocated memory %d KiB\n", stats.Alloc/1024) +} diff --git a/core/blockchain.go b/core/blockchain.go index 6db2b45d355d..400a424cf294 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2024,6 +2024,55 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } +func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions) (WriteStatus, error) { + if !bc.chainmu.TryLock() { + return NonStatTy, errInsertionInterrupted + } + defer bc.chainmu.Unlock() + + statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps) + if err != nil { + return NonStatTy, err + } + + statedb.StartPrefetcher("l1sync") + defer statedb.StopPrefetcher() + + header.ParentHash = parentBlock.Hash() + + tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil) + receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig) + if err != nil { + return NonStatTy, err + } + + // TODO: once we have the extra and difficulty we need to verify the signature of the block with Clique + // This should be done with https://github.com/scroll-tech/go-ethereum/pull/913. + + header.GasUsed = gasUsed + header.Root = statedb.GetRootHash() + + fullBlock := types.NewBlock(header, txs, nil, receipts, trie.NewStackTrie(nil)) + + blockHash := fullBlock.Hash() + // manually replace the block hash in the receipts + for i, receipt := range receipts { + // add block location fields + receipt.BlockHash = blockHash + receipt.BlockNumber = tempBlock.Number() + receipt.TransactionIndex = uint(i) + + for _, l := range receipt.Logs { + l.BlockHash = blockHash + } + } + for _, l := range logs { + l.BlockHash = blockHash + } + + return bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false) +} + // insertSideChain is called when an import batch hits upon a pruned ancestor // error, which happens when a sidechain with a sufficiently old fork-block is // found. diff --git a/core/rawdb/accessors_da_syncer.go b/core/rawdb/accessors_da_syncer.go new file mode 100644 index 000000000000..96f816685652 --- /dev/null +++ b/core/rawdb/accessors_da_syncer.go @@ -0,0 +1,39 @@ +package rawdb + +import ( + "math/big" + + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/log" +) + +// WriteDASyncedL1BlockNumber writes the highest synced L1 block number to the database. +func WriteDASyncedL1BlockNumber(db ethdb.KeyValueWriter, L1BlockNumber uint64) { + value := big.NewInt(0).SetUint64(L1BlockNumber).Bytes() + + if err := db.Put(daSyncedL1BlockNumberKey, value); err != nil { + log.Crit("Failed to update DA synced L1 block number", "err", err) + } +} + +// ReadDASyncedL1BlockNumber retrieves the highest synced L1 block number. +func ReadDASyncedL1BlockNumber(db ethdb.Reader) *uint64 { + data, err := db.Get(daSyncedL1BlockNumberKey) + if err != nil && isNotFoundErr(err) { + return nil + } + if err != nil { + log.Crit("Failed to read DA synced L1 block number from database", "err", err) + } + if len(data) == 0 { + return nil + } + + number := new(big.Int).SetBytes(data) + if !number.IsUint64() { + log.Crit("Unexpected DA synced L1 block number in database", "number", number) + } + + value := number.Uint64() + return &value +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 4ccd401b04bb..19883114fd12 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -154,6 +154,9 @@ var ( finalizedL2BlockNumberKey = []byte("R-finalized") lastFinalizedBatchIndexKey = []byte("R-finalizedBatchIndex") + // Scroll da syncer store + daSyncedL1BlockNumberKey = []byte("LastDASyncedL1BlockNumber") + // Row consumption rowConsumptionPrefix = []byte("rc") // rowConsumptionPrefix + hash -> row consumption by block diff --git a/eth/backend.go b/eth/backend.go index a4c70ff80c02..2cfba8e79a00 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -240,6 +240,18 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl return nil, err } + // Initialize and start DA syncing pipeline before SyncService as SyncService is blocking until all L1 messages are loaded. + // We need SyncService to load the L1 messages for DA syncing, but since both sync from last known L1 state, we can + // simply let them run simultaneously. If messages are missing in DA syncing, it will be handled by the syncing pipeline + // by waiting and retrying. + if config.EnableDASyncing { + eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA) + if err != nil { + return nil, fmt.Errorf("cannot initialize da syncer: %w", err) + } + eth.syncingPipeline.Start() + } + // initialize and start L1 message sync service eth.syncService, err = sync_service.NewSyncService(context.Background(), chainConfig, stack.Config(), eth.chainDb, l1Client) if err != nil { @@ -273,7 +285,7 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl return nil, err } - eth.miner = miner.New(eth, &config.Miner, eth.blockchain.Config(), eth.EventMux(), eth.engine, eth.isLocalBlock) + eth.miner = miner.New(eth, &config.Miner, eth.blockchain.Config(), eth.EventMux(), eth.engine, eth.isLocalBlock, config.EnableDASyncing) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil} @@ -336,6 +348,14 @@ func (s *Ethereum) APIs() []rpc.API { // Append any APIs exposed explicitly by the consensus engine apis = append(apis, s.engine.APIs(s.BlockChain())...) + if !s.config.EnableDASyncing { + apis = append(apis, rpc.API{ + Namespace: "eth", + Version: "1.0", + Service: downloader.NewPublicDownloaderAPI(s.handler.downloader, s.eventMux), + Public: true, + }) + } // Append all the local APIs and return return append(apis, []rpc.API{ @@ -524,6 +544,10 @@ func (s *Ethereum) SyncService() *sync_service.SyncService { return s.syncServic // Protocols returns all the currently configured // network protocols to start. func (s *Ethereum) Protocols() []p2p.Protocol { + // if DA syncing enabled then we don't create handler + if s.config.EnableDASyncing { + return nil + } protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates) if !s.blockchain.Config().Scroll.ZktrieEnabled() && s.config.SnapshotCache > 0 { protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler), s.snapDialCandidates)...) @@ -551,7 +575,10 @@ func (s *Ethereum) Start() error { // maxPeers -= s.config.LightPeers // } // Start the networking layer and the light server if requested - s.handler.Start(maxPeers) + // handler is not enabled when DA syncing enabled + if !s.config.EnableDASyncing { + s.handler.Start(maxPeers) + } return nil } @@ -561,7 +588,10 @@ func (s *Ethereum) Stop() error { // Stop all the peer-related stuff first. s.ethDialCandidates.Close() s.snapDialCandidates.Close() - s.handler.Stop() + // handler is not enabled if DA syncing enabled + if !s.config.EnableDASyncing { + s.handler.Stop() + } // Then stop everything else. s.bloomIndexer.Close() @@ -571,6 +601,9 @@ func (s *Ethereum) Stop() error { if s.config.EnableRollupVerify { s.rollupSyncService.Stop() } + if s.config.EnableDASyncing { + s.syncingPipeline.Stop() + } s.miner.Close() s.blockchain.Stop() s.engine.Close() diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 7fecce39c3db..eae163897591 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -34,6 +34,7 @@ import ( "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/miner" "github.com/scroll-tech/go-ethereum/params" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer" ) // FullNodeGPO contains default gasprice oracle settings for full node. @@ -78,6 +79,9 @@ var Defaults = Config{ GPO: FullNodeGPO, RPCTxFeeCap: 1, // 1 ether MaxBlockRange: -1, // Default unconfigured value: no block range limit for backward compatibility + DA: da_syncer.Config{ + FetcherMode: da_syncer.L1RPC, + }, } //go:generate go run github.com/fjl/gencodec -type Config -formats toml -out gen_config.go @@ -181,6 +185,12 @@ type Config struct { // List of peer ids that take part in the shadow-fork ShadowForkPeerIDs []string + + // Enable syncing node from DA + EnableDASyncing bool + + // DA syncer config + DA da_syncer.Config } // CreateConsensusEngine creates a consensus engine for the given chain config. diff --git a/go.mod b/go.mod index 896047ab902c..45b47d00d181 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 github.com/protolambda/bls12-381-util v0.0.0-20220416220906-d8552aa452c7 github.com/rs/cors v1.7.0 - github.com/scroll-tech/da-codec v0.1.1-0.20240727174557-66c0e75af163 + github.com/scroll-tech/da-codec v0.1.1-0.20240826104725-b7214d9781ca github.com/scroll-tech/zktrie v0.8.4 github.com/shirou/gopsutil v3.21.11+incompatible github.com/status-im/keycard-go v0.2.0 diff --git a/miner/miner.go b/miner/miner.go index e870232ef721..6beb5c7dd57f 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -88,7 +88,7 @@ type Miner struct { wg sync.WaitGroup } -func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(header *types.Header) bool) *Miner { +func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(header *types.Header) bool, daSyncingEnabled bool) *Miner { miner := &Miner{ mux: mux, eth: eth, @@ -96,10 +96,12 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even exitCh: make(chan struct{}), startCh: make(chan struct{}), stopCh: make(chan struct{}), - worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true), + worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true, daSyncingEnabled), + } + if !daSyncingEnabled { + miner.wg.Add(1) + go miner.update() } - miner.wg.Add(1) - go miner.update() return miner } diff --git a/miner/miner_test.go b/miner/miner_test.go index 49bf802dca43..4f61872f4ecf 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -330,7 +330,7 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) { // Create event Mux mux := new(event.TypeMux) // Create Miner - miner := New(backend, &config, chainConfig, mux, engine, nil) + miner := New(backend, &config, chainConfig, mux, engine, nil, false) cleanup := func(skipMiner bool) { bc.Stop() engine.Close() diff --git a/miner/scroll_worker.go b/miner/scroll_worker.go index e5688bc24a6b..1d342a92d381 100644 --- a/miner/scroll_worker.go +++ b/miner/scroll_worker.go @@ -138,7 +138,7 @@ type worker struct { beforeTxHook func() // Method to call before processing a transaction. } -func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Header) bool, init bool) *worker { +func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Header) bool, init bool, daSyncingEnabled bool) *worker { worker := &worker{ config: config, chainConfig: chainConfig, @@ -156,6 +156,10 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus circuitCapacityChecker: ccc.NewChecker(true), } log.Info("created new worker", "Checker ID", worker.circuitCapacityChecker.ID) + if daSyncingEnabled { + log.Info("Worker will not start, because DA syncing is enabled") + return worker + } // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) diff --git a/node/config.go b/node/config.go index 93dc612ced99..e75a6d3c70fb 100644 --- a/node/config.go +++ b/node/config.go @@ -218,6 +218,8 @@ type Config struct { L1Confirmations rpc.BlockNumber `toml:",omitempty"` // L1 bridge deployment block number L1DeploymentBlock uint64 `toml:",omitempty"` + // Is daSyncingEnabled + DaSyncingEnabled bool `toml:",omitempty"` } // IPCEndpoint resolves an IPC endpoint based on a configured value, taking into diff --git a/node/node.go b/node/node.go index 00d906845e74..eb2ccee20192 100644 --- a/node/node.go +++ b/node/node.go @@ -265,9 +265,13 @@ func (n *Node) doClose(errs []error) error { // openEndpoints starts all network and RPC endpoints. func (n *Node) openEndpoints() error { // start networking endpoints - n.log.Info("Starting peer-to-peer node", "instance", n.server.Name) - if err := n.server.Start(); err != nil { - return convertFileLockError(err) + if !n.config.DaSyncingEnabled { + n.log.Info("Starting peer-to-peer node", "instance", n.server.Name) + if err := n.server.Start(); err != nil { + return convertFileLockError(err) + } + } else { + n.log.Info("Peer-to-peer node will not start, because DA syncing is enabled") } // start RPC endpoints err := n.startRPC() diff --git a/rollup/da_syncer/batch_queue.go b/rollup/da_syncer/batch_queue.go new file mode 100644 index 000000000000..7a3d094f6322 --- /dev/null +++ b/rollup/da_syncer/batch_queue.go @@ -0,0 +1,102 @@ +package da_syncer + +import ( + "context" + "fmt" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core/rawdb" + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" +) + +// BatchQueue is a pipeline stage that reads all batch events from DAQueue and provides only finalized batches to the next stage. +type BatchQueue struct { + DAQueue *DAQueue + db ethdb.Database + lastFinalizedBatchIndex uint64 + batches *common.Heap[da.Entry] + batchesMap *common.ShrinkingMap[uint64, *common.HeapElement[da.Entry]] +} + +func NewBatchQueue(DAQueue *DAQueue, db ethdb.Database) *BatchQueue { + return &BatchQueue{ + DAQueue: DAQueue, + db: db, + lastFinalizedBatchIndex: 0, + batches: common.NewHeap[da.Entry](), + batchesMap: common.NewShrinkingMap[uint64, *common.HeapElement[da.Entry]](1000), + } +} + +// NextBatch finds next finalized batch and returns data, that was committed in that batch +func (bq *BatchQueue) NextBatch(ctx context.Context) (da.Entry, error) { + if batch := bq.getFinalizedBatch(); batch != nil { + return batch, nil + } + + for { + daEntry, err := bq.DAQueue.NextDA(ctx) + if err != nil { + return nil, err + } + switch daEntry.Type() { + case da.CommitBatchV0Type, da.CommitBatchV1Type, da.CommitBatchV2Type: + bq.addBatch(daEntry) + case da.RevertBatchType: + bq.deleteBatch(daEntry) + case da.FinalizeBatchType: + if daEntry.BatchIndex() > bq.lastFinalizedBatchIndex { + bq.lastFinalizedBatchIndex = daEntry.BatchIndex() + } + + if batch := bq.getFinalizedBatch(); batch != nil { + return batch, nil + } + default: + return nil, fmt.Errorf("unexpected type of daEntry: %T", daEntry) + } + } +} + +// getFinalizedBatch returns next finalized batch if there is available +func (bq *BatchQueue) getFinalizedBatch() da.Entry { + if bq.batches.Len() == 0 { + return nil + } + + batch := bq.batches.Peek().Value() + if batch.BatchIndex() <= bq.lastFinalizedBatchIndex { + bq.deleteBatch(batch) + return batch + } else { + return nil + } +} + +func (bq *BatchQueue) addBatch(batch da.Entry) { + heapElement := bq.batches.Push(batch) + bq.batchesMap.Set(batch.BatchIndex(), heapElement) +} + +// deleteBatch deletes data committed in the batch from map, because this batch is reverted or finalized +// updates DASyncedL1BlockNumber +func (bq *BatchQueue) deleteBatch(batch da.Entry) { + batchHeapElement, exists := bq.batchesMap.Get(batch.BatchIndex()) + if !exists { + return + } + + bq.batchesMap.Delete(batch.BatchIndex()) + bq.batches.Remove(batchHeapElement) + + // we store here min height of currently loaded batches to be able to start syncing from the same place in case of restart + // TODO: we should store this information when the batch is done being processed to avoid inconsistencies + rawdb.WriteDASyncedL1BlockNumber(bq.db, batch.L1BlockNumber()-1) +} + +func (bq *BatchQueue) Reset(height uint64) { + bq.batches.Clear() + bq.batchesMap.Clear() + bq.DAQueue.Reset(height) +} diff --git a/rollup/da_syncer/blob_client/beacon_node_client.go b/rollup/da_syncer/blob_client/beacon_node_client.go new file mode 100644 index 000000000000..20e4d876a489 --- /dev/null +++ b/rollup/da_syncer/blob_client/beacon_node_client.go @@ -0,0 +1,183 @@ +package blob_client + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/crypto/kzg4844" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" +) + +type BeaconNodeClient struct { + apiEndpoint string + l1Client *rollup_sync_service.L1Client + genesisTime uint64 + secondsPerSlot uint64 +} + +var ( + beaconNodeGenesisEndpoint = "/eth/v1/beacon/genesis" + beaconNodeSpecEndpoint = "/eth/v1/config/spec" + beaconNodeBlobEndpoint = "/eth/v1/beacon/blob_sidecars" +) + +func NewBeaconNodeClient(apiEndpoint string, l1Client *rollup_sync_service.L1Client) (*BeaconNodeClient, error) { + // get genesis time + genesisPath, err := url.JoinPath(apiEndpoint, beaconNodeGenesisEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to join path, err: %w", err) + } + resp, err := http.Get(genesisPath) + if err != nil { + return nil, fmt.Errorf("cannot do request, err: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr) + } + + var genesisResp GenesisResp + err = json.NewDecoder(resp.Body).Decode(&genesisResp) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + genesisTime, err := strconv.ParseUint(genesisResp.Data.GenesisTime, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to decode genesis time %s, err: %w", genesisResp.Data.GenesisTime, err) + } + + // get seconds per slot from spec + specPath, err := url.JoinPath(apiEndpoint, beaconNodeSpecEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to join path, err: %w", err) + } + resp, err = http.Get(specPath) + if err != nil { + return nil, fmt.Errorf("cannot do request, err: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr) + } + + var specResp SpecResp + err = json.NewDecoder(resp.Body).Decode(&specResp) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + secondsPerSlot, err := strconv.ParseUint(specResp.Data.SecondsPerSlot, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to decode seconds per slot %s, err: %w", specResp.Data.SecondsPerSlot, err) + } + if secondsPerSlot == 0 { + return nil, fmt.Errorf("failed to make new BeaconNodeClient, secondsPerSlot is 0") + } + + return &BeaconNodeClient{ + apiEndpoint: apiEndpoint, + l1Client: l1Client, + genesisTime: genesisTime, + secondsPerSlot: secondsPerSlot, + }, nil +} + +func (c *BeaconNodeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) { + // get block timestamp to calculate slot + header, err := c.l1Client.GetHeaderByNumber(blockNumber) + if err != nil { + return nil, fmt.Errorf("failed to get header by number, err: %w", err) + } + slot := (header.Time - c.genesisTime) / c.secondsPerSlot + + // get blob sidecar for slot + blobSidecarPath, err := url.JoinPath(c.apiEndpoint, beaconNodeBlobEndpoint, fmt.Sprintf("%d", slot)) + if err != nil { + return nil, fmt.Errorf("failed to join path, err: %w", err) + } + resp, err := http.Get(blobSidecarPath) + if err != nil { + return nil, fmt.Errorf("cannot do request, err: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr) + } + + var blobSidecarResp BlobSidecarResp + err = json.NewDecoder(resp.Body).Decode(&blobSidecarResp) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + + // find blob with desired versionedHash + for _, blob := range blobSidecarResp.Data { + // calculate blob hash from commitment and check it with desired + commitmentBytes := common.FromHex(blob.KzgCommitment) + if len(commitmentBytes) != lenKZGCommitment { + return nil, fmt.Errorf("len of kzg commitment is not correct, expected: %d, got: %d", lenKZGCommitment, len(commitmentBytes)) + } + commitment := kzg4844.Commitment(commitmentBytes) + blobVersionedHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment) + + if blobVersionedHash == versionedHash { + // found desired blob + blobBytes := common.FromHex(blob.Blob) + if len(blobBytes) != lenBlobBytes { + return nil, fmt.Errorf("len of blob data is not correct, expected: %d, got: %d", lenBlobBytes, len(blobBytes)) + } + + b := kzg4844.Blob(blobBytes) + return &b, nil + } + } + + return nil, fmt.Errorf("missing blob %v in slot %d, block number %d", versionedHash, slot, blockNumber) +} + +type GenesisResp struct { + Data struct { + GenesisTime string `json:"genesis_time"` + } `json:"data"` +} + +type SpecResp struct { + Data struct { + SecondsPerSlot string `json:"SECONDS_PER_SLOT"` + } `json:"data"` +} + +type BlobSidecarResp struct { + Data []struct { + Index string `json:"index"` + Blob string `json:"blob"` + KzgCommitment string `json:"kzg_commitment"` + KzgProof string `json:"kzg_proof"` + SignedBlockHeader struct { + Message struct { + Slot string `json:"slot"` + ProposerIndex string `json:"proposer_index"` + ParentRoot string `json:"parent_root"` + StateRoot string `json:"state_root"` + BodyRoot string `json:"body_root"` + } `json:"message"` + Signature string `json:"signature"` + } `json:"signed_block_header"` + KzgCommitmentInclusionProof []string `json:"kzg_commitment_inclusion_proof"` + } `json:"data"` +} diff --git a/rollup/da_syncer/blob_client/blob_client.go b/rollup/da_syncer/blob_client/blob_client.go new file mode 100644 index 000000000000..5ce0d16a42cc --- /dev/null +++ b/rollup/da_syncer/blob_client/blob_client.go @@ -0,0 +1,17 @@ +package blob_client + +import ( + "context" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/crypto/kzg4844" +) + +const ( + lenBlobBytes int = 131072 + lenKZGCommitment int = 48 +) + +type BlobClient interface { + GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) +} diff --git a/rollup/da_syncer/blob_client/blob_client_list.go b/rollup/da_syncer/blob_client/blob_client_list.go new file mode 100644 index 000000000000..647f5f78ff71 --- /dev/null +++ b/rollup/da_syncer/blob_client/blob_client_list.go @@ -0,0 +1,65 @@ +package blob_client + +import ( + "context" + "errors" + "fmt" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/crypto/kzg4844" + "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" +) + +type BlobClientList struct { + list []BlobClient + curPos int +} + +func NewBlobClientList(blobClients ...BlobClient) *BlobClientList { + return &BlobClientList{ + list: blobClients, + curPos: 0, + } +} + +func (c *BlobClientList) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) { + if len(c.list) == 0 { + return nil, fmt.Errorf("BlobClientList.GetBlobByVersionedHash: list of BlobClients is empty") + } + + for i := 0; i < len(c.list); i++ { + blob, err := c.list[c.curPos].GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, blockNumber) + if err == nil { + return blob, nil + } + c.nextPos() + // there was an error, try the next blob client in following iteration + log.Warn("BlobClientList: failed to get blob by versioned hash from BlobClient", "err", err, "blob client pos in BlobClientList", c.curPos) + } + + // if we iterated over entire list, return a temporary error that will be handled in syncing_pipeline with a backoff and retry + return nil, serrors.NewTemporaryError(errors.New("BlobClientList.GetBlobByVersionedHash: failed to get blob by versioned hash from all BlobClients")) +} + +func (c *BlobClientList) nextPos() { + c.curPos = (c.curPos + 1) % len(c.list) +} + +func (c *BlobClientList) AddBlobClient(blobClient BlobClient) { + c.list = append(c.list, blobClient) +} + +func (c *BlobClientList) RemoveBlobClient(blobClient BlobClient) { + c.list = append(c.list, blobClient) + for pos, client := range c.list { + if client == blobClient { + c.list = append(c.list[:pos], c.list[pos+1:]...) + c.curPos %= len(c.list) + return + } + } +} +func (c *BlobClientList) Size() int { + return len(c.list) +} diff --git a/rollup/da_syncer/blob_client/blob_scan_client.go b/rollup/da_syncer/blob_client/blob_scan_client.go new file mode 100644 index 000000000000..49a32d6d8191 --- /dev/null +++ b/rollup/da_syncer/blob_client/blob_scan_client.go @@ -0,0 +1,78 @@ +package blob_client + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "net/url" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/crypto/kzg4844" +) + +type BlobScanClient struct { + client *http.Client + apiEndpoint string +} + +func NewBlobScanClient(apiEndpoint string) *BlobScanClient { + return &BlobScanClient{ + client: http.DefaultClient, + apiEndpoint: apiEndpoint, + } +} + +func (c *BlobScanClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) { + // blobscan api docs https://api.blobscan.com/#/blobs/blob-getByBlobId + path, err := url.JoinPath(c.apiEndpoint, versionedHash.String()) + if err != nil { + return nil, fmt.Errorf("failed to join path, err: %w", err) + } + req, err := http.NewRequestWithContext(ctx, "GET", path, nil) + if err != nil { + return nil, fmt.Errorf("cannot create request, err: %w", err) + } + req.Header.Set("accept", "application/json") + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("cannot do request, err: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + return nil, fmt.Errorf("no blob with versioned hash : %s", versionedHash.String()) + } + var res ErrorRespBlobScan + err = json.NewDecoder(resp.Body).Decode(&res) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + return nil, fmt.Errorf("error while fetching blob, message: %s, code: %s, versioned hash: %s", res.Message, res.Code, versionedHash.String()) + } + var result BlobRespBlobScan + + err = json.NewDecoder(resp.Body).Decode(&result) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + blobBytes, err := hex.DecodeString(result.Data[2:]) + if err != nil { + return nil, fmt.Errorf("failed to decode data to bytes, err: %w", err) + } + if len(blobBytes) != lenBlobBytes { + return nil, fmt.Errorf("len of blob data is not correct, expected: %d, got: %d", lenBlobBytes, len(blobBytes)) + } + blob := kzg4844.Blob(blobBytes) + return &blob, nil +} + +type BlobRespBlobScan struct { + Data string `json:"data"` +} + +type ErrorRespBlobScan struct { + Message string `json:"message"` + Code string `json:"code"` +} diff --git a/rollup/da_syncer/blob_client/block_native_client.go b/rollup/da_syncer/blob_client/block_native_client.go new file mode 100644 index 000000000000..32ab290728f7 --- /dev/null +++ b/rollup/da_syncer/blob_client/block_native_client.go @@ -0,0 +1,71 @@ +package blob_client + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "net/url" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/crypto/kzg4844" +) + +type BlockNativeClient struct { + apiEndpoint string +} + +func NewBlockNativeClient(apiEndpoint string) *BlockNativeClient { + return &BlockNativeClient{ + apiEndpoint: apiEndpoint, + } +} + +func (c *BlockNativeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) { + // blocknative api docs https://docs.blocknative.com/blocknative-data-archive/blob-archive + path, err := url.JoinPath(c.apiEndpoint, versionedHash.String()) + if err != nil { + return nil, fmt.Errorf("failed to join path, err: %w", err) + } + resp, err := http.Get(path) + if err != nil { + return nil, fmt.Errorf("cannot do request, err: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + var res ErrorRespBlockNative + err = json.NewDecoder(resp.Body).Decode(&res) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + return nil, fmt.Errorf("error while fetching blob, message: %s, code: %d, versioned hash: %s", res.Error.Message, res.Error.Code, versionedHash.String()) + } + var result BlobRespBlockNative + err = json.NewDecoder(resp.Body).Decode(&result) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + blobBytes, err := hex.DecodeString(result.Blob.Data[2:]) + if err != nil { + return nil, fmt.Errorf("failed to decode data to bytes, err: %w", err) + } + if len(blobBytes) != lenBlobBytes { + return nil, fmt.Errorf("len of blob data is not correct, expected: %d, got: %d", lenBlobBytes, len(blobBytes)) + } + blob := kzg4844.Blob(blobBytes) + return &blob, nil +} + +type BlobRespBlockNative struct { + Blob struct { + Data string `json:"data"` + } `json:"blob"` +} + +type ErrorRespBlockNative struct { + Error struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` +} diff --git a/rollup/da_syncer/block_queue.go b/rollup/da_syncer/block_queue.go new file mode 100644 index 000000000000..a122d41ab356 --- /dev/null +++ b/rollup/da_syncer/block_queue.go @@ -0,0 +1,56 @@ +package da_syncer + +import ( + "context" + "fmt" + + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" +) + +// BlockQueue is a pipeline stage that reads batches from BatchQueue, extracts all da.PartialBlock from it and +// provides them to the next stage one-by-one. +type BlockQueue struct { + batchQueue *BatchQueue + blocks []*da.PartialBlock +} + +func NewBlockQueue(batchQueue *BatchQueue) *BlockQueue { + return &BlockQueue{ + batchQueue: batchQueue, + blocks: make([]*da.PartialBlock, 0), + } +} + +func (bq *BlockQueue) NextBlock(ctx context.Context) (*da.PartialBlock, error) { + for len(bq.blocks) == 0 { + err := bq.getBlocksFromBatch(ctx) + if err != nil { + return nil, err + } + } + block := bq.blocks[0] + bq.blocks = bq.blocks[1:] + return block, nil +} + +func (bq *BlockQueue) getBlocksFromBatch(ctx context.Context) error { + daEntry, err := bq.batchQueue.NextBatch(ctx) + if err != nil { + return err + } + + entryWithBlocks, ok := daEntry.(da.EntryWithBlocks) + // this should never happen because we only receive CommitBatch entries + if !ok { + return fmt.Errorf("unexpected type of daEntry: %T", daEntry) + } + + bq.blocks = entryWithBlocks.Blocks() + + return nil +} + +func (bq *BlockQueue) Reset(height uint64) { + bq.blocks = make([]*da.PartialBlock, 0) + bq.batchQueue.Reset(height) +} diff --git a/rollup/da_syncer/da/calldata_blob_source.go b/rollup/da_syncer/da/calldata_blob_source.go new file mode 100644 index 000000000000..d946c8096fe8 --- /dev/null +++ b/rollup/da_syncer/da/calldata_blob_source.go @@ -0,0 +1,241 @@ +package da + +import ( + "context" + "errors" + "fmt" + + "github.com/scroll-tech/go-ethereum/accounts/abi" + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" +) + +const ( + callDataBlobSourceFetchBlockRange uint64 = 500 + commitBatchEventName = "CommitBatch" + revertBatchEventName = "RevertBatch" + finalizeBatchEventName = "FinalizeBatch" + commitBatchMethodName = "commitBatch" + commitBatchWithBlobProofMethodName = "commitBatchWithBlobProof" + + // the length og method ID at the beginning of transaction data + methodIDLength = 4 +) + +var ( + ErrSourceExhausted = errors.New("data source has been exhausted") +) + +type CalldataBlobSource struct { + ctx context.Context + l1Client *rollup_sync_service.L1Client + blobClient blob_client.BlobClient + l1height uint64 + scrollChainABI *abi.ABI + l1CommitBatchEventSignature common.Hash + l1RevertBatchEventSignature common.Hash + l1FinalizeBatchEventSignature common.Hash + db ethdb.Database + + l1Finalized uint64 +} + +func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Client *rollup_sync_service.L1Client, blobClient blob_client.BlobClient, db ethdb.Database) (*CalldataBlobSource, error) { + scrollChainABI, err := rollup_sync_service.ScrollChainMetaData.GetAbi() + if err != nil { + return nil, fmt.Errorf("failed to get scroll chain abi: %w", err) + } + return &CalldataBlobSource{ + ctx: ctx, + l1Client: l1Client, + blobClient: blobClient, + l1height: l1height, + scrollChainABI: scrollChainABI, + l1CommitBatchEventSignature: scrollChainABI.Events[commitBatchEventName].ID, + l1RevertBatchEventSignature: scrollChainABI.Events[revertBatchEventName].ID, + l1FinalizeBatchEventSignature: scrollChainABI.Events[finalizeBatchEventName].ID, + db: db, + }, nil +} + +func (ds *CalldataBlobSource) NextData() (Entries, error) { + var err error + to := ds.l1height + callDataBlobSourceFetchBlockRange + + // If there's not enough finalized blocks to request up to, we need to query finalized block number. + // Otherwise, we know that there's more finalized blocks than we want to request up to + // -> no need to query finalized block number + if to > ds.l1Finalized { + ds.l1Finalized, err = ds.l1Client.GetLatestFinalizedBlockNumber() + if err != nil { + return nil, serrors.NewTemporaryError(fmt.Errorf("failed to query GetLatestFinalizedBlockNumber, error: %v", err)) + } + // make sure we don't request more than finalized blocks + to = min(to, ds.l1Finalized) + } + + if ds.l1height > to { + return nil, ErrSourceExhausted + } + + logs, err := ds.l1Client.FetchRollupEventsInRange(ds.l1height, to) + if err != nil { + return nil, serrors.NewTemporaryError(fmt.Errorf("cannot get events, l1height: %d, error: %v", ds.l1height, err)) + } + da, err := ds.processLogsToDA(logs) + if err != nil { + return nil, serrors.NewTemporaryError(fmt.Errorf("failed to process logs to DA, error: %v", err)) + } + + ds.l1height = to + 1 + return da, nil +} + +func (ds *CalldataBlobSource) L1Height() uint64 { + return ds.l1height +} + +func (ds *CalldataBlobSource) processLogsToDA(logs []types.Log) (Entries, error) { + var entries Entries + var entry Entry + var err error + + for _, vLog := range logs { + switch vLog.Topics[0] { + case ds.l1CommitBatchEventSignature: + event := &rollup_sync_service.L1CommitBatchEvent{} + if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, commitBatchEventName, vLog); err != nil { + return nil, fmt.Errorf("failed to unpack commit rollup event log, err: %w", err) + } + + batchIndex := event.BatchIndex.Uint64() + log.Trace("found new CommitBatch event", "batch index", batchIndex) + + if entry, err = ds.getCommitBatchDA(batchIndex, &vLog); err != nil { + return nil, fmt.Errorf("failed to get commit batch da: %v, err: %w", batchIndex, err) + } + + case ds.l1RevertBatchEventSignature: + event := &rollup_sync_service.L1RevertBatchEvent{} + if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, revertBatchEventName, vLog); err != nil { + return nil, fmt.Errorf("failed to unpack revert rollup event log, err: %w", err) + } + + batchIndex := event.BatchIndex.Uint64() + log.Trace("found new RevertBatchType event", "batch index", batchIndex) + entry = NewRevertBatch(batchIndex) + + case ds.l1FinalizeBatchEventSignature: + event := &rollup_sync_service.L1FinalizeBatchEvent{} + if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, finalizeBatchEventName, vLog); err != nil { + return nil, fmt.Errorf("failed to unpack finalized rollup event log, err: %w", err) + } + + batchIndex := event.BatchIndex.Uint64() + log.Trace("found new FinalizeBatchType event", "batch index", event.BatchIndex.Uint64()) + entry = NewFinalizeBatch(batchIndex) + + default: + return nil, fmt.Errorf("unknown event, topic: %v, tx hash: %v", vLog.Topics[0].Hex(), vLog.TxHash.Hex()) + } + + entries = append(entries, entry) + } + return entries, nil +} + +type commitBatchArgs struct { + Version uint8 + ParentBatchHeader []byte + Chunks [][]byte + SkippedL1MessageBitmap []byte +} + +func newCommitBatchArgs(method *abi.Method, values []interface{}) (*commitBatchArgs, error) { + var args commitBatchArgs + err := method.Inputs.Copy(&args, values) + return &args, err +} + +func newCommitBatchArgsFromCommitBatchWithProof(method *abi.Method, values []interface{}) (*commitBatchArgs, error) { + var args commitBatchWithBlobProofArgs + err := method.Inputs.Copy(&args, values) + if err != nil { + return nil, err + } + return &commitBatchArgs{ + Version: args.Version, + ParentBatchHeader: args.ParentBatchHeader, + Chunks: args.Chunks, + SkippedL1MessageBitmap: args.SkippedL1MessageBitmap, + }, nil +} + +type commitBatchWithBlobProofArgs struct { + Version uint8 + ParentBatchHeader []byte + Chunks [][]byte + SkippedL1MessageBitmap []byte + BlobDataProof []byte +} + +func (ds *CalldataBlobSource) getCommitBatchDA(batchIndex uint64, vLog *types.Log) (Entry, error) { + if batchIndex == 0 { + return NewCommitBatchDAV0Empty(), nil + } + + txData, err := ds.l1Client.FetchTxData(vLog) + if err != nil { + return nil, fmt.Errorf("failed to fetch tx data, tx hash: %v, err: %w", vLog.TxHash.Hex(), err) + } + if len(txData) < methodIDLength { + return nil, fmt.Errorf("transaction data is too short, length of tx data: %v, minimum length required: %v", len(txData), methodIDLength) + } + + method, err := ds.scrollChainABI.MethodById(txData[:methodIDLength]) + if err != nil { + return nil, fmt.Errorf("failed to get method by ID, ID: %v, err: %w", txData[:methodIDLength], err) + } + values, err := method.Inputs.Unpack(txData[methodIDLength:]) + if err != nil { + return nil, fmt.Errorf("failed to unpack transaction data using ABI, tx data: %v, err: %w", txData, err) + } + + if method.Name == commitBatchMethodName { + args, err := newCommitBatchArgs(method, values) + if err != nil { + return nil, fmt.Errorf("failed to decode calldata into commitBatch args, values: %+v, err: %w", values, err) + } + switch args.Version { + case 0: + return NewCommitBatchDAV0(ds.db, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap, vLog.BlockNumber) + case 1: + return NewCommitBatchDAV1(ds.ctx, ds.db, ds.l1Client, ds.blobClient, vLog, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + case 2: + return NewCommitBatchDAV2(ds.ctx, ds.db, ds.l1Client, ds.blobClient, vLog, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + default: + return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version) + } + } else if method.Name == commitBatchWithBlobProofMethodName { + args, err := newCommitBatchArgsFromCommitBatchWithProof(method, values) + if err != nil { + return nil, fmt.Errorf("failed to decode calldata into commitBatch args, values: %+v, err: %w", values, err) + } + switch args.Version { + case 3: + // we can use V2 for version 3, because it's same + return NewCommitBatchDAV2(ds.ctx, ds.db, ds.l1Client, ds.blobClient, vLog, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + case 4: + return NewCommitBatchDAV4(ds.ctx, ds.db, ds.l1Client, ds.blobClient, vLog, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + default: + return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version) + } + } + + return nil, fmt.Errorf("unknown method name: %s", method.Name) +} diff --git a/rollup/da_syncer/da/commitV0.go b/rollup/da_syncer/da/commitV0.go new file mode 100644 index 000000000000..66a13786c9cb --- /dev/null +++ b/rollup/da_syncer/da/commitV0.go @@ -0,0 +1,173 @@ +package da + +import ( + "encoding/binary" + "fmt" + + "github.com/scroll-tech/da-codec/encoding" + "github.com/scroll-tech/da-codec/encoding/codecv0" + + "github.com/scroll-tech/go-ethereum/core/rawdb" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" +) + +type CommitBatchDAV0 struct { + version uint8 + batchIndex uint64 + parentTotalL1MessagePopped uint64 + skippedL1MessageBitmap []byte + chunks []*codecv0.DAChunkRawTx + l1Txs []*types.L1MessageTx + + l1BlockNumber uint64 +} + +func NewCommitBatchDAV0(db ethdb.Database, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + chunks [][]byte, + skippedL1MessageBitmap []byte, + l1BlockNumber uint64, +) (*CommitBatchDAV0, error) { + decodedChunks, err := codecv0.DecodeDAChunksRawTx(chunks) + if err != nil { + return nil, fmt.Errorf("failed to unpack chunks: %d, err: %w", batchIndex, err) + } + + return NewCommitBatchDAV0WithChunks(db, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, l1BlockNumber) +} + +func NewCommitBatchDAV0WithChunks(db ethdb.Database, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + decodedChunks []*codecv0.DAChunkRawTx, + skippedL1MessageBitmap []byte, + l1BlockNumber uint64, +) (*CommitBatchDAV0, error) { + parentTotalL1MessagePopped := getBatchTotalL1MessagePopped(parentBatchHeader) + l1Txs, err := getL1Messages(db, parentTotalL1MessagePopped, skippedL1MessageBitmap, getTotalMessagesPoppedFromChunks(decodedChunks)) + if err != nil { + return nil, fmt.Errorf("failed to get L1 messages for v0 batch %d: %w", batchIndex, err) + } + + return &CommitBatchDAV0{ + version: version, + batchIndex: batchIndex, + parentTotalL1MessagePopped: parentTotalL1MessagePopped, + skippedL1MessageBitmap: skippedL1MessageBitmap, + chunks: decodedChunks, + l1Txs: l1Txs, + l1BlockNumber: l1BlockNumber, + }, nil +} + +func NewCommitBatchDAV0Empty() *CommitBatchDAV0 { + return &CommitBatchDAV0{ + batchIndex: 0, + } +} + +func (c *CommitBatchDAV0) Type() Type { + return CommitBatchV0Type +} + +func (c *CommitBatchDAV0) L1BlockNumber() uint64 { + return c.l1BlockNumber +} + +func (c *CommitBatchDAV0) BatchIndex() uint64 { + return c.batchIndex +} + +func (c *CommitBatchDAV0) CompareTo(other Entry) int { + if c.BatchIndex() < other.BatchIndex() { + return -1 + } else if c.BatchIndex() > other.BatchIndex() { + return 1 + } + return 0 +} + +func (c *CommitBatchDAV0) Blocks() []*PartialBlock { + var blocks []*PartialBlock + l1TxPointer := 0 + + curL1TxIndex := c.parentTotalL1MessagePopped + for _, chunk := range c.chunks { + for blockId, daBlock := range chunk.Blocks { + // create txs + txs := make(types.Transactions, 0, daBlock.NumTransactions) + // insert l1 msgs + for l1TxPointer < len(c.l1Txs) && c.l1Txs[l1TxPointer].QueueIndex < curL1TxIndex+uint64(daBlock.NumL1Messages) { + l1Tx := types.NewTx(c.l1Txs[l1TxPointer]) + txs = append(txs, l1Tx) + l1TxPointer++ + } + curL1TxIndex += uint64(daBlock.NumL1Messages) + + // insert l2 txs + txs = append(txs, chunk.Transactions[blockId]...) + + block := NewPartialBlock( + &PartialHeader{ + Number: daBlock.BlockNumber, + Time: daBlock.Timestamp, + BaseFee: daBlock.BaseFee, + GasLimit: daBlock.GasLimit, + Difficulty: 10, // TODO: replace with real difficulty + ExtraData: []byte{1, 2, 3, 4, 5, 6, 7, 8}, // TODO: replace with real extra data + }, + txs) + blocks = append(blocks, block) + } + } + + return blocks +} + +func getTotalMessagesPoppedFromChunks(decodedChunks []*codecv0.DAChunkRawTx) int { + totalL1MessagePopped := 0 + for _, chunk := range decodedChunks { + for _, block := range chunk.Blocks { + totalL1MessagePopped += int(block.NumL1Messages) + } + } + return totalL1MessagePopped +} + +func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skippedBitmap []byte, totalL1MessagePopped int) ([]*types.L1MessageTx, error) { + var txs []*types.L1MessageTx + + decodedSkippedBitmap, err := encoding.DecodeBitmap(skippedBitmap, totalL1MessagePopped) + if err != nil { + return nil, fmt.Errorf("failed to decode skipped message bitmap: err: %w", err) + } + + // get all necessary l1 messages without skipped + currentIndex := parentTotalL1MessagePopped + for index := 0; index < totalL1MessagePopped; index++ { + if encoding.IsL1MessageSkipped(decodedSkippedBitmap, currentIndex-parentTotalL1MessagePopped) { + currentIndex++ + continue + } + l1Tx := rawdb.ReadL1Message(db, currentIndex) + if l1Tx == nil { + // message not yet available + // we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry + return nil, serrors.EOFError + } + txs = append(txs, l1Tx) + currentIndex++ + } + + return txs, nil +} + +func getBatchTotalL1MessagePopped(data []byte) uint64 { + // total l1 message popped stored in bytes from 17 to 24, accordingly to codec spec + return binary.BigEndian.Uint64(data[17:25]) +} diff --git a/rollup/da_syncer/da/commitV1.go b/rollup/da_syncer/da/commitV1.go new file mode 100644 index 000000000000..d94a046c81df --- /dev/null +++ b/rollup/da_syncer/da/commitV1.go @@ -0,0 +1,92 @@ +package da + +import ( + "context" + "crypto/sha256" + "fmt" + + "github.com/scroll-tech/da-codec/encoding/codecv0" + "github.com/scroll-tech/da-codec/encoding/codecv1" + + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/crypto/kzg4844" + "github.com/scroll-tech/go-ethereum/ethdb" +) + +type CommitBatchDAV1 struct { + *CommitBatchDAV0 +} + +func NewCommitBatchDAV1(ctx context.Context, db ethdb.Database, + l1Client *rollup_sync_service.L1Client, + blobClient blob_client.BlobClient, + vLog *types.Log, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + chunks [][]byte, + skippedL1MessageBitmap []byte, +) (*CommitBatchDAV1, error) { + return NewCommitBatchDAV1WithBlobDecodeFunc(ctx, db, l1Client, blobClient, vLog, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv1.DecodeTxsFromBlob) +} + +func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database, + l1Client *rollup_sync_service.L1Client, + blobClient blob_client.BlobClient, + vLog *types.Log, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + chunks [][]byte, + skippedL1MessageBitmap []byte, + decodeTxsFromBlobFunc func(*kzg4844.Blob, []*codecv0.DAChunkRawTx) error, +) (*CommitBatchDAV1, error) { + decodedChunks, err := codecv1.DecodeDAChunksRawTx(chunks) + if err != nil { + return nil, fmt.Errorf("failed to unpack chunks: %v, err: %w", batchIndex, err) + } + + versionedHash, err := l1Client.FetchTxBlobHash(vLog) + if err != nil { + return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err) + } + + blob, err := blobClient.GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, vLog.BlockNumber) + if err != nil { + return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err) + } + if blob == nil { + return nil, fmt.Errorf("unexpected, blob == nil and err != nil, batch index: %d, versionedHash: %s, blobClient: %T", batchIndex, versionedHash.String(), blobClient) + } + + // compute blob versioned hash and compare with one from tx + c, err := kzg4844.BlobToCommitment(blob) + if err != nil { + return nil, fmt.Errorf("failed to create blob commitment") + } + blobVersionedHash := common.Hash(kzg4844.CalcBlobHashV1(sha256.New(), &c)) + if blobVersionedHash != versionedHash { + return nil, fmt.Errorf("blobVersionedHash from blob source is not equal to versionedHash from tx, correct versioned hash: %s, fetched blob hash: %s", versionedHash.String(), blobVersionedHash.String()) + } + + // decode txs from blob + err = decodeTxsFromBlobFunc(blob, decodedChunks) + if err != nil { + return nil, fmt.Errorf("failed to decode txs from blob: %w", err) + } + + v0, err := NewCommitBatchDAV0WithChunks(db, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, vLog.BlockNumber) + if err != nil { + return nil, err + } + + return &CommitBatchDAV1{v0}, nil +} + +func (c *CommitBatchDAV1) Type() Type { + return CommitBatchV1Type +} diff --git a/rollup/da_syncer/da/commitV2.go b/rollup/da_syncer/da/commitV2.go new file mode 100644 index 000000000000..c1e6d353fc5b --- /dev/null +++ b/rollup/da_syncer/da/commitV2.go @@ -0,0 +1,40 @@ +package da + +import ( + "context" + + "github.com/scroll-tech/da-codec/encoding/codecv2" + + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" + + "github.com/scroll-tech/go-ethereum/core/types" +) + +type CommitBatchDAV2 struct { + *CommitBatchDAV1 +} + +func NewCommitBatchDAV2(ctx context.Context, db ethdb.Database, + l1Client *rollup_sync_service.L1Client, + blobClient blob_client.BlobClient, + vLog *types.Log, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + chunks [][]byte, + skippedL1MessageBitmap []byte, +) (*CommitBatchDAV2, error) { + + v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, db, l1Client, blobClient, vLog, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv2.DecodeTxsFromBlob) + if err != nil { + return nil, err + } + + return &CommitBatchDAV2{v1}, nil +} + +func (c *CommitBatchDAV2) Type() Type { + return CommitBatchV2Type +} diff --git a/rollup/da_syncer/da/commitV4.go b/rollup/da_syncer/da/commitV4.go new file mode 100644 index 000000000000..9b590b2bfff5 --- /dev/null +++ b/rollup/da_syncer/da/commitV4.go @@ -0,0 +1,40 @@ +package da + +import ( + "context" + + "github.com/scroll-tech/da-codec/encoding/codecv4" + + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" + + "github.com/scroll-tech/go-ethereum/core/types" +) + +type CommitBatchDAV4 struct { + *CommitBatchDAV1 +} + +func NewCommitBatchDAV4(ctx context.Context, db ethdb.Database, + l1Client *rollup_sync_service.L1Client, + blobClient blob_client.BlobClient, + vLog *types.Log, + version uint8, + batchIndex uint64, + parentBatchHeader []byte, + chunks [][]byte, + skippedL1MessageBitmap []byte, +) (*CommitBatchDAV2, error) { + + v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, db, l1Client, blobClient, vLog, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv4.DecodeTxsFromBlob) + if err != nil { + return nil, err + } + + return &CommitBatchDAV2{v1}, nil +} + +func (c *CommitBatchDAV4) Type() Type { + return CommitBatchV4Type +} diff --git a/rollup/da_syncer/da/da.go b/rollup/da_syncer/da/da.go new file mode 100644 index 000000000000..5f00e86115a1 --- /dev/null +++ b/rollup/da_syncer/da/da.go @@ -0,0 +1,73 @@ +package da + +import ( + "math/big" + + "github.com/scroll-tech/go-ethereum/core/types" +) + +type Type int + +const ( + // CommitBatchV0Type contains data of event of CommitBatchV0Type + CommitBatchV0Type Type = iota + // CommitBatchV1Type contains data of event of CommitBatchV1Type + CommitBatchV1Type + // CommitBatchV2Type contains data of event of CommitBatchV2Type + CommitBatchV2Type + // CommitBatchV4Type contains data of event of CommitBatchV2Type + CommitBatchV4Type + // RevertBatchType contains data of event of RevertBatchType + RevertBatchType + // FinalizeBatchType contains data of event of FinalizeBatchType + FinalizeBatchType +) + +// Entry represents a single DA event (commit, revert, finalize). +type Entry interface { + Type() Type + BatchIndex() uint64 + L1BlockNumber() uint64 + CompareTo(Entry) int +} + +type EntryWithBlocks interface { + Entry + Blocks() []*PartialBlock +} + +type Entries []Entry + +// PartialHeader represents a partial header (from DA) of a block. +type PartialHeader struct { + Number uint64 + Time uint64 + BaseFee *big.Int + GasLimit uint64 + Difficulty uint64 + ExtraData []byte +} + +func (h *PartialHeader) ToHeader() *types.Header { + return &types.Header{ + Number: big.NewInt(0).SetUint64(h.Number), + Time: h.Time, + BaseFee: h.BaseFee, + GasLimit: h.GasLimit, + Difficulty: new(big.Int).SetUint64(h.Difficulty), + Extra: h.ExtraData, + } +} + +// PartialBlock represents a partial block (from DA). +type PartialBlock struct { + PartialHeader *PartialHeader + Transactions types.Transactions +} + +func NewPartialBlock(partialHeader *PartialHeader, txs types.Transactions) *PartialBlock { + return &PartialBlock{ + PartialHeader: partialHeader, + Transactions: txs, + } +} diff --git a/rollup/da_syncer/da/finalize.go b/rollup/da_syncer/da/finalize.go new file mode 100644 index 000000000000..14d6c2a644cb --- /dev/null +++ b/rollup/da_syncer/da/finalize.go @@ -0,0 +1,34 @@ +package da + +type FinalizeBatch struct { + batchIndex uint64 + + l1BlockNumber uint64 +} + +func NewFinalizeBatch(batchIndex uint64) *FinalizeBatch { + return &FinalizeBatch{ + batchIndex: batchIndex, + } +} + +func (f *FinalizeBatch) Type() Type { + return FinalizeBatchType +} + +func (f *FinalizeBatch) L1BlockNumber() uint64 { + return f.l1BlockNumber +} + +func (f *FinalizeBatch) BatchIndex() uint64 { + return f.batchIndex +} + +func (f *FinalizeBatch) CompareTo(other Entry) int { + if f.BatchIndex() < other.BatchIndex() { + return -1 + } else if f.BatchIndex() > other.BatchIndex() { + return 1 + } + return 0 +} diff --git a/rollup/da_syncer/da/revert.go b/rollup/da_syncer/da/revert.go new file mode 100644 index 000000000000..d84f22ebaa7b --- /dev/null +++ b/rollup/da_syncer/da/revert.go @@ -0,0 +1,33 @@ +package da + +type RevertBatch struct { + batchIndex uint64 + + l1BlockNumber uint64 +} + +func NewRevertBatch(batchIndex uint64) *RevertBatch { + return &RevertBatch{ + batchIndex: batchIndex, + } +} + +func (r *RevertBatch) Type() Type { + return RevertBatchType +} + +func (r *RevertBatch) L1BlockNumber() uint64 { + return r.l1BlockNumber +} +func (r *RevertBatch) BatchIndex() uint64 { + return r.batchIndex +} + +func (r *RevertBatch) CompareTo(other Entry) int { + if r.BatchIndex() < other.BatchIndex() { + return -1 + } else if r.BatchIndex() > other.BatchIndex() { + return 1 + } + return 0 +} diff --git a/rollup/da_syncer/da_queue.go b/rollup/da_syncer/da_queue.go new file mode 100644 index 000000000000..64673a4a646b --- /dev/null +++ b/rollup/da_syncer/da_queue.go @@ -0,0 +1,70 @@ +package da_syncer + +import ( + "context" + "errors" + + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" +) + +// DAQueue is a pipeline stage that reads DA entries from a DataSource and provides them to the next stage. +type DAQueue struct { + l1height uint64 + dataSourceFactory *DataSourceFactory + dataSource DataSource + da da.Entries +} + +func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue { + return &DAQueue{ + l1height: l1height, + dataSourceFactory: dataSourceFactory, + dataSource: nil, + da: make(da.Entries, 0), + } +} + +func (dq *DAQueue) NextDA(ctx context.Context) (da.Entry, error) { + for len(dq.da) == 0 { + err := dq.getNextData(ctx) + if err != nil { + return nil, err + } + } + daEntry := dq.da[0] + dq.da = dq.da[1:] + return daEntry, nil +} + +func (dq *DAQueue) getNextData(ctx context.Context) error { + var err error + if dq.dataSource == nil { + dq.dataSource, err = dq.dataSourceFactory.OpenDataSource(ctx, dq.l1height) + if err != nil { + return err + } + } + + dq.da, err = dq.dataSource.NextData() + if err == nil { + return nil + } + + // previous dataSource has been exhausted, create new + if errors.Is(err, da.ErrSourceExhausted) { + dq.l1height = dq.dataSource.L1Height() + dq.dataSource = nil + + // we return EOFError to be handled in pipeline + return serrors.EOFError + } + + return err +} + +func (dq *DAQueue) Reset(height uint64) { + dq.l1height = height + dq.dataSource = nil + dq.da = make(da.Entries, 0) +} diff --git a/rollup/da_syncer/da_syncer.go b/rollup/da_syncer/da_syncer.go new file mode 100644 index 000000000000..43926e97d0bf --- /dev/null +++ b/rollup/da_syncer/da_syncer.go @@ -0,0 +1,47 @@ +package da_syncer + +import ( + "fmt" + + "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" +) + +var ( + ErrBlockTooLow = fmt.Errorf("block number is too low") + ErrBlockTooHigh = fmt.Errorf("block number is too high") +) + +type DASyncer struct { + blockchain *core.BlockChain +} + +func NewDASyncer(blockchain *core.BlockChain) *DASyncer { + return &DASyncer{ + blockchain: blockchain, + } +} + +// SyncOneBlock receives a PartialBlock, makes sure it's the next block in the chain, executes it and inserts it to the blockchain. +func (s *DASyncer) SyncOneBlock(block *da.PartialBlock) error { + parentBlock := s.blockchain.CurrentBlock() + // we expect blocks to be consecutive. block.PartialHeader.Number == parentBlock.Number+1. + if block.PartialHeader.Number <= parentBlock.NumberU64() { + log.Debug("block number is too low", "block number", block.PartialHeader.Number, "parent block number", parentBlock.NumberU64()) + return ErrBlockTooLow + } else if block.PartialHeader.Number > parentBlock.NumberU64()+1 { + log.Debug("block number is too high", "block number", block.PartialHeader.Number, "parent block number", parentBlock.NumberU64()) + return ErrBlockTooHigh + } + + if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions); err != nil { + return fmt.Errorf("failed building and writing block, number: %d, error: %v", block.PartialHeader.Number, err) + } + + if s.blockchain.CurrentBlock().Header().Number.Uint64()%1000 == 0 { + log.Info("L1 sync progress", "blockhain height", s.blockchain.CurrentBlock().Header().Number, "block hash", s.blockchain.CurrentBlock().Header().Hash(), "root", s.blockchain.CurrentBlock().Header().Root) + } + + return nil +} diff --git a/rollup/da_syncer/data_source.go b/rollup/da_syncer/data_source.go new file mode 100644 index 000000000000..f417d09af00e --- /dev/null +++ b/rollup/da_syncer/data_source.go @@ -0,0 +1,44 @@ +package da_syncer + +import ( + "context" + "errors" + + "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/params" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" +) + +type DataSource interface { + NextData() (da.Entries, error) + L1Height() uint64 +} + +type DataSourceFactory struct { + config Config + genesisConfig *params.ChainConfig + l1Client *rollup_sync_service.L1Client + blobClient blob_client.BlobClient + db ethdb.Database +} + +func NewDataSourceFactory(blockchain *core.BlockChain, genesisConfig *params.ChainConfig, config Config, l1Client *rollup_sync_service.L1Client, blobClient blob_client.BlobClient, db ethdb.Database) *DataSourceFactory { + return &DataSourceFactory{ + config: config, + genesisConfig: genesisConfig, + l1Client: l1Client, + blobClient: blobClient, + db: db, + } +} + +func (ds *DataSourceFactory) OpenDataSource(ctx context.Context, l1height uint64) (DataSource, error) { + if ds.config.FetcherMode == L1RPC { + return da.NewCalldataBlobSource(ctx, l1height, ds.l1Client, ds.blobClient, ds.db) + } else { + return nil, errors.New("snapshot_data_source: not implemented") + } +} diff --git a/rollup/da_syncer/modes.go b/rollup/da_syncer/modes.go new file mode 100644 index 000000000000..bfcc1d1dfba0 --- /dev/null +++ b/rollup/da_syncer/modes.go @@ -0,0 +1,52 @@ +package da_syncer + +import "fmt" + +// FetcherMode represents the mode of fetcher +type FetcherMode int + +const ( + // L1RPC mode fetches DA from L1RPC + L1RPC FetcherMode = iota + // Snapshot mode loads DA from snapshot file + Snapshot +) + +func (mode FetcherMode) IsValid() bool { + return mode >= L1RPC && mode <= Snapshot +} + +// String implements the stringer interface. +func (mode FetcherMode) String() string { + switch mode { + case L1RPC: + return "l1rpc" + case Snapshot: + return "snapshot" + default: + return "unknown" + } +} + +func (mode FetcherMode) MarshalText() ([]byte, error) { + switch mode { + case L1RPC: + return []byte("l1rpc"), nil + case Snapshot: + return []byte("snapshot"), nil + default: + return nil, fmt.Errorf("unknown sync mode %d", mode) + } +} + +func (mode *FetcherMode) UnmarshalText(text []byte) error { + switch string(text) { + case "l1rpc": + *mode = L1RPC + case "snapshot": + *mode = Snapshot + default: + return fmt.Errorf(`unknown sync mode %q, want "l1rpc" or "snapshot"`, text) + } + return nil +} diff --git a/rollup/da_syncer/serrors/errors.go b/rollup/da_syncer/serrors/errors.go new file mode 100644 index 000000000000..aa0426f0771d --- /dev/null +++ b/rollup/da_syncer/serrors/errors.go @@ -0,0 +1,62 @@ +package serrors + +import ( + "fmt" +) + +const ( + temporary Type = iota + eof +) + +var ( + TemporaryError = NewTemporaryError(nil) + EOFError = NewEOFError(nil) +) + +type Type uint8 + +func (t Type) String() string { + switch t { + case temporary: + return "temporary" + case eof: + return "EOF" + default: + return "unknown" + } +} + +type syncError struct { + t Type + err error +} + +func NewTemporaryError(err error) error { + return &syncError{t: temporary, err: err} +} + +func NewEOFError(err error) error { + return &syncError{t: eof, err: err} +} + +func (s *syncError) Error() string { + return fmt.Sprintf("%s: %v", s.t, s.err) +} + +func (s *syncError) Unwrap() error { + return s.err +} + +func (s *syncError) Is(target error) bool { + if target == nil { + return s == nil + } + + targetSyncErr, ok := target.(*syncError) + if !ok { + return false + } + + return s.t == targetSyncErr.t +} diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go new file mode 100644 index 000000000000..c99a49210ac1 --- /dev/null +++ b/rollup/da_syncer/syncing_pipeline.go @@ -0,0 +1,233 @@ +package da_syncer + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/scroll-tech/go-ethereum/common/backoff" + "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/core/rawdb" + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/params" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" + "github.com/scroll-tech/go-ethereum/rollup/sync_service" +) + +// Config is the configuration parameters of data availability syncing. +type Config struct { + FetcherMode FetcherMode // mode of fetcher + SnapshotFilePath string // path to snapshot file + BlobScanAPIEndpoint string // BlobScan blob api endpoint + BlockNativeAPIEndpoint string // BlockNative blob api endpoint + BeaconNodeAPIEndpoint string // Beacon node api endpoint +} + +// SyncingPipeline is a derivation pipeline for syncing data from L1 and DA and transform it into +// L2 blocks and chain. +type SyncingPipeline struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + expBackoff *backoff.Exponential + + l1DeploymentBlock uint64 + + db ethdb.Database + blockchain *core.BlockChain + blockQueue *BlockQueue + daSyncer *DASyncer +} + +func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesisConfig *params.ChainConfig, db ethdb.Database, ethClient sync_service.EthClient, l1DeploymentBlock uint64, config Config) (*SyncingPipeline, error) { + scrollChainABI, err := rollup_sync_service.ScrollChainMetaData.GetAbi() + if err != nil { + return nil, fmt.Errorf("failed to get scroll chain abi: %w", err) + } + + l1Client, err := rollup_sync_service.NewL1Client(ctx, ethClient, genesisConfig.Scroll.L1Config.L1ChainId, genesisConfig.Scroll.L1Config.ScrollChainAddress, scrollChainABI) + if err != nil { + return nil, err + } + + blobClientList := blob_client.NewBlobClientList() + if config.BeaconNodeAPIEndpoint != "" { + beaconNodeClient, err := blob_client.NewBeaconNodeClient(config.BeaconNodeAPIEndpoint, l1Client) + if err != nil { + log.Warn("failed to create BeaconNodeClient", "err", err) + } else { + blobClientList.AddBlobClient(beaconNodeClient) + } + } + if config.BlobScanAPIEndpoint != "" { + blobClientList.AddBlobClient(blob_client.NewBlobScanClient(config.BlobScanAPIEndpoint)) + } + if config.BlockNativeAPIEndpoint != "" { + blobClientList.AddBlobClient(blob_client.NewBlockNativeClient(config.BlockNativeAPIEndpoint)) + } + if blobClientList.Size() == 0 { + log.Crit("DA syncing is enabled but no blob client is configured. Please provide at least one blob client via command line flag.") + } + + dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Client, blobClientList, db) + syncedL1Height := l1DeploymentBlock - 1 + from := rawdb.ReadDASyncedL1BlockNumber(db) + if from != nil { + syncedL1Height = *from + } + + daQueue := NewDAQueue(syncedL1Height, dataSourceFactory) + batchQueue := NewBatchQueue(daQueue, db) + blockQueue := NewBlockQueue(batchQueue) + daSyncer := NewDASyncer(blockchain) + + ctx, cancel := context.WithCancel(ctx) + return &SyncingPipeline{ + ctx: ctx, + cancel: cancel, + expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond), + wg: sync.WaitGroup{}, + l1DeploymentBlock: l1DeploymentBlock, + db: db, + blockchain: blockchain, + blockQueue: blockQueue, + daSyncer: daSyncer, + }, nil +} + +func (s *SyncingPipeline) Step() error { + block, err := s.blockQueue.NextBlock(s.ctx) + if err != nil { + return err + } + err = s.daSyncer.SyncOneBlock(block) + return err +} + +func (s *SyncingPipeline) Start() { + log.Info("sync from DA: starting pipeline") + + s.wg.Add(1) + go func() { + s.mainLoop() + s.wg.Done() + }() +} + +func (s *SyncingPipeline) mainLoop() { + stepCh := make(chan struct{}, 1) + var delayedStepCh <-chan time.Time + var resetCounter int + var tempErrorCounter int + + // reqStep is a helper function to request a step to be executed. + // If delay is true, it will request a delayed step with exponential backoff, otherwise it will request an immediate step. + reqStep := func(delay bool) { + if delay { + if delayedStepCh == nil { + delayDur := s.expBackoff.NextDuration() + delayedStepCh = time.After(delayDur) + log.Debug("requesting delayed step", "delay", delayDur, "attempt", s.expBackoff.Attempt()) + } else { + log.Debug("ignoring step request because of ongoing delayed step", "attempt", s.expBackoff.Attempt()) + } + } else { + select { + case stepCh <- struct{}{}: + default: + } + } + } + + // start pipeline + reqStep(false) + + for { + select { + case <-s.ctx.Done(): + return + default: + } + + select { + case <-s.ctx.Done(): + return + case <-delayedStepCh: + delayedStepCh = nil + reqStep(false) + case <-stepCh: + err := s.Step() + if err == nil { + // step succeeded, reset exponential backoff and continue + reqStep(false) + s.expBackoff.Reset() + resetCounter = 0 + tempErrorCounter = 0 + continue + } + + if errors.Is(err, serrors.EOFError) { + // pipeline is empty, request a delayed step + // TODO: eventually (with state manager) this should not trigger a delayed step because external events will trigger a new step anyway + reqStep(true) + tempErrorCounter = 0 + continue + } else if errors.Is(err, serrors.TemporaryError) { + log.Warn("syncing pipeline step failed due to temporary error, retrying", "err", err) + if tempErrorCounter > 100 { + log.Warn("syncing pipeline step failed due to 100 consecutive temporary errors, stopping pipeline worker", "last err", err) + return + } + + // temporary error, request a delayed step + reqStep(true) + tempErrorCounter++ + continue + } else if errors.Is(err, ErrBlockTooLow) { + // block number returned by the block queue is too low, + // we skip the blocks until we reach the correct block number again. + reqStep(false) + tempErrorCounter = 0 + continue + } else if errors.Is(err, ErrBlockTooHigh) { + // block number returned by the block queue is too high, + // reset the pipeline and move backwards from the last L1 block we read + s.reset(resetCounter) + resetCounter++ + reqStep(false) + tempErrorCounter = 0 + continue + } else if errors.Is(err, context.Canceled) { + log.Info("syncing pipeline stopped due to cancelled context", "err", err) + return + } + + log.Warn("syncing pipeline step failed due to unrecoverable error, stopping pipeline worker", "err", err) + return + } + } +} + +func (s *SyncingPipeline) Stop() { + log.Info("sync from DA: stopping pipeline...") + s.cancel() + s.wg.Wait() + log.Info("sync from DA: stopping pipeline... done") +} + +func (s *SyncingPipeline) reset(resetCounter int) { + amount := 100 * uint64(resetCounter) + syncedL1Height := s.l1DeploymentBlock - 1 + from := rawdb.ReadDASyncedL1BlockNumber(s.db) + if from != nil && *from+amount > syncedL1Height { + syncedL1Height = *from - amount + rawdb.WriteDASyncedL1BlockNumber(s.db, syncedL1Height) + } + log.Info("resetting syncing pipeline", "syncedL1Height", syncedL1Height) + s.blockQueue.Reset(syncedL1Height) +} diff --git a/rollup/rollup_sync_service/abi.go b/rollup/rollup_sync_service/abi.go index 6975001f1870..ff423d5c21bd 100644 --- a/rollup/rollup_sync_service/abi.go +++ b/rollup/rollup_sync_service/abi.go @@ -11,7 +11,7 @@ import ( ) // scrollChainMetaData contains ABI of the ScrollChain contract. -var scrollChainMetaData = &bind.MetaData{ +var ScrollChainMetaData = &bind.MetaData{ ABI: "[{\"anonymous\": false,\"inputs\": [{\"indexed\": true,\"internalType\": \"uint256\",\"name\": \"batchIndex\",\"type\": \"uint256\"},{\"indexed\": true,\"internalType\": \"bytes32\",\"name\": \"batchHash\",\"type\": \"bytes32\"}],\"name\": \"CommitBatch\",\"type\": \"event\"},{\"anonymous\": false,\"inputs\": [{\"indexed\": true,\"internalType\": \"uint256\",\"name\": \"batchIndex\",\"type\": \"uint256\"},{\"indexed\": true,\"internalType\": \"bytes32\",\"name\": \"batchHash\",\"type\": \"bytes32\"},{\"indexed\": false,\"internalType\": \"bytes32\",\"name\": \"stateRoot\",\"type\": \"bytes32\"},{\"indexed\": false,\"internalType\": \"bytes32\",\"name\": \"withdrawRoot\",\"type\": \"bytes32\"}],\"name\": \"FinalizeBatch\",\"type\": \"event\"},{\"anonymous\": false,\"inputs\": [{\"indexed\": true,\"internalType\": \"uint256\",\"name\": \"batchIndex\",\"type\": \"uint256\"},{\"indexed\": true,\"internalType\": \"bytes32\",\"name\": \"batchHash\",\"type\": \"bytes32\"}],\"name\": \"RevertBatch\",\"type\": \"event\"},{\"anonymous\": false,\"inputs\": [{\"indexed\": false,\"internalType\": \"uint256\",\"name\": \"oldMaxNumTxInChunk\",\"type\": \"uint256\"},{\"indexed\": false,\"internalType\": \"uint256\",\"name\": \"newMaxNumTxInChunk\",\"type\": \"uint256\"}],\"name\": \"UpdateMaxNumTxInChunk\",\"type\": \"event\"},{\"anonymous\": false,\"inputs\": [{\"indexed\": true,\"internalType\": \"address\",\"name\": \"account\",\"type\": \"address\"},{\"indexed\": false,\"internalType\": \"bool\",\"name\": \"status\",\"type\": \"bool\"}],\"name\": \"UpdateProver\",\"type\": \"event\"},{\"anonymous\": false,\"inputs\": [{\"indexed\": true,\"internalType\": \"address\",\"name\": \"account\",\"type\": \"address\"},{\"indexed\": false,\"internalType\": \"bool\",\"name\": \"status\",\"type\": \"bool\"}],\"name\": \"UpdateSequencer\",\"type\": \"event\"},{\"inputs\": [{\"internalType\": \"uint8\",\"name\": \"version\",\"type\": \"uint8\"},{\"internalType\": \"bytes\",\"name\": \"parentBatchHeader\",\"type\": \"bytes\"},{\"internalType\": \"bytes[]\",\"name\": \"chunks\",\"type\": \"bytes[]\"},{\"internalType\": \"bytes\",\"name\": \"skippedL1MessageBitmap\",\"type\": \"bytes\"}],\"name\": \"commitBatch\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"uint8\",\"name\": \"version\",\"type\": \"uint8\"},{\"internalType\": \"bytes\",\"name\": \"parentBatchHeader\",\"type\": \"bytes\"},{\"internalType\": \"bytes[]\",\"name\": \"chunks\",\"type\": \"bytes[]\"},{\"internalType\": \"bytes\",\"name\": \"skippedL1MessageBitmap\",\"type\": \"bytes\"},{\"internalType\": \"bytes\",\"name\": \"blobDataProof\",\"type\": \"bytes\"}],\"name\": \"commitBatchWithBlobProof\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"uint256\",\"name\": \"batchIndex\",\"type\": \"uint256\"}],\"name\": \"committedBatches\",\"outputs\": [{\"internalType\": \"bytes32\",\"name\": \"\",\"type\": \"bytes32\"}],\"stateMutability\": \"view\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"bytes\",\"name\": \"batchHeader\",\"type\": \"bytes\"},{\"internalType\": \"bytes32\",\"name\": \"prevStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"postStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"withdrawRoot\",\"type\": \"bytes32\"}],\"name\": \"finalizeBatch\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"bytes\",\"name\": \"batchHeader\",\"type\": \"bytes\"},{\"internalType\": \"bytes32\",\"name\": \"prevStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"postStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"withdrawRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes\",\"name\": \"blobDataProof\",\"type\": \"bytes\"}],\"name\": \"finalizeBatch4844\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"bytes\",\"name\": \"batchHeader\",\"type\": \"bytes\"},{\"internalType\": \"bytes32\",\"name\": \"prevStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"postStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"withdrawRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes\",\"name\": \"aggrProof\",\"type\": \"bytes\"}],\"name\": \"finalizeBatchWithProof\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"bytes\",\"name\": \"batchHeader\",\"type\": \"bytes\"},{\"internalType\": \"bytes32\",\"name\": \"prevStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"postStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"withdrawRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes\",\"name\": \"blobDataProof\",\"type\": \"bytes\"},{\"internalType\": \"bytes\",\"name\": \"aggrProof\",\"type\": \"bytes\"}],\"name\": \"finalizeBatchWithProof4844\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"bytes\",\"name\": \"batchHeader\",\"type\": \"bytes\"},{\"internalType\": \"bytes32\",\"name\": \"postStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"withdrawRoot\",\"type\": \"bytes32\"}],\"name\": \"finalizeBundle\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"bytes\",\"name\": \"batchHeader\",\"type\": \"bytes\"},{\"internalType\": \"bytes32\",\"name\": \"postStateRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes32\",\"name\": \"withdrawRoot\",\"type\": \"bytes32\"},{\"internalType\": \"bytes\",\"name\": \"aggrProof\",\"type\": \"bytes\"}],\"name\": \"finalizeBundleWithProof\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"uint256\",\"name\": \"batchIndex\",\"type\": \"uint256\"}],\"name\": \"finalizedStateRoots\",\"outputs\": [{\"internalType\": \"bytes32\",\"name\": \"\",\"type\": \"bytes32\"}],\"stateMutability\": \"view\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"bytes\",\"name\": \"_batchHeader\",\"type\": \"bytes\"},{\"internalType\": \"bytes32\",\"name\": \"_stateRoot\",\"type\": \"bytes32\"}],\"name\": \"importGenesisBatch\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"uint256\",\"name\": \"batchIndex\",\"type\": \"uint256\"}],\"name\": \"isBatchFinalized\",\"outputs\": [{\"internalType\": \"bool\",\"name\": \"\",\"type\": \"bool\"}],\"stateMutability\": \"view\",\"type\": \"function\"},{\"inputs\": [],\"name\": \"lastFinalizedBatchIndex\",\"outputs\": [{\"internalType\": \"uint256\",\"name\": \"\",\"type\": \"uint256\"}],\"stateMutability\": \"view\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"bytes\",\"name\": \"batchHeader\",\"type\": \"bytes\"},{\"internalType\": \"uint256\",\"name\": \"count\",\"type\": \"uint256\"}],\"name\": \"revertBatch\",\"outputs\": [],\"stateMutability\": \"nonpayable\",\"type\": \"function\"},{\"inputs\": [{\"internalType\": \"uint256\",\"name\": \"batchIndex\",\"type\": \"uint256\"}],\"name\": \"withdrawRoots\",\"outputs\": [{\"internalType\": \"bytes32\",\"name\": \"\",\"type\": \"bytes32\"}],\"stateMutability\": \"view\",\"type\": \"function\"}]", } diff --git a/rollup/rollup_sync_service/abi_test.go b/rollup/rollup_sync_service/abi_test.go index d47a2c72e190..550c950bb337 100644 --- a/rollup/rollup_sync_service/abi_test.go +++ b/rollup/rollup_sync_service/abi_test.go @@ -13,7 +13,7 @@ import ( ) func TestEventSignatures(t *testing.T) { - scrollChainABI, err := scrollChainMetaData.GetAbi() + scrollChainABI, err := ScrollChainMetaData.GetAbi() if err != nil { t.Fatal("failed to get scroll chain abi", "err", err) } @@ -24,7 +24,7 @@ func TestEventSignatures(t *testing.T) { } func TestUnpackLog(t *testing.T) { - scrollChainABI, err := scrollChainMetaData.GetAbi() + scrollChainABI, err := ScrollChainMetaData.GetAbi() require.NoError(t, err) mockBatchIndex := big.NewInt(123) diff --git a/rollup/rollup_sync_service/l1client.go b/rollup/rollup_sync_service/l1client.go index 34ffc4db1bc2..4f636102ddee 100644 --- a/rollup/rollup_sync_service/l1client.go +++ b/rollup/rollup_sync_service/l1client.go @@ -27,9 +27,9 @@ type L1Client struct { l1FinalizeBatchEventSignature common.Hash } -// newL1Client initializes a new L1Client instance with the provided configuration. +// NewL1Client initializes a new L1Client instance with the provided configuration. // It checks for a valid scrollChainAddress and verifies the chain ID. -func newL1Client(ctx context.Context, l1Client sync_service.EthClient, l1ChainId uint64, scrollChainAddress common.Address, scrollChainABI *abi.ABI) (*L1Client, error) { +func NewL1Client(ctx context.Context, l1Client sync_service.EthClient, l1ChainId uint64, scrollChainAddress common.Address, scrollChainABI *abi.ABI) (*L1Client, error) { if scrollChainAddress == (common.Address{}) { return nil, errors.New("must pass non-zero scrollChainAddress to L1Client") } @@ -55,8 +55,8 @@ func newL1Client(ctx context.Context, l1Client sync_service.EthClient, l1ChainId return &client, nil } -// fetcRollupEventsInRange retrieves and parses commit/revert/finalize rollup events between block numbers: [from, to]. -func (c *L1Client) fetchRollupEventsInRange(from, to uint64) ([]types.Log, error) { +// FetchRollupEventsInRange retrieves and parses commit/revert/finalize rollup events between block numbers: [from, to]. +func (c *L1Client) FetchRollupEventsInRange(from, to uint64) ([]types.Log, error) { log.Trace("L1Client fetchRollupEventsInRange", "fromBlock", from, "toBlock", to) query := ethereum.FilterQuery{ @@ -79,8 +79,8 @@ func (c *L1Client) fetchRollupEventsInRange(from, to uint64) ([]types.Log, error return logs, nil } -// getLatestFinalizedBlockNumber fetches the block number of the latest finalized block from the L1 chain. -func (c *L1Client) getLatestFinalizedBlockNumber() (uint64, error) { +// GetLatestFinalizedBlockNumber fetches the block number of the latest finalized block from the L1 chain. +func (c *L1Client) GetLatestFinalizedBlockNumber() (uint64, error) { header, err := c.client.HeaderByNumber(c.ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) if err != nil { return 0, err @@ -90,3 +90,69 @@ func (c *L1Client) getLatestFinalizedBlockNumber() (uint64, error) { } return header.Number.Uint64(), nil } + +// FetchTxData fetches tx data corresponding to given event log +func (c *L1Client) FetchTxData(vLog *types.Log) ([]byte, error) { + tx, _, err := c.client.TransactionByHash(c.ctx, vLog.TxHash) + if err != nil { + log.Debug("failed to get transaction by hash, probably an unindexed transaction, fetching the whole block to get the transaction", + "tx hash", vLog.TxHash.Hex(), "block number", vLog.BlockNumber, "block hash", vLog.BlockHash.Hex(), "err", err) + block, err := c.client.BlockByHash(c.ctx, vLog.BlockHash) + if err != nil { + return nil, fmt.Errorf("failed to get block by hash, block number: %v, block hash: %v, err: %w", vLog.BlockNumber, vLog.BlockHash.Hex(), err) + } + + found := false + for _, txInBlock := range block.Transactions() { + if txInBlock.Hash() == vLog.TxHash { + tx = txInBlock + found = true + break + } + } + if !found { + return nil, fmt.Errorf("transaction not found in the block, tx hash: %v, block number: %v, block hash: %v", vLog.TxHash.Hex(), vLog.BlockNumber, vLog.BlockHash.Hex()) + } + } + + return tx.Data(), nil +} + +// FetchTxBlobHash fetches tx blob hash corresponding to given event log +func (c *L1Client) FetchTxBlobHash(vLog *types.Log) (common.Hash, error) { + tx, _, err := c.client.TransactionByHash(c.ctx, vLog.TxHash) + if err != nil { + log.Debug("failed to get transaction by hash, probably an unindexed transaction, fetching the whole block to get the transaction", + "tx hash", vLog.TxHash.Hex(), "block number", vLog.BlockNumber, "block hash", vLog.BlockHash.Hex(), "err", err) + block, err := c.client.BlockByHash(c.ctx, vLog.BlockHash) + if err != nil { + return common.Hash{}, fmt.Errorf("failed to get block by hash, block number: %v, block hash: %v, err: %w", vLog.BlockNumber, vLog.BlockHash.Hex(), err) + } + + found := false + for _, txInBlock := range block.Transactions() { + if txInBlock.Hash() == vLog.TxHash { + tx = txInBlock + found = true + break + } + } + if !found { + return common.Hash{}, fmt.Errorf("transaction not found in the block, tx hash: %v, block number: %v, block hash: %v", vLog.TxHash.Hex(), vLog.BlockNumber, vLog.BlockHash.Hex()) + } + } + blobHashes := tx.BlobHashes() + if len(blobHashes) == 0 { + return common.Hash{}, fmt.Errorf("transaction does not contain any blobs, tx hash: %v", vLog.TxHash.Hex()) + } + return blobHashes[0], nil +} + +// GetHeaderByNumber fetches the block header by number +func (c *L1Client) GetHeaderByNumber(blockNumber uint64) (*types.Header, error) { + header, err := c.client.HeaderByNumber(c.ctx, big.NewInt(0).SetUint64(blockNumber)) + if err != nil { + return nil, err + } + return header, nil +} diff --git a/rollup/rollup_sync_service/l1client_test.go b/rollup/rollup_sync_service/l1client_test.go index 8c7bd92f8b11..38719d220f62 100644 --- a/rollup/rollup_sync_service/l1client_test.go +++ b/rollup/rollup_sync_service/l1client_test.go @@ -18,19 +18,19 @@ func TestL1Client(t *testing.T) { ctx := context.Background() mockClient := &mockEthClient{} - scrollChainABI, err := scrollChainMetaData.GetAbi() + scrollChainABI, err := ScrollChainMetaData.GetAbi() if err != nil { t.Fatal("failed to get scroll chain abi", "err", err) } scrollChainAddress := common.HexToAddress("0x0123456789abcdef") - l1Client, err := newL1Client(ctx, mockClient, 11155111, scrollChainAddress, scrollChainABI) + l1Client, err := NewL1Client(ctx, mockClient, 11155111, scrollChainAddress, scrollChainABI) require.NoError(t, err, "Failed to initialize L1Client") - blockNumber, err := l1Client.getLatestFinalizedBlockNumber() + blockNumber, err := l1Client.GetLatestFinalizedBlockNumber() assert.NoError(t, err, "Error getting latest confirmed block number") assert.Equal(t, uint64(36), blockNumber, "Unexpected block number") - logs, err := l1Client.fetchRollupEventsInRange(0, blockNumber) + logs, err := l1Client.FetchRollupEventsInRange(0, blockNumber) assert.NoError(t, err, "Error fetching rollup events in range") assert.Empty(t, logs, "Expected no logs from fetchRollupEventsInRange") } diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index 52ce75e27e4e..cec7d97f3efe 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -72,12 +72,12 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig return nil, fmt.Errorf("missing L1 config in genesis") } - scrollChainABI, err := scrollChainMetaData.GetAbi() + scrollChainABI, err := ScrollChainMetaData.GetAbi() if err != nil { return nil, fmt.Errorf("failed to get scroll chain abi: %w", err) } - client, err := newL1Client(ctx, l1Client, genesisConfig.Scroll.L1Config.L1ChainId, genesisConfig.Scroll.L1Config.ScrollChainAddress, scrollChainABI) + client, err := NewL1Client(ctx, l1Client, genesisConfig.Scroll.L1Config.L1ChainId, genesisConfig.Scroll.L1Config.ScrollChainAddress, scrollChainABI) if err != nil { return nil, fmt.Errorf("failed to initialize l1 client: %w", err) } @@ -154,7 +154,7 @@ func (s *RollupSyncService) Stop() { } func (s *RollupSyncService) fetchRollupEvents() { - latestConfirmed, err := s.client.getLatestFinalizedBlockNumber() + latestConfirmed, err := s.client.GetLatestFinalizedBlockNumber() if err != nil { log.Warn("failed to get latest confirmed block number", "err", err) return @@ -174,7 +174,7 @@ func (s *RollupSyncService) fetchRollupEvents() { to = latestConfirmed } - logs, err := s.client.fetchRollupEventsInRange(from, to) + logs, err := s.client.FetchRollupEventsInRange(from, to) if err != nil { log.Error("failed to fetch rollup events in range", "from block", from, "to block", to, "err", err) return