From 48e50fcaaf16ea44207f8fdcf86290c8be50a559 Mon Sep 17 00:00:00 2001
From: Andrew Low
Date: Thu, 30 Nov 2023 16:08:45 -0500
Subject: [PATCH] Enable node stats analyzer for runtimes

---
 analyzer/block/block.go           | 53 ++++++++++++++++++++++++---
 analyzer/consensus/consensus.go   |  2 +-
 analyzer/item/item.go             | 17 ++++-----
 analyzer/node_stats/node_stats.go | 59 +++++++++++++++++++++++--------
 analyzer/queries/queries.go       |  7 +++-
 cmd/analyzer/analyzer.go          | 11 +++++-
 config/config.go                  | 12 +++++++
 metrics/analysis.go               | 15 ++++++++
 8 files changed, 143 insertions(+), 33 deletions(-)

diff --git a/analyzer/block/block.go b/analyzer/block/block.go
index 884c3cd06..ccd315ce8 100644
--- a/analyzer/block/block.go
+++ b/analyzer/block/block.go
@@ -17,6 +17,7 @@ import (
 	"github.com/oasisprotocol/nexus/analyzer/util"
 	"github.com/oasisprotocol/nexus/config"
 	"github.com/oasisprotocol/nexus/log"
+	"github.com/oasisprotocol/nexus/metrics"
 	"github.com/oasisprotocol/nexus/storage"
 )
 
@@ -61,8 +62,9 @@ type blockBasedAnalyzer struct {
 
 	processor BlockProcessor
 
-	target storage.TargetStorage
-	logger *log.Logger
+	target  storage.TargetStorage
+	logger  *log.Logger
+	metrics metrics.AnalysisMetrics
 
 	slowSync bool
 }
@@ -259,6 +261,32 @@ func (b *blockBasedAnalyzer) ensureSlowSyncPrerequisites(ctx context.Context) (o
 	return true
 }
 
+// sendQueueLengthMetric updates the relevant Prometheus metric with an approximation
+// of the number of known blocks that still require processing, calculated as the
+// difference between the current on-chain block height (as fetched by the node-stats
+// analyzer) and the block currently being processed.
+// Note that the true count may be higher during fast-sync.
+func (b *blockBasedAnalyzer) sendQueueLengthMetric(ctx context.Context, queueLength uint64) error {
+	// For block-based analyzers, the analyzer name is identical to the layer name.
+	b.metrics.QueueLength(b.analyzerName).Set(float64(queueLength))
+	return nil
+}
+
+// nodeHeight returns the chain height of the layer this analyzer is processing.
+// The heights are fetched and added to the database by the node-stats analyzer.
+func (b *blockBasedAnalyzer) nodeHeight(ctx context.Context) (int, error) {
+	var nodeHeight int
+	err := b.target.QueryRow(ctx, queries.NodeHeight, b.analyzerName).Scan(&nodeHeight)
+	switch err {
+	case nil:
+		return nodeHeight, nil
+	case pgx.ErrNoRows:
+		return -1, nil
+	default:
+		return -1, fmt.Errorf("error fetching chain height for %s: %w", b.analyzerName, err)
+	}
+}
+
 // Start starts the block analyzer.
 func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 	// Run prework.
@@ -325,6 +353,10 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 			backoff.Failure()
 			continue
 		}
+		nodeHeight, err := b.nodeHeight(ctx)
+		if err != nil {
+			b.logger.Warn("error fetching current node height", "err", err)
+		}
 
 		// Process blocks.
 		b.logger.Debug("picked blocks for processing", "heights", heights)
@@ -337,6 +369,12 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 		// in the batch will exceed the current lock expiry of 5min. The analyzer will terminate
 		// the batch early and attempt to refresh the locks for a new batch.
 		if b.slowSync {
+			// In slow-sync mode, we refresh the node height at each block for a more
+			// precise measurement.
+			nodeHeight, err = b.nodeHeight(ctx)
+			if err != nil {
+				b.logger.Warn("error fetching current node height", "err", err)
+			}
 			select {
 			case <-time.After(backoff.Timeout()):
 				// Process the next block
@@ -379,6 +417,10 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 				b.unlockBlocks(ctx, []uint64{height})
 				continue
 			}
+			// If the node height was fetched successfully and is not behind this block, update the estimated queue length.
+			if nodeHeight != -1 && uint64(nodeHeight) >= height {
+				b.sendQueueLengthMetric(ctx, uint64(nodeHeight)-height)
+			}
 			cancel()
 			backoff.Success()
 			b.logger.Info("processed block", "height", height)
@@ -425,13 +467,16 @@ func NewAnalyzer(
 	if batchSize == 0 {
 		batchSize = defaultBatchSize
 	}
-	return &blockBasedAnalyzer{
+	a := &blockBasedAnalyzer{
 		blockRange:   blockRange,
 		batchSize:    batchSize,
 		analyzerName: name,
 		processor:    processor,
 		target:       target,
 		logger:       logger.With("analyzer", name, "mode", mode),
+		metrics:      metrics.NewDefaultAnalysisMetrics(name),
 		slowSync:     mode == analyzer.SlowSyncMode,
-	}, nil
+	}
+
+	return a, nil
 }
diff --git a/analyzer/consensus/consensus.go b/analyzer/consensus/consensus.go
index f4eb5f097..426b33f10 100644
--- a/analyzer/consensus/consensus.go
+++ b/analyzer/consensus/consensus.go
@@ -297,13 +297,13 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
 	// Fetch all data.
 	fetchTimer := m.metrics.BlockFetchLatencies()
 	data, err := fetchAllData(ctx, m.source, m.network, height, m.mode == analyzer.FastSyncMode)
+	fetchTimer.ObserveDuration()
 	if err != nil {
 		if strings.Contains(err.Error(), fmt.Sprintf("%d must be less than or equal to the current blockchain height", height)) {
 			return analyzer.ErrOutOfRange
 		}
 		return err
 	}
-	fetchTimer.ObserveDuration() // We make no observation in case of a data fetch error; those timings are misleading.
 
 	// Process data, prepare updates.
 	analysisTimer := m.metrics.BlockAnalysisLatencies()
diff --git a/analyzer/item/item.go b/analyzer/item/item.go
index e1a95bbcc..1901ea2db 100644
--- a/analyzer/item/item.go
+++ b/analyzer/item/item.go
@@ -12,12 +12,11 @@ import (
 	"sync"
 	"time"
 
-	"github.com/prometheus/client_golang/prometheus"
-
 	"github.com/oasisprotocol/nexus/analyzer"
 	"github.com/oasisprotocol/nexus/analyzer/util"
 	"github.com/oasisprotocol/nexus/config"
 	"github.com/oasisprotocol/nexus/log"
+	"github.com/oasisprotocol/nexus/metrics"
 	"github.com/oasisprotocol/nexus/storage"
 )
 
@@ -41,9 +40,9 @@ type itemBasedAnalyzer[Item any] struct {
 
 	processor ItemProcessor[Item]
 
-	target            storage.TargetStorage
-	logger            *log.Logger
-	queueLengthMetric prometheus.Gauge
+	target  storage.TargetStorage
+	logger  *log.Logger
+	metrics metrics.AnalysisMetrics
 }
 
 var _ analyzer.Analyzer = (*itemBasedAnalyzer[any])(nil)
@@ -88,12 +87,8 @@ func NewAnalyzer[Item any](
 		processor,
 		target,
 		logger,
-		prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: fmt.Sprintf("%s_queue_length", name),
-			Help: fmt.Sprintf("count of items in the work queue for the %s analyzer", name),
-		}),
+		metrics.NewDefaultAnalysisMetrics(name),
 	}
-	prometheus.MustRegister(a.queueLengthMetric)
 
 	return a, nil
 }
@@ -105,7 +100,7 @@ func (a *itemBasedAnalyzer[Item]) sendQueueLengthMetric(ctx context.Context) (in
 		a.logger.Warn("error fetching queue length", "err", err)
 		return 0, err
 	}
-	a.queueLengthMetric.Set(float64(queueLength))
+	a.metrics.QueueLength(a.analyzerName).Set(float64(queueLength))
 	return queueLength, nil
 }
 
diff --git a/analyzer/node_stats/node_stats.go b/analyzer/node_stats/node_stats.go
index 8335deae4..e8021cfd1 100644
--- a/analyzer/node_stats/node_stats.go
+++ b/analyzer/node_stats/node_stats.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
 	consensusAPI "github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api"
+	runtimeAPI "github.com/oasisprotocol/nexus/coreapi/v22.2.11/runtime/client/api"
 )
 
 const (
@@ -22,16 +23,22 @@
 )
 
 type processor struct {
-	source nodeapi.ConsensusApiLite
-	target storage.TargetStorage
-	logger *log.Logger
+	layers         []common.Layer
+	source         nodeapi.ConsensusApiLite
+	emeraldSource  nodeapi.RuntimeApiLite
+	sapphireSource nodeapi.RuntimeApiLite
+	target         storage.TargetStorage
+	logger         *log.Logger
 }
 
 var _ item.ItemProcessor[common.Layer] = (*processor)(nil)
 
 func NewAnalyzer(
 	cfg config.ItemBasedAnalyzerConfig,
+	layers []common.Layer,
 	sourceClient nodeapi.ConsensusApiLite,
+	emeraldClient nodeapi.RuntimeApiLite,
+	sapphireClient nodeapi.RuntimeApiLite,
 	target storage.TargetStorage,
 	logger *log.Logger,
 ) (analyzer.Analyzer, error) {
@@ -39,10 +46,17 @@
 		cfg.Interval = 3 * time.Second
 	}
 	logger = logger.With("analyzer", nodeStatsAnalyzerName)
+	// Default to [consensus, emerald, sapphire] if layers is not specified.
+	if len(layers) == 0 {
+		layers = []common.Layer{common.LayerConsensus, common.LayerEmerald, common.LayerSapphire}
+	}
 	p := &processor{
-		source: sourceClient,
-		target: target,
-		logger: logger.With("analyzer", nodeStatsAnalyzerName),
+		layers:         layers,
+		source:         sourceClient,
+		emeraldSource:  emeraldClient,
+		sapphireSource: sapphireClient,
+		target:         target,
+		logger:         logger.With("analyzer", nodeStatsAnalyzerName),
 	}
 
 	return item.NewAnalyzer[common.Layer](
@@ -54,22 +68,37 @@
 }
 
 func (p *processor) GetItems(ctx context.Context, limit uint64) ([]common.Layer, error) {
-	return []common.Layer{common.LayerConsensus}, nil
+	return p.layers, nil
 }
 
 func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, layer common.Layer) error {
 	p.logger.Debug("fetching node height", "layer", layer)
-	// We currently only support consensus. Update when this is no longer the case.
-	if layer != common.LayerConsensus {
+	latestHeight := uint64(0) // will be fetched from the node
+	switch layer {
+	case common.LayerConsensus:
+		latestBlock, err := p.source.GetBlock(ctx, consensusAPI.HeightLatest)
+		if err != nil {
+			return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
+		}
+		latestHeight = uint64(latestBlock.Height)
+	case common.LayerEmerald:
+		latestBlock, err := p.emeraldSource.GetBlockHeader(ctx, runtimeAPI.RoundLatest)
+		if err != nil {
+			return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
+		}
+		latestHeight = latestBlock.Round
+	case common.LayerSapphire:
+		latestBlock, err := p.sapphireSource.GetBlockHeader(ctx, runtimeAPI.RoundLatest)
+		if err != nil {
+			return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
+		}
+		latestHeight = latestBlock.Round
+	default:
 		return fmt.Errorf("unsupported layer %s", layer)
 	}
 
-	latestBlock, err := p.source.GetBlock(ctx, consensusAPI.HeightLatest)
-	if err != nil {
-		return fmt.Errorf("error fetching latest block height for layer %s, %w", layer, err)
-	}
-	batch.Queue(queries.ConsensusNodeHeightUpsert,
+	batch.Queue(queries.NodeHeightUpsert,
 		layer,
-		latestBlock.Height,
+		latestHeight,
 	)
 	return nil
diff --git a/analyzer/queries/queries.go b/analyzer/queries/queries.go
index 5601bb80e..3b18e8764 100644
--- a/analyzer/queries/queries.go
+++ b/analyzer/queries/queries.go
@@ -152,7 +152,12 @@ var (
 		SET processed_time = CURRENT_TIMESTAMP, is_fast_sync = $3
 		WHERE height = $1 AND analyzer = $2`
 
-	ConsensusNodeHeightUpsert = `
+	NodeHeight = `
+		SELECT height
+			FROM chain.latest_node_heights
+			WHERE layer = $1`
+
+	NodeHeightUpsert = `
 		INSERT INTO chain.latest_node_heights (layer, height)
 			VALUES ($1, $2)
diff --git a/cmd/analyzer/analyzer.go b/cmd/analyzer/analyzer.go
index a42c9d6db..818bb9742 100644
--- a/cmd/analyzer/analyzer.go
+++ b/cmd/analyzer/analyzer.go
@@ -467,11 +467,20 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
 	}
 	if cfg.Analyzers.NodeStats != nil {
 		analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {
+
 			sourceClient, err1 := sources.Consensus(ctx)
 			if err1 != nil {
 				return nil, err1
 			}
-			return nodestats.NewAnalyzer(cfg.Analyzers.NodeStats.ItemBasedAnalyzerConfig, sourceClient, dbClient, logger)
+			emeraldClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
+			if err1 != nil {
+				return nil, err1
+			}
+			sapphireClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
+			if err1 != nil {
+				return nil, err1
+			}
+			return nodestats.NewAnalyzer(cfg.Analyzers.NodeStats.ItemBasedAnalyzerConfig, cfg.Analyzers.NodeStats.Layers, sourceClient, emeraldClient, sapphireClient, dbClient, logger)
 		})
 	}
 	if cfg.Analyzers.AggregateStats != nil {
diff --git a/config/config.go b/config/config.go
index 5b986db5b..17851e3c6 100644
--- a/config/config.go
+++ b/config/config.go
@@ -433,12 +433,24 @@ func (cfg *MetadataRegistryConfig) Validate() error {
 // NodeStatsConfig is the configuration for the node stats analyzer.
 type NodeStatsConfig struct {
 	ItemBasedAnalyzerConfig `koanf:",squash"`
+
+	// Layers is the list of runtimes and/or consensus that the node-stats analyzer
+	// should query for the latest node height.
+	Layers []common.Layer `koanf:"layers"`
 }
 
 func (cfg *NodeStatsConfig) Validate() error {
 	if cfg.Interval > 6*time.Second {
 		return fmt.Errorf("node stats interval must be less than or equal to the block time of 6 seconds")
 	}
+	// Reject duplicate layers.
+	seen := make(map[common.Layer]struct{})
+	for _, layer := range cfg.Layers {
+		if _, ok := seen[layer]; ok {
+			return fmt.Errorf("duplicate layer %s in layers", layer)
+		}
+		seen[layer] = struct{}{}
+	}
 	return nil
 }
 
diff --git a/metrics/analysis.go b/metrics/analysis.go
index 542ea42ef..63ccb5387 100644
--- a/metrics/analysis.go
+++ b/metrics/analysis.go
@@ -26,6 +26,9 @@ type AnalysisMetrics struct {
 
 	// Latencies of fetching a block's data from the node.
 	blockFetchLatencies *prometheus.HistogramVec
+
+	// Lengths of the analyzers' work queues.
+	queueLengths *prometheus.GaugeVec
 }
 
 type CacheReadStatus string
@@ -98,12 +101,20 @@ func NewDefaultAnalysisMetrics(runtime string) AnalysisMetrics {
 			},
 			[]string{"layer"}, // Labels.
 		),
+		queueLengths: prometheus.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Name: fmt.Sprintf("%s_queue_length", runtime),
+				Help: "How many blocks or items are left to process, partitioned by analyzer.",
+			},
+			[]string{"analyzer"}, // Labels.
+		),
 	}
 	metrics.databaseOperations = registerOnce(metrics.databaseOperations).(*prometheus.CounterVec)
 	metrics.databaseLatencies = registerOnce(metrics.databaseLatencies).(*prometheus.HistogramVec)
 	metrics.localCacheReads = registerOnce(metrics.localCacheReads).(*prometheus.CounterVec)
 	metrics.blockAnalysisLatencies = registerOnce(metrics.blockAnalysisLatencies).(*prometheus.HistogramVec)
 	metrics.blockFetchLatencies = registerOnce(metrics.blockFetchLatencies).(*prometheus.HistogramVec)
+	metrics.queueLengths = registerOnce(metrics.queueLengths).(*prometheus.GaugeVec)
 	return metrics
 }
 
@@ -133,3 +144,7 @@ func (m *AnalysisMetrics) BlockAnalysisLatencies() *prometheus.Timer {
 func (m *AnalysisMetrics) BlockFetchLatencies() *prometheus.Timer {
 	return prometheus.NewTimer(m.blockFetchLatencies.WithLabelValues(m.runtime))
 }
+
+func (m *AnalysisMetrics) QueueLength(analyzer string) prometheus.Gauge {
+	return m.queueLengths.WithLabelValues(analyzer)
+}
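
Reviewer note (not part of the patch): the minimal sketch below illustrates how the new NodeStatsConfig.Layers field and its duplicate-layer check are expected to behave. It is a hypothetical test; the import paths for the common and config packages are assumed from the usual repository layout.

// Hypothetical sketch, not part of the patch: exercises the NodeStatsConfig.Layers
// validation added in config/config.go. Import paths are assumptions.
package config_test

import (
	"testing"

	"github.com/oasisprotocol/nexus/common"
	"github.com/oasisprotocol/nexus/config"
)

func TestNodeStatsLayersValidation(t *testing.T) {
	// An explicit layer list passes validation.
	cfg := config.NodeStatsConfig{
		Layers: []common.Layer{common.LayerConsensus, common.LayerEmerald, common.LayerSapphire},
	}
	if err := cfg.Validate(); err != nil {
		t.Fatalf("expected explicit layer list to be valid, got: %v", err)
	}

	// An empty list is also valid; NewAnalyzer then defaults to
	// [consensus, emerald, sapphire].
	if err := (&config.NodeStatsConfig{}).Validate(); err != nil {
		t.Fatalf("expected empty layer list to be valid, got: %v", err)
	}

	// Duplicate layers are rejected.
	dup := config.NodeStatsConfig{
		Layers: []common.Layer{common.LayerEmerald, common.LayerEmerald},
	}
	if err := dup.Validate(); err == nil {
		t.Fatal("expected duplicate layers to be rejected")
	}
}

The sketch only demonstrates the config contract introduced by the patch; it does not assert the analyzer's runtime behavior.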