Skip to content

Commit

Permalink
feat: use mutex when calculating snapshots (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
freak12techno committed Jul 19, 2023
1 parent 5e61569 commit a17ebd9
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 124 deletions.
267 changes: 144 additions & 123 deletions pkg/app_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"main/pkg/tendermint"
"main/pkg/types"
"main/pkg/utils"
"sync"
"time"

"github.com/rs/zerolog"
Expand All @@ -34,6 +35,8 @@ type AppManager struct {
Reporters []reportersPkg.Reporter
IsPopulatingBlocks bool
IsPopulatingActiveSet bool

mutex sync.Mutex
}

func NewAppManager(
Expand Down Expand Up @@ -110,118 +113,125 @@ func (a *AppManager) ListenForEvents() {
for {
select {
case result := <-a.WebsocketManager.Channel:
block, ok := result.(*types.Block)
if !ok {
a.Logger.Warn().Msg("Event is not a block!")
continue
}
a.ProcessEvent(result)
}
}
}

if a.StateManager.HasBlockAtHeight(block.Height) {
a.Logger.Info().
Int64("height", block.Height).
Msg("Already have block at this height, not generating report.")
continue
}
func (a *AppManager) ProcessEvent(emittable types.WebsocketEmittable) {
a.mutex.Lock()
defer a.mutex.Unlock()

if err := a.UpdateValidators(block.Height - 1); err != nil {
a.Logger.Error().
Err(err).
Msg("Error updating validators")
}
block, ok := emittable.(*types.Block)
if !ok {
a.Logger.Warn().Msg("Event is not a block!")
return
}

if err := a.AddLastActiveSet(block.Height); err != nil {
a.Logger.Error().
Err(err).
Msg("Error updating historical validators")
}
if a.StateManager.HasBlockAtHeight(block.Height) {
a.Logger.Info().
Int64("height", block.Height).
Msg("Already have block at this height, not generating report.")
return
}

a.Logger.Debug().Int64("height", block.Height).Msg("Got new block from Tendermint")
if err := a.StateManager.AddBlock(block); err != nil {
a.Logger.Error().
Err(err).
Msg("Error inserting new block")
}
if err := a.UpdateValidators(block.Height - 1); err != nil {
a.Logger.Error().
Err(err).
Msg("Error updating validators")
}

totalBlocksCount := a.StateManager.GetBlocksCountSinceLatest(a.Config.StoreBlocks)
a.Logger.Info().
Int64("count", totalBlocksCount).
Int64("height", block.Height).
Msg("Added new Tendermint block into state")

blocksCount := a.StateManager.GetActiveSetsCountSinceLatest(a.Config.BlocksWindow)
historicalValidatorsCount := a.StateManager.GetActiveSetsCountSinceLatest(a.Config.BlocksWindow)

hasEnoughBlocks := blocksCount >= a.Config.BlocksWindow
hasEnoughHistoricalValidators := historicalValidatorsCount >= a.Config.BlocksWindow

if !hasEnoughBlocks || !hasEnoughHistoricalValidators {
a.Logger.Info().
Int64("blocks_count", blocksCount).
Int64("historical_validators_count", historicalValidatorsCount).
Int64("expected", a.Config.BlocksWindow).
Msg("Not enough data for producing a snapshot, skipping.")
continue
}
if err := a.AddLastActiveSet(block.Height); err != nil {
a.Logger.Error().
Err(err).
Msg("Error updating historical validators")
}

snapshot := a.StateManager.GetSnapshot()

for _, entry := range snapshot.Entries {
a.Logger.Trace().
Str("valoper", entry.Validator.OperatorAddress).
Str("moniker", entry.Validator.Moniker).
Int64("signed", entry.SignatureInfo.Signed).
Int64("not_signed", entry.SignatureInfo.NotSigned).
Int64("no_signature", entry.SignatureInfo.NoSignature).
Int64("not_active", entry.SignatureInfo.NotActive).
Int64("proposed", entry.SignatureInfo.Proposed).
Msg("Validator signing info")
}
a.Logger.Debug().Int64("height", block.Height).Msg("Got new block from Tendermint")
if err := a.StateManager.AddBlock(block); err != nil {
a.Logger.Error().
Err(err).
Msg("Error inserting new block")
}

if !a.SnapshotManager.HasNewerSnapshot() {
a.Logger.Info().Msg("No older snapshot present, cannot generate report")
a.SnapshotManager.CommitNewSnapshot(block.Height, snapshot)
continue
}
totalBlocksCount := a.StateManager.GetBlocksCountSinceLatest(a.Config.StoreBlocks)
a.Logger.Info().
Int64("count", totalBlocksCount).
Int64("height", block.Height).
Msg("Added new Tendermint block into state")

a.SnapshotManager.CommitNewSnapshot(block.Height, snapshot)
if err := a.StateManager.SaveSnapshot(&snapshotPkg.Info{
Height: block.Height,
Snapshot: snapshot,
}); err != nil {
a.Logger.Error().Err(err).Msg("Could not save latest snapshot to database")
}
blocksCount := a.StateManager.GetActiveSetsCountSinceLatest(a.Config.BlocksWindow)
historicalValidatorsCount := a.StateManager.GetActiveSetsCountSinceLatest(a.Config.BlocksWindow)

olderHeight := a.SnapshotManager.GetOlderHeight()
a.Logger.Info().
Int64("older_height", olderHeight).
Int64("height", block.Height).
Msg("Generating snapshot report")
hasEnoughBlocks := blocksCount >= a.Config.BlocksWindow
hasEnoughHistoricalValidators := historicalValidatorsCount >= a.Config.BlocksWindow

report, err := a.SnapshotManager.GetReport()
if err != nil {
a.Logger.Error().Err(err).Msg("Could not generate report")
continue
}
if !hasEnoughBlocks || !hasEnoughHistoricalValidators {
a.Logger.Info().
Int64("blocks_count", blocksCount).
Int64("historical_validators_count", historicalValidatorsCount).
Int64("expected", a.Config.BlocksWindow).
Msg("Not enough data for producing a snapshot, skipping.")
return
}

if report.Empty() {
a.Logger.Info().Msg("Report is empty, no events to send.")
continue
}
snapshot := a.StateManager.GetSnapshot()

for _, entry := range snapshot.Entries {
a.Logger.Trace().
Str("valoper", entry.Validator.OperatorAddress).
Str("moniker", entry.Validator.Moniker).
Int64("signed", entry.SignatureInfo.Signed).
Int64("not_signed", entry.SignatureInfo.NotSigned).
Int64("no_signature", entry.SignatureInfo.NoSignature).
Int64("not_active", entry.SignatureInfo.NotActive).
Int64("proposed", entry.SignatureInfo.Proposed).
Msg("Validator signing info")
}

for _, entry := range report.Entries {
a.Logger.Info().
Str("entry", fmt.Sprintf("%+v", entry)).
Msg("Report entry")
}
if !a.SnapshotManager.HasNewerSnapshot() {
a.Logger.Info().Msg("No older snapshot present, cannot generate report")
a.SnapshotManager.CommitNewSnapshot(block.Height, snapshot)
return
}

for _, reporter := range a.Reporters {
if err := reporter.Send(report); err != nil {
a.Logger.Error().
Err(err).
Str("name", string(reporter.Name())).
Msg("Error sending report")
}
}
a.SnapshotManager.CommitNewSnapshot(block.Height, snapshot)
if err := a.StateManager.SaveSnapshot(&snapshotPkg.Info{
Height: block.Height,
Snapshot: snapshot,
}); err != nil {
a.Logger.Error().Err(err).Msg("Could not save latest snapshot to database")
}

olderHeight := a.SnapshotManager.GetOlderHeight()
a.Logger.Info().
Int64("older_height", olderHeight).
Int64("height", block.Height).
Msg("Generating snapshot report")

report, err := a.SnapshotManager.GetReport()
if err != nil {
a.Logger.Error().Err(err).Msg("Could not generate report")
return
}

if report.Empty() {
a.Logger.Info().Msg("Report is empty, no events to send.")
return
}

for _, entry := range report.Entries {
a.Logger.Info().
Str("entry", fmt.Sprintf("%+v", entry)).
Msg("Report entry")
}

for _, reporter := range a.Reporters {
if err := reporter.Send(report); err != nil {
a.Logger.Error().
Err(err).
Str("name", string(reporter.Name())).
Msg("Error sending report")
}
}
}
Expand All @@ -231,37 +241,38 @@ func (a *AppManager) PopulateSlashingParams() {
return
}

if params, err := a.RPCManager.GetSlashingParams(a.StateManager.GetLastBlockHeight() - 1); err != nil {
params, err := a.RPCManager.GetSlashingParams(a.StateManager.GetLastBlockHeight() - 1)
if err != nil {
a.Logger.Warn().
Err(err).
Msg("Error updating slashing params")

return
} else {
minSignedPerWindow, err := params.Params.MinSignedPerWindow.Float64()
if err != nil {
a.Logger.Warn().
Err(err).
Msg("Got malformed slashing params from node")
return
}

a.Config.BlocksWindow = params.Params.SignedBlocksWindow
a.Config.MinSignedPerWindow = minSignedPerWindow
}

a.Logger.Info().
Int64("blocks_window", a.Config.BlocksWindow).
Float64("min_signed_per_window", a.Config.MinSignedPerWindow).
Msg("Got slashing params")

a.MetricsManager.LogSlashingParams(
a.Config.Name,
a.Config.BlocksWindow,
a.Config.MinSignedPerWindow,
a.Config.StoreBlocks,
)
a.Config.RecalculateMissedBlocksGroups()
minSignedPerWindow, err := params.Params.MinSignedPerWindow.Float64()
if err != nil {
a.Logger.Warn().
Err(err).
Msg("Got malformed slashing params from node")
return
}

a.Config.BlocksWindow = params.Params.SignedBlocksWindow
a.Config.MinSignedPerWindow = minSignedPerWindow

a.Logger.Info().
Int64("blocks_window", a.Config.BlocksWindow).
Float64("min_signed_per_window", a.Config.MinSignedPerWindow).
Msg("Got slashing params")

a.MetricsManager.LogSlashingParams(
a.Config.Name,
a.Config.BlocksWindow,
a.Config.MinSignedPerWindow,
a.Config.StoreBlocks,
)
a.Config.RecalculateMissedBlocksGroups()
}

func (a *AppManager) UpdateValidators(height int64) error {
Expand Down Expand Up @@ -399,13 +410,18 @@ func (a *AppManager) PopulateBlocks() {
return
}

a.mutex.Lock()

if err := a.StateManager.AddBlock(block); err != nil {
a.Logger.Error().
Err(err).
Msg("Error inserting older block")
a.IsPopulatingBlocks = false
a.mutex.Unlock()
return
}

a.mutex.Unlock()
}
}

Expand Down Expand Up @@ -450,15 +466,20 @@ func (a *AppManager) PopulateActiveSet() {
return
}

a.mutex.Lock()

for height, activeSet := range heightActiveSets {
if err := a.StateManager.AddActiveSet(height, activeSet); err != nil {
a.Logger.Error().
Err(err).
Msg("Error inserting active set")
a.IsPopulatingActiveSet = false
a.mutex.Unlock()
return
}
}

a.mutex.Unlock()
}

a.IsPopulatingActiveSet = false
Expand Down
5 changes: 4 additions & 1 deletion pkg/reporters/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ func (reporter *Reporter) Send(report *reportPkg.Report) error {

reporter.Logger.Trace().Str("report", reportString).Msg("Sending a report")

_, err := reporter.DiscordSession.ChannelMessageSend(reporter.Channel, reportString)
_, err := reporter.DiscordSession.ChannelMessageSend(
reporter.Channel,
reportString,
)
return err
}

Expand Down

0 comments on commit a17ebd9

Please sign in to comment.