Skip to content

Commit

Permalink
BCF-2324: isolate evm tables in specific schema (#10609)
Browse files Browse the repository at this point in the history
* BCF-2324: isolate evm tables in specific schema

* fix tests; fix bad replace all

* fix linter

* mv eth_XXX to evm.XXX; update and take care with triggers
  • Loading branch information
krehermann authored Sep 13, 2023
1 parent 18cd6e4 commit 350c3d4
Show file tree
Hide file tree
Showing 36 changed files with 558 additions and 455 deletions.
4 changes: 2 additions & 2 deletions core/chains/evm/config/chain_scoped.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func NewTOMLChainScopedConfig(genCfg gencfg.AppConfig, tomlConfig *toml.EVMConfig, lggr logger.Logger) *ChainScoped {
func NewTOMLChainScopedConfig(appCfg gencfg.AppConfig, tomlConfig *toml.EVMConfig, lggr logger.Logger) *ChainScoped {
return &ChainScoped{
AppConfig: genCfg,
AppConfig: appCfg,
evmConfig: &evmConfig{c: tomlConfig},
lggr: lggr}
}
Expand Down
14 changes: 7 additions & 7 deletions core/chains/evm/forwarders/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *orm {

// CreateForwarder creates the Forwarder address associated with the current EVM chain id.
func (o *orm) CreateForwarder(addr common.Address, evmChainId utils.Big) (fwd Forwarder, err error) {
sql := `INSERT INTO evm_forwarders (address, evm_chain_id, created_at, updated_at) VALUES ($1, $2, now(), now()) RETURNING *`
sql := `INSERT INTO evm.forwarders (address, evm_chain_id, created_at, updated_at) VALUES ($1, $2, now(), now()) RETURNING *`
err = o.q.Get(&fwd, sql, addr, evmChainId)
return fwd, err
}
Expand All @@ -50,7 +50,7 @@ func (o *orm) DeleteForwarder(id int64, cleanup func(tx pg.Queryer, evmChainID i

var rowsAffected int64
err = o.q.Transaction(func(tx pg.Queryer) error {
err = tx.Get(&dest, `SELECT evm_chain_id, address FROM evm_forwarders WHERE id = $1`, id)
err = tx.Get(&dest, `SELECT evm_chain_id, address FROM evm.forwarders WHERE id = $1`, id)
if err != nil {
return err
}
Expand All @@ -60,7 +60,7 @@ func (o *orm) DeleteForwarder(id int64, cleanup func(tx pg.Queryer, evmChainID i
}
}

result, err2 := o.q.Exec(`DELETE FROM evm_forwarders WHERE id = $1`, id)
result, err2 := o.q.Exec(`DELETE FROM evm.forwarders WHERE id = $1`, id)
// If the forwarder wasn't found, we still want to delete the filter.
// In that case, the transaction must return nil, even though DeleteForwarder
// will return sql.ErrNoRows
Expand All @@ -80,12 +80,12 @@ func (o *orm) DeleteForwarder(id int64, cleanup func(tx pg.Queryer, evmChainID i

// FindForwarders returns all forwarder addresses from offset up until limit.
func (o *orm) FindForwarders(offset, limit int) (fwds []Forwarder, count int, err error) {
sql := `SELECT count(*) FROM evm_forwarders`
sql := `SELECT count(*) FROM evm.forwarders`
if err = o.q.Get(&count, sql); err != nil {
return
}

sql = `SELECT * FROM evm_forwarders ORDER BY created_at DESC, id DESC LIMIT $1 OFFSET $2`
sql = `SELECT * FROM evm.forwarders ORDER BY created_at DESC, id DESC LIMIT $1 OFFSET $2`
if err = o.q.Select(&fwds, sql, limit, offset); err != nil {
return
}
Expand All @@ -94,7 +94,7 @@ func (o *orm) FindForwarders(offset, limit int) (fwds []Forwarder, count int, er

// FindForwardersByChain returns all forwarder addresses for a chain.
func (o *orm) FindForwardersByChain(evmChainId utils.Big) (fwds []Forwarder, err error) {
sql := `SELECT * FROM evm_forwarders where evm_chain_id = $1 ORDER BY created_at DESC, id DESC`
sql := `SELECT * FROM evm.forwarders where evm_chain_id = $1 ORDER BY created_at DESC, id DESC`
err = o.q.Select(&fwds, sql, evmChainId)
return
}
Expand All @@ -108,7 +108,7 @@ func (o *orm) FindForwardersInListByChain(evmChainId utils.Big, addrs []common.A
}

query, args, err := sqlx.Named(`
SELECT * FROM evm_forwarders
SELECT * FROM evm.forwarders
WHERE evm_chain_id = :chainid
AND address IN (:addresses)
ORDER BY created_at DESC, id DESC`,
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/headtracker/head_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
)

func firstHead(t *testing.T, db *sqlx.DB) (h evmtypes.Head) {
if err := db.Get(&h, `SELECT * FROM evm_heads ORDER BY number ASC LIMIT 1`); err != nil {
if err := db.Get(&h, `SELECT * FROM evm.heads ORDER BY number ASC LIMIT 1`); err != nil {
t.Fatal(err)
}
return h
Expand Down
12 changes: 6 additions & 6 deletions core/chains/evm/headtracker/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (orm *orm) IdempotentInsertHead(ctx context.Context, head *evmtypes.Head) e
// listener guarantees head.EVMChainID to be equal to orm.chainID
q := orm.q.WithOpts(pg.WithParentCtx(ctx))
query := `
INSERT INTO evm_heads (hash, number, parent_hash, created_at, timestamp, l1_block_number, evm_chain_id, base_fee_per_gas) VALUES (
INSERT INTO evm.heads (hash, number, parent_hash, created_at, timestamp, l1_block_number, evm_chain_id, base_fee_per_gas) VALUES (
:hash, :number, :parent_hash, :created_at, :timestamp, :l1_block_number, :evm_chain_id, :base_fee_per_gas)
ON CONFLICT (evm_chain_id, hash) DO NOTHING`
err := q.ExecQNamed(query, head)
Expand All @@ -53,11 +53,11 @@ func (orm *orm) IdempotentInsertHead(ctx context.Context, head *evmtypes.Head) e
func (orm *orm) TrimOldHeads(ctx context.Context, n uint) (err error) {
q := orm.q.WithOpts(pg.WithParentCtx(ctx))
return q.ExecQ(`
DELETE FROM evm_heads
DELETE FROM evm.heads
WHERE evm_chain_id = $1 AND number < (
SELECT min(number) FROM (
SELECT number
FROM evm_heads
FROM evm.heads
WHERE evm_chain_id = $1
ORDER BY number DESC
LIMIT $2
Expand All @@ -68,7 +68,7 @@ func (orm *orm) TrimOldHeads(ctx context.Context, n uint) (err error) {
func (orm *orm) LatestHead(ctx context.Context) (head *evmtypes.Head, err error) {
head = new(evmtypes.Head)
q := orm.q.WithOpts(pg.WithParentCtx(ctx))
err = q.Get(head, `SELECT * FROM evm_heads WHERE evm_chain_id = $1 ORDER BY number DESC, created_at DESC, id DESC LIMIT 1`, orm.chainID)
err = q.Get(head, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 ORDER BY number DESC, created_at DESC, id DESC LIMIT 1`, orm.chainID)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
Expand All @@ -78,15 +78,15 @@ func (orm *orm) LatestHead(ctx context.Context) (head *evmtypes.Head, err error)

func (orm *orm) LatestHeads(ctx context.Context, limit uint) (heads []*evmtypes.Head, err error) {
q := orm.q.WithOpts(pg.WithParentCtx(ctx))
err = q.Select(&heads, `SELECT * FROM evm_heads WHERE evm_chain_id = $1 ORDER BY number DESC, created_at DESC, id DESC LIMIT $2`, orm.chainID, limit)
err = q.Select(&heads, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 ORDER BY number DESC, created_at DESC, id DESC LIMIT $2`, orm.chainID, limit)
err = errors.Wrap(err, "LatestHeads failed")
return
}

func (orm *orm) HeadByHash(ctx context.Context, hash common.Hash) (head *evmtypes.Head, err error) {
q := orm.q.WithOpts(pg.WithParentCtx(ctx))
head = new(evmtypes.Head)
err = q.Get(head, `SELECT * FROM evm_heads WHERE evm_chain_id = $1 AND hash = $2`, orm.chainID, hash)
err = q.Get(head, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 AND hash = $2`, orm.chainID, hash)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
// that applications see them and take action upon it, however that
// results in significantly slower reads since we must then compute
// the canonical set per read. Typically, if an application took action on a log
// it would be saved elsewhere e.g. eth_txes, so it seems better to just support the fast reads.
// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
// Its also nicely analogous to reading from the chain itself.
err2 = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
// These deletes are bounded by reorg depth, so they are
Expand Down
Loading

0 comments on commit 350c3d4

Please sign in to comment.