Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Decouple block processing #129

Merged
merged 8 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/flashbots/go-boost-utils/utils"
"github.com/holiman/uint256"
"golang.org/x/time/rate"

"github.com/cenkalti/backoff/v4"
)

const (
Expand Down Expand Up @@ -64,6 +66,7 @@ type IBuilder interface {

type Builder struct {
ds flashbotsextra.IDatabaseService
blockConsumer flashbotsextra.BlockConsumer
relay IRelay
eth IEthereumService
dryRun bool
Expand Down Expand Up @@ -91,6 +94,7 @@ type Builder struct {
type BuilderArgs struct {
sk *bls.SecretKey
ds flashbotsextra.IDatabaseService
blockConsumer flashbotsextra.BlockConsumer
relay IRelay
builderSigningDomain phase0.Domain
builderBlockResubmitInterval time.Duration
Expand Down Expand Up @@ -130,6 +134,7 @@ func NewBuilder(args BuilderArgs) (*Builder, error) {
slotCtx, slotCtxCancel := context.WithCancel(context.Background())
return &Builder{
ds: args.ds,
blockConsumer: args.blockConsumer,
relay: args.relay,
eth: args.eth,
dryRun: args.dryRun,
Expand Down Expand Up @@ -267,7 +272,7 @@ func (b *Builder) submitBellatrixBlock(block *types.Block, blockValue *big.Int,
log.Error("could not validate bellatrix block", "err", err)
}
} else {
go b.ds.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
err = b.relay.SubmitBlock(&blockSubmitReq, vd)
if err != nil {
log.Error("could not submit bellatrix block", "err", err, "#commitedBundles", len(commitedBundles))
Expand Down Expand Up @@ -326,7 +331,7 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or
log.Error("could not validate block for capella", "err", err)
}
} else {
go b.ds.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
err = b.relay.SubmitBlockCapella(&blockSubmitReq, vd)
if err != nil {
log.Error("could not submit capella block", "err", err, "#commitedBundles", len(commitedBundles))
Expand All @@ -338,6 +343,19 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or
return nil
}

func (b *Builder) processBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) {
back := backoff.NewExponentialBackOff()
back.MaxInterval = 3 * time.Second
back.MaxElapsedTime = 12 * time.Second
err := backoff.Retry(func() error {
return b.blockConsumer.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, bidTrace)
}, back)
if err != nil {
log.Error("could not consume built block", "err", err)
} else {
log.Info("successfully relayed block data to consumer")
}
}
func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) error {
if attrs == nil {
return nil
Expand Down
1 change: 1 addition & 0 deletions builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func TestOnPayloadAttributes(t *testing.T) {
validator: nil,
beaconClient: &testBeacon,
limiter: nil,
blockConsumer: flashbotsextra.NilDbService{},
}
builder, err := NewBuilder(builderArgs)
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
BuilderSubmissionOffset time.Duration `toml:",omitempty"`
DiscardRevertibleTxOnErr bool `toml:",omitempty"`
EnableCancellations bool `toml:",omitempty"`
BlockProcessorURL string `toml:",omitempty"`
}

// DefaultConfig is the default config for the builder.
Expand Down
1 change: 1 addition & 0 deletions builder/local_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func newTestBackend(t *testing.T, forkchoiceData *engine.ExecutableData, block *
validator: nil,
beaconClient: beaconClient,
limiter: nil,
blockConsumer: flashbotsextra.NilDbService{},
}
backend, _ := NewBuilder(builderArgs)

Expand Down
10 changes: 10 additions & 0 deletions builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
submissionOffset = SubmissionOffsetFromEndOfSlotSecondsDefault
}

var blockConsumer flashbotsextra.BlockConsumer
rpcURL := cfg.BlockProcessorURL
if rpcURL != "" {
blockConsumer = flashbotsextra.NewRpcBlockClient(rpcURL)
} else {
log.Warn("Block consumer url is empty. Built block data reporting is essentially disabled")
blockConsumer = flashbotsextra.NilDbService{}
}

// TODO: move to proper flags
var ds flashbotsextra.IDatabaseService
dbDSN := os.Getenv("FLASHBOTS_POSTGRES_DSN")
Expand Down Expand Up @@ -282,6 +291,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {

builderArgs := BuilderArgs{
sk: builderSk,
blockConsumer: blockConsumer,
ds: ds,
dryRun: cfg.DryRun,
eth: ethereumService,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ var (
utils.BuilderSubmissionOffset,
utils.BuilderDiscardRevertibleTxOnErr,
utils.BuilderEnableCancellations,
utils.BuilderBlockProcessorURL,
}

rpcFlags = []cli.Flag{
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,12 @@ var (
Category: flags.BuilderCategory,
}

BuilderBlockProcessorURL = &cli.StringFlag{
Name: "builder.block_processor_url",
Usage: "RPC URL for the block processor",
Category: flags.BuilderCategory,
}

// RPC settings
IPCDisabledFlag = &cli.BoolFlag{
Name: "ipcdisable",
Expand Down Expand Up @@ -1724,6 +1730,8 @@ func SetBuilderConfig(ctx *cli.Context, cfg *builder.Config) {
cfg.DiscardRevertibleTxOnErr = ctx.Bool(BuilderDiscardRevertibleTxOnErr.Name)
cfg.EnableCancellations = ctx.IsSet(BuilderEnableCancellations.Name)
cfg.BuilderRateLimitResubmitInterval = ctx.String(BuilderBlockResubmitInterval.Name)

cfg.BlockProcessorURL = ctx.String(BuilderBlockProcessorURL.Name)
}

// SetNodeConfig applies node-related command line flags to the config.
Expand Down
Loading
Loading