Skip to content

Commit

Permalink
Add finalizer component to TXM (#13638)
Browse files Browse the repository at this point in the history
* Added a finalizer component that assesses confirmed transactions for finality

* Moved Finalizer component into EVM code and addressed feedback

* Fixed linting and renumbered sql migration

* Added limit to Finalizer RPC batch calls

* Cleaned up unneeded code

* Renumbered sql migration

* Updated Finalizer to use LatestAndFinalizedBlock method from HeadTracker

* Fixed health check tests and fixed linting

* Fixed lint error

* Fixed lint error

* Added finalized state to replace finalized column

* Updated finalizer batch RPC validation to use blockByNumber and added filter to DB query

* Updated reaper to reap old confirmed transactions

* Fixed migration test

* Fixed lint error

* Changed log level

* Renumbered sql migration

* Updated Finalizer to only process on new finalized heads and improved query performance

* Fixed mocks

* Updated TxStore method name and fixed mocks

* Fixed mock

* Updated TxStore method to exit early

* Removed unused error

---------

Co-authored-by: Silas Lenihan <[email protected]>
  • Loading branch information
amit-momin and silaslenihan authored Aug 6, 2024
1 parent ce90bc3 commit 2312827
Show file tree
Hide file tree
Showing 34 changed files with 1,170 additions and 357 deletions.
5 changes: 5 additions & 0 deletions .changeset/itchy-bugs-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Introduced finalized transaction state. Added a finalizer component to the TXM to mark transactions as finalized. #internal
1 change: 1 addition & 0 deletions common/txmgr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ const (
TxUnconfirmed = txmgrtypes.TxState("unconfirmed")
TxConfirmed = txmgrtypes.TxState("confirmed")
TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt")
TxFinalized = txmgrtypes.TxState("finalized")
)
9 changes: 3 additions & 6 deletions common/txmgr/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
// Reaper handles periodic database cleanup for Txm
type Reaper[CHAIN_ID types.ID] struct {
store txmgrtypes.TxHistoryReaper[CHAIN_ID]
config txmgrtypes.ReaperChainConfig
txConfig txmgrtypes.ReaperTransactionsConfig
chainID CHAIN_ID
log logger.Logger
Expand All @@ -25,10 +24,9 @@ type Reaper[CHAIN_ID types.ID] struct {
}

// NewReaper instantiates a new reaper object
func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], config txmgrtypes.ReaperChainConfig, txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] {
func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] {
r := &Reaper[CHAIN_ID]{
store,
config,
txConfig,
chainID,
logger.Named(lggr, "Reaper"),
Expand Down Expand Up @@ -103,13 +101,12 @@ func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error {
r.log.Debug("Transactions.ReaperThreshold set to 0; skipping ReapTxes")
return nil
}
minBlockNumberToKeep := headNum - int64(r.config.FinalityDepth())
mark := time.Now()
timeThreshold := mark.Add(-threshold)

r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold, "minBlockNumberToKeep", minBlockNumberToKeep)
r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold)

if err := r.store.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, r.chainID); err != nil {
if err := r.store.ReapTxHistory(ctx, timeThreshold, r.chainID); err != nil {
return err
}

Expand Down
20 changes: 17 additions & 3 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type Txm[
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newErrorClassifier NewErrorClassifier
Expand Down Expand Up @@ -143,6 +144,7 @@ func NewTxm[
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
finalizer txmgrtypes.Finalizer[BLOCK_HASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
Expand All @@ -165,13 +167,14 @@ func NewTxm[
resender: resender,
tracker: tracker,
newErrorClassifier: newErrorClassifierFunc,
finalizer: finalizer,
}

if txCfg.ResendAfterThreshold() <= 0 {
b.logger.Info("Resender: Disabled")
}
if txCfg.ReaperThreshold() > 0 && txCfg.ReaperInterval() > 0 {
b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, cfg, txCfg, chainId)
b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, txCfg, chainId)
} else {
b.logger.Info("TxReaper: Disabled")
}
Expand Down Expand Up @@ -199,6 +202,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return fmt.Errorf("Txm: Tracker failed to start: %w", err)
}

if err := ms.Start(ctx, b.finalizer); err != nil {
return fmt.Errorf("Txm: Finalizer failed to start: %w", err)
}

b.logger.Info("Txm starting runLoop")
b.wg.Add(1)
go b.runLoop()
Expand Down Expand Up @@ -293,6 +300,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthRepo
services.CopyHealth(report, b.broadcaster.HealthReport())
services.CopyHealth(report, b.confirmer.HealthReport())
services.CopyHealth(report, b.txAttemptBuilder.HealthReport())
services.CopyHealth(report, b.finalizer.HealthReport())
})

if b.txConfig.ForwardersEnabled() {
Expand Down Expand Up @@ -415,6 +423,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
case head := <-b.chHeads:
b.confirmer.mb.Deliver(head)
b.tracker.mb.Deliver(head.BlockNumber())
b.finalizer.DeliverLatestHead(head)
case reset := <-b.reset:
// This check prevents the weird edge-case where you can select
// into this block after chStop has already been closed and the
Expand Down Expand Up @@ -446,6 +455,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err)
}
err = b.finalizer.Close()
if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) {
b.logger.Errorw(fmt.Sprintf("Failed to Close Finalizer: %v", err), "err", err)
}
return
case <-keysChanged:
// This check prevents the weird edge-case where you can select
Expand Down Expand Up @@ -644,9 +657,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransac
// Return unconfirmed for ConfirmedMissingReceipt since a receipt is required to determine if it is finalized
return commontypes.Unconfirmed, nil
case TxConfirmed:
// TODO: Check for finality and return finalized status
// Return unconfirmed if tx receipt's block is newer than the latest finalized block
// Return unconfirmed for confirmed transactions because they are not yet finalized
return commontypes.Unconfirmed, nil
case TxFinalized:
return commontypes.Finalized, nil
case TxFatalError:
// Use an ErrorClassifier to determine if the transaction is considered Fatal
txErr := b.newErrorClassifier(tx.GetError())
Expand Down
6 changes: 0 additions & 6 deletions common/txmgr/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import "time"
type TransactionManagerChainConfig interface {
BroadcasterChainConfig
ConfirmerChainConfig
ReaperChainConfig
}

type TransactionManagerFeeConfig interface {
Expand Down Expand Up @@ -74,11 +73,6 @@ type ResenderTransactionsConfig interface {
MaxInFlight() uint32
}

// ReaperConfig is the config subset used by the reaper
type ReaperChainConfig interface {
FinalityDepth() uint32
}

type ReaperTransactionsConfig interface {
ReaperInterval() time.Duration
ReaperThreshold() time.Duration
Expand Down
12 changes: 12 additions & 0 deletions common/txmgr/types/finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package types

import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

type Finalizer[BLOCK_HASH types.Hashable, HEAD types.Head[BLOCK_HASH]] interface {
// interfaces for running the underlying estimator
services.Service
DeliverLatestHead(head HEAD) bool
}
77 changes: 0 additions & 77 deletions common/txmgr/types/mocks/reaper_chain_config.go

This file was deleted.

80 changes: 10 additions & 70 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,10 @@ type TransactionStore[
UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
UpdateTxFatalError(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
UpdateTxForRebroadcast(ctx context.Context, etx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (finalized bool, err error)
}

type TxHistoryReaper[CHAIN_ID types.ID] interface {
ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error
ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID CHAIN_ID) error
}

type UnstartedTxQueuePruner interface {
Expand Down
Loading

0 comments on commit 2312827

Please sign in to comment.