From 9e3c8381aa1dc22f57ef1afc67df9d35a8ce54c5 Mon Sep 17 00:00:00 2001 From: Nazarii Denha Date: Mon, 19 Aug 2024 11:01:08 +0200 Subject: [PATCH] (follower_node)support beacon node client as blob provider (#988) * support beacon node client as blob provider * fix * fix formatting * use url.JoinPath instead of path * don't move pos each time in blob_client_list --------- Co-authored-by: jonastheis <4181434+jonastheis@users.noreply.github.com> --- cmd/geth/main.go | 1 + cmd/utils/flags.go | 14 +- .../blob_client/beacon_node_client.go | 183 ++++++++++++++++++ rollup/da_syncer/blob_client/blob_client.go | 6 +- .../da_syncer/blob_client/blob_client_list.go | 9 +- .../da_syncer/blob_client/blob_scan_client.go | 42 +--- .../blob_client/block_native_client.go | 11 +- rollup/da_syncer/da/commitV1.go | 2 +- rollup/da_syncer/syncing_pipeline.go | 9 + rollup/rollup_sync_service/l1client.go | 17 +- 10 files changed, 231 insertions(+), 63 deletions(-) create mode 100644 rollup/da_syncer/blob_client/beacon_node_client.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 09fa46374c06..1ec163d97aea 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -175,6 +175,7 @@ var ( utils.DASnapshotFileFlag, utils.DABlockNativeAPIEndpointFlag, utils.DABlobScanAPIEndpointFlag, + utils.DABeaconNodeAPIEndpointFlag, } rpcFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f3fdbefb7196..d9be3622a0c5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -879,16 +879,19 @@ var ( } DASnapshotFileFlag = cli.StringFlag{ Name: "da.snapshot.file", - Usage: "Snapshot file to sync from da", + Usage: "Snapshot file to sync from DA", } DABlobScanAPIEndpointFlag = cli.StringFlag{ Name: "da.blob.blobscan", - Usage: "BlobScan blob api endpoint", - Value: ethconfig.Defaults.DA.BlobScanAPIEndpoint, + Usage: "BlobScan blob API endpoint", } DABlockNativeAPIEndpointFlag = cli.StringFlag{ Name: "da.blob.blocknative", - Usage: "BlockNative blob api endpoint", + Usage: "BlockNative blob API endpoint", + } + DABeaconNodeAPIEndpointFlag = cli.StringFlag{ + Name: "da.blob.beaconnode", + Usage: "Beacon node API endpoint", } ) @@ -1625,6 +1628,9 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) { if ctx.GlobalIsSet(DABlockNativeAPIEndpointFlag.Name) { cfg.DA.BlockNativeAPIEndpoint = ctx.GlobalString(DABlockNativeAPIEndpointFlag.Name) } + if ctx.GlobalIsSet(DABeaconNodeAPIEndpointFlag.Name) { + cfg.DA.BeaconNodeAPIEndpoint = ctx.GlobalString(DABeaconNodeAPIEndpointFlag.Name) + } } } diff --git a/rollup/da_syncer/blob_client/beacon_node_client.go b/rollup/da_syncer/blob_client/beacon_node_client.go new file mode 100644 index 000000000000..20e4d876a489 --- /dev/null +++ b/rollup/da_syncer/blob_client/beacon_node_client.go @@ -0,0 +1,183 @@ +package blob_client + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/crypto/kzg4844" + "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" +) + +type BeaconNodeClient struct { + apiEndpoint string + l1Client *rollup_sync_service.L1Client + genesisTime uint64 + secondsPerSlot uint64 +} + +var ( + beaconNodeGenesisEndpoint = "/eth/v1/beacon/genesis" + beaconNodeSpecEndpoint = "/eth/v1/config/spec" + beaconNodeBlobEndpoint = "/eth/v1/beacon/blob_sidecars" +) + +func NewBeaconNodeClient(apiEndpoint string, l1Client *rollup_sync_service.L1Client) (*BeaconNodeClient, error) { + // get genesis time + genesisPath, err := url.JoinPath(apiEndpoint, beaconNodeGenesisEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to join path, err: %w", err) + } + resp, err := http.Get(genesisPath) + if err != nil { + return nil, fmt.Errorf("cannot do request, err: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr) + } + + var genesisResp GenesisResp + err = json.NewDecoder(resp.Body).Decode(&genesisResp) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + genesisTime, err := strconv.ParseUint(genesisResp.Data.GenesisTime, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to decode genesis time %s, err: %w", genesisResp.Data.GenesisTime, err) + } + + // get seconds per slot from spec + specPath, err := url.JoinPath(apiEndpoint, beaconNodeSpecEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to join path, err: %w", err) + } + resp, err = http.Get(specPath) + if err != nil { + return nil, fmt.Errorf("cannot do request, err: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr) + } + + var specResp SpecResp + err = json.NewDecoder(resp.Body).Decode(&specResp) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + secondsPerSlot, err := strconv.ParseUint(specResp.Data.SecondsPerSlot, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to decode seconds per slot %s, err: %w", specResp.Data.SecondsPerSlot, err) + } + if secondsPerSlot == 0 { + return nil, fmt.Errorf("failed to make new BeaconNodeClient, secondsPerSlot is 0") + } + + return &BeaconNodeClient{ + apiEndpoint: apiEndpoint, + l1Client: l1Client, + genesisTime: genesisTime, + secondsPerSlot: secondsPerSlot, + }, nil +} + +func (c *BeaconNodeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) { + // get block timestamp to calculate slot + header, err := c.l1Client.GetHeaderByNumber(blockNumber) + if err != nil { + return nil, fmt.Errorf("failed to get header by number, err: %w", err) + } + slot := (header.Time - c.genesisTime) / c.secondsPerSlot + + // get blob sidecar for slot + blobSidecarPath, err := url.JoinPath(c.apiEndpoint, beaconNodeBlobEndpoint, fmt.Sprintf("%d", slot)) + if err != nil { + return nil, fmt.Errorf("failed to join path, err: %w", err) + } + resp, err := http.Get(blobSidecarPath) + if err != nil { + return nil, fmt.Errorf("cannot do request, err: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr) + } + + var blobSidecarResp BlobSidecarResp + err = json.NewDecoder(resp.Body).Decode(&blobSidecarResp) + if err != nil { + return nil, fmt.Errorf("failed to decode result into struct, err: %w", err) + } + + // find blob with desired versionedHash + for _, blob := range blobSidecarResp.Data { + // calculate blob hash from commitment and check it with desired + commitmentBytes := common.FromHex(blob.KzgCommitment) + if len(commitmentBytes) != lenKZGCommitment { + return nil, fmt.Errorf("len of kzg commitment is not correct, expected: %d, got: %d", lenKZGCommitment, len(commitmentBytes)) + } + commitment := kzg4844.Commitment(commitmentBytes) + blobVersionedHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment) + + if blobVersionedHash == versionedHash { + // found desired blob + blobBytes := common.FromHex(blob.Blob) + if len(blobBytes) != lenBlobBytes { + return nil, fmt.Errorf("len of blob data is not correct, expected: %d, got: %d", lenBlobBytes, len(blobBytes)) + } + + b := kzg4844.Blob(blobBytes) + return &b, nil + } + } + + return nil, fmt.Errorf("missing blob %v in slot %d, block number %d", versionedHash, slot, blockNumber) +} + +type GenesisResp struct { + Data struct { + GenesisTime string `json:"genesis_time"` + } `json:"data"` +} + +type SpecResp struct { + Data struct { + SecondsPerSlot string `json:"SECONDS_PER_SLOT"` + } `json:"data"` +} + +type BlobSidecarResp struct { + Data []struct { + Index string `json:"index"` + Blob string `json:"blob"` + KzgCommitment string `json:"kzg_commitment"` + KzgProof string `json:"kzg_proof"` + SignedBlockHeader struct { + Message struct { + Slot string `json:"slot"` + ProposerIndex string `json:"proposer_index"` + ParentRoot string `json:"parent_root"` + StateRoot string `json:"state_root"` + BodyRoot string `json:"body_root"` + } `json:"message"` + Signature string `json:"signature"` + } `json:"signed_block_header"` + KzgCommitmentInclusionProof []string `json:"kzg_commitment_inclusion_proof"` + } `json:"data"` +} diff --git a/rollup/da_syncer/blob_client/blob_client.go b/rollup/da_syncer/blob_client/blob_client.go index eea486c6fa77..5ce0d16a42cc 100644 --- a/rollup/da_syncer/blob_client/blob_client.go +++ b/rollup/da_syncer/blob_client/blob_client.go @@ -8,10 +8,10 @@ import ( ) const ( - okStatusCode int = 200 - lenBlobBytes int = 131072 + lenBlobBytes int = 131072 + lenKZGCommitment int = 48 ) type BlobClient interface { - GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) + GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) } diff --git a/rollup/da_syncer/blob_client/blob_client_list.go b/rollup/da_syncer/blob_client/blob_client_list.go index 13e48b221f7f..647f5f78ff71 100644 --- a/rollup/da_syncer/blob_client/blob_client_list.go +++ b/rollup/da_syncer/blob_client/blob_client_list.go @@ -23,17 +23,17 @@ func NewBlobClientList(blobClients ...BlobClient) *BlobClientList { } } -func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) { +func (c *BlobClientList) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) { if len(c.list) == 0 { return nil, fmt.Errorf("BlobClientList.GetBlobByVersionedHash: list of BlobClients is empty") } for i := 0; i < len(c.list); i++ { - blob, err := c.list[c.nextPos()].GetBlobByVersionedHash(ctx, versionedHash) + blob, err := c.list[c.curPos].GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, blockNumber) if err == nil { return blob, nil } - + c.nextPos() // there was an error, try the next blob client in following iteration log.Warn("BlobClientList: failed to get blob by versioned hash from BlobClient", "err", err, "blob client pos in BlobClientList", c.curPos) } @@ -42,9 +42,8 @@ func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHa return nil, serrors.NewTemporaryError(errors.New("BlobClientList.GetBlobByVersionedHash: failed to get blob by versioned hash from all BlobClients")) } -func (c *BlobClientList) nextPos() int { +func (c *BlobClientList) nextPos() { c.curPos = (c.curPos + 1) % len(c.list) - return c.curPos } func (c *BlobClientList) AddBlobClient(blobClient BlobClient) { diff --git a/rollup/da_syncer/blob_client/blob_scan_client.go b/rollup/da_syncer/blob_client/blob_scan_client.go index 98b190c2214f..49a32d6d8191 100644 --- a/rollup/da_syncer/blob_client/blob_scan_client.go +++ b/rollup/da_syncer/blob_client/blob_scan_client.go @@ -24,7 +24,7 @@ func NewBlobScanClient(apiEndpoint string) *BlobScanClient { } } -func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) { +func (c *BlobScanClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) { // blobscan api docs https://api.blobscan.com/#/blobs/blob-getByBlobId path, err := url.JoinPath(c.apiEndpoint, versionedHash.String()) if err != nil { @@ -40,8 +40,8 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa return nil, fmt.Errorf("cannot do request, err: %w", err) } defer resp.Body.Close() - if resp.StatusCode != okStatusCode { - if resp.StatusCode == 404 { + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { return nil, fmt.Errorf("no blob with versioned hash : %s", versionedHash.String()) } var res ErrorRespBlobScan @@ -69,44 +69,10 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa } type BlobRespBlobScan struct { - Commitment string `json:"commitment"` - Proof string `json:"proof"` - Size int `json:"size"` - VersionedHash string `json:"versionedHash"` - Data string `json:"data"` - DataStorageReferences []struct { - BlobStorage string `json:"blobStorage"` - DataReference string `json:"dataReference"` - } `json:"dataStorageReferences"` - Transactions []struct { - Hash string `json:"hash"` - Index int `json:"index"` - Block struct { - Number int `json:"number"` - BlobGasUsed string `json:"blobGasUsed"` - BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"` - BlobGasPrice string `json:"blobGasPrice"` - ExcessBlobGas string `json:"excessBlobGas"` - Hash string `json:"hash"` - Timestamp string `json:"timestamp"` - Slot int `json:"slot"` - } `json:"block"` - From string `json:"from"` - To string `json:"to"` - MaxFeePerBlobGas string `json:"maxFeePerBlobGas"` - BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"` - Rollup string `json:"rollup"` - BlobAsCalldataGasFee string `json:"blobAsCalldataGasFee"` - BlobGasBaseFee string `json:"blobGasBaseFee"` - BlobGasMaxFee string `json:"blobGasMaxFee"` - BlobGasUsed string `json:"blobGasUsed"` - } `json:"transactions"` + Data string `json:"data"` } type ErrorRespBlobScan struct { Message string `json:"message"` Code string `json:"code"` - Issues []struct { - Message string `json:"message"` - } `json:"issues"` } diff --git a/rollup/da_syncer/blob_client/block_native_client.go b/rollup/da_syncer/blob_client/block_native_client.go index ef3eebaa2972..32ab290728f7 100644 --- a/rollup/da_syncer/blob_client/block_native_client.go +++ b/rollup/da_syncer/blob_client/block_native_client.go @@ -22,7 +22,7 @@ func NewBlockNativeClient(apiEndpoint string) *BlockNativeClient { } } -func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) { +func (c *BlockNativeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) { // blocknative api docs https://docs.blocknative.com/blocknative-data-archive/blob-archive path, err := url.JoinPath(c.apiEndpoint, versionedHash.String()) if err != nil { @@ -33,7 +33,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione return nil, fmt.Errorf("cannot do request, err: %w", err) } defer resp.Body.Close() - if resp.StatusCode != okStatusCode { + if resp.StatusCode != http.StatusOK { var res ErrorRespBlockNative err = json.NewDecoder(resp.Body).Decode(&res) if err != nil { @@ -59,12 +59,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione type BlobRespBlockNative struct { Blob struct { - VersionedHash string `json:"versionedHash"` - Commitment string `json:"commitment"` - Proof string `json:"proof"` - ZeroBytes int `json:"zeroBytes"` - NonZeroBytes int `json:"nonZeroBytes"` - Data string `json:"data"` + Data string `json:"data"` } `json:"blob"` } diff --git a/rollup/da_syncer/da/commitV1.go b/rollup/da_syncer/da/commitV1.go index 19512972928c..d94a046c81df 100644 --- a/rollup/da_syncer/da/commitV1.go +++ b/rollup/da_syncer/da/commitV1.go @@ -55,7 +55,7 @@ func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err) } - blob, err := blobClient.GetBlobByVersionedHash(ctx, versionedHash) + blob, err := blobClient.GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, vLog.BlockNumber) if err != nil { return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err) } diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index ef66211fe5ee..c99a49210ac1 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@ -25,6 +25,7 @@ type Config struct { SnapshotFilePath string // path to snapshot file BlobScanAPIEndpoint string // BlobScan blob api endpoint BlockNativeAPIEndpoint string // BlockNative blob api endpoint + BeaconNodeAPIEndpoint string // Beacon node api endpoint } // SyncingPipeline is a derivation pipeline for syncing data from L1 and DA and transform it into @@ -55,6 +56,14 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi } blobClientList := blob_client.NewBlobClientList() + if config.BeaconNodeAPIEndpoint != "" { + beaconNodeClient, err := blob_client.NewBeaconNodeClient(config.BeaconNodeAPIEndpoint, l1Client) + if err != nil { + log.Warn("failed to create BeaconNodeClient", "err", err) + } else { + blobClientList.AddBlobClient(beaconNodeClient) + } + } if config.BlobScanAPIEndpoint != "" { blobClientList.AddBlobClient(blob_client.NewBlobScanClient(config.BlobScanAPIEndpoint)) } diff --git a/rollup/rollup_sync_service/l1client.go b/rollup/rollup_sync_service/l1client.go index d2a28f659f10..4f636102ddee 100644 --- a/rollup/rollup_sync_service/l1client.go +++ b/rollup/rollup_sync_service/l1client.go @@ -27,7 +27,7 @@ type L1Client struct { l1FinalizeBatchEventSignature common.Hash } -// newL1Client initializes a new L1Client instance with the provided configuration. +// NewL1Client initializes a new L1Client instance with the provided configuration. // It checks for a valid scrollChainAddress and verifies the chain ID. func NewL1Client(ctx context.Context, l1Client sync_service.EthClient, l1ChainId uint64, scrollChainAddress common.Address, scrollChainABI *abi.ABI) (*L1Client, error) { if scrollChainAddress == (common.Address{}) { @@ -79,7 +79,7 @@ func (c *L1Client) FetchRollupEventsInRange(from, to uint64) ([]types.Log, error return logs, nil } -// getLatestFinalizedBlockNumber fetches the block number of the latest finalized block from the L1 chain. +// GetLatestFinalizedBlockNumber fetches the block number of the latest finalized block from the L1 chain. func (c *L1Client) GetLatestFinalizedBlockNumber() (uint64, error) { header, err := c.client.HeaderByNumber(c.ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) if err != nil { @@ -91,7 +91,7 @@ func (c *L1Client) GetLatestFinalizedBlockNumber() (uint64, error) { return header.Number.Uint64(), nil } -// fetchTxData fetches tx data corresponding to given event log +// FetchTxData fetches tx data corresponding to given event log func (c *L1Client) FetchTxData(vLog *types.Log) ([]byte, error) { tx, _, err := c.client.TransactionByHash(c.ctx, vLog.TxHash) if err != nil { @@ -118,7 +118,7 @@ func (c *L1Client) FetchTxData(vLog *types.Log) ([]byte, error) { return tx.Data(), nil } -// fetchTxBlobHash fetches tx blob hash corresponding to given event log +// FetchTxBlobHash fetches tx blob hash corresponding to given event log func (c *L1Client) FetchTxBlobHash(vLog *types.Log) (common.Hash, error) { tx, _, err := c.client.TransactionByHash(c.ctx, vLog.TxHash) if err != nil { @@ -147,3 +147,12 @@ func (c *L1Client) FetchTxBlobHash(vLog *types.Log) (common.Hash, error) { } return blobHashes[0], nil } + +// GetHeaderByNumber fetches the block header by number +func (c *L1Client) GetHeaderByNumber(blockNumber uint64) (*types.Header, error) { + header, err := c.client.HeaderByNumber(c.ctx, big.NewInt(0).SetUint64(blockNumber)) + if err != nil { + return nil, err + } + return header, nil +}