[metrics] add block analyzer queuelength metrics #577

Merged
merged 1 commit on Jan 17, 2024
52 changes: 48 additions & 4 deletions analyzer/block/block.go
@@ -17,6 +17,7 @@ import (
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/metrics"
"github.com/oasisprotocol/nexus/storage"
)

@@ -61,8 +62,9 @@ type blockBasedAnalyzer struct {

processor BlockProcessor

target storage.TargetStorage
logger *log.Logger
target storage.TargetStorage
logger *log.Logger
metrics metrics.AnalysisMetrics

slowSync bool
}
@@ -259,6 +261,31 @@ func (b *blockBasedAnalyzer) ensureSlowSyncPrerequisites(ctx context.Context) (o
return true
}

// sendQueueLengthMetric updates the relevant Prometheus metric with an approximation
// of the number of known blocks that still require processing, calculated as the
// difference between the current on-chain block height (as fetched by the node-stats
// analyzer) and the block height the analyzer is currently processing.
// Note that the true count may be higher during fast-sync.
func (b *blockBasedAnalyzer) sendQueueLengthMetric(queueLength uint64) {
// For block-based analyzers, the analyzer name is identical to the layer name.
b.metrics.QueueLength(b.analyzerName).Set(float64(queueLength))
}

// Returns the chain height of the layer this analyzer is processing.
// The heights are fetched and added to the database by the node-stats analyzer.
func (b *blockBasedAnalyzer) nodeHeight(ctx context.Context) (int, error) {
var nodeHeight int
err := b.target.QueryRow(ctx, queries.NodeHeight, b.analyzerName).Scan(&nodeHeight)
switch err {
case nil:
return nodeHeight, nil
case pgx.ErrNoRows:
return -1, nil
default:
return -1, fmt.Errorf("error fetching chain height for consensus: %w", err)
}
}

// Start starts the block analyzer.
func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// Run prework.
@@ -325,6 +352,10 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
backoff.Failure()
continue
}
nodeHeight, err := b.nodeHeight(ctx)
if err != nil {
b.logger.Warn("error fetching current node height: %w", err)
}

// Process blocks.
b.logger.Debug("picked blocks for processing", "heights", heights)
@@ -337,6 +368,12 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// in the batch will exceed the current lock expiry of 5min. The analyzer will terminate
// the batch early and attempt to refresh the locks for a new batch.
if b.slowSync {
// In slow-sync mode, we refresh the node height before each block for a more
// precise queue-length measurement.
nodeHeight, err = b.nodeHeight(ctx)
if err != nil {
b.logger.Warn("error fetching current node height", "err", err)
}
select {
case <-time.After(backoff.Timeout()):
// Process the next block
@@ -379,6 +416,10 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
b.unlockBlocks(ctx, []uint64{height})
continue
}
// If we successfully fetched the node height earlier, update the estimated queue length.
if nodeHeight != -1 {
b.sendQueueLengthMetric(uint64(nodeHeight) - height)
}
cancel()
backoff.Success()
b.logger.Info("processed block", "height", height)
@@ -425,13 +466,16 @@ func NewAnalyzer(
if batchSize == 0 {
batchSize = defaultBatchSize
}
return &blockBasedAnalyzer{
a := &blockBasedAnalyzer{
blockRange: blockRange,
batchSize: batchSize,
analyzerName: name,
processor: processor,
target: target,
logger: logger.With("analyzer", name, "mode", mode),
metrics: metrics.NewDefaultAnalysisMetrics(name),
slowSync: mode == analyzer.SlowSyncMode,
}, nil
}

return a, nil
}
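Note: the `metrics.NewDefaultAnalysisMetrics` constructor and the `QueueLength` helper used above live in the `metrics` package and are not part of this diff. A minimal sketch of what such a helper could look like, assuming it wraps a Prometheus `GaugeVec` labeled by analyzer name — the metric name, label key, and registration details below are assumptions, not taken from this PR:

```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

// AnalysisMetrics groups per-analyzer Prometheus collectors. This sketch only
// covers the queue-length gauge referenced in the diff above.
type AnalysisMetrics struct {
	queueLengths *prometheus.GaugeVec
}

// NewDefaultAnalysisMetrics creates and registers the collectors for one analyzer.
func NewDefaultAnalysisMetrics(name string) AnalysisMetrics {
	m := AnalysisMetrics{
		queueLengths: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "analysis_queue_length", // assumed metric name
				Help: "Approximate number of items left in the work queue, per analyzer.",
			},
			[]string{"analyzer"},
		),
	}
	// MustRegister panics on duplicate registration; a real implementation would
	// need to share a single GaugeVec (or registry) across analyzers.
	prometheus.MustRegister(m.queueLengths)
	return m
}

// QueueLength returns the gauge tracking the work-queue length of the named analyzer.
func (m AnalysisMetrics) QueueLength(analyzer string) prometheus.Gauge {
	return m.queueLengths.WithLabelValues(analyzer)
}
```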
16 changes: 8 additions & 8 deletions analyzer/block/block_test.go
@@ -112,7 +112,7 @@ func setupAnalyzer(t *testing.T, testDb *postgres.Client, p *mockProcessor, cfg
p.isFastSync = (mode == analyzer.FastSyncMode)

// Initialize the block analyzer.
logger, err := log.NewLogger(fmt.Sprintf("test-analyzer-%s", p.name), os.Stdout, log.FmtJSON, log.LevelInfo)
logger, err := log.NewLogger(fmt.Sprintf("test_analyzer-%s", p.name), os.Stdout, log.FmtJSON, log.LevelInfo)
require.NoError(t, err, "log.NewLogger")
var blockRange config.BlockRange
switch mode {
@@ -152,7 +152,7 @@ func TestFastSyncBlockAnalyzer(t *testing.T) {
ctx := context.Background()

db := setupDB(t)
p := &mockProcessor{name: "test-analyzer", storage: db}
p := &mockProcessor{name: "test_analyzer", storage: db}
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)

// Run the analyzer and ensure all blocks are processed.
@@ -188,7 +188,7 @@ func TestMultipleFastSyncBlockAnalyzers(t *testing.T) {
ps := []*mockProcessor{}
as := []analyzer.Analyzer{}
for i := 0; i < numAnalyzers; i++ {
p := &mockProcessor{name: "test-analyzer", storage: db}
p := &mockProcessor{name: "test_analyzer", storage: db}
analyzer := setupAnalyzer(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
ps = append(ps, p)
as = append(as, analyzer)
@@ -238,7 +238,7 @@ func TestFailingFastSyncBlockAnalyzers(t *testing.T) {
return fmt.Errorf("failing analyzer")
}
}
p := &mockProcessor{name: "test-analyzer", storage: db, fail: fail}
p := &mockProcessor{name: "test_analyzer", storage: db, fail: fail}
analyzer := setupAnalyzer(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
ps = append(ps, p)
as = append(as, analyzer)
@@ -281,7 +281,7 @@ func TestDistinctFastSyncBlockAnalyzers(t *testing.T) {
ps := []*mockProcessor{}
as := []analyzer.Analyzer{}
for i := 0; i < numAnalyzers; i++ {
p := &mockProcessor{name: fmt.Sprintf("test-analyzer-%d", i), storage: db}
p := &mockProcessor{name: fmt.Sprintf("test_analyzer_%d", i), storage: db}
analyzer := setupAnalyzer(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
ps = append(ps, p)
as = append(as, analyzer)
@@ -318,7 +318,7 @@ func TestSlowSyncBlockAnalyzer(t *testing.T) {
ctx := context.Background()

db := setupDB(t)
p := &mockProcessor{name: "test-analyzer", storage: db}
p := &mockProcessor{name: "test_analyzer", storage: db}
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)

// Run the analyzer and ensure all blocks are processed.
@@ -351,7 +351,7 @@ func TestFailingSlowSyncBlockAnalyzer(t *testing.T) {
ctx := context.Background()

db := setupDB(t)
p := &mockProcessor{name: "test-analyzer", storage: db, fail: func(height uint64) error {
p := &mockProcessor{name: "test_analyzer", storage: db, fail: func(height uint64) error {
// Fail ~5% of the time.
if rand.Float64() > 0.95 { // /nolint:gosec // G404: Use of weak random number generator (math/rand instead of crypto/rand).
return fmt.Errorf("failed by chance")
@@ -394,7 +394,7 @@ func TestDistinctSlowSyncBlockAnalyzers(t *testing.T) {
ps := []*mockProcessor{}
as := []analyzer.Analyzer{}
for i := 0; i < numAnalyzers; i++ {
p := &mockProcessor{name: fmt.Sprintf("test-analyzer-%d", i), storage: db}
p := &mockProcessor{name: fmt.Sprintf("test_analyzer_%d", i), storage: db}
analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)
ps = append(ps, p)
as = append(as, analyzer)
2 changes: 1 addition & 1 deletion analyzer/consensus/consensus.go
@@ -297,13 +297,13 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
// Fetch all data.
fetchTimer := m.metrics.BlockFetchLatencies()
data, err := fetchAllData(ctx, m.source, m.network, height, m.mode == analyzer.FastSyncMode)
fetchTimer.ObserveDuration()
if err != nil {
if strings.Contains(err.Error(), fmt.Sprintf("%d must be less than or equal to the current blockchain height", height)) {
return analyzer.ErrOutOfRange
}
return err
}
fetchTimer.ObserveDuration() // We make no observation in case of a data fetch error; those timings are misleading.

// Process data, prepare updates.
analysisTimer := m.metrics.BlockAnalysisLatencies()
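For context on the change above: the fetch timer now records an observation only when the fetch succeeds. A small, self-contained illustration of that pattern — the histogram and function names here are made up for the example, not taken from the nexus codebase:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var fetchLatencies = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name: "example_fetch_latency_seconds",
	Help: "Latencies of successful fetches only.",
})

func fetchWithTiming(fetch func() error) error {
	timer := prometheus.NewTimer(fetchLatencies)
	if err := fetch(); err != nil {
		// Skip the observation on error: timings of failed fetches would
		// skew the latency histogram.
		return err
	}
	timer.ObserveDuration()
	return nil
}

func main() {
	prometheus.MustRegister(fetchLatencies)
	_ = fetchWithTiming(func() error { return nil })
	_ = fetchWithTiming(func() error { return errors.New("node unavailable") })
	fmt.Println("recorded latency for the successful fetch only")
}
```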
17 changes: 6 additions & 11 deletions analyzer/item/item.go
@@ -12,12 +12,11 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/metrics"
"github.com/oasisprotocol/nexus/storage"
)

@@ -41,9 +40,9 @@ type itemBasedAnalyzer[Item any] struct {

processor ItemProcessor[Item]

target storage.TargetStorage
logger *log.Logger
queueLengthMetric prometheus.Gauge
target storage.TargetStorage
logger *log.Logger
metrics metrics.AnalysisMetrics
}

var _ analyzer.Analyzer = (*itemBasedAnalyzer[any])(nil)
@@ -88,12 +87,8 @@ func NewAnalyzer[Item any](
processor,
target,
logger,
prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("%s_queue_length", name),
Help: fmt.Sprintf("count of items in the work queue for the %s analyzer", name),
}),
metrics.NewDefaultAnalysisMetrics(name),
}
prometheus.MustRegister(a.queueLengthMetric)

return a, nil
}
@@ -105,7 +100,7 @@ func (a *itemBasedAnalyzer[Item]) sendQueueLengthMetric(ctx context.Context) (in
a.logger.Warn("error fetching queue length", "err", err)
return 0, err
}
a.queueLengthMetric.Set(float64(queueLength))
a.metrics.QueueLength(a.analyzerName).Set(float64(queueLength))
return queueLength, nil
}

59 changes: 44 additions & 15 deletions analyzer/node_stats/node_stats.go
@@ -15,34 +15,48 @@ import (
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"

consensusAPI "github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api"
runtimeAPI "github.com/oasisprotocol/nexus/coreapi/v22.2.11/runtime/client/api"
)

const (
nodeStatsAnalyzerName = "node_stats"
)

type processor struct {
source nodeapi.ConsensusApiLite
target storage.TargetStorage
logger *log.Logger
layers []common.Layer
source nodeapi.ConsensusApiLite
emeraldSource nodeapi.RuntimeApiLite
sapphireSource nodeapi.RuntimeApiLite
target storage.TargetStorage
logger *log.Logger
}

var _ item.ItemProcessor[common.Layer] = (*processor)(nil)

func NewAnalyzer(
cfg config.ItemBasedAnalyzerConfig,
layers []common.Layer,
sourceClient nodeapi.ConsensusApiLite,
emeraldClient nodeapi.RuntimeApiLite,
sapphireClient nodeapi.RuntimeApiLite,
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
if cfg.Interval == 0 {
cfg.Interval = 3 * time.Second
}
logger = logger.With("analyzer", nodeStatsAnalyzerName)
// Default to [consensus, emerald, sapphire] if layers is not specified.
if len(layers) == 0 {
layers = []common.Layer{common.LayerConsensus, common.LayerEmerald, common.LayerSapphire}
}
p := &processor{
source: sourceClient,
target: target,
logger: logger.With("analyzer", nodeStatsAnalyzerName),
layers: layers,
source: sourceClient,
emeraldSource: emeraldClient,
sapphireSource: sapphireClient,
target: target,
logger: logger.With("analyzer", nodeStatsAnalyzerName),
}

return item.NewAnalyzer[common.Layer](
@@ -54,22 +68,37 @@ func NewAnalyzer(
}

func (p *processor) GetItems(ctx context.Context, limit uint64) ([]common.Layer, error) {
return []common.Layer{common.LayerConsensus}, nil
return p.layers, nil
}

func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, layer common.Layer) error {
p.logger.Debug("fetching node height", "layer", layer)
// We currently only support consensus. Update when this is no longer the case.
if layer != common.LayerConsensus {
latestHeight := uint64(0) // will be fetched from the node
switch layer {
case common.LayerConsensus:
latestBlock, err := p.source.GetBlock(ctx, consensusAPI.HeightLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
}
latestHeight = uint64(latestBlock.Height)
case common.LayerEmerald:
latestBlock, err := p.emeraldSource.GetBlockHeader(ctx, runtimeAPI.RoundLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
}
latestHeight = latestBlock.Round
case common.LayerSapphire:
latestBlock, err := p.sapphireSource.GetBlockHeader(ctx, runtimeAPI.RoundLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s: %w", layer, err)
}
latestHeight = latestBlock.Round
default:
return fmt.Errorf("unsupported layer %s", layer)
}
latestBlock, err := p.source.GetBlock(ctx, consensusAPI.HeightLatest)
if err != nil {
return fmt.Errorf("error fetching latest block height for layer %s, %w", layer, err)
}
batch.Queue(queries.ConsensusNodeHeightUpsert,
batch.Queue(queries.NodeHeightUpsert,
layer,
latestBlock.Height,
latestHeight,
)

return nil
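The processor now depends on both a consensus client and per-runtime clients. A rough sketch of the client surface assumed by the calls above — the method names and the Height/Round fields come from the diff, but the types below are simplified stand-ins rather than the real nodeapi definitions:

```go
package nodeapi

import "context"

// Simplified stand-ins for the block types returned by the clients.
type ConsensusBlock struct {
	Height int64
}

type RuntimeBlockHeader struct {
	Round uint64
}

// ConsensusApiLite is the slice of the consensus client that the node-stats
// processor uses: fetching the block at a given height (or HeightLatest).
type ConsensusApiLite interface {
	GetBlock(ctx context.Context, height int64) (*ConsensusBlock, error)
}

// RuntimeApiLite is the slice of the runtime client used for Emerald and
// Sapphire: fetching the block header of a given round (or RoundLatest).
type RuntimeApiLite interface {
	GetBlockHeader(ctx context.Context, round uint64) (*RuntimeBlockHeader, error)
}
```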
7 changes: 6 additions & 1 deletion analyzer/queries/queries.go
@@ -152,7 +152,12 @@ var (
SET processed_time = CURRENT_TIMESTAMP, is_fast_sync = $3
WHERE height = $1 AND analyzer = $2`

ConsensusNodeHeightUpsert = `
NodeHeight = `
SELECT height
FROM chain.latest_node_heights
WHERE layer = $1`

NodeHeightUpsert = `
INSERT INTO chain.latest_node_heights (layer, height)
VALUES
($1, $2)
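Both NodeHeight and NodeHeightUpsert read and write chain.latest_node_heights, which is defined in the schema migrations rather than in this diff. A hypothetical sketch of the shape that table would need, inferred from the two queries and written as a Go constant in the style of queries.go — the column types and conflict handling are assumptions:

```go
package queries

// LatestNodeHeightsCreate is a hypothetical DDL sketch for the table referenced
// by NodeHeight and NodeHeightUpsert; column types are guesses.
const LatestNodeHeightsCreate = `
	CREATE TABLE IF NOT EXISTS chain.latest_node_heights (
		layer  TEXT PRIMARY KEY,
		height BIGINT NOT NULL
	)`

// With one row per layer, the (truncated) upsert above would typically finish with
// something like: ON CONFLICT (layer) DO UPDATE SET height = excluded.height.
```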
10 changes: 9 additions & 1 deletion cmd/analyzer/analyzer.go
@@ -471,7 +471,15 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
if err1 != nil {
return nil, err1
}
return nodestats.NewAnalyzer(cfg.Analyzers.NodeStats.ItemBasedAnalyzerConfig, sourceClient, dbClient, logger)
emeraldClient, err1 := sources.Runtime(ctx, common.RuntimeEmerald)
if err1 != nil {
return nil, err1
}
sapphireClient, err1 := sources.Runtime(ctx, common.RuntimeSapphire)
if err1 != nil {
return nil, err1
}
return nodestats.NewAnalyzer(cfg.Analyzers.NodeStats.ItemBasedAnalyzerConfig, cfg.Analyzers.NodeStats.Layers, sourceClient, emeraldClient, sapphireClient, dbClient, logger)
})
}
if cfg.Analyzers.AggregateStats != nil {