Merge pull request #577 from oasisprotocol/andrew7234/block-analyzer-queuelength

[metrics] add block analyzer queuelength metrics
Andrew7234 authored Jan 17, 2024
2 parents f2dc8d8 + cf37b92 commit fc099d7
Showing 9 changed files with 149 additions and 41 deletions.
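Note: the analyzers below report queue length through metrics.NewDefaultAnalysisMetrics(name).QueueLength(name), a helper in the metrics package whose changes are not among the files rendered on this page. The Go sketch below shows one plausible shape for that helper; the metric name, label name, and registration behavior are assumptions for illustration, not the committed implementation.

// Hypothetical sketch only; see the repository's metrics package for the real definitions.
package metrics

import (
	"errors"

	"github.com/prometheus/client_golang/prometheus"
)

// AnalysisMetrics groups per-analyzer Prometheus instruments.
type AnalysisMetrics struct {
	runtime      string
	queueLengths *prometheus.GaugeVec
}

// NewDefaultAnalysisMetrics creates the shared queue-length gauge vector,
// reusing the already-registered collector if another analyzer created it first.
func NewDefaultAnalysisMetrics(runtime string) AnalysisMetrics {
	queueLengths := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "analysis_queue_length", // assumed metric name
			Help: "Approximate number of items awaiting processing, per analyzer.",
		},
		[]string{"analyzer"},
	)
	if err := prometheus.Register(queueLengths); err != nil {
		var already prometheus.AlreadyRegisteredError
		if errors.As(err, &already) {
			queueLengths = already.ExistingCollector.(*prometheus.GaugeVec)
		}
	}
	return AnalysisMetrics{runtime: runtime, queueLengths: queueLengths}
}

// QueueLength returns the gauge tracking the work-queue length of the named analyzer.
func (m AnalysisMetrics) QueueLength(analyzer string) prometheus.Gauge {
	return m.queueLengths.WithLabelValues(analyzer)
}

With a shape like this, the b.metrics.QueueLength(b.analyzerName).Set(...) call added to block.go below resolves to a single labeled gauge per analyzer.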
52 changes: 48 additions & 4 deletions analyzer/block/block.go
@@ -17,6 +17,7 @@ import (
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/metrics"
"github.com/oasisprotocol/nexus/storage"
)

@@ -61,8 +62,9 @@ type blockBasedAnalyzer struct {

processor BlockProcessor

target storage.TargetStorage
logger *log.Logger
target storage.TargetStorage
logger *log.Logger
metrics metrics.AnalysisMetrics

slowSync bool
}
@@ -259,6 +261,31 @@ func (b *blockBasedAnalyzer) ensureSlowSyncPrerequisites(ctx context.Context) (o
return true
}

// sendQueueLengthMetric updates the relevant Prometheus metric with an approximation
// of the number of known blocks that require processing, calculated as the difference
// between the analyzer's highest processed block and the current block height
// on-chain (fetched by the node-stats analyzer).
// Note that the true count may be higher during fast-sync.
func (b *blockBasedAnalyzer) sendQueueLengthMetric(queueLength uint64) {
// For block-based analyzers, the analyzer name is identical to the layer name.
b.metrics.QueueLength(b.analyzerName).Set(float64(queueLength))
}

// Returns the chain height of the layer this analyzer is processing.
// The heights are fetched and added to the database by the node-stats analyzer.
func (b *blockBasedAnalyzer) nodeHeight(ctx context.Context) (int, error) {
var nodeHeight int
err := b.target.QueryRow(ctx, queries.NodeHeight, b.analyzerName).Scan(&nodeHeight)
switch err {
case nil:
return nodeHeight, nil
case pgx.ErrNoRows:
return -1, nil
default:
return -1, fmt.Errorf("error fetching chain height for consensus: %w", err)
}
}

// Start starts the block analyzer.
func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// Run prework.
@@ -325,6 +352,10 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
backoff.Failure()
continue
}
nodeHeight, err := b.nodeHeight(ctx)
if err != nil {
b.logger.Warn("error fetching current node height: %w", err)
}

// Process blocks.
b.logger.Debug("picked blocks for processing", "heights", heights)
@@ -337,6 +368,12 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// in the batch will exceed the current lock expiry of 5min. The analyzer will terminate
// the batch early and attempt to refresh the locks for a new batch.
if b.slowSync {
// In slow-sync mode, we update the node-height at each block for a more
// precise measurement.
nodeHeight, err = b.nodeHeight(ctx)
if err != nil {
b.logger.Warn("error fetching current node height: %w", err)
}
select {
case <-time.After(backoff.Timeout()):
// Process the next block
@@ -379,6 +416,10 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
b.unlockBlocks(ctx, []uint64{height})
continue
}
// If we successfully fetched the node height earlier, update the estimated queue length.
if nodeHeight != -1 {
b.sendQueueLengthMetric(uint64(nodeHeight) - height)
}
cancel()
backoff.Success()
b.logger.Info("processed block", "height", height)
@@ -425,13 +466,16 @@ func NewAnalyzer(
if batchSize == 0 {
batchSize = defaultBatchSize
}
return &blockBasedAnalyzer{
a := &blockBasedAnalyzer{
blockRange: blockRange,
batchSize: batchSize,
analyzerName: name,
processor: processor,
target: target,
logger: logger.With("analyzer", name, "mode", mode),
metrics: metrics.NewDefaultAnalysisMetrics(name),
slowSync: mode == analyzer.SlowSyncMode,
}, nil
}

return a, nil
}
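For reference, the queue-length estimate added to block.go above is simply the gap between the chain height recorded by the node-stats analyzer and the block just processed, and the update is skipped while no height has been recorded yet (nodeHeight == -1). A standalone illustration with made-up heights (not part of the commit):

package main

import "fmt"

// estimatedQueueLength mirrors the arithmetic in block.go above; the caller
// skips the metric update when ok is false. The heights used below are hypothetical.
func estimatedQueueLength(nodeHeight int, processedHeight uint64) (queueLength uint64, ok bool) {
	if nodeHeight == -1 {
		// The node-stats analyzer has not recorded a height for this layer yet.
		return 0, false
	}
	// As in the committed code, this assumes the recorded node height is not
	// behind the block that was just processed.
	return uint64(nodeHeight) - processedHeight, true
}

func main() {
	fmt.Println(estimatedQueueLength(18_000_000, 17_999_900)) // 100 true
	fmt.Println(estimatedQueueLength(-1, 17_999_900))         // 0 false (no metric update)
}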
16 changes: 8 additions & 8 deletions analyzer/block/block_test.go
@@ -112,7 +112,7 @@ func setupAnalyzer(t *testing.T, testDb *postgres.Client, p *mockProcessor, cfg
p.isFastSync = (mode == analyzer.FastSyncMode)

// Initialize the block analyzer.
logger, err := log.NewLogger(fmt.Sprintf("test-analyzer-%s", p.name), os.Stdout, log.FmtJSON, log.LevelInfo)
logger, err := log.NewLogger(fmt.Sprintf("test_analyzer-%s", p.name), os.Stdout, log.FmtJSON, log.LevelInfo)
require.NoError(t, err, "log.NewLogger")
var blockRange config.BlockRange
switch mode {
@@ -152,7 +152,7 @@ func TestFastSyncBlockAnalyzer(t *testing.T) {
ctx := context.Background()

db := setupDB(t)
p := &mockProcessor{name: "test-analyzer", storage: db}
p := &mockProcessor{name: "test_analyzer", storage: db}
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)

// Run the analyzer and ensure all blocks are processed.
@@ -188,7 +188,7 @@ func TestMultipleFastSyncBlockAnalyzers(t *testing.T) {
ps := []*mockProcessor{}
as := []analyzer.Analyzer{}
for i := 0; i < numAnalyzers; i++ {
p := &mockProcessor{name: "test-analyzer", storage: db}
p := &mockProcessor{name: "test_analyzer", storage: db}
analyzer := setupAnalyzer(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
ps = append(ps, p)
as = append(as, analyzer)
@@ -238,7 +238,7 @@ func TestFailingFastSyncBlockAnalyzers(t *testing.T) {
return fmt.Errorf("failing analyzer")
}
}
p := &mockProcessor{name: "test-analyzer", storage: db, fail: fail}
p := &mockProcessor{name: "test_analyzer", storage: db, fail: fail}
analyzer := setupAnalyzer(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
ps = append(ps, p)
as = append(as, analyzer)
@@ -281,7 +281,7 @@ func TestDistinctFastSyncBlockAnalyzers(t *testing.T) {
ps := []*mockProcessor{}
as := []analyzer.Analyzer{}
for i := 0; i < numAnalyzers; i++ {
p := &mockProcessor{name: fmt.Sprintf("test-analyzer-%d", i), storage: db}
p := &mockProcessor{name: fmt.Sprintf("test_analyzer_%d", i), storage: db}
analyzer := setupAnalyzer(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
ps = append(ps, p)
as = append(as, analyzer)
@@ -318,7 +318,7 @@ func TestSlowSyncBlockAnalyzer(t *testing.T) {
ctx := context.Background()

db := setupDB(t)
p := &mockProcessor{name: "test-analyzer", storage: db}
p := &mockProcessor{name: "test_analyzer", storage: db}
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)

// Run the analyzer and ensure all blocks are processed.
@@ -351,7 +351,7 @@ func TestFailingSlowSyncBlockAnalyzer(t *testing.T) {
ctx := context.Background()

db := setupDB(t)
p := &mockProcessor{name: "test-analyzer", storage: db, fail: func(height uint64) error {
p := &mockProcessor{name: "test_analyzer", storage: db, fail: func(height uint64) error {
// Fail ~5% of the time.
if rand.Float64() > 0.95 { // /nolint:gosec // G404: Use of weak random number generator (math/rand instead of crypto/rand).
return fmt.Errorf("failed by chance")
@@ -394,7 +394,7 @@ func TestDistinctSlowSyncBlockAnalyzers(t *testing.T) {
ps := []*mockProcessor{}
as := []analyzer.Analyzer{}
for i := 0; i < numAnalyzers; i++ {
p := &mockProcessor{name: fmt.Sprintf("test-analyzer-%d", i), storage: db}
p := &mockProcessor{name: fmt.Sprintf("test_analyzer_%d", i), storage: db}
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)
ps = append(ps, p)
as = append(as, analyzer)
2 changes: 1 addition & 1 deletion analyzer/consensus/consensus.go
@@ -297,13 +297,13 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
// Fetch all data.
fetchTimer := m.metrics.BlockFetchLatencies()
data, err := fetchAllData(ctx, m.source, m.network, height, m.mode == analyzer.FastSyncMode)
fetchTimer.ObserveDuration()
if err != nil {
if strings.Contains(err.Error(), fmt.Sprintf("%d must be less than or equal to the current blockchain height", height)) {
return analyzer.ErrOutOfRange
}
return err
}
fetchTimer.ObserveDuration() // We make no observation in case of a data fetch error; those timings are misleading.

// Process data, prepare updates.
analysisTimer := m.metrics.BlockAnalysisLatencies()
17 changes: 6 additions & 11 deletions analyzer/item/item.go
@@ -12,12 +12,11 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/metrics"
"github.com/oasisprotocol/nexus/storage"
)

@@ -41,9 +40,9 @@ type itemBasedAnalyzer[Item any] struct {

processor ItemProcessor[Item]

target storage.TargetStorage
logger *log.Logger
queueLengthMetric prometheus.Gauge
target storage.TargetStorage
logger *log.Logger
metrics metrics.AnalysisMetrics
}

var _ analyzer.Analyzer = (*itemBasedAnalyzer[any])(nil)
@@ -88,12 +87,8 @@ func NewAnalyzer[Item any](
processor,
target,
logger,
prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("%s_queue_length", name),
Help: fmt.Sprintf("count of items in the work queue for the %s analyzer", name),
}),
metrics.NewDefaultAnalysisMetrics(name),
}
prometheus.MustRegister(a.queueLengthMetric)

return a, nil
}
@@ -105,7 +100,7 @@ func (a *itemBasedAnalyzer[Item]) sendQueueLengthMetric(ctx context.Context) (in
a.logger.Warn("error fetching queue length", "err", err)
return 0, err
}
a.queueLengthMetric.Set(float64(queueLength))
a.metrics.QueueLength(a.analyzerName).Set(float64(queueLength))
return queueLength, nil
}

59 changes: 44 additions & 15 deletions analyzer/node_stats/node_stats.go
@@ -15,34 +15,48 @@ import (
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"

consensusAPI "github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api"
runtimeAPI "github.com/oasisprotocol/nexus/coreapi/v22.2.11/runtime/client/api"
)

const (
nodeStatsAnalyzerName = "node_stats"
)

type processor struct {
source nodeapi.ConsensusApiLite
target storage.TargetStorage
logger *log.Logger
layers []common.Layer
source nodeapi.ConsensusApiLite
emeraldSource nodeapi.RuntimeApiLite
sapphireSource nodeapi.RuntimeApiLite
target storage.TargetStorage
logger *log.Logger
}

var _ item.ItemProcessor[common.Layer] = (*processor)(nil)

func NewAnalyzer(
cfg config.ItemBasedAnalyzerConfig,
layers []common.Layer,
sourceClient nodeapi.ConsensusApiLite,
emeraldClient nodeapi.RuntimeApiLite,
sapphireClient nodeapi.RuntimeApiLite,
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
if cfg.Interval == 0 {
cfg.Interval = 3 * time.Second
}
logger = logger.With("analyzer", nodeStatsAnalyzerName)
// Default to [consensus, emerald, sapphire] if layers is not specified.
if len(layers) == 0 {
layers = []common.Layer{common.LayerConsensus, common.LayerEmerald, common.LayerSapphire}
}
p := &processor{
source: sourceClient,
target: target,
logger: logger.With("analyzer", nodeStatsAnalyzerName),
layers: layers,
source: sourceClient,
emeraldSource: emeraldClient,
sapphireSource: sapphireClient,
target: target,
logger: logger.With("analyzer", nodeStatsAnalyzerName),
}

return item.NewAnalyzer[common.Layer](
@@ -54,22 +68,37 @@ func NewAnalyzer(
}

func (p *processor) GetItems(ctx context.Context, limit uint64) ([]common.Layer, error) {
return []common.Layer{common.LayerConsensus}, nil
return p.layers, nil
}

func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, layer common.Layer) error {
p.logger.Debug("fetching node height", "layer", layer)
// We currently only support consensus. Update when this is no longer the case.
if layer != common.LayerConsensus {
latestHeight := uint64(0) // will be fetched from the node
switch layer {
case common.LayerConsensus:
latestBlock, err := p.source.GetBlock(ctx, consensusAPI.HeightLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
}
latestHeight = uint64(latestBlock.Height)
case common.LayerEmerald:
latestBlock, err := p.emeraldSource.GetBlockHeader(ctx, runtimeAPI.RoundLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
}
latestHeight = latestBlock.Round
case common.LayerSapphire:
latestBlock, err := p.sapphireSource.GetBlockHeader(ctx, runtimeAPI.RoundLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
}
latestHeight = latestBlock.Round
default:
return fmt.Errorf("unsupported layer %s", layer)
}
latestBlock, err := p.source.GetBlock(ctx, consensusAPI.HeightLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s, %w", layer, err)
}
batch.Queue(queries.ConsensusNodeHeightUpsert,
batch.Queue(queries.NodeHeightUpsert,
layer,
latestBlock.Height,
latestHeight,
)

return nil
7 changes: 6 additions & 1 deletion analyzer/queries/queries.go
@@ -152,7 +152,12 @@ var (
SET processed_time = CURRENT_TIMESTAMP, is_fast_sync = $3
WHERE height = $1 AND analyzer = $2`

ConsensusNodeHeightUpsert = `
NodeHeight = `
SELECT height
FROM chain.latest_node_heights
WHERE layer = $1`

NodeHeightUpsert = `
INSERT INTO chain.latest_node_heights (layer, height)
VALUES
($1, $2)
10 changes: 9 additions & 1 deletion cmd/analyzer/analyzer.go
@@ -471,7 +471,15 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
if err1 != nil {
return nil, err1
}
return nodestats.NewAnalyzer(cfg.Analyzers.NodeStats.ItemBasedAnalyzerConfig, sourceClient, dbClient, logger)
emeraldClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
if err1 != nil {
return nil, err1
}
sapphireClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
if err1 != nil {
return nil, err1
}
return nodestats.NewAnalyzer(cfg.Analyzers.NodeStats.ItemBasedAnalyzerConfig, cfg.Analyzers.NodeStats.Layers, sourceClient, emeraldClient, sapphireClient, dbClient, logger)
})
}
if cfg.Analyzers.AggregateStats != nil {
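cmd/analyzer/analyzer.go above now reads cfg.Analyzers.NodeStats.Layers, but the corresponding addition to the config package is among the files not rendered on this page. A rough sketch of how that option plausibly looks (type and field names are guesses; config-loader tags omitted):

// Assumed shape of the node-stats analyzer configuration; the real definition
// lives in the config package, which is not shown in this view.
package config

import "github.com/oasisprotocol/nexus/common"

// NodeStatsConfig configures the node-stats analyzer.
type NodeStatsConfig struct {
	ItemBasedAnalyzerConfig

	// Layers to record the latest node height for. When empty, the analyzer
	// defaults to [consensus, emerald, sapphire] (see node_stats.go above).
	Layers []common.Layer
}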