diff --git a/api/groups/interface.go b/api/groups/interface.go index 22674215..acd2375f 100644 --- a/api/groups/interface.go +++ b/api/groups/interface.go @@ -10,8 +10,7 @@ import ( // EventsFacadeHandler defines the behavior of a facade handler needed for events group type EventsFacadeHandler interface { - HandlePushEventsV2(events data.ArgsSaveBlockData) error - HandlePushEventsV1(events data.SaveBlockData) error + HandlePushEvents(events data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) GetConnectorUserAndPass() (string, string) diff --git a/api/shared/interface.go b/api/shared/interface.go index 84d53dbb..da4fc4d9 100644 --- a/api/shared/interface.go +++ b/api/shared/interface.go @@ -24,8 +24,7 @@ type GroupHandler interface { // FacadeHandler defines the behavior of a notifier base facade handler type FacadeHandler interface { - HandlePushEventsV2(events data.ArgsSaveBlockData) error - HandlePushEventsV1(events data.SaveBlockData) error + HandlePushEvents(events data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) GetConnectorUserAndPass() (string, string) diff --git a/common/errors.go b/common/errors.go index 686e6ec7..cdbb762a 100644 --- a/common/errors.go +++ b/common/errors.go @@ -31,3 +31,9 @@ var ErrNilFacadeHandler = errors.New("nil facade handler") // ErrNilStatusMetricsHandler signals that a nil status metrics handler has been provided var ErrNilStatusMetricsHandler = errors.New("nil status metrics handler") + +// ErrWrongTypeAssertion signals a wrong type assertion +var ErrWrongTypeAssertion = errors.New("wrong type assertion") + +// ErrLoopAlreadyStarted signals that a loop has already been started +var ErrLoopAlreadyStarted = errors.New("loop already started") diff --git a/disabled/disabledHub.go b/disabled/disabledHub.go index 2b808dad..f819e711 100644 --- a/disabled/disabledHub.go +++ b/disabled/disabledHub.go @@ -9,32 +9,28 @@ import ( type Hub struct { } -// Run does nothing -func (h *Hub) Run() { +// Publish does nothing +func (h *Hub) Publish(events data.BlockEvents) { } -// Broadcast does nothing -func (h *Hub) Broadcast(_ data.BlockEvents) { +// PublishRevert does nothing +func (h *Hub) PublishRevert(revertBlock data.RevertBlock) { } -// BroadcastRevert does nothing -func (h *Hub) BroadcastRevert(_ data.RevertBlock) { +// PublishFinalized does nothing +func (h *Hub) PublishFinalized(finalizedBlock data.FinalizedBlock) { } -// BroadcastFinalized does nothing -func (h *Hub) BroadcastFinalized(_ data.FinalizedBlock) { +// PublishTxs does nothing +func (h *Hub) PublishTxs(blockTxs data.BlockTxs) { } -// BroadcastTxs does nothing -func (h *Hub) BroadcastTxs(_ data.BlockTxs) { +// PublishScrs does nothing +func (h *Hub) PublishScrs(blockScrs data.BlockScrs) { } -// BroadcastScrs does nothing -func (h *Hub) BroadcastScrs(_ data.BlockScrs) { -} - -// BroadcastBlockEventsWithOrder does nothing -func (h *Hub) BroadcastBlockEventsWithOrder(_ data.BlockEventsWithOrder) { +// PublishBlockEventsWithOrder does nothing +func (h *Hub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { } // RegisterEvent does nothing diff --git a/dispatcher/hub/commonHub.go b/dispatcher/hub/commonHub.go index 45cc59ee..027594c9 100644 --- a/dispatcher/hub/commonHub.go +++ b/dispatcher/hub/commonHub.go @@ -1,7 +1,6 @@ package hub import ( - "context" "sync" "github.com/google/uuid" @@ -22,20 +21,10 @@ type ArgsCommonHub struct { } type commonHub struct { - filter filters.EventFilter - subscriptionMapper dispatcher.SubscriptionMapperHandler - mutDispatchers sync.RWMutex - dispatchers map[uuid.UUID]dispatcher.EventDispatcher - register chan dispatcher.EventDispatcher - unregister chan dispatcher.EventDispatcher - broadcast chan data.BlockEvents - broadcastRevert chan data.RevertBlock - broadcastFinalized chan data.FinalizedBlock - broadcastTxs chan data.BlockTxs - broadcastBlockEventsWithOrder chan data.BlockEventsWithOrder - broadcastScrs chan data.BlockScrs - closeChan chan struct{} - cancelFunc func() + filter filters.EventFilter + subscriptionMapper dispatcher.SubscriptionMapperHandler + mutDispatchers sync.RWMutex + dispatchers map[uuid.UUID]dispatcher.EventDispatcher } // NewCommonHub creates a new commonHub instance @@ -46,19 +35,10 @@ func NewCommonHub(args ArgsCommonHub) (*commonHub, error) { } return &commonHub{ - mutDispatchers: sync.RWMutex{}, - filter: args.Filter, - subscriptionMapper: args.SubscriptionMapper, - dispatchers: make(map[uuid.UUID]dispatcher.EventDispatcher), - register: make(chan dispatcher.EventDispatcher), - unregister: make(chan dispatcher.EventDispatcher), - broadcast: make(chan data.BlockEvents), - broadcastRevert: make(chan data.RevertBlock), - broadcastFinalized: make(chan data.FinalizedBlock), - broadcastTxs: make(chan data.BlockTxs), - broadcastBlockEventsWithOrder: make(chan data.BlockEventsWithOrder), - broadcastScrs: make(chan data.BlockScrs), - closeChan: make(chan struct{}), + mutDispatchers: sync.RWMutex{}, + filter: args.Filter, + subscriptionMapper: args.SubscriptionMapper, + dispatchers: make(map[uuid.UUID]dispatcher.EventDispatcher), }, nil } @@ -73,131 +53,27 @@ func checkArgs(args ArgsCommonHub) error { return nil } -// Run is launched as a goroutine and listens for events on the exposed channels -func (ch *commonHub) Run() { - var ctx context.Context - ctx, ch.cancelFunc = context.WithCancel(context.Background()) - - go ch.run(ctx) -} - -func (ch *commonHub) run(ctx context.Context) { - for { - select { - case <-ctx.Done(): - log.Debug("commonHub is stopping...") - return - - case events := <-ch.broadcast: - ch.handleBroadcast(events) - - case revertEvent := <-ch.broadcastRevert: - ch.handleRevertBroadcast(revertEvent) - - case finalizedEvent := <-ch.broadcastFinalized: - ch.handleFinalizedBroadcast(finalizedEvent) - - case txsEvent := <-ch.broadcastTxs: - ch.handleTxsBroadcast(txsEvent) - - case txsEvent := <-ch.broadcastBlockEventsWithOrder: - ch.handleBlockEventsWithOrderBroadcast(txsEvent) - - case scrsEvent := <-ch.broadcastScrs: - ch.handleScrsBroadcast(scrsEvent) - - case dispatcherClient := <-ch.register: - ch.registerDispatcher(dispatcherClient) - - case dispatcherClient := <-ch.unregister: - ch.unregisterDispatcher(dispatcherClient) - } - } -} - // Subscribe is used by a dispatcher to send a dispatcher.SubscribeEvent func (ch *commonHub) Subscribe(event data.SubscribeEvent) { ch.subscriptionMapper.MatchSubscribeEvent(event) } -// Broadcast handles block events pushed by producers into the broadcast channel -// Upon reading the channel, the hub notifies the registered dispatchers, if any -func (ch *commonHub) Broadcast(events data.BlockEvents) { - select { - case ch.broadcast <- events: - case <-ch.closeChan: - } -} - -// BroadcastRevert handles revert event pushed by producers into the broadcast channel -// Upon reading the channel, the hub notifies the registered dispatchers, if any -func (ch *commonHub) BroadcastRevert(event data.RevertBlock) { - select { - case ch.broadcastRevert <- event: - case <-ch.closeChan: - } -} - -// BroadcastFinalized handles finalized event pushed by producers into the broadcast channel -// Upon reading the channel, the hub notifies the registered dispatchers, if any -func (ch *commonHub) BroadcastFinalized(event data.FinalizedBlock) { - select { - case ch.broadcastFinalized <- event: - case <-ch.closeChan: - } -} - -// BroadcastTxs handles block txs event pushed by producers into the broadcast channel -// Upon reading the channel, the hub notifies the registered dispatchers, if any -func (ch *commonHub) BroadcastTxs(event data.BlockTxs) { - select { - case ch.broadcastTxs <- event: - case <-ch.closeChan: - } -} - -// BroadcastScrs handles block scrs event pushed by producers into the broadcast channel -// Upon reading the channel, the hub notifies the registered dispatchers, if any -func (ch *commonHub) BroadcastScrs(event data.BlockScrs) { - select { - case ch.broadcastScrs <- event: - case <-ch.closeChan: - } -} - -// BroadcastBlockEventsWithOrder handles full block events pushed by producers into the channel -func (ch *commonHub) BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) { - select { - case ch.broadcastBlockEventsWithOrder <- event: - case <-ch.closeChan: - } -} - // RegisterEvent will send event to a receive-only channel used to register dispatchers func (ch *commonHub) RegisterEvent(event dispatcher.EventDispatcher) { - select { - case ch.register <- event: - case <-ch.closeChan: - } + ch.registerDispatcher(event) } // UnregisterEvent will send event to a receive-only channel used by a dispatcher to signal it has disconnected func (ch *commonHub) UnregisterEvent(event dispatcher.EventDispatcher) { - select { - case ch.unregister <- event: - case <-ch.closeChan: - } + ch.unregisterDispatcher(event) } -func (ch *commonHub) handleBroadcast(blockEvents data.BlockEvents) { +// Publish will publish logs and events to dispatcher +func (ch *commonHub) Publish(blockEvents data.BlockEvents) { subscriptions := ch.subscriptionMapper.Subscriptions() - for _, subscription := range subscriptions { - if subscription.EventType != common.PushLogsAndEvents { - continue - } - - ch.handlePushBlockEvents(blockEvents, subscription) + for _, sub := range subscriptions[common.PushLogsAndEvents] { + ch.handlePushBlockEvents(blockEvents, sub) } } @@ -217,17 +93,14 @@ func (ch *commonHub) handlePushBlockEvents(blockEvents data.BlockEvents, subscri ch.mutDispatchers.RUnlock() } -func (ch *commonHub) handleRevertBroadcast(revertBlock data.RevertBlock) { +// PublishRevert will publish revert event to dispatcher +func (ch *commonHub) PublishRevert(revertBlock data.RevertBlock) { subscriptions := ch.subscriptionMapper.Subscriptions() dispatchersMap := make(map[uuid.UUID]data.RevertBlock) - for _, subscription := range subscriptions { - if subscription.EventType != common.RevertBlockEvents { - continue - } - - dispatchersMap[subscription.DispatcherID] = revertBlock + for _, sub := range subscriptions[common.RevertBlockEvents] { + dispatchersMap[sub.DispatcherID] = revertBlock } ch.mutDispatchers.RLock() @@ -239,16 +112,13 @@ func (ch *commonHub) handleRevertBroadcast(revertBlock data.RevertBlock) { } } -func (ch *commonHub) handleFinalizedBroadcast(finalizedBlock data.FinalizedBlock) { +// PublishFinalized will publish finalized event to dispatcher +func (ch *commonHub) PublishFinalized(finalizedBlock data.FinalizedBlock) { subscriptions := ch.subscriptionMapper.Subscriptions() dispatchersMap := make(map[uuid.UUID]data.FinalizedBlock) - for _, subscription := range subscriptions { - if subscription.EventType != common.FinalizedBlockEvents { - continue - } - + for _, subscription := range subscriptions[common.FinalizedBlockEvents] { dispatchersMap[subscription.DispatcherID] = finalizedBlock } @@ -261,16 +131,13 @@ func (ch *commonHub) handleFinalizedBroadcast(finalizedBlock data.FinalizedBlock } } -func (ch *commonHub) handleTxsBroadcast(blockTxs data.BlockTxs) { +// PublishTxs will publish txs event to dispatcher +func (ch *commonHub) PublishTxs(blockTxs data.BlockTxs) { subscriptions := ch.subscriptionMapper.Subscriptions() dispatchersMap := make(map[uuid.UUID]data.BlockTxs) - for _, subscription := range subscriptions { - if subscription.EventType != common.BlockTxs { - continue - } - + for _, subscription := range subscriptions[common.BlockTxs] { dispatchersMap[subscription.DispatcherID] = blockTxs } @@ -283,16 +150,13 @@ func (ch *commonHub) handleTxsBroadcast(blockTxs data.BlockTxs) { } } -func (ch *commonHub) handleBlockEventsWithOrderBroadcast(blockTxs data.BlockEventsWithOrder) { +// PublishBlockEventsWithOrder will publish block events with order to dispatcher +func (ch *commonHub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { subscriptions := ch.subscriptionMapper.Subscriptions() dispatchersMap := make(map[uuid.UUID]data.BlockEventsWithOrder) - for _, subscription := range subscriptions { - if subscription.EventType != common.BlockEvents { - continue - } - + for _, subscription := range subscriptions[common.BlockEvents] { dispatchersMap[subscription.DispatcherID] = blockTxs } @@ -305,16 +169,13 @@ func (ch *commonHub) handleBlockEventsWithOrderBroadcast(blockTxs data.BlockEven } } -func (ch *commonHub) handleScrsBroadcast(blockScrs data.BlockScrs) { +// PublishScrs will publish scrs events to dispatcher +func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) { subscriptions := ch.subscriptionMapper.Subscriptions() dispatchersMap := make(map[uuid.UUID]data.BlockScrs) - for _, subscription := range subscriptions { - if subscription.EventType != common.BlockScrs { - continue - } - + for _, subscription := range subscriptions[common.BlockScrs] { dispatchersMap[subscription.DispatcherID] = blockScrs } @@ -355,12 +216,6 @@ func (ch *commonHub) unregisterDispatcher(d dispatcher.EventDispatcher) { // Close will close the goroutine and channels func (ch *commonHub) Close() error { - if ch.cancelFunc != nil { - ch.cancelFunc() - } - - close(ch.closeChan) - return nil } diff --git a/dispatcher/hub/commonHub_test.go b/dispatcher/hub/commonHub_test.go index ce720c03..13dfda30 100644 --- a/dispatcher/hub/commonHub_test.go +++ b/dispatcher/hub/commonHub_test.go @@ -66,9 +66,6 @@ func TestCommonHub_RegisterDispatcher(t *testing.T) { dispatcher1 := mocks.NewDispatcherMock(nil, hub) dispatcher2 := mocks.NewDispatcherMock(nil, hub) - hub.Run() - defer hub.Close() - hub.RegisterEvent(dispatcher1) hub.RegisterEvent(dispatcher2) @@ -88,9 +85,6 @@ func TestCommonHub_UnregisterDispatcher(t *testing.T) { dispatcher1 := mocks.NewDispatcherMock(nil, hub) dispatcher2 := mocks.NewDispatcherMock(nil, hub) - hub.Run() - defer hub.Close() - hub.RegisterEvent(dispatcher1) hub.RegisterEvent(dispatcher2) @@ -112,18 +106,15 @@ func TestCommonHub_HandleBroadcastDispatcherReceivesEvents(t *testing.T) { consumer := mocks.NewConsumerMock() dispatcher1 := mocks.NewDispatcherMock(consumer, hub) - hub.registerDispatcher(dispatcher1) + hub.RegisterEvent(dispatcher1) hub.Subscribe(data.SubscribeEvent{ DispatcherID: dispatcher1.GetID(), SubscriptionEntries: []data.SubscriptionEntry{}, }) - hub.Run() - defer hub.Close() - blockEvents := getEvents() - hub.Broadcast(blockEvents) + hub.Publish(blockEvents) time.Sleep(time.Millisecond * 100) @@ -142,12 +133,14 @@ func TestCommonHub_HandleBroadcastMultipleDispatchers(t *testing.T) { dispatcher1 := mocks.NewDispatcherMock(consumer1, hub) consumer2 := mocks.NewConsumerMock() dispatcher2 := mocks.NewDispatcherMock(consumer2, hub) - - hub.Run() - defer hub.Close() + consumer3 := mocks.NewConsumerMock() + dispatcher3 := mocks.NewDispatcherMock(consumer3, hub) hub.RegisterEvent(dispatcher1) hub.RegisterEvent(dispatcher2) + hub.RegisterEvent(dispatcher3) + + time.Sleep(time.Millisecond * 100) hub.Subscribe(data.SubscribeEvent{ DispatcherID: dispatcher1.GetID(), @@ -167,27 +160,25 @@ func TestCommonHub_HandleBroadcastMultipleDispatchers(t *testing.T) { }, }, }) + hub.Subscribe(data.SubscribeEvent{ + DispatcherID: dispatcher3.GetID(), + SubscriptionEntries: []data.SubscriptionEntry{ + { + Address: "erd3", + Identifier: "random", + }, + }, + }) blockEvents := getEvents() - hub.Broadcast(blockEvents) + hub.Publish(blockEvents) time.Sleep(time.Millisecond * 100) - require.True(t, consumer1.HasEvent(blockEvents.Events[0])) require.True(t, consumer2.HasEvent(blockEvents.Events[1])) -} - -func TestCommonHubRun(t *testing.T) { - t.Parallel() - - args := createMockCommonHubArgs() - hub, err := NewCommonHub(args) - require.Nil(t, err) - - hub.Run() - err = hub.Close() - require.Nil(t, err) + require.True(t, consumer1.HasEvent(blockEvents.Events[0])) + require.True(t, consumer3.HasEvent(blockEvents.Events[2])) } func TestCommonHub_HandleRevertBroadcast(t *testing.T) { @@ -212,15 +203,12 @@ func TestCommonHub_HandleRevertBroadcast(t *testing.T) { }, }) - hub.Run() - defer hub.Close() - blockEvents := data.RevertBlock{ Hash: "hash1", Nonce: 1, } - hub.BroadcastRevert(blockEvents) + hub.PublishRevert(blockEvents) time.Sleep(time.Millisecond * 100) @@ -249,14 +237,11 @@ func TestCommonHub_HandleFinalizedBroadcast(t *testing.T) { }, }) - hub.Run() - defer hub.Close() - blockEvents := data.FinalizedBlock{ Hash: "hash1", } - hub.BroadcastFinalized(blockEvents) + hub.PublishFinalized(blockEvents) time.Sleep(time.Millisecond * 100) @@ -285,14 +270,11 @@ func TestCommonHub_HandleTxsBroadcast(t *testing.T) { }, }) - hub.Run() - defer hub.Close() - blockEvents := data.BlockTxs{ Hash: "hash1", } - hub.BroadcastTxs(blockEvents) + hub.PublishTxs(blockEvents) time.Sleep(time.Millisecond * 100) @@ -321,14 +303,11 @@ func TestCommonHub_HandleBlockEventsBroadcast(t *testing.T) { }, }) - hub.Run() - defer hub.Close() - blockEvents := data.BlockEventsWithOrder{ Hash: "hash1", } - hub.handleBlockEventsWithOrderBroadcast(blockEvents) + hub.PublishBlockEventsWithOrder(blockEvents) time.Sleep(time.Millisecond * 100) @@ -357,14 +336,11 @@ func TestCommonHub_HandleScrsBroadcast(t *testing.T) { }, }) - hub.Run() - defer hub.Close() - blockEvents := data.BlockScrs{ Hash: "hash1", } - hub.BroadcastScrs(blockEvents) + hub.PublishScrs(blockEvents) time.Sleep(time.Millisecond * 100) diff --git a/dispatcher/interface.go b/dispatcher/interface.go index 85d54b6f..9a2c33a1 100644 --- a/dispatcher/interface.go +++ b/dispatcher/interface.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/multiversx/mx-chain-notifier-go/data" + "github.com/multiversx/mx-chain-notifier-go/process" ) // EventDispatcher defines the behaviour of a event dispatcher component @@ -20,20 +21,19 @@ type EventDispatcher interface { ScrsEvent(event data.BlockScrs) } -// Hub defines the behaviour of a hub component which should be able to register -// and unregister dispatching events +// Hub defines the behaviour of a component which should be able to receive events +// and publish them to subscribers type Hub interface { - Run() - Broadcast(events data.BlockEvents) - BroadcastRevert(event data.RevertBlock) - BroadcastFinalized(event data.FinalizedBlock) - BroadcastTxs(event data.BlockTxs) - BroadcastScrs(event data.BlockScrs) - BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) + process.PublisherHandler + Dispatcher +} + +// Dispatcher defines the behaviour of a dispatcher component which should be able to register +// and unregister dispatching events +type Dispatcher interface { RegisterEvent(event EventDispatcher) UnregisterEvent(event EventDispatcher) Subscribe(event data.SubscribeEvent) - Close() error IsInterfaceNil() bool } @@ -64,6 +64,6 @@ type WSUpgrader interface { type SubscriptionMapperHandler interface { MatchSubscribeEvent(event data.SubscribeEvent) RemoveSubscriptions(dispatcherID uuid.UUID) - Subscriptions() []data.Subscription + Subscriptions() map[string][]data.Subscription IsInterfaceNil() bool } diff --git a/dispatcher/subscription.go b/dispatcher/subscription.go index 814b9c7e..a5c048ee 100644 --- a/dispatcher/subscription.go +++ b/dispatcher/subscription.go @@ -98,13 +98,15 @@ func (sm *SubscriptionMapper) RemoveSubscriptions(dispatcherID uuid.UUID) { } // Subscriptions returns a slice reflecting the subscriptions present in the map -func (sm *SubscriptionMapper) Subscriptions() []data.Subscription { +func (sm *SubscriptionMapper) Subscriptions() map[string][]data.Subscription { sm.rwMut.RLock() defer sm.rwMut.RUnlock() - var subscriptions []data.Subscription + subscriptions := make(map[string][]data.Subscription) for _, sub := range sm.subscriptions { - subscriptions = append(subscriptions, sub...) + for _, s := range sub { + subscriptions[s.EventType] = append(subscriptions[s.EventType], s) + } } return subscriptions diff --git a/dispatcher/subscription_test.go b/dispatcher/subscription_test.go index 9778cb45..b1d6a44f 100644 --- a/dispatcher/subscription_test.go +++ b/dispatcher/subscription_test.go @@ -7,9 +7,9 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/data" - "github.com/google/uuid" "github.com/stretchr/testify/require" ) @@ -26,7 +26,7 @@ func TestSubscriptionMap_Subscriptions(t *testing.T) { subs := subMap.Subscriptions() - require.True(t, len(subs) >= len(subEvents)) + require.True(t, len(subs[common.PushLogsAndEvents]) >= len(subEvents)) } func TestSubscriptionsMap_ShouldMatchAllForEmptySubscriptionEntry(t *testing.T) { @@ -73,16 +73,18 @@ func TestSubscriptionMapper_MatchSubscribeEventResultsInCorrectSet(t *testing.T) subsFromMap := subMap.Subscriptions() - require.True(t, len(subsFromMap) == len(subsFromEntries)) + require.True(t, len(subsFromMap[common.PushLogsAndEvents]) == len(subsFromEntries)) - for _, sub1 := range subsFromMap { - found := false - for _, sub2 := range subsFromEntries { - if reflect.DeepEqual(sub1, sub2) { - found = true + for _, subs := range subsFromMap { + for _, sub1 := range subs { + found := false + for _, sub2 := range subsFromEntries { + if reflect.DeepEqual(sub1, sub2) { + found = true + } } + require.True(t, found) } - require.True(t, found) } } @@ -96,16 +98,20 @@ func TestSubscriptionMap_MatchSubscribeEventCorrectMatchLevel(t *testing.T) { subEvent := data.SubscribeEvent{ SubscriptionEntries: []data.SubscriptionEntry{ { - Address: addr1, + EventType: common.BlockEvents, + Address: addr1, }, { + EventType: common.BlockEvents, Address: addr2, Identifier: "ESDTTransfer", }, { + EventType: common.BlockTxs, Identifier: "wrapEGLD", }, { + EventType: common.BlockTxs, Address: addr3, Identifier: "withdraw", Topics: []string{"1", "2"}, @@ -119,17 +125,17 @@ func TestSubscriptionMap_MatchSubscribeEventCorrectMatchLevel(t *testing.T) { subs := subMap.Subscriptions() - require.NotEmpty(t, subs[0]) - require.True(t, subs[0].MatchLevel == MatchAddress) + require.NotEmpty(t, subs[common.BlockEvents][0]) + require.True(t, subs[common.BlockEvents][0].MatchLevel == MatchAddress) - require.NotEmpty(t, subs[1]) - require.True(t, subs[1].MatchLevel == MatchAddressIdentifier) + require.NotEmpty(t, subs[common.BlockEvents][1]) + require.True(t, subs[common.BlockEvents][1].MatchLevel == MatchAddressIdentifier) - require.NotEmpty(t, subs[2]) - require.True(t, subs[2].MatchLevel == MatchIdentifier) + require.NotEmpty(t, subs[common.BlockTxs][0]) + require.True(t, subs[common.BlockTxs][0].MatchLevel == MatchIdentifier) - require.NotEmpty(t, subs[3]) - require.True(t, subs[3].MatchLevel == MatchTopics) + require.NotEmpty(t, subs[common.BlockTxs][1]) + require.True(t, subs[common.BlockTxs][1].MatchLevel == MatchTopics) } func TestSubscriptionMapper_RemoveSubscriptions(t *testing.T) { @@ -149,9 +155,11 @@ func TestSubscriptionMapper_RemoveSubscriptions(t *testing.T) { subsFromMap := subMap.Subscriptions() - for _, sub := range subsFromMap { - if sub.DispatcherID == rmDispatcherID { - t.Error("should clear all subscriptions") + for _, subs := range subsFromMap { + for _, sub := range subs { + if sub.DispatcherID == rmDispatcherID { + t.Error("should clear all subscriptions") + } } } } diff --git a/dispatcher/ws/errors.go b/dispatcher/ws/errors.go index eb75cd45..75f95edc 100644 --- a/dispatcher/ws/errors.go +++ b/dispatcher/ws/errors.go @@ -2,8 +2,8 @@ package ws import "errors" -// ErrNilHubHandler signals that a nil hub handler has been provided -var ErrNilHubHandler = errors.New("nil hub handler") +// ErrNilDispatcher signals that a nil dispatcher has been provided +var ErrNilDispatcher = errors.New("nil dispatcher") // ErrNilWSUpgrader signals that a nil websocket upgrader has been provided var ErrNilWSUpgrader = errors.New("nil websocket upgrader") diff --git a/dispatcher/ws/export_test.go b/dispatcher/ws/export_test.go index 5f907815..29955ba1 100644 --- a/dispatcher/ws/export_test.go +++ b/dispatcher/ws/export_test.go @@ -8,7 +8,7 @@ type ArgsWSDispatcher struct { // NewTestWSDispatcher - func NewTestWSDispatcher(args ArgsWSDispatcher) (*websocketDispatcher, error) { wsArgs := argsWebSocketDispatcher{ - Hub: args.Hub, + Dispatcher: args.Dispatcher, Conn: args.Conn, Marshaller: args.Marshaller, } diff --git a/dispatcher/ws/wsDispatcher.go b/dispatcher/ws/wsDispatcher.go index 2ac0a8bc..21c3d6d1 100644 --- a/dispatcher/ws/wsDispatcher.go +++ b/dispatcher/ws/wsDispatcher.go @@ -32,7 +32,7 @@ var ( // argsWebSocketDispatcher defines the arguments needed for ws dispatcher type argsWebSocketDispatcher struct { - Hub dispatcher.Hub + Dispatcher dispatcher.Dispatcher Conn dispatcher.WSConnection Marshaller marshal.Marshalizer } @@ -42,14 +42,14 @@ type websocketDispatcher struct { wg sync.WaitGroup send chan []byte conn dispatcher.WSConnection - hub dispatcher.Hub + dispatcher dispatcher.Dispatcher marshaller marshal.Marshalizer } // newWebSocketDispatcher createa a new ws dispatcher instance func newWebSocketDispatcher(args argsWebSocketDispatcher) (*websocketDispatcher, error) { - if check.IfNil(args.Hub) { - return nil, ErrNilHubHandler + if check.IfNil(args.Dispatcher) { + return nil, ErrNilDispatcher } if args.Conn == nil { return nil, ErrNilWSConn @@ -62,7 +62,7 @@ func newWebSocketDispatcher(args argsWebSocketDispatcher) (*websocketDispatcher, id: uuid.New(), send: make(chan []byte, 256), conn: args.Conn, - hub: args.Hub, + dispatcher: args.Dispatcher, marshaller: args.Marshaller, }, nil } @@ -254,7 +254,7 @@ func (wd *websocketDispatcher) writePump() { // readPump listens for incoming events and reads the content from the socket stream func (wd *websocketDispatcher) readPump() { defer func() { - wd.hub.UnregisterEvent(wd) + wd.dispatcher.UnregisterEvent(wd) if err := wd.conn.Close(); err != nil { log.Error("failed to close socket on defer", "err", err.Error()) } @@ -293,7 +293,7 @@ func (wd *websocketDispatcher) trySendSubscribeEvent(eventBytes []byte) { return } subscribeEvent.DispatcherID = wd.id - wd.hub.Subscribe(subscribeEvent) + wd.dispatcher.Subscribe(subscribeEvent) } func (wd *websocketDispatcher) setSocketWriteLimits() error { diff --git a/dispatcher/ws/wsDispatcher_test.go b/dispatcher/ws/wsDispatcher_test.go index 04f92479..eaad626a 100644 --- a/dispatcher/ws/wsDispatcher_test.go +++ b/dispatcher/ws/wsDispatcher_test.go @@ -33,7 +33,7 @@ func (tw *testWriter) Close() error { func createMockWSDispatcherArgs() ws.ArgsWSDispatcher { args := ws.ArgsWSDispatcher{} - args.Hub = &mocks.HubStub{} + args.Dispatcher = &mocks.HubStub{} args.Conn = &mocks.WSConnStub{} args.Marshaller = &mock.MarshalizerMock{} return args @@ -46,11 +46,11 @@ func TestNewWebSocketDispatcher(t *testing.T) { t.Parallel() args := createMockWSDispatcherArgs() - args.Hub = nil + args.Dispatcher = nil wd, err := ws.NewTestWSDispatcher(args) require.Nil(t, wd) - assert.Equal(t, ws.ErrNilHubHandler, err) + assert.Equal(t, ws.ErrNilDispatcher, err) }) t.Run("nil ws conn", func(t *testing.T) { diff --git a/dispatcher/ws/wsHandler.go b/dispatcher/ws/wsHandler.go index b30ee222..6da302db 100644 --- a/dispatcher/ws/wsHandler.go +++ b/dispatcher/ws/wsHandler.go @@ -11,13 +11,13 @@ import ( // ArgsWebSocketProcessor defines the argument needed to create a websocketHandler type ArgsWebSocketProcessor struct { - Hub dispatcher.Hub + Dispatcher dispatcher.Dispatcher Upgrader dispatcher.WSUpgrader Marshaller marshal.Marshalizer } type websocketProcessor struct { - hub dispatcher.Hub + dispatcher dispatcher.Dispatcher upgrader dispatcher.WSUpgrader marshaller marshal.Marshalizer } @@ -30,15 +30,15 @@ func NewWebSocketProcessor(args ArgsWebSocketProcessor) (*websocketProcessor, er } return &websocketProcessor{ - hub: args.Hub, + dispatcher: args.Dispatcher, upgrader: args.Upgrader, marshaller: args.Marshaller, }, nil } func checkArgs(args ArgsWebSocketProcessor) error { - if check.IfNil(args.Hub) { - return ErrNilHubHandler + if check.IfNil(args.Dispatcher) { + return ErrNilDispatcher } if args.Upgrader == nil { return ErrNilWSUpgrader @@ -59,7 +59,7 @@ func (wh *websocketProcessor) ServeHTTP(w http.ResponseWriter, r *http.Request) } args := argsWebSocketDispatcher{ - Hub: wh.hub, + Dispatcher: wh.dispatcher, Conn: conn, Marshaller: wh.marshaller, } @@ -68,7 +68,7 @@ func (wh *websocketProcessor) ServeHTTP(w http.ResponseWriter, r *http.Request) log.Error("failed creating a new websocket dispatcher", "err", err.Error()) return } - wsDispatcher.hub.RegisterEvent(wsDispatcher) + wsDispatcher.dispatcher.RegisterEvent(wsDispatcher) go wsDispatcher.writePump() go wsDispatcher.readPump() diff --git a/dispatcher/ws/wsHandler_test.go b/dispatcher/ws/wsHandler_test.go index 98c096e5..3c6d1b0e 100644 --- a/dispatcher/ws/wsHandler_test.go +++ b/dispatcher/ws/wsHandler_test.go @@ -14,7 +14,7 @@ import ( func createMockArgsWSHandler() ws.ArgsWebSocketProcessor { return ws.ArgsWebSocketProcessor{ - Hub: &mocks.HubStub{}, + Dispatcher: &mocks.HubStub{}, Upgrader: &mocks.WSUpgraderStub{}, Marshaller: &mock.MarshalizerMock{}, } @@ -23,15 +23,15 @@ func createMockArgsWSHandler() ws.ArgsWebSocketProcessor { func TestNewWebSocketHandler(t *testing.T) { t.Parallel() - t.Run("nil hub handler", func(t *testing.T) { + t.Run("nil dispatcher handler", func(t *testing.T) { t.Parallel() args := createMockArgsWSHandler() - args.Hub = nil + args.Dispatcher = nil wh, err := ws.NewWebSocketProcessor(args) require.True(t, check.IfNil(wh)) - assert.Equal(t, ws.ErrNilHubHandler, err) + assert.Equal(t, ws.ErrNilDispatcher, err) }) t.Run("nil ws upgrader", func(t *testing.T) { diff --git a/facade/errors.go b/facade/errors.go index dd16a5d0..69c384f1 100644 --- a/facade/errors.go +++ b/facade/errors.go @@ -7,6 +7,3 @@ var ErrNilEventsHandler = errors.New("nil events handler") // ErrNilWSHandler signals that a nil websocket handler was provided var ErrNilWSHandler = errors.New("nil websocket handler") - -// ErrNilEventsInterceptor signals that a nil events interceptor was provided -var ErrNilEventsInterceptor = errors.New("nil events interceptor") diff --git a/facade/interface.go b/facade/interface.go index bfb20ae0..7507dca7 100644 --- a/facade/interface.go +++ b/facade/interface.go @@ -8,12 +8,9 @@ import ( // EventsHandler defines the behavior of an events handler component. // This will handle push events from observer node. type EventsHandler interface { - HandlePushEvents(events data.BlockEvents) error + HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) - HandleBlockTxs(blockTxs data.BlockTxs) - HandleBlockScrs(blockScrs data.BlockScrs) - HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) IsInterfaceNil() bool } diff --git a/facade/notifierFacade.go b/facade/notifierFacade.go index 8fc90954..de3230a6 100644 --- a/facade/notifierFacade.go +++ b/facade/notifierFacade.go @@ -18,16 +18,14 @@ type ArgsNotifierFacade struct { APIConfig config.ConnectorApiConfig EventsHandler EventsHandler WSHandler dispatcher.WSHandler - EventsInterceptor EventsInterceptor StatusMetricsHandler common.StatusMetricsHandler } type notifierFacade struct { - config config.ConnectorApiConfig - eventsHandler EventsHandler - wsHandler dispatcher.WSHandler - eventsInterceptor EventsInterceptor - statusMetrics common.StatusMetricsHandler + config config.ConnectorApiConfig + eventsHandler EventsHandler + wsHandler dispatcher.WSHandler + statusMetrics common.StatusMetricsHandler } // NewNotifierFacade creates a new notifier facade instance @@ -38,11 +36,10 @@ func NewNotifierFacade(args ArgsNotifierFacade) (*notifierFacade, error) { } return ¬ifierFacade{ - eventsHandler: args.EventsHandler, - config: args.APIConfig, - wsHandler: args.WSHandler, - eventsInterceptor: args.EventsInterceptor, - statusMetrics: args.StatusMetricsHandler, + eventsHandler: args.EventsHandler, + config: args.APIConfig, + wsHandler: args.WSHandler, + statusMetrics: args.StatusMetricsHandler, }, nil } @@ -53,9 +50,6 @@ func checkArgs(args ArgsNotifierFacade) error { if check.IfNil(args.WSHandler) { return ErrNilWSHandler } - if check.IfNil(args.EventsInterceptor) { - return ErrNilEventsInterceptor - } if check.IfNil(args.StatusMetricsHandler) { return common.ErrNilStatusMetricsHandler } @@ -63,76 +57,10 @@ func checkArgs(args ArgsNotifierFacade) error { return nil } -// HandlePushEventsV2 will handle push events received from observer -// It splits block data and handles log, txs and srcs events separately -func (nf *notifierFacade) HandlePushEventsV2(allEvents data.ArgsSaveBlockData) error { - eventsData, err := nf.eventsInterceptor.ProcessBlockEvents(&allEvents) - if err != nil { - return err - } - - pushEvents := data.BlockEvents{ - Hash: eventsData.Hash, - ShardID: eventsData.Header.GetShardID(), - TimeStamp: eventsData.Header.GetTimeStamp(), - Events: eventsData.LogEvents, - } - err = nf.eventsHandler.HandlePushEvents(pushEvents) - if err != nil { - return err - } - - txs := data.BlockTxs{ - Hash: eventsData.Hash, - Txs: eventsData.Txs, - } - nf.eventsHandler.HandleBlockTxs(txs) - - scrs := data.BlockScrs{ - Hash: eventsData.Hash, - Scrs: eventsData.Scrs, - } - nf.eventsHandler.HandleBlockScrs(scrs) - - txsWithOrder := data.BlockEventsWithOrder{ - Hash: eventsData.Hash, - ShardID: eventsData.Header.GetShardID(), - TimeStamp: eventsData.Header.GetTimeStamp(), - Txs: eventsData.TxsWithOrder, - Scrs: eventsData.ScrsWithOrder, - Events: eventsData.LogEvents, - } - nf.eventsHandler.HandleBlockEventsWithOrder(txsWithOrder) - - return nil -} - -// HandlePushEventsV1 will handle push events received from observer +// HandlePushEvents will handle push events received from observer // It splits block data and handles log, txs and srcs events separately -// TODO: remove this implementation -func (nf *notifierFacade) HandlePushEventsV1(eventsData data.SaveBlockData) error { - pushEvents := data.BlockEvents{ - Hash: eventsData.Hash, - Events: eventsData.LogEvents, - } - err := nf.eventsHandler.HandlePushEvents(pushEvents) - if err != nil { - return err - } - - txs := data.BlockTxs{ - Hash: eventsData.Hash, - Txs: eventsData.Txs, - } - nf.eventsHandler.HandleBlockTxs(txs) - - scrs := data.BlockScrs{ - Hash: eventsData.Hash, - Scrs: eventsData.Scrs, - } - nf.eventsHandler.HandleBlockScrs(scrs) - - return nil +func (nf *notifierFacade) HandlePushEvents(allEvents data.ArgsSaveBlockData) error { + return nf.eventsHandler.HandleSaveBlockEvents(allEvents) } // HandleRevertEvents will handle revents events received from observer diff --git a/facade/notifierFacade_test.go b/facade/notifierFacade_test.go index 42731d29..577fdbb1 100644 --- a/facade/notifierFacade_test.go +++ b/facade/notifierFacade_test.go @@ -1,7 +1,6 @@ package facade_test import ( - "errors" "net/http" "net/http/httptest" "testing" @@ -9,7 +8,6 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" - "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/config" @@ -25,7 +23,6 @@ func createMockFacadeArgs() facade.ArgsNotifierFacade { EventsHandler: &mocks.EventsHandlerStub{}, APIConfig: config.ConnectorApiConfig{}, WSHandler: &mocks.WSHandlerStub{}, - EventsInterceptor: &mocks.EventsInterceptorStub{}, StatusMetricsHandler: &mocks.StatusMetricsStub{}, } } @@ -55,17 +52,6 @@ func TestNewNotifierFacade(t *testing.T) { require.Equal(t, facade.ErrNilWSHandler, err) }) - t.Run("nil events interceptor", func(t *testing.T) { - t.Parallel() - - args := createMockFacadeArgs() - args.EventsInterceptor = nil - - f, err := facade.NewNotifierFacade(args) - require.True(t, check.IfNil(f)) - require.Equal(t, facade.ErrNilEventsInterceptor, err) - }) - t.Run("nil status metrics handler", func(t *testing.T) { t.Parallel() @@ -90,29 +76,6 @@ func TestNewNotifierFacade(t *testing.T) { func TestHandlePushEvents(t *testing.T) { t.Parallel() - t.Run("process block events error, should fail", func(t *testing.T) { - t.Parallel() - - args := createMockFacadeArgs() - - expectedErr := errors.New("expected error") - args.EventsInterceptor = &mocks.EventsInterceptorStub{ - ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { - return nil, expectedErr - }, - } - - facade, err := facade.NewNotifierFacade(args) - require.Nil(t, err) - - blockData := data.ArgsSaveBlockData{ - HeaderHash: []byte("blockHash"), - Header: &block.HeaderV2{}, - } - err = facade.HandlePushEventsV2(blockData) - require.Equal(t, expectedErr, err) - }) - t.Run("should work", func(t *testing.T) { t.Parallel() @@ -127,138 +90,29 @@ func TestHandlePushEvents(t *testing.T) { ExecutionOrder: 1, }, } - scrs := map[string]*outport.SCRInfo{ - "hash2": { - SmartContractResult: &smartContractResult.SmartContractResult{ - Nonce: 2, - }, - }, - } - logData := []*outport.LogData{ - { - Log: &transaction.Log{ - Address: []byte("logaddr1"), - Events: []*transaction.Event{}, - }, - TxHash: "logHash1", - }, - } - - logEvents := []data.Event{ - { - Address: "addr1", - }, - } - - header := &block.HeaderV2{ - Header: &block.Header{ - ShardID: 2, - }, - } blockData := data.ArgsSaveBlockData{ HeaderHash: []byte(blockHash), TransactionsPool: &outport.TransactionPool{ - Transactions: txs, - SmartContractResults: scrs, - Logs: logData, + Transactions: txs, }, Header: &block.HeaderV2{}, } - expTxs := map[string]*transaction.Transaction{ - "hash1": { - Nonce: 1, - }, - } - expScrs := map[string]*smartContractResult.SmartContractResult{ - "hash2": { - Nonce: 2, - }, - } - - expTxsData := data.BlockTxs{ - Hash: blockHash, - Txs: expTxs, - } - expScrsData := data.BlockScrs{ - Hash: blockHash, - Scrs: expScrs, - } - expLogEvents := data.BlockEvents{ - Hash: blockHash, - Events: logEvents, - ShardID: 2, - } - - expTxsWithOrder := map[string]*outport.TxInfo{ - "hash1": { - Transaction: &transaction.Transaction{ - Nonce: 1, - }, - ExecutionOrder: 1, - }, - } - expScrsWithOrder := map[string]*outport.SCRInfo{ - "hash2": { - SmartContractResult: &smartContractResult.SmartContractResult{ - Nonce: 2, - }, - }, - } - expTxsWithOrderData := data.BlockEventsWithOrder{ - Hash: blockHash, - ShardID: 2, - Txs: expTxsWithOrder, - Scrs: expScrsWithOrder, - Events: logEvents, - } - - pushWasCalled := false - txsWasCalled := false - scrsWasCalled := false - blockEventsWithOrderWasCalled := false + saveBlockWasCalled := false args.EventsHandler = &mocks.EventsHandlerStub{ - HandlePushEventsCalled: func(events data.BlockEvents) error { - pushWasCalled = true - assert.Equal(t, expLogEvents, events) + HandleSaveBlockEventsCalled: func(allEvents data.ArgsSaveBlockData) error { + saveBlockWasCalled = true return nil }, - HandleBlockTxsCalled: func(blockTxs data.BlockTxs) { - txsWasCalled = true - assert.Equal(t, expTxsData, blockTxs) - }, - HandleBlockScrsCalled: func(blockScrs data.BlockScrs) { - scrsWasCalled = true - assert.Equal(t, expScrsData, blockScrs) - }, - HandleBlockEventsWithOrderCalled: func(blockTxs data.BlockEventsWithOrder) { - blockEventsWithOrderWasCalled = true - assert.Equal(t, expTxsWithOrderData, blockTxs) - }, - } - args.EventsInterceptor = &mocks.EventsInterceptorStub{ - ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { - return &data.InterceptorBlockData{ - Hash: blockHash, - Header: header, - Txs: expTxs, - Scrs: expScrs, - LogEvents: logEvents, - TxsWithOrder: expTxsWithOrder, - ScrsWithOrder: expScrsWithOrder, - }, nil - }, } facade, err := facade.NewNotifierFacade(args) require.Nil(t, err) - facade.HandlePushEventsV2(blockData) + err = facade.HandlePushEvents(blockData) + require.Nil(t, err) - assert.True(t, pushWasCalled) - assert.True(t, txsWasCalled) - assert.True(t, scrsWasCalled) - assert.True(t, blockEventsWithOrderWasCalled) + assert.True(t, saveBlockWasCalled) }) } diff --git a/factory/processFactory.go b/factory/processFactory.go index cecd9daf..9fd41f67 100644 --- a/factory/processFactory.go +++ b/factory/processFactory.go @@ -16,52 +16,6 @@ var log = logger.GetOrCreate("factory") const bech32PubkeyConverterType = "bech32" -// ArgsEventsHandlerFactory defines the arguments needed for events handler creation -type ArgsEventsHandlerFactory struct { - CheckDuplicates bool - Locker process.LockService - MqPublisher process.Publisher - HubPublisher process.Publisher - APIType string - StatusMetricsHandler common.StatusMetricsHandler -} - -// CreateEventsHandler will create an events handler processor -func CreateEventsHandler(args ArgsEventsHandlerFactory) (process.EventsHandler, error) { - publisher, err := getPublisher(args.APIType, args.MqPublisher, args.HubPublisher) - if err != nil { - return nil, err - } - - argsEventsHandler := process.ArgsEventsHandler{ - Locker: args.Locker, - Publisher: publisher, - StatusMetricsHandler: args.StatusMetricsHandler, - CheckDuplicates: args.CheckDuplicates, - } - eventsHandler, err := process.NewEventsHandler(argsEventsHandler) - if err != nil { - return nil, err - } - - return eventsHandler, nil -} - -func getPublisher( - apiType string, - mqPublisher process.Publisher, - hubPublisher process.Publisher, -) (process.Publisher, error) { - switch apiType { - case common.MessageQueuePublisherType: - return mqPublisher, nil - case common.WSPublisherType: - return hubPublisher, nil - default: - return nil, common.ErrInvalidAPIType - } -} - // CreateEventsInterceptor will create the events interceptor func CreateEventsInterceptor(cfg config.GeneralConfig) (process.EventsInterceptor, error) { pubKeyConverter, err := getPubKeyConverter(cfg) @@ -85,7 +39,8 @@ func getPubKeyConverter(cfg config.GeneralConfig) (core.PubkeyConverter, error) } } -func createPayloadHandler(marshaller marshal.Marshalizer, facade process.EventsFacadeHandler) (websocket.PayloadHandler, error) { +// CreatePayloadHandler will create a new instance of payload handler +func CreatePayloadHandler(marshaller marshal.Marshalizer, facade process.EventsFacadeHandler) (websocket.PayloadHandler, error) { dataPreProcessorArgs := preprocess.ArgsEventsPreProcessor{ Marshaller: marshaller, Facade: facade, diff --git a/factory/pubsubFactory.go b/factory/pubsubFactory.go index 717510bf..5ebb8ce3 100644 --- a/factory/pubsubFactory.go +++ b/factory/pubsubFactory.go @@ -4,17 +4,23 @@ import ( "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/config" - "github.com/multiversx/mx-chain-notifier-go/disabled" + "github.com/multiversx/mx-chain-notifier-go/dispatcher" + "github.com/multiversx/mx-chain-notifier-go/process" "github.com/multiversx/mx-chain-notifier-go/rabbitmq" ) // CreatePublisher creates publisher component -func CreatePublisher(apiType string, config config.MainConfig, marshaller marshal.Marshalizer) (rabbitmq.PublisherService, error) { +func CreatePublisher( + apiType string, + config config.MainConfig, + marshaller marshal.Marshalizer, + commonHub dispatcher.Hub, +) (process.Publisher, error) { switch apiType { case common.MessageQueuePublisherType: return createRabbitMqPublisher(config.RabbitMQ, marshaller) case common.WSPublisherType: - return &disabled.Publisher{}, nil + return createWSPublisher(commonHub) default: return nil, common.ErrInvalidAPIType } @@ -36,5 +42,9 @@ func createRabbitMqPublisher(config config.RabbitMQConfig, marshaller marshal.Ma return nil, err } - return rabbitPublisher, nil + return process.NewPublisher(rabbitPublisher) +} + +func createWSPublisher(commonHub dispatcher.Hub) (process.Publisher, error) { + return process.NewPublisher(commonHub) } diff --git a/factory/webFactory.go b/factory/webFactory.go index e4d4d753..521db4fe 100644 --- a/factory/webFactory.go +++ b/factory/webFactory.go @@ -14,7 +14,7 @@ func CreateWebServerHandler(facade shared.FacadeHandler, configs config.Configs) return nil, err } - payloadHandler, err := createPayloadHandler(marshaller, facade) + payloadHandler, err := CreatePayloadHandler(marshaller, facade) if err != nil { return nil, err } diff --git a/factory/wsFactory.go b/factory/wsFactory.go index 1a19fd98..4122a1de 100644 --- a/factory/wsFactory.go +++ b/factory/wsFactory.go @@ -19,25 +19,25 @@ const ( ) // CreateWSHandler creates websocket handler component based on api type -func CreateWSHandler(apiType string, hub dispatcher.Hub, marshaller marshal.Marshalizer) (dispatcher.WSHandler, error) { +func CreateWSHandler(apiType string, wsDispatcher dispatcher.Dispatcher, marshaller marshal.Marshalizer) (dispatcher.WSHandler, error) { switch apiType { case common.MessageQueuePublisherType: return &disabled.WSHandler{}, nil case common.WSPublisherType: - return createWSHandler(hub, marshaller) + return createWSHandler(wsDispatcher, marshaller) default: return nil, common.ErrInvalidAPIType } } -func createWSHandler(hub dispatcher.Hub, marshaller marshal.Marshalizer) (dispatcher.WSHandler, error) { +func createWSHandler(wsDispatcher dispatcher.Dispatcher, marshaller marshal.Marshalizer) (dispatcher.WSHandler, error) { upgrader, err := ws.NewWSUpgraderWrapper(readBufferSize, writeBufferSize) if err != nil { return nil, err } args := ws.ArgsWebSocketProcessor{ - Hub: hub, + Dispatcher: wsDispatcher, Upgrader: upgrader, Marshaller: marshaller, } @@ -70,7 +70,7 @@ func createWsObsConnector( return nil, err } - payloadHandler, err := createPayloadHandler(marshaller, facade) + payloadHandler, err := CreatePayloadHandler(marshaller, facade) if err != nil { return nil, err } diff --git a/integrationTests/interface.go b/integrationTests/interface.go index 30a0d18d..40eec4d0 100644 --- a/integrationTests/interface.go +++ b/integrationTests/interface.go @@ -7,7 +7,7 @@ import ( // PublisherHandler defines publisher behaviour type PublisherHandler interface { - Run() + Run() error Broadcast(events data.BlockEvents) BroadcastRevert(event data.RevertBlock) BroadcastFinalized(event data.FinalizedBlock) diff --git a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go index 4fed4bf8..0c9f97e5 100644 --- a/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go +++ b/integrationTests/rabbitmq/testNotifierWithRabbitMQ_test.go @@ -21,10 +21,6 @@ import ( var log = logger.GetOrCreate("integrationTests/rabbitmq") func TestNotifierWithRabbitMQ(t *testing.T) { - t.Run("with http observer connnector + payload version 0", func(t *testing.T) { - testNotifierWithRabbitMQ(t, common.HTTPConnectorType, common.PayloadV0) - }) - t.Run("with http observer connnector", func(t *testing.T) { testNotifierWithRabbitMQ(t, common.HTTPConnectorType, common.PayloadV1) }) @@ -40,10 +36,10 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion notifier, err := integrationTests.NewTestNotifierWithRabbitMq(cfg.MainConfig) require.Nil(t, err) - client, err := integrationTests.CreateObserverConnector(notifier.Facade, observerType, common.MessageQueuePublisherType, common.PayloadV1) + client, err := integrationTests.CreateObserverConnector(notifier.Facade, observerType, common.MessageQueuePublisherType, payloadVersion) require.Nil(t, err) - notifier.Publisher.Run() + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() wg := &sync.WaitGroup{} @@ -59,7 +55,7 @@ func testNotifierWithRabbitMQ(t *testing.T, observerType string, payloadVersion integrationTests.WaitTimeout(t, wg, time.Second*2) - assert.Equal(t, 6, len(notifier.RedisClient.GetEntries())) + assert.Equal(t, 3, len(notifier.RedisClient.GetEntries())) assert.Equal(t, 6, len(notifier.RabbitMQClient.GetEntries())) } diff --git a/integrationTests/testNotifierProxy.go b/integrationTests/testNotifierProxy.go index 88d58340..0a0c839b 100644 --- a/integrationTests/testNotifierProxy.go +++ b/integrationTests/testNotifierProxy.go @@ -19,6 +19,7 @@ import ( type testNotifier struct { Facade shared.FacadeHandler + Hub dispatcher.Hub Publisher PublisherHandler WSHandler dispatcher.WSHandler RedisClient *mocks.RedisClientMock @@ -42,18 +43,31 @@ func NewTestNotifierWithWS(cfg config.MainConfig) (*testNotifier, error) { Filter: filters.NewDefaultFilter(), SubscriptionMapper: dispatcher.NewSubscriptionMapper(), } - publisher, err := hub.NewCommonHub(args) + commonHub, err := hub.NewCommonHub(args) + if err != nil { + return nil, err + } + publisher, err := process.NewPublisher(commonHub) if err != nil { return nil, err } statusMetricsHandler := metrics.NewStatusMetrics() + eventsInterceptorArgs := process.ArgsEventsInterceptor{ + PubKeyConverter: &mocks.PubkeyConverterMock{}, + } + eventsInterceptor, err := process.NewEventsInterceptor(eventsInterceptorArgs) + if err != nil { + return nil, err + } + argsEventsHandler := process.ArgsEventsHandler{ Locker: locker, Publisher: publisher, StatusMetricsHandler: statusMetricsHandler, CheckDuplicates: cfg.General.CheckDuplicates, + EventsInterceptor: eventsInterceptor, } eventsHandler, err := process.NewEventsHandler(argsEventsHandler) if err != nil { @@ -65,7 +79,7 @@ func NewTestNotifierWithWS(cfg config.MainConfig) (*testNotifier, error) { return nil, err } wsHandlerArgs := ws.ArgsWebSocketProcessor{ - Hub: publisher, + Dispatcher: commonHub, Upgrader: upgrader, Marshaller: marshaller, } @@ -74,19 +88,10 @@ func NewTestNotifierWithWS(cfg config.MainConfig) (*testNotifier, error) { return nil, err } - eventsInterceptorArgs := process.ArgsEventsInterceptor{ - PubKeyConverter: &mocks.PubkeyConverterMock{}, - } - eventsInterceptor, err := process.NewEventsInterceptor(eventsInterceptorArgs) - if err != nil { - return nil, err - } - facadeArgs := facade.ArgsNotifierFacade{ EventsHandler: eventsHandler, APIConfig: cfg.ConnectorApi, WSHandler: wsHandler, - EventsInterceptor: eventsInterceptor, StatusMetricsHandler: statusMetricsHandler, } facade, err := facade.NewNotifierFacade(facadeArgs) @@ -96,6 +101,7 @@ func NewTestNotifierWithWS(cfg config.MainConfig) (*testNotifier, error) { return &testNotifier{ Facade: facade, + Hub: commonHub, Publisher: publisher, WSHandler: wsHandler, RedisClient: redisClient, @@ -124,18 +130,11 @@ func NewTestNotifierWithRabbitMq(cfg config.MainConfig) (*testNotifier, error) { Config: cfg.RabbitMQ, Marshaller: marshaller, } - publisher, err := rabbitmq.NewRabbitMqPublisher(publisherArgs) + publisherHandler, err := rabbitmq.NewRabbitMqPublisher(publisherArgs) if err != nil { return nil, err } - - argsEventsHandler := process.ArgsEventsHandler{ - Locker: locker, - Publisher: publisher, - StatusMetricsHandler: statusMetricsHandler, - CheckDuplicates: cfg.General.CheckDuplicates, - } - eventsHandler, err := process.NewEventsHandler(argsEventsHandler) + publisher, err := process.NewPublisher(publisherHandler) if err != nil { return nil, err } @@ -148,12 +147,23 @@ func NewTestNotifierWithRabbitMq(cfg config.MainConfig) (*testNotifier, error) { return nil, err } + argsEventsHandler := process.ArgsEventsHandler{ + Locker: locker, + Publisher: publisher, + StatusMetricsHandler: statusMetricsHandler, + CheckDuplicates: cfg.General.CheckDuplicates, + EventsInterceptor: eventsInterceptor, + } + eventsHandler, err := process.NewEventsHandler(argsEventsHandler) + if err != nil { + return nil, err + } + wsHandler := &disabled.WSHandler{} facadeArgs := facade.ArgsNotifierFacade{ EventsHandler: eventsHandler, APIConfig: cfg.ConnectorApi, WSHandler: wsHandler, - EventsInterceptor: eventsInterceptor, StatusMetricsHandler: statusMetricsHandler, } facade, err := facade.NewNotifierFacade(facadeArgs) @@ -163,6 +173,7 @@ func NewTestNotifierWithRabbitMq(cfg config.MainConfig) (*testNotifier, error) { return &testNotifier{ Facade: facade, + Hub: &disabled.Hub{}, Publisher: publisher, WSHandler: wsHandler, RedisClient: redisClient, diff --git a/integrationTests/testObserverConnector.go b/integrationTests/testObserverConnector.go index 7987b999..a550fb31 100644 --- a/integrationTests/testObserverConnector.go +++ b/integrationTests/testObserverConnector.go @@ -15,26 +15,12 @@ import ( "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/config" "github.com/multiversx/mx-chain-notifier-go/factory" - "github.com/multiversx/mx-chain-notifier-go/process" - "github.com/multiversx/mx-chain-notifier-go/process/preprocess" ) // CreateObserverConnector will create observer connector component func CreateObserverConnector(facade shared.FacadeHandler, connType string, apiType string, payloadVersion uint32) (ObserverConnector, error) { marshaller := &marshal.JsonMarshalizer{} - preProcessorArgs := preprocess.ArgsEventsPreProcessor{ - Marshaller: marshaller, - Facade: facade, - } - - eventsProcessors := make(map[uint32]process.DataProcessor) - dataPreProcessor, err := preprocess.NewEventsPreProcessorV1(preProcessorArgs) - if err != nil { - return nil, err - } - - eventsProcessors[payloadVersion] = dataPreProcessor - payloadHandler, err := process.NewPayloadHandler(eventsProcessors) + payloadHandler, err := factory.CreatePayloadHandler(marshaller, facade) if err != nil { return nil, err } diff --git a/integrationTests/websocket/testNotifierWithWebsockets_test.go b/integrationTests/websocket/testNotifierWithWebsockets_test.go index 38663e2b..a5461612 100644 --- a/integrationTests/websocket/testNotifierWithWebsockets_test.go +++ b/integrationTests/websocket/testNotifierWithWebsockets_test.go @@ -27,7 +27,7 @@ func TestNotifierWithWebsockets_PushEvents(t *testing.T) { webServer, err := integrationTests.CreateObserverConnector(notifier.Facade, common.HTTPConnectorType, common.WSPublisherType, common.PayloadV1) require.Nil(t, err) - notifier.Publisher.Run() + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() ws, err := integrationTests.NewWSClient(notifier.WSHandler) @@ -112,7 +112,7 @@ func TestNotifierWithWebsockets_BlockEvents(t *testing.T) { webServer, err := integrationTests.CreateObserverConnector(notifier.Facade, common.HTTPConnectorType, common.WSPublisherType, common.PayloadV1) require.Nil(t, err) - notifier.Publisher.Run() + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() ws, err := integrationTests.NewWSClient(notifier.WSHandler) @@ -204,7 +204,7 @@ func TestNotifierWithWebsockets_RevertEvents(t *testing.T) { webServer, err := integrationTests.CreateObserverConnector(notifier.Facade, common.HTTPConnectorType, common.WSPublisherType, common.PayloadV1) require.Nil(t, err) - notifier.Publisher.Run() + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() ws, err := integrationTests.NewWSClient(notifier.WSHandler) @@ -265,7 +265,7 @@ func TestNotifierWithWebsockets_FinalizedEvents(t *testing.T) { webServer, err := integrationTests.CreateObserverConnector(notifier.Facade, common.HTTPConnectorType, common.WSPublisherType, common.PayloadV1) require.Nil(t, err) - notifier.Publisher.Run() + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() ws, err := integrationTests.NewWSClient(notifier.WSHandler) @@ -316,7 +316,7 @@ func TestNotifierWithWebsockets_TxsEvents(t *testing.T) { webServer, err := integrationTests.CreateObserverConnector(notifier.Facade, common.HTTPConnectorType, common.WSPublisherType, common.PayloadV1) require.Nil(t, err) - notifier.Publisher.Run() + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() ws, err := integrationTests.NewWSClient(notifier.WSHandler) @@ -400,7 +400,7 @@ func TestNotifierWithWebsockets_ScrsEvents(t *testing.T) { webServer, err := integrationTests.CreateObserverConnector(notifier.Facade, common.HTTPConnectorType, common.WSPublisherType, common.PayloadV1) require.Nil(t, err) - notifier.Publisher.Run() + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() ws, err := integrationTests.NewWSClient(notifier.WSHandler) @@ -494,7 +494,7 @@ func testNotifierWithWebsockets_AllEvents(t *testing.T, observerType string) { client, err := integrationTests.CreateObserverConnector(notifier.Facade, observerType, common.MessageQueuePublisherType, common.PayloadV1) require.Nil(t, err) - notifier.Publisher.Run() + _ = notifier.Publisher.Run() defer notifier.Publisher.Close() ws, err := integrationTests.NewWSClient(notifier.WSHandler) @@ -715,6 +715,6 @@ func testNotifierWithWebsockets_AllEvents(t *testing.T, observerType string) { integrationTests.WaitTimeout(t, wg, time.Second*4) - assert.Equal(t, numEvents, len(notifier.RedisClient.GetEntries())) - assert.Equal(t, numEvents, len(notifier.RedisClient.GetEntries())) + expectedNumRedisEvents := 3 // one redis event per push, revert, finalized + assert.Equal(t, expectedNumRedisEvents, len(notifier.RedisClient.GetEntries())) } diff --git a/mocks/eventsHandlerStub.go b/mocks/eventsHandlerStub.go index e7da9d06..ab00d346 100644 --- a/mocks/eventsHandlerStub.go +++ b/mocks/eventsHandlerStub.go @@ -4,18 +4,15 @@ import "github.com/multiversx/mx-chain-notifier-go/data" // EventsHandlerStub implements EventsHandler interface type EventsHandlerStub struct { - HandlePushEventsCalled func(events data.BlockEvents) error - HandleRevertEventsCalled func(revertBlock data.RevertBlock) - HandleFinalizedEventsCalled func(finalizedBlock data.FinalizedBlock) - HandleBlockTxsCalled func(blockTxs data.BlockTxs) - HandleBlockScrsCalled func(blockScrs data.BlockScrs) - HandleBlockEventsWithOrderCalled func(blockTxs data.BlockEventsWithOrder) + HandleSaveBlockEventsCalled func(allEvents data.ArgsSaveBlockData) error + HandleRevertEventsCalled func(revertBlock data.RevertBlock) + HandleFinalizedEventsCalled func(finalizedBlock data.FinalizedBlock) } -// HandlePushEvents - -func (e *EventsHandlerStub) HandlePushEvents(events data.BlockEvents) error { - if e.HandlePushEventsCalled != nil { - return e.HandlePushEventsCalled(events) +// HandleSaveBlockEvents - +func (e *EventsHandlerStub) HandleSaveBlockEvents(events data.ArgsSaveBlockData) error { + if e.HandleSaveBlockEventsCalled != nil { + return e.HandleSaveBlockEventsCalled(events) } return nil @@ -35,27 +32,6 @@ func (e *EventsHandlerStub) HandleFinalizedEvents(finalizedBlock data.FinalizedB } } -// HandleBlockTxs - -func (e *EventsHandlerStub) HandleBlockTxs(blockTxs data.BlockTxs) { - if e.HandleBlockTxsCalled != nil { - e.HandleBlockTxsCalled(blockTxs) - } -} - -// HandleBlockScrs - -func (e *EventsHandlerStub) HandleBlockScrs(blockScrs data.BlockScrs) { - if e.HandleBlockScrsCalled != nil { - e.HandleBlockScrsCalled(blockScrs) - } -} - -// HandleBlockEventsWithOrder - -func (e *EventsHandlerStub) HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { - if e.HandleBlockEventsWithOrderCalled != nil { - e.HandleBlockEventsWithOrderCalled(blockTxs) - } -} - // IsInterfaceNil - func (e *EventsHandlerStub) IsInterfaceNil() bool { return e == nil diff --git a/mocks/facadeStub.go b/mocks/facadeStub.go index 5e207514..c8c11750 100644 --- a/mocks/facadeStub.go +++ b/mocks/facadeStub.go @@ -8,8 +8,7 @@ import ( // FacadeStub implements FacadeHandler interface type FacadeStub struct { - HandlePushEventsV2Called func(events data.ArgsSaveBlockData) error - HandlePushEventsV1Called func(eventsData data.SaveBlockData) error + HandlePushEventsCalled func(events data.ArgsSaveBlockData) error HandleRevertEventsCalled func(events data.RevertBlock) HandleFinalizedEventsCalled func(events data.FinalizedBlock) ServeCalled func(w http.ResponseWriter, r *http.Request) @@ -18,19 +17,10 @@ type FacadeStub struct { GetMetricsForPrometheusCalled func() string } -// HandlePushEventsV2 - -func (fs *FacadeStub) HandlePushEventsV2(events data.ArgsSaveBlockData) error { - if fs.HandlePushEventsV2Called != nil { - return fs.HandlePushEventsV2Called(events) - } - - return nil -} - -// HandlePushEventsV1 - -func (fs *FacadeStub) HandlePushEventsV1(events data.SaveBlockData) error { - if fs.HandlePushEventsV1Called != nil { - return fs.HandlePushEventsV1Called(events) +// HandlePushEvents - +func (fs *FacadeStub) HandlePushEvents(events data.ArgsSaveBlockData) error { + if fs.HandlePushEventsCalled != nil { + return fs.HandlePushEventsCalled(events) } return nil diff --git a/mocks/hubStub.go b/mocks/hubStub.go index aa192468..0164d23f 100644 --- a/mocks/hubStub.go +++ b/mocks/hubStub.go @@ -7,65 +7,57 @@ import ( // HubStub implements Hub interface type HubStub struct { - RunCalled func() - BroadcastCalled func(events data.BlockEvents) - BroadcastRevertCalled func(event data.RevertBlock) - BroadcastFinalizedCalled func(event data.FinalizedBlock) - BroadcastTxsCalled func(event data.BlockTxs) - BroadcastScrsCalled func(event data.BlockScrs) - BroadcastBlockEventsWithOrderCalled func(event data.BlockEventsWithOrder) - RegisterEventCalled func(event dispatcher.EventDispatcher) - UnregisterEventCalled func(event dispatcher.EventDispatcher) - SubscribeCalled func(event data.SubscribeEvent) - CloseCalled func() error + PublishCalled func(events data.BlockEvents) + PublishRevertCalled func(revertBlock data.RevertBlock) + PublishFinalizedCalled func(finalizedBlock data.FinalizedBlock) + PublishTxsCalled func(blockTxs data.BlockTxs) + PublishScrsCalled func(blockScrs data.BlockScrs) + PublishBlockEventsWithOrderCalled func(blockTxs data.BlockEventsWithOrder) + RegisterEventCalled func(event dispatcher.EventDispatcher) + UnregisterEventCalled func(event dispatcher.EventDispatcher) + SubscribeCalled func(event data.SubscribeEvent) + CloseCalled func() error } -// Run - -func (h *HubStub) Run() { - if h.RunCalled != nil { - h.RunCalled() +// Publish - +func (h *HubStub) Publish(events data.BlockEvents) { + if h.PublishCalled != nil { + h.PublishCalled(events) } } -// Broadcast - -func (h *HubStub) Broadcast(events data.BlockEvents) { - if h.BroadcastCalled != nil { - h.BroadcastCalled(events) +// PublishRevert - +func (h *HubStub) PublishRevert(revertBlock data.RevertBlock) { + if h.PublishRevertCalled != nil { + h.PublishRevertCalled(revertBlock) } } -// BroadcastRevert - -func (h *HubStub) BroadcastRevert(event data.RevertBlock) { - if h.BroadcastRevertCalled != nil { - h.BroadcastRevertCalled(event) +// PublishFinalized - +func (h *HubStub) PublishFinalized(finalizedBlock data.FinalizedBlock) { + if h.PublishFinalizedCalled != nil { + h.PublishFinalizedCalled(finalizedBlock) } } -// BroadcastFinalized - -func (h *HubStub) BroadcastFinalized(event data.FinalizedBlock) { - if h.BroadcastFinalizedCalled != nil { - h.BroadcastFinalizedCalled(event) +// PublishTxs - +func (h *HubStub) PublishTxs(blockTxs data.BlockTxs) { + if h.PublishTxsCalled != nil { + h.PublishTxsCalled(blockTxs) } } -// BroadcastTxs - -func (h *HubStub) BroadcastTxs(event data.BlockTxs) { - if h.BroadcastTxsCalled != nil { - h.BroadcastTxsCalled(event) +// PublishScrs - +func (h *HubStub) PublishScrs(blockScrs data.BlockScrs) { + if h.PublishScrsCalled != nil { + h.PublishScrsCalled(blockScrs) } } -// BroadcastScrs - -func (h *HubStub) BroadcastScrs(event data.BlockScrs) { - if h.BroadcastScrsCalled != nil { - h.BroadcastScrsCalled(event) - } -} - -// BroadcastBlockEventsWithOrder - -func (h *HubStub) BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) { - if h.BroadcastBlockEventsWithOrderCalled != nil { - h.BroadcastBlockEventsWithOrderCalled(event) +// PublishBlockEventsWithOrder - +func (h *HubStub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { + if h.PublishBlockEventsWithOrderCalled != nil { + h.PublishBlockEventsWithOrderCalled(blockTxs) } } diff --git a/mocks/publisherHandlerStub.go b/mocks/publisherHandlerStub.go new file mode 100644 index 00000000..d74f5ae0 --- /dev/null +++ b/mocks/publisherHandlerStub.go @@ -0,0 +1,70 @@ +package mocks + +import "github.com/multiversx/mx-chain-notifier-go/data" + +// PublisherHandlerStub - +type PublisherHandlerStub struct { + PublishCalled func(events data.BlockEvents) + PublishRevertCalled func(revertBlock data.RevertBlock) + PublishFinalizedCalled func(finalizedBlock data.FinalizedBlock) + PublishTxsCalled func(blockTxs data.BlockTxs) + PublishScrsCalled func(blockScrs data.BlockScrs) + PublishBlockEventsWithOrderCalled func(blockTxs data.BlockEventsWithOrder) + CloseCalled func() error +} + +// Publish - +func (p *PublisherHandlerStub) Publish(events data.BlockEvents) { + if p.PublishCalled != nil { + p.PublishCalled(events) + } +} + +// PublishRevert - +func (p *PublisherHandlerStub) PublishRevert(revertBlock data.RevertBlock) { + if p.PublishRevertCalled != nil { + p.PublishRevertCalled(revertBlock) + } +} + +// PublishFinalized - +func (p *PublisherHandlerStub) PublishFinalized(finalizedBlock data.FinalizedBlock) { + if p.PublishFinalizedCalled != nil { + p.PublishFinalizedCalled(finalizedBlock) + } +} + +// PublishTxs - +func (p *PublisherHandlerStub) PublishTxs(blockTxs data.BlockTxs) { + if p.PublishTxsCalled != nil { + p.PublishTxsCalled(blockTxs) + } +} + +// PublishScrs - +func (p *PublisherHandlerStub) PublishScrs(blockScrs data.BlockScrs) { + if p.PublishScrsCalled != nil { + p.PublishScrsCalled(blockScrs) + } +} + +// PublishBlockEventsWithOrder - +func (p *PublisherHandlerStub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { + if p.PublishBlockEventsWithOrderCalled != nil { + p.PublishBlockEventsWithOrderCalled(blockTxs) + } +} + +// Close - +func (p *PublisherHandlerStub) Close() error { + if p.CloseCalled != nil { + return p.CloseCalled() + } + + return nil +} + +// IsInterfaceNil - +func (p *PublisherHandlerStub) IsInterfaceNil() bool { + return p == nil +} diff --git a/mocks/publisherStub.go b/mocks/publisherStub.go index 4af20e80..fbea84ce 100644 --- a/mocks/publisherStub.go +++ b/mocks/publisherStub.go @@ -4,20 +4,23 @@ import "github.com/multiversx/mx-chain-notifier-go/data" // PublisherStub implements PublisherService interface type PublisherStub struct { - RunCalled func() + RunCalled func() error BroadcastCalled func(events data.BlockEvents) BroadcastRevertCalled func(event data.RevertBlock) BroadcastFinalizedCalled func(event data.FinalizedBlock) BroadcastTxsCalled func(event data.BlockTxs) BroadcastScrsCalled func(event data.BlockScrs) BroadcastBlockEventsWithOrderCalled func(event data.BlockEventsWithOrder) + CloseCalled func() error } // Run - -func (ps *PublisherStub) Run() { +func (ps *PublisherStub) Run() error { if ps.RunCalled != nil { - ps.RunCalled() + return ps.RunCalled() } + + return nil } // Broadcast - @@ -62,6 +65,15 @@ func (ps *PublisherStub) BroadcastBlockEventsWithOrder(event data.BlockEventsWit } } +// Close - +func (ps *PublisherStub) Close() error { + if ps.CloseCalled != nil { + return ps.CloseCalled() + } + + return nil +} + // IsInterfaceNil - func (ps *PublisherStub) IsInterfaceNil() bool { return ps == nil diff --git a/mocks/rabbitMqClientMock.go b/mocks/rabbitMqClientMock.go index 6f6389bc..65242d92 100644 --- a/mocks/rabbitMqClientMock.go +++ b/mocks/rabbitMqClientMock.go @@ -54,8 +54,8 @@ func (rc *RabbitClientMock) ReopenChannel() { // GetEntries - func (rc *RabbitClientMock) GetEntries() map[string]amqp.Publishing { - rc.mut.Lock() - defer rc.mut.Unlock() + rc.mut.RLock() + defer rc.mut.RUnlock() return rc.events } diff --git a/notifier/notifierRunner.go b/notifier/notifierRunner.go index 5ca6dab3..2e02a690 100644 --- a/notifier/notifierRunner.go +++ b/notifier/notifierRunner.go @@ -8,7 +8,6 @@ import ( logger "github.com/multiversx/mx-chain-logger-go" "github.com/multiversx/mx-chain-notifier-go/api/shared" "github.com/multiversx/mx-chain-notifier-go/config" - "github.com/multiversx/mx-chain-notifier-go/dispatcher" "github.com/multiversx/mx-chain-notifier-go/facade" "github.com/multiversx/mx-chain-notifier-go/factory" "github.com/multiversx/mx-chain-notifier-go/metrics" @@ -35,6 +34,8 @@ func NewNotifierRunner(cfgs *config.Configs) (*notifierRunner, error) { // Start will trigger the notifier service func (nr *notifierRunner) Start() error { + publisherType := nr.configs.Flags.PublisherType + externalMarshaller, err := marshalFactory.NewMarshalizer(nr.configs.MainConfig.General.ExternalMarshaller.Type) if err != nil { return err @@ -45,37 +46,36 @@ func (nr *notifierRunner) Start() error { return err } - publisher, err := factory.CreatePublisher(nr.configs.Flags.PublisherType, nr.configs.MainConfig, externalMarshaller) + commonHub, err := factory.CreateHub(publisherType) if err != nil { return err } - hub, err := factory.CreateHub(nr.configs.Flags.PublisherType) + publisher, err := factory.CreatePublisher(publisherType, nr.configs.MainConfig, externalMarshaller, commonHub) if err != nil { return err } - wsHandler, err := factory.CreateWSHandler(nr.configs.Flags.PublisherType, hub, externalMarshaller) + wsHandler, err := factory.CreateWSHandler(publisherType, commonHub, externalMarshaller) if err != nil { return err } statusMetricsHandler := metrics.NewStatusMetrics() - argsEventsHandler := factory.ArgsEventsHandlerFactory{ - CheckDuplicates: nr.configs.MainConfig.General.CheckDuplicates, - Locker: lockService, - MqPublisher: publisher, - HubPublisher: hub, - APIType: nr.configs.Flags.PublisherType, - StatusMetricsHandler: statusMetricsHandler, - } - eventsHandler, err := factory.CreateEventsHandler(argsEventsHandler) + eventsInterceptor, err := factory.CreateEventsInterceptor(nr.configs.MainConfig.General) if err != nil { return err } - eventsInterceptor, err := factory.CreateEventsInterceptor(nr.configs.MainConfig.General) + argsEventsHandler := process.ArgsEventsHandler{ + CheckDuplicates: nr.configs.MainConfig.General.CheckDuplicates, + Locker: lockService, + Publisher: publisher, + StatusMetricsHandler: statusMetricsHandler, + EventsInterceptor: eventsInterceptor, + } + eventsHandler, err := process.NewEventsHandler(argsEventsHandler) if err != nil { return err } @@ -84,7 +84,6 @@ func (nr *notifierRunner) Start() error { EventsHandler: eventsHandler, APIConfig: nr.configs.MainConfig.ConnectorApi, WSHandler: wsHandler, - EventsInterceptor: eventsInterceptor, StatusMetricsHandler: statusMetricsHandler, } facade, err := facade.NewNotifierFacade(facadeArgs) @@ -102,14 +101,17 @@ func (nr *notifierRunner) Start() error { return err } - startHandlers(hub, publisher) + err = publisher.Run() + if err != nil { + return err + } err = webServer.Run() if err != nil { return err } - err = waitForGracefulShutdown(webServer, publisher, hub, wsConnector) + err = waitForGracefulShutdown(webServer, publisher, wsConnector) if err != nil { return err } @@ -118,15 +120,9 @@ func (nr *notifierRunner) Start() error { return nil } -func startHandlers(hub dispatcher.Hub, publisher rabbitmq.PublisherService) { - hub.Run() - publisher.Run() -} - func waitForGracefulShutdown( server shared.WebServerHandler, publisher rabbitmq.PublisherService, - hub dispatcher.Hub, wsConnector process.WSClient, ) error { quit := make(chan os.Signal, 1) @@ -148,10 +144,5 @@ func waitForGracefulShutdown( return err } - err = hub.Close() - if err != nil { - return err - } - return nil } diff --git a/process/errors.go b/process/errors.go index 8fcc796c..51ed1751 100644 --- a/process/errors.go +++ b/process/errors.go @@ -25,3 +25,9 @@ var ErrNilBlockBody = errors.New("nil block body provided") // ErrNilBlockHeader signals that a nil block header has been provided var ErrNilBlockHeader = errors.New("nil block header provided") + +// ErrNilPublisherHandler signals that a nil publisher handler has been provided +var ErrNilPublisherHandler = errors.New("nil publisher handler provided") + +// ErrNilEventsInterceptor signals that a nil events interceptor was provided +var ErrNilEventsInterceptor = errors.New("nil events interceptor") diff --git a/process/eventsHandler.go b/process/eventsHandler.go index 5f84cbf7..a5a131f8 100644 --- a/process/eventsHandler.go +++ b/process/eventsHandler.go @@ -2,6 +2,7 @@ package process import ( "context" + "encoding/hex" "fmt" "time" @@ -19,9 +20,6 @@ const ( minRetries = 1 revertKeyPrefix = "revert_" finalizedKeyPrefix = "finalized_" - txsKeyPrefix = "txs_" - txsWithOrderKeyPrefix = "txsWithOrder_" - scrsKeyPrefix = "scrs_" rabbitmqMetricPrefix = "RabbitMQ" redisMetricPrefix = "Redis" @@ -32,14 +30,16 @@ type ArgsEventsHandler struct { Locker LockService Publisher Publisher StatusMetricsHandler common.StatusMetricsHandler + EventsInterceptor EventsInterceptor CheckDuplicates bool } type eventsHandler struct { - locker LockService - publisher Publisher - metricsHandler common.StatusMetricsHandler - checkDuplicates bool + locker LockService + publisher Publisher + metricsHandler common.StatusMetricsHandler + eventsInterceptor EventsInterceptor + checkDuplicates bool } // NewEventsHandler creates a new events handler component @@ -50,10 +50,11 @@ func NewEventsHandler(args ArgsEventsHandler) (*eventsHandler, error) { } return &eventsHandler{ - locker: args.Locker, - publisher: args.Publisher, - metricsHandler: args.StatusMetricsHandler, - checkDuplicates: args.CheckDuplicates, + locker: args.Locker, + publisher: args.Publisher, + metricsHandler: args.StatusMetricsHandler, + eventsInterceptor: args.EventsInterceptor, + checkDuplicates: args.CheckDuplicates, }, nil } @@ -67,42 +68,79 @@ func checkArgs(args ArgsEventsHandler) error { if check.IfNil(args.StatusMetricsHandler) { return common.ErrNilStatusMetricsHandler } + if check.IfNil(args.EventsInterceptor) { + return ErrNilEventsInterceptor + } return nil } -// HandlePushEvents will handle push events received from observer -func (eh *eventsHandler) HandlePushEvents(events data.BlockEvents) error { - if events.Hash == "" { - log.Debug("received empty hash", "event", common.PushLogsAndEvents, - "will process", false, - ) - return common.ErrReceivedEmptyEvents +// HandleSaveBlockEvents will handle save block events received from observer +func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) error { + blockHash := hex.EncodeToString(allEvents.HeaderHash) + shouldProcessPushEvents := eh.shouldProcessSaveBlockEvents(blockHash) + if !shouldProcessPushEvents { + return nil } - shouldProcessEvents := true - if eh.checkDuplicates { - shouldProcessEvents = eh.tryCheckProcessedWithRetry(common.PushLogsAndEvents, events.Hash) + eventsData, err := eh.eventsInterceptor.ProcessBlockEvents(&allEvents) + if err != nil { + return err } - if !shouldProcessEvents { - log.Info("received duplicated events", "event", common.PushLogsAndEvents, - "block hash", events.Hash, + pushEvents := data.BlockEvents{ + Hash: eventsData.Hash, + ShardID: eventsData.Header.GetShardID(), + TimeStamp: eventsData.Header.GetTimeStamp(), + Events: eventsData.LogEvents, + } + err = eh.handlePushEvents(pushEvents) + if err != nil { + return err + } + + txs := data.BlockTxs{ + Hash: eventsData.Hash, + Txs: eventsData.Txs, + } + eh.handleBlockTxs(txs) + + scrs := data.BlockScrs{ + Hash: eventsData.Hash, + Scrs: eventsData.Scrs, + } + eh.handleBlockScrs(scrs) + + txsWithOrder := data.BlockEventsWithOrder{ + Hash: eventsData.Hash, + ShardID: eventsData.Header.GetShardID(), + TimeStamp: eventsData.Header.GetTimeStamp(), + Txs: eventsData.TxsWithOrder, + Scrs: eventsData.ScrsWithOrder, + Events: eventsData.LogEvents, + } + eh.handleBlockEventsWithOrder(txsWithOrder) + + return nil +} + +// HandlePushEvents will handle push events received from observer +func (eh *eventsHandler) handlePushEvents(events data.BlockEvents) error { + if events.Hash == "" { + log.Debug("received empty hash", "event", common.PushLogsAndEvents, "will process", false, ) - return nil + return common.ErrReceivedEmptyEvents } if len(events.Events) == 0 { log.Warn("received empty events", "event", common.PushLogsAndEvents, "block hash", events.Hash, - "will process", shouldProcessEvents, ) events.Events = make([]data.Event, 0) } else { log.Info("received", "event", common.PushLogsAndEvents, "block hash", events.Hash, - "will process", shouldProcessEvents, ) } @@ -112,6 +150,24 @@ func (eh *eventsHandler) HandlePushEvents(events data.BlockEvents) error { return nil } +func (eh *eventsHandler) shouldProcessSaveBlockEvents(blockHash string) bool { + shouldProcessEvents := true + if eh.checkDuplicates { + shouldProcessEvents = eh.tryCheckProcessedWithRetry(common.PushLogsAndEvents, blockHash) + } + + if !shouldProcessEvents { + log.Info("received duplicated push events", + "block hash", blockHash, + "will process", false, + ) + + return false + } + + return true +} + // HandleRevertEvents will handle revents events received from observer func (eh *eventsHandler) HandleRevertEvents(revertBlock data.RevertBlock) { if revertBlock.Hash == "" { @@ -175,36 +231,22 @@ func (eh *eventsHandler) HandleFinalizedEvents(finalizedBlock data.FinalizedBloc eh.metricsHandler.AddRequest(getRabbitOpID(common.FinalizedBlockEvents), time.Since(t)) } -// HandleBlockTxs will handle txs events received from observer -func (eh *eventsHandler) HandleBlockTxs(blockTxs data.BlockTxs) { +// handleBlockTxs will handle txs events received from observer +func (eh *eventsHandler) handleBlockTxs(blockTxs data.BlockTxs) { if blockTxs.Hash == "" { log.Warn("received empty hash", "event", common.BlockTxs, "will process", false, ) return } - shouldProcessTxs := true - if eh.checkDuplicates { - shouldProcessTxs = eh.tryCheckProcessedWithRetry(common.BlockTxs, blockTxs.Hash) - } - - if !shouldProcessTxs { - log.Info("received duplicated events", "event", common.BlockTxs, - "block hash", blockTxs.Hash, - "will process", false, - ) - return - } if len(blockTxs.Txs) == 0 { log.Warn("received empty events", "event", common.BlockTxs, "block hash", blockTxs.Hash, - "will process", shouldProcessTxs, ) } else { log.Info("received", "event", common.BlockTxs, "block hash", blockTxs.Hash, - "will process", shouldProcessTxs, ) } @@ -213,36 +255,22 @@ func (eh *eventsHandler) HandleBlockTxs(blockTxs data.BlockTxs) { eh.metricsHandler.AddRequest(getRabbitOpID(common.BlockTxs), time.Since(t)) } -// HandleBlockScrs will handle scrs events received from observer -func (eh *eventsHandler) HandleBlockScrs(blockScrs data.BlockScrs) { +// handleBlockScrs will handle scrs events received from observer +func (eh *eventsHandler) handleBlockScrs(blockScrs data.BlockScrs) { if blockScrs.Hash == "" { log.Warn("received empty hash", "event", common.BlockScrs, "will process", false, ) return } - shouldProcessScrs := true - if eh.checkDuplicates { - shouldProcessScrs = eh.tryCheckProcessedWithRetry(common.BlockScrs, blockScrs.Hash) - } - - if !shouldProcessScrs { - log.Info("received duplicated events", "event", common.BlockScrs, - "block hash", blockScrs.Hash, - "will process", false, - ) - return - } if len(blockScrs.Scrs) == 0 { log.Warn("received empty events", "event", common.BlockScrs, "block hash", blockScrs.Hash, - "will process", shouldProcessScrs, ) } else { log.Info("received", "event", common.BlockScrs, "block hash", blockScrs.Hash, - "will process", shouldProcessScrs, ) } @@ -251,30 +279,17 @@ func (eh *eventsHandler) HandleBlockScrs(blockScrs data.BlockScrs) { eh.metricsHandler.AddRequest(getRabbitOpID(common.BlockScrs), time.Since(t)) } -// HandleBlockEventsWithOrder will handle full block events received from observer -func (eh *eventsHandler) HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { +// handleBlockEventsWithOrder will handle full block events received from observer +func (eh *eventsHandler) handleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { if blockTxs.Hash == "" { log.Warn("received empty hash", "event", common.BlockEvents, "will process", false, ) return } - shouldProcessTxs := true - if eh.checkDuplicates { - shouldProcessTxs = eh.tryCheckProcessedWithRetry(common.BlockEvents, blockTxs.Hash) - } - - if !shouldProcessTxs { - log.Info("received duplicated events", "event", common.BlockEvents, - "block hash", blockTxs.Hash, - "will process", false, - ) - return - } log.Info("received", "event", common.BlockEvents, "block hash", blockTxs.Hash, - "will process", shouldProcessTxs, ) t := time.Now() @@ -322,12 +337,6 @@ func getPrefixLockerKey(id string) string { return revertKeyPrefix case common.FinalizedBlockEvents: return finalizedKeyPrefix - case common.BlockTxs: - return txsKeyPrefix - case common.BlockScrs: - return scrsKeyPrefix - case common.BlockEvents: - return txsWithOrderKeyPrefix } return "" diff --git a/process/eventsHandler_test.go b/process/eventsHandler_test.go index a6173622..6e46d955 100644 --- a/process/eventsHandler_test.go +++ b/process/eventsHandler_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/transaction" @@ -13,15 +14,23 @@ import ( "github.com/multiversx/mx-chain-notifier-go/data" "github.com/multiversx/mx-chain-notifier-go/mocks" "github.com/multiversx/mx-chain-notifier-go/process" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func createMockEventsHandlerArgs() process.ArgsEventsHandler { return process.ArgsEventsHandler{ - CheckDuplicates: false, - Locker: &mocks.LockerStub{}, + Locker: &mocks.LockerStub{ + HasConnectionCalled: func(ctx context.Context) bool { + return true + }, + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return true, nil + }, + }, Publisher: &mocks.PublisherStub{}, StatusMetricsHandler: &mocks.StatusMetricsStub{}, + EventsInterceptor: &mocks.EventsInterceptorStub{}, } } @@ -61,6 +70,17 @@ func TestNewEventsHandler(t *testing.T) { require.Nil(t, eventsHandler) }) + t.Run("nil events interceptor", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + args.EventsInterceptor = nil + + eventsHandler, err := process.NewEventsHandler(args) + require.Equal(t, process.ErrNilEventsInterceptor, err) + require.Nil(t, eventsHandler) + }) + t.Run("should work", func(t *testing.T) { t.Parallel() @@ -71,39 +91,279 @@ func TestNewEventsHandler(t *testing.T) { }) } -func TestHandlePushEvents(t *testing.T) { +func TestHandleSaveBlockEvents(t *testing.T) { t.Parallel() - t.Run("broadcast event was called", func(t *testing.T) { + t.Run("duplicated events, should return early", func(t *testing.T) { t.Parallel() - events := data.BlockEvents{ - Hash: "hash1", - ShardID: 1, - TimeStamp: 1234, - Events: []data.Event{}, + args := createMockEventsHandlerArgs() + args.CheckDuplicates = true + + args.Locker = &mocks.LockerStub{ + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return false, nil + }, } - wasCalled := false + args.EventsInterceptor = &mocks.EventsInterceptorStub{ + ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { + require.Fail(t, "should have not been called") + return nil, nil + }, + } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + err = eventsHandler.HandleSaveBlockEvents(data.ArgsSaveBlockData{}) + require.Nil(t, err) + }) + + t.Run("failed to pre-process events, should fail", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + args.CheckDuplicates = true + + args.Locker = &mocks.LockerStub{ + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return true, nil + }, + } + + expectedErr := errors.New("expected err") + args.EventsInterceptor = &mocks.EventsInterceptorStub{ + ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { + return nil, expectedErr + }, + } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + err = eventsHandler.HandleSaveBlockEvents(data.ArgsSaveBlockData{}) + require.Equal(t, expectedErr, err) + }) + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + blockHash := "blockHash1" + txs := map[string]*outport.TxInfo{ + "hash1": { + Transaction: &transaction.Transaction{ + Nonce: 1, + }, + ExecutionOrder: 1, + }, + } + scrs := map[string]*outport.SCRInfo{ + "hash2": { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 2, + }, + }, + } + logData := []*outport.LogData{ + { + Log: &transaction.Log{ + Address: []byte("logaddr1"), + Events: []*transaction.Event{}, + }, + TxHash: "logHash1", + }, + } + + logEvents := []data.Event{ + { + Address: "addr1", + }, + } + + header := &block.HeaderV2{ + Header: &block.Header{ + ShardID: 2, + }, + } + blockData := data.ArgsSaveBlockData{ + HeaderHash: []byte(blockHash), + TransactionsPool: &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + Logs: logData, + }, + Header: &block.HeaderV2{}, + } + + expTxs := map[string]*transaction.Transaction{ + "hash1": { + Nonce: 1, + }, + } + expScrs := map[string]*smartContractResult.SmartContractResult{ + "hash2": { + Nonce: 2, + }, + } + + expTxsData := data.BlockTxs{ + Hash: blockHash, + Txs: expTxs, + } + expScrsData := data.BlockScrs{ + Hash: blockHash, + Scrs: expScrs, + } + expLogEvents := data.BlockEvents{ + Hash: blockHash, + Events: logEvents, + ShardID: 2, + } + + expTxsWithOrder := map[string]*outport.TxInfo{ + "hash1": { + Transaction: &transaction.Transaction{ + Nonce: 1, + }, + ExecutionOrder: 1, + }, + } + expScrsWithOrder := map[string]*outport.SCRInfo{ + "hash2": { + SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 2, + }, + }, + } + expTxsWithOrderData := data.BlockEventsWithOrder{ + Hash: blockHash, + ShardID: 2, + Txs: expTxsWithOrder, + Scrs: expScrsWithOrder, + Events: logEvents, + } + + pushWasCalled := false + txsWasCalled := false + scrsWasCalled := false + blockEventsWithOrderWasCalled := false + args := createMockEventsHandlerArgs() + + args.EventsInterceptor = &mocks.EventsInterceptorStub{ + ProcessBlockEventsCalled: func(eventsData *data.ArgsSaveBlockData) (*data.InterceptorBlockData, error) { + return &data.InterceptorBlockData{ + Hash: blockHash, + Header: header, + Txs: expTxs, + Scrs: expScrs, + LogEvents: logEvents, + TxsWithOrder: expTxsWithOrder, + ScrsWithOrder: expScrsWithOrder, + }, nil + }, + } + args.Publisher = &mocks.PublisherStub{ - BroadcastCalled: func(evs data.BlockEvents) { - require.Equal(t, events, evs) - wasCalled = true + BroadcastCalled: func(events data.BlockEvents) { + pushWasCalled = true + assert.Equal(t, expLogEvents, events) + }, + BroadcastTxsCalled: func(event data.BlockTxs) { + txsWasCalled = true + assert.Equal(t, expTxsData, event) + }, + BroadcastScrsCalled: func(event data.BlockScrs) { + scrsWasCalled = true + assert.Equal(t, expScrsData, event) + }, + BroadcastBlockEventsWithOrderCalled: func(event data.BlockEventsWithOrder) { + blockEventsWithOrderWasCalled = true + assert.Equal(t, expTxsWithOrderData, event) }, } eventsHandler, err := process.NewEventsHandler(args) require.Nil(t, err) - eventsHandler.HandlePushEvents(events) - require.True(t, wasCalled) + err = eventsHandler.HandleSaveBlockEvents(blockData) + require.Nil(t, err) + + assert.True(t, pushWasCalled) + assert.True(t, txsWasCalled) + assert.True(t, scrsWasCalled) + assert.True(t, blockEventsWithOrderWasCalled) }) +} - t.Run("check duplicates enabled, should not process event", func(t *testing.T) { +func TestShouldProcessSaveBlockEvents(t *testing.T) { + t.Parallel() + + t.Run("should process", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + args.CheckDuplicates = true + + args.Locker = &mocks.LockerStub{ + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return true, nil + }, + } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + shouldProcess := eventsHandler.ShouldProcessSaveBlockEvents("blockHash1") + require.True(t, shouldProcess) + }) + + t.Run("duplicated events, should not process", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + args.CheckDuplicates = true + + args.Locker = &mocks.LockerStub{ + IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { + return false, nil + }, + } + + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + shouldProcess := eventsHandler.ShouldProcessSaveBlockEvents("blockHash1") + require.False(t, shouldProcess) + }) +} + +func TestHandlePushEvents(t *testing.T) { + t.Parallel() + + t.Run("empty hash should return error", func(t *testing.T) { + t.Parallel() + + args := createMockEventsHandlerArgs() + eventsHandler, err := process.NewEventsHandler(args) + require.Nil(t, err) + + events := data.BlockEvents{ + Hash: "", + ShardID: 1, + TimeStamp: 1234, + Events: []data.Event{}, + } + + err = eventsHandler.HandlePushEvents(events) + require.Equal(t, common.ErrReceivedEmptyEvents, err) + }) + + t.Run("broadcast event was called", func(t *testing.T) { t.Parallel() - blockEvents := data.BlockEvents{ + events := data.BlockEvents{ Hash: "hash1", ShardID: 1, TimeStamp: 1234, @@ -112,24 +372,19 @@ func TestHandlePushEvents(t *testing.T) { wasCalled := false args := createMockEventsHandlerArgs() - args.CheckDuplicates = true args.Publisher = &mocks.PublisherStub{ - BroadcastCalled: func(events data.BlockEvents) { - require.Equal(t, blockEvents, events) + BroadcastCalled: func(evs data.BlockEvents) { + require.Equal(t, events, evs) wasCalled = true }, } - args.Locker = &mocks.LockerStub{ - IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { - return false, nil - }, - } eventsHandler, err := process.NewEventsHandler(args) require.Nil(t, err) - eventsHandler.HandlePushEvents(blockEvents) - require.False(t, wasCalled) + err = eventsHandler.HandlePushEvents(events) + require.Nil(t, err) + require.True(t, wasCalled) }) } @@ -275,40 +530,6 @@ func TestHandleTxsEvents(t *testing.T) { eventsHandler.HandleBlockTxs(blockTxs) require.True(t, wasCalled) }) - - t.Run("check duplicates enabled, should not process event", func(t *testing.T) { - t.Parallel() - - blockTxs := data.BlockTxs{ - Hash: "hash1", - Txs: map[string]*transaction.Transaction{ - "hash1": { - Nonce: 1, - }, - }, - } - - wasCalled := false - args := createMockEventsHandlerArgs() - args.CheckDuplicates = true - args.Publisher = &mocks.PublisherStub{ - BroadcastTxsCalled: func(event data.BlockTxs) { - require.Equal(t, blockTxs, event) - wasCalled = true - }, - } - args.Locker = &mocks.LockerStub{ - IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { - return false, nil - }, - } - - eventsHandler, err := process.NewEventsHandler(args) - require.Nil(t, err) - - eventsHandler.HandleBlockTxs(blockTxs) - require.False(t, wasCalled) - }) } func TestHandleScrsEvents(t *testing.T) { @@ -341,39 +562,6 @@ func TestHandleScrsEvents(t *testing.T) { eventsHandler.HandleBlockScrs(blockScrs) require.True(t, wasCalled) }) - - t.Run("check duplicates enabled, should not process event", func(t *testing.T) { - t.Parallel() - - wasCalled := false - args := createMockEventsHandlerArgs() - args.CheckDuplicates = true - args.Publisher = &mocks.PublisherStub{ - BroadcastScrsCalled: func(event data.BlockScrs) { - wasCalled = true - }, - } - args.Locker = &mocks.LockerStub{ - IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { - return false, nil - }, - } - - eventsHandler, err := process.NewEventsHandler(args) - require.Nil(t, err) - - events := data.BlockScrs{ - Hash: "hash1", - Scrs: map[string]*smartContractResult.SmartContractResult{ - "hash2": { - Nonce: 2, - }, - }, - } - - eventsHandler.HandleBlockScrs(events) - require.False(t, wasCalled) - }) } func TestHandleBlockEventsWithOrderEvents(t *testing.T) { @@ -409,31 +597,6 @@ func TestHandleBlockEventsWithOrderEvents(t *testing.T) { eventsHandler.HandleBlockEventsWithOrder(events) require.True(t, wasCalled) }) - - t.Run("check duplicates enabled, should not process event", func(t *testing.T) { - t.Parallel() - - wasCalled := false - args := createMockEventsHandlerArgs() - args.CheckDuplicates = true - args.Publisher = &mocks.PublisherStub{ - BroadcastBlockEventsWithOrderCalled: func(event data.BlockEventsWithOrder) { - require.Equal(t, events, events) - wasCalled = true - }, - } - args.Locker = &mocks.LockerStub{ - IsEventProcessedCalled: func(ctx context.Context, blockHash string) (bool, error) { - return false, nil - }, - } - - eventsHandler, err := process.NewEventsHandler(args) - require.Nil(t, err) - - eventsHandler.HandleBlockEventsWithOrder(events) - require.False(t, wasCalled) - }) } func TestTryCheckProcessedWithRetry(t *testing.T) { diff --git a/process/export_test.go b/process/export_test.go index 6ff81d51..8e107de4 100644 --- a/process/export_test.go +++ b/process/export_test.go @@ -10,6 +10,31 @@ func (eh *eventsHandler) TryCheckProcessedWithRetry(prefix, blockHash string) bo return eh.tryCheckProcessedWithRetry(prefix, blockHash) } +// HandlePushEvents - +func (eh *eventsHandler) HandlePushEvents(events data.BlockEvents) error { + return eh.handlePushEvents(events) +} + +// HandleBlockTxs - +func (eh *eventsHandler) HandleBlockTxs(blockTxs data.BlockTxs) { + eh.handleBlockTxs(blockTxs) +} + +// HandleBlockScrs - +func (eh *eventsHandler) HandleBlockScrs(blockScrs data.BlockScrs) { + eh.handleBlockScrs(blockScrs) +} + +// HandleBlockEventsWithOrder - +func (eh *eventsHandler) HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { + eh.handleBlockEventsWithOrder(blockTxs) +} + +// ShouldProcessSaveBlockEvents - +func (eh *eventsHandler) ShouldProcessSaveBlockEvents(blockHash string) bool { + return eh.shouldProcessSaveBlockEvents(blockHash) +} + // GetLogEventsFromTransactionsPool exports internal method for testing func (ei *eventsInterceptor) GetLogEventsFromTransactionsPool(logs []*outport.LogData) []data.Event { return ei.getLogEventsFromTransactionsPool(logs) diff --git a/process/interface.go b/process/interface.go index 5eea6740..34c93fef 100644 --- a/process/interface.go +++ b/process/interface.go @@ -17,23 +17,22 @@ type LockService interface { // Publisher defines the behaviour of a publisher component which should be // able to publish received events and broadcast them to channels type Publisher interface { + Run() error Broadcast(events data.BlockEvents) BroadcastRevert(event data.RevertBlock) BroadcastFinalized(event data.FinalizedBlock) BroadcastTxs(event data.BlockTxs) BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) BroadcastScrs(event data.BlockScrs) + Close() error IsInterfaceNil() bool } // EventsHandler defines the behaviour of an events handler component type EventsHandler interface { - HandlePushEvents(events data.BlockEvents) error + HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) - HandleBlockTxs(blockTxs data.BlockTxs) - HandleBlockScrs(blockScrs data.BlockScrs) - HandleBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) IsInterfaceNil() bool } @@ -58,8 +57,20 @@ type DataProcessor interface { // EventsFacadeHandler defines the behavior of a facade handler needed for events group type EventsFacadeHandler interface { - HandlePushEventsV2(events data.ArgsSaveBlockData) error + HandlePushEvents(events data.ArgsSaveBlockData) error HandleRevertEvents(revertBlock data.RevertBlock) HandleFinalizedEvents(finalizedBlock data.FinalizedBlock) IsInterfaceNil() bool } + +// PublisherHandler defines the behavior of a publisher component +type PublisherHandler interface { + Publish(events data.BlockEvents) + PublishRevert(revertBlock data.RevertBlock) + PublishFinalized(finalizedBlock data.FinalizedBlock) + PublishTxs(blockTxs data.BlockTxs) + PublishScrs(blockScrs data.BlockScrs) + PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) + Close() error + IsInterfaceNil() bool +} diff --git a/process/preprocess/eventsPreProcessorV0.go b/process/preprocess/eventsPreProcessorV0.go index 446d8061..8d2398b1 100644 --- a/process/preprocess/eventsPreProcessorV0.go +++ b/process/preprocess/eventsPreProcessorV0.go @@ -59,7 +59,7 @@ func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error { Header: header, } - err = d.facade.HandlePushEventsV2(*saveBlockData) + err = d.facade.HandlePushEvents(*saveBlockData) if err != nil { return err } diff --git a/process/preprocess/eventsPreProcessorV0_test.go b/process/preprocess/eventsPreProcessorV0_test.go index 8f1f4648..84ff20c5 100644 --- a/process/preprocess/eventsPreProcessorV0_test.go +++ b/process/preprocess/eventsPreProcessorV0_test.go @@ -6,6 +6,7 @@ import ( "testing" coreData "github.com/multiversx/mx-chain-core-go/data" + "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-notifier-go/data" notifierData "github.com/multiversx/mx-chain-notifier-go/data" "github.com/multiversx/mx-chain-notifier-go/mocks" @@ -17,10 +18,14 @@ import ( func TestPreProcessorV0_SaveBlock(t *testing.T) { t.Parallel() + marshaller := &marshal.JsonMarshalizer{} + blockData, err := testdata.NewBlockData(marshaller) + require.Nil(t, err) + t.Run("nil block data", func(t *testing.T) { t.Parallel() - outportBlock := testdata.OutportBlockV0() + outportBlock := blockData.OutportBlockV0() outportBlock.HeaderType = "invalid" marshalledBlock, _ := json.Marshal(outportBlock) @@ -38,12 +43,12 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { expectedErr := errors.New("exp error") args.Facade = &mocks.FacadeStub{ - HandlePushEventsV2Called: func(events data.ArgsSaveBlockData) error { + HandlePushEventsCalled: func(events data.ArgsSaveBlockData) error { return expectedErr }, } - outportBlock := testdata.OutportBlockV0() + outportBlock := blockData.OutportBlockV0() dp, err := preprocess.NewEventsPreProcessorV0(args) require.Nil(t, err) @@ -60,7 +65,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { wasCalled := false args.Facade = &mocks.FacadeStub{ - HandlePushEventsV2Called: func(events data.ArgsSaveBlockData) error { + HandlePushEventsCalled: func(events data.ArgsSaveBlockData) error { wasCalled = true return nil }, @@ -69,7 +74,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { dp, err := preprocess.NewEventsPreProcessorV0(args) require.Nil(t, err) - outportBlock := testdata.OutportBlockV0() + outportBlock := blockData.OutportBlockV0() marshalledBlock, err := json.Marshal(outportBlock) require.Nil(t, err) diff --git a/process/preprocess/eventsPreProcessorV1.go b/process/preprocess/eventsPreProcessorV1.go index e6a25fe0..f4392b0d 100644 --- a/process/preprocess/eventsPreProcessorV1.go +++ b/process/preprocess/eventsPreProcessorV1.go @@ -66,8 +66,7 @@ func (d *eventsPreProcessorV1) SaveBlock(marshalledData []byte) error { Header: header, } - // TODO: refactor to remove facade versioning - err = d.facade.HandlePushEventsV2(*saveBlockData) + err = d.facade.HandlePushEvents(*saveBlockData) if err != nil { return err } diff --git a/process/preprocess/eventsPreProcessorV1_test.go b/process/preprocess/eventsPreProcessorV1_test.go index fb7a6a96..fa30d379 100644 --- a/process/preprocess/eventsPreProcessorV1_test.go +++ b/process/preprocess/eventsPreProcessorV1_test.go @@ -82,7 +82,7 @@ func TestPreProcessorV1_SaveBlock(t *testing.T) { expectedErr := errors.New("exp error") args.Facade = &mocks.FacadeStub{ - HandlePushEventsV2Called: func(events data.ArgsSaveBlockData) error { + HandlePushEventsCalled: func(events data.ArgsSaveBlockData) error { return expectedErr }, } diff --git a/process/publisher.go b/process/publisher.go new file mode 100644 index 00000000..4d6713ec --- /dev/null +++ b/process/publisher.go @@ -0,0 +1,151 @@ +package process + +import ( + "context" + "sync" + + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-notifier-go/common" + "github.com/multiversx/mx-chain-notifier-go/data" +) + +type publisher struct { + handler PublisherHandler + + broadcast chan data.BlockEvents + broadcastRevert chan data.RevertBlock + broadcastFinalized chan data.FinalizedBlock + broadcastTxs chan data.BlockTxs + broadcastBlockEventsWithOrder chan data.BlockEventsWithOrder + broadcastScrs chan data.BlockScrs + + cancelFunc func() + closeChan chan struct{} + mutState sync.RWMutex +} + +// NewPublisher will create a new publisher component +func NewPublisher(handler PublisherHandler) (*publisher, error) { + if check.IfNil(handler) { + return nil, ErrNilPublisherHandler + } + + p := &publisher{ + handler: handler, + broadcast: make(chan data.BlockEvents), + broadcastRevert: make(chan data.RevertBlock), + broadcastFinalized: make(chan data.FinalizedBlock), + broadcastTxs: make(chan data.BlockTxs), + broadcastScrs: make(chan data.BlockScrs), + broadcastBlockEventsWithOrder: make(chan data.BlockEventsWithOrder), + closeChan: make(chan struct{}), + } + + return p, nil +} + +// Run creates a goroutine and listens for events on the exposed channels +func (p *publisher) Run() error { + p.mutState.Lock() + defer p.mutState.Unlock() + + if p.cancelFunc != nil { + return common.ErrLoopAlreadyStarted + } + + var ctx context.Context + ctx, p.cancelFunc = context.WithCancel(context.Background()) + + go p.run(ctx) + + return nil +} + +func (p *publisher) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + p.handler.Close() + return + case events := <-p.broadcast: + p.handler.Publish(events) + case revertBlock := <-p.broadcastRevert: + p.handler.PublishRevert(revertBlock) + case finalizedBlock := <-p.broadcastFinalized: + p.handler.PublishFinalized(finalizedBlock) + case blockTxs := <-p.broadcastTxs: + p.handler.PublishTxs(blockTxs) + case blockScrs := <-p.broadcastScrs: + p.handler.PublishScrs(blockScrs) + case blockEvents := <-p.broadcastBlockEventsWithOrder: + p.handler.PublishBlockEventsWithOrder(blockEvents) + } + } +} + +// Broadcast will handle the block events pushed by producers +func (p *publisher) Broadcast(events data.BlockEvents) { + select { + case p.broadcast <- events: + case <-p.closeChan: + } +} + +// BroadcastRevert will handle the revert event pushed by producers +func (p *publisher) BroadcastRevert(events data.RevertBlock) { + select { + case p.broadcastRevert <- events: + case <-p.closeChan: + } +} + +// BroadcastFinalized will handle the finalized event pushed by producers +func (p *publisher) BroadcastFinalized(events data.FinalizedBlock) { + select { + case p.broadcastFinalized <- events: + case <-p.closeChan: + } +} + +// BroadcastTxs will handle the txs event pushed by producers +func (p *publisher) BroadcastTxs(events data.BlockTxs) { + select { + case p.broadcastTxs <- events: + case <-p.closeChan: + } +} + +// BroadcastScrs will handle the scrs event pushed by producers +func (p *publisher) BroadcastScrs(events data.BlockScrs) { + select { + case p.broadcastScrs <- events: + case <-p.closeChan: + } +} + +// BroadcastBlockEventsWithOrder will handle the full block events pushed by producers +func (p *publisher) BroadcastBlockEventsWithOrder(events data.BlockEventsWithOrder) { + select { + case p.broadcastBlockEventsWithOrder <- events: + case <-p.closeChan: + } +} + +// Close will close the channels +func (p *publisher) Close() error { + p.mutState.RLock() + defer p.mutState.RUnlock() + + if p.cancelFunc != nil { + p.cancelFunc() + } + + close(p.closeChan) + + return nil +} + +// IsInterfaceNil returns true if there is no value under the interface +func (p *publisher) IsInterfaceNil() bool { + return p == nil +} diff --git a/process/publisher_test.go b/process/publisher_test.go new file mode 100644 index 00000000..5fb9cbd4 --- /dev/null +++ b/process/publisher_test.go @@ -0,0 +1,245 @@ +package process_test + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/multiversx/mx-chain-notifier-go/common" + "github.com/multiversx/mx-chain-notifier-go/data" + "github.com/multiversx/mx-chain-notifier-go/mocks" + "github.com/multiversx/mx-chain-notifier-go/process" + "github.com/stretchr/testify/require" +) + +func TestNewPublisher(t *testing.T) { + t.Parallel() + + t.Run("nil handler", func(t *testing.T) { + t.Parallel() + + p, err := process.NewPublisher(nil) + require.Nil(t, p) + require.Equal(t, process.ErrNilPublisherHandler, err) + }) + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + p, err := process.NewPublisher(&mocks.PublisherHandlerStub{}) + require.Nil(t, err) + require.False(t, p.IsInterfaceNil()) + }) +} + +func TestRun(t *testing.T) { + t.Parallel() + + t.Run("should fail if triggered multiple times", func(t *testing.T) { + t.Parallel() + + p, err := process.NewPublisher(&mocks.PublisherHandlerStub{}) + require.Nil(t, err) + + err = p.Run() + require.Nil(t, err) + + defer p.Close() + + err = p.Run() + require.Equal(t, common.ErrLoopAlreadyStarted, err) + }) +} + +func TestBroadcast(t *testing.T) { + t.Parallel() + + wg := sync.WaitGroup{} + numCalls := uint32(0) + + ph := &mocks.PublisherHandlerStub{ + PublishCalled: func(events data.BlockEvents) { + atomic.AddUint32(&numCalls, 1) + wg.Done() + }, + } + + p, err := process.NewPublisher(ph) + require.Nil(t, err) + + _ = p.Run() + defer p.Close() + wg.Add(1) + + p.Broadcast(data.BlockEvents{}) + + wg.Wait() + + require.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) +} + +func TestBroadcastRevert(t *testing.T) { + t.Parallel() + + wg := sync.WaitGroup{} + numCalls := uint32(0) + + ph := &mocks.PublisherHandlerStub{ + PublishRevertCalled: func(revertBlock data.RevertBlock) { + atomic.AddUint32(&numCalls, 1) + wg.Done() + }, + } + + p, err := process.NewPublisher(ph) + require.Nil(t, err) + + _ = p.Run() + defer p.Close() + wg.Add(1) + + p.BroadcastRevert(data.RevertBlock{}) + + wg.Wait() + + require.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) +} + +func TestBroadcastFinalized(t *testing.T) { + t.Parallel() + + wg := sync.WaitGroup{} + numCalls := uint32(0) + + ph := &mocks.PublisherHandlerStub{ + PublishFinalizedCalled: func(finalizedBlock data.FinalizedBlock) { + atomic.AddUint32(&numCalls, 1) + wg.Done() + }, + } + + p, err := process.NewPublisher(ph) + require.Nil(t, err) + + _ = p.Run() + defer p.Close() + wg.Add(1) + + p.BroadcastFinalized(data.FinalizedBlock{}) + + wg.Wait() + + require.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) +} + +func TestBroadcastTxs(t *testing.T) { + t.Parallel() + + wg := sync.WaitGroup{} + numCalls := uint32(0) + + ph := &mocks.PublisherHandlerStub{ + PublishTxsCalled: func(blockTxs data.BlockTxs) { + atomic.AddUint32(&numCalls, 1) + wg.Done() + }, + } + + p, err := process.NewPublisher(ph) + require.Nil(t, err) + + _ = p.Run() + defer p.Close() + wg.Add(1) + + p.BroadcastTxs(data.BlockTxs{}) + + wg.Wait() + + require.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) +} + +func TestBroadcastScrs(t *testing.T) { + t.Parallel() + + wg := sync.WaitGroup{} + numCalls := uint32(0) + + ph := &mocks.PublisherHandlerStub{ + PublishScrsCalled: func(blockScrs data.BlockScrs) { + atomic.AddUint32(&numCalls, 1) + wg.Done() + }, + } + + p, err := process.NewPublisher(ph) + require.Nil(t, err) + + _ = p.Run() + defer p.Close() + wg.Add(1) + + p.BroadcastScrs(data.BlockScrs{}) + + wg.Wait() + + require.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) +} + +func TestBroadcastBlockEventsWithOrder(t *testing.T) { + t.Parallel() + + wg := sync.WaitGroup{} + numCalls := uint32(0) + + ph := &mocks.PublisherHandlerStub{ + PublishBlockEventsWithOrderCalled: func(blockTxs data.BlockEventsWithOrder) { + atomic.AddUint32(&numCalls, 1) + wg.Done() + }, + } + + p, err := process.NewPublisher(ph) + require.Nil(t, err) + + _ = p.Run() + defer p.Close() + wg.Add(1) + + p.BroadcastBlockEventsWithOrder(data.BlockEventsWithOrder{}) + + wg.Wait() + + require.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) +} + +func TestClose(t *testing.T) { + t.Parallel() + + t.Run("publish should not be called after processing loop is closed", func(t *testing.T) { + t.Parallel() + + numCalls := uint32(0) + + ph := &mocks.PublisherHandlerStub{ + PublishCalled: func(events data.BlockEvents) { + atomic.AddUint32(&numCalls, 1) + }, + } + + p, err := process.NewPublisher(ph) + require.Nil(t, err) + + _ = p.Run() + + err = p.Close() + require.Nil(t, err) + + time.Sleep(100 * time.Millisecond) + + p.Broadcast(data.BlockEvents{}) + + require.Equal(t, uint32(0), atomic.LoadUint32(&numCalls)) + }) +} diff --git a/rabbitmq/interface.go b/rabbitmq/interface.go index e4f46ff9..79caf2c5 100644 --- a/rabbitmq/interface.go +++ b/rabbitmq/interface.go @@ -20,7 +20,7 @@ type RabbitMqClient interface { // PublisherService defines the behaviour of a publisher component which should be // able to publish received events and broadcast them to channels type PublisherService interface { - Run() + Run() error Broadcast(events data.BlockEvents) BroadcastRevert(event data.RevertBlock) BroadcastFinalized(event data.FinalizedBlock) diff --git a/rabbitmq/publisher.go b/rabbitmq/publisher.go index fc7ab24b..01b167c9 100644 --- a/rabbitmq/publisher.go +++ b/rabbitmq/publisher.go @@ -1,8 +1,6 @@ package rabbitmq import ( - "context" - "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/marshal" logger "github.com/multiversx/mx-chain-logger-go" @@ -29,16 +27,6 @@ type rabbitMqPublisher struct { client RabbitMqClient marshaller marshal.Marshalizer cfg config.RabbitMQConfig - - broadcast chan data.BlockEvents - broadcastRevert chan data.RevertBlock - broadcastFinalized chan data.FinalizedBlock - broadcastTxs chan data.BlockTxs - broadcastBlockEventsWithOrder chan data.BlockEventsWithOrder - broadcastScrs chan data.BlockScrs - - cancelFunc func() - closeChan chan struct{} } // NewRabbitMqPublisher creates a new rabbitMQ publisher instance @@ -49,16 +37,9 @@ func NewRabbitMqPublisher(args ArgsRabbitMqPublisher) (*rabbitMqPublisher, error } rp := &rabbitMqPublisher{ - broadcast: make(chan data.BlockEvents), - broadcastRevert: make(chan data.RevertBlock), - broadcastFinalized: make(chan data.FinalizedBlock), - broadcastTxs: make(chan data.BlockTxs), - broadcastScrs: make(chan data.BlockScrs), - broadcastBlockEventsWithOrder: make(chan data.BlockEventsWithOrder), - cfg: args.Config, - client: args.Client, - marshaller: args.Marshaller, - closeChan: make(chan struct{}), + cfg: args.Config, + client: args.Client, + marshaller: args.Marshaller, } err = rp.createExchanges() @@ -158,95 +139,8 @@ func (rp *rabbitMqPublisher) createExchange(conf config.RabbitMQExchangeConfig) return nil } -// Run is launched as a goroutine and listens for events on the exposed channels -func (rp *rabbitMqPublisher) Run() { - var ctx context.Context - ctx, rp.cancelFunc = context.WithCancel(context.Background()) - - go rp.run(ctx) -} - -func (rp *rabbitMqPublisher) run(ctx context.Context) { - for { - select { - case <-ctx.Done(): - log.Debug("RabbitMQ publisher is stopping...") - rp.client.Close() - case events := <-rp.broadcast: - rp.publishToExchanges(events) - case revertBlock := <-rp.broadcastRevert: - rp.publishRevertToExchange(revertBlock) - case finalizedBlock := <-rp.broadcastFinalized: - rp.publishFinalizedToExchange(finalizedBlock) - case blockTxs := <-rp.broadcastTxs: - rp.publishTxsToExchange(blockTxs) - case blockScrs := <-rp.broadcastScrs: - rp.publishScrsToExchange(blockScrs) - case blockEvents := <-rp.broadcastBlockEventsWithOrder: - rp.publishBlockEventsWithOrderToExchange(blockEvents) - case err := <-rp.client.ConnErrChan(): - if err != nil { - log.Error("rabbitMQ connection failure", "err", err.Error()) - rp.client.Reconnect() - } - case err := <-rp.client.CloseErrChan(): - if err != nil { - log.Error("rabbitMQ channel failure", "err", err.Error()) - rp.client.ReopenChannel() - } - } - } -} - -// Broadcast will handle the block events pushed by producers and sends them to rabbitMQ channel -func (rp *rabbitMqPublisher) Broadcast(events data.BlockEvents) { - select { - case rp.broadcast <- events: - case <-rp.closeChan: - } -} - -// BroadcastRevert will handle the revert event pushed by producers and sends them to rabbitMQ channel -func (rp *rabbitMqPublisher) BroadcastRevert(events data.RevertBlock) { - select { - case rp.broadcastRevert <- events: - case <-rp.closeChan: - } -} - -// BroadcastFinalized will handle the finalized event pushed by producers and sends them to rabbitMQ channel -func (rp *rabbitMqPublisher) BroadcastFinalized(events data.FinalizedBlock) { - select { - case rp.broadcastFinalized <- events: - case <-rp.closeChan: - } -} - -// BroadcastTxs will handle the txs event pushed by producers and sends them to rabbitMQ channel -func (rp *rabbitMqPublisher) BroadcastTxs(events data.BlockTxs) { - select { - case rp.broadcastTxs <- events: - case <-rp.closeChan: - } -} - -// BroadcastScrs will handle the scrs event pushed by producers and sends them to rabbitMQ channel -func (rp *rabbitMqPublisher) BroadcastScrs(events data.BlockScrs) { - select { - case rp.broadcastScrs <- events: - case <-rp.closeChan: - } -} - -// BroadcastBlockEventsWithOrder will handle the full block events pushed by producers and sends them to rabbitMQ channel -func (rp *rabbitMqPublisher) BroadcastBlockEventsWithOrder(events data.BlockEventsWithOrder) { - select { - case rp.broadcastBlockEventsWithOrder <- events: - case <-rp.closeChan: - } -} - -func (rp *rabbitMqPublisher) publishToExchanges(events data.BlockEvents) { +// Publish will publish logs and events to rabbitmq +func (rp *rabbitMqPublisher) Publish(events data.BlockEvents) { eventsBytes, err := rp.marshaller.Marshal(events) if err != nil { log.Error("could not marshal events", "err", err.Error()) @@ -259,7 +153,8 @@ func (rp *rabbitMqPublisher) publishToExchanges(events data.BlockEvents) { } } -func (rp *rabbitMqPublisher) publishRevertToExchange(revertBlock data.RevertBlock) { +// PublishRevert will publish revert event to rabbitmq +func (rp *rabbitMqPublisher) PublishRevert(revertBlock data.RevertBlock) { revertBlockBytes, err := rp.marshaller.Marshal(revertBlock) if err != nil { log.Error("could not marshal revert event", "err", err.Error()) @@ -272,7 +167,8 @@ func (rp *rabbitMqPublisher) publishRevertToExchange(revertBlock data.RevertBloc } } -func (rp *rabbitMqPublisher) publishFinalizedToExchange(finalizedBlock data.FinalizedBlock) { +// PublishFinalized will publish finalized event to rabbitmq +func (rp *rabbitMqPublisher) PublishFinalized(finalizedBlock data.FinalizedBlock) { finalizedBlockBytes, err := rp.marshaller.Marshal(finalizedBlock) if err != nil { log.Error("could not marshal finalized event", "err", err.Error()) @@ -285,7 +181,8 @@ func (rp *rabbitMqPublisher) publishFinalizedToExchange(finalizedBlock data.Fina } } -func (rp *rabbitMqPublisher) publishTxsToExchange(blockTxs data.BlockTxs) { +// PublishTxs will publish txs event to rabbitmq +func (rp *rabbitMqPublisher) PublishTxs(blockTxs data.BlockTxs) { txsBlockBytes, err := rp.marshaller.Marshal(blockTxs) if err != nil { log.Error("could not marshal block txs event", "err", err.Error()) @@ -298,7 +195,8 @@ func (rp *rabbitMqPublisher) publishTxsToExchange(blockTxs data.BlockTxs) { } } -func (rp *rabbitMqPublisher) publishScrsToExchange(blockScrs data.BlockScrs) { +// PublishScrs will publish scrs event to rabbitmq +func (rp *rabbitMqPublisher) PublishScrs(blockScrs data.BlockScrs) { scrsBlockBytes, err := rp.marshaller.Marshal(blockScrs) if err != nil { log.Error("could not marshal block scrs event", "err", err.Error()) @@ -311,7 +209,8 @@ func (rp *rabbitMqPublisher) publishScrsToExchange(blockScrs data.BlockScrs) { } } -func (rp *rabbitMqPublisher) publishBlockEventsWithOrderToExchange(blockTxs data.BlockEventsWithOrder) { +// PublishBlockEventsWithOrder will publish block events with order to rabbitmq +func (rp *rabbitMqPublisher) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) { txsBlockBytes, err := rp.marshaller.Marshal(blockTxs) if err != nil { log.Error("could not marshal block txs event", "err", err.Error()) @@ -336,14 +235,9 @@ func (rp *rabbitMqPublisher) publishFanout(exchangeName string, payload []byte) ) } -// Close will close the channels +// Close will trigger to close rabbitmq client func (rp *rabbitMqPublisher) Close() error { - if rp.cancelFunc != nil { - rp.cancelFunc() - } - - close(rp.closeChan) - + rp.client.Close() return nil } diff --git a/rabbitmq/publisher_test.go b/rabbitmq/publisher_test.go index d06b3a53..c0155d1b 100644 --- a/rabbitmq/publisher_test.go +++ b/rabbitmq/publisher_test.go @@ -2,10 +2,7 @@ package rabbitmq_test import ( "errors" - "sync" - "sync/atomic" "testing" - "time" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/core/mock" @@ -15,7 +12,6 @@ import ( "github.com/multiversx/mx-chain-notifier-go/mocks" "github.com/multiversx/mx-chain-notifier-go/rabbitmq" "github.com/streadway/amqp" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -180,16 +176,13 @@ func TestRabbitMqPublisher(t *testing.T) { }) } -func TestBroadcast(t *testing.T) { +func TestPublish(t *testing.T) { t.Parallel() - wg := sync.WaitGroup{} - numCalls := uint32(0) - + wasCalled := false client := &mocks.RabbitClientStub{ PublishCalled: func(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - atomic.AddUint32(&numCalls, 1) - wg.Done() + wasCalled = true return nil }, } @@ -200,27 +193,18 @@ func TestBroadcast(t *testing.T) { rabbitmq, err := rabbitmq.NewRabbitMqPublisher(args) require.Nil(t, err) - rabbitmq.Run() - defer rabbitmq.Close() - wg.Add(1) - - rabbitmq.Broadcast(data.BlockEvents{}) + rabbitmq.Publish(data.BlockEvents{}) - wg.Wait() - - assert.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) + require.True(t, wasCalled) } -func TestBroadcastRevert(t *testing.T) { +func TestPublishRevert(t *testing.T) { t.Parallel() - wg := sync.WaitGroup{} - numCalls := uint32(0) - + wasCalled := false client := &mocks.RabbitClientStub{ PublishCalled: func(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - atomic.AddUint32(&numCalls, 1) - wg.Done() + wasCalled = true return nil }, } @@ -231,27 +215,18 @@ func TestBroadcastRevert(t *testing.T) { rabbitmq, err := rabbitmq.NewRabbitMqPublisher(args) require.Nil(t, err) - rabbitmq.Run() - defer rabbitmq.Close() - wg.Add(1) + rabbitmq.PublishRevert(data.RevertBlock{}) - rabbitmq.BroadcastRevert(data.RevertBlock{}) - - wg.Wait() - - assert.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) + require.True(t, wasCalled) } func TestBroadcastFinalized(t *testing.T) { t.Parallel() - wg := sync.WaitGroup{} - numCalls := uint32(0) - + wasCalled := false client := &mocks.RabbitClientStub{ PublishCalled: func(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - atomic.AddUint32(&numCalls, 1) - wg.Done() + wasCalled = true return nil }, } @@ -262,27 +237,18 @@ func TestBroadcastFinalized(t *testing.T) { rabbitmq, err := rabbitmq.NewRabbitMqPublisher(args) require.Nil(t, err) - rabbitmq.Run() - defer rabbitmq.Close() - wg.Add(1) + rabbitmq.PublishFinalized(data.FinalizedBlock{}) - rabbitmq.BroadcastFinalized(data.FinalizedBlock{}) - - wg.Wait() - - assert.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) + require.True(t, wasCalled) } func TestBroadcastTxs(t *testing.T) { t.Parallel() - wg := sync.WaitGroup{} - numCalls := uint32(0) - + wasCalled := false client := &mocks.RabbitClientStub{ PublishCalled: func(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - atomic.AddUint32(&numCalls, 1) - wg.Done() + wasCalled = true return nil }, } @@ -293,27 +259,18 @@ func TestBroadcastTxs(t *testing.T) { rabbitmq, err := rabbitmq.NewRabbitMqPublisher(args) require.Nil(t, err) - rabbitmq.Run() - defer rabbitmq.Close() - wg.Add(1) + rabbitmq.PublishTxs(data.BlockTxs{}) - rabbitmq.BroadcastTxs(data.BlockTxs{}) - - wg.Wait() - - assert.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) + require.True(t, wasCalled) } func TestBroadcastScrs(t *testing.T) { t.Parallel() - wg := sync.WaitGroup{} - numCalls := uint32(0) - + wasCalled := false client := &mocks.RabbitClientStub{ PublishCalled: func(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - atomic.AddUint32(&numCalls, 1) - wg.Done() + wasCalled = true return nil }, } @@ -324,25 +281,18 @@ func TestBroadcastScrs(t *testing.T) { rabbitmq, err := rabbitmq.NewRabbitMqPublisher(args) require.Nil(t, err) - rabbitmq.Run() - defer rabbitmq.Close() - wg.Add(1) + rabbitmq.PublishScrs(data.BlockScrs{}) - rabbitmq.BroadcastScrs(data.BlockScrs{}) - - wg.Wait() - - assert.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) + require.True(t, wasCalled) } func TestBroadcastBlockEventsWithOrder(t *testing.T) { t.Parallel() - numCalls := uint32(0) - + wasCalled := false client := &mocks.RabbitClientStub{ PublishCalled: func(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { - atomic.AddUint32(&numCalls, 1) + wasCalled = true return nil }, } @@ -353,26 +303,27 @@ func TestBroadcastBlockEventsWithOrder(t *testing.T) { rabbitmq, err := rabbitmq.NewRabbitMqPublisher(args) require.Nil(t, err) - rabbitmq.Run() - defer rabbitmq.Close() + rabbitmq.PublishBlockEventsWithOrder(data.BlockEventsWithOrder{}) - rabbitmq.BroadcastBlockEventsWithOrder(data.BlockEventsWithOrder{}) - - time.Sleep(time.Millisecond * 200) - - assert.Equal(t, uint32(1), atomic.LoadUint32(&numCalls)) + require.True(t, wasCalled) } func TestClose(t *testing.T) { t.Parallel() + wasCalled := false + client := &mocks.RabbitClientStub{ + CloseCalled: func() { + wasCalled = true + }, + } + args := createMockArgsRabbitMqPublisher() + args.Client = client rabbitmq, err := rabbitmq.NewRabbitMqPublisher(args) require.Nil(t, err) - rabbitmq.Run() - - err = rabbitmq.Close() - require.Nil(t, err) + rabbitmq.Close() + require.True(t, wasCalled) } diff --git a/testdata/testData.go b/testdata/testData.go index aaf3832f..a89c89a1 100644 --- a/testdata/testData.go +++ b/testdata/testData.go @@ -1,18 +1,32 @@ package testdata import ( - "encoding/json" - + "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-notifier-go/common" "github.com/multiversx/mx-chain-notifier-go/data" notifierData "github.com/multiversx/mx-chain-notifier-go/data" ) +type blockData struct { + marshaller marshal.Marshalizer +} + +// NewBlockData will create block data component for testing +func NewBlockData(marshaller marshal.Marshalizer) (*blockData, error) { + if check.IfNil(marshaller) { + return nil, common.ErrNilMarshaller + } + + return &blockData{marshaller: marshaller}, nil +} + // OldSaveBlockData defines block events data before initial refactoring -func OldSaveBlockData() *notifierData.SaveBlockData { +func (bd *blockData) OldSaveBlockData() *notifierData.SaveBlockData { return ¬ifierData.SaveBlockData{ Hash: "blockHash", Txs: map[string]*transaction.Transaction{ @@ -34,7 +48,7 @@ func OldSaveBlockData() *notifierData.SaveBlockData { } // OutportBlockV0 - -func OutportBlockV0() *notifierData.ArgsSaveBlock { +func (bd *blockData) OutportBlockV0() *notifierData.ArgsSaveBlock { saveBlockData := data.OutportBlockDataOld{ HeaderHash: []byte("headerHash3"), Body: &block.Body{ @@ -94,12 +108,12 @@ func OutportBlockV0() *notifierData.ArgsSaveBlock { } // OutportBlockV1 - -func OutportBlockV1() *outport.OutportBlock { +func (bd *blockData) OutportBlockV1() *outport.OutportBlock { header := &block.Header{ ShardID: 1, TimeStamp: 1234, } - headerBytes, _ := json.Marshal(header) + headerBytes, _ := bd.marshaller.Marshal(header) return &outport.OutportBlock{ BlockData: &outport.BlockData{ @@ -160,7 +174,7 @@ func OutportBlockV1() *outport.OutportBlock { } // RevertBlockV0 - -func RevertBlockV0() *notifierData.RevertBlock { +func (bd *blockData) RevertBlockV0() *notifierData.RevertBlock { return ¬ifierData.RevertBlock{ Hash: "headerHash1", Nonce: 1, @@ -170,12 +184,12 @@ func RevertBlockV0() *notifierData.RevertBlock { } // RevertBlockV1 - -func RevertBlockV1() *outport.BlockData { +func (bd *blockData) RevertBlockV1() *outport.BlockData { header := &block.Header{ ShardID: 1, TimeStamp: 1234, } - headerBytes, _ := json.Marshal(header) + headerBytes, _ := bd.marshaller.Marshal(header) return &outport.BlockData{ ShardID: 1, @@ -195,14 +209,14 @@ func RevertBlockV1() *outport.BlockData { } // FinalizedBlockV0 - -func FinalizedBlockV0() *notifierData.FinalizedBlock { +func (bd *blockData) FinalizedBlockV0() *notifierData.FinalizedBlock { return ¬ifierData.FinalizedBlock{ Hash: "headerHash1", } } // FinalizedBlockV1 - -func FinalizedBlockV1() *outport.FinalizedBlock { +func (bd *blockData) FinalizedBlockV1() *outport.FinalizedBlock { return &outport.FinalizedBlock{ ShardID: 1, HeaderHash: []byte("headerHash1"), diff --git a/tools/httpConnector/httpClientWrapper.go b/tools/httpConnector/httpClientWrapper.go index 9471a410..13dcf5af 100644 --- a/tools/httpConnector/httpClientWrapper.go +++ b/tools/httpConnector/httpClientWrapper.go @@ -24,7 +24,7 @@ const ( contentTypeKey = "Content-Type" contentTypeValue = "application/json" payloadVersionKey = "version" - payloadVersionValue = "0" + payloadVersionValue = "1" ) type httpClientWrapper struct { diff --git a/tools/httpConnector/main.go b/tools/httpConnector/main.go index 0821814f..f1c8d646 100644 --- a/tools/httpConnector/main.go +++ b/tools/httpConnector/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" + "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-notifier-go/testdata" ) @@ -18,19 +19,26 @@ func main() { return } - err = httpClient.Post("/events/push", testdata.OutportBlockV0()) + marshaller := &marshal.JsonMarshalizer{} + blockData, err := testdata.NewBlockData(marshaller) + if err != nil { + fmt.Println(err.Error()) + return + } + + err = httpClient.Post("/events/push", blockData.OutportBlockV1()) if err != nil { fmt.Println(fmt.Errorf("%w in eventNotifier.SaveBlock while posting block data", err)) return } - err = httpClient.Post("/events/revert", testdata.RevertBlockV0()) + err = httpClient.Post("/events/revert", blockData.RevertBlockV1()) if err != nil { fmt.Println(fmt.Errorf("%w in eventNotifier.SaveBlock while posting block data", err)) return } - err = httpClient.Post("/events/finalized", testdata.FinalizedBlockV0()) + err = httpClient.Post("/events/finalized", blockData.FinalizedBlockV1()) if err != nil { fmt.Println(fmt.Errorf("%w in eventNotifier.SaveBlock while posting block data", err)) return diff --git a/tools/wsConnector/main.go b/tools/wsConnector/main.go index c8788581..49eed11a 100644 --- a/tools/wsConnector/main.go +++ b/tools/wsConnector/main.go @@ -19,19 +19,25 @@ func main() { return } - err = wsClient.PushEventsRequest(testdata.OutportBlockV1()) + blockData, err := testdata.NewBlockData(marshaller) if err != nil { fmt.Println(err.Error()) return } - err = wsClient.RevertEventsRequest(testdata.RevertBlockV1()) + err = wsClient.PushEventsRequest(blockData.OutportBlockV1()) if err != nil { fmt.Println(err.Error()) return } - err = wsClient.FinalizedEventsRequest(testdata.FinalizedBlockV1()) + err = wsClient.RevertEventsRequest(blockData.RevertBlockV1()) + if err != nil { + fmt.Println(err.Error()) + return + } + + err = wsClient.FinalizedEventsRequest(blockData.FinalizedBlockV1()) if err != nil { fmt.Println(err.Error()) return