diff --git a/models/errors/errors.go b/models/errors/errors.go index ba11350e..ab067e3d 100644 --- a/models/errors/errors.go +++ b/models/errors/errors.go @@ -19,10 +19,12 @@ var ( // General errors - ErrInternal = errors.New("internal error") - ErrInvalid = errors.New("invalid") - ErrRecoverable = errors.New("recoverable") - ErrDisconnected = NewRecoverableError(errors.New("disconnected")) + ErrInternal = errors.New("internal error") + ErrInvalid = errors.New("invalid") + ErrRecoverable = errors.New("recoverable") + ErrDisconnected = NewRecoverableError(errors.New("disconnected")) + ErrMissingBlock = errors.New("missing block") + ErrMissingTransactions = errors.New("missing transactions") // Transaction errors diff --git a/models/events.go b/models/events.go index fb44878c..76cc92e0 100644 --- a/models/events.go +++ b/models/events.go @@ -8,6 +8,8 @@ import ( "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go/fvm/evm/events" evmTypes "github.com/onflow/flow-go/fvm/evm/types" + + errs "github.com/onflow/flow-evm-gateway/models/errors" ) // isBlockExecutedEvent checks whether the given event contains block executed data. 
@@ -115,9 +117,13 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) { } } - // safety check, we can't have an empty block with transactions + // safety check, we have a missing block in the events if e.block == nil && len(e.transactions) > 0 { - return nil, fmt.Errorf("EVM block can not be nil if transactions are present, Flow block: %d", events.Height) + return nil, fmt.Errorf( + "%w EVM block nil at flow block: %d", + errs.ErrMissingBlock, + events.Height, + ) } if e.block != nil { @@ -127,7 +133,8 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) { } if e.block.TransactionHashRoot != txHashes.RootHash() { return nil, fmt.Errorf( - "block %d references missing transaction/s", + "%w EVM block %d references missing transaction/s", + errs.ErrMissingTransactions, e.block.Height, ) } diff --git a/services/ingestion/subscriber.go b/services/ingestion/subscriber.go index 7b2ca1fb..32bdbb13 100644 --- a/services/ingestion/subscriber.go +++ b/services/ingestion/subscriber.go @@ -2,6 +2,7 @@ package ingestion import ( "context" + "errors" "fmt" "github.com/onflow/cadence/runtime/common" @@ -33,6 +34,9 @@ type RPCSubscriber struct { chain flowGo.ChainID heartbeatInterval uint64 logger zerolog.Logger + + recovery bool + recoveredEvents []flow.Event } func NewRPCSubscriber( @@ -107,26 +111,26 @@ func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) <-chan mod // Subscribing to EVM specific events and handle any disconnection errors // as well as context cancellations. 
func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...access.SubscribeOption) <-chan models.BlockEvents { - events := make(chan models.BlockEvents) + eventsChan := make(chan models.BlockEvents) _, err := r.client.GetBlockHeaderByHeight(ctx, height) if err != nil { err = fmt.Errorf("failed to subscribe for events, the block height %d doesn't exist: %w", height, err) - events <- models.NewBlockEventsError(err) - return events + eventsChan <- models.NewBlockEventsError(err) + return eventsChan } - evs, errChan, err := r.client.SubscribeEventsByBlockHeight(ctx, height, r.blocksFilter(), opts...) + eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(ctx, height, r.blocksFilter(), opts...) if err != nil { - events <- models.NewBlockEventsError( + eventsChan <- models.NewBlockEventsError( fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err), ) - return events + return eventsChan } go func() { defer func() { - close(events) + close(eventsChan) }() for ctx.Err() == nil { @@ -135,31 +139,29 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac r.logger.Info().Msg("event ingestion received done signal") return - case blockEvents, ok := <-evs: + case blockEvents, ok := <-eventStream: if !ok { var err error err = errs.ErrDisconnected if ctx.Err() != nil { err = ctx.Err() } - events <- models.NewBlockEventsError(err) + eventsChan <- models.NewBlockEventsError(err) return } - evts := models.NewBlockEvents(blockEvents) - if evts.Err != nil { - r.logger.Warn().Err(err).Msgf( - "failed to parse EVM block events for Flow height: %d, retrying with gRPC API...", - blockEvents.Height, - ) - // call the `GetEventsForHeightRange` gRPC API endpoint to fetch - // the EVM-related events, when event streaming returned an - // inconsistent response. 
- events <- r.fetchBlockEvents(ctx, blockEvents) - } else { - events <- models.NewBlockEvents(blockEvents) + evmEvents := models.NewBlockEvents(blockEvents) + // if events contain an error, or we are in recovery mode + if evmEvents.Err != nil || r.recovery { + evmEvents = r.recover(ctx, blockEvents, evmEvents.Err) + // if we are still in recovery, go to the next event + if r.recovery { + continue + } } + eventsChan <- evmEvents + case err, ok := <-errChan: if !ok { var err error @@ -167,17 +169,17 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac if ctx.Err() != nil { err = ctx.Err() } - events <- models.NewBlockEventsError(err) + eventsChan <- models.NewBlockEventsError(err) return } - events <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err)) + eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err)) return } } }() - return events + return eventsChan } // backfill will use the provided height and with the client for the provided spork will start backfilling @@ -265,22 +267,18 @@ func (r *RPCSubscriber) blocksFilter() flow.EventFilter { } } -// fetchBlockEvents is used as a backup mechanism for fetching EVM-related +// fetchMissingData is used as a backup mechanism for fetching EVM-related // events, when the event streaming API returns an inconsistent response. // An inconsistent response could be an EVM block that references EVM -// transactions which are not present in the response. -// Under the hood, it uses the `GetEventsForHeightRange` gRPC API endpoint, -// making sure that we receive the expected events length for each event type -// and Flow height. -func (r *RPCSubscriber) fetchBlockEvents( +// transactions which are not present in the response. It falls back +// to using gRPC requests instead of streaming. 
+func (r *RPCSubscriber) fetchMissingData( ctx context.Context, blockEvents flow.BlockEvents, ) models.BlockEvents { - blkEvents := flow.BlockEvents{ - BlockID: blockEvents.BlockID, - Height: blockEvents.Height, - BlockTimestamp: blockEvents.BlockTimestamp, - } + // remove existing events + blockEvents.Events = nil + for _, eventType := range r.blocksFilter().EventTypes { recoveredEvents, err := r.client.GetEventsForHeightRange( ctx, @@ -302,8 +300,52 @@ func (r *RPCSubscriber) fetchBlockEvents( ) } - blkEvents.Events = append(blkEvents.Events, recoveredEvents[0].Events...) + blockEvents.Events = append(blockEvents.Events, recoveredEvents[0].Events...) + } + + return models.NewBlockEvents(blockEvents) +} + +// accumulateEventsMissingBlock will keep receiving transaction events until it can produce a valid +// EVM block event containing a block and transactions. At that point it will reset the recovery mode +// and return the valid block events. +func (r *RPCSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents { + r.recoveredEvents = append(r.recoveredEvents, events.Events...) + events.Events = r.recoveredEvents + + recovered := models.NewBlockEvents(events) + r.recovery = recovered.Err != nil + + if !r.recovery { + r.recoveredEvents = nil + } + + return recovered +} + +// recover tries to recover from invalid data sent over the event stream. +// +// Invalid data can be caused by a corrupted index or a network issue at the source, +// in which case we might miss one of the events (missing transaction), or it can be +// due to a failure of the system transaction which commits an EVM block, which results +// in a missing EVM block event while transactions are present. 
+func (r *RPCSubscriber) recover( + ctx context.Context, + events flow.BlockEvents, + err error, +) models.BlockEvents { + r.logger.Warn().Err(err).Msgf( + "failed to parse EVM block events for Flow height: %d, entering recovery", + events.Height, + ) + + if errors.Is(err, errs.ErrMissingBlock) || r.recovery { + return r.accumulateEventsMissingBlock(events) + } + + if errors.Is(err, errs.ErrMissingTransactions) { + return r.fetchMissingData(ctx, events) } - return models.NewBlockEvents(blkEvents) + return models.NewBlockEventsError(err) } diff --git a/services/ingestion/subscriber_test.go b/services/ingestion/subscriber_test.go index c867e9a1..22ac61e1 100644 --- a/services/ingestion/subscriber_test.go +++ b/services/ingestion/subscriber_test.go @@ -2,6 +2,7 @@ package ingestion import ( "context" + "fmt" "testing" "time" @@ -66,6 +67,93 @@ func Test_Subscribing(t *testing.T) { require.Equal(t, uint64(endHeight), prevHeight) } +func Test_MissingBlockEvent(t *testing.T) { + const endHeight = uint64(20) + const startHeight = uint64(1) + const missingBlockHeight = uint64(10) + const foundBlockHeight = uint64(15) + + currentClient, clientEvents := testutils.SetupClient(startHeight, endHeight) + + client, err := requester.NewCrossSporkClient( + currentClient, + nil, + zerolog.New(zerolog.NewTestWriter(t)), + flowGo.Previewnet, + ) + require.NoError(t, err) + + subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop()) + + events := subscriber.Subscribe(context.Background(), 1) + + missingHashes := make([]gethCommon.Hash, 0) + + go func() { + defer close(clientEvents) + + for i := startHeight; i <= endHeight; i++ { + txCdc, txEvent, tx, _, _ := newTransaction(i) + blockCdc, _, blockEvent, _ := newBlock(i, []gethCommon.Hash{tx.Hash()}) + + if i == foundBlockHeight { + missingHashes = append(missingHashes, tx.Hash()) + blockCdc, _, _, _ = newBlock(i, missingHashes) + } + + blockEvents := []flow.Event{ + {Value: txCdc, Type: string(txEvent.Etype)}, + 
{Value: blockCdc, Type: string(blockEvent.Etype)}, + } + + if i > missingBlockHeight && i < foundBlockHeight { + blockEvents = blockEvents[:1] // remove block + missingHashes = append(missingHashes, tx.Hash()) + } + + clientEvents <- flow.BlockEvents{ + Height: i, + Events: blockEvents, + } + } + }() + + var prevHeight uint64 + for ev := range events { + if prevHeight == endHeight { + require.ErrorIs(t, ev.Err, errs.ErrDisconnected) + break + } + + require.NoError(t, ev.Err) + block := ev.Events.Block() + require.NotNil(t, block) // make sure all have blocks + // make sure all normal blocks have 1 tx + if block.Height != foundBlockHeight { + require.Len(t, ev.Events.Transactions(), 1) + } + // the block that was missing has all txs + if block.Height == foundBlockHeight { + // the missing block has all the transaction in between when it was missing + require.Len(t, ev.Events.Transactions(), int(foundBlockHeight-missingBlockHeight)) + for i, h := range missingHashes { + found := false + for _, tx := range ev.Events.Transactions() { + if h.Cmp(tx.Hash()) == 0 { + found = true + } + } + require.True(t, found, fmt.Sprintf("required hash not found at index %d %s", i, h.String())) + } + } + + prevHeight = ev.Events.CadenceHeight() + } + + // this makes sure we indexed all the events + require.Equal(t, endHeight, prevHeight) +} + // Test that back-up fetching of EVM events is triggered when the // Event Streaming API returns an inconsistent response. 
// This scenario tests the happy path, when the back-up fetching of diff --git a/services/testutils/mock_client.go b/services/testutils/mock_client.go index ad1fb114..3e4c7faf 100644 --- a/services/testutils/mock_client.go +++ b/services/testutils/mock_client.go @@ -39,6 +39,23 @@ func (c *MockClient) SubscribeEventsByBlockHeight( } func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient { + client, events := SetupClient(startHeight, endHeight) + go func() { + defer close(events) + + for i := startHeight; i <= endHeight; i++ { + events <- flow.BlockEvents{ + Height: i, + } + } + }() + + return client +} + +func SetupClient(startHeight uint64, endHeight uint64) (*MockClient, chan flow.BlockEvents) { + events := make(chan flow.BlockEvents) + return &MockClient{ Client: &mocks.Client{}, GetLatestBlockHeaderFunc: func(ctx context.Context, sealed bool) (*flow.BlockHeader, error) { @@ -66,19 +83,7 @@ func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient { filter flow.EventFilter, opts ...access.SubscribeOption, ) (<-chan flow.BlockEvents, <-chan error, error) { - events := make(chan flow.BlockEvents) - - go func() { - defer close(events) - - for i := startHeight; i <= endHeight; i++ { - events <- flow.BlockEvents{ - Height: i, - } - } - }() - return events, make(chan error), nil }, - } + }, events }