Skip to content

Commit

Permalink
feat: block history estimator (#688)
Browse files Browse the repository at this point in the history
* block history estimator

* tidy + lint

* fix broken txm tests

* replace {} (empty = finalized) with confirmed commitment

* add blockhistory estimator config
  • Loading branch information
aalu1418 committed Jun 5, 2024
1 parent eef8bfb commit 555ff58
Show file tree
Hide file tree
Showing 16 changed files with 119,769 additions and 685 deletions.
8 changes: 4 additions & 4 deletions integration-tests/smoke/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ func TestSolanaOCRV2Smoke(t *testing.T) {
}
for successFullRounds < *config.OCR2.Smoke.NumberOfRounds {
time.Sleep(time.Second * 6)
require.Less(t, stuck, 10, "Rounds have been stuck for more than 10 iterations")
require.Less(t, stuck, 10, fmt.Sprintf("%s: Rounds have been stuck for more than 10 iterations", name))
log.Info().Str("Transmission", sg.OcrAddress).Msg("Inspecting transmissions")
transmissions, err := sg.FetchTransmissions(sg.OcrAddress)
require.NoError(t, err)
if len(transmissions) <= 1 {
log.Info().Str("Contract", sg.OcrAddress).Str("No", "Transmissions")
log.Info().Str("Contract", sg.OcrAddress).Msg(fmt.Sprintf("%s: No Transmissions", name))
stuck++
continue
}
Expand All @@ -143,11 +143,11 @@ func TestSolanaOCRV2Smoke(t *testing.T) {
prevRound = currentRound
}
if currentRound.RoundID <= prevRound.RoundID {
log.Info().Str("Transmission", sg.OcrAddress).Msg("No new transmissions")
log.Info().Str("Transmission", sg.OcrAddress).Msg(fmt.Sprintf("%s: No new transmissions", name))
stuck++
continue
}
log.Info().Str("Contract", sg.OcrAddress).Interface("Answer", currentRound.Answer).Int64("RoundID", currentRound.Answer).Msg("New answer found")
log.Info().Str("Contract", sg.OcrAddress).Interface("Answer", currentRound.Answer).Int64("RoundID", currentRound.Answer).Msg(fmt.Sprintf("%s: New answer found", name))
require.Equal(t, currentRound.Answer, int64(5), fmt.Sprintf("Actual: %d, Expected: 5", currentRound.Answer))
require.Less(t, prevRound.RoundID, currentRound.RoundID, fmt.Sprintf("Expected round %d to be less than %d", prevRound.RoundID, currentRound.RoundID))
prevRound = currentRound
Expand Down
27 changes: 26 additions & 1 deletion pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Reader interface {
LatestBlockhash() (*rpc.GetLatestBlockhashResult, error)
ChainID() (string, error)
GetFeeForMessage(msg string) (uint64, error)
GetLatestBlock() (*rpc.GetBlockResult, error)
}

// AccountReader is an interface that allows users to pass either the solana rpc client or the relay client
Expand Down Expand Up @@ -90,10 +91,14 @@ func (c *Client) Balance(addr solana.PublicKey) (uint64, error) {
}

func (c *Client) SlotHeight() (uint64, error) {
return c.SlotHeightWithCommitment(rpc.CommitmentProcessed) // get the latest slot height
}

func (c *Client) SlotHeightWithCommitment(commitment rpc.CommitmentType) (uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
defer cancel()
v, err, _ := c.requestGroup.Do("GetSlotHeight", func() (interface{}, error) {
return c.rpc.GetSlot(ctx, rpc.CommitmentProcessed) // get the latest slot height
return c.rpc.GetSlot(ctx, commitment)
})
return v.(uint64), err
}
Expand Down Expand Up @@ -211,3 +216,23 @@ func (c *Client) SendTx(ctx context.Context, tx *solana.Transaction) (solana.Sig

return c.rpc.SendTransactionWithOpts(ctx, tx, opts)
}

func (c *Client) GetLatestBlock() (*rpc.GetBlockResult, error) {
// get latest confirmed slot
slot, err := c.SlotHeightWithCommitment(c.commitment)
if err != nil {
return nil, fmt.Errorf("GetLatestBlock.SlotHeight: %w", err)
}

// get block based on slot
ctx, cancel := context.WithTimeout(context.Background(), c.txTimeout)
defer cancel()
v, err, _ := c.requestGroup.Do("GetBlockWithOpts", func() (interface{}, error) {
version := uint64(0) // pull all tx types (legacy + v0)
return c.rpc.GetBlockWithOpts(ctx, slot, &rpc.GetBlockOpts{
Commitment: c.commitment,
MaxSupportedTransactionVersion: &version,
})
})
return v.(*rpc.GetBlockResult), err
}
7 changes: 7 additions & 0 deletions pkg/solana/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func TestClient_Reader_Integration(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, uint64(1), res.Value.Lamports)
assert.Equal(t, "NativeLoader1111111111111111111111111111111", res.Value.Owner.String())

// get block + check for nonzero values
block, err := c.GetLatestBlock()
require.NoError(t, err)
assert.NotEqual(t, solana.Hash{}, block.Blockhash)
assert.NotEqual(t, uint64(0), block.ParentSlot)
assert.NotEqual(t, uint64(0), block.ParentSlot)
}

func TestClient_Reader_ChainID(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions pkg/solana/client/mocks/ReaderWriter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion pkg/solana/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var defaultConfigSet = configSet{
ComputeUnitPriceMax: 1_000,
ComputeUnitPriceMin: 0,
ComputeUnitPriceDefault: 0,
FeeBumpPeriod: 3 * time.Second,
FeeBumpPeriod: 3 * time.Second, // set to 0 to disable fee bumping
BlockHistoryPollPeriod: 5 * time.Second,
}

//go:generate mockery --name Config --output ./mocks/ --case=underscore --filename config.go
Expand All @@ -49,6 +50,7 @@ type Config interface {
ComputeUnitPriceMin() uint64
ComputeUnitPriceDefault() uint64
FeeBumpPeriod() time.Duration
BlockHistoryPollPeriod() time.Duration
}

// opt: remove
Expand All @@ -69,6 +71,7 @@ type configSet struct {
ComputeUnitPriceMin uint64
ComputeUnitPriceDefault uint64
FeeBumpPeriod time.Duration
BlockHistoryPollPeriod time.Duration
}

type Chain struct {
Expand All @@ -87,6 +90,7 @@ type Chain struct {
ComputeUnitPriceMin *uint64
ComputeUnitPriceDefault *uint64
FeeBumpPeriod *config.Duration
BlockHistoryPollPeriod *config.Duration
}

func (c *Chain) SetDefaults() {
Expand Down Expand Up @@ -136,6 +140,9 @@ func (c *Chain) SetDefaults() {
if c.FeeBumpPeriod == nil {
c.FeeBumpPeriod = config.MustNewDuration(defaultConfigSet.FeeBumpPeriod)
}
if c.BlockHistoryPollPeriod == nil {
c.BlockHistoryPollPeriod = config.MustNewDuration(defaultConfigSet.BlockHistoryPollPeriod)
}
}

type Node struct {
Expand Down
14 changes: 14 additions & 0 deletions pkg/solana/config/mocks/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/solana/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ func setFromChain(c, f *Chain) {
if f.FeeBumpPeriod != nil {
c.FeeBumpPeriod = f.FeeBumpPeriod
}
if f.BlockHistoryPollPeriod != nil {
c.BlockHistoryPollPeriod = f.BlockHistoryPollPeriod
}
}

func (c *TOMLConfig) ValidateConfig() (err error) {
Expand Down Expand Up @@ -268,6 +271,10 @@ func (c *TOMLConfig) FeeBumpPeriod() time.Duration {
return c.Chain.FeeBumpPeriod.Duration()
}

func (c *TOMLConfig) BlockHistoryPollPeriod() time.Duration {
return c.Chain.BlockHistoryPollPeriod.Duration()
}

func (c *TOMLConfig) ListNodes() Nodes {
return c.Nodes
}
Expand Down
137 changes: 137 additions & 0 deletions pkg/solana/fees/block_history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package fees

import (
"context"
"fmt"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
)

var _ Estimator = &blockHistoryEstimator{}

type blockHistoryEstimator struct {
starter services.StateMachine
chStop chan struct{}
done sync.WaitGroup

client *utils.LazyLoad[client.ReaderWriter]
cfg config.Config
lgr logger.Logger

price uint64
lock sync.RWMutex
}

// NewBlockHistoryEstimator creates a new fee estimator that parses historical fees from a fetched block
// Note: getRecentPrioritizationFees is not used because it provides the lowest prioritization fee for an included tx in the block
// which is not effective enough for increasing the chances of block inclusion
func NewBlockHistoryEstimator(c *utils.LazyLoad[client.ReaderWriter], cfg config.Config, lgr logger.Logger) (*blockHistoryEstimator, error) {
return &blockHistoryEstimator{
chStop: make(chan struct{}),
client: c,
cfg: cfg,
lgr: lgr,
price: cfg.ComputeUnitPriceDefault(), // use default value
}, nil
}

func (bhe *blockHistoryEstimator) Start(ctx context.Context) error {
return bhe.starter.StartOnce("solana_blockHistoryEstimator", func() error {
bhe.done.Add(1)
go bhe.run()
bhe.lgr.Debugw("BlockHistoryEstimator: started")
return nil
})
}

func (bhe *blockHistoryEstimator) run() {
defer bhe.done.Done()

tick := time.After(0)
for {
select {
case <-bhe.chStop:
return
case <-tick:
if err := bhe.calculatePrice(); err != nil {
bhe.lgr.Error(fmt.Errorf("BlockHistoryEstimator failed to fetch price: %w", err))
}
}

tick = time.After(utils.WithJitter(bhe.cfg.BlockHistoryPollPeriod()))
}
}

func (bhe *blockHistoryEstimator) Close() error {
close(bhe.chStop)
bhe.done.Wait()
bhe.lgr.Debugw("BlockHistoryEstimator: stopped")
return nil
}

func (bhe *blockHistoryEstimator) BaseComputeUnitPrice() uint64 {
price := bhe.readRawPrice()
if price >= bhe.cfg.ComputeUnitPriceMin() && price <= bhe.cfg.ComputeUnitPriceMax() {
return price
}

if price < bhe.cfg.ComputeUnitPriceMin() {
bhe.lgr.Warnw("BlockHistoryEstimator: estimation below minimum consider lowering ComputeUnitPriceMin", "min", bhe.cfg.ComputeUnitPriceMin(), "calculated", price)
return bhe.cfg.ComputeUnitPriceMin()
}

bhe.lgr.Warnw("BlockHistoryEstimator: estimation above maximum consider increasing ComputeUnitPriceMax", "min", bhe.cfg.ComputeUnitPriceMax(), "calculated", price)
return bhe.cfg.ComputeUnitPriceMax()
}

func (bhe *blockHistoryEstimator) readRawPrice() uint64 {
bhe.lock.RLock()
defer bhe.lock.RUnlock()
return bhe.price
}

func (bhe *blockHistoryEstimator) calculatePrice() error {
// fetch client
c, err := bhe.client.Get()
if err != nil {
return fmt.Errorf("failed to get client in blockHistoryEstimator.getFee: %w", err)
}

// get latest block based on configured confirmation
block, err := c.GetLatestBlock()
if err != nil {
return fmt.Errorf("failed to get block in blockHistoryEstimator.getFee: %w", err)
}

// parse block for fee data
feeData, err := ParseBlock(block)
if err != nil {
return fmt.Errorf("failed to parse block in blockHistoryEstimator.getFee: %w", err)
}

// take median of returned fee values
v, err := mathutil.Median(feeData.Prices...)
if err != nil {
return fmt.Errorf("failed to find median in blockHistoryEstimator.getFee: %w", err)
}

// set data
bhe.lock.Lock()
bhe.price = uint64(v) // ComputeUnitPrice is uint64 underneath
bhe.lock.Unlock()
bhe.lgr.Debugw("BlockHistoryEstimator: updated",
"computeUnitPrice", v,
"blockhash", block.Blockhash,
"slot", block.ParentSlot+1,
"count", len(feeData.Prices),
)
return nil
}
Loading

0 comments on commit 555ff58

Please sign in to comment.