Skip to content

Commit

Permalink
TxPool: Merge tx_chain and tx_packer to reduce complexity (#2549)
Browse files Browse the repository at this point in the history
* TxPool: Merge tx_chain and tx_packer to reduce complexity

* Fix copyright year
  • Loading branch information
jangko authored Aug 7, 2024
1 parent 38572bd commit d578675
Show file tree
Hide file tree
Showing 11 changed files with 509 additions and 805 deletions.
14 changes: 1 addition & 13 deletions nimbus/core/casper.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2022 Status Research & Development GmbH
# Copyright (c) 2022-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand All @@ -17,18 +17,6 @@ type
withdrawals : seq[Withdrawal] ## EIP-4895
beaconRoot : Hash256 ## EIP-4788

proc prepare*(ctx: CasperRef, header: var BlockHeader) =
header.coinbase = ctx.feeRecipient
header.timestamp = ctx.timestamp
header.prevRandao = ctx.prevRandao
header.difficulty = DifficultyInt.zero

proc prepareForSeal*(ctx: CasperRef, header: var BlockHeader) {.gcsafe, raises:[].} =
header.nonce = default(BlockNonce)
header.extraData = @[] # TODO: probably this should be configurable by user?
# this repetition, assigning prevRandao is because how txpool works
header.prevRandao = ctx.prevRandao

# ------------------------------------------------------------------------------
# Getters
# ------------------------------------------------------------------------------
Expand Down
174 changes: 15 additions & 159 deletions nimbus/core/tx_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -288,24 +288,6 @@
## The `flags` parameter holds a set of strategy symbols for how to process
## items and buckets.
##
## *packItemsMaxGasLimit*
## It set, the *packer* will execute and collect additional items from
## the `staged` bucket while accumulating `gasUsed` as long as
## `maxGasLimit` is not exceeded. If `packItemsTryHarder` flag is also
## set, the *packer* will not stop until at least `hwmGasLimit` is
## reached.
##
## Otherwise the *packer* will accumulate up until `trgGasLimit` is
## not exceeded, and not stop until at least `lwmGasLimit` is reached
## in case `packItemsTryHarder` is also set,
##
## *packItemsTryHarder*
## It set, the *packer* will *not* stop accumulaing transactions up until
## the `lwmGasLimit` or `hwmGasLimit` is reached, depending on whether
## the `packItemsMaxGasLimit` is set. Otherwise, accumulating stops
## immediately before the next transaction exceeds `trgGasLimit`, or
## `maxGasLimit` depending on `packItemsMaxGasLimit`.
##
## *autoUpdateBucketsDB*
## Automatically update the state buckets after running batch jobs if the
## `dirtyBuckets` flag is also set.
Expand All @@ -314,33 +296,11 @@
## Automatically dispose *pending* or *staged* tx items that were added to
## the state buckets database at least `lifeTime` ago.
##
## *autoZombifyPacked*
## Automatically dispose *packed* tx itemss that were added to
## the state buckets database at least `lifeTime` ago.
##
## *..there might be more strategy symbols..*
##
## hwmTrgPercent
## This parameter implies the size of `hwmGasLimit` which is calculated
## as `max(trgGasLimit, maxGasLimit * lwmTrgPercent / 100)`.
##
## lifeTime
## Txs that stay longer in one of the buckets will be moved to a waste
## basket. From there they will be eventually deleted oldest first when
## the maximum size would be exceeded.
##
## lwmMaxPercent
## This parameter implies the size of `lwmGasLimit` which is calculated
## as `max(minGasLimit, trgGasLimit * lwmTrgPercent / 100)`.
##
## minFeePrice
## Applies no EIP-1559 txs only. Txs are packed if `maxFee` is at least
## that value.
##
## minPreLondonGasPrice
## For pre-London or legacy txs, this parameter has precedence over
## `minTipPrice`. Txs are packed if the `gasPrice` is at least that value.
##
## priceBump
## There can be only one transaction in the database for the same `sender`
## account and `nonce` value. When adding a transaction with the same
Expand All @@ -350,69 +310,21 @@
##
## Read-Only Parameters
## --------------------
##
## baseFee
## This parameter is derived from the internally cached block chain state.
## The base fee parameter modifies/determines the expected gain when packing
## a new block (is set to *zero* for *pre-London* blocks.)
##
## dirtyBuckets
## If `true`, the state buckets database is ready for re-org if the
## `autoUpdateBucketsDB` flag is also set.
##
## gasLimit
## Taken or derived from the current block chain head, incoming txs that
## exceed this gas limit are stored into the *pending* bucket (maybe
## eligible for staging at the next cycle when the internally cached block
## chain state is updated.)
##
## head
## Cached block chain insertion point, not necessarily the same header as
## retrieved by the `getCanonicalHead()`. This insertion point can be
## adjusted with the `smartHead()` function.
##
## hwmGasLimit
## This parameter is at least `trgGasLimit` and does not exceed
## `maxGasLimit` and can be adjusted by means of setting `hwmMaxPercent`. It
## is used by the packer as a minimum block size if both flags
## `packItemsTryHarder` and `packItemsMaxGasLimit` are set.
##
## lwmGasLimit
## This parameter is at least `minGasLimit` and does not exceed
## `trgGasLimit` and can be adjusted by means of setting `lwmTrgPercent`. It
## is used by the packer as a minimum block size if the flag
## `packItemsTryHarder` is set and `packItemsMaxGasLimit` is unset.
##
## maxGasLimit
## This parameter is at least `hwmGasLimit`. It is calculated considering
## the current state of the block chain as represented by the internally
## cached head. This parameter is used by the *packer* as a size limit if
## `packItemsMaxGasLimit` is set.
##
## minGasLimit
## This parameter is calculated considering the current state of the block
## chain as represented by the internally cached head. It can be used for
## verifying that a generated block does not underflow minimum size.
## Underflow can only be happen if there are not enough transaction available
## in the pool.
##
## trgGasLimit
## This parameter is at least `lwmGasLimit` and does not exceed
## `maxGasLimit`. It is calculated considering the current state of the block
## chain as represented by the internally cached head. This parameter is
## used by the *packer* as a size limit if `packItemsMaxGasLimit` is unset.
##


import
std/[sequtils, tables],
./tx_pool/[tx_chain, tx_desc, tx_info, tx_item],
./tx_pool/[tx_packer, tx_desc, tx_info, tx_item],
./tx_pool/tx_tabs,
./tx_pool/tx_tasks/[
tx_add,
tx_bucket,
tx_head,
tx_dispose,
tx_packer],
tx_dispose],
chronicles,
eth/keys,
stew/keyed_queue,
Expand All @@ -435,7 +347,8 @@ export
tx_item.sender,
tx_item.status,
tx_item.timeStamp,
tx_item.tx
tx_item.tx,
tx_desc.head

{.push raises: [].}

Expand All @@ -451,8 +364,7 @@ proc maintenanceProcessing(xp: TxPoolRef)
## Tasks to be done after add/del txs processing

# Purge expired items
if autoZombifyUnpacked in xp.pFlags or
autoZombifyPacked in xp.pFlags:
if autoZombifyUnpacked in xp.pFlags:
# Move transactions older than `xp.lifeTime` to the waste basket.
xp.disposeExpiredItems

Expand All @@ -469,9 +381,9 @@ proc setHead(xp: TxPoolRef; val: BlockHeader)
{.gcsafe,raises: [CatchableError].} =
## Update cached block chain insertion point. This will also update the
## internally cached `baseFee` (depends on the block chain state.)
if xp.chain.head != val:
xp.chain.head = val # calculates the new baseFee
xp.txDB.baseFee = xp.chain.baseFee
if xp.head != val:
xp.head = val # calculates the new baseFee
xp.txDB.baseFee = xp.baseFee
xp.pDirtyBuckets = true
xp.bucketFlushPacked

Expand Down Expand Up @@ -512,25 +424,18 @@ proc add*(xp: TxPoolRef; tx: PooledTransaction; info = "")
## Variant of `add()` for a single transaction.
xp.add(@[tx], info)

proc smartHead*(xp: TxPoolRef; pos: BlockHeader; blindMode = false): bool
proc smartHead*(xp: TxPoolRef; pos: BlockHeader): bool
{.gcsafe,raises: [CatchableError].} =
## This function moves the internal head cache (i.e. tx insertion point,
## vmState) and ponts it to a now block on the chain.
##
## In standard mode when argument `blindMode` is `false`, it calculates the
## it calculates the
## txs that need to be added or deleted after moving the insertion point
## head so that the tx-pool will not fail to re-insert quered txs that are
## on the chain, already. Neither will it loose any txs. After updating the
## the internal head cache, the previously calculated actions will be
## applied.
##
## If the argument `blindMode` is passed `true`, the insertion head is
## simply set ignoring all changes. This mode makes sense only in very
## particular circumstances.
if blindMode:
xp.setHead(pos)
return true

let rcDiff = xp.headDiff(pos)
if rcDiff.isOk:
let changes = rcDiff.value
Expand All @@ -556,30 +461,13 @@ proc smartHead*(xp: TxPoolRef; pos: BlockHeader; blindMode = false): bool
xp.maintenanceProcessing
return true

proc triggerReorg*(xp: TxPoolRef)
{.gcsafe,raises: [CatchableError].} =
## This function triggers a tentative bucket re-org action by setting the
## `dirtyBuckets` parameter. This re-org action eventually happens only if
## the `autoUpdateBucketsDB` flag is also set.
xp.pDirtyBuckets = true
xp.maintenanceProcessing

# ------------------------------------------------------------------------------
# Public functions, getters
# ------------------------------------------------------------------------------

func com*(xp: TxPoolRef): CommonRef =
## Getter
xp.chain.com

func baseFee*(xp: TxPoolRef): GasInt =
## Getter, this parameter modifies/determines the expected gain when packing
xp.chain.baseFee

func dirtyBuckets*(xp: TxPoolRef): bool =
## Getter, bucket database is ready for re-org if the `autoUpdateBucketsDB`
## flag is also set.
xp.pDirtyBuckets
xp.vmState.com

type EthBlockAndBlobsBundle* = object
blk*: EthBlock
Expand All @@ -601,11 +489,11 @@ proc assembleBlock*(
## Note that this getter runs *ad hoc* all the txs through the VM in
## order to build the block.

xp.packerVmExec().isOkOr: # updates vmState
let pst = xp.packerVmExec().valueOr: # updates vmState
return err(error)

var blk = EthBlock(
header: xp.chain.getHeader # uses updated vmState
header: pst.assembleHeader # uses updated vmState
)
var blobsBundle: BlobsBundle

Expand All @@ -621,7 +509,7 @@ proc assembleBlock*(
for blob in tx.networkPayload.blobs:
blobsBundle.blobs.add blob

let com = xp.chain.com
let com = xp.vmState.com
if com.isShanghaiOrLater(blk.header.timestamp):
blk.withdrawals = Opt.some(com.pos.withdrawals)

Expand All @@ -643,16 +531,6 @@ proc assembleBlock*(
blk: blk,
blobsBundle: blobsBundleOpt)

func flags*(xp: TxPoolRef): set[TxPoolFlags] =
## Getter, retrieves strategy symbols for how to process items and buckets.
xp.pFlags

func head*(xp: TxPoolRef): BlockHeader =
## Getter, cached block chain insertion point. Typocally, this should be the
## the same header as retrieved by the `getCanonicalHead()` (unless in the
## middle of a mining update.)
xp.chain.head

# core/tx_pool.go(474): func (pool SetGasPrice,*TxPool) Stats() (int, int) {
# core/tx_pool.go(1728): func (t *txLookup) Count() int {
# core/tx_pool.go(1737): func (t *txLookup) LocalCount() int {
Expand All @@ -662,28 +540,6 @@ func nItems*(xp: TxPoolRef): TxTabsItemsCount =
## some totals.
xp.txDB.nItems

# ------------------------------------------------------------------------------
# Public functions, setters
# ------------------------------------------------------------------------------

func `baseFee=`*(xp: TxPoolRef; val: GasInt) {.raises: [KeyError].} =
## Setter, sets `baseFee` explicitely witout triggering a packer update.
## Stil a database update might take place when updating account ranks.
##
## Typically, this function would *not* be called but rather the `smartHead()`
## update would be employed to do the job figuring out the proper value
## for the `baseFee`.
xp.txDB.baseFee = val

func `flags=`*(xp: TxPoolRef; val: set[TxPoolFlags]) =
## Setter, strategy symbols for how to process items and buckets.
xp.pFlags = val

func `maxRejects=`*(xp: TxPoolRef; val: int) =
## Setter, the size of the waste basket. This setting becomes effective with
## the next move of an item into the waste basket.
xp.txDB.maxRejects = val

# ------------------------------------------------------------------------------
# Public functions, per-tx-item operations
# ------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit d578675

Please sign in to comment.