Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
add retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
TymKh committed Nov 6, 2023
1 parent 0eb04a0 commit 8d4b3f3
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 271 deletions.
19 changes: 17 additions & 2 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/flashbots/go-boost-utils/utils"
"github.com/holiman/uint256"
"golang.org/x/time/rate"

"github.com/cenkalti/backoff/v4"
)

const (
Expand Down Expand Up @@ -270,7 +272,7 @@ func (b *Builder) submitBellatrixBlock(block *types.Block, blockValue *big.Int,
log.Error("could not validate bellatrix block", "err", err)
}
} else {
go b.blockConsumer.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
err = b.relay.SubmitBlock(&blockSubmitReq, vd)
if err != nil {
log.Error("could not submit bellatrix block", "err", err, "#commitedBundles", len(commitedBundles))
Expand Down Expand Up @@ -329,7 +331,7 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or
log.Error("could not validate block for capella", "err", err)
}
} else {
go b.blockConsumer.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
go b.processBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, &blockBidMsg)
err = b.relay.SubmitBlockCapella(&blockSubmitReq, vd)
if err != nil {
log.Error("could not submit capella block", "err", err, "#commitedBundles", len(commitedBundles))
Expand All @@ -341,6 +343,19 @@ func (b *Builder) submitCapellaBlock(block *types.Block, blockValue *big.Int, or
return nil
}

func (b *Builder) processBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) {
back := backoff.NewExponentialBackOff()
back.MaxInterval = 3 * time.Second
back.MaxElapsedTime = 12 * time.Second
err := backoff.Retry(func() error {
return b.blockConsumer.ConsumeBuiltBlock(block, blockValue, ordersClosedAt, sealedAt, commitedBundles, allBundles, usedSbundles, bidTrace)
}, back)
if err != nil {
log.Error("could not consume built block", "err", err)
} else {
log.Info("successfully relayed block data to consumer")
}
}
func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) error {
if attrs == nil {
return nil
Expand Down
264 changes: 3 additions & 261 deletions flashbotsextra/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@ package flashbotsextra

import (
"context"
"database/sql"
"math/big"
"time"

apiv1 "github.com/attestantio/go-builder-client/api/v1"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
Expand All @@ -21,10 +18,7 @@ const (
)

type BlockConsumer interface {
ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, OrdersClosedAt time.Time, sealedAt time.Time,
commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle,
usedSbundles []types.UsedSBundle,
bidTrace *apiv1.BidTrace)
ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, OrdersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) error
}
type IDatabaseService interface {
GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error)
Expand All @@ -33,7 +27,8 @@ type IDatabaseService interface {

type NilDbService struct{}

func (NilDbService) ConsumeBuiltBlock(block *types.Block, _ *big.Int, _ time.Time, _ time.Time, _ []types.SimulatedBundle, _ []types.SimulatedBundle, _ []types.UsedSBundle, _ *apiv1.BidTrace) {
func (NilDbService) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, OrdersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) error {
return nil
}

func (NilDbService) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) {
Expand All @@ -47,7 +42,6 @@ func (NilDbService) GetLatestUuidBundles(ctx context.Context, blockNum int64) ([
type DatabaseService struct {
db *sqlx.DB

insertBuiltBlockStmt *sqlx.NamedStmt
insertMissingBundleStmt *sqlx.NamedStmt
fetchPrioBundlesStmt *sqlx.NamedStmt
fetchGetLatestUuidBundlesStmt *sqlx.NamedStmt
Expand All @@ -59,11 +53,6 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) {
return nil, err
}

insertBuiltBlockStmt, err := db.PrepareNamed("insert into built_blocks (block_number, profit, slot, hash, gas_limit, gas_used, base_fee, parent_hash, proposer_pubkey, proposer_fee_recipient, builder_pubkey, timestamp, timestamp_datetime, orders_closed_at, sealed_at) values (:block_number, :profit, :slot, :hash, :gas_limit, :gas_used, :base_fee, :parent_hash, :proposer_pubkey, :proposer_fee_recipient, :builder_pubkey, :timestamp, to_timestamp(:timestamp), :orders_closed_at, :sealed_at) returning block_id")
if err != nil {
return nil, err
}

insertMissingBundleStmt, err := db.PrepareNamed("insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase, :bundle_uuid) on conflict do nothing returning id")
if err != nil {
return nil, err
Expand All @@ -81,7 +70,6 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) {

return &DatabaseService{
db: db,
insertBuiltBlockStmt: insertBuiltBlockStmt,
insertMissingBundleStmt: insertMissingBundleStmt,
fetchPrioBundlesStmt: fetchPrioBundlesStmt,
fetchGetLatestUuidBundlesStmt: fetchGetLatestUuidBundlesStmt,
Expand All @@ -95,252 +83,6 @@ func Min(l int, r int) int {
return r
}

func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) {
if len(bundles) == 0 {
return nil, nil
}

bundleIdsMap := make(map[uuid.UUID]uint64, len(bundles))

// Batch by 500
requestsToMake := [][]string{make([]string, 0, Min(500, len(bundles)))}
cRequestInd := 0
for i, bundle := range bundles {
if i != 0 && i%500 == 0 {
cRequestInd += 1
requestsToMake = append(requestsToMake, make([]string, 0, Min(500, len(bundles)-i)))
}
requestsToMake[cRequestInd] = append(requestsToMake[cRequestInd], bundle.SimulatedBundle.OriginalBundle.Hash.String())
}

for _, request := range requestsToMake {
query, args, err := sqlx.In("select id, bundle_hash, bundle_uuid from bundles where param_block_number = ? and bundle_hash in (?)", blockNumber, request)
if err != nil {
return nil, err
}
query = ds.db.Rebind(query)

queryRes := []struct {
Id uint64 `db:"id"`
BundleHash string `db:"bundle_hash"`
BundleUUID uuid.UUID `db:"bundle_uuid"`
}{}

err = ds.db.SelectContext(ctx, &queryRes, query, args...)
if err != nil {
return nil, err
}
RowLoop:
for _, row := range queryRes {
for _, b := range bundles {
// if UUID agree it's same exact bundle we stop searching
if b.UUID == row.BundleUUID {
bundleIdsMap[b.UUID] = row.Id
continue RowLoop
}
// we can have multiple bundles with same hash eventually, so we fall back on getting row with same hash
if b.SimulatedBundle.OriginalBundle.Hash.String() == row.BundleHash {
bundleIdsMap[b.UUID] = row.Id
}
}
}
}

return bundleIdsMap, nil
}

// TODO: cache locally for current block!
func (ds *DatabaseService) getBundleIdsAndInsertMissingBundles(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) {
bundleIdsMap, err := ds.getBundleIds(ctx, blockNumber, bundles)
if err != nil {
return nil, err
}

toRetry := make([]uuidBundle, 0)
for _, bundle := range bundles {
if _, found := bundleIdsMap[bundle.UUID]; found {
continue
}

var bundleId uint64
missingBundleData := SimulatedBundleToDbBundle(&bundle.SimulatedBundle) // nolint: gosec
err = ds.insertMissingBundleStmt.GetContext(ctx, &bundleId, missingBundleData) // not using the tx as it relies on the unique constraint!
if err == nil {
bundleIdsMap[bundle.UUID] = bundleId
} else if err == sql.ErrNoRows /* conflict, someone else inserted the bundle before we could */ {
toRetry = append(toRetry, bundle)
} else {
log.Error("could not insert missing bundle", "err", err)
}
}

retriedBundleIds, err := ds.getBundleIds(ctx, blockNumber, toRetry)
if err != nil {
return nil, err
}

for hash, id := range retriedBundleIds {
bundleIdsMap[hash] = id
}

return bundleIdsMap, nil
}

func (ds *DatabaseService) insertBuildBlock(tx *sqlx.Tx, ctx context.Context, block *types.Block, blockValue *big.Int, bidTrace *apiv1.BidTrace, ordersClosedAt time.Time, sealedAt time.Time) (uint64, error) {
blockData := BuiltBlock{
BlockNumber: block.NumberU64(),
Profit: new(big.Rat).SetFrac(blockValue, big.NewInt(1e18)).FloatString(18),
Slot: bidTrace.Slot,
Hash: block.Hash().String(),
GasLimit: block.GasLimit(),
GasUsed: block.GasUsed(),
BaseFee: block.BaseFee().Uint64(),
ParentHash: block.ParentHash().String(),
ProposerPubkey: bidTrace.ProposerPubkey.String(),
ProposerFeeRecipient: bidTrace.ProposerFeeRecipient.String(),
BuilderPubkey: bidTrace.BuilderPubkey.String(),
Timestamp: block.Time(),
OrdersClosedAt: ordersClosedAt.UTC(),
SealedAt: sealedAt.UTC(),
}

var blockId uint64
if err := tx.NamedStmtContext(ctx, ds.insertBuiltBlockStmt).GetContext(ctx, &blockId, blockData); err != nil {
log.Error("could not insert built block", "err", err)
return 0, err
}

return blockId, nil
}

func (ds *DatabaseService) insertBuildBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIds []uint64) error {
if len(bundleIds) == 0 {
return nil
}

toInsert := make([]blockAndBundleId, len(bundleIds))
for i, bundleId := range bundleIds {
toInsert[i] = blockAndBundleId{blockId, bundleId}
}

_, err := tx.NamedExecContext(ctx, "insert into built_blocks_bundles (block_id, bundle_id) values (:block_id, :bundle_id)", toInsert)
return err
}

func (ds *DatabaseService) insertAllBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIds []uint64) error {
if len(bundleIds) == 0 {
return nil
}

toInsert := make([]blockAndBundleId, 0, len(bundleIds))
for _, bundleId := range bundleIds {
toInsert = append(toInsert, blockAndBundleId{blockId, bundleId})
}

_, err := tx.NamedExecContext(ctx, "insert into built_blocks_all_bundles (block_id, bundle_id) values (:block_id, :bundle_id)", toInsert)
return err
}

func (ds *DatabaseService) insertUsedSBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, usedSbundles []types.UsedSBundle) error {
if len(usedSbundles) == 0 {
return nil
}

toInsert := make([]DbUsedSBundle, len(usedSbundles))
for i, u := range usedSbundles {
toInsert[i] = DbUsedSBundle{
BlockId: blockId,
Hash: u.Bundle.Hash().Bytes(),
Inserted: u.Success,
}
}
_, err := tx.NamedExecContext(ctx, insertUsedSbundleQuery, toInsert)
return err
}

type uuidBundle struct {
SimulatedBundle types.SimulatedBundle
UUID uuid.UUID
}

func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time,
commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle,
usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace,
) {
var allUUIDBundles = make([]uuidBundle, 0, len(allBundles))
for _, bundle := range allBundles {
allUUIDBundles = append(allUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()})
}

var commitedUUIDBundles = make([]uuidBundle, 0, len(commitedBundles))
for _, bundle := range commitedBundles {
commitedUUIDBundles = append(commitedUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()})
}

ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()

bundleIdsMap, err := ds.getBundleIdsAndInsertMissingBundles(ctx, block.NumberU64(), allUUIDBundles)
if err != nil {
log.Error("could not insert bundles", "err", err)
}

tx, err := ds.db.Beginx()
if err != nil {
log.Error("could not open DB transaction", "err", err)
return
}

blockId, err := ds.insertBuildBlock(tx, ctx, block, blockValue, bidTrace, ordersClosedAt, sealedAt)
if err != nil {
tx.Rollback()
log.Error("could not insert built block", "err", err)
return
}

commitedBundlesIds := make([]uint64, 0, len(commitedBundles))
for _, bundle := range commitedUUIDBundles {
if id, found := bundleIdsMap[bundle.UUID]; found {
commitedBundlesIds = append(commitedBundlesIds, id)
}
}

err = ds.insertBuildBlockBundleIds(tx, ctx, blockId, commitedBundlesIds)
if err != nil {
tx.Rollback()
log.Error("could not insert built block bundles", "err", err)
return
}

var uniqueBundleIDs = make(map[uint64]struct{})
var allBundleIds []uint64
// we need to filter out duplicates while we still have unique constraint on bundle_hash+block_number which leads to data discrepancies
for _, v := range bundleIdsMap {
if _, ok := uniqueBundleIDs[v]; ok {
continue
}
uniqueBundleIDs[v] = struct{}{}
allBundleIds = append(allBundleIds, v)
}
err = ds.insertAllBlockBundleIds(tx, ctx, blockId, allBundleIds)
if err != nil {
tx.Rollback()
log.Error("could not insert built block all bundles", "err", err, "block", block.NumberU64(), "commitedBundles", commitedBundlesIds)
return
}

err = ds.insertUsedSBundleIds(tx, ctx, blockId, usedSbundles)
if err != nil {
tx.Rollback()
log.Error("could not insert used sbundles", "err", err)
return
}

err = tx.Commit()
if err != nil {
log.Error("could not commit DB trasnaction", "err", err)
}
}
func (ds *DatabaseService) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) {
var bundles []DbBundle
arg := map[string]interface{}{"param_block_number": uint64(blockNum), "is_high_prio": isHighPrio, "limit": lowPrioLimitSize}
Expand Down
14 changes: 6 additions & 8 deletions flashbotsextra/rpc_block_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

apiv1 "github.com/attestantio/go-builder-client/api/v1"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/flashbots/go-utils/jsonrpc"
)

Expand All @@ -18,10 +17,7 @@ func NewRpcBlockClient(URL string) *RpcBlockClient {
return &RpcBlockClient{URL: URL}
}

func (r *RpcBlockClient) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time,
commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle,
usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace,
) {
func (r *RpcBlockClient) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) error {
reqrpc := jsonrpc.JSONRPCRequest{
ID: nil,
Method: "block_consumeBuiltBlock",
Expand All @@ -31,8 +27,10 @@ func (r *RpcBlockClient) ConsumeBuiltBlock(block *types.Block, blockValue *big.I

resp, err := jsonrpc.SendJSONRPCRequest(reqrpc, r.URL)
if err != nil {
log.Error("could not send rpc request", "err", err)
} else {
log.Info("successfully relayed data to block processor via json rpc", "resp", string(resp.Result))
return err
}
if resp.Error != nil {
return resp.Error
}
return nil
}
Loading

0 comments on commit 8d4b3f3

Please sign in to comment.