Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(driver): add blob datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
YoGhurt111 committed Apr 3, 2024
1 parent b64bf1a commit 1e3fb08
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 10 deletions.
7 changes: 7 additions & 0 deletions cmd/flags/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ var (
Value: 0,
Category: driverCategory,
}
// blob server endpoint
BlobServerEndpoint = &cli.StringFlag{
Name: "blob.server",
Usage: "Blob sidecar storage server",
Category: driverCategory,
}
)

// DriverFlags All driver flags.
Expand All @@ -63,4 +69,5 @@ var DriverFlags = MergeFlags(CommonFlags, []cli.Flag{
P2PSyncTimeout,
CheckPointSyncURL,
MaxExponent,
BlobServerEndpoint,
})
20 changes: 14 additions & 6 deletions driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,28 @@ import (
"errors"
"fmt"
"math/big"
"net/url"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"

"github.com/ethereum/go-ethereum"
"github.com/taikoxyz/taiko-client/bindings"
"github.com/taikoxyz/taiko-client/bindings/encoding"
anchorTxConstructor "github.com/taikoxyz/taiko-client/driver/anchor_tx_constructor"
"github.com/taikoxyz/taiko-client/driver/chain_syncer/beaconsync"
"github.com/taikoxyz/taiko-client/driver/state"
txlistfetcher "github.com/taikoxyz/taiko-client/driver/txlist_fetcher"
"github.com/taikoxyz/taiko-client/internal/metrics"
"github.com/taikoxyz/taiko-client/internal/utils"
eventIterator "github.com/taikoxyz/taiko-client/pkg/chain_iterator/event_iterator"
"github.com/taikoxyz/taiko-client/pkg/rpc"

anchorTxConstructor "github.com/taikoxyz/taiko-client/driver/anchor_tx_constructor"
txlistfetcher "github.com/taikoxyz/taiko-client/driver/txlist_fetcher"
eventIterator "github.com/taikoxyz/taiko-client/pkg/chain_iterator/event_iterator"
txListValidator "github.com/taikoxyz/taiko-client/pkg/txlist_validator"
)

Expand All @@ -42,6 +43,7 @@ type Syncer struct {
lastInsertedBlockID *big.Int
reorgDetectedFlag bool
maxRetrieveExponent uint64
blobDatasource *txlistfetcher.BlobDataSource
}

// NewSyncer creates a new syncer instance.
Expand All @@ -51,6 +53,7 @@ func NewSyncer(
state *state.State,
progressTracker *beaconsync.SyncProgressTracker,
maxRetrieveExponent uint64,
blobServerEndpoint *url.URL,
) (*Syncer, error) {
configs, err := client.TaikoL1.GetConfig(&bind.CallOpts{Context: ctx})
if err != nil {
Expand All @@ -74,6 +77,11 @@ func NewSyncer(
client.L2.ChainID,
),
maxRetrieveExponent: maxRetrieveExponent,
blobDatasource: txlistfetcher.NewBlobDataSource(
ctx,
client,
blobServerEndpoint,
),
}, nil
}

Expand Down Expand Up @@ -239,7 +247,7 @@ func (s *Syncer) onBlockProposed(
// Decode transactions list.
var txListDecoder txlistfetcher.TxListFetcher
if event.Meta.BlobUsed {
txListDecoder = txlistfetcher.NewBlobTxListFetcher(s.rpc)
txListDecoder = txlistfetcher.NewBlobTxListFetcher(s.rpc, s.blobDatasource)
} else {
txListDecoder = new(txlistfetcher.CalldataFetcher)
}
Expand Down
2 changes: 2 additions & 0 deletions driver/chain_syncer/calldata/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (s *CalldataSyncerTestSuite) SetupTest() {
state2,
beaconsync.NewSyncProgressTracker(s.RPCClient.L2, 1*time.Hour),
0,
nil,
)
s.Nil(err)
s.s = syncer
Expand All @@ -55,6 +56,7 @@ func (s *CalldataSyncerTestSuite) TestCancelNewSyncer() {
s.s.state,
s.s.progressTracker,
0,
nil,
)
s.Nil(syncer)
s.NotNil(err)
Expand Down
5 changes: 4 additions & 1 deletion driver/chain_syncer/chain_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chainsyncer
import (
"context"
"fmt"
"net/url"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -41,12 +42,14 @@ func New(
p2pSyncVerifiedBlocks bool,
p2pSyncTimeout time.Duration,
maxRetrieveExponent uint64,
blobServerEndpoint *url.URL,

) (*L2ChainSyncer, error) {
tracker := beaconsync.NewSyncProgressTracker(rpc.L2, p2pSyncTimeout)
go tracker.Track(ctx)

beaconSyncer := beaconsync.NewSyncer(ctx, rpc, state, tracker)
calldataSyncer, err := calldata.NewSyncer(ctx, rpc, state, tracker, maxRetrieveExponent)
calldataSyncer, err := calldata.NewSyncer(ctx, rpc, state, tracker, maxRetrieveExponent, blobServerEndpoint)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions driver/chain_syncer/chain_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (s *ChainSyncerTestSuite) SetupTest() {
false,
1*time.Hour,
0,
nil,
)
s.Nil(err)
s.s = syncer
Expand Down
12 changes: 12 additions & 0 deletions driver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package driver
import (
"errors"
"fmt"
"net/url"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -21,6 +22,7 @@ type Config struct {
RPCTimeout time.Duration
RetryInterval time.Duration
MaxExponent uint64
BlobServerEndpoint *url.URL
}

// NewConfigFromCliContext creates a new config instance from
Expand All @@ -44,6 +46,15 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
return nil, errors.New("empty L1 beacon endpoint")
}

var blobServerEndpoint *url.URL
if c.IsSet(flags.BlobServerEndpoint.Name) {
if blobServerEndpoint, err = url.Parse(
c.String(flags.BlobServerEndpoint.Name),
); err != nil {
return nil, err
}
}

var timeout = c.Duration(flags.RPCTimeout.Name)
return &Config{
ClientConfig: &rpc.ClientConfig{
Expand All @@ -62,5 +73,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
P2PSyncTimeout: c.Duration(flags.P2PSyncTimeout.Name),
RPCTimeout: timeout,
MaxExponent: c.Uint64(flags.MaxExponent.Name),
BlobServerEndpoint: blobServerEndpoint,
}, nil
}
1 change: 1 addition & 0 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (d *Driver) InitFromConfig(ctx context.Context, cfg *Config) (err error) {
cfg.P2PSyncVerifiedBlocks,
cfg.P2PSyncTimeout,
cfg.MaxExponent,
cfg.BlobServerEndpoint,
); err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions driver/txlist_fetcher/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
// BlobFetcher is responsible for fetching the txList blob from the L1 block sidecar.
type BlobFetcher struct {
rpc *rpc.Client
ds *BlobDataSource
}

// NewBlobTxListFetcher creates a new BlobFetcher instance based on the given rpc client.
func NewBlobTxListFetcher(rpc *rpc.Client) *BlobFetcher {
return &BlobFetcher{rpc}
func NewBlobTxListFetcher(rpc *rpc.Client, ds *BlobDataSource) *BlobFetcher {
return &BlobFetcher{rpc, ds}
}

// Fetch implements the TxListFetcher interface.
Expand All @@ -35,7 +36,7 @@ func (d *BlobFetcher) Fetch(
}

// Fetch the L1 block sidecars.
sidecars, err := d.rpc.L1Beacon.GetBlobs(ctx, meta.Timestamp)
sidecars, err := d.ds.GetBlobs(ctx, meta)
if err != nil {
return nil, err
}
Expand Down
109 changes: 109 additions & 0 deletions driver/txlist_fetcher/blob_datasource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package txlistdecoder

import (
"context"
"fmt"
"net/url"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/go-resty/resty/v2"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/blob"
"github.com/taikoxyz/taiko-client/bindings"
"github.com/taikoxyz/taiko-client/pkg/rpc"
)

type BlobDataSource struct {
ctx context.Context
rpc *rpc.Client
blobServerEndpoint *url.URL
}

type BlobData struct {
// TODO: wait /getBlob add column
BlobHash string `json:"blob_hash"`
KzgCommitment string `json:"kzg_commitment"`
}

type BlobDataSeq struct {
Data []*BlobData `json:"data"`
}

func NewBlobDataSource(
ctx context.Context,
rpc *rpc.Client,
blobServerEndpoint *url.URL,
) *BlobDataSource {
return &BlobDataSource{
ctx: ctx,
rpc: rpc,
blobServerEndpoint: blobServerEndpoint,
}
}

// GetBlobs get blob sidecar by meta
func (ds *BlobDataSource) GetBlobs(
ctx context.Context,
meta *bindings.TaikoDataBlockMetadata,
) ([]*blob.Sidecar, error) {
if !meta.BlobUsed {
return nil, errBlobUnused
}

sidecars, err := ds.rpc.L1Beacon.GetBlobs(ctx, meta.Timestamp)
if err != nil {
log.Info("Failed to get blobs from beacon, try to use blob server.", "err", err.Error())
if ds.blobServerEndpoint == nil {
log.Info("No blob server endpoint set")
return nil, err
}
blobs, err := ds.getBlobFromServer(ctx, common.Bytes2Hex(meta.BlobHash[:]))
if err != nil {
return nil, err
}
for index, value := range blobs.Data {
sidecars[index] = &blob.Sidecar{
// TODO: wait /getBlob add column
KzgCommitment: value.KzgCommitment,
}
}
}
return sidecars, err
}

// getBlobFromServer get blob data from server path `/getBlob`.
func (ds *BlobDataSource) getBlobFromServer(ctx context.Context, blobHash string) (*BlobDataSeq, error) {
var (
route = "/getBlob"
param = map[string]string{"blobHash": blobHash}
result = &BlobDataSeq{}
)
err := ds.get(ctx, route, param, result)
if err != nil {
return nil, err
}
return result, nil
}

// get send the given GET request to the blob server.
func (ds *BlobDataSource) get(ctx context.Context, route string, param map[string]string, result interface{}) error {
resp, err := resty.New().R().
SetResult(result).
SetQueryParams(param).
SetContext(ctx).
SetHeader("Content-Type", "application/json").
SetHeader("Accept", "application/json").
Get(fmt.Sprintf("%v/%v", ds.blobServerEndpoint.String(), route))
if err != nil {
return err
}

if !resp.IsSuccess() {
return fmt.Errorf(
"unable to contect blob server endpoint, status code: %v",
resp.StatusCode(),
)
}

return nil
}
40 changes: 40 additions & 0 deletions driver/txlist_fetcher/blob_datasource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package txlistdecoder

import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/taikoxyz/taiko-client/bindings"
"net/url"
"testing"

"github.com/stretchr/testify/suite"
"github.com/taikoxyz/taiko-client/internal/testutils"
)

type BlobDataSourceTestSuite struct {
testutils.ClientTestSuite
ds *BlobDataSource
}

func (s *BlobDataSourceTestSuite) SetupTest() {
s.ClientTestSuite.SetupTest()
// Init BlobDataSource
blobServerEndpoint, err := url.Parse("https://blob.hekla.taiko.xyz")
s.Nil(err)
s.ds = NewBlobDataSource(context.Background(), s.RPCClient, blobServerEndpoint)
}

func (s *BlobDataSourceTestSuite) TestGetBlobs() {
meta := &bindings.TaikoDataBlockMetadata{
BlobUsed: true,
BlobHash: common.HexToHash(""),
Timestamp: 1,
}
sidecars, err := s.ds.GetBlobs(context.Background(), meta)
s.Nil(err)
s.Greater(len(sidecars), 0)
}

func TestBlobDataSourceTestSuite(t *testing.T) {
suite.Run(t, new(BlobDataSourceTestSuite))
}
1 change: 1 addition & 0 deletions prover/event_handler/transition_proved_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (s *EventHandlerTestSuite) SetupTest() {
testState,
tracker,
0,
nil,
)
s.Nil(err)

Expand Down
1 change: 1 addition & 0 deletions prover/proof_submitter/proof_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *ProofSubmitterTestSuite) SetupTest() {
testState,
tracker,
0,
nil,
)
s.Nil(err)

Expand Down

0 comments on commit 1e3fb08

Please sign in to comment.