Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced Prometheus metrics' pusher #7346

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ public sealed class BlockchainProcessor : IBlockchainProcessor, IBlockProcessing
/// <param name="stateReader"></param>
/// <param name="logManager"></param>
/// <param name="options"></param>
public BlockchainProcessor(
IBlockTree? blockTree,
public BlockchainProcessor(IBlockTree? blockTree,
IBlockProcessor? blockProcessor,
IBlockPreprocessorStep? recoveryStep,
IStateReader stateReader,
Expand All @@ -94,6 +93,15 @@ public BlockchainProcessor(
_stats = new ProcessingStats(_logger);
}

/// <summary>
/// Raised when the processing stats are updated.
/// </summary>
/// <remarks>
/// Handlers are not stored on this type; subscriptions are forwarded to the
/// <c>StatsUpdated</c> event exposed by <c>_stats</c>.
/// </remarks>
public event EventHandler<EventArgs> StatsUpdated
{
    add => _stats.StatsUpdated += value;
    remove => _stats.StatsUpdated -= value;
}

private void OnNewHeadBlock(object? sender, BlockEventArgs e)
{
_lastProcessedBlock = DateTime.UtcNow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public void UpdateStats(Block? block, IBlockTree blockTreeCtx, long blockProcess
Metrics.BlockchainHeight = block.Header.Number;
Metrics.BestKnownBlockNumber = blockTreeCtx.BestKnownNumber;

// Inform interested about the stats update
StatsUpdated?.Invoke(this, EventArgs.Empty);

_blockProcessingMicroseconds = _processingStopwatch.ElapsedMicroseconds();
_runningMicroseconds = _runStopwatch.ElapsedMicroseconds();
_runMicroseconds = (_runningMicroseconds - _lastElapsedRunningMicroseconds);
Expand All @@ -104,6 +107,8 @@ public void UpdateStats(Block? block, IBlockTree blockTreeCtx, long blockProcess
}
}

/// <summary>
/// Raised from <c>UpdateStats</c> right after the blockchain height metrics have been refreshed.
/// The sender is this instance and the arguments are always <see cref="EventArgs.Empty"/>.
/// </summary>
/// <remarks>
/// Declared nullable because there may be no subscribers; it is raised with a null-conditional invoke.
/// </remarks>
public event EventHandler<EventArgs>? StatsUpdated;

/// <summary>
/// Queues this instance as a thread-pool work item so that the report is produced by
/// <c>IThreadPoolWorkItem.Execute</c> rather than on the calling thread.
/// </summary>
private void GenerateReport()
{
    // preferLocal: false — do not prefer the queue that is local to the current thread.
    ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}

void IThreadPoolWorkItem.Execute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ public ReadOnlyChainProcessingEnv(
_blockProcessingQueue = new BlockchainProcessor(blockTree, BlockProcessor, recoveryStep, stateReader, logManager, BlockchainProcessor.Options.NoReceipts);
BlockProcessingQueue = _blockProcessingQueue;
ChainProcessor = new OneTimeChainProcessor(scope.WorldState, _blockProcessingQueue);
_blockProcessingQueue = new BlockchainProcessor(blockTree, BlockProcessor, recoveryStep, stateReader, logManager, BlockchainProcessor.Options.NoReceipts);
BlockProcessingQueue = _blockProcessingQueue;
ChainProcessor = new OneTimeChainProcessor(scope.WorldState, _blockProcessingQueue);
}

protected virtual IBlockProcessor CreateBlockProcessor(
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ protected virtual Task InitBlockchain()
IsMainProcessor = true
};

blockchainProcessor.StatsUpdated += (_, _) => _api.MonitoringService.ForceUpdate();

setApi.BlockProcessingQueue = blockchainProcessor;
setApi.BlockchainProcessor = blockchainProcessor;

Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Monitoring.Test/MetricsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void Test_update_correct_gauge()
TestMetrics.WithCustomLabelType[new CustomLabelType(1, 11, 111)] = 1111;
TestMetrics.OldDictionaryMetrics["metrics0"] = 4;
TestMetrics.OldDictionaryMetrics["metrics1"] = 5;
metricsController.UpdateMetrics(null);
metricsController.UpdateMetrics();

var gauges = metricsController._gauges;
var keyDefault = $"{nameof(TestMetrics)}.{nameof(TestMetrics.OneTwoThree)}";
Expand Down Expand Up @@ -130,7 +130,7 @@ public void Register_and_update_metrics_should_not_throw_exception()
metricsController.RegisterMetrics(metric);
}

metricsController.UpdateMetrics(null);
metricsController.UpdateMetrics();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public interface IMetricsConfig : IConfig
[ConfigItem(DefaultValue = "5", Description = "The frequency of pushing metrics to Prometheus, in seconds.")]
int IntervalSeconds { get; }

// The value is a lower bound on the time between two pushes (an interval, not a frequency).
[ConfigItem(DefaultValue = "2", Description = "The minimum interval between consecutive pushes of metrics to Prometheus, in seconds. Pushes do not occur more often than this.")]
int MinimalIntervalSeconds { get; }

[ConfigItem(Description = "The name to display on the Grafana dashboard.", DefaultValue = "\"Nethermind\"")]
string NodeName { get; }

Expand Down
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ public class MetricsConfig : IMetricsConfig
public string ExposeHost { get; set; } = "+";
public int? ExposePort { get; set; } = null;
public bool Enabled { get; set; } = false;

public bool CountersEnabled { get; set; } = false;
public string PushGatewayUrl { get; set; } = "";
public int IntervalSeconds { get; set; } = 5;

public int MinimalIntervalSeconds { get; set; } = 2;
public string NodeName { get; set; } = "Nethermind";
public bool EnableDbSizeMetrics { get; set; } = true;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Nethermind/Nethermind.Monitoring/IMonitoringService .cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,10 @@ public interface IMonitoringService
Task StartAsync();
Task StopAsync();
void AddMetricsUpdateAction(Action callback);

/// <summary>
/// Forces gathering metrics and reporting them as soon as possible to the underlying sinks.
/// </summary>
void ForceUpdate();
}
}
172 changes: 172 additions & 0 deletions src/Nethermind/Nethermind.Monitoring/MetricPusher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

#nullable enable
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Prometheus;

namespace Nethermind.Monitoring;

using System.Diagnostics;
using System.Text;

/// <summary>
/// A metric server that pushes metrics to a Prometheus PushGateway on demand: the background loop
/// started by the base class performs a push whenever one has been requested through
/// <see cref="ScheduleUpdatePush"/>.
/// </summary>
public class MetricPusher : MetricHandler
{
    private static readonly HttpClient SingletonHttpClient = new();

    private readonly HttpMethod _method;
    private readonly Uri _targetUrl;
    private readonly Func<HttpClient> _httpClientProvider;
    private readonly CollectorRegistry _registry;
    private readonly Action<Exception>? _onError;

    // Binary "push requested" signal. The maximum count of 1 coalesces requests that arrive while a
    // push is still in flight: every push sends the whole registry, so queueing more than one pending
    // push would only repeat the same work.
    private readonly SemaphoreSlim _wait;

    /// <summary>
    /// Creates a pusher targeting <c>{Endpoint}/job/{Job}[/instance/{Instance}][/{label}/{value}...]</c>.
    /// </summary>
    /// <param name="options">Endpoint, job, labels, registry, HTTP client provider and error callback to use.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> is null, or its <c>Endpoint</c> or <c>Job</c> is null or empty.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// <c>IntervalMilliseconds</c> is not positive, or the resulting target is not a valid absolute URL.
    /// </exception>
    /// <exception cref="NotSupportedException">An additional label is null or has an empty name or value.</exception>
    public MetricPusher(MetricPusherOptions options)
    {
        if (options is null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        if (string.IsNullOrEmpty(options.Endpoint))
        {
            throw new ArgumentNullException(nameof(options.Endpoint));
        }

        if (string.IsNullOrEmpty(options.Job))
        {
            throw new ArgumentNullException(nameof(options.Job));
        }

        if (options.IntervalMilliseconds <= 0)
        {
            throw new ArgumentException("Interval must be greater than zero", nameof(options.IntervalMilliseconds));
        }

        _registry = options.Registry ?? Prometheus.Metrics.DefaultRegistry;

        _httpClientProvider = options.HttpClientProvider ?? (() => SingletonHttpClient);

        StringBuilder sb = new StringBuilder($"{options.Endpoint!.TrimEnd('/')}/job/{options.Job}");
        if (!string.IsNullOrEmpty(options.Instance))
        {
            sb.Append($"/instance/{options.Instance}");
        }

        if (options.AdditionalLabels != null)
        {
            foreach (Tuple<string, string> pair in options.AdditionalLabels)
            {
                if (pair == null || string.IsNullOrEmpty(pair.Item1) || string.IsNullOrEmpty(pair.Item2))
                {
                    throw new NotSupportedException(
                        $"Invalid {nameof(MetricPusher)} additional label: ({pair?.Item1}):({pair?.Item2})");
                }

                sb.Append($"/{pair.Item1}/{pair.Item2}");
            }
        }

        if (!Uri.TryCreate(sb.ToString(), UriKind.Absolute, out Uri? targetUrl))
        {
            throw new ArgumentException("Endpoint must be a valid url", nameof(options.Endpoint));
        }

        _targetUrl = targetUrl;
        _onError = options.OnError;

        // PUT replaces the metrics stored for this target, POST only updates the pushed ones.
        _method = options.ReplaceOnPush ? HttpMethod.Put : HttpMethod.Post;
        _wait = new SemaphoreSlim(0, 1);
    }

    /// <summary>
    /// Runs the push loop: waits for a push request, pushes, and repeats until <paramref name="cancel"/>
    /// is signalled, then pushes one last time.
    /// </summary>
    /// <param name="cancel">Token that stops the loop.</param>
    /// <returns>A task that completes once the loop has exited and the final push has finished.</returns>
    protected override Task StartServer(CancellationToken cancel)
    {
        return Task.Run(async () =>
        {
            while (!cancel.IsCancellationRequested)
            {
                try
                {
                    await _wait.WaitAsync(cancel);
                }
                catch (OperationCanceledException)
                {
                    break;
                }

                await PushOnce();
            }

            // Always send the final state when stopping, so values updated after the last
            // requested push are not lost. PushOnce handles its own failures.
            await PushOnce();
        });
    }

    /// <summary>
    /// Requests a push of the current metrics. Requests made while one is already pending are
    /// coalesced into that pending push.
    /// </summary>
    public void ScheduleUpdatePush()
    {
        // Cheap pre-check: a request is already pending, so the upcoming push covers this one too.
        if (_wait.CurrentCount != 0)
        {
            return;
        }

        try
        {
            _wait.Release();
        }
        catch (SemaphoreFullException)
        {
            // Lost a race with a concurrent caller that has just signalled; nothing more to do.
        }
    }

    /// <summary>
    /// Serializes the registry and sends it to the target URL. Never throws: failures are routed to
    /// <see cref="HandleFailedPush"/> or written to the trace.
    /// </summary>
    private async Task PushOnce()
    {
        try
        {
            HttpClient httpClient = _httpClientProvider();

            // Disposing the request also disposes its content once the exchange is over.
            using HttpRequestMessage request = new()
            {
                Method = _method,
                RequestUri = _targetUrl,
                // A dedicated HttpContent streams the registry straight into the request body, which avoids
                // taking a dependency on the old ASP.NET Web API where PushStreamContent lives.
                Content = new PushRegistryHttpContent(_registry, PrometheusConstants.ExporterContentTypeValue),
            };

            // ReSharper disable once MethodSupportsCancellation
            using HttpResponseMessage response = await httpClient.SendAsync(request);

            // If anything goes wrong, we want to get at least an entry in the trace log.
            response.EnsureSuccessStatusCode();
        }
        catch (ScrapeFailedException ex)
        {
            // We do not consider failed scrapes a reportable error since the user code that raises the failure should be the one logging it.
            Trace.WriteLine($"Skipping metrics push due to failed scrape: {ex.Message}");
        }
        catch (Exception ex)
        {
            HandleFailedPush(ex);
        }
    }

    /// <summary>
    /// Reports a failed push to the configured error callback, or to the trace when there is none.
    /// </summary>
    /// <param name="ex">The exception that made the push fail.</param>
    private void HandleFailedPush(Exception ex)
    {
        if (_onError != null)
        {
            // Asynchronous because we don't trust the callee to be fast.
            Task.Run(() => _onError(ex));
        }
        else
        {
            // If there is no error handler registered, we write to trace to at least hopefully get some attention to the problem.
            Trace.WriteLine($"Error in MetricPusher: {ex}");
        }
    }

    /// <summary>
    /// HTTP content of unknown length that writes the text export of a <see cref="CollectorRegistry"/>
    /// directly into the request stream.
    /// </summary>
    private sealed class PushRegistryHttpContent : HttpContent
    {
        private readonly CollectorRegistry _registry;

        private static readonly MediaTypeHeaderValue OctetStreamHeaderValue =
            MediaTypeHeaderValue.Parse("application/octet-stream");

        /// <summary>
        /// Initializes a new instance of the <see cref="PushRegistryHttpContent"/> class with the given <see cref="MediaTypeHeaderValue"/>.
        /// </summary>
        /// <param name="registry">The registry whose metrics are exported.</param>
        /// <param name="mediaType">The content type to send; <c>application/octet-stream</c> is used when null.</param>
        public PushRegistryHttpContent(CollectorRegistry registry, MediaTypeHeaderValue? mediaType)
        {
            _registry = registry;
            Headers.ContentType = mediaType ?? OctetStreamHeaderValue;
        }

        /// <summary>Collects the registry and writes it as text to <paramref name="stream"/>.</summary>
        protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
        {
            await _registry.CollectAndExportAsTextAsync(stream, default);
        }

        /// <summary>Reports that the length is not known in advance.</summary>
        protected override bool TryComputeLength(out long length)
        {
            length = default;
            return false;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ namespace Nethermind.Monitoring.Metrics
/// <summary>
/// Contract for a component that registers metric types and drives the updating of their values.
/// </summary>
/// <remarks>
/// NOTE(review): both <c>StartUpdating()</c> and <c>StartUpdating(Action)</c> are visible here. This looks
/// like a diff rendering in which the parameterless overload is the removed line — confirm which
/// overloads the interface really declares.
/// </remarks>
public interface IMetricsController
{
/// <summary>Registers the metrics exposed by <paramref name="type"/>.</summary>
void RegisterMetrics(Type type);
/// <summary>Starts updating the registered metrics. Presumably periodic — confirm against the implementation.</summary>
void StartUpdating();
/// <summary>
/// Starts updating the registered metrics.
/// </summary>
/// <param name="metricsUpdated">Presumably invoked after each completed update — TODO confirm.</param>
void StartUpdating(Action metricsUpdated);
/// <summary>Stops the updating started by <c>StartUpdating</c>.</summary>
void StopUpdating();
/// <summary>Adds a callback to the update process. Presumably run as part of every metrics update — confirm.</summary>
void AddMetricsUpdateAction(Action callback);
/// <summary>Requests an update outside of the regular schedule. Exact timing guarantees are not visible here.</summary>
void ForceUpdate();
}
}
Loading