
Commit

wip
nit

enable node stats analyzer for runtimes

wip

tweaks

nit

misc

address comments

address comments

nit
Andrew7234 committed Jan 11, 2024
1 parent 84ca324 commit 48e50fc
Showing 8 changed files with 143 additions and 33 deletions.
53 changes: 49 additions & 4 deletions analyzer/block/block.go
@@ -17,6 +17,7 @@ import (
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/metrics"
"github.com/oasisprotocol/nexus/storage"
)

@@ -61,8 +62,9 @@ type blockBasedAnalyzer struct {

processor BlockProcessor

target storage.TargetStorage
logger *log.Logger
target storage.TargetStorage
logger *log.Logger
metrics metrics.AnalysisMetrics

slowSync bool
}
@@ -259,6 +261,32 @@ func (b *blockBasedAnalyzer) ensureSlowSyncPrerequisites(ctx context.Context) (o
return true
}

// sendQueueLengthMetric updates the relevant Prometheus metric with an approximation
// of the number of known blocks that still require processing, calculated as the
// difference between the current on-chain block height (fetched by the node-stats
// analyzer) and the block the analyzer is currently processing.
// Note that the true count may be higher during fast-sync.
func (b *blockBasedAnalyzer) sendQueueLengthMetric(ctx context.Context, queueLength uint64) error {

Check failure on line 269 in analyzer/block/block.go (GitHub Actions / lint-go): `(*blockBasedAnalyzer).sendQueueLengthMetric` - `ctx` is unused (unparam)
// For block-based analyzers, the analyzer name is identical to the layer name.
b.metrics.QueueLength(b.analyzerName).Set(float64(queueLength))
return nil
}

// Returns the chain height of the layer this analyzer is processing.
// The heights are fetched and added to the database by the node-stats analyzer.
func (b *blockBasedAnalyzer) nodeHeight(ctx context.Context) (int, error) {
var nodeHeight int
err := b.target.QueryRow(ctx, queries.NodeHeight, b.analyzerName).Scan(&nodeHeight)
switch err {
case nil:
return nodeHeight, nil
case pgx.ErrNoRows:
return -1, nil
default:
return -1, fmt.Errorf("error fetching chain height for %s: %w", b.analyzerName, err)
}
}

// Start starts the block analyzer.
func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// Run prework.
@@ -325,6 +353,10 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
backoff.Failure()
continue
}
nodeHeight, err := b.nodeHeight(ctx)
if err != nil {
b.logger.Warn("error fetching current node height", "err", err)
}

// Process blocks.
b.logger.Debug("picked blocks for processing", "heights", heights)
@@ -337,6 +369,12 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// in the batch will exceed the current lock expiry of 5min. The analyzer will terminate
// the batch early and attempt to refresh the locks for a new batch.
if b.slowSync {
// In slow-sync mode, we update the node-height at each block for a more
// precise measurement.
nodeHeight, err = b.nodeHeight(ctx)
if err != nil {
b.logger.Warn("error fetching current node height", "err", err)
}
select {
case <-time.After(backoff.Timeout()):
// Process the next block
@@ -379,6 +417,10 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
b.unlockBlocks(ctx, []uint64{height})
continue
}
// If we successfully fetched the node height earlier, update the estimated queue length.
if nodeHeight != -1 {
b.sendQueueLengthMetric(ctx, uint64(nodeHeight)-height)

Check failure on line 422 in analyzer/block/block.go (GitHub Actions / lint-go): Error return value of `b.sendQueueLengthMetric` is not checked (errcheck)
}
cancel()
backoff.Success()
b.logger.Info("processed block", "height", height)
@@ -425,13 +467,16 @@ func NewAnalyzer(
if batchSize == 0 {
batchSize = defaultBatchSize
}
return &blockBasedAnalyzer{
a := &blockBasedAnalyzer{
blockRange: blockRange,
batchSize: batchSize,
analyzerName: name,
processor: processor,
target: target,
logger: logger.With("analyzer", name, "mode", mode),
metrics: metrics.NewDefaultAnalysisMetrics(name),
slowSync: mode == analyzer.SlowSyncMode,
}, nil
}

return a, nil
}
2 changes: 1 addition & 1 deletion analyzer/consensus/consensus.go
@@ -297,13 +297,13 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
// Fetch all data.
fetchTimer := m.metrics.BlockFetchLatencies()
data, err := fetchAllData(ctx, m.source, m.network, height, m.mode == analyzer.FastSyncMode)
fetchTimer.ObserveDuration()
if err != nil {
if strings.Contains(err.Error(), fmt.Sprintf("%d must be less than or equal to the current blockchain height", height)) {
return analyzer.ErrOutOfRange
}
return err
}
fetchTimer.ObserveDuration() // We make no observation in case of a data fetch error; those timings are misleading.

// Process data, prepare updates.
analysisTimer := m.metrics.BlockAnalysisLatencies()
17 changes: 6 additions & 11 deletions analyzer/item/item.go
@@ -12,12 +12,11 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/metrics"
"github.com/oasisprotocol/nexus/storage"
)

@@ -41,9 +40,9 @@ type itemBasedAnalyzer[Item any] struct {

processor ItemProcessor[Item]

target storage.TargetStorage
logger *log.Logger
queueLengthMetric prometheus.Gauge
target storage.TargetStorage
logger *log.Logger
metrics metrics.AnalysisMetrics
}

var _ analyzer.Analyzer = (*itemBasedAnalyzer[any])(nil)
@@ -88,12 +87,8 @@ func NewAnalyzer[Item any](
processor,
target,
logger,
prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("%s_queue_length", name),
Help: fmt.Sprintf("count of items in the work queue for the %s analyzer", name),
}),
metrics.NewDefaultAnalysisMetrics(name),
}
prometheus.MustRegister(a.queueLengthMetric)

return a, nil
}
@@ -105,7 +100,7 @@ func (a *itemBasedAnalyzer[Item]) sendQueueLengthMetric(ctx context.Context) (in
a.logger.Warn("error fetching queue length", "err", err)
return 0, err
}
a.queueLengthMetric.Set(float64(queueLength))
a.metrics.QueueLength(a.analyzerName).Set(float64(queueLength))
return queueLength, nil
}

59 changes: 44 additions & 15 deletions analyzer/node_stats/node_stats.go
@@ -15,34 +15,48 @@ import (
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"

consensusAPI "github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api"
runtimeAPI "github.com/oasisprotocol/nexus/coreapi/v22.2.11/runtime/client/api"
)

const (
nodeStatsAnalyzerName = "node_stats"
)

type processor struct {
source nodeapi.ConsensusApiLite
target storage.TargetStorage
logger *log.Logger
layers []common.Layer
source nodeapi.ConsensusApiLite
emeraldSource nodeapi.RuntimeApiLite
sapphireSource nodeapi.RuntimeApiLite
target storage.TargetStorage
logger *log.Logger
}

var _ item.ItemProcessor[common.Layer] = (*processor)(nil)

func NewAnalyzer(
cfg config.ItemBasedAnalyzerConfig,
layers []common.Layer,
sourceClient nodeapi.ConsensusApiLite,
emeraldClient nodeapi.RuntimeApiLite,
sapphireClient nodeapi.RuntimeApiLite,
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
if cfg.Interval == 0 {
cfg.Interval = 3 * time.Second
}
logger = logger.With("analyzer", nodeStatsAnalyzerName)
// Default to [consensus, emerald, sapphire] if layers is not specified.
if len(layers) == 0 {
layers = []common.Layer{common.LayerConsensus, common.LayerEmerald, common.LayerSapphire}
}
p := &processor{
source: sourceClient,
target: target,
logger: logger.With("analyzer", nodeStatsAnalyzerName),
layers: layers,
source: sourceClient,
emeraldSource: emeraldClient,
sapphireSource: sapphireClient,
target: target,
logger: logger.With("analyzer", nodeStatsAnalyzerName),
}

return item.NewAnalyzer[common.Layer](
@@ -54,22 +68,37 @@ func NewAnalyzer(
}

func (p *processor) GetItems(ctx context.Context, limit uint64) ([]common.Layer, error) {
return []common.Layer{common.LayerConsensus}, nil
return p.layers, nil
}

func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, layer common.Layer) error {
p.logger.Debug("fetching node height", "layer", layer)
// We currently only support consensus. Update when this is no longer the case.
if layer != common.LayerConsensus {
latestHeight := uint64(0) // will be fetched from the node
switch layer {
case common.LayerConsensus:
latestBlock, err := p.source.GetBlock(ctx, consensusAPI.HeightLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
}
latestHeight = uint64(latestBlock.Height)
case common.LayerEmerald:
latestBlock, err := p.emeraldSource.GetBlockHeader(ctx, runtimeAPI.RoundLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
}
latestHeight = latestBlock.Round
case common.LayerSapphire:
latestBlock, err := p.sapphireSource.GetBlockHeader(ctx, runtimeAPI.RoundLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
}
latestHeight = latestBlock.Round
default:
return fmt.Errorf("unsupported layer %s", layer)
}
latestBlock, err := p.source.GetBlock(ctx, consensusAPI.HeightLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s, %w", layer, err)
}
batch.Queue(queries.ConsensusNodeHeightUpsert,
batch.Queue(queries.NodeHeightUpsert,
layer,
latestBlock.Height,
latestHeight,
)

return nil
7 changes: 6 additions & 1 deletion analyzer/queries/queries.go
@@ -152,7 +152,12 @@ var (
SET processed_time = CURRENT_TIMESTAMP, is_fast_sync = $3
WHERE height = $1 AND analyzer = $2`

ConsensusNodeHeightUpsert = `
NodeHeight = `
SELECT height
FROM chain.latest_node_heights
WHERE layer = $1`

NodeHeightUpsert = `
INSERT INTO chain.latest_node_heights (layer, height)
VALUES
($1, $2)
11 changes: 10 additions & 1 deletion cmd/analyzer/analyzer.go
@@ -467,11 +467,20 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
}
if cfg.Analyzers.NodeStats != nil {
analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {

Check failure on line 469 in cmd/analyzer/analyzer.go (GitHub Actions / lint-go): unnecessary leading newline (whitespace)
Check failure on line 470 in cmd/analyzer/analyzer.go (GitHub Actions / lint-go): File is not `gofumpt`-ed (gofumpt)
sourceClient, err1 := sources.Consensus(ctx)
if err1 != nil {
return nil, err1
}
return nodestats.NewAnalyzer(cfg.Analyzers.NodeStats.ItemBasedAnalyzerConfig, sourceClient, dbClient, logger)
emeraldClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
if err1 != nil {
return nil, err1
}
sapphireClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
if err1 != nil {
return nil, err1
}
return nodestats.NewAnalyzer(cfg.Analyzers.NodeStats.ItemBasedAnalyzerConfig, cfg.Analyzers.NodeStats.Layers, sourceClient, emeraldClient, sapphireClient, dbClient, logger)
})
}
if cfg.Analyzers.AggregateStats != nil {
12 changes: 12 additions & 0 deletions config/config.go
@@ -433,12 +433,24 @@ func (cfg *MetadataRegistryConfig) Validate() error {
// NodeStatsConfig is the configuration for the node stats analyzer.
type NodeStatsConfig struct {
ItemBasedAnalyzerConfig `koanf:",squash"`

// Layers is the list of runtimes and/or consensus that the node-stats analyzer
// should query for the latest node height.
Layers []common.Layer `koanf:"layers"`
}

func (cfg *NodeStatsConfig) Validate() error {
if cfg.Interval > 6*time.Second {
return fmt.Errorf("node stats interval must be less than or equal to the block time of 6 seconds")
}
// Deduplicate layers
seen := make(map[common.Layer]struct{})
for _, layer := range cfg.Layers {
if _, ok := seen[layer]; ok {
return fmt.Errorf("duplicate layer detected in layers")
}
seen[layer] = struct{}{}
}
return nil
}

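For illustration, a minimal sketch of how the new Layers option might be populated and validated. This snippet is not part of the commit; the import path for the common package and the chosen field values are assumptions.

package main

import (
	"log"
	"time"

	"github.com/oasisprotocol/nexus/common"
	"github.com/oasisprotocol/nexus/config"
)

func main() {
	// Hypothetical: track node heights for consensus and Emerald only.
	cfg := config.NodeStatsConfig{
		ItemBasedAnalyzerConfig: config.ItemBasedAnalyzerConfig{Interval: 3 * time.Second},
		Layers:                  []common.Layer{common.LayerConsensus, common.LayerEmerald},
	}
	// Validate rejects duplicate layers and intervals above the 6-second block time.
	if err := cfg.Validate(); err != nil {
		log.Fatal(err)
	}
}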
15 changes: 15 additions & 0 deletions metrics/analysis.go
@@ -26,6 +26,9 @@ type AnalysisMetrics struct {

// Latencies of fetching a block's data from the node.
blockFetchLatencies *prometheus.HistogramVec

// Queue length of analyzer.
queueLengths *prometheus.GaugeVec
}

type CacheReadStatus string
@@ -98,12 +101,20 @@ func NewDefaultAnalysisMetrics(runtime string) AnalysisMetrics {
},
[]string{"layer"}, // Labels.
),
queueLengths: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: fmt.Sprintf("%s_queue_length", runtime),
Help: "How many blocks or items are left to process, partitioned by analyzer",
},
[]string{"analyzer"}, // Labels
),
}
metrics.databaseOperations = registerOnce(metrics.databaseOperations).(*prometheus.CounterVec)
metrics.databaseLatencies = registerOnce(metrics.databaseLatencies).(*prometheus.HistogramVec)
metrics.localCacheReads = registerOnce(metrics.localCacheReads).(*prometheus.CounterVec)
metrics.blockAnalysisLatencies = registerOnce(metrics.blockAnalysisLatencies).(*prometheus.HistogramVec)
metrics.blockFetchLatencies = registerOnce(metrics.blockFetchLatencies).(*prometheus.HistogramVec)
metrics.queueLengths = registerOnce(metrics.queueLengths).(*prometheus.GaugeVec)
return metrics
}

@@ -133,3 +144,7 @@ func (m *AnalysisMetrics) BlockAnalysisLatencies() *prometheus.Timer {
func (m *AnalysisMetrics) BlockFetchLatencies() *prometheus.Timer {
return prometheus.NewTimer(m.blockFetchLatencies.WithLabelValues(m.runtime))
}

func (m *AnalysisMetrics) QueueLength(analyzer string) prometheus.Gauge {
return m.queueLengths.WithLabelValues(analyzer)
}
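A minimal usage sketch of the new shared queue-length gauge. The snippet is not part of the commit; the analyzer name and backlog value are illustrative.

package main

import "github.com/oasisprotocol/nexus/metrics"

func main() {
	// Hypothetical: the "emerald" block analyzer reports that 42 known blocks
	// are still waiting to be processed. This sets the gauge exported as
	// emerald_queue_length{analyzer="emerald"}.
	m := metrics.NewDefaultAnalysisMetrics("emerald")
	m.QueueLength("emerald").Set(42)
}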
