diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs index c3cfb0b7234..5d98e71d713 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs @@ -73,8 +73,7 @@ public sealed class BlockchainProcessor : IBlockchainProcessor, IBlockProcessing /// /// /// - public BlockchainProcessor( - IBlockTree? blockTree, + public BlockchainProcessor(IBlockTree? blockTree, IBlockProcessor? blockProcessor, IBlockPreprocessorStep? recoveryStep, IStateReader stateReader, @@ -94,6 +93,15 @@ public BlockchainProcessor( _stats = new ProcessingStats(_logger); } + /// + /// Raised when stats are updated. + /// + public event EventHandler StatsUpdated + { + add { _stats.StatsUpdated += value; } + remove { _stats.StatsUpdated -= value; } + } + private void OnNewHeadBlock(object? sender, BlockEventArgs e) { _lastProcessedBlock = DateTime.UtcNow; diff --git a/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs b/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs index 72b73241995..b51f07f82c2 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs @@ -84,6 +84,9 @@ public void UpdateStats(Block? block, IBlockTree blockTreeCtx, long blockProcess Metrics.BlockchainHeight = block.Header.Number; Metrics.BestKnownBlockNumber = blockTreeCtx.BestKnownNumber; + // Inform interested about the stats update + StatsUpdated?.Invoke(this, EventArgs.Empty); + _blockProcessingMicroseconds = _processingStopwatch.ElapsedMicroseconds(); _runningMicroseconds = _runStopwatch.ElapsedMicroseconds(); _runMicroseconds = (_runningMicroseconds - _lastElapsedRunningMicroseconds); @@ -104,6 +107,8 @@ public void UpdateStats(Block? 
block, IBlockTree blockTreeCtx, long blockProcess } } + public event EventHandler StatsUpdated; + private void GenerateReport() => ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); void IThreadPoolWorkItem.Execute() diff --git a/src/Nethermind/Nethermind.Consensus/Processing/ReadOnlyChainProcessingEnv.cs b/src/Nethermind/Nethermind.Consensus/Processing/ReadOnlyChainProcessingEnv.cs index f4bc42832c2..b16be8b9676 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/ReadOnlyChainProcessingEnv.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/ReadOnlyChainProcessingEnv.cs @@ -44,9 +44,6 @@ public ReadOnlyChainProcessingEnv( _blockProcessingQueue = new BlockchainProcessor(blockTree, BlockProcessor, recoveryStep, stateReader, logManager, BlockchainProcessor.Options.NoReceipts); BlockProcessingQueue = _blockProcessingQueue; ChainProcessor = new OneTimeChainProcessor(scope.WorldState, _blockProcessingQueue); - _blockProcessingQueue = new BlockchainProcessor(blockTree, BlockProcessor, recoveryStep, stateReader, logManager, BlockchainProcessor.Options.NoReceipts); - BlockProcessingQueue = _blockProcessingQueue; - ChainProcessor = new OneTimeChainProcessor(scope.WorldState, _blockProcessingQueue); } protected virtual IBlockProcessor CreateBlockProcessor( diff --git a/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs b/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs index 21189802d53..dc46aa0e028 100644 --- a/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs +++ b/src/Nethermind/Nethermind.Init/Steps/InitializeBlockchain.cs @@ -127,6 +127,8 @@ protected virtual Task InitBlockchain() IsMainProcessor = true }; + blockchainProcessor.StatsUpdated += (_, _) => _api.MonitoringService.ForceUpdate(); + setApi.BlockProcessingQueue = blockchainProcessor; setApi.BlockchainProcessor = blockchainProcessor; diff --git a/src/Nethermind/Nethermind.Monitoring.Test/MetricsTests.cs 
b/src/Nethermind/Nethermind.Monitoring.Test/MetricsTests.cs index 05326ff65fb..9ff889e9a30 100644 --- a/src/Nethermind/Nethermind.Monitoring.Test/MetricsTests.cs +++ b/src/Nethermind/Nethermind.Monitoring.Test/MetricsTests.cs @@ -69,7 +69,7 @@ public void Test_update_correct_gauge() TestMetrics.WithCustomLabelType[new CustomLabelType(1, 11, 111)] = 1111; TestMetrics.OldDictionaryMetrics["metrics0"] = 4; TestMetrics.OldDictionaryMetrics["metrics1"] = 5; - metricsController.UpdateMetrics(null); + metricsController.UpdateMetrics(); var gauges = metricsController._gauges; var keyDefault = $"{nameof(TestMetrics)}.{nameof(TestMetrics.OneTwoThree)}"; @@ -130,7 +130,7 @@ public void Register_and_update_metrics_should_not_throw_exception() metricsController.RegisterMetrics(metric); } - metricsController.UpdateMetrics(null); + metricsController.UpdateMetrics(); }); } diff --git a/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs b/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs index 0186df10e5d..dd091dfc058 100644 --- a/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs +++ b/src/Nethermind/Nethermind.Monitoring/Config/IMetricsConfig.cs @@ -26,6 +26,9 @@ public interface IMetricsConfig : IConfig [ConfigItem(DefaultValue = "5", Description = "The frequency of pushing metrics to Prometheus, in seconds.")] int IntervalSeconds { get; } + [ConfigItem(DefaultValue = "2", Description = "The minimal frequency of pushing metrics to Prometheus, in seconds. 
Push should not occur more often than this")] + int MinimalIntervalSeconds { get; } + [ConfigItem(Description = "The name to display on the Grafana dashboard.", DefaultValue = "\"Nethermind\"")] string NodeName { get; } diff --git a/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs b/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs index a6cb0e1534d..39730841de9 100644 --- a/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs +++ b/src/Nethermind/Nethermind.Monitoring/Config/MetricsConfig.cs @@ -8,9 +8,12 @@ public class MetricsConfig : IMetricsConfig public string ExposeHost { get; set; } = "+"; public int? ExposePort { get; set; } = null; public bool Enabled { get; set; } = false; + public bool CountersEnabled { get; set; } = false; public string PushGatewayUrl { get; set; } = ""; public int IntervalSeconds { get; set; } = 5; + + public int MinimalIntervalSeconds { get; set; } = 2; public string NodeName { get; set; } = "Nethermind"; public bool EnableDbSizeMetrics { get; set; } = true; } diff --git a/src/Nethermind/Nethermind.Monitoring/IMonitoringService .cs b/src/Nethermind/Nethermind.Monitoring/IMonitoringService .cs index f8d13fc417f..83cd09f1c4f 100644 --- a/src/Nethermind/Nethermind.Monitoring/IMonitoringService .cs +++ b/src/Nethermind/Nethermind.Monitoring/IMonitoringService .cs @@ -11,5 +11,10 @@ public interface IMonitoringService Task StartAsync(); Task StopAsync(); void AddMetricsUpdateAction(Action callback); + + /// + /// Forces gathering metrics and reporting them as soon as possible to the underlying sinks. 
+ /// + void ForceUpdate(); } } diff --git a/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs new file mode 100644 index 00000000000..360c927866c --- /dev/null +++ b/src/Nethermind/Nethermind.Monitoring/MetricPusher.cs @@ -0,0 +1,172 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +#nullable enable +using System; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading; +using System.Threading.Tasks; +using Prometheus; + +namespace Nethermind.Monitoring; + +using System.Diagnostics; +using System.Text; + +/// +/// A metric server that regularly pushes metrics to a Prometheus PushGateway. +/// +public class MetricPusher : MetricHandler +{ + private readonly HttpMethod _method; + private readonly Uri _targetUrl; + private readonly Func _httpClientProvider; + private readonly SemaphoreSlim _wait; + + public MetricPusher(MetricPusherOptions options) + { + if (string.IsNullOrEmpty(options.Endpoint)) + throw new ArgumentNullException(nameof(options.Endpoint)); + + if (string.IsNullOrEmpty(options.Job)) + throw new ArgumentNullException(nameof(options.Job)); + + if (options.IntervalMilliseconds <= 0) + throw new ArgumentException("Interval must be greater than zero", nameof(options.IntervalMilliseconds)); + + _registry = options.Registry ?? Prometheus.Metrics.DefaultRegistry; + + _httpClientProvider = options.HttpClientProvider ?? 
(() => SingletonHttpClient); + + StringBuilder sb = new StringBuilder($"{options.Endpoint!.TrimEnd('/')}/job/{options.Job}"); + if (!string.IsNullOrEmpty(options.Instance)) + sb.Append($"/instance/{options.Instance}"); + + if (options.AdditionalLabels != null) + { + foreach (Tuple pair in options.AdditionalLabels) + { + if (pair == null || string.IsNullOrEmpty(pair.Item1) || string.IsNullOrEmpty(pair.Item2)) + throw new NotSupportedException( + $"Invalid {nameof(MetricPusher)} additional label: ({pair?.Item1}):({pair?.Item2})"); + + sb.Append($"/{pair.Item1}/{pair.Item2}"); + } + } + + if (!Uri.TryCreate(sb.ToString(), UriKind.Absolute, out Uri? targetUrl)) + { + throw new ArgumentException("Endpoint must be a valid url", nameof(options.Endpoint)); + } + + _targetUrl = targetUrl; + _onError = options.OnError; + + _method = options.ReplaceOnPush ? HttpMethod.Put : HttpMethod.Post; + _wait = new SemaphoreSlim(0); + } + + private static readonly HttpClient SingletonHttpClient = new(); + private readonly CollectorRegistry _registry; + private readonly Action? _onError; + + protected override Task StartServer(CancellationToken cancel) + { + return Task.Run(async delegate + { + while (!cancel.IsCancellationRequested) + { + try + { + await _wait.WaitAsync(cancel); + } + catch (OperationCanceledException) + { + break; + } + + // As the wait above is cancellable, this will send at the end + await PushOnce(); + } + }); + } + + public void ScheduleUpdatePush() => _wait.Release(); + + private async Task PushOnce() + { + try + { + HttpClient httpClient = _httpClientProvider(); + + var request = new HttpRequestMessage + { + Method = _method, + RequestUri = _targetUrl, + // We use a copy-pasted implementation of PushStreamContent here to avoid taking a dependency on the old ASP.NET Web API where it lives. 
+ Content = + new PushRegistryHttpContent(_registry, PrometheusConstants.ExporterContentTypeValue), + }; + + // ReSharper disable once MethodSupportsCancellation + using HttpResponseMessage response = await httpClient.SendAsync(request); + + // If anything goes wrong, we want to get at least an entry in the trace log. + response.EnsureSuccessStatusCode(); + } + catch (ScrapeFailedException ex) + { + // We do not consider failed scrapes a reportable error since the user code that raises the failure should be the one logging it. + Trace.WriteLine($"Skipping metrics push due to failed scrape: {ex.Message}"); + } + catch (Exception ex) + { + HandleFailedPush(ex); + } + } + + private void HandleFailedPush(Exception ex) + { + if (_onError != null) + { + // Asynchronous because we don't trust the callee to be fast. + Task.Run(() => _onError(ex)); + } + else + { + // If there is no error handler registered, we write to trace to at least hopefully get some attention to the problem. + Trace.WriteLine($"Error in MetricPusher: {ex}"); + } + } + + private sealed class PushRegistryHttpContent : HttpContent + { + private readonly CollectorRegistry _registry; + + private static readonly MediaTypeHeaderValue OctetStreamHeaderValue = + MediaTypeHeaderValue.Parse("application/octet-stream"); + + /// + /// Initializes a new instance of the class with the given . + /// + public PushRegistryHttpContent(CollectorRegistry registry, MediaTypeHeaderValue? mediaType) + { + _registry = registry; + Headers.ContentType = mediaType ?? OctetStreamHeaderValue; + } + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? 
context) + { + await _registry.CollectAndExportAsTextAsync(stream, default); + } + + protected override bool TryComputeLength(out long length) + { + length = default; + return false; + } + } +} diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs index a5ed5783c91..afab446cf1a 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/IMetricsController.cs @@ -8,8 +8,9 @@ namespace Nethermind.Monitoring.Metrics public interface IMetricsController { void RegisterMetrics(Type type); - void StartUpdating(); + void StartUpdating(Action metricsUpdated); void StopUpdating(); void AddMetricsUpdateAction(Action callback); + void ForceUpdate(); } } diff --git a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs index 5e2e21c7f2d..b5ef88ca0b6 100644 --- a/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs +++ b/src/Nethermind/Nethermind.Monitoring/Metrics/MetricsController.cs @@ -12,6 +12,7 @@ using System.Runtime.Serialization; using System.Text.RegularExpressions; using System.Threading; +using System.Threading.Tasks; using Nethermind.Core; using Nethermind.Core.Attributes; using Nethermind.Core.Collections; @@ -21,10 +22,18 @@ namespace Nethermind.Monitoring.Metrics { + /// + /// The metrics controller handles gathering metrics every <see cref="_intervalSeconds"/> seconds. + /// Additionally, it allows forcing an update by calling <see cref="ForceUpdate"/>, which interrupts the wait and triggers reporting immediately. + /// To avoid flooding the endpoint, <see cref="_minIntervalSeconds"/> is configured so that consecutive reports are at least that many seconds apart. 
+ /// public partial class MetricsController : IMetricsController { private readonly int _intervalSeconds; - private Timer _timer; + private readonly int _minIntervalSeconds; + private CancellationTokenSource _cts; + private TaskCompletionSource _wait = new(); + private readonly object _waitLock = new(); private readonly Dictionary)[]> _membersCache = new(); private readonly Dictionary _dictionaryCache = new(); private readonly HashSet _metricTypes = new(); @@ -190,14 +199,85 @@ private static Gauge CreateGauge(string name, string help = null, IDictionary _timer = new Timer(UpdateMetrics, null, TimeSpan.Zero, TimeSpan.FromSeconds(_intervalSeconds)); + public void StartUpdating(Action metricsUpdated) + { + _cts = new CancellationTokenSource(); + Task.Run(() => RunLoop(_cts.Token)); + + return; + async Task RunLoop(CancellationToken ct) + { + var minDelay = TimeSpan.FromSeconds(_minIntervalSeconds); + TimeSpan waitTime = TimeSpan.FromSeconds(_intervalSeconds) - minDelay; + + while (ct.IsCancellationRequested == false) + { + Task wait = GetForceUpdateWait(); + + try + { + await Task.WhenAny(wait, Task.Delay(waitTime, ct)); + } + catch (OperationCanceledException) + { + } + + ResetForceUpdate(); + + UpdateMetrics(); + + // Inform about metrics updated + metricsUpdated?.Invoke(); + + if (ct.IsCancellationRequested == false) + { + // Always wait a minimal amount of time so that the metrics are not flooded + try + { + await Task.Delay(minDelay, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + } + } + } + } + + public void ForceUpdate() + { + lock (_waitLock) + { + _wait.TrySetResult(); + } + } + + private Task GetForceUpdateWait() + { + lock (_waitLock) + { + return _wait.Task; + } + } + + private void ResetForceUpdate() + { + lock (_waitLock) + { + if (_wait.Task.IsCompleted) + { + _wait = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + } + } - public void StopUpdating() => _timer?.Change(Timeout.Infinite, 
0); + public void StopUpdating() => _cts.Cancel(); - public void UpdateMetrics(object state) + public void UpdateMetrics() { foreach (Action callback in _callbacks) { @@ -229,12 +309,12 @@ private void UpdateMetrics(Type type) if (info.LabelNames is null) { IDictionary dict = info.Dictionary; - // Its fine that the key here need to call `ToString()`. Better here then in the metrics, where it might - // impact the performance of whatever is updating the metrics. - foreach (object keyObj in dict.Keys) // Different dictionary seems to iterate to different KV type. So need to use `Keys` here. + // It's fine that the key here needs to call `ToString()`. + // Better here than in the metrics, where it might impact the performance of whatever is updating the metrics. + foreach (DictionaryEntry kvp in dict) { - string keyStr = keyObj.ToString(); - double value = Convert.ToDouble(dict[keyObj]); + string keyStr = kvp.Key.ToString(); + double value = Convert.ToDouble(kvp.Value); string gaugeName = GetGaugeNameKey(info.DictionaryName, keyStr); if (ReplaceValueIfChanged(value, gaugeName) is null) @@ -250,10 +330,10 @@ private void UpdateMetrics(Type type) { IDictionary dict = info.Dictionary; string gaugeName = info.GaugeName; - foreach (object key in dict.Keys) + foreach (DictionaryEntry kvp in dict) { - double value = Convert.ToDouble(dict[key]); - switch (key) + double value = Convert.ToDouble(kvp.Value); + switch (kvp.Key) { case IMetricLabels label: ReplaceValueIfChanged(value, gaugeName, label.Labels); @@ -270,7 +350,7 @@ private void UpdateMetrics(Type type) break; } default: - ReplaceValueIfChanged(value, gaugeName, key.ToString()); + ReplaceValueIfChanged(value, gaugeName, kvp.Key.ToString()); break; } } diff --git a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs index e40540dcfbe..b0d3fa2afe6 100644 --- a/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs +++ 
b/src/Nethermind/Nethermind.Monitoring/MonitoringService.cs @@ -9,7 +9,6 @@ using Nethermind.Monitoring.Config; using System.Net.Sockets; using Prometheus; -using System.Runtime.InteropServices; namespace Nethermind.Monitoring { @@ -22,7 +21,6 @@ public class MonitoringService : IMonitoringService private readonly string _exposeHost; private readonly int? _exposePort; private readonly string _nodeName; - private readonly bool _pushEnabled; private readonly string _pushGatewayUrl; private readonly int _intervalSeconds; @@ -34,7 +32,6 @@ public MonitoringService(IMetricsController metricsController, IMetricsConfig me int? exposePort = metricsConfig.ExposePort; string nodeName = metricsConfig.NodeName; string pushGatewayUrl = metricsConfig.PushGatewayUrl; - bool pushEnabled = metricsConfig.Enabled; int intervalSeconds = metricsConfig.IntervalSeconds; _exposeHost = exposeHost; @@ -43,7 +40,6 @@ public MonitoringService(IMetricsController metricsController, IMetricsConfig me ? throw new ArgumentNullException(nameof(nodeName)) : nodeName; _pushGatewayUrl = pushGatewayUrl; - _pushEnabled = pushEnabled; _intervalSeconds = intervalSeconds <= 0 ? throw new ArgumentException($"Invalid monitoring push interval: {intervalSeconds}s") : intervalSeconds; @@ -54,8 +50,10 @@ public MonitoringService(IMetricsController metricsController, IMetricsConfig me _options = GetOptions(); } - public async Task StartAsync() + public Task StartAsync() { + MetricPusher pusher = null; + if (!string.IsNullOrWhiteSpace(_pushGatewayUrl)) { MetricPusherOptions pusherOptions = new MetricPusherOptions @@ -78,16 +76,20 @@ public async Task StartAsync() if (_logger.IsTrace) _logger.Error(ex.Message, ex); // keeping it as Error to log the exception details with it. 
} }; - MetricPusher metricPusher = new MetricPusher(pusherOptions); - metricPusher.Start(); + pusher = new MetricPusher(pusherOptions); + pusher.Start(); } if (_exposePort is not null) { new NethermindKestrelMetricServer(_exposeHost, _exposePort.Value).Start(); } - await Task.Factory.StartNew(() => _metricsController.StartUpdating(), TaskCreationOptions.LongRunning); + + _metricsController.StartUpdating(pusher != null ? pusher.ScheduleUpdatePush : null); + if (_logger.IsInfo) _logger.Info($"Started monitoring for the group: {_options.Group}, instance: {_options.Instance}"); + + return Task.CompletedTask; } public void AddMetricsUpdateAction(Action callback) @@ -95,10 +97,14 @@ public void AddMetricsUpdateAction(Action callback) _metricsController.AddMetricsUpdateAction(callback); } + public void ForceUpdate() + { + _metricsController.ForceUpdate(); + } + public Task StopAsync() { _metricsController.StopUpdating(); - return Task.CompletedTask; } diff --git a/src/Nethermind/Nethermind.Monitoring/NullMonitoringService.cs b/src/Nethermind/Nethermind.Monitoring/NullMonitoringService.cs index 54fb766d0a9..a3595b2cdeb 100644 --- a/src/Nethermind/Nethermind.Monitoring/NullMonitoringService.cs +++ b/src/Nethermind/Nethermind.Monitoring/NullMonitoringService.cs @@ -25,5 +25,6 @@ public Task StopAsync() } public void AddMetricsUpdateAction(Action callback) { } + public void ForceUpdate() { } } }