From 8f1f6b725ed8133f3560805ac6227d87c6305414 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Tue, 13 Aug 2024 12:31:26 +0800 Subject: [PATCH] feat: introduce custom errors and mark RPC related errors as temporary so that they can be retried in pipeline --- .../da_syncer/blob_client/blob_client_list.go | 7 +- rollup/da_syncer/block_queue.go | 5 +- rollup/da_syncer/da/calldata_blob_source.go | 19 ++--- rollup/da_syncer/da/commitV0.go | 11 +-- rollup/da_syncer/da/da.go | 2 +- rollup/da_syncer/da_queue.go | 5 +- rollup/da_syncer/serrors/errors.go | 69 +++++++++++++++++++ rollup/da_syncer/syncing_pipeline.go | 28 ++++++-- rollup/rollup_sync_service/l1client.go | 2 +- 9 files changed, 118 insertions(+), 30 deletions(-) create mode 100644 rollup/da_syncer/serrors/errors.go diff --git a/rollup/da_syncer/blob_client/blob_client_list.go b/rollup/da_syncer/blob_client/blob_client_list.go index b1c11d6d3e4cf..da032c4f972b6 100644 --- a/rollup/da_syncer/blob_client/blob_client_list.go +++ b/rollup/da_syncer/blob_client/blob_client_list.go @@ -2,12 +2,13 @@ package blob_client import ( "context" + "errors" "fmt" - "io" "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/crypto/kzg4844" "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" ) type BlobClientList struct { @@ -37,8 +38,8 @@ func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHa log.Warn("BlobClientList: failed to get blob by versioned hash from BlobClient", "err", err, "blob client pos in BlobClientList", c.curPos) } - // if we iterated over entire list, return EOF error that will be handled in syncing_pipeline with a backoff and retry - return nil, io.EOF + // if we iterated over entire list, return a temporary error that will be handled in syncing_pipeline with a backoff and retry + return nil, 
serrors.NewTemporaryError(errors.New("BlobClientList.GetBlobByVersionedHash: failed to get blob by versioned hash from all BlobClients")) } func (c *BlobClientList) nextPos() int { diff --git a/rollup/da_syncer/block_queue.go b/rollup/da_syncer/block_queue.go index 3c3e68bdcf798..a122d41ab3568 100644 --- a/rollup/da_syncer/block_queue.go +++ b/rollup/da_syncer/block_queue.go @@ -45,10 +45,7 @@ func (bq *BlockQueue) getBlocksFromBatch(ctx context.Context) error { return fmt.Errorf("unexpected type of daEntry: %T", daEntry) } - bq.blocks, err = entryWithBlocks.Blocks() - if err != nil { - return fmt.Errorf("failed to get blocks from daEntry: %w", err) - } + bq.blocks = entryWithBlocks.Blocks() return nil } diff --git a/rollup/da_syncer/da/calldata_blob_source.go b/rollup/da_syncer/da/calldata_blob_source.go index 19fd1b5eccae9..94470e53a74a5 100644 --- a/rollup/da_syncer/da/calldata_blob_source.go +++ b/rollup/da_syncer/da/calldata_blob_source.go @@ -11,6 +11,7 @@ import ( "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" ) @@ -72,7 +73,7 @@ func (ds *CalldataBlobSource) NextData() (Entries, error) { if to > ds.l1Finalized { ds.l1Finalized, err = ds.l1Client.GetLatestFinalizedBlockNumber() if err != nil { - return nil, fmt.Errorf("failed to query GetLatestFinalizedBlockNumber, error: %v", err) + return nil, serrors.WrapWithTemporary(fmt.Errorf("failed to query GetLatestFinalizedBlockNumber, error: %v", err)) } // make sure we don't request more than finalized blocks to = min(to, ds.l1Finalized) @@ -84,13 +85,15 @@ func (ds *CalldataBlobSource) NextData() (Entries, error) { logs, err := ds.l1Client.FetchRollupEventsInRange(ds.l1height, to) if err != nil { - return nil, fmt.Errorf("cannot get events, l1height: %d, error: %v", ds.l1height, err) + 
return nil, serrors.WrapWithTemporary(fmt.Errorf("cannot get events, l1height: %d, error: %v", ds.l1height, err)) } da, err := ds.processLogsToDA(logs) - if err == nil { - ds.l1height = to + 1 + if err != nil { + return nil, serrors.WrapWithTemporary(fmt.Errorf("failed to process logs to DA, error: %w", err)) } - return da, err + + ds.l1height = to + 1 + return da, nil } func (ds *CalldataBlobSource) L1Height() uint64 { @@ -119,7 +122,7 @@ func (ds *CalldataBlobSource) processLogsToDA(logs []types.Log) (Entries, error) case ds.l1RevertBatchEventSignature: event := &rollup_sync_service.L1RevertBatchEvent{} - if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, revertBatchEventName, vLog); err != nil { + if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, revertBatchEventName, vLog); err != nil { return nil, fmt.Errorf("failed to unpack revert rollup event log, err: %w", err) } @@ -129,7 +132,7 @@ func (ds *CalldataBlobSource) processLogsToDA(logs []types.Log) (Entries, error) case ds.l1FinalizeBatchEventSignature: event := &rollup_sync_service.L1FinalizeBatchEvent{} - if err := rollup_sync_service.UnpackLog(ds.scrollChainABI, event, finalizeBatchEventName, vLog); err != nil { + if err = rollup_sync_service.UnpackLog(ds.scrollChainABI, event, finalizeBatchEventName, vLog); err != nil { return nil, fmt.Errorf("failed to unpack finalized rollup event log, err: %w", err) } @@ -188,7 +191,7 @@ func (ds *CalldataBlobSource) getCommitBatchDA(batchIndex uint64, vLog *types.Lo txData, err := ds.l1Client.FetchTxData(vLog) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to fetch tx data, tx hash: %v, err: %w", vLog.TxHash.Hex(), err) } if len(txData) < methodIDLength { return nil, fmt.Errorf("transaction data is too short, length of tx data: %v, minimum length required: %v", len(txData), methodIDLength) diff --git a/rollup/da_syncer/da/commitV0.go b/rollup/da_syncer/da/commitV0.go index abedfd3231d58..66a13786c9cb7 100644 ---
a/rollup/da_syncer/da/commitV0.go +++ b/rollup/da_syncer/da/commitV0.go @@ -3,7 +3,6 @@ package da import ( "encoding/binary" "fmt" - "io" "github.com/scroll-tech/da-codec/encoding" "github.com/scroll-tech/da-codec/encoding/codecv0" @@ -11,6 +10,7 @@ import ( "github.com/scroll-tech/go-ethereum/core/rawdb" "github.com/scroll-tech/go-ethereum/core/types" "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" ) type CommitBatchDAV0 struct { @@ -92,7 +92,7 @@ func (c *CommitBatchDAV0) CompareTo(other Entry) int { return 0 } -func (c *CommitBatchDAV0) Blocks() ([]*PartialBlock, error) { +func (c *CommitBatchDAV0) Blocks() []*PartialBlock { var blocks []*PartialBlock l1TxPointer := 0 @@ -125,7 +125,8 @@ func (c *CommitBatchDAV0) Blocks() ([]*PartialBlock, error) { blocks = append(blocks, block) } } - return blocks, nil + + return blocks } func getTotalMessagesPoppedFromChunks(decodedChunks []*codecv0.DAChunkRawTx) int { @@ -156,8 +157,8 @@ func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skipped l1Tx := rawdb.ReadL1Message(db, currentIndex) if l1Tx == nil { // message not yet available - // we return io.EOF as this will be handled in the syncing pipeline with a backoff and retry - return nil, io.EOF + // we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry + return nil, serrors.EOFError } txs = append(txs, l1Tx) currentIndex++ diff --git a/rollup/da_syncer/da/da.go b/rollup/da_syncer/da/da.go index 935f09cfda7cd..5a55a4fbb5130 100644 --- a/rollup/da_syncer/da/da.go +++ b/rollup/da_syncer/da/da.go @@ -31,7 +31,7 @@ type Entry interface { type EntryWithBlocks interface { Entry - Blocks() ([]*PartialBlock, error) + Blocks() []*PartialBlock } type Entries []Entry diff --git a/rollup/da_syncer/da_queue.go b/rollup/da_syncer/da_queue.go index 9ff1b992c3aba..64673a4a646b4 100644 --- a/rollup/da_syncer/da_queue.go +++ b/rollup/da_syncer/da_queue.go @@ -5,6 
+5,7 @@ import ( "errors" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" ) // DAQueue is a pipeline stage that reads DA entries from a DataSource and provides them to the next stage. @@ -54,7 +55,9 @@ func (dq *DAQueue) getNextData(ctx context.Context) error { if errors.Is(err, da.ErrSourceExhausted) { dq.l1height = dq.dataSource.L1Height() dq.dataSource = nil - return dq.getNextData(ctx) + + // we return EOFError to be handled in pipeline + return serrors.EOFError } return err diff --git a/rollup/da_syncer/serrors/errors.go b/rollup/da_syncer/serrors/errors.go new file mode 100644 index 0000000000000..9926aafc8f0a7 --- /dev/null +++ b/rollup/da_syncer/serrors/errors.go @@ -0,0 +1,69 @@ +package serrors + +import ( + "fmt" +) + +const ( + temporary Type = iota + eof +) + +var ( + TemporaryError = NewTemporaryError(nil) + EOFError = NewEOFError(nil) +) + +type Type uint8 + +func (t Type) String() string { + switch t { + case temporary: + return "temporary" + case eof: + return "EOF" + default: + return "unknown" + } +} + +type syncError struct { + t Type + err error +} + +func NewTemporaryError(err error) error { + return &syncError{t: temporary, err: err} +} + +func NewEOFError(err error) error { + return &syncError{t: eof, err: err} +} + +func (s *syncError) Error() string { + if s.err == nil { + return s.t.String() + } + return fmt.Sprintf("%s: %v", s.t, s.err) +} + +func (s *syncError) Unwrap() error { + return s.err +} + +func (s *syncError) Is(target error) bool { + if target == nil { + return s == nil + } + + targetSyncErr, ok := target.(*syncError) + if !ok { + return false + } + + return s.t == targetSyncErr.t +} + +func WrapWithTemporary(err error) error { + return NewTemporaryError(err) +} diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index fea754a01367a..ef66211fe5ee4 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@
-4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "sync" "time" @@ -15,6 +14,7 @@ import ( "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/params" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" "github.com/scroll-tech/go-ethereum/rollup/sync_service" ) @@ -114,6 +114,7 @@ func (s *SyncingPipeline) mainLoop() { stepCh := make(chan struct{}, 1) var delayedStepCh <-chan time.Time var resetCounter int + var tempErrorCounter int // reqStep is a helper function to request a step to be executed. // If delay is true, it will request a delayed step with exponential backoff, otherwise it will request an immediate step. @@ -157,18 +158,32 @@ func (s *SyncingPipeline) mainLoop() { reqStep(false) s.expBackoff.Reset() resetCounter = 0 + tempErrorCounter = 0 continue } - if errors.Is(err, io.EOF) { + if errors.Is(err, serrors.EOFError) { // pipeline is empty, request a delayed step + // TODO: eventually (with state manager) this should not trigger a delayed step because external events will trigger a new step anyway reqStep(true) + tempErrorCounter = 0 continue - } - if errors.Is(err, ErrBlockTooLow) { + } else if errors.Is(err, serrors.TemporaryError) { + log.Warn("syncing pipeline step failed due to temporary error, retrying", "err", err) + if tempErrorCounter > 100 { + log.Warn("syncing pipeline step failed due to 100 consecutive temporary errors, stopping pipeline worker", "last err", err) + return + } + + // temporary error, request a delayed step + reqStep(true) + tempErrorCounter++ + continue + } else if errors.Is(err, ErrBlockTooLow) { // block number returned by the block queue is too low, // we skip the blocks until we reach the correct block number again. 
reqStep(false) + tempErrorCounter = 0 continue } else if errors.Is(err, ErrBlockTooHigh) { // block number returned by the block queue is too high, @@ -176,10 +191,9 @@ func (s *SyncingPipeline) mainLoop() { s.reset(resetCounter) resetCounter++ reqStep(false) + tempErrorCounter = 0 continue - } - - if errors.Is(err, context.Canceled) { + } else if errors.Is(err, context.Canceled) { log.Info("syncing pipeline stopped due to cancelled context", "err", err) return } diff --git a/rollup/rollup_sync_service/l1client.go b/rollup/rollup_sync_service/l1client.go index 2cd5efc625d5b..d2a28f659f10d 100644 --- a/rollup/rollup_sync_service/l1client.go +++ b/rollup/rollup_sync_service/l1client.go @@ -55,7 +55,7 @@ func NewL1Client(ctx context.Context, l1Client sync_service.EthClient, l1ChainId return &client, nil } -// fetcRollupEventsInRange retrieves and parses commit/revert/finalize rollup events between block numbers: [from, to]. +// FetchRollupEventsInRange retrieves and parses commit/revert/finalize rollup events between block numbers: [from, to]. func (c *L1Client) FetchRollupEventsInRange(from, to uint64) ([]types.Log, error) { log.Trace("L1Client fetchRollupEventsInRange", "fromBlock", from, "toBlock", to)