Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/Blocktree suggest pacer #7648

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Core;
using Nethermind.Core.Test.Builders;
using NSubstitute;
using NUnit.Framework;

namespace Nethermind.Blockchain.Test;

public class BlockTreeSuggestPacerTests
{
[Test]
public void WillNotBlockIfInBatchLimit()
{
IBlockTree blockTree = Substitute.For<IBlockTree>();
blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject);
using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5);

pacer.WaitForQueue(1, default).IsCompleted.Should().BeTrue();
}

[Test]
public void WillBlockIfBatchTooLarge()
{
IBlockTree blockTree = Substitute.For<IBlockTree>();
blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject);
using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5);

pacer.WaitForQueue(11, default).IsCompleted.Should().BeFalse();
}

[Test]
public void WillOnlyUnblockOnceHeadReachHighEnough()
{
IBlockTree blockTree = Substitute.For<IBlockTree>();
blockTree.Head.Returns(Build.A.Block.WithNumber(0).TestObject);
using BlockTreeSuggestPacer pacer = new BlockTreeSuggestPacer(blockTree, 10, 5);

Task waitTask = pacer.WaitForQueue(11, default);
waitTask.IsCompleted.Should().BeFalse();

blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(1).TestObject));
waitTask.IsCompleted.Should().BeFalse();

blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(5).TestObject));
waitTask.IsCompleted.Should().BeFalse();

blockTree.NewHeadBlock += Raise.EventWith(new BlockEventArgs(Build.A.Block.WithNumber(6).TestObject));
waitTask.IsCompleted.Should().BeTrue();
}
}
63 changes: 63 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/BlockTreeSuggestPacer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core;

namespace Nethermind.Blockchain;

/// <summary>
/// Utility class during bulk loading to prevent processing queue from becoming too large
/// </summary>
public class BlockTreeSuggestPacer : IDisposable
{
private TaskCompletionSource? _dbBatchProcessed;
private long _blockNumberReachedToUnlock = 0;
private readonly long _stopBatchSize;
private readonly long _resumeBatchSize;
private readonly IBlockTree _blockTree;

public BlockTreeSuggestPacer(IBlockTree blockTree, long stopBatchSize, long resumeBatchSize)
{
blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock;
_blockTree = blockTree;
_stopBatchSize = stopBatchSize;
_resumeBatchSize = resumeBatchSize;
}

private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e)
{
TaskCompletionSource? completionSource = _dbBatchProcessed;
if (completionSource is null) return;
if (e.Block.Number < _blockNumberReachedToUnlock) return;

_dbBatchProcessed = null;
completionSource.SetResult();
}

public async Task WaitForQueue(long currentBlockNumber, CancellationToken token)
{
long currentHeadNumber = _blockTree.Head?.Number ?? 0;
if (currentBlockNumber - currentHeadNumber > _stopBatchSize && _dbBatchProcessed == null)
asdacap marked this conversation as resolved.
Show resolved Hide resolved
{
_blockNumberReachedToUnlock = currentBlockNumber - _stopBatchSize + _resumeBatchSize;
TaskCompletionSource completionSource = new TaskCompletionSource();
_dbBatchProcessed = completionSource;
}

if (_dbBatchProcessed != null)
asdacap marked this conversation as resolved.
Show resolved Hide resolved
{
await using (token.Register(() => _dbBatchProcessed.TrySetCanceled()))
{
await _dbBatchProcessed.Task;
}
}
}

public void Dispose()
{
_blockTree.NewHeadBlock -= BlockTreeOnNewHeadBlock;
}
}
40 changes: 12 additions & 28 deletions src/Nethermind/Nethermind.Blockchain/Visitors/DbBlocksLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Nethermind.Blockchain.Visitors
{
public class DbBlocksLoader : IBlockTreeVisitor
public class DbBlocksLoader : IBlockTreeVisitor, IDisposable
{
public const int DefaultBatchSize = 4000;

Expand All @@ -20,8 +20,7 @@ public class DbBlocksLoader : IBlockTreeVisitor
private readonly IBlockTree _blockTree;
private readonly ILogger _logger;

private TaskCompletionSource<object> _dbBatchProcessed;
private long _currentDbLoadBatchEnd;
private readonly BlockTreeSuggestPacer _blockTreeSuggestPacer;

public DbBlocksLoader(IBlockTree blockTree,
ILogger logger,
Expand All @@ -30,34 +29,17 @@ public DbBlocksLoader(IBlockTree blockTree,
long maxBlocksToLoad = long.MaxValue)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_blockTreeSuggestPacer = new BlockTreeSuggestPacer(_blockTree, batchSize, batchSize / 2);
_logger = logger;

_batchSize = batchSize;
StartLevelInclusive = Math.Max(0L, startBlockNumber ?? (_blockTree.Head?.Number + 1) ?? 0L);
_blocksToLoad = Math.Min(maxBlocksToLoad, _blockTree.BestKnownNumber - StartLevelInclusive);
EndLevelExclusive = StartLevelInclusive + _blocksToLoad + 1;

if (_blocksToLoad != 0)
{
_blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock;
}

LogPlannedOperation();
}

private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e)
{
if (_dbBatchProcessed is not null)
{
if (e.Block.Number == _currentDbLoadBatchEnd)
{
TaskCompletionSource<object> completionSource = _dbBatchProcessed;
_dbBatchProcessed = null;
completionSource.SetResult(null);
}
}
}

public bool PreventsAcceptingNewBlocks => true;
public bool CalculateTotalDifficultyIfMissing => true;
public long StartLevelInclusive { get; }
Expand Down Expand Up @@ -99,20 +81,17 @@ Task<HeaderVisitOutcome> IBlockTreeVisitor.VisitHeader(BlockHeader header, Cance
async Task<BlockVisitOutcome> IBlockTreeVisitor.VisitBlock(Block block, CancellationToken cancellationToken)
{
// this will hang
Task waitTask = _blockTreeSuggestPacer.WaitForQueue(block.Number, cancellationToken);

long i = block.Number - StartLevelInclusive;
if (i % _batchSize == _batchSize - 1 && i != _blocksToLoad - 1 && _blockTree.Head.Number + _batchSize < block.Number)
if (!waitTask.IsCompleted)
{
if (_logger.IsInfo)
{
_logger.Info($"Loaded {i + 1} out of {_blocksToLoad} blocks from DB into processing queue, waiting for processor before loading more.");
}

_dbBatchProcessed = new TaskCompletionSource<object>();
await using (cancellationToken.Register(() => _dbBatchProcessed.SetCanceled()))
{
_currentDbLoadBatchEnd = block.Number - _batchSize;
await _dbBatchProcessed.Task;
}
await waitTask;
}

return BlockVisitOutcome.Suggest;
Expand All @@ -134,5 +113,10 @@ private void LogPlannedOperation()
if (_logger.IsInfo) _logger.Info($"Found {_blocksToLoad} blocks to load from DB starting from current head block {_blockTree.Head?.ToString(Block.Format.Short)}");
}
}

public void Dispose()
{
_blockTreeSuggestPacer.Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Nethermind.Blockchain.Visitors
{
public class StartupBlockTreeFixer : IBlockTreeVisitor
public class StartupBlockTreeFixer : IBlockTreeVisitor, IDisposable
{
public const int DefaultBatchSize = 4000;
private readonly IBlockTree _blockTree;
Expand All @@ -33,11 +33,9 @@ public class StartupBlockTreeFixer : IBlockTreeVisitor
private long? _lastProcessedLevel;
private long? _processingGapStart;

private TaskCompletionSource _dbBatchProcessed;
private long _currentDbLoadBatchEnd;
private bool _firstBlockVisited = true;
private bool _suggestBlocks = true;
private readonly long _batchSize;
private readonly BlockTreeSuggestPacer _blockTreeSuggestPacer;

public StartupBlockTreeFixer(
ISyncConfig syncConfig,
Expand All @@ -47,36 +45,18 @@ public StartupBlockTreeFixer(
long batchSize = DefaultBatchSize)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_blockTreeSuggestPacer = new BlockTreeSuggestPacer(_blockTree, batchSize, batchSize / 2);
_stateReader = stateReader;
_logger = logger;

_batchSize = batchSize;
long assumedHead = _blockTree.Head?.Number ?? 0;
_startNumber = Math.Max(syncConfig.PivotNumberParsed, assumedHead + 1);
_blocksToLoad = (assumedHead + 1) >= _startNumber ? (_blockTree.BestKnownNumber - _startNumber + 1) : 0;

_currentLevelNumber = _startNumber - 1; // because we always increment on entering
if (_blocksToLoad != 0)
{
_blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock;
}

LogPlannedOperation();
}

private void BlockTreeOnNewHeadBlock(object sender, BlockEventArgs e)
{
if (_dbBatchProcessed is not null)
{
if (e.Block.Number == _currentDbLoadBatchEnd)
{
TaskCompletionSource completionSource = _dbBatchProcessed;
_dbBatchProcessed = null;
completionSource.SetResult();
}
}
}

public bool PreventsAcceptingNewBlocks => true;
public bool CalculateTotalDifficultyIfMissing => true;
public long StartLevelInclusive => _startNumber;
Expand Down Expand Up @@ -168,22 +148,18 @@ async Task<BlockVisitOutcome> IBlockTreeVisitor.VisitBlock(Block block, Cancella

if (!_suggestBlocks) return BlockVisitOutcome.None;

Task waitSuggestQueue = _blockTreeSuggestPacer.WaitForQueue(block.Number, cancellationToken);

long i = block.Number - StartLevelInclusive;
if (i % _batchSize == _batchSize - 1 && i != _blocksToLoad - 1 &&
_blockTree.Head.Number + _batchSize < block.Number)
if (!waitSuggestQueue.IsCompleted)
{
if (_logger.IsInfo)
{
_logger.Info(
$"Loaded {i + 1} out of {_blocksToLoad} blocks from DB into processing queue, waiting for processor before loading more.");
}

_dbBatchProcessed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await using (cancellationToken.Register(() => _dbBatchProcessed.SetCanceled()))
{
_currentDbLoadBatchEnd = block.Number - _batchSize;
await _dbBatchProcessed.Task;
}
await waitSuggestQueue;
}

return BlockVisitOutcome.Suggest;
Expand Down Expand Up @@ -266,5 +242,10 @@ private void LogPlannedOperation()
$"Found {_blocksToLoad} block tree levels to review for fixes starting from {StartLevelInclusive}");
}
}

public void Dispose()
{
_blockTreeSuggestPacer.Dispose();
}
}
}
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Init/Steps/ReviewBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private async Task RunBlockTreeInitTasks(CancellationToken cancellationToken)

if (!syncConfig.FastSync)
{
DbBlocksLoader loader = new(_api.BlockTree, _logger);
using DbBlocksLoader loader = new(_api.BlockTree, _logger);
await _api.BlockTree.Accept(loader, cancellationToken).ContinueWith(t =>
{
if (t.IsFaulted)
Expand All @@ -59,7 +59,7 @@ await _api.BlockTree.Accept(loader, cancellationToken).ContinueWith(t =>
}
else
{
StartupBlockTreeFixer fixer = new(syncConfig, _api.BlockTree, _api.WorldStateManager!.GlobalStateReader, _logger!);
using StartupBlockTreeFixer fixer = new(syncConfig, _api.BlockTree, _api.WorldStateManager!.GlobalStateReader, _logger!);
await _api.BlockTree.Accept(fixer, cancellationToken).ContinueWith(t =>
{
if (t.IsFaulted)
Expand Down