From 6fe233cdb345210eccce4e0ac9afdf4d18059520 Mon Sep 17 00:00:00 2001 From: Amirul Ashraf Date: Thu, 24 Oct 2024 10:37:20 +0800 Subject: [PATCH 1/3] Blocktree suggest pacer --- .../BlockTreeSuggestPacerTests.cs | 54 ++++++++++++++++ .../BlockTreeSuggestPacer.cs | 63 +++++++++++++++++++ .../Visitors/DbBlocksLoader.cs | 40 ++++-------- .../Visitors/StartupBlockTreeFixer.cs | 36 ++--------- 4 files changed, 135 insertions(+), 58 deletions(-) create mode 100644 src/Nethermind/Nethermind.Blockchain.Test/BlockTreeSuggestPacerTests.cs create mode 100644 src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs diff --git a/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeSuggestPacerTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeSuggestPacerTests.cs new file mode 100644 index 00000000000..34392783f02 --- /dev/null +++ b/src/Nethermind/Nethermind.Blockchain.Test/BlockTreeSuggestPacerTests.cs @@ -0,0 +1,54 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System.Threading.Tasks; +using FluentAssertions; +using Nethermind.Core; +using Nethermind.Core.Test.Builders; +using NSubstitute; +using NUnit.Framework; + +namespace Nethermind.Blockchain.Test; + +public class BlockTreeSuggestPacerTests +{ + [Test] + public void WillNotBlockIfInBatchLimit() + { + IBlockTree blockTree = Substitute.For(); + blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject); + using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5); + + pacer.WaitForQueue(1, default).IsCompleted.Should().BeTrue(); + } + + [Test] + public void WillBlockIfBatchTooLarge() + { + IBlockTree blockTree = Substitute.For(); + blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject); + using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5); + + pacer.WaitForQueue(11, default).IsCompleted.Should().BeFalse(); + } + + [Test] + public void WillOnlyUnblockOnceHeadReachHighEnough() + { + IBlockTree blockTree = Substitute.For(); + blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject); + using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5); + + Task waitTask = pacer.WaitForQueue(11, default); + waitTask.IsCompleted.Should().BeFalse(); + + blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(1).TestObject)); + waitTask.IsCompleted.Should().BeFalse(); + + blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(5).TestObject)); + waitTask.IsCompleted.Should().BeFalse(); + + blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(6).TestObject)); + waitTask.IsCompleted.Should().BeTrue(); + } +} diff --git a/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs b/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs new file mode 100644 index 00000000000..68da51097d4 --- /dev/null +++ b/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs @@ -0,0 +1,63 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Threading; +using System.Threading.Tasks; +using Nethermind.Core; + +namespace Nethermind.Blockchain; + +/// +/// Utility class during bulk loading to prevent processing queue from becoming too large +/// +public class BlockTreeSuggestPacer : IDisposable +{ + private TaskCompletionSource? _dbBatchProcessed; + private long _blockNumberReachedToUnlock = 0; + private readonly long _stopBatchSize; + private readonly long _resumeBatchSize; + private readonly IBlockTree _blockTree; + + public BlockTreeSuggestPacer(IBlockTree blockTree, long stopBatchSize, long resumeBatchSize) + { + blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock; + _blockTree = blockTree; + _stopBatchSize = stopBatchSize; + _resumeBatchSize = resumeBatchSize; + } + + private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e) + { + TaskCompletionSource? completionSource = _dbBatchProcessed; + if (completionSource is null) return; + if (e.Block.Number < _blockNumberReachedToUnlock) return; + + _dbBatchProcessed = null; + completionSource.SetResult(); + } + + public async Task WaitForQueue(long currentBlockNumber, CancellationToken token) + { + long currentHeadNumber = _blockTree.Head?.Number ?? 0; + if (currentBlockNumber - currentHeadNumber > _stopBatchSize && _dbBatchProcessed == null) + { + _blockNumberReachedToUnlock = currentBlockNumber - _stopBatchSize + _resumeBatchSize; + TaskCompletionSource completionSource = new TaskCompletionSource(); + _dbBatchProcessed = completionSource; + } + + if (_dbBatchProcessed != null) + { + await using (token.Register(() => _dbBatchProcessed.TrySetCanceled())) + { + await _dbBatchProcessed.Task; + } + } + } + + public void Dispose() + { + _blockTree.NewHeadBlock -= BlockTreeOnNewHeadBlock; + } +} diff --git a/src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs b/src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs index 2f156c57d13..61c09d0635e 100644 --- a/src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs +++ b/src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs @@ -11,7 +11,7 @@ namespace Nethermind.Blockchain.Visitors { - public class DbBlocksLoader : IBlockTreeVisitor + public class DbBlocksLoader : IBlockTreeVisitor, IDisposable { public const int DefaultBatchSize = 4000; @@ -20,8 +20,7 @@ public class DbBlocksLoader : IBlockTreeVisitor private readonly IBlockTree _blockTree; private readonly ILogger _logger; - private TaskCompletionSource _dbBatchProcessed; - private long _currentDbLoadBatchEnd; + private readonly BlockTreeSuggestPacer _blockTreeSuggestPacer; public DbBlocksLoader(IBlockTree blockTree, ILogger logger, @@ -30,6 +29,7 @@ public DbBlocksLoader(IBlockTree blockTree, long maxBlocksToLoad = long.MaxValue) { _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); + _blockTreeSuggestPacer = new BlockTreeSuggestPacer(_blockTree, batchSize, batchSize / 2); _logger = logger; _batchSize = batchSize; @@ -37,27 +37,9 @@ public DbBlocksLoader(IBlockTree blockTree, _blocksToLoad = Math.Min(maxBlocksToLoad, _blockTree.BestKnownNumber - StartLevelInclusive); EndLevelExclusive = StartLevelInclusive + _blocksToLoad + 1; - if (_blocksToLoad != 0) - { - _blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock; - } - LogPlannedOperation(); } - private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e) - { - if (_dbBatchProcessed is not null) - { - if (e.Block.Number == _currentDbLoadBatchEnd) - { - TaskCompletionSource completionSource = _dbBatchProcessed; - _dbBatchProcessed = null; - completionSource.SetResult(null); - } - } - } - public bool PreventsAcceptingNewBlocks => true; public bool CalculateTotalDifficultyIfMissing => true; public long StartLevelInclusive { get; } @@ -99,20 +81,17 @@ Task IBlockTreeVisitor.VisitHeader(BlockHeader header, Cance async Task IBlockTreeVisitor.VisitBlock(Block block, CancellationToken cancellationToken) { // this will hang + Task waitTask = _blockTreeSuggestPacer.WaitForQueue(block.Number, cancellationToken); + long i = block.Number - StartLevelInclusive; - if (i % _batchSize == _batchSize - 1 && i != _blocksToLoad - 1 && _blockTree.Head.Number + _batchSize < block.Number) + if (!waitTask.IsCompleted) { if (_logger.IsInfo) { _logger.Info($"Loaded {i + 1} out of {_blocksToLoad} blocks from DB into processing queue, waiting for processor before loading more."); } - _dbBatchProcessed = new TaskCompletionSource(); - await using (cancellationToken.Register(() => _dbBatchProcessed.SetCanceled())) - { - _currentDbLoadBatchEnd = block.Number - _batchSize; - await _dbBatchProcessed.Task; - } + await waitTask; } return BlockVisitOutcome.Suggest; @@ -134,5 +113,10 @@ private void LogPlannedOperation() if (_logger.IsInfo) _logger.Info($"Found {_blocksToLoad} blocks to load from DB starting from current head block {_blockTree.Head?.ToString(Block.Format.Short)}"); } } + + public void Dispose() + { + _blockTreeSuggestPacer.Dispose(); + } } } diff --git a/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs b/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs index d63d56c2e34..1b05460eaf6 100644 --- a/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs +++ b/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs @@ -33,11 +33,9 @@ public class StartupBlockTreeFixer : IBlockTreeVisitor private long? _lastProcessedLevel; private long? _processingGapStart; - private TaskCompletionSource _dbBatchProcessed; - private long _currentDbLoadBatchEnd; private bool _firstBlockVisited = true; private bool _suggestBlocks = true; - private readonly long _batchSize; + private readonly BlockTreeSuggestPacer _blockTreeSuggestPacer; public StartupBlockTreeFixer( ISyncConfig syncConfig, @@ -47,36 +45,18 @@ public StartupBlockTreeFixer( long batchSize = DefaultBatchSize) { _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); + _blockTreeSuggestPacer = new BlockTreeSuggestPacer(_blockTree, batchSize, batchSize / 2); _stateReader = stateReader; _logger = logger; - _batchSize = batchSize; long assumedHead = _blockTree.Head?.Number ?? 0; _startNumber = Math.Max(syncConfig.PivotNumberParsed, assumedHead + 1); _blocksToLoad = (assumedHead + 1) >= _startNumber ? (_blockTree.BestKnownNumber - _startNumber + 1) : 0; _currentLevelNumber = _startNumber - 1; // because we always increment on entering - if (_blocksToLoad != 0) - { - _blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock; - } - LogPlannedOperation(); } - private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e) - { - if (_dbBatchProcessed is not null) - { - if (e.Block.Number == _currentDbLoadBatchEnd) - { - TaskCompletionSource completionSource = _dbBatchProcessed; - _dbBatchProcessed = null; - completionSource.SetResult(); - } - } - } - public bool PreventsAcceptingNewBlocks => true; public bool CalculateTotalDifficultyIfMissing => true; public long StartLevelInclusive => _startNumber; @@ -168,9 +148,10 @@ async Task IBlockTreeVisitor.VisitBlock(Block block, Cancella if (!_suggestBlocks) return BlockVisitOutcome.None; + Task waitSuggestQueue = _blockTreeSuggestPacer.WaitForQueue(block.Number, cancellationToken); + long i = block.Number - StartLevelInclusive; - if (i % _batchSize == _batchSize - 1 && i != _blocksToLoad - 1 && - _blockTree.Head.Number + _batchSize < block.Number) + if (!waitSuggestQueue.IsCompleted) { if (_logger.IsInfo) { @@ -178,12 +159,7 @@ async Task IBlockTreeVisitor.VisitBlock(Block block, Cancella $"Loaded {i + 1} out of {_blocksToLoad} blocks from DB into processing queue, waiting for processor before loading more."); } - _dbBatchProcessed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await using (cancellationToken.Register(() => _dbBatchProcessed.SetCanceled())) - { - _currentDbLoadBatchEnd = block.Number - _batchSize; - await _dbBatchProcessed.Task; - } + await waitSuggestQueue; } return BlockVisitOutcome.Suggest; From ba0272d4309367a75851aa7a198b050a46d188ea Mon Sep 17 00:00:00 2001 From: Amirul Ashraf Date: Thu, 24 Oct 2024 10:39:58 +0800 Subject: [PATCH 2/3] Dispose properly --- .../Visitors/StartupBlockTreeFixer.cs | 7 ++++++- src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs b/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs index 1b05460eaf6..6534b2ea3a4 100644 --- a/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs +++ b/src/Nethermind/Nethermind.Blockchain/Visitors/StartupBlockTreeFixer.cs @@ -15,7 +15,7 @@ namespace Nethermind.Blockchain.Visitors { - public class StartupBlockTreeFixer : IBlockTreeVisitor + public class StartupBlockTreeFixer : IBlockTreeVisitor, IDisposable { public const int DefaultBatchSize = 4000; private readonly IBlockTree _blockTree; @@ -242,5 +242,10 @@ private void LogPlannedOperation() $"Found {_blocksToLoad} block tree levels to review for fixes starting from {StartLevelInclusive}"); } } + + public void Dispose() + { + _blockTreeSuggestPacer.Dispose(); + } } } diff --git a/src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs b/src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs index 2a200190434..b75a4fc9cb9 100644 --- a/src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs +++ b/src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs @@ -44,7 +44,7 @@ private async Task RunBlockTreeInitTasks(CancellationToken cancellationToken) if (!syncConfig.FastSync) { - DbBlocksLoader loader = new(_api.BlockTree, _logger); + using DbBlocksLoader loader = new(_api.BlockTree, _logger); await _api.BlockTree.Accept(loader, cancellationToken).ContinueWith(t => { if (t.IsFaulted) @@ -59,7 +59,7 @@ await _api.BlockTree.Accept(loader, cancellationToken).ContinueWith(t => } else { - StartupBlockTreeFixer fixer = new(syncConfig, _api.BlockTree, _api.WorldStateManager!.GlobalStateReader, _logger!); + using StartupBlockTreeFixer fixer = new(syncConfig, _api.BlockTree, _api.WorldStateManager!.GlobalStateReader, _logger!); await _api.BlockTree.Accept(fixer, cancellationToken).ContinueWith(t => { if (t.IsFaulted) From cd6913f003b03bfc6f6afb11d7979d58d2c30cf8 Mon Sep 17 00:00:00 2001 From: Amirul Ashraf Date: Thu, 24 Oct 2024 20:53:16 +0800 Subject: [PATCH 3/3] Addressing commetn --- src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs b/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs index 68da51097d4..d040c878349 100644 --- a/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs +++ b/src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs @@ -40,14 +40,14 @@ private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e) public async Task WaitForQueue(long currentBlockNumber, CancellationToken token) { long currentHeadNumber = _blockTree.Head?.Number ?? 0; - if (currentBlockNumber - currentHeadNumber > _stopBatchSize && _dbBatchProcessed == null) + if (currentBlockNumber - currentHeadNumber > _stopBatchSize && _dbBatchProcessed is null) { _blockNumberReachedToUnlock = currentBlockNumber - _stopBatchSize + _resumeBatchSize; TaskCompletionSource completionSource = new TaskCompletionSource(); _dbBatchProcessed = completionSource; } - if (_dbBatchProcessed != null) + if (_dbBatchProcessed is not null) { await using (token.Register(() => _dbBatchProcessed.TrySetCanceled())) {