Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SSZ encoding for block validation #600

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
)

var (
apiDefaultListenAddr = common.GetEnv("LISTEN_ADDR", "localhost:9062")
apiDefaultBlockSim = common.GetEnv("BLOCKSIM_URI", "http://localhost:8545")
apiDefaultSecretKey = common.GetEnv("SECRET_KEY", "")
apiDefaultLogTag = os.Getenv("LOG_TAG")
apiDefaultListenAddr = common.GetEnv("LISTEN_ADDR", "localhost:9062")
apiDefaultBlockSim = common.GetEnv("BLOCKSIM_URI", "http://localhost:8545")
apiDefaultBlockSimHTTP = common.GetEnv("BLOCKSIM_HTTP_URI", "http://localhost:28546")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comments to the two BLOCKSIM_URI to clarify what they are used for

apiDefaultSecretKey = common.GetEnv("SECRET_KEY", "")
apiDefaultLogTag = os.Getenv("LOG_TAG")

apiDefaultPprofEnabled = os.Getenv("PPROF") == "1"
apiDefaultInternalAPIEnabled = os.Getenv("ENABLE_INTERNAL_API") == "1"
Expand All @@ -32,16 +33,17 @@ var (
apiDefaultDataAPIEnabled = os.Getenv("DISABLE_DATA_API") != "1"
apiDefaultProposerAPIEnabled = os.Getenv("DISABLE_PROPOSER_API") != "1"

apiListenAddr string
apiPprofEnabled bool
apiSecretKey string
apiBlockSimURL string
apiDebug bool
apiBuilderAPI bool
apiDataAPI bool
apiInternalAPI bool
apiProposerAPI bool
apiLogTag string
apiListenAddr string
apiPprofEnabled bool
apiSecretKey string
apiBlockSimURL string
apiBlockSimHTTPURL string
apiDebug bool
apiBuilderAPI bool
apiDataAPI bool
apiInternalAPI bool
apiProposerAPI bool
apiLogTag string
)

func init() {
Expand All @@ -60,6 +62,7 @@ func init() {
"Enable memcached, typically used as secondary backup to Redis for redundancy")
apiCmd.Flags().StringVar(&apiSecretKey, "secret-key", apiDefaultSecretKey, "secret key for signing bids")
apiCmd.Flags().StringVar(&apiBlockSimURL, "blocksim", apiDefaultBlockSim, "URL for block simulator")
apiCmd.Flags().StringVar(&apiBlockSimHTTPURL, "blocksim-http", apiDefaultBlockSimHTTP, "HTTP URL for block simulator")
apiCmd.Flags().StringVar(&network, "network", defaultNetwork, "Which network to use")

apiCmd.Flags().BoolVar(&apiPprofEnabled, "pprof", apiDefaultPprofEnabled, "enable pprof API")
Expand Down Expand Up @@ -145,15 +148,16 @@ var apiCmd = &cobra.Command{
}

opts := api.RelayAPIOpts{
Log: log,
ListenAddr: apiListenAddr,
BeaconClient: beaconClient,
Datastore: ds,
Redis: redis,
Memcached: mem,
DB: db,
EthNetDetails: *networkInfo,
BlockSimURL: apiBlockSimURL,
Log: log,
ListenAddr: apiListenAddr,
BeaconClient: beaconClient,
Datastore: ds,
Redis: redis,
Memcached: mem,
DB: db,
EthNetDetails: *networkInfo,
BlockSimURL: apiBlockSimURL,
BlockSimHTTPURL: apiBlockSimHTTPURL,

BlockBuilderAPI: apiBuilderAPI,
DataAPI: apiDataAPI,
Expand Down
74 changes: 74 additions & 0 deletions common/types_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/attestantio/go-eth2-client/spec/capella"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
fastssz "github.com/ferranbt/fastssz"
"github.com/flashbots/go-boost-utils/bls"
"github.com/flashbots/go-boost-utils/ssz"
"github.com/flashbots/go-boost-utils/utils"
Expand Down Expand Up @@ -237,6 +238,12 @@ type BuilderBlockValidationRequest struct {
ParentBeaconBlockRoot *phase0.Root
}

type DenebBlockValidationRequest struct {
builderApiDeneb.SubmitBlockRequest
RegisteredGasLimit uint64
ParentBeaconBlockRoot phase0.Root
}

type capellaBuilderBlockValidationRequestJSON struct {
Message *builderApiV1.BidTrace `json:"message"`
ExecutionPayload *capella.ExecutionPayload `json:"execution_payload"`
Expand Down Expand Up @@ -276,6 +283,73 @@ func (r *BuilderBlockValidationRequest) MarshalJSON() ([]byte, error) {
}
}

func (r *BuilderBlockValidationRequest) MarshalSSZ() ([]byte, error) {
switch r.Version { //nolint:exhaustive
case spec.DataVersionDeneb:
req := &DenebBlockValidationRequest{
SubmitBlockRequest: *r.Deneb,
RegisteredGasLimit: r.RegisteredGasLimit,
ParentBeaconBlockRoot: *r.ParentBeaconBlockRoot,
}
return req.MarshalSSZ()
default:
return nil, errors.Wrap(ErrInvalidVersion, fmt.Sprintf("%s is not supported", r.Version))
}
}

// MarshalSSZ ssz marshals the DenebBlockValidationRequest object
func (b *DenebBlockValidationRequest) MarshalSSZ() ([]byte, error) {
return fastssz.MarshalSSZ(b)
}

// MarshalSSZTo ssz marshals the DenebBlockValidationRequest object to a target array
func (b *DenebBlockValidationRequest) MarshalSSZTo(buf []byte) (dst []byte, err error) {
dst = buf
offset := int(380)

// Field (0) 'Message'
if b.Message == nil {
b.Message = new(builderApiV1.BidTrace)
}
if dst, err = b.Message.MarshalSSZTo(dst); err != nil {
return nil, err
}

// Offset (1) 'ExecutionPayload'
dst = fastssz.WriteOffset(dst, offset)
if b.ExecutionPayload == nil {
b.ExecutionPayload = new(deneb.ExecutionPayload)
}
offset += b.ExecutionPayload.SizeSSZ()

// Offset (2) 'BlobsBundle'
dst = fastssz.WriteOffset(dst, offset)
if b.BlobsBundle == nil {
b.BlobsBundle = new(builderApiDeneb.BlobsBundle)
}

// Field (3) 'Signature'
dst = append(dst, b.Signature[:]...)

// Field (4) 'ParentBeaconBlockRoot'
dst = append(dst, b.ParentBeaconBlockRoot[:]...)

// Field (5) 'RegisteredGasLimit'
dst = fastssz.MarshalUint64(dst, b.RegisteredGasLimit)

// Field (1) 'ExecutionPayload'
if dst, err = b.ExecutionPayload.MarshalSSZTo(dst); err != nil {
return nil, err
}

// Field (2) 'BlobsBundle'
if dst, err = b.BlobsBundle.MarshalSSZTo(dst); err != nil {
return nil, err
}

return dst, nil
}

type VersionedSubmitBlockRequest struct {
builderSpec.VersionedSubmitBlockRequest
}
Expand Down
70 changes: 61 additions & 9 deletions services/api/blocksim_ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -35,17 +36,19 @@ type IBlockSimRateLimiter interface {
}

type BlockSimulationRateLimiter struct {
cv *sync.Cond
counter int64
blockSimURL string
client http.Client
cv *sync.Cond
counter int64
blockSimURL string
blockSimHTTPURL string
client http.Client
}

func NewBlockSimulationRateLimiter(blockSimURL string) *BlockSimulationRateLimiter {
func NewBlockSimulationRateLimiter(blockSimURL, blockSimHTTPURL string) *BlockSimulationRateLimiter {
return &BlockSimulationRateLimiter{
cv: sync.NewCond(&sync.Mutex{}),
counter: 0,
blockSimURL: blockSimURL,
cv: sync.NewCond(&sync.Mutex{}),
counter: 0,
blockSimURL: blockSimURL,
blockSimHTTPURL: blockSimHTTPURL,
client: http.Client{ //nolint:exhaustruct
Timeout: simRequestTimeout,
},
Expand All @@ -71,7 +74,6 @@ func (b *BlockSimulationRateLimiter) Send(context context.Context, payload *comm
return fmt.Errorf("%w, %w", ErrRequestClosed, err), nil
}

var simReq *jsonrpc.JSONRPCRequest
if payload.Version == spec.DataVersionCapella && payload.Capella == nil {
return ErrNoCapellaPayload, nil
}
Expand All @@ -95,7 +97,14 @@ func (b *BlockSimulationRateLimiter) Send(context context.Context, payload *comm
headers.Add("X-Fast-Track", "true")
}

if (b.blockSimHTTPURL != "") && (payload.Version == spec.DataVersionDeneb) {
// Create and fire off HTTP request
requestErr, validationErr = SendHTTPRequest(&b.client, payload, b.blockSimHTTPURL, headers)
return requestErr, validationErr
}

// Create and fire off JSON-RPC request
var simReq *jsonrpc.JSONRPCRequest
if payload.Version == spec.DataVersionDeneb {
simReq = jsonrpc.NewJSONRPCRequest("1", "flashbots_validateBuilderSubmissionV3", payload)
} else {
Expand All @@ -110,6 +119,49 @@ func (b *BlockSimulationRateLimiter) CurrentCounter() int64 {
return atomic.LoadInt64(&b.counter)
}

func SendHTTPRequest(client *http.Client, req *common.BuilderBlockValidationRequest, url string, headers http.Header) (requestErr, validationErr error) {
payloadBytes, err := req.MarshalSSZ()
if err != nil {
return fmt.Errorf("could not marshal request: %w", err), nil
}
httpReq, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payloadBytes))
if err != nil {
return fmt.Errorf("invalid request for %s: %w", url, err), nil
}

httpReq.Header.Add("Content-Type", "application/octet-stream")
httpReq.Header.Add("Eth-Consensus-Version", strings.ToLower(req.Version.String()))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allow future forks to reuse the same endpoint instead of a new endpoint incremented by version number, like in the JSON RPC endpoint.

httpReq.Header.Set("Accept", "application/json")
for k, v := range headers {
httpReq.Header.Add(k, v[0])
}

// execute request
resp, err := client.Do(httpReq)
if err != nil {
return err, nil
}
defer resp.Body.Close()

bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("could not read response body for %s: %w", url, err), nil
}

if resp.StatusCode >= http.StatusMultipleChoices {
ec := &struct {
Code int `json:"code"`
Message string `json:"message"`
}{}
if err = json.Unmarshal(bodyBytes, ec); err != nil {
return fmt.Errorf("could not unmarshal error response from validation node for %s from %s: %w", url, string(bodyBytes), err), nil
}
return nil, fmt.Errorf("%w: %s", ErrSimulationFailed, ec.Message)
}

return nil, nil
}

// SendJSONRPCRequest sends the request to URL and returns the general JsonRpcResponse, or an error (note: not the JSONRPCError)
func SendJSONRPCRequest(client *http.Client, req jsonrpc.JSONRPCRequest, url string, headers http.Header) (res *jsonrpc.JSONRPCResponse, requestErr, validationErr error) {
buf, err := json.Marshal(req)
Expand Down
7 changes: 4 additions & 3 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ var (
type RelayAPIOpts struct {
Log *logrus.Entry

ListenAddr string
BlockSimURL string
ListenAddr string
BlockSimURL string
BlockSimHTTPURL string

BeaconClient beaconclient.IMultiBeaconClient
Datastore *datastore.Datastore
Expand Down Expand Up @@ -283,7 +284,7 @@ func NewRelayAPI(opts RelayAPIOpts) (api *RelayAPI, err error) {
payloadAttributes: make(map[string]payloadAttributesHelper),

proposerDutiesResponse: &[]byte{},
blockSimRateLimiter: NewBlockSimulationRateLimiter(opts.BlockSimURL),
blockSimRateLimiter: NewBlockSimulationRateLimiter(opts.BlockSimURL, opts.BlockSimHTTPURL),

validatorRegC: make(chan builderApiV1.SignedValidatorRegistration, 450_000),
}
Expand Down
Loading