Skip to content

Commit

Permalink
CCIP price cache use DB timestamp (#13133)
Browse files Browse the repository at this point in the history
* use statement timestamp

* expiration uses db timestamp

* changeset
  • Loading branch information
matYang authored May 8, 2024
1 parent a0d1ce5 commit 2e66837
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 36 deletions.
5 changes: 5 additions & 0 deletions .changeset/poor-gorillas-give.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#changed CCIP price cache to use DB timestamp
22 changes: 10 additions & 12 deletions core/services/ccip/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 12 additions & 14 deletions core/services/ccip/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type ORM interface {
InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error
InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error

ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error
ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error
ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
}

type orm struct {
Expand Down Expand Up @@ -99,20 +99,19 @@ func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector
return nil
}

now := time.Now()
insertData := make([]map[string]interface{}, 0, len(gasPrices))
for _, price := range gasPrices {
insertData = append(insertData, map[string]interface{}{
"chain_selector": destChainSelector,
"job_id": jobId,
"source_chain_selector": price.SourceChainSelector,
"gas_price": price.GasPrice,
"created_at": now,
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at)
VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, :created_at);`
VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
if err != nil {
err = fmt.Errorf("error inserting gas prices for job %d: %w", jobId, err)
Expand All @@ -126,20 +125,19 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect
return nil
}

now := time.Now()
insertData := make([]map[string]interface{}, 0, len(tokenPrices))
for _, price := range tokenPrices {
insertData = append(insertData, map[string]interface{}{
"chain_selector": destChainSelector,
"job_id": jobId,
"token_addr": price.TokenAddr,
"token_price": price.TokenPrice,
"created_at": now,
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at)
VALUES (:chain_selector, :job_id, :token_addr, :token_price, :created_at);`
VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
if err != nil {
err = fmt.Errorf("error inserting token prices for job %d: %w", jobId, err)
Expand All @@ -148,16 +146,16 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect
return err
}

func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error {
stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < $2`
func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, to)
_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}

func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error {
stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < $2`
func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, to)
_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}
22 changes: 12 additions & 10 deletions core/services/ccip/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) {
assert.NoError(t, err)
}

interimTimeStamp := time.Now()
sleepSec := 2
time.Sleep(time.Duration(sleepSec) * time.Second)

// insert for the 2nd time after interimTimeStamp
for _, updatesPerSelector := range updates {
Expand All @@ -222,13 +223,13 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) {

assert.Equal(t, 2*numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db))

// clear by interimTimeStamp should delete rows inserted before it
err := orm.ClearGasPricesByDestChain(ctx, destSelector, interimTimeStamp)
// clear by sleepSec should delete rows inserted before it
err := orm.ClearGasPricesByDestChain(ctx, destSelector, sleepSec)
assert.NoError(t, err)
assert.Equal(t, numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db))

// clear by Now() should delete all rows
err = orm.ClearGasPricesByDestChain(ctx, destSelector, time.Now())
// clear by 0 expiration seconds should delete all rows
err = orm.ClearGasPricesByDestChain(ctx, destSelector, 0)
assert.NoError(t, err)
assert.Equal(t, 0, getGasTableRowCount(t, db))
}
Expand Down Expand Up @@ -324,7 +325,8 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) {
assert.NoError(t, err)
}

interimTimeStamp := time.Now()
sleepSec := 2
time.Sleep(time.Duration(sleepSec) * time.Second)

// insert for the 2nd time after interimTimeStamp
for _, updatesPerAddr := range updates {
Expand All @@ -334,13 +336,13 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) {

assert.Equal(t, 2*numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db))

// clear by interimTimeStamp should delete rows inserted before it
err := orm.ClearTokenPricesByDestChain(ctx, destSelector, interimTimeStamp)
// clear by sleepSec should delete rows inserted before it
err := orm.ClearTokenPricesByDestChain(ctx, destSelector, sleepSec)
assert.NoError(t, err)
assert.Equal(t, numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db))

// clear by Now() should delete all rows
err = orm.ClearTokenPricesByDestChain(ctx, destSelector, time.Now())
// clear by 0 expiration seconds should delete all rows
err = orm.ClearTokenPricesByDestChain(ctx, destSelector, 0)
assert.NoError(t, err)
assert.Equal(t, 0, getTokenTableRowCount(t, db))
}

0 comments on commit 2e66837

Please sign in to comment.