Skip to content

Commit

Permalink
add altered accounts to boradcast
Browse files Browse the repository at this point in the history
  • Loading branch information
mihaieremia committed Jul 21, 2024
1 parent 69d7998 commit da75aec
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 3 deletions.
2 changes: 1 addition & 1 deletion common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
// BlockScrs defines the subscription event type for block scrs
BlockScrs string = "block_scrs"

AlteredAccountsEvent string = "altered_accounts"
AlteredAccountsEvent string = "mvx_altered_accounts"
)

const (
Expand Down
1 change: 0 additions & 1 deletion dispatcher/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func getEventType(subEntry data.SubscriptionEntry) string {
subEntry.EventType == common.BlockTxs ||
subEntry.EventType == common.BlockScrs ||
subEntry.EventType == common.AlteredAccountsEvent ||
subEntry.EventType == "mvx_altered_accounts" ||
subEntry.EventType == common.BlockEvents {
return subEntry.EventType
}
Expand Down
30 changes: 29 additions & 1 deletion process/eventsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData)
}
eh.handleBlockScrs(scrs)

alteredEvent := data.AlteredAccountsEvent{
Hash: eventsData.Hash,
ShardID: eventsData.Header.GetShardID(),
TimeStamp: eventsData.Header.GetTimeStamp(),
Accounts: eventsData.AlteredAccounts,
}

txsWithOrder := data.BlockEventsWithOrder{
Hash: eventsData.Hash,
ShardID: eventsData.Header.GetShardID(),
Expand All @@ -120,7 +127,7 @@ func (eh *eventsHandler) HandleSaveBlockEvents(allEvents data.ArgsSaveBlockData)
Events: eventsData.LogEvents,
}
eh.handleBlockEventsWithOrder(txsWithOrder)

eh.handleAlteredAccounts(alteredEvent)
return nil
}

Expand Down Expand Up @@ -314,6 +321,27 @@ func (eh *eventsHandler) handleBlockEventsWithOrder(blockTxs data.BlockEventsWit
eh.metricsHandler.AddRequest(getRabbitOpID(common.BlockEvents), time.Since(t))
}

// handleBlockEventsWithOrder will handle full block events received from observer
func (eh *eventsHandler) handleAlteredAccounts(alteredAccountsEvent data.AlteredAccountsEvent) {
if len(alteredAccountsEvent.Accounts) == 0 {
log.Warn("received empty accounts", "event", common.AlteredAccountsEvent,
"will process", false,
)
return
}

log.Info("received", "event", common.AlteredAccountsEvent,
"block hash", alteredAccountsEvent.Hash,
)

t := time.Now()

for _, publisher := range eh.publishers {
publisher.BroadcastAlteredAccounts(alteredAccountsEvent)
}
eh.metricsHandler.AddRequest(getRabbitOpID(common.AlteredAccountsEvent), time.Since(t))
}

func (eh *eventsHandler) tryCheckProcessedWithRetry(id, blockHash string) bool {
var err error
var setSuccessful bool
Expand Down
1 change: 1 addition & 0 deletions process/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Publisher interface {
BroadcastFinalized(event data.FinalizedBlock)
BroadcastTxs(event data.BlockTxs)
BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder)
BroadcastAlteredAccounts(accounts data.AlteredAccountsEvent)
BroadcastScrs(event data.BlockScrs)
Close() error
IsInterfaceNil() bool
Expand Down
1 change: 1 addition & 0 deletions rabbitmq/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type PublisherService interface {
BroadcastTxs(event data.BlockTxs)
BroadcastScrs(event data.BlockScrs)
BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder)
BroadcastAlteredAccounts(accounts data.AlteredAccountsEvent)
Close() error
IsInterfaceNil() bool
}

0 comments on commit da75aec

Please sign in to comment.