From 76e630584e0b2d87e181fba3d4dea6636ac096e9 Mon Sep 17 00:00:00 2001 From: scooletz Date: Tue, 20 Aug 2024 16:32:25 +0200 Subject: [PATCH 01/10] a different kind of mentrics pusher --- .../Config/IMetricsConfig.cs | 3 + .../Config/MetricsConfig.cs | 2 + .../Nethermind.Monitoring/MetricPusher.cs | 177 ++++++++++++++++++ .../MonitoringService.cs | 3 - 4 files changed, 182 insertions(+), 3 deletions(-) create mode 100644 src/Nethermind/Nethermind.Monitoring/MetricPusher.cs diff --git a/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs b/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs index 0186df10e5d..531f9497c47 100644 --- a/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs +++ b/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs @@ -17,6 +17,9 @@ public interface IMetricsConfig : IConfig [ConfigItem(Description = "Whether to publish various metrics to Prometheus Pushgateway at a given interval.", DefaultValue = "false")] bool Enabled { get; } + [ConfigItem(Description = "Whether to publish various metrics to Prometheus Pushgateway right after block is processed.", DefaultValue = "false")] + bool PushAfterBlock { get; } + [ConfigItem(Description = "Whether to publish metrics using .NET diagnostics that can be collected with dotnet-counters.", DefaultValue = "false")] bool CountersEnabled { get; } diff --git a/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs b/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs index a6cb0e1534d..15ea273d5c9 100644 --- a/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs +++ b/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs @@ -8,6 +8,8 @@ public class MetricsConfig : IMetricsConfig public string ExposeHost { get; set; } = "+"; public int? ExposePort { get; set; } = null; public bool Enabled { get; set; } = false; + + public bool PushAfterBlock { get; set; } = false; public bool CountersEnabled { get; set; } = false; public string PushGatewayUrl { get; set; } = ""; public int IntervalSeconds { get; set; } = 5; diff --git a/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs new file mode 100644 index 00000000000..4a016a63e07 --- /dev/null +++ b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs @@ -0,0 +1,177 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +#nullable enable +using System; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading; +using System.Threading.Tasks; +using Prometheus; + +namespace Nethermind.Monitoring; + +using System.Diagnostics; +using System.Text; + +/// +/// A metric server that regularly pushes metrics to a Prometheus PushGateway. +/// +public class MetricPusher : MetricHandler +{ + private readonly TimeSpan _pushInterval; + private readonly HttpMethod _method; + private readonly Uri _targetUrl; + private readonly Func _httpClientProvider; + private readonly SemaphoreSlim _wait; + + public MetricPusher(MetricPusherOptions options) + { + if (string.IsNullOrEmpty(options.Endpoint)) + throw new ArgumentNullException(nameof(options.Endpoint)); + + if (string.IsNullOrEmpty(options.Job)) + throw new ArgumentNullException(nameof(options.Job)); + + if (options.IntervalMilliseconds <= 0) + throw new ArgumentException("Interval must be greater than zero", nameof(options.IntervalMilliseconds)); + + _registry = options.Registry ?? Prometheus.Metrics.DefaultRegistry; + + _httpClientProvider = options.HttpClientProvider ?? (() => SingletonHttpClient); + + StringBuilder sb = new StringBuilder($"{options.Endpoint!.TrimEnd('/')}/job/{options.Job}"); + if (!string.IsNullOrEmpty(options.Instance)) + sb.Append($"/instance/{options.Instance}"); + + if (options.AdditionalLabels != null) + { + foreach (Tuple pair in options.AdditionalLabels) + { + if (pair == null || string.IsNullOrEmpty(pair.Item1) || string.IsNullOrEmpty(pair.Item2)) + throw new NotSupportedException( + $"Invalid {nameof(MetricPusher)} additional label: ({pair?.Item1}):({pair?.Item2})"); + + sb.Append($"/{pair.Item1}/{pair.Item2}"); + } + } + + if (!Uri.TryCreate(sb.ToString(), UriKind.Absolute, out Uri? targetUrl)) + { + throw new ArgumentException("Endpoint must be a valid url", nameof(options.Endpoint)); + } + + _targetUrl = targetUrl; + + _pushInterval = TimeSpan.FromMilliseconds(options.IntervalMilliseconds); + _onError = options.OnError; + + _method = options.ReplaceOnPush ? HttpMethod.Put : HttpMethod.Post; + _wait = new SemaphoreSlim(0); + } + + private static readonly HttpClient SingletonHttpClient = new(); + private readonly CollectorRegistry _registry; + private readonly Action? _onError; + + protected override Task StartServer(CancellationToken cancel) + { + return Task.Run(async delegate + { + while (!cancel.IsCancellationRequested) + { + try + { + await _wait.WaitAsync(_pushInterval, cancel); + } + catch (OperationCanceledException) + { + break; + } + + await PushOnce(); + } + + // Push the final state + await PushOnce(); + }); + } + + public void Push() => _wait.Release(); + + private async Task PushOnce() + { + try + { + HttpClient httpClient = _httpClientProvider(); + + var request = new HttpRequestMessage + { + Method = _method, + RequestUri = _targetUrl, + // We use a copy-pasted implementation of PushStreamContent here to avoid taking a dependency on the old ASP.NET Web API where it lives. + Content = + new PushRegistryHttpContent(_registry, PrometheusConstants.ExporterContentTypeValue), + }; + + // ReSharper disable once MethodSupportsCancellation + using HttpResponseMessage response = await httpClient.SendAsync(request); + + // If anything goes wrong, we want to get at least an entry in the trace log. + response.EnsureSuccessStatusCode(); + } + catch (ScrapeFailedException ex) + { + // We do not consider failed scrapes a reportable error since the user code that raises the failure should be the one logging it. + Trace.WriteLine($"Skipping metrics push due to failed scrape: {ex.Message}"); + } + catch (Exception ex) + { + HandleFailedPush(ex); + } + } + + private void HandleFailedPush(Exception ex) + { + if (_onError != null) + { + // Asynchronous because we don't trust the callee to be fast. + Task.Run(() => _onError(ex)); + } + else + { + // If there is no error handler registered, we write to trace to at least hopefully get some attention to the problem. + Trace.WriteLine(string.Format("Error in MetricPusher: {0}", ex)); + } + } + + private sealed class PushRegistryHttpContent : HttpContent + { + private readonly CollectorRegistry _registry; + + private static readonly MediaTypeHeaderValue OctetStreamHeaderValue = + MediaTypeHeaderValue.Parse("application/octet-stream"); + + /// + /// Initializes a new instance of the class with the given . + /// + public PushRegistryHttpContent(CollectorRegistry registry, MediaTypeHeaderValue? mediaType) + { + _registry = registry; + Headers.ContentType = mediaType ?? OctetStreamHeaderValue; + } + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) + { + await _registry.CollectAndExportAsTextAsync(stream, default); + } + + protected override bool TryComputeLength(out long length) + { + length = default; + return false; + } + } +} diff --git a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs index e40540dcfbe..a12db389bd8 100644 --- a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs +++ b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs @@ -22,7 +22,6 @@ public class MonitoringService : IMonitoringService private readonly string _exposeHost; private readonly int? _exposePort; private readonly string _nodeName; - private readonly bool _pushEnabled; private readonly string _pushGatewayUrl; private readonly int _intervalSeconds; @@ -34,7 +33,6 @@ public MonitoringService(IMetricsController metricsController, IMetricsConfig me int? exposePort = metricsConfig.ExposePort; string nodeName = metricsConfig.NodeName; string pushGatewayUrl = metricsConfig.PushGatewayUrl; - bool pushEnabled = metricsConfig.Enabled; int intervalSeconds = metricsConfig.IntervalSeconds; _exposeHost = exposeHost; @@ -43,7 +41,6 @@ public MonitoringService(IMetricsController metricsController, IMetricsConfig me ? throw new ArgumentNullException(nameof(nodeName)) : nodeName; _pushGatewayUrl = pushGatewayUrl; - _pushEnabled = pushEnabled; _intervalSeconds = intervalSeconds <= 0 ? throw new ArgumentException($"Invalid monitoring push interval: {intervalSeconds}s") : intervalSeconds; From 36c46059319e61ca4c323eee2865195ff099ecf2 Mon Sep 17 00:00:00 2001 From: scooletz Date: Wed, 21 Aug 2024 11:47:09 +0200 Subject: [PATCH 02/10] formatting --- src/Nethermind/Nethermind.Monitoring/MetricPusher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs index 4a016a63e07..012a813587a 100644 --- a/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs +++ b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs @@ -143,7 +143,7 @@ private void HandleFailedPush(Exception ex) else { // If there is no error handler registered, we write to trace to at least hopefully get some attention to the problem. - Trace.WriteLine(string.Format("Error in MetricPusher: {0}", ex)); + Trace.WriteLine($"Error in MetricPusher: {ex}"); } } From 54596c1417f5ccdead12aaa56d3d7d3e129e0a61 Mon Sep 17 00:00:00 2001 From: scooletz Date: Wed, 21 Aug 2024 11:47:21 +0200 Subject: [PATCH 03/10] better dict iteration --- .../Metrics/MetricsController.cs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs index 5e2e21c7f2d..c1b1ced2504 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs @@ -229,12 +229,12 @@ private void UpdateMetrics(Type type) if (info.LabelNames is null) { IDictionary dict = info.Dictionary; - // Its fine that the key here need to call `ToString()`. Better here then in the metrics, where it might - // impact the performance of whatever is updating the metrics. - foreach (object keyObj in dict.Keys) // Different dictionary seems to iterate to different KV type. So need to use `Keys` here. + // It's fine that the key here need to call `ToString()`. + // Better here then in the metrics, where it might impact the performance of whatever is updating the metrics. + foreach (DictionaryEntry kvp in dict) { - string keyStr = keyObj.ToString(); - double value = Convert.ToDouble(dict[keyObj]); + string keyStr = kvp.Key.ToString(); + double value = Convert.ToDouble(kvp.Value); string gaugeName = GetGaugeNameKey(info.DictionaryName, keyStr); if (ReplaceValueIfChanged(value, gaugeName) is null) @@ -250,10 +250,10 @@ private void UpdateMetrics(Type type) { IDictionary dict = info.Dictionary; string gaugeName = info.GaugeName; - foreach (object key in dict.Keys) + foreach (DictionaryEntry kvp in dict) { - double value = Convert.ToDouble(dict[key]); - switch (key) + double value = Convert.ToDouble(kvp.Value); + switch (kvp.Key) { case IMetricLabels label: ReplaceValueIfChanged(value, gaugeName, label.Labels); @@ -270,7 +270,7 @@ private void UpdateMetrics(Type type) break; } default: - ReplaceValueIfChanged(value, gaugeName, key.ToString()); + ReplaceValueIfChanged(value, gaugeName, kvp.Key.ToString()); break; } } From e80b6bf472c3e6f96e31d274dc53aa116469f1ec Mon Sep 17 00:00:00 2001 From: scooletz Date: Wed, 21 Aug 2024 13:04:46 +0200 Subject: [PATCH 04/10] allowing force update --- .../MetricsTests.cs | 4 +-- .../Nethermind.Monitoring/MetricPusher.cs | 2 +- .../Metrics/MetricsController.cs | 32 ++++++++++++++++--- .../MonitoringService.cs | 3 +- 4 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/Nethermind/Nethermind.Monitoring.Test/MetricsTests.cs b/src/Nethermind/Nethermind.Monitoring.Test/MetricsTests.cs index 05326ff65fb..9ff889e9a30 100644 --- a/src/Nethermind/Nethermind.Monitoring.Test/MetricsTests.cs +++ b/src/Nethermind/Nethermind.Monitoring.Test/MetricsTests.cs @@ -69,7 +69,7 @@ public void Test_update_correct_gauge() TestMetrics.WithCustomLabelType[new CustomLabelType(1, 11, 111)] = 1111; TestMetrics.OldDictionaryMetrics["metrics0"] = 4; TestMetrics.OldDictionaryMetrics["metrics1"] = 5; - metricsController.UpdateMetrics(null); + metricsController.UpdateMetrics(); var gauges = metricsController._gauges; var keyDefault = $"{nameof(TestMetrics)}.{nameof(TestMetrics.OneTwoThree)}"; @@ -130,7 +130,7 @@ public void Register_and_update_metrics_should_not_throw_exception() metricsController.RegisterMetrics(metric); } - metricsController.UpdateMetrics(null); + metricsController.UpdateMetrics(); }); } diff --git a/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs index 012a813587a..15217778a82 100644 --- a/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs +++ b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs @@ -99,7 +99,7 @@ protected override Task StartServer(CancellationToken cancel) }); } - public void Push() => _wait.Release(); + public void ForceUpdate() => _wait.Release(); private async Task PushOnce() { diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs index c1b1ced2504..7bc977c67da 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs @@ -12,6 +12,7 @@ using System.Runtime.Serialization; using System.Text.RegularExpressions; using System.Threading; +using System.Threading.Tasks; using Nethermind.Core; using Nethermind.Core.Attributes; using Nethermind.Core.Collections; @@ -24,7 +25,8 @@ namespace Nethermind.Monitoring.Metrics public partial class MetricsController : IMetricsController { private readonly int _intervalSeconds; - private Timer _timer; + private CancellationTokenSource _cts; + private readonly SemaphoreSlim _wait = new SemaphoreSlim(0); private readonly Dictionary)[]> _membersCache = new(); private readonly Dictionary _dictionaryCache = new(); private readonly HashSet _metricTypes = new(); @@ -193,11 +195,33 @@ public MetricsController(IMetricsConfig metricsConfig) _useCounters = metricsConfig.CountersEnabled; } - public void StartUpdating() => _timer = new Timer(UpdateMetrics, null, TimeSpan.Zero, TimeSpan.FromSeconds(_intervalSeconds)); + public void StartUpdating() + { + _cts = new CancellationTokenSource(); + Task.Run(() => RunLoop(_cts.Token)); + + return; + async Task RunLoop(CancellationToken ct) + { + while (ct.IsCancellationRequested == false) + { + UpdateMetrics(); + try + { + await _wait.WaitAsync(TimeSpan.FromSeconds(_intervalSeconds), ct); + } + catch (OperationCanceledException) + { + } + } + } + } + + public void ForceUpdate() => _wait.Release(); - public void StopUpdating() => _timer?.Change(Timeout.Infinite, 0); + public void StopUpdating() => _cts.Cancel(); - public void UpdateMetrics(object state) + public void UpdateMetrics() { foreach (Action callback in _callbacks) { diff --git a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs index a12db389bd8..ae5a62b51fb 100644 --- a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs +++ b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs @@ -83,7 +83,8 @@ public async Task StartAsync() { new NethermindKestrelMetricServer(_exposeHost, _exposePort.Value).Start(); } - await Task.Factory.StartNew(() => _metricsController.StartUpdating(), TaskCreationOptions.LongRunning); + _metricsController.StartUpdating(); + if (_logger.IsInfo) _logger.Info($"Started monitoring for the group: {_options.Group}, instance: {_options.Instance}"); } From c06d6a4cd74791051d06eb39dc13317ff92280ad Mon Sep 17 00:00:00 2001 From: scooletz Date: Wed, 21 Aug 2024 13:30:53 +0200 Subject: [PATCH 05/10] preparing for the force updates --- .../IMonitoringService .cs | 5 +++++ .../Metrics/IMetricsController.cs | 3 ++- .../Metrics/MetricsController.cs | 17 ++++++++++++---- .../MonitoringService.cs | 20 +++++++++++++------ 4 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/Nethermind/Nethermind.Monitoring/IMonitoringService .cs b/src/Nethermind/Nethermind.Monitoring/IMonitoringService .cs index f8d13fc417f..83cd09f1c4f 100644 --- a/src/Nethermind/Nethermind.Monitoring/IMonitoringService .cs +++ b/src/Nethermind/Nethermind.Monitoring/IMonitoringService .cs @@ -11,5 +11,10 @@ public interface IMonitoringService Task StartAsync(); Task StopAsync(); void AddMetricsUpdateAction(Action callback); + + /// + /// Forces gathering metrics and reporting them as soon as possible to the underlying sinks. + /// + void ForceUpdate(); } } diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs index a5ed5783c91..7f33924ab4c 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs @@ -8,8 +8,9 @@ namespace Nethermind.Monitoring.Metrics public interface IMetricsController { void RegisterMetrics(Type type); - void StartUpdating(); + void StartUpdating(Action onForced); void StopUpdating(); void AddMetricsUpdateAction(Action callback); + void ForceUpdate(); } } diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs index 7bc977c67da..a7f789f3901 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs @@ -26,7 +26,7 @@ public partial class MetricsController : IMetricsController { private readonly int _intervalSeconds; private CancellationTokenSource _cts; - private readonly SemaphoreSlim _wait = new SemaphoreSlim(0); + private readonly SemaphoreSlim _wait = new(0); private readonly Dictionary)[]> _membersCache = new(); private readonly Dictionary _dictionaryCache = new(); private readonly HashSet _metricTypes = new(); @@ -195,7 +195,7 @@ public MetricsController(IMetricsConfig metricsConfig) _useCounters = metricsConfig.CountersEnabled; } - public void StartUpdating() + public void StartUpdating(Action onForced) { _cts = new CancellationTokenSource(); Task.Run(() => RunLoop(_cts.Token)); @@ -205,14 +205,23 @@ async Task RunLoop(CancellationToken ct) { while (ct.IsCancellationRequested == false) { - UpdateMetrics(); + bool forced = false; + try { - await _wait.WaitAsync(TimeSpan.FromSeconds(_intervalSeconds), ct); + forced = await _wait.WaitAsync(TimeSpan.FromSeconds(_intervalSeconds), ct); } catch (OperationCanceledException) { } + + UpdateMetrics(); + + if (forced && onForced != null) + { + // The update was forced and there's a onForced delegate. Execute it. + onForced(); + } } } } diff --git a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs index ae5a62b51fb..6dff1c91a22 100644 --- a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs +++ b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs @@ -9,7 +9,6 @@ using Nethermind.Monitoring.Config; using System.Net.Sockets; using Prometheus; -using System.Runtime.InteropServices; namespace Nethermind.Monitoring { @@ -51,8 +50,10 @@ public MonitoringService(IMetricsController metricsController, IMetricsConfig me _options = GetOptions(); } - public async Task StartAsync() + public Task StartAsync() { + MetricPusher pusher = null; + if (!string.IsNullOrWhiteSpace(_pushGatewayUrl)) { MetricPusherOptions pusherOptions = new MetricPusherOptions @@ -75,17 +76,20 @@ public async Task StartAsync() if (_logger.IsTrace) _logger.Error(ex.Message, ex); // keeping it as Error to log the exception details with it. } }; - MetricPusher metricPusher = new MetricPusher(pusherOptions); - metricPusher.Start(); + pusher = new MetricPusher(pusherOptions); + pusher.Start(); } if (_exposePort is not null) { new NethermindKestrelMetricServer(_exposeHost, _exposePort.Value).Start(); } - _metricsController.StartUpdating(); + + _metricsController.StartUpdating(pusher != null ? pusher.ForceUpdate : null); if (_logger.IsInfo) _logger.Info($"Started monitoring for the group: {_options.Group}, instance: {_options.Instance}"); + + return Task.CompletedTask; } public void AddMetricsUpdateAction(Action callback) @@ -93,10 +97,14 @@ public void AddMetricsUpdateAction(Action callback) _metricsController.AddMetricsUpdateAction(callback); } + public void ForceUpdate() + { + _metricsController.ForceUpdate(); + } + public Task StopAsync() { _metricsController.StopUpdating(); - return Task.CompletedTask; } From 69122150876da05673886c914c547169bd6c3b2b Mon Sep 17 00:00:00 2001 From: scooletz Date: Wed, 21 Aug 2024 14:10:26 +0200 Subject: [PATCH 06/10] enhancing stats with force --- .../Processing/BlockchainProcessor.cs | 12 ++++++++++-- .../Processing/ProcessingStats.cs | 5 +++++ .../Processing/ReadOnlyChainProcessingEnv.cs | 3 --- .../Nethermind.Init/Steps/InitializeBlockchain.cs | 2 ++ .../Nethermind.Monitoring/NullMonitoringService.cs | 1 + 5 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs index c3cfb0b7234..5d98e71d713 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs @@ -73,8 +73,7 @@ public sealed class BlockchainProcessor : IBlockchainProcessor, IBlockProcessing /// /// /// - public BlockchainProcessor( - IBlockTree? blockTree, + public BlockchainProcessor(IBlockTree? blockTree, IBlockProcessor? blockProcessor, IBlockPreprocessorStep? recoveryStep, IStateReader stateReader, @@ -94,6 +93,15 @@ public BlockchainProcessor( _stats = new ProcessingStats(_logger); } + /// + /// Raised when stats are updated. + /// + public event EventHandler StatsUpdated + { + add { _stats.StatsUpdated += value; } + remove { _stats.StatsUpdated -= value; } + } + private void OnNewHeadBlock(object? sender, BlockEventArgs e) { _lastProcessedBlock = DateTime.UtcNow; diff --git a/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs b/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs index 72b73241995..b51f07f82c2 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs @@ -84,6 +84,9 @@ public void UpdateStats(Block? block, IBlockTree blockTreeCtx, long blockProcess Metrics.BlockchainHeight = block.Header.Number; Metrics.BestKnownBlockNumber = blockTreeCtx.BestKnownNumber; + // Inform interested about the stats update + StatsUpdated?.Invoke(this, EventArgs.Empty); + _blockProcessingMicroseconds = _processingStopwatch.ElapsedMicroseconds(); _runningMicroseconds = _runStopwatch.ElapsedMicroseconds(); _runMicroseconds = (_runningMicroseconds - _lastElapsedRunningMicroseconds); @@ -104,6 +107,8 @@ public void UpdateStats(Block? block, IBlockTree blockTreeCtx, long blockProcess } } + public event EventHandler StatsUpdated; + private void GenerateReport() => ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); void IThreadPoolWorkItem.Execute() diff --git a/src/Nethermind/Nethermind.Consensus/Processing/ReadOnlyChainProcessingEnv.cs b/src/Nethermind/Nethermind.Consensus/Processing/ReadOnlyChainProcessingEnv.cs index f4bc42832c2..b16be8b9676 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/ReadOnlyChainProcessingEnv.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/ReadOnlyChainProcessingEnv.cs @@ -44,9 +44,6 @@ public ReadOnlyChainProcessingEnv( _blockProcessingQueue = new BlockchainProcessor(blockTree, BlockProcessor, recoveryStep, stateReader, logManager, BlockchainProcessor.Options.NoReceipts); BlockProcessingQueue = _blockProcessingQueue; ChainProcessor = new OneTimeChainProcessor(scope.WorldState, _blockProcessingQueue); - _blockProcessingQueue = new BlockchainProcessor(blockTree, BlockProcessor, recoveryStep, stateReader, logManager, BlockchainProcessor.Options.NoReceipts); - BlockProcessingQueue = _blockProcessingQueue; - ChainProcessor = new OneTimeChainProcessor(scope.WorldState, _blockProcessingQueue); } protected virtual IBlockProcessor CreateBlockProcessor( diff --git a/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs b/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs index 21189802d53..dc46aa0e028 100644 --- a/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs +++ b/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs @@ -127,6 +127,8 @@ protected virtual Task InitBlockchain() IsMainProcessor = true }; + blockchainProcessor.StatsUpdated += (_, _) => _api.MonitoringService.ForceUpdate(); + setApi.BlockProcessingQueue = blockchainProcessor; setApi.BlockchainProcessor = blockchainProcessor; diff --git a/src/Nethermind/Nethermind.Monitoring/NullMonitoringService.cs b/src/Nethermind/Nethermind.Monitoring/NullMonitoringService.cs index 54fb766d0a9..a3595b2cdeb 100644 --- a/src/Nethermind/Nethermind.Monitoring/NullMonitoringService.cs +++ b/src/Nethermind/Nethermind.Monitoring/NullMonitoringService.cs @@ -25,5 +25,6 @@ public Task StopAsync() } public void AddMetricsUpdateAction(Action callback) { } + public void ForceUpdate() { } } } From b78d8e9e2abf8220528cc615642da0db19b56db1 Mon Sep 17 00:00:00 2001 From: scooletz Date: Fri, 30 Aug 2024 15:52:07 +0200 Subject: [PATCH 07/10] change of logic --- .../Config/IMetricsConfig.cs | 6 +++--- .../Config/MetricsConfig.cs | 3 ++- .../Metrics/MetricsController.cs | 20 ++++++++++++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs b/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs index 531f9497c47..dd091dfc058 100644 --- a/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs +++ b/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs @@ -17,9 +17,6 @@ public interface IMetricsConfig : IConfig [ConfigItem(Description = "Whether to publish various metrics to Prometheus Pushgateway at a given interval.", DefaultValue = "false")] bool Enabled { get; } - [ConfigItem(Description = "Whether to publish various metrics to Prometheus Pushgateway right after block is processed.", DefaultValue = "false")] - bool PushAfterBlock { get; } - [ConfigItem(Description = "Whether to publish metrics using .NET diagnostics that can be collected with dotnet-counters.", DefaultValue = "false")] bool CountersEnabled { get; } @@ -29,6 +26,9 @@ public interface IMetricsConfig : IConfig [ConfigItem(DefaultValue = "5", Description = "The frequency of pushing metrics to Prometheus, in seconds.")] int IntervalSeconds { get; } + [ConfigItem(DefaultValue = "2", Description = "The minimal frequency of pushing metrics to Prometheus, in seconds. Push should not occur more often than this")] + int MinimalIntervalSeconds { get; } + [ConfigItem(Description = "The name to display on the Grafana dashboard.", DefaultValue = "\"Nethermind\"")] string NodeName { get; } diff --git a/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs b/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs index 15ea273d5c9..39730841de9 100644 --- a/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs +++ b/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs @@ -9,10 +9,11 @@ public class MetricsConfig : IMetricsConfig public int? ExposePort { get; set; } = null; public bool Enabled { get; set; } = false; - public bool PushAfterBlock { get; set; } = false; public bool CountersEnabled { get; set; } = false; public string PushGatewayUrl { get; set; } = ""; public int IntervalSeconds { get; set; } = 5; + + public int MinimalIntervalSeconds { get; set; } = 2; public string NodeName { get; set; } = "Nethermind"; public bool EnableDbSizeMetrics { get; set; } = true; } diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs index a7f789f3901..ef544c3af8e 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs @@ -25,6 +25,7 @@ namespace Nethermind.Monitoring.Metrics public partial class MetricsController : IMetricsController { private readonly int _intervalSeconds; + private readonly int _minIntervalSeconds; private CancellationTokenSource _cts; private readonly SemaphoreSlim _wait = new(0); private readonly Dictionary)[]> _membersCache = new(); @@ -192,6 +193,8 @@ private static Gauge CreateGauge(string name, string help = null, IDictionary Date: Mon, 2 Sep 2024 10:49:45 +0200 Subject: [PATCH 08/10] controller updated with the new signalling --- .../Metrics/MetricsController.cs | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs index ef544c3af8e..e73988c043e 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs @@ -22,12 +22,18 @@ namespace Nethermind.Monitoring.Metrics { + /// + /// The metrics controller handles gathering metrics every . + /// Additionally, it allows forcing an update by calling that makes the wait interrupted and reporting executed immediately. + /// To do not flood the endpoint, is configured so that report happens with a gap that is at least that big. + /// public partial class MetricsController : IMetricsController { private readonly int _intervalSeconds; private readonly int _minIntervalSeconds; private CancellationTokenSource _cts; - private readonly SemaphoreSlim _wait = new(0); + private TaskCompletionSource _wait = new(); + private readonly object _waitLock = new(); private readonly Dictionary)[]> _membersCache = new(); private readonly Dictionary _dictionaryCache = new(); private readonly HashSet _metricTypes = new(); @@ -194,7 +200,6 @@ public MetricsController(IMetricsConfig metricsConfig) { _intervalSeconds = metricsConfig.IntervalSeconds == 0 ? 5 : metricsConfig.IntervalSeconds; _minIntervalSeconds = metricsConfig.MinimalIntervalSeconds == 0 ? 2 : metricsConfig.MinimalIntervalSeconds; - _useCounters = metricsConfig.CountersEnabled; } @@ -213,14 +218,20 @@ async Task RunLoop(CancellationToken ct) { bool forced = false; + Task wait = GetForceUpdateWait(); + try { - forced = await _wait.WaitAsync(waitTime, ct); + + Task finished = await Task.WhenAny(wait, Task.Delay(waitTime, ct)); + forced = finished == wait; } catch (OperationCanceledException) { } + ResetForceUpdate(); + UpdateMetrics(); if (forced && onForced != null) @@ -244,7 +255,32 @@ async Task RunLoop(CancellationToken ct) } } - public void ForceUpdate() => _wait.Release(); + public void ForceUpdate() + { + lock (_waitLock) + { + _wait.TrySetResult(); + } + } + + private Task GetForceUpdateWait() + { + lock (_waitLock) + { + return _wait.Task; + } + } + + private void ResetForceUpdate() + { + lock (_waitLock) + { + if (_wait.Task.IsCompleted) + { + _wait = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + } + } public void StopUpdating() => _cts.Cancel(); From fed5a86b0910801f63047e783b0e241bed39250c Mon Sep 17 00:00:00 2001 From: scooletz Date: Mon, 2 Sep 2024 11:32:20 +0200 Subject: [PATCH 09/10] proper math on intervals --- .../Nethermind.Monitoring/Metrics/MetricsController.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs index e73988c043e..fef44a8c0b4 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs @@ -211,8 +211,8 @@ public void StartUpdating(Action onForced) return; async Task RunLoop(CancellationToken ct) { - var constantDelay = TimeSpan.FromSeconds(Math.Max(_intervalSeconds - _minIntervalSeconds, 1)); - var waitTime = TimeSpan.FromSeconds(_intervalSeconds) - constantDelay; + var minDelay = TimeSpan.FromSeconds(_minIntervalSeconds); + TimeSpan waitTime = TimeSpan.FromSeconds(_intervalSeconds) - minDelay; while (ct.IsCancellationRequested == false) { @@ -245,7 +245,7 @@ async Task RunLoop(CancellationToken ct) // Always wait a minimal amount of time so that the metrics are not flooded try { - await Task.Delay(constantDelay, ct).ConfigureAwait(false); + await Task.Delay(minDelay, ct).ConfigureAwait(false); } catch (OperationCanceledException) { From b38f2dd61ae13b273a744a348df3953072e06791 Mon Sep 17 00:00:00 2001 From: scooletz Date: Tue, 3 Sep 2024 14:04:12 +0200 Subject: [PATCH 10/10] pushing scheduled properly --- .../Nethermind.Monitoring/MetricPusher.cs | 11 +++-------- .../Metrics/IMetricsController.cs | 2 +- .../Metrics/MetricsController.cs | 15 ++++----------- .../Nethermind.Monitoring/MonitoringService.cs | 2 +- 4 files changed, 9 insertions(+), 21 deletions(-) diff --git a/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs index 15217778a82..360c927866c 100644 --- a/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs +++ b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs @@ -21,7 +21,6 @@ namespace Nethermind.Monitoring; /// public class MetricPusher : MetricHandler { - private readonly TimeSpan _pushInterval; private readonly HttpMethod _method; private readonly Uri _targetUrl; private readonly Func _httpClientProvider; @@ -64,8 +63,6 @@ public MetricPusher(MetricPusherOptions options) } _targetUrl = targetUrl; - - _pushInterval = TimeSpan.FromMilliseconds(options.IntervalMilliseconds); _onError = options.OnError; _method = options.ReplaceOnPush ? HttpMethod.Put : HttpMethod.Post; @@ -84,22 +81,20 @@ protected override Task StartServer(CancellationToken cancel) { try { - await _wait.WaitAsync(_pushInterval, cancel); + await _wait.WaitAsync(cancel); } catch (OperationCanceledException) { break; } + // As the wait above is cancellable, this will send at the end await PushOnce(); } - - // Push the final state - await PushOnce(); }); } - public void ForceUpdate() => _wait.Release(); + public void ScheduleUpdatePush() => _wait.Release(); private async Task PushOnce() { diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs index 7f33924ab4c..afab446cf1a 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs @@ -8,7 +8,7 @@ namespace Nethermind.Monitoring.Metrics public interface IMetricsController { void RegisterMetrics(Type type); - void StartUpdating(Action onForced); + void StartUpdating(Action metricsUpdated); void StopUpdating(); void AddMetricsUpdateAction(Action callback); void ForceUpdate(); diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs index fef44a8c0b4..b5ef88ca0b6 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs @@ -203,7 +203,7 @@ public MetricsController(IMetricsConfig metricsConfig) _useCounters = metricsConfig.CountersEnabled; } - public void StartUpdating(Action onForced) + public void StartUpdating(Action metricsUpdated) { _cts = new CancellationTokenSource(); Task.Run(() => RunLoop(_cts.Token)); @@ -216,15 +216,11 @@ async Task RunLoop(CancellationToken ct) while (ct.IsCancellationRequested == false) { - bool forced = false; - Task wait = GetForceUpdateWait(); try { - - Task finished = await Task.WhenAny(wait, Task.Delay(waitTime, ct)); - forced = finished == wait; + await Task.WhenAny(wait, Task.Delay(waitTime, ct)); } catch (OperationCanceledException) { @@ -234,11 +230,8 @@ async Task RunLoop(CancellationToken ct) UpdateMetrics(); - if (forced && onForced != null) - { - // The update was forced and there's a onForced delegate. Execute it. - onForced(); - } + // Inform about metrics updated + metricsUpdated?.Invoke(); if (ct.IsCancellationRequested == false) { diff --git a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs index 6dff1c91a22..b0d3fa2afe6 100644 --- a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs +++ b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs @@ -85,7 +85,7 @@ public Task StartAsync() new NethermindKestrelMetricServer(_exposeHost, _exposePort.Value).Start(); } - _metricsController.StartUpdating(pusher != null ? pusher.ForceUpdate : null); + _metricsController.StartUpdating(pusher != null ? pusher.ScheduleUpdatePush : null); if (_logger.IsInfo) _logger.Info($"Started monitoring for the group: {_options.Group}, instance: {_options.Instance}");