From a17ebd98aa14685f8f1acc6e8d9ce9b0e0f24b5c Mon Sep 17 00:00:00 2001 From: Sergey <83376337+freak12techno@users.noreply.github.com> Date: Thu, 20 Jul 2023 00:39:35 +0300 Subject: [PATCH] feat: use mutex when calculating snapshots (#2) --- pkg/app_manager.go | 267 +++++++++++++++++-------------- pkg/reporters/discord/discord.go | 5 +- 2 files changed, 148 insertions(+), 124 deletions(-) diff --git a/pkg/app_manager.go b/pkg/app_manager.go index 312d89b..58b5cbc 100644 --- a/pkg/app_manager.go +++ b/pkg/app_manager.go @@ -16,6 +16,7 @@ import ( "main/pkg/tendermint" "main/pkg/types" "main/pkg/utils" + "sync" "time" "github.com/rs/zerolog" @@ -34,6 +35,8 @@ type AppManager struct { Reporters []reportersPkg.Reporter IsPopulatingBlocks bool IsPopulatingActiveSet bool + + mutex sync.Mutex } func NewAppManager( @@ -110,118 +113,125 @@ func (a *AppManager) ListenForEvents() { for { select { case result := <-a.WebsocketManager.Channel: - block, ok := result.(*types.Block) - if !ok { - a.Logger.Warn().Msg("Event is not a block!") - continue - } + a.ProcessEvent(result) + } + } +} - if a.StateManager.HasBlockAtHeight(block.Height) { - a.Logger.Info(). - Int64("height", block.Height). - Msg("Already have block at this height, not generating report.") - continue - } +func (a *AppManager) ProcessEvent(emittable types.WebsocketEmittable) { + a.mutex.Lock() + defer a.mutex.Unlock() - if err := a.UpdateValidators(block.Height - 1); err != nil { - a.Logger.Error(). - Err(err). - Msg("Error updating validators") - } + block, ok := emittable.(*types.Block) + if !ok { + a.Logger.Warn().Msg("Event is not a block!") + return + } - if err := a.AddLastActiveSet(block.Height); err != nil { - a.Logger.Error(). - Err(err). - Msg("Error updating historical validators") - } + if a.StateManager.HasBlockAtHeight(block.Height) { + a.Logger.Info(). + Int64("height", block.Height). + Msg("Already have block at this height, not generating report.") + return + } - a.Logger.Debug().Int64("height", block.Height).Msg("Got new block from Tendermint") - if err := a.StateManager.AddBlock(block); err != nil { - a.Logger.Error(). - Err(err). - Msg("Error inserting new block") - } + if err := a.UpdateValidators(block.Height - 1); err != nil { + a.Logger.Error(). + Err(err). + Msg("Error updating validators") + } - totalBlocksCount := a.StateManager.GetBlocksCountSinceLatest(a.Config.StoreBlocks) - a.Logger.Info(). - Int64("count", totalBlocksCount). - Int64("height", block.Height). - Msg("Added new Tendermint block into state") - - blocksCount := a.StateManager.GetActiveSetsCountSinceLatest(a.Config.BlocksWindow) - historicalValidatorsCount := a.StateManager.GetActiveSetsCountSinceLatest(a.Config.BlocksWindow) - - hasEnoughBlocks := blocksCount >= a.Config.BlocksWindow - hasEnoughHistoricalValidators := historicalValidatorsCount >= a.Config.BlocksWindow - - if !hasEnoughBlocks || !hasEnoughHistoricalValidators { - a.Logger.Info(). - Int64("blocks_count", blocksCount). - Int64("historical_validators_count", historicalValidatorsCount). - Int64("expected", a.Config.BlocksWindow). - Msg("Not enough data for producing a snapshot, skipping.") - continue - } + if err := a.AddLastActiveSet(block.Height); err != nil { + a.Logger.Error(). + Err(err). + Msg("Error updating historical validators") + } - snapshot := a.StateManager.GetSnapshot() - - for _, entry := range snapshot.Entries { - a.Logger.Trace(). - Str("valoper", entry.Validator.OperatorAddress). - Str("moniker", entry.Validator.Moniker). - Int64("signed", entry.SignatureInfo.Signed). - Int64("not_signed", entry.SignatureInfo.NotSigned). - Int64("no_signature", entry.SignatureInfo.NoSignature). - Int64("not_active", entry.SignatureInfo.NotActive). - Int64("proposed", entry.SignatureInfo.Proposed). - Msg("Validator signing info") - } + a.Logger.Debug().Int64("height", block.Height).Msg("Got new block from Tendermint") + if err := a.StateManager.AddBlock(block); err != nil { + a.Logger.Error(). + Err(err). + Msg("Error inserting new block") + } - if !a.SnapshotManager.HasNewerSnapshot() { - a.Logger.Info().Msg("No older snapshot present, cannot generate report") - a.SnapshotManager.CommitNewSnapshot(block.Height, snapshot) - continue - } + totalBlocksCount := a.StateManager.GetBlocksCountSinceLatest(a.Config.StoreBlocks) + a.Logger.Info(). + Int64("count", totalBlocksCount). + Int64("height", block.Height). + Msg("Added new Tendermint block into state") - a.SnapshotManager.CommitNewSnapshot(block.Height, snapshot) - if err := a.StateManager.SaveSnapshot(&snapshotPkg.Info{ - Height: block.Height, - Snapshot: snapshot, - }); err != nil { - a.Logger.Error().Err(err).Msg("Could not save latest snapshot to database") - } + blocksCount := a.StateManager.GetActiveSetsCountSinceLatest(a.Config.BlocksWindow) + historicalValidatorsCount := a.StateManager.GetActiveSetsCountSinceLatest(a.Config.BlocksWindow) - olderHeight := a.SnapshotManager.GetOlderHeight() - a.Logger.Info(). - Int64("older_height", olderHeight). - Int64("height", block.Height). - Msg("Generating snapshot report") + hasEnoughBlocks := blocksCount >= a.Config.BlocksWindow + hasEnoughHistoricalValidators := historicalValidatorsCount >= a.Config.BlocksWindow - report, err := a.SnapshotManager.GetReport() - if err != nil { - a.Logger.Error().Err(err).Msg("Could not generate report") - continue - } + if !hasEnoughBlocks || !hasEnoughHistoricalValidators { + a.Logger.Info(). + Int64("blocks_count", blocksCount). + Int64("historical_validators_count", historicalValidatorsCount). + Int64("expected", a.Config.BlocksWindow). + Msg("Not enough data for producing a snapshot, skipping.") + return + } - if report.Empty() { - a.Logger.Info().Msg("Report is empty, no events to send.") - continue - } + snapshot := a.StateManager.GetSnapshot() + + for _, entry := range snapshot.Entries { + a.Logger.Trace(). + Str("valoper", entry.Validator.OperatorAddress). + Str("moniker", entry.Validator.Moniker). + Int64("signed", entry.SignatureInfo.Signed). + Int64("not_signed", entry.SignatureInfo.NotSigned). + Int64("no_signature", entry.SignatureInfo.NoSignature). + Int64("not_active", entry.SignatureInfo.NotActive). + Int64("proposed", entry.SignatureInfo.Proposed). + Msg("Validator signing info") + } - for _, entry := range report.Entries { - a.Logger.Info(). - Str("entry", fmt.Sprintf("%+v", entry)). - Msg("Report entry") - } + if !a.SnapshotManager.HasNewerSnapshot() { + a.Logger.Info().Msg("No older snapshot present, cannot generate report") + a.SnapshotManager.CommitNewSnapshot(block.Height, snapshot) + return + } - for _, reporter := range a.Reporters { - if err := reporter.Send(report); err != nil { - a.Logger.Error(). - Err(err). - Str("name", string(reporter.Name())). - Msg("Error sending report") - } - } + a.SnapshotManager.CommitNewSnapshot(block.Height, snapshot) + if err := a.StateManager.SaveSnapshot(&snapshotPkg.Info{ + Height: block.Height, + Snapshot: snapshot, + }); err != nil { + a.Logger.Error().Err(err).Msg("Could not save latest snapshot to database") + } + + olderHeight := a.SnapshotManager.GetOlderHeight() + a.Logger.Info(). + Int64("older_height", olderHeight). + Int64("height", block.Height). + Msg("Generating snapshot report") + + report, err := a.SnapshotManager.GetReport() + if err != nil { + a.Logger.Error().Err(err).Msg("Could not generate report") + return + } + + if report.Empty() { + a.Logger.Info().Msg("Report is empty, no events to send.") + return + } + + for _, entry := range report.Entries { + a.Logger.Info(). + Str("entry", fmt.Sprintf("%+v", entry)). + Msg("Report entry") + } + + for _, reporter := range a.Reporters { + if err := reporter.Send(report); err != nil { + a.Logger.Error(). + Err(err). + Str("name", string(reporter.Name())). + Msg("Error sending report") } } } @@ -231,37 +241,38 @@ func (a *AppManager) PopulateSlashingParams() { return } - if params, err := a.RPCManager.GetSlashingParams(a.StateManager.GetLastBlockHeight() - 1); err != nil { + params, err := a.RPCManager.GetSlashingParams(a.StateManager.GetLastBlockHeight() - 1) + if err != nil { a.Logger.Warn(). Err(err). Msg("Error updating slashing params") return - } else { - minSignedPerWindow, err := params.Params.MinSignedPerWindow.Float64() - if err != nil { - a.Logger.Warn(). - Err(err). - Msg("Got malformed slashing params from node") - return - } - - a.Config.BlocksWindow = params.Params.SignedBlocksWindow - a.Config.MinSignedPerWindow = minSignedPerWindow + } - a.Logger.Info(). - Int64("blocks_window", a.Config.BlocksWindow). - Float64("min_signed_per_window", a.Config.MinSignedPerWindow). - Msg("Got slashing params") - - a.MetricsManager.LogSlashingParams( - a.Config.Name, - a.Config.BlocksWindow, - a.Config.MinSignedPerWindow, - a.Config.StoreBlocks, - ) - a.Config.RecalculateMissedBlocksGroups() + minSignedPerWindow, err := params.Params.MinSignedPerWindow.Float64() + if err != nil { + a.Logger.Warn(). + Err(err). + Msg("Got malformed slashing params from node") + return } + + a.Config.BlocksWindow = params.Params.SignedBlocksWindow + a.Config.MinSignedPerWindow = minSignedPerWindow + + a.Logger.Info(). + Int64("blocks_window", a.Config.BlocksWindow). + Float64("min_signed_per_window", a.Config.MinSignedPerWindow). + Msg("Got slashing params") + + a.MetricsManager.LogSlashingParams( + a.Config.Name, + a.Config.BlocksWindow, + a.Config.MinSignedPerWindow, + a.Config.StoreBlocks, + ) + a.Config.RecalculateMissedBlocksGroups() } func (a *AppManager) UpdateValidators(height int64) error { @@ -399,13 +410,18 @@ func (a *AppManager) PopulateBlocks() { return } + a.mutex.Lock() + if err := a.StateManager.AddBlock(block); err != nil { a.Logger.Error(). Err(err). Msg("Error inserting older block") a.IsPopulatingBlocks = false + a.mutex.Unlock() return } + + a.mutex.Unlock() } } @@ -450,15 +466,20 @@ func (a *AppManager) PopulateActiveSet() { return } + a.mutex.Lock() + for height, activeSet := range heightActiveSets { if err := a.StateManager.AddActiveSet(height, activeSet); err != nil { a.Logger.Error(). Err(err). Msg("Error inserting active set") a.IsPopulatingActiveSet = false + a.mutex.Unlock() return } } + + a.mutex.Unlock() } a.IsPopulatingActiveSet = false diff --git a/pkg/reporters/discord/discord.go b/pkg/reporters/discord/discord.go index 43c2f9c..f8da0be 100644 --- a/pkg/reporters/discord/discord.go +++ b/pkg/reporters/discord/discord.go @@ -141,7 +141,10 @@ func (reporter *Reporter) Send(report *reportPkg.Report) error { reporter.Logger.Trace().Str("report", reportString).Msg("Sending a report") - _, err := reporter.DiscordSession.ChannelMessageSend(reporter.Channel, reportString) + _, err := reporter.DiscordSession.ChannelMessageSend( + reporter.Channel, + reportString, + ) return err }