Skip to content

Commit

Permalink
[BCI-3988] - FilteredLogs receive []Expression instead of whole KeyFi…
Browse files Browse the repository at this point in the history
…lter (#14109)

* FilteredLogs receive []Expression instead of whole KeyFilter

* remove key from query.Where KeyFilter creation

* add changeset

* remove brackets from changeset

* fix usage

* fix comment lint

* remove todo

* refactor based on comments

* add where func inside logpoller without key

* fix reference
  • Loading branch information
Farber98 authored Sep 4, 2024
1 parent 33e6a0c commit 2761cd5
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 83 deletions.
5 changes: 5 additions & 0 deletions .changeset/brown-geese-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

FilteredLogs receive Expression instead of whole KeyFilter. #internal
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash,
return nil, ErrDisabled
}

func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ string) ([]Log, error) {
func (d disabled) FilteredLogs(_ context.Context, _ []query.Expression, _ query.LimitAndSort, _ string) ([]Log, error) {
return nil, ErrDisabled
}

Expand Down
23 changes: 21 additions & 2 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type LogPoller interface {
LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// chainlink-common query filtering
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type LogPollerTest interface {
Expand Down Expand Up @@ -1518,6 +1518,25 @@ func EvmWord(i uint64) common.Hash {
return common.BytesToHash(b)
}

func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName)
}

// Where is a query.Where wrapper that ignores the Key and returns a slice of query.Expression rather than query.KeyFilter.
// If no expressions are provided, or an error occurs, an empty slice is returned.
func Where(expressions ...query.Expression) ([]query.Expression, error) {
filter, err := query.Where(
"",
expressions...,
)

if err != nil {
return []query.Expression{}, err
}

if filter.Expressions == nil {
return []query.Expression{}, nil
}

return filter.Expressions, nil
}
38 changes: 38 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils"

htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks"
Expand Down Expand Up @@ -2091,3 +2093,39 @@ func TestFindLCA(t *testing.T) {
})
}
}

func TestWhere(t *testing.T) {
address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678")
eventSig := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234")
ts := time.Now()

expr1 := logpoller.NewAddressFilter(address)
expr2 := logpoller.NewEventSigFilter(eventSig)
expr3 := query.Timestamp(uint64(ts.Unix()), primitives.Gte)
expr4 := logpoller.NewConfirmationsFilter(evmtypes.Confirmations(0))

t.Run("Valid combination of filters", func(t *testing.T) {
result, err := logpoller.Where(expr1, expr2, expr3, expr4)
assert.NoError(t, err)
assert.Equal(t, []query.Expression{expr1, expr2, expr3, expr4}, result)
})

t.Run("No expressions (should return empty slice)", func(t *testing.T) {
result, err := logpoller.Where()
assert.NoError(t, err)
assert.Equal(t, []query.Expression{}, result)
})

t.Run("Invalid boolean expression", func(t *testing.T) {
invalidExpr := query.Expression{
BoolExpression: query.BoolExpression{
Expressions: []query.Expression{},
},
}

result, err := logpoller.Where(invalidExpr)
assert.Error(t, err)
assert.EqualError(t, err, "all boolean expressions should have at least 2 expressions")
assert.Equal(t, []query.Expression{}, result)
})
}
16 changes: 8 additions & 8 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address c
})
}

func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return withObservedQueryAndResults(o, queryName, func() ([]Log, error) {
return o.ORM.FilteredLogs(ctx, filter, limitAndSort, queryName)
})
Expand Down
7 changes: 3 additions & 4 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type ORM interface {
SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// FilteredLogs accepts chainlink-common filtering DSL.
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type DSORM struct {
Expand Down Expand Up @@ -964,9 +964,8 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si
return logs, nil
}

// TODO flaky BCF-3258
func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, _ string) ([]Log, error) {
qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter.Expressions, limitAndSort)
func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, _ string) ([]Log, error) {
qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter, limitAndSort)
if err != nil {
return nil, err
}
Expand Down
110 changes: 50 additions & 60 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func TestORM_IndexedLogs(t *testing.T) {
require.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes())

lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}), limiter, "")
lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}).Expressions, limiter, "")
require.NoError(t, err)
require.Equal(t, 1, len(lgs))
assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes())
Expand All @@ -640,19 +640,17 @@ func TestORM_IndexedLogs(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}), limiter, "")
lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}).Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

blockRangeFilter := func(start, end string, topicIdx uint64, topicValues []uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
filtersForTopics(topicIdx, topicValues),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
},
blockRangeFilter := func(start, end string, topicIdx uint64, topicValues []uint64) []query.Expression {
return []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
filtersForTopics(topicIdx, topicValues),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
}
}

Expand Down Expand Up @@ -711,23 +709,21 @@ func TestORM_IndexedLogs(t *testing.T) {
},
}

lgs, err = o1.FilteredLogs(ctx, filter, limiter, "")
lgs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "")
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

rangeFilter := func(topicIdx uint64, min, max uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte},
}),
query.Confidence(primitives.Unconfirmed),
},
rangeFilter := func(topicIdx uint64, min, max uint64) []query.Expression {
return []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte},
}),
query.Confidence(primitives.Unconfirmed),
}
}

Expand Down Expand Up @@ -835,7 +831,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) {
},
}

retrievedLogs, err = o1.FilteredLogs(ctx, filter, limiter, "")
retrievedLogs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "")
require.NoError(t, err)

require.Equal(t, 2, len(retrievedLogs))
Expand Down Expand Up @@ -876,19 +872,17 @@ func TestORM_DataWords(t *testing.T) {
},
}))

wordFilter := func(wordIdx uint8, word1, word2 uint64) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte},
}),
query.Confidence(primitives.Unconfirmed),
},
wordFilter := func(wordIdx uint8, word1, word2 uint64) []query.Expression {
return []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte},
}),
logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{
{Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte},
}),
query.Confidence(primitives.Unconfirmed),
}
}

Expand Down Expand Up @@ -947,15 +941,13 @@ func TestORM_DataWords(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

filter := query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{
{Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte},
}),
query.Confidence(primitives.Unconfirmed),
},
filter := []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(eventSig),
logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{
{Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte},
}),
query.Confidence(primitives.Unconfirmed),
}

lgs, err = o1.FilteredLogs(ctx, filter, limiter, "")
Expand Down Expand Up @@ -1099,7 +1091,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
})

assertion(t, logs, err, startBlock, endBlock)
logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))).Expressions, limiter, "")

assertion(t, logs, err, startBlock, endBlock)
}
Expand Down Expand Up @@ -1161,14 +1153,12 @@ func TestLogPoller_Logs(t *testing.T) {
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[4].BlockHash.String())
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[5].BlockHash.String())

logFilter := func(start, end string, address common.Address) query.KeyFilter {
return query.KeyFilter{
Expressions: []query.Expression{
logpoller.NewAddressFilter(address),
logpoller.NewEventSigFilter(event1),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
},
logFilter := func(start, end string, address common.Address) []query.Expression {
return []query.Expression{
logpoller.NewAddressFilter(address),
logpoller.NewEventSigFilter(event1),
query.Block(start, primitives.Gte),
query.Block(end, primitives.Lte),
}
}

Expand Down Expand Up @@ -1722,7 +1712,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil).Expressions, limiter, "")

assertion(t, logs, err, tt.expectedLogs)
})
Expand All @@ -1735,7 +1725,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}).Expressions, limiter, "")

assertion(t, logs, err, tt.expectedLogs)
})
Expand Down Expand Up @@ -1991,7 +1981,7 @@ func TestSelectLogsDataWordBetween(t *testing.T) {

assertion(t, logs, err, tt.expectedLogs)

logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue), limiter, "")
logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue).Expressions, limiter, "")

assertion(t, logs, err, tt.expectedLogs)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context,
return nil, err
}

reportsQuery, err := query.Where(
c.address.String(),
reportsQuery, err := logpoller.Where(
logpoller.NewAddressFilter(c.address),
logpoller.NewEventSigFilter(c.reportAcceptedSig),
query.Timestamp(uint64(ts.Unix()), primitives.Gte),
Expand Down
Loading

0 comments on commit 2761cd5

Please sign in to comment.