Skip to content

Commit

Permalink
Feature/Blocktree suggest pacer (#7648)
Browse files Browse the repository at this point in the history
  • Loading branch information
asdacap authored Oct 24, 2024
1 parent d3d0ec0 commit 9ddd14f
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Core;
using Nethermind.Core.Test.Builders;
using NSubstitute;
using NUnit.Framework;

namespace Nethermind.Blockchain.Test;

public class BlockTreeSuggestPacerTests
{
[Test]
public void WillNotBlockIfInBatchLimit()
{
IBlockTree blockTree = Substitute.For<IBlockTree>();
blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject);
using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5);

pacer.WaitForQueue(1, default).IsCompleted.Should().BeTrue();
}

[Test]
public void WillBlockIfBatchTooLarge()
{
IBlockTree blockTree = Substitute.For<IBlockTree>();
blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject);
using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5);

pacer.WaitForQueue(11, default).IsCompleted.Should().BeFalse();
}

[Test]
public void WillOnlyUnblockOnceHeadReachHighEnough()
{
IBlockTree blockTree = Substitute.For<IBlockTree>();
blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject);
using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5);

Task waitTask = pacer.WaitForQueue(11, default);
waitTask.IsCompleted.Should().BeFalse();

blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(1).TestObject));
waitTask.IsCompleted.Should().BeFalse();

blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(5).TestObject));
waitTask.IsCompleted.Should().BeFalse();

blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(6).TestObject));
waitTask.IsCompleted.Should().BeTrue();
}
}
63 changes: 63 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core;

namespace Nethermind.Blockchain;

/// <summary>
/// Utility class during bulk loading to prevent processing queue from becoming too large
/// </summary>
public class BlockTreeSuggestPacer : IDisposable
{
private TaskCompletionSource? _dbBatchProcessed;
private long _blockNumberReachedToUnlock = 0;
private readonly long _stopBatchSize;
private readonly long _resumeBatchSize;
private readonly IBlockTree _blockTree;

public BlockTreeSuggestPacer(IBlockTree blockTree, long stopBatchSize, long resumeBatchSize)
{
blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock;
_blockTree = blockTree;
_stopBatchSize = stopBatchSize;
_resumeBatchSize = resumeBatchSize;
}

private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e)
{
TaskCompletionSource? completionSource = _dbBatchProcessed;
if (completionSource is null) return;
if (e.Block.Number < _blockNumberReachedToUnlock) return;

_dbBatchProcessed = null;
completionSource.SetResult();
}

public async Task WaitForQueue(long currentBlockNumber, CancellationToken token)
{
long currentHeadNumber = _blockTree.Head?.Number ?? 0;
if (currentBlockNumber - currentHeadNumber > _stopBatchSize && _dbBatchProcessed is null)
{
_blockNumberReachedToUnlock = currentBlockNumber - _stopBatchSize + _resumeBatchSize;
TaskCompletionSource completionSource = new TaskCompletionSource();
_dbBatchProcessed = completionSource;
}

if (_dbBatchProcessed is not null)
{
await using (token.Register(() => _dbBatchProcessed.TrySetCanceled()))
{
await _dbBatchProcessed.Task;
}
}
}

public void Dispose()
{
_blockTree.NewHeadBlock -= BlockTreeOnNewHeadBlock;
}
}
40 changes: 12 additions & 28 deletions src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Nethermind.Blockchain.Visitors
{
public class DbBlocksLoader : IBlockTreeVisitor
public class DbBlocksLoader : IBlockTreeVisitor, IDisposable
{
public const int DefaultBatchSize = 4000;

Expand All @@ -20,8 +20,7 @@ public class DbBlocksLoader : IBlockTreeVisitor
private readonly IBlockTree _blockTree;
private readonly ILogger _logger;

private TaskCompletionSource<object> _dbBatchProcessed;
private long _currentDbLoadBatchEnd;
private readonly BlockTreeSuggestPacer _blockTreeSuggestPacer;

public DbBlocksLoader(IBlockTree blockTree,
ILogger logger,
Expand All @@ -30,34 +29,17 @@ public DbBlocksLoader(IBlockTree blockTree,
long maxBlocksToLoad = long.MaxValue)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_blockTreeSuggestPacer = new BlockTreeSuggestPacer(_blockTree, batchSize, batchSize / 2);
_logger = logger;

_batchSize = batchSize;
StartLevelInclusive = Math.Max(0L, startBlockNumber ?? (_blockTree.Head?.Number + 1) ?? 0L);
_blocksToLoad = Math.Min(maxBlocksToLoad, _blockTree.BestKnownNumber - StartLevelInclusive);
EndLevelExclusive = StartLevelInclusive + _blocksToLoad + 1;

if (_blocksToLoad != 0)
{
_blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock;
}

LogPlannedOperation();
}

private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e)
{
if (_dbBatchProcessed is not null)
{
if (e.Block.Number == _currentDbLoadBatchEnd)
{
TaskCompletionSource<object> completionSource = _dbBatchProcessed;
_dbBatchProcessed = null;
completionSource.SetResult(null);
}
}
}

public bool PreventsAcceptingNewBlocks => true;
public bool CalculateTotalDifficultyIfMissing => true;
public long StartLevelInclusive { get; }
Expand Down Expand Up @@ -99,20 +81,17 @@ Task<HeaderVisitOutcome> IBlockTreeVisitor.VisitHeader(BlockHeader header, Cance
async Task<BlockVisitOutcome> IBlockTreeVisitor.VisitBlock(Block block, CancellationToken cancellationToken)
{
// this will hang
Task waitTask = _blockTreeSuggestPacer.WaitForQueue(block.Number, cancellationToken);

long i = block.Number - StartLevelInclusive;
if (i % _batchSize == _batchSize - 1 && i != _blocksToLoad - 1 && _blockTree.Head.Number + _batchSize < block.Number)
if (!waitTask.IsCompleted)
{
if (_logger.IsInfo)
{
_logger.Info($"Loaded {i + 1} out of {_blocksToLoad} blocks from DB into processing queue, waiting for processor before loading more.");
}

_dbBatchProcessed = new TaskCompletionSource<object>();
await using (cancellationToken.Register(() => _dbBatchProcessed.SetCanceled()))
{
_currentDbLoadBatchEnd = block.Number - _batchSize;
await _dbBatchProcessed.Task;
}
await waitTask;
}

return BlockVisitOutcome.Suggest;
Expand All @@ -134,5 +113,10 @@ private void LogPlannedOperation()
if (_logger.IsInfo) _logger.Info($"Found {_blocksToLoad} blocks to load from DB starting from current head block {_blockTree.Head?.ToString(Block.Format.Short)}");
}
}

public void Dispose()
{
_blockTreeSuggestPacer.Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Nethermind.Blockchain.Visitors
{
public class StartupBlockTreeFixer : IBlockTreeVisitor
public class StartupBlockTreeFixer : IBlockTreeVisitor, IDisposable
{
public const int DefaultBatchSize = 4000;
private readonly IBlockTree _blockTree;
Expand All @@ -33,11 +33,9 @@ public class StartupBlockTreeFixer : IBlockTreeVisitor
private long? _lastProcessedLevel;
private long? _processingGapStart;

private TaskCompletionSource _dbBatchProcessed;
private long _currentDbLoadBatchEnd;
private bool _firstBlockVisited = true;
private bool _suggestBlocks = true;
private readonly long _batchSize;
private readonly BlockTreeSuggestPacer _blockTreeSuggestPacer;

public StartupBlockTreeFixer(
ISyncConfig syncConfig,
Expand All @@ -47,36 +45,18 @@ public StartupBlockTreeFixer(
long batchSize = DefaultBatchSize)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_blockTreeSuggestPacer = new BlockTreeSuggestPacer(_blockTree, batchSize, batchSize / 2);
_stateReader = stateReader;
_logger = logger;

_batchSize = batchSize;
long assumedHead = _blockTree.Head?.Number ?? 0;
_startNumber = Math.Max(syncConfig.PivotNumberParsed, assumedHead + 1);
_blocksToLoad = (assumedHead + 1) >= _startNumber ? (_blockTree.BestKnownNumber - _startNumber + 1) : 0;

_currentLevelNumber = _startNumber - 1; // because we always increment on entering
if (_blocksToLoad != 0)
{
_blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock;
}

LogPlannedOperation();
}

private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e)
{
if (_dbBatchProcessed is not null)
{
if (e.Block.Number == _currentDbLoadBatchEnd)
{
TaskCompletionSource completionSource = _dbBatchProcessed;
_dbBatchProcessed = null;
completionSource.SetResult();
}
}
}

public bool PreventsAcceptingNewBlocks => true;
public bool CalculateTotalDifficultyIfMissing => true;
public long StartLevelInclusive => _startNumber;
Expand Down Expand Up @@ -168,22 +148,18 @@ async Task<BlockVisitOutcome> IBlockTreeVisitor.VisitBlock(Block block, Cancella

if (!_suggestBlocks) return BlockVisitOutcome.None;

Task waitSuggestQueue = _blockTreeSuggestPacer.WaitForQueue(block.Number, cancellationToken);

long i = block.Number - StartLevelInclusive;
if (i % _batchSize == _batchSize - 1 && i != _blocksToLoad - 1 &&
_blockTree.Head.Number + _batchSize < block.Number)
if (!waitSuggestQueue.IsCompleted)
{
if (_logger.IsInfo)
{
_logger.Info(
$"Loaded {i + 1} out of {_blocksToLoad} blocks from DB into processing queue, waiting for processor before loading more.");
}

_dbBatchProcessed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await using (cancellationToken.Register(() => _dbBatchProcessed.SetCanceled()))
{
_currentDbLoadBatchEnd = block.Number - _batchSize;
await _dbBatchProcessed.Task;
}
await waitSuggestQueue;
}

return BlockVisitOutcome.Suggest;
Expand Down Expand Up @@ -266,5 +242,10 @@ private void LogPlannedOperation()
$"Found {_blocksToLoad} block tree levels to review for fixes starting from {StartLevelInclusive}");
}
}

public void Dispose()
{
_blockTreeSuggestPacer.Dispose();
}
}
}
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private async Task RunBlockTreeInitTasks(CancellationToken cancellationToken)

if (!syncConfig.FastSync)
{
DbBlocksLoader loader = new(_api.BlockTree, _logger);
using DbBlocksLoader loader = new(_api.BlockTree, _logger);
await _api.BlockTree.Accept(loader, cancellationToken).ContinueWith(t =>
{
if (t.IsFaulted)
Expand All @@ -59,7 +59,7 @@ await _api.BlockTree.Accept(loader, cancellationToken).ContinueWith(t =>
}
else
{
StartupBlockTreeFixer fixer = new(syncConfig, _api.BlockTree, _api.WorldStateManager!.GlobalStateReader, _logger!);
using StartupBlockTreeFixer fixer = new(syncConfig, _api.BlockTree, _api.WorldStateManager!.GlobalStateReader, _logger!);
await _api.BlockTree.Accept(fixer, cancellationToken).ContinueWith(t =>
{
if (t.IsFaulted)
Expand Down

0 comments on commit 9ddd14f

Please sign in to comment.