Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Decouple block processing (#129)
Browse files Browse the repository at this point in the history
* tmp relay reporting via rpc

* decouple consumeBuiltBlock service and init it

* reg builder flag

* fix lint

* add retry logic

* cleanup old test

* fix tests

* go mod tidy
  • Loading branch information
TymKh authored Jan 25, 2024
1 parent 03ee71c commit 18be100
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 474 deletions.
22 changes: 20 additions & 2 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/flashbots/go-boost-utils/utils"
"github.com/holiman/uint256"
"golang.org/x/time/rate"

"github.com/cenkalti/backoff/v4"
)

const (
Expand Down Expand Up @@ -64,6 +66,7 @@ type IBuilder interface {

type Builder struct {
ds flashbotsextra.IDatabaseService
blockConsumer flashbotsextra.BlockConsumer
relay IRelay
eth IEthereumService
dryRun bool
Expand Down Expand Up @@ -91,6 +94,7 @@ type Builder struct {
type BuilderArgs struct {
sk *bls.SecretKey
ds flashbotsextra.IDatabaseService
blockConsumer flashbotsextra.BlockConsumer
relay IRelay
builderSigningDomain phase0.Domain
builderBlockResubmitInterval time.Duration
Expand Down Expand Up @@ -130,6 +134,7 @@ func NewBuilder(args BuilderArgs) (*Builder, error) {
slotCtx, slotCtxCancel := context.WithCancel(context.Background())
return &Builder{
ds: args.ds,
blockConsumer: args.blockConsumer,
relay: args.relay,
eth: args.eth,
dryRun: args.dryRun,
Expand Down Expand Up @@ -267,7 +272,7 @@ func (b *Builder) submitBellatrixBlock(block *types.Block, blockValue *big.Int,
log.Error("could not validate bellatrix block", "err", err)
}
} else {
go b.ds.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
err = b.relay.SubmitBlock(&blockSubmitReq, vd)
if err != nil {
log.Error("could not submit bellatrix block", "err", err, "#commitedBundles", len(commitedBundles))
Expand Down Expand Up @@ -326,7 +331,7 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or
log.Error("could not validate block for capella", "err", err)
}
} else {
go b.ds.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
err = b.relay.SubmitBlockCapella(&blockSubmitReq, vd)
if err != nil {
log.Error("could not submit capella block", "err", err, "#commitedBundles", len(commitedBundles))
Expand All @@ -338,6 +343,19 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or
return nil
}

func (b *Builder) processBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) {
back := backoff.NewExponentialBackOff()
back.MaxInterval = 3 * time.Second
back.MaxElapsedTime = 12 * time.Second
err := backoff.Retry(func() error {
return b.blockConsumer.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, bidTrace)
}, back)
if err != nil {
log.Error("could not consume built block", "err", err)
} else {
log.Info("successfully relayed block data to consumer")
}
}
func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) error {
if attrs == nil {
return nil
Expand Down
1 change: 1 addition & 0 deletions builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func TestOnPayloadAttributes(t *testing.T) {
validator: nil,
beaconClient: &testBeacon,
limiter: nil,
blockConsumer: flashbotsextra.NilDbService{},
}
builder, err := NewBuilder(builderArgs)
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
BuilderSubmissionOffset time.Duration `toml:",omitempty"`
DiscardRevertibleTxOnErr bool `toml:",omitempty"`
EnableCancellations bool `toml:",omitempty"`
BlockProcessorURL string `toml:",omitempty"`
}

// DefaultConfig is the default config for the builder.
Expand Down
1 change: 1 addition & 0 deletions builder/local_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func newTestBackend(t *testing.T, forkchoiceData *engine.ExecutableData, block *
validator: nil,
beaconClient: beaconClient,
limiter: nil,
blockConsumer: flashbotsextra.NilDbService{},
}
backend, _ := NewBuilder(builderArgs)

Expand Down
10 changes: 10 additions & 0 deletions builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
submissionOffset = SubmissionOffsetFromEndOfSlotSecondsDefault
}

var blockConsumer flashbotsextra.BlockConsumer
rpcURL := cfg.BlockProcessorURL
if rpcURL != "" {
blockConsumer = flashbotsextra.NewRpcBlockClient(rpcURL)
} else {
log.Warn("Block consumer url is empty. Built block data reporting is essentially disabled")
blockConsumer = flashbotsextra.NilDbService{}
}

// TODO: move to proper flags
var ds flashbotsextra.IDatabaseService
dbDSN := os.Getenv("FLASHBOTS_POSTGRES_DSN")
Expand Down Expand Up @@ -282,6 +291,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {

builderArgs := BuilderArgs{
sk: builderSk,
blockConsumer: blockConsumer,
ds: ds,
dryRun: cfg.DryRun,
eth: ethereumService,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ var (
utils.BuilderSubmissionOffset,
utils.BuilderDiscardRevertibleTxOnErr,
utils.BuilderEnableCancellations,
utils.BuilderBlockProcessorURL,
}

rpcFlags = []cli.Flag{
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,12 @@ var (
Category: flags.BuilderCategory,
}

BuilderBlockProcessorURL = &cli.StringFlag{
Name: "builder.block_processor_url",
Usage: "RPC URL for the block processor",
Category: flags.BuilderCategory,
}

// RPC settings
IPCDisabledFlag = &cli.BoolFlag{
Name: "ipcdisable",
Expand Down Expand Up @@ -1724,6 +1730,8 @@ func SetBuilderConfig(ctx *cli.Context, cfg *builder.Config) {
cfg.DiscardRevertibleTxOnErr = ctx.Bool(BuilderDiscardRevertibleTxOnErr.Name)
cfg.EnableCancellations = ctx.IsSet(BuilderEnableCancellations.Name)
cfg.BuilderRateLimitResubmitInterval = ctx.String(BuilderBlockResubmitInterval.Name)

cfg.BlockProcessorURL = ctx.String(BuilderBlockProcessorURL.Name)
}

// SetNodeConfig applies node-related command line flags to the config.
Expand Down
Loading

0 comments on commit 18be100

Please sign in to comment.