Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add implementation for events data providers #6766

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
fed835a
Added skeleton for events data provider
AndriiDiachuk Nov 22, 2024
ea73371
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Nov 22, 2024
9c10d24
Added initializing of events filter, added missing data to factory
AndriiDiachuk Nov 22, 2024
9547b5d
Fixed factory test
AndriiDiachuk Nov 22, 2024
b7f5ca7
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Nov 25, 2024
0f34ae1
Added test skeleton for testing invalid arguments
AndriiDiachuk Nov 25, 2024
411f9e5
Added test for messageIndex check
AndriiDiachuk Nov 26, 2024
9a402de
Merged
AndriiDiachuk Nov 27, 2024
636740a
Added check for a valid event types in parse function
AndriiDiachuk Nov 28, 2024
f70c8e1
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Nov 28, 2024
588688e
Changed type of arguments for consistency
AndriiDiachuk Nov 28, 2024
b537a5f
Added test case for event provider in factory_test
AndriiDiachuk Nov 28, 2024
867fcf7
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Nov 29, 2024
dca9a25
Fixed remarks
AndriiDiachuk Nov 29, 2024
309b148
Added check for starting index value
AndriiDiachuk Dec 2, 2024
8f1f99c
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Dec 2, 2024
0abe203
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Dec 2, 2024
8ffe023
changed handleResponse to generic
AndriiDiachuk Dec 2, 2024
1384405
Added happy path for testing all subscribe methods
AndriiDiachuk Dec 3, 2024
57a7c0f
Linted
AndriiDiachuk Dec 3, 2024
3e48960
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
peterargue Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewServer(serverAPI access.API,
builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
}

dataProviderFactory := dp.NewDataProviderFactory(logger, stateStreamApi, serverAPI)
dataProviderFactory := dp.NewDataProviderFactory(logger, stateStreamApi, serverAPI, chain, stateStreamConfig.EventFilterConfig)
builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory)

c := cors.New(cors.Options{
Expand Down
185 changes: 185 additions & 0 deletions engine/access/rest/websockets/data_providers/events_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package data_providers

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/engine/access/rest/http/request"
"github.com/onflow/flow-go/engine/access/rest/util"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/counters"
)

// EventsArguments contains the arguments required for subscribing to events
type EventsArguments struct {
StartBlockID flow.Identifier // ID of the block to start subscription from
StartBlockHeight uint64 // Height of the block to start subscription from
Filter state_stream.EventFilter // Filter applied to events for a given subscription
}

// EventsDataProvider is responsible for providing events
type EventsDataProvider struct {
*BaseDataProviderImpl

logger zerolog.Logger
args EventsArguments
stateStreamApi state_stream.API
}

var _ DataProvider = (*EventsDataProvider)(nil)

// NewEventsDataProvider creates a new instance of EventsDataProvider.
func NewEventsDataProvider(
ctx context.Context,
logger zerolog.Logger,
stateStreamApi state_stream.API,
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
topic string,
arguments map[string]string,
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
send chan<- interface{},
) (*EventsDataProvider, error) {
p := &EventsDataProvider{
logger: logger.With().Str("component", "events-data-provider").Logger(),
stateStreamApi: stateStreamApi,
}

// Initialize arguments passed to the provider.
var err error
p.args, err = ParseEventsArguments(arguments, chain, eventFilterConfig)
if err != nil {
return nil, fmt.Errorf("invalid arguments for events data provider: %w", err)
}

subCtx, cancel := context.WithCancel(ctx)

// Set up a subscription to events based on arguments.
sub := p.createSubscription(subCtx)

p.BaseDataProviderImpl = NewBaseDataProviderImpl(
topic,
cancel,
send,
sub,
)

return p, nil
}

// Run starts processing the subscription for events and handles responses.
//
// No errors are expected during normal operations.
func (p *EventsDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send))
}

// createSubscription creates a new subscription using the specified input arguments.
func (p *EventsDataProvider) createSubscription(ctx context.Context) subscription.Subscription {
if p.args.StartBlockID != flow.ZeroID {
return p.stateStreamApi.SubscribeEventsFromStartBlockID(ctx, p.args.StartBlockID, p.args.Filter)
}

if p.args.StartBlockHeight != request.EmptyHeight {
return p.stateStreamApi.SubscribeEventsFromStartHeight(ctx, p.args.StartBlockHeight, p.args.Filter)
}

return p.stateStreamApi.SubscribeEventsFromLatest(ctx, p.args.Filter)
}

// handleResponse processes an event and sends the formatted response.
//
// No errors are expected during normal operations.
func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(*flow.Event) error {
messageIndex := counters.NewMonotonousCounter(0)
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved

return func(event *flow.Event) error {
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}
index := messageIndex.Value()

send <- &models.EventResponse{
Event: event,
MessageIndex: strconv.FormatUint(index, 10),
}

return nil
}
}

// ParseEventsArguments validates and initializes the events arguments.
func ParseEventsArguments(
arguments map[string]string,
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
) (EventsArguments, error) {
var args EventsArguments

// Check for mutual exclusivity of start_block_id and start_block_height early
_, hasStartBlockID := arguments["start_block_id"]
_, hasStartBlockHeight := arguments["start_block_height"]
Comment on lines +118 to +119
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may as well also capture the values since you use them below


if hasStartBlockID && hasStartBlockHeight {
return args, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'")
}

// Parse 'start_block_id' if provided
if hasStartBlockID {
var startBlockID parser.ID
err := startBlockID.Parse(arguments["start_block_id"])
if err != nil {
return args, fmt.Errorf("invalid 'start_block_id': %w", err)
}
args.StartBlockID = startBlockID.Flow()
} else {
args.StartBlockID = flow.ZeroID
}

// Parse 'start_block_height' if provided
if hasStartBlockHeight {
var err error
args.StartBlockHeight, err = util.ToUint64(arguments["start_block_height"])
if err != nil {
return args, fmt.Errorf("invalid 'start_block_height': %w", err)
}
} else {
args.StartBlockHeight = request.EmptyHeight
}

// Parse 'event_types' as []string{}
var eventTypes []string
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" {
eventTypes = strings.Split(eventTypesIn, ",")
}

// Parse 'addresses' as []string{}
var addresses []string
if addressesIn, ok := arguments["addresses"]; ok && addressesIn != "" {
addresses = strings.Split(addressesIn, ",")
}

// Parse 'contracts' as []string{}
var contracts []string
if contractsIn, ok := arguments["contracts"]; ok && contractsIn != "" {
contracts = strings.Split(contractsIn, ",")
}

// Initialize the event filter with the parsed arguments
filter, err := state_stream.NewEventFilter(eventFilterConfig, chain, eventTypes, addresses, contracts)
if err != nil {
return args, fmt.Errorf("failed to create event filter: %w", err)
}
args.Filter = filter

return args, nil
}
181 changes: 181 additions & 0 deletions engine/access/rest/websockets/data_providers/events_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package data_providers

import (
"context"
"fmt"
"strconv"
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/state_stream"
ssmock "github.com/onflow/flow-go/engine/access/state_stream/mock"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)

// EventsProviderSuite is a test suite for testing the events providers functionality.
type EventsProviderSuite struct {
suite.Suite

log zerolog.Logger
api *ssmock.API

chain flow.Chain
rootBlock flow.Block
finalizedBlock *flow.Header
}

func TestEventsProviderSuite(t *testing.T) {
suite.Run(t, new(EventsProviderSuite))
}

func (s *EventsProviderSuite) SetupTest() {
s.log = unittest.Logger()
s.api = ssmock.NewAPI(s.T())

s.chain = flow.Testnet.Chain()

s.rootBlock = unittest.BlockFixture()
s.rootBlock.Header.Height = 0
}

// invalidArgumentsTestCases returns a list of test cases with invalid argument combinations
// for testing the behavior of events data providers. Each test case includes a name,
// a set of input arguments, and the expected error message that should be returned.
//
// The test cases cover scenarios such as:
// 1. Supplying both 'start_block_id' and 'start_block_height' simultaneously, which is not allowed.
// 2. Providing invalid 'start_block_id' value.
// 3. Providing invalid 'start_block_height' value.
func (s *EventsProviderSuite) invalidArgumentsTestCases() []testErrType {
return []testErrType{
{
name: "provide both 'start_block_id' and 'start_block_height' arguments",
arguments: map[string]string{
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
"start_block_id": s.rootBlock.ID().String(),
"start_block_height": fmt.Sprintf("%d", s.rootBlock.Header.Height),
},
expectedErrorMsg: "can only provide either 'start_block_id' or 'start_block_height'",
},
{
name: "invalid 'start_block_id' argument",
arguments: map[string]string{
"start_block_id": "invalid_block_id",
},
expectedErrorMsg: "invalid ID format",
},
{
name: "invalid 'start_block_height' argument",
arguments: map[string]string{
"start_block_height": "-1",
},
expectedErrorMsg: "value must be an unsigned 64 bit integer",
},
}
}

// TestEventsDataProvider_InvalidArguments tests the behavior of the event data provider
// when invalid arguments are provided. It verifies that appropriate errors are returned
// for missing or conflicting arguments.
// This test covers the test cases:
// 1. Providing both 'start_block_id' and 'start_block_height' simultaneously.
// 2. Invalid 'start_block_id' argument.
// 3. Invalid 'start_block_height' argument.
func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() {
ctx := context.Background()
send := make(chan interface{})

topic := EventsTopic

for _, test := range s.invalidArgumentsTestCases() {
s.Run(test.name, func() {
provider, err := NewEventsDataProvider(
ctx,
s.log,
s.api,
s.chain,
state_stream.DefaultEventFilterConfig,
topic,
test.arguments,
send)
s.Require().Nil(provider)
s.Require().Error(err)
s.Require().Contains(err.Error(), test.expectedErrorMsg)
})
}
}

func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() {
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
ctx := context.Background()
send := make(chan interface{}, 10)
topic := EventsTopic
eventsCount := 4

// Create a channel to simulate the subscription's event channel
eventChan := make(chan interface{})

// Create a mock subscription and mock the channel
sub := ssmock.NewSubscription(s.T())
sub.On("Channel").Return((<-chan interface{})(eventChan))
sub.On("Err").Return(nil)

s.api.On("SubscribeEventsFromStartBlockID", mock.Anything, mock.Anything, mock.Anything).Return(sub)

arguments :=
map[string]string{
"start_block_id": s.rootBlock.ID().String(),
}

// Create the EventsDataProvider instance
provider, err := NewEventsDataProvider(
ctx,
s.log,
s.api,
s.chain,
state_stream.DefaultEventFilterConfig,
topic,
arguments,
send)
s.Require().NotNil(provider)
s.Require().NoError(err)

// Run the provider in a separate goroutine to simulate subscription processing
go func() {
err = provider.Run()
s.Require().NoError(err)
}()

// Simulate emitting events to the event channel
go func() {
defer close(eventChan) // Close the channel when done

for i := 0; i < eventsCount; i++ {
eventChan <- &flow.Event{
Type: "flow.AccountCreated",
}
}
}()

// Collect responses
var responses []*models.EventResponse
for i := 0; i < eventsCount; i++ {
res := <-send
eventRes, ok := res.(*models.EventResponse)
s.Require().True(ok, "Expected *models.EventResponse, got %T", res)
responses = append(responses, eventRes)
}

// Verifying that indices are strictly increasing
for i := 1; i < len(responses); i++ {
prevIndex, _ := strconv.Atoi(responses[i-1].MessageIndex)
currentIndex, _ := strconv.Atoi(responses[i].MessageIndex)
s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1")
}

AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
// Ensure the provider is properly closed after the test
provider.Close()
}
Loading
Loading