Skip to content

Commit

Permalink
Merge pull request #5 from kaleido-io/hwm-protection
Browse files Browse the repository at this point in the history
When new listener joins avoid delivering events before HWM
  • Loading branch information
nguyer authored Aug 15, 2022
2 parents 703559c + 95efaa8 commit 3b3313a
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 22 deletions.
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ go 1.17
require (
github.com/go-resty/resty/v2 v2.7.0
github.com/hashicorp/golang-lru v0.5.4
github.com/hyperledger/firefly-common v0.1.18
github.com/hyperledger/firefly-common v0.1.20
github.com/hyperledger/firefly-signer v0.9.13
github.com/hyperledger/firefly-transaction-manager v0.9.3
github.com/hyperledger/firefly-transaction-manager v0.9.4
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.7.1
github.com/stretchr/testify v1.8.0
golang.org/x/net v0.0.0-20220531201128-c960675eff93
golang.org/x/text v0.3.7
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -42,7 +42,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.8.2 // indirect
Expand All @@ -51,8 +51,8 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.12.0 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/spf13/viper v1.12.1-0.20220712161005-5247643f0235 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/subosito/gotenv v1.4.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
Expand Down
21 changes: 13 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,12 @@ github.com/hashicorp/serf v0.9.7/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpT
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hyperledger/firefly-common v0.1.13/go.mod h1:2NqPi5Ud9H6rSlZXkLbotxW7z4EAD89p3/8oNOpm9Gs=
github.com/hyperledger/firefly-common v0.1.17-0.20220808193503-961a6b241a1a/go.mod h1:MNbaI2spBsdZYOub6Duj9xueE7Qyu9itOmJ4vE8tjYw=
github.com/hyperledger/firefly-common v0.1.18 h1:wfXEDToFYMsoTSacx7Wb+ufIswtUL6kzCDyhmwkWXA0=
github.com/hyperledger/firefly-common v0.1.18/go.mod h1:MNbaI2spBsdZYOub6Duj9xueE7Qyu9itOmJ4vE8tjYw=
github.com/hyperledger/firefly-common v0.1.20 h1:0dShkjlIShyBxkXRmu3vLmpEK6xrqmfc8GhF6k0Vgbg=
github.com/hyperledger/firefly-common v0.1.20/go.mod h1:gMlv4Iy5JjnzXmSEdb+tWVDIc/2GhL9MRcgNX+VmI4M=
github.com/hyperledger/firefly-signer v0.9.13 h1:yvKxYTsEmE0XWl0vcZBQV353YmmePvWwIPMr4Lie67o=
github.com/hyperledger/firefly-signer v0.9.13/go.mod h1:GPQRUZOFOAjkLmg8GDjZUjEdUD0gcar+CSVhwltIwyw=
github.com/hyperledger/firefly-transaction-manager v0.9.3 h1:/zWrZVBJ8+VfgXdS9PHrt6m5mxMVqX3+ke60BjUYasY=
github.com/hyperledger/firefly-transaction-manager v0.9.3/go.mod h1:GuwXVVXI6p3tNk99jbYi4PopMzipVBwy3uu5wSKXJEc=
github.com/hyperledger/firefly-transaction-manager v0.9.4 h1:/0mqyDZXXOs7z1VbajmTYiKp1riaYZi8NFQm2ut7b5g=
github.com/hyperledger/firefly-transaction-manager v0.9.4/go.mod h1:GuwXVVXI6p3tNk99jbYi4PopMzipVBwy3uu5wSKXJEc=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
Expand Down Expand Up @@ -402,8 +402,9 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml/v2 v2.0.1 h1:8e3L2cCQzLFi2CR4g7vGFuFxX7Jl1kKX8gW+iV0GUKU=
github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo=
github.com/pelletier/go-toml/v2 v2.0.2 h1:+jQXlF3scKIcSEKkdHzXhCTDLPFi5r1wnK6yPS+49Gw=
github.com/pelletier/go-toml/v2 v2.0.2/go.mod h1:MovirKjgVRESsAvNZlAjtFwV867yGuwRkXbG66OzopI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -459,20 +460,24 @@ github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmq
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ=
github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI=
github.com/spf13/viper v1.12.1-0.20220712161005-5247643f0235 h1:azjn5/lAGpcMny6s1fW/y6rXTu2YUOA+a2C6wKpgpkw=
github.com/spf13/viper v1.12.1-0.20220712161005-5247643f0235/go.mod h1:f40df4ovE8V1ot0NXmYP1zUDS+X1D5AXGviq9fCJqZU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs=
github.com/subosito/gotenv v1.4.0 h1:yAzM1+SmVcz5R4tXGsNMu1jUl2aOJXoiWUCEwwnGrvs=
github.com/subosito/gotenv v1.4.0/go.mod h1:mZd6rFysKEcUhUHXJk0C/08wAgyDBFuwEYL7vWWGaGo=
Expand Down
1 change: 1 addition & 0 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (bl *blockListener) getHighestBlock(ctx context.Context) int64 {
bl.mux.Lock()
highestBlock = bl.highestBlock
bl.mux.Unlock()
log.L(ctx).Debugf("ChainHead=%d", highestBlock)
return highestBlock
}

Expand Down
2 changes: 2 additions & 0 deletions internal/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly-evmconnect/mocks/jsonrpcmocks"
"github.com/hyperledger/firefly-signer/pkg/abi"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

Expand All @@ -37,6 +38,7 @@ func newTestConnector(t *testing.T) (context.Context, *ethConnector, *jsonrpcmoc
InitConfig(conf)
conf.Set(ffresty.HTTPConfigURL, "http://localhost:8545")
conf.Set(BlockPollingInterval, "1h") // Disable for tests that are not using it
logrus.SetLevel(logrus.DebugLevel)
ctx, done := context.WithCancel(context.Background())
cc, err := NewEthereumConnector(ctx, conf)
assert.NoError(t, err)
Expand Down
10 changes: 8 additions & 2 deletions internal/ethereum/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (l *listener) checkReadyForLeadPackOrRemoved(ctx context.Context) (bool, bo
headBlock := l.es.headBlock
blockGap := headBlock - l.hwmBlock
readyForLead := blockGap < l.c.catchupThreshold
log.L(ctx).Debugf("Listener %s head=%d gap=%d readyForLead=%t", l.id, headBlock, blockGap, readyForLead)
log.L(ctx).Debugf("Listener %s head=%d hwm=%d (gap=%d) readyForLead=%t", l.id, headBlock, l.hwmBlock, blockGap, readyForLead)
return readyForLead, l.removed
}

Expand Down Expand Up @@ -272,11 +272,17 @@ func (l *listener) matchMethod(ctx context.Context, methods []*abi.Entry, txInfo

func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLog *logJSONRPC) (*ffcapi.ListenerEvent, bool) {

// Apply a post-filter check to the event
// Check the block for this event is at our high water mark, as we might have rewound for other listeners
blockNumber := ethLog.BlockNumber.BigInt().Int64()
transactionIndex := ethLog.TransactionIndex.BigInt().Int64()
logIndex := ethLog.LogIndex.BigInt().Int64()
protoID := getEventProtoID(blockNumber, transactionIndex, logIndex)
if blockNumber < l.hwmBlock {
log.L(ctx).Debugf("Listener %s already delivered event '%s' hwm=%d", l.id, protoID, l.hwmBlock)
return nil, false
}

// Apply a post-filter check to the event
topicMatches := len(ethLog.Topics) > 0 && bytes.Equal(ethLog.Topics[0], f.Topic0)
addrMatches := f.Address == nil || bytes.Equal(ethLog.Address[:], f.Address[:])
if !topicMatches || !addrMatches {
Expand Down
16 changes: 16 additions & 0 deletions internal/ethereum/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,22 @@ func TestSerializeEventDataFail(t *testing.T) {

}

func TestFilterEnrichEthLogBlockBelowHWM(t *testing.T) {

l, _, _ := newTestListener(t, true)

var abiEvent *abi.Entry
err := json.Unmarshal([]byte(abiTransferEvent), &abiEvent)
assert.NoError(t, err)

l.hwmBlock = 2
_, ok := l.filterEnrichEthLog(context.Background(), l.config.filters[0], &logJSONRPC{
BlockNumber: ethtypes.NewHexInteger64(1),
})
assert.False(t, ok)

}

func TestFilterEnrichEthLogAddressMismatch(t *testing.T) {

l, _, _ := newTestListener(t, true)
Expand Down
1 change: 1 addition & 0 deletions internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (es *eventStream) addEventListener(ctx context.Context, req *ffcapi.EventLi
if err := l.ensureHWM(ctx); err != nil {
return nil, err
}
log.L(es.ctx).Infof("Initialized listener '%s' (FromBlock=%s) Block=%d Checkpoint=%+v", l.id, l.config.fromBlock, l.hwmBlock, checkpoint)

es.updateCount++
es.listeners[*req.ListenerID] = l
Expand Down
53 changes: 47 additions & 6 deletions internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ func TestCatchupThenRejoinLeadGroup(t *testing.T) {
for {
assert.True(t, time.Since(started) < 5*time.Second)
if l.catchup {
time.Sleep(1 * time.Microsecond)
time.Sleep(1 * time.Millisecond)
continue
}
if es.headBlock != testHighBlock-es.c.checkpointBlockGap {
time.Sleep(1 * time.Microsecond)
time.Sleep(1 * time.Millisecond)
continue
}
break
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
mRPC.On("Invoke", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{
{
BlockNumber: ethtypes.NewHexInteger64(1024),
BlockNumber: ethtypes.NewHexInteger64(212122),
TransactionIndex: ethtypes.NewHexInteger64(64),
LogIndex: ethtypes.NewHexInteger64(2),
BlockHash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"),
Expand All @@ -364,7 +364,7 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
}).Once()
mRPC.On("Invoke", mock.Anything, mock.Anything, "eth_getBlockByHash", "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c", false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(1024),
Number: ethtypes.NewHexInteger64(212122),
Hash: ethtypes.MustNewHexBytes0xPrefix("0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c"),
}
})
Expand All @@ -379,10 +379,10 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
defer done()

e := <-events
assert.Equal(t, fftypes.FFuint64(1024), e.Event.ID.BlockNumber)
assert.Equal(t, fftypes.FFuint64(212122), e.Event.ID.BlockNumber)
assert.Equal(t, fftypes.FFuint64(64), e.Event.ID.TransactionIndex)
assert.Equal(t, fftypes.FFuint64(2), e.Event.ID.LogIndex)
assert.Equal(t, int64(1024), e.Checkpoint.(*listenerCheckpoint).Block)
assert.Equal(t, int64(212122), e.Checkpoint.(*listenerCheckpoint).Block)
assert.Equal(t, int64(64), e.Checkpoint.(*listenerCheckpoint).TransactionIndex)
assert.Equal(t, int64(2), e.Checkpoint.(*listenerCheckpoint).LogIndex)
assert.NotNil(t, e.Event)
Expand All @@ -391,6 +391,47 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
assert.Equal(t, "1000", e.Event.Data.JSONObject().GetString("value"))
}

func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) {

l1req := &ffcapi.EventListenerAddRequest{
ListenerID: fftypes.NewUUID(),
EventListenerOptions: ffcapi.EventListenerOptions{
Filters: []fftypes.JSONAny{
*fftypes.JSONAnyPtr(`{"address":"0xc89E46EEED41b777ca6625d37E1Cc87C5c037828","event":` + abiTransferEvent + `}`),
},
Options: fftypes.JSONAnyPtr(`{}`),
FromBlock: "0",
},
}

ctx, c, mRPC, done := newTestConnector(t)

filtered := make(chan struct{})
mRPC.On("Invoke", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) {
*args[1].(*ethtypes.HexInteger) = *ethtypes.NewHexInteger64(10)
})
mRPC.On("Invoke", mock.Anything, mock.Anything, "eth_newFilter", mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
assert.Equal(t, int64(0), args[3].(*logFilterJSONRPC).FromBlock.BigInt().Int64())
*args[1].(*string) = "filter_id1"
}).Once()
mRPC.On("Invoke", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Once().Run(func(args mock.Arguments) {
close(filtered)
})
mRPC.On("Invoke", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe()
mRPC.On("Invoke", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()

_, _, _, done = testEventStreamExistingConnector(t, ctx, done, c, mRPC, l1req)
defer done()

<-filtered
mRPC.AssertExpectations(t)
}

func TestLeadGroupCatchupRetry(t *testing.T) {

l1req := &ffcapi.EventListenerAddRequest{
Expand Down

0 comments on commit 3b3313a

Please sign in to comment.