diff --git a/builder/builder.go b/builder/builder.go index 4988adb84b..9ba147c67b 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -29,6 +29,8 @@ import ( "github.com/flashbots/go-boost-utils/utils" "github.com/holiman/uint256" "golang.org/x/time/rate" + + "github.com/cenkalti/backoff/v4" ) const ( @@ -64,6 +66,7 @@ type IBuilder interface { type Builder struct { ds flashbotsextra.IDatabaseService + blockConsumer flashbotsextra.BlockConsumer relay IRelay eth IEthereumService dryRun bool @@ -91,6 +94,7 @@ type Builder struct { type BuilderArgs struct { sk *bls.SecretKey ds flashbotsextra.IDatabaseService + blockConsumer flashbotsextra.BlockConsumer relay IRelay builderSigningDomain phase0.Domain builderBlockResubmitInterval time.Duration @@ -130,6 +134,7 @@ func NewBuilder(args BuilderArgs) (*Builder, error) { slotCtx, slotCtxCancel := context.WithCancel(context.Background()) return &Builder{ ds: args.ds, + blockConsumer: args.blockConsumer, relay: args.relay, eth: args.eth, dryRun: args.dryRun, @@ -267,7 +272,7 @@ func (b *Builder) submitBellatrixBlock(block *types.Block, blockValue *big.Int, log.Error("could not validate bellatrix block", "err", err) } } else { - go b.ds.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg) + go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg) err = b.relay.SubmitBlock(&blockSubmitReq, vd) if err != nil { log.Error("could not submit bellatrix block", "err", err, "#commitedBundles", len(commitedBundles)) @@ -326,7 +331,7 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or log.Error("could not validate block for capella", "err", err) } } else { - go b.ds.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg) + go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg) err = b.relay.SubmitBlockCapella(&blockSubmitReq, vd) if err != nil { log.Error("could not submit capella block", "err", err, "#commitedBundles", len(commitedBundles)) @@ -338,6 +343,19 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or return nil } +func (b *Builder) processBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) { + back := backoff.NewExponentialBackOff() + back.MaxInterval = 3 * time.Second + back.MaxElapsedTime = 12 * time.Second + err := backoff.Retry(func() error { + return b.blockConsumer.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, bidTrace) + }, back) + if err != nil { + log.Error("could not consume built block", "err", err) + } else { + log.Info("successfully relayed block data to consumer") + } +} func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) error { if attrs == nil { return nil diff --git a/builder/builder_test.go b/builder/builder_test.go index c2be63ea33..6fdd34f9fe 100644 --- a/builder/builder_test.go +++ b/builder/builder_test.go @@ -97,6 +97,7 @@ func TestOnPayloadAttributes(t *testing.T) { validator: nil, beaconClient: &testBeacon, limiter: nil, + blockConsumer: flashbotsextra.NilDbService{}, } builder, err := NewBuilder(builderArgs) require.NoError(t, err) diff --git a/builder/config.go b/builder/config.go index ca73966a1c..de9868caaf 100644 --- a/builder/config.go +++ b/builder/config.go @@ -28,6 +28,7 @@ type Config struct { BuilderSubmissionOffset time.Duration `toml:",omitempty"` DiscardRevertibleTxOnErr bool `toml:",omitempty"` EnableCancellations bool `toml:",omitempty"` + BlockProcessorURL string `toml:",omitempty"` } // DefaultConfig is the default config for the builder. diff --git a/builder/local_relay_test.go b/builder/local_relay_test.go index d536ad65be..047111c8a6 100644 --- a/builder/local_relay_test.go +++ b/builder/local_relay_test.go @@ -56,6 +56,7 @@ func newTestBackend(t *testing.T, forkchoiceData *engine.ExecutableData, block * validator: nil, beaconClient: beaconClient, limiter: nil, + blockConsumer: flashbotsextra.NilDbService{}, } backend, _ := NewBuilder(builderArgs) diff --git a/builder/service.go b/builder/service.go index 64c262cb29..67e052b5bf 100644 --- a/builder/service.go +++ b/builder/service.go @@ -250,6 +250,15 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error { submissionOffset = SubmissionOffsetFromEndOfSlotSecondsDefault } + var blockConsumer flashbotsextra.BlockConsumer + rpcURL := cfg.BlockProcessorURL + if rpcURL != "" { + blockConsumer = flashbotsextra.NewRpcBlockClient(rpcURL) + } else { + log.Warn("Block consumer url is empty. Built block data reporting is essentially disabled") + blockConsumer = flashbotsextra.NilDbService{} + } + // TODO: move to proper flags var ds flashbotsextra.IDatabaseService dbDSN := os.Getenv("FLASHBOTS_POSTGRES_DSN") @@ -282,6 +291,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error { builderArgs := BuilderArgs{ sk: builderSk, + blockConsumer: blockConsumer, ds: ds, dryRun: cfg.DryRun, eth: ethereumService, diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c04c5614d1..329a348451 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -184,6 +184,7 @@ var ( utils.BuilderSubmissionOffset, utils.BuilderDiscardRevertibleTxOnErr, utils.BuilderEnableCancellations, + utils.BuilderBlockProcessorURL, } rpcFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 3fc51adc03..2944ccdc91 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -893,6 +893,12 @@ var ( Category: flags.BuilderCategory, } + BuilderBlockProcessorURL = &cli.StringFlag{ + Name: "builder.block_processor_url", + Usage: "RPC URL for the block processor", + Category: flags.BuilderCategory, + } + // RPC settings IPCDisabledFlag = &cli.BoolFlag{ Name: "ipcdisable", @@ -1724,6 +1730,8 @@ func SetBuilderConfig(ctx *cli.Context, cfg *builder.Config) { cfg.DiscardRevertibleTxOnErr = ctx.Bool(BuilderDiscardRevertibleTxOnErr.Name) cfg.EnableCancellations = ctx.IsSet(BuilderEnableCancellations.Name) cfg.BuilderRateLimitResubmitInterval = ctx.String(BuilderBlockResubmitInterval.Name) + + cfg.BlockProcessorURL = ctx.String(BuilderBlockProcessorURL.Name) } // SetNodeConfig applies node-related command line flags to the config. diff --git a/flashbotsextra/database.go b/flashbotsextra/database.go index 80e8170607..bb4d5ca88a 100644 --- a/flashbotsextra/database.go +++ b/flashbotsextra/database.go @@ -2,15 +2,12 @@ package flashbotsextra import ( "context" - "database/sql" "math/big" "time" apiv1 "github.com/attestantio/go-builder-client/api/v1" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" - "github.com/google/uuid" "github.com/jmoiron/sqlx" _ "github.com/lib/pq" ) @@ -20,18 +17,18 @@ const ( lowPrioLimitSize = 100 ) +type BlockConsumer interface { + ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, OrdersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) error +} type IDatabaseService interface { - ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, OrdersClosedAt time.Time, sealedAt time.Time, - commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, - usedSbundles []types.UsedSBundle, - bidTrace *apiv1.BidTrace) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) GetLatestUuidBundles(ctx context.Context, blockNum int64) ([]types.LatestUuidBundle, error) } type NilDbService struct{} -func (NilDbService) ConsumeBuiltBlock(block *types.Block, _ *big.Int, _ time.Time, _ time.Time, _ []types.SimulatedBundle, _ []types.SimulatedBundle, _ []types.UsedSBundle, _ *apiv1.BidTrace) { +func (NilDbService) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, OrdersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) error { + return nil } func (NilDbService) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) { @@ -45,7 +42,6 @@ func (NilDbService) GetLatestUuidBundles(ctx context.Context, blockNum int64) ([ type DatabaseService struct { db *sqlx.DB - insertBuiltBlockStmt *sqlx.NamedStmt insertMissingBundleStmt *sqlx.NamedStmt fetchPrioBundlesStmt *sqlx.NamedStmt fetchGetLatestUuidBundlesStmt *sqlx.NamedStmt @@ -57,11 +53,6 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) { return nil, err } - insertBuiltBlockStmt, err := db.PrepareNamed("insert into built_blocks (block_number, profit, slot, hash, gas_limit, gas_used, base_fee, parent_hash, proposer_pubkey, proposer_fee_recipient, builder_pubkey, timestamp, timestamp_datetime, orders_closed_at, sealed_at) values (:block_number, :profit, :slot, :hash, :gas_limit, :gas_used, :base_fee, :parent_hash, :proposer_pubkey, :proposer_fee_recipient, :builder_pubkey, :timestamp, to_timestamp(:timestamp), :orders_closed_at, :sealed_at) returning block_id") - if err != nil { - return nil, err - } - insertMissingBundleStmt, err := db.PrepareNamed("insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase, :bundle_uuid) on conflict do nothing returning id") if err != nil { return nil, err @@ -79,7 +70,6 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) { return &DatabaseService{ db: db, - insertBuiltBlockStmt: insertBuiltBlockStmt, insertMissingBundleStmt: insertMissingBundleStmt, fetchPrioBundlesStmt: fetchPrioBundlesStmt, fetchGetLatestUuidBundlesStmt: fetchGetLatestUuidBundlesStmt, @@ -93,252 +83,6 @@ func Min(l int, r int) int { return r } -func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) { - if len(bundles) == 0 { - return nil, nil - } - - bundleIdsMap := make(map[uuid.UUID]uint64, len(bundles)) - - // Batch by 500 - requestsToMake := [][]string{make([]string, 0, Min(500, len(bundles)))} - cRequestInd := 0 - for i, bundle := range bundles { - if i != 0 && i%500 == 0 { - cRequestInd += 1 - requestsToMake = append(requestsToMake, make([]string, 0, Min(500, len(bundles)-i))) - } - requestsToMake[cRequestInd] = append(requestsToMake[cRequestInd], bundle.SimulatedBundle.OriginalBundle.Hash.String()) - } - - for _, request := range requestsToMake { - query, args, err := sqlx.In("select id, bundle_hash, bundle_uuid from bundles where param_block_number = ? and bundle_hash in (?)", blockNumber, request) - if err != nil { - return nil, err - } - query = ds.db.Rebind(query) - - queryRes := []struct { - Id uint64 `db:"id"` - BundleHash string `db:"bundle_hash"` - BundleUUID uuid.UUID `db:"bundle_uuid"` - }{} - - err = ds.db.SelectContext(ctx, &queryRes, query, args...) - if err != nil { - return nil, err - } - RowLoop: - for _, row := range queryRes { - for _, b := range bundles { - // if UUID agree it's same exact bundle we stop searching - if b.UUID == row.BundleUUID { - bundleIdsMap[b.UUID] = row.Id - continue RowLoop - } - // we can have multiple bundles with same hash eventually, so we fall back on getting row with same hash - if b.SimulatedBundle.OriginalBundle.Hash.String() == row.BundleHash { - bundleIdsMap[b.UUID] = row.Id - } - } - } - } - - return bundleIdsMap, nil -} - -// TODO: cache locally for current block! -func (ds *DatabaseService) getBundleIdsAndInsertMissingBundles(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) { - bundleIdsMap, err := ds.getBundleIds(ctx, blockNumber, bundles) - if err != nil { - return nil, err - } - - toRetry := make([]uuidBundle, 0) - for _, bundle := range bundles { - if _, found := bundleIdsMap[bundle.UUID]; found { - continue - } - - var bundleId uint64 - missingBundleData := SimulatedBundleToDbBundle(&bundle.SimulatedBundle) // nolint: gosec - err = ds.insertMissingBundleStmt.GetContext(ctx, &bundleId, missingBundleData) // not using the tx as it relies on the unique constraint! - if err == nil { - bundleIdsMap[bundle.UUID] = bundleId - } else if err == sql.ErrNoRows /* conflict, someone else inserted the bundle before we could */ { - toRetry = append(toRetry, bundle) - } else { - log.Error("could not insert missing bundle", "err", err) - } - } - - retriedBundleIds, err := ds.getBundleIds(ctx, blockNumber, toRetry) - if err != nil { - return nil, err - } - - for hash, id := range retriedBundleIds { - bundleIdsMap[hash] = id - } - - return bundleIdsMap, nil -} - -func (ds *DatabaseService) insertBuildBlock(tx *sqlx.Tx, ctx context.Context, block *types.Block, blockValue *big.Int, bidTrace *apiv1.BidTrace, ordersClosedAt time.Time, sealedAt time.Time) (uint64, error) { - blockData := BuiltBlock{ - BlockNumber: block.NumberU64(), - Profit: new(big.Rat).SetFrac(blockValue, big.NewInt(1e18)).FloatString(18), - Slot: bidTrace.Slot, - Hash: block.Hash().String(), - GasLimit: block.GasLimit(), - GasUsed: block.GasUsed(), - BaseFee: block.BaseFee().Uint64(), - ParentHash: block.ParentHash().String(), - ProposerPubkey: bidTrace.ProposerPubkey.String(), - ProposerFeeRecipient: bidTrace.ProposerFeeRecipient.String(), - BuilderPubkey: bidTrace.BuilderPubkey.String(), - Timestamp: block.Time(), - OrdersClosedAt: ordersClosedAt.UTC(), - SealedAt: sealedAt.UTC(), - } - - var blockId uint64 - if err := tx.NamedStmtContext(ctx, ds.insertBuiltBlockStmt).GetContext(ctx, &blockId, blockData); err != nil { - log.Error("could not insert built block", "err", err) - return 0, err - } - - return blockId, nil -} - -func (ds *DatabaseService) insertBuildBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIds []uint64) error { - if len(bundleIds) == 0 { - return nil - } - - toInsert := make([]blockAndBundleId, len(bundleIds)) - for i, bundleId := range bundleIds { - toInsert[i] = blockAndBundleId{blockId, bundleId} - } - - _, err := tx.NamedExecContext(ctx, "insert into built_blocks_bundles (block_id, bundle_id) values (:block_id, :bundle_id)", toInsert) - return err -} - -func (ds *DatabaseService) insertAllBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIds []uint64) error { - if len(bundleIds) == 0 { - return nil - } - - toInsert := make([]blockAndBundleId, 0, len(bundleIds)) - for _, bundleId := range bundleIds { - toInsert = append(toInsert, blockAndBundleId{blockId, bundleId}) - } - - _, err := tx.NamedExecContext(ctx, "insert into built_blocks_all_bundles (block_id, bundle_id) values (:block_id, :bundle_id)", toInsert) - return err -} - -func (ds *DatabaseService) insertUsedSBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, usedSbundles []types.UsedSBundle) error { - if len(usedSbundles) == 0 { - return nil - } - - toInsert := make([]DbUsedSBundle, len(usedSbundles)) - for i, u := range usedSbundles { - toInsert[i] = DbUsedSBundle{ - BlockId: blockId, - Hash: u.Bundle.Hash().Bytes(), - Inserted: u.Success, - } - } - _, err := tx.NamedExecContext(ctx, insertUsedSbundleQuery, toInsert) - return err -} - -type uuidBundle struct { - SimulatedBundle types.SimulatedBundle - UUID uuid.UUID -} - -func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time, - commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, - usedSbundles []types.UsedSBundle, - bidTrace *apiv1.BidTrace) { - var allUUIDBundles = make([]uuidBundle, 0, len(allBundles)) - for _, bundle := range allBundles { - allUUIDBundles = append(allUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()}) - } - - var commitedUUIDBundles = make([]uuidBundle, 0, len(commitedBundles)) - for _, bundle := range commitedBundles { - commitedUUIDBundles = append(commitedUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()}) - } - - ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second) - defer cancel() - - bundleIdsMap, err := ds.getBundleIdsAndInsertMissingBundles(ctx, block.NumberU64(), allUUIDBundles) - if err != nil { - log.Error("could not insert bundles", "err", err) - } - - tx, err := ds.db.Beginx() - if err != nil { - log.Error("could not open DB transaction", "err", err) - return - } - - blockId, err := ds.insertBuildBlock(tx, ctx, block, blockValue, bidTrace, ordersClosedAt, sealedAt) - if err != nil { - tx.Rollback() - log.Error("could not insert built block", "err", err) - return - } - - commitedBundlesIds := make([]uint64, 0, len(commitedBundles)) - for _, bundle := range commitedUUIDBundles { - if id, found := bundleIdsMap[bundle.UUID]; found { - commitedBundlesIds = append(commitedBundlesIds, id) - } - } - - err = ds.insertBuildBlockBundleIds(tx, ctx, blockId, commitedBundlesIds) - if err != nil { - tx.Rollback() - log.Error("could not insert built block bundles", "err", err) - return - } - - var uniqueBundleIDs = make(map[uint64]struct{}) - var allBundleIds []uint64 - // we need to filter out duplicates while we still have unique constraint on bundle_hash+block_number which leads to data discrepancies - for _, v := range bundleIdsMap { - if _, ok := uniqueBundleIDs[v]; ok { - continue - } - uniqueBundleIDs[v] = struct{}{} - allBundleIds = append(allBundleIds, v) - } - err = ds.insertAllBlockBundleIds(tx, ctx, blockId, allBundleIds) - if err != nil { - tx.Rollback() - log.Error("could not insert built block all bundles", "err", err, "block", block.NumberU64(), "commitedBundles", commitedBundlesIds) - return - } - - err = ds.insertUsedSBundleIds(tx, ctx, blockId, usedSbundles) - if err != nil { - tx.Rollback() - log.Error("could not insert used sbundles", "err", err) - return - } - - err = tx.Commit() - if err != nil { - log.Error("could not commit DB trasnaction", "err", err) - } -} func (ds *DatabaseService) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) { var bundles []DbBundle arg := map[string]interface{}{"param_block_number": uint64(blockNum), "is_high_prio": isHighPrio, "limit": lowPrioLimitSize} diff --git a/flashbotsextra/database_test.go b/flashbotsextra/database_test.go index a9c2c79337..ab5a6ba2f0 100644 --- a/flashbotsextra/database_test.go +++ b/flashbotsextra/database_test.go @@ -2,177 +2,13 @@ package flashbotsextra import ( "math/big" - "os" "testing" - "time" - apiv1 "github.com/attestantio/go-builder-client/api/v1" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/require" ) -func TestDatabaseBlockInsertion(t *testing.T) { - dsn := os.Getenv("FLASHBOTS_TEST_POSTGRES_DSN") - if dsn == "" { - t.Skip() - } - ds, err := NewDatabaseService(dsn) - require.NoError(t, err) - - _, err = ds.db.Exec("delete from built_blocks_bundles where block_id = (select block_id from built_blocks where hash = '0x9cc3ee47d091fea38c0187049cae56abe4e642eeb06c4832f06ec59f5dbce7ab')") - require.NoError(t, err) - - _, err = ds.db.Exec("delete from built_blocks_all_bundles where block_id = (select block_id from built_blocks where hash = '0x9cc3ee47d091fea38c0187049cae56abe4e642eeb06c4832f06ec59f5dbce7ab')") - require.NoError(t, err) - - _, err = ds.db.Exec("delete from built_blocks where hash = '0x9cc3ee47d091fea38c0187049cae56abe4e642eeb06c4832f06ec59f5dbce7ab'") - require.NoError(t, err) - - _, err = ds.db.Exec("delete from bundles where bundle_hash in ('0x0978000000000000000000000000000000000000000000000000000000000000', '0x1078000000000000000000000000000000000000000000000000000000000000', '0x0979000000000000000000000000000000000000000000000000000000000000', '0x1080000000000000000000000000000000000000000000000000000000000000')") - require.NoError(t, err) - - block := types.NewBlock( - &types.Header{ - ParentHash: common.HexToHash("0xafafafa"), - Number: big.NewInt(12), - GasLimit: uint64(10000), - GasUsed: uint64(1000), - Time: 16000000, - BaseFee: big.NewInt(7), - }, nil, nil, nil, nil) - blockProfit := big.NewInt(10) - - simBundle1 := types.SimulatedBundle{ - MevGasPrice: big.NewInt(9), - TotalEth: big.NewInt(11), - EthSentToCoinbase: big.NewInt(10), - TotalGasUsed: uint64(100), - OriginalBundle: types.MevBundle{ - Txs: types.Transactions{types.NewTransaction(uint64(50), common.Address{0x60}, big.NewInt(19), uint64(67), big.NewInt(43), []byte{})}, - BlockNumber: big.NewInt(12), - MinTimestamp: uint64(1000000), - RevertingTxHashes: []common.Hash{{0x10, 0x17}}, - Hash: common.Hash{0x09, 0x78}, - }, - } - - simBundle2 := types.SimulatedBundle{ - MevGasPrice: big.NewInt(90), - TotalEth: big.NewInt(110), - EthSentToCoinbase: big.NewInt(100), - TotalGasUsed: uint64(1000), - OriginalBundle: types.MevBundle{ - Txs: types.Transactions{types.NewTransaction(uint64(51), common.Address{0x61}, big.NewInt(109), uint64(167), big.NewInt(433), []byte{})}, - BlockNumber: big.NewInt(12), - MinTimestamp: uint64(1000020), - RevertingTxHashes: []common.Hash{{0x11, 0x17}}, - Hash: common.Hash{0x10, 0x78}, - }, - } - - var bundle2Id uint64 - ds.db.Get(&bundle2Id, "insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase) on conflict (bundle_hash, param_block_number) do nothing returning id", SimulatedBundleToDbBundle(&simBundle2)) - - simBundle3 := types.SimulatedBundle{ - MevGasPrice: big.NewInt(91), - TotalEth: big.NewInt(111), - EthSentToCoinbase: big.NewInt(101), - TotalGasUsed: uint64(101), - OriginalBundle: types.MevBundle{ - Txs: types.Transactions{types.NewTransaction(uint64(51), common.Address{0x62}, big.NewInt(20), uint64(68), big.NewInt(44), []byte{})}, - BlockNumber: big.NewInt(12), - MinTimestamp: uint64(1000021), - RevertingTxHashes: []common.Hash{{0x10, 0x18}}, - Hash: common.Hash{0x09, 0x79}, - }, - } - - simBundle4 := types.SimulatedBundle{ - MevGasPrice: big.NewInt(92), - TotalEth: big.NewInt(112), - EthSentToCoinbase: big.NewInt(102), - TotalGasUsed: uint64(1002), - OriginalBundle: types.MevBundle{ - Txs: types.Transactions{types.NewTransaction(uint64(52), common.Address{0x62}, big.NewInt(110), uint64(168), big.NewInt(434), []byte{})}, - BlockNumber: big.NewInt(12), - MinTimestamp: uint64(1000022), - RevertingTxHashes: []common.Hash{{0x11, 0x19}}, - Hash: common.Hash{0x10, 0x80}, - }, - } - - var bundle4Id uint64 - ds.db.Get(&bundle4Id, "insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase) on conflict (bundle_hash, param_block_number) do nothing returning id", SimulatedBundleToDbBundle(&simBundle4)) - - usedSbundle := types.UsedSBundle{ - Bundle: &types.SBundle{ - Inclusion: types.BundleInclusion{ - BlockNumber: 5, - MaxBlockNumber: 6, - }, - Body: []types.BundleBody{ - { - Tx: types.NewTransaction(uint64(53), common.Address{0x63}, big.NewInt(111), uint64(169), big.NewInt(435), []byte{})}, - }, - }, - Success: true, - } - - bidTrace := &apiv1.BidTrace{} - - ocAt := time.Now().Add(-time.Hour).UTC() - sealedAt := time.Now().Add(-30 * time.Minute).UTC() - ds.ConsumeBuiltBlock(block, blockProfit, ocAt, sealedAt, - []types.SimulatedBundle{simBundle1, simBundle2}, []types.SimulatedBundle{simBundle1, simBundle2, simBundle3, simBundle4}, - []types.UsedSBundle{usedSbundle}, bidTrace) - - var dbBlock BuiltBlock - require.NoError(t, ds.db.Get(&dbBlock, "select block_id, block_number, profit, slot, hash, gas_limit, gas_used, base_fee, parent_hash, timestamp, timestamp_datetime, orders_closed_at, sealed_at from built_blocks where hash = '0x9cc3ee47d091fea38c0187049cae56abe4e642eeb06c4832f06ec59f5dbce7ab'")) - require.Equal(t, BuiltBlock{ - BlockId: dbBlock.BlockId, - BlockNumber: 12, - Profit: "0.000000000000000010", - Slot: 0, - Hash: block.Hash().String(), - GasLimit: block.GasLimit(), - GasUsed: block.GasUsed(), - BaseFee: 7, - ParentHash: "0x000000000000000000000000000000000000000000000000000000000afafafa", - Timestamp: 16000000, - TimestampDatetime: dbBlock.TimestampDatetime, - OrdersClosedAt: dbBlock.OrdersClosedAt, - SealedAt: dbBlock.SealedAt, - }, dbBlock) - - require.True(t, dbBlock.TimestampDatetime.Equal(time.Unix(16000000, 0))) - require.Equal(t, ocAt.Truncate(time.Millisecond), dbBlock.OrdersClosedAt.UTC().Truncate(time.Millisecond)) - require.Equal(t, sealedAt.Truncate(time.Millisecond), dbBlock.SealedAt.UTC().Truncate(time.Millisecond)) - - var bundles []DbBundle - ds.db.Select(&bundles, "select bundle_hash, param_signed_txs, param_block_number, param_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid from bundles order by param_timestamp") - require.Len(t, bundles, 4) - require.Equal(t, []DbBundle{SimulatedBundleToDbBundle(&simBundle1), SimulatedBundleToDbBundle(&simBundle2), SimulatedBundleToDbBundle(&simBundle3), SimulatedBundleToDbBundle(&simBundle4)}, bundles) - - var commitedBundles []string - require.NoError(t, ds.db.Select(&commitedBundles, "select b.bundle_hash as bundle_hash from built_blocks_bundles bbb inner join bundles b on b.id = bbb.bundle_id where bbb.block_id = $1 order by b.param_timestamp", dbBlock.BlockId)) - require.Len(t, commitedBundles, 2) - require.Equal(t, []string{simBundle1.OriginalBundle.Hash.String(), simBundle2.OriginalBundle.Hash.String()}, commitedBundles) - - var allBundles []string - require.NoError(t, ds.db.Select(&allBundles, "select b.bundle_hash as bundle_hash from built_blocks_all_bundles bbb inner join bundles b on b.id = bbb.bundle_id where bbb.block_id = $1 order by b.param_timestamp", dbBlock.BlockId)) - require.Len(t, allBundles, 4) - require.Equal(t, []string{simBundle1.OriginalBundle.Hash.String(), simBundle2.OriginalBundle.Hash.String(), simBundle3.OriginalBundle.Hash.String(), simBundle4.OriginalBundle.Hash.String()}, allBundles) - - var usedSbundles []DbUsedSBundle - require.NoError(t, ds.db.Select(&usedSbundles, "select hash, inserted from sbundle_builder_used where block_id = $1", dbBlock.BlockId)) - require.Len(t, usedSbundles, 1) - require.Equal(t, DbUsedSBundle{ - Hash: usedSbundle.Bundle.Hash().Bytes(), - Inserted: usedSbundle.Success, - }, usedSbundles[0]) -} - func simpleTx(nonce uint64) *types.Transaction { value := big.NewInt(1000000000000000) // in wei (0.001 eth) gasLimit := uint64(21000) // in units diff --git a/flashbotsextra/database_types.go b/flashbotsextra/database_types.go index c034d6cfe8..866609d80a 100644 --- a/flashbotsextra/database_types.go +++ b/flashbotsextra/database_types.go @@ -1,13 +1,8 @@ package flashbotsextra import ( - "math/big" - "strings" "time" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" "github.com/google/uuid" ) @@ -62,50 +57,8 @@ type DbLatestUuidBundle struct { BundleUUID uuid.UUID `db:"bundle_uuid"` } -type blockAndBundleId struct { - BlockId uint64 `db:"block_id"` - BundleId uint64 `db:"bundle_id"` -} - type DbUsedSBundle struct { BlockId uint64 `db:"block_id"` Hash []byte `db:"hash"` Inserted bool `db:"inserted"` } - -var insertUsedSbundleQuery = ` -INSERT INTO sbundle_builder_used (block_id, hash, inserted) -VALUES (:block_id, :hash, :inserted) -ON CONFLICT (block_id, hash) DO NOTHING` - -func SimulatedBundleToDbBundle(bundle *types.SimulatedBundle) DbBundle { - revertingTxHashes := make([]string, len(bundle.OriginalBundle.RevertingTxHashes)) - for i, rTxHash := range bundle.OriginalBundle.RevertingTxHashes { - revertingTxHashes[i] = rTxHash.String() - } - paramRevertingTxHashes := strings.Join(revertingTxHashes, ",") - signedTxsStrings := make([]string, len(bundle.OriginalBundle.Txs)) - for i, tx := range bundle.OriginalBundle.Txs { - txBytes, err := tx.MarshalBinary() - if err != nil { - log.Error("could not marshal tx bytes", "err", err) - continue - } - signedTxsStrings[i] = hexutil.Encode(txBytes) - } - - return DbBundle{ - BundleHash: bundle.OriginalBundle.Hash.String(), - BundleUUID: bundle.OriginalBundle.ComputeUUID(), - ParamSignedTxs: strings.Join(signedTxsStrings, ","), - ParamBlockNumber: bundle.OriginalBundle.BlockNumber.Uint64(), - ParamTimestamp: &bundle.OriginalBundle.MinTimestamp, - ParamRevertingTxHashes: ¶mRevertingTxHashes, - - CoinbaseDiff: new(big.Rat).SetFrac(bundle.TotalEth, big.NewInt(1e18)).FloatString(18), - TotalGasUsed: bundle.TotalGasUsed, - StateBlockNumber: bundle.OriginalBundle.BlockNumber.Uint64(), - GasFees: new(big.Int).Mul(big.NewInt(int64(bundle.TotalGasUsed)), bundle.MevGasPrice).String(), - EthSentToCoinbase: new(big.Rat).SetFrac(bundle.EthSentToCoinbase, big.NewInt(1e18)).FloatString(18), - } -} diff --git a/flashbotsextra/rpc_block_service.go b/flashbotsextra/rpc_block_service.go new file mode 100644 index 0000000000..e9a3504f46 --- /dev/null +++ b/flashbotsextra/rpc_block_service.go @@ -0,0 +1,36 @@ +package flashbotsextra + +import ( + "math/big" + "time" + + apiv1 "github.com/attestantio/go-builder-client/api/v1" + "github.com/ethereum/go-ethereum/core/types" + "github.com/flashbots/go-utils/jsonrpc" +) + +type RpcBlockClient struct { + URL string +} + +func NewRpcBlockClient(URL string) *RpcBlockClient { + return &RpcBlockClient{URL: URL} +} + +func (r *RpcBlockClient) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) error { + reqrpc := jsonrpc.JSONRPCRequest{ + ID: nil, + Method: "block_consumeBuiltBlock", + Version: "2.0", + Params: []interface{}{block.Header(), blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, bidTrace}, + } + + resp, err := jsonrpc.SendJSONRPCRequest(reqrpc, r.URL) + if err != nil { + return err + } + if resp.Error != nil { + return resp.Error + } + return nil +} diff --git a/go.mod b/go.mod index 651b1a2ab6..a890af8c06 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.1.1 github.com/aws/aws-sdk-go-v2/service/route53 v1.1.1 github.com/btcsuite/btcd/btcec/v2 v2.3.2 + github.com/cenkalti/backoff/v4 v4.2.1 github.com/cespare/cp v1.1.1 github.com/cloudflare/cloudflare-go v0.14.0 github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 diff --git a/go.sum b/go.sum index c92e8bf331..56ecb10a38 100644 --- a/go.sum +++ b/go.sum @@ -85,6 +85,8 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 h1:KdUfX2zKommPRa+PD0sWZUyXe9w277ABlgELO7H04IM= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/cp v1.1.1 h1:nCb6ZLdB7NRaqsm91JtQTAme2SKJzXVsdPIPkyJr1MU= github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=