Skip to content

Commit

Permalink
ParallelUnbalancedWork for efficient unbalanced parallel loops (#7787)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored Dec 2, 2024
1 parent 1d42b4a commit 4544a6c
Show file tree
Hide file tree
Showing 12 changed files with 546 additions and 113 deletions.
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Benchmark.Runner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public DashboardConfig(params Job[] jobs)
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Descriptor);
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Statistics);
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Params);
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Metrics);
AddLogger(BenchmarkDotNet.Loggers.ConsoleLogger.Default);
AddExporter(BenchmarkDotNet.Exporters.Json.JsonExporter.FullCompressed);
AddDiagnoser(BenchmarkDotNet.Diagnosers.MemoryDiagnoser.Default);
Expand Down Expand Up @@ -59,7 +60,7 @@ public static void Main(string[] args)
{
foreach (Assembly assembly in additionalJobAssemblies)
{
BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core80)), args);
BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core90)), args);
}

foreach (Assembly assembly in simpleJobAssemblies)
Expand Down
59 changes: 59 additions & 0 deletions src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using BenchmarkDotNet.Attributes;
using Nethermind.Core.Threading;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Benchmarks.Core;

/// <summary>
/// Compares three ways of running the same set of unevenly sized work items in parallel:
/// <c>Parallel.For</c> (the baseline), <c>Parallel.ForEach</c> and <c>ParallelUnbalancedWork.For</c>.
/// Each work item is simulated by sleeping for a per-item number of milliseconds.
/// </summary>
[HideColumns("Job", "RatioSD")]
public class ParallelBenchmark
{
    // Sleep duration, in milliseconds, of each simulated work item.
    private int[] _times;

    /// <summary>
    /// Builds the 200 work-item durations once before the benchmarks run.
    /// </summary>
    [GlobalSetup]
    public void Setup()
    {
        int[] durations = new int[200];
        for (int index = 0; index < durations.Length; index++)
        {
            // Values cycle through 0..99 twice, so items differ in cost.
            durations[index] = index % 100;
        }

        _times = durations;
    }

    /// <summary>
    /// Baseline: index-based <c>Parallel.For</c> over the durations.
    /// </summary>
    [Benchmark(Baseline = true)]
    public void ParallelFor()
    {
        Parallel.For(0, _times.Length, index =>
        {
            Thread.Sleep(_times[index]);
        });
    }

    /// <summary>
    /// Element-based <c>Parallel.ForEach</c> over the durations.
    /// </summary>
    [Benchmark]
    public void ParallelForEach()
    {
        Parallel.ForEach(_times, static delay =>
        {
            Thread.Sleep(delay);
        });
    }

    /// <summary>
    /// <c>ParallelUnbalancedWork.For</c> with the durations array passed as the loop state.
    /// </summary>
    [Benchmark]
    public void UnbalancedParallel()
    {
        ParallelUnbalancedWork.For<int[]>(
            0,
            _times.Length,
            ParallelUnbalancedWork.DefaultOptions,
            _times,
            static (index, durations) =>
            {
                Thread.Sleep(durations[index]);
                // The state is handed back unchanged after every item.
                return durations;
            },
            static _ => { });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,21 @@ private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spe
{
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
_ =>
ParallelUnbalancedWork.For(0, block.Withdrawals.Length, parallelOptions, (preWarmer: this, block, stateRoot),
static (i, state) =>
{
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
IReadOnlyTxProcessorSource env = state.preWarmer._envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
using IReadOnlyTxProcessingScope scope = env.Build(state.stateRoot);
scope.WorldState.WarmUp(state.block.Withdrawals[i].Address);
}
finally
{
_envPool.Return(env);
state.preWarmer._envPool.Return(env);
}

return state;
});
}
}
Expand All @@ -135,24 +132,19 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp

try
{
int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
ParallelUnbalancedWork.For<BlockState>(0, block.Transactions.Length, parallelOptions, new(this, block, stateRoot, spec), static (i, state) =>
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
IReadOnlyTxProcessorSource env = state.PreWarmer._envPool.Get();
SystemTransaction systemTransaction = state.PreWarmer._systemTransactionPool.Get();
Transaction? tx = null;
try
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;
if (state.Block.TransactionProcessed > i) return state;

tx = block.Transactions[i];
tx = state.Block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot);

Address senderAddress = tx.SenderAddress!;
if (!scope.WorldState.AccountExists(senderAddress))
Expand All @@ -163,7 +155,7 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
UInt256 nonceDelta = UInt256.Zero;
for (int prev = 0; prev < i; prev++)
{
if (senderAddress == block.Transactions[prev].SenderAddress)
if (senderAddress == state.Block.Transactions[prev].SenderAddress)
{
nonceDelta++;
}
Expand All @@ -174,26 +166,28 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
scope.WorldState.IncrementNonce(senderAddress, nonceDelta);
}

if (spec.UseTxAccessLists)
if (state.Spec.UseTxAccessLists)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
}
TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(state.Block.Header.Clone()), NullTxTracer.Instance);
if (state.PreWarmer._logger.IsTrace) state.PreWarmer._logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex) when (ex is EvmException or OverflowException)
{
// Ignore, regular tx processing exceptions
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
if (state.PreWarmer._logger.IsDebug) state.PreWarmer._logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
_systemTransactionPool.Return(systemTransaction);
_envPool.Return(env);
state.PreWarmer._systemTransactionPool.Return(systemTransaction);
state.PreWarmer._envPool.Return(env);
}

return state;
});
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -273,21 +267,16 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
}
}

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions,
_ =>
ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, (preWarmer: PreWarmer, block, StateRoot),
static (i, state) =>
{
int i = 0;
// Process addresses in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
Transaction tx = block.Transactions[i];
Transaction tx = state.block.Transactions[i];
Address? sender = tx.SenderAddress;

var env = PreWarmer._envPool.Get();
var env = state.preWarmer._envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot);
if (sender is not null)
{
scope.WorldState.WarmUp(sender);
Expand All @@ -300,8 +289,10 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
}
finally
{
PreWarmer._envPool.Return(env);
state.preWarmer._envPool.Return(env);
}

return state;
});
}
catch (OperationCanceledException)
Expand All @@ -316,4 +307,13 @@ private class ReadOnlyTxProcessingEnvPooledObjectPolicy(ReadOnlyTxProcessingEnvF
public IReadOnlyTxProcessorSource Create() => envFactory.Create();
public bool Return(IReadOnlyTxProcessorSource obj) => true;
}

/// <summary>
/// Bundles the values a parallel warm-up loop needs, so they can be passed to
/// <c>ParallelUnbalancedWork.For&lt;BlockState&gt;</c> as its state argument and read from a
/// <c>static</c> lambda instead of being captured.
/// </summary>
private struct BlockState
{
    public BlockCachePreWarmer PreWarmer;
    public Block Block;
    public Hash256 StateRoot;
    public IReleaseSpec Spec;

    /// <summary>
    /// Stores each argument in the field of the same name.
    /// </summary>
    public BlockState(BlockCachePreWarmer preWarmer, Block block, Hash256 stateRoot, IReleaseSpec spec)
    {
        PreWarmer = preWarmer;
        Block = block;
        StateRoot = stateRoot;
        Spec = spec;
    }
}
}

11 changes: 8 additions & 3 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Specs;
using Nethermind.Core.Threading;
using Nethermind.Crypto;
using Nethermind.Evm;
using Nethermind.Evm.Tracing;
Expand Down Expand Up @@ -333,11 +334,15 @@ protected virtual TxReceipt[] ProcessBlock(
[MethodImpl(MethodImplOptions.NoInlining)]
private static void CalculateBlooms(TxReceipt[] receipts)
{
int index = 0;
Parallel.For(0, receipts.Length, _ =>
ParallelUnbalancedWork.For(
0,
receipts.Length,
ParallelUnbalancedWork.DefaultOptions,
receipts,
static (i, receipts) =>
{
int i = Interlocked.Increment(ref index) - 1;
receipts[i].CalculateBloom();
return receipts;
});
}

Expand Down
65 changes: 40 additions & 25 deletions src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Specs;
using Nethermind.Core.Threading;
using Nethermind.Crypto;
using Nethermind.Logging;
using Nethermind.Serialization.Rlp;
Expand Down Expand Up @@ -48,13 +49,20 @@ public void RecoverData(Block block)
// so we assume the rest of txs in the block are already recovered
return;

Parallel.For(0, txs.Length, i =>
ParallelUnbalancedWork.For(
0,
txs.Length,
ParallelUnbalancedWork.DefaultOptions,
txs,
static (i, txs) =>
{
Transaction tx = txs[i];
if (!tx.IsHashCalculated)
{
tx.CalculateHashInternal();
}

return txs;
});


Expand Down Expand Up @@ -111,14 +119,21 @@ public void RecoverData(Block block)
if (recoverFromEcdsa > 3)
{
// Recover ecdsa in Parallel
Parallel.For(0, txs.Length, i =>
ParallelUnbalancedWork.For(
0,
txs.Length,
ParallelUnbalancedWork.DefaultOptions,
(recover: this, txs, releaseSpec, useSignatureChainId),
static (i, state) =>
{
Transaction tx = txs[i];
if (!ShouldRecoverSignatures(tx)) return;
Transaction tx = state.txs[i];
if (!ShouldRecoverSignatures(tx)) return state;

tx.SenderAddress ??= _ecdsa.RecoverAddress(tx, useSignatureChainId);
RecoverAuthorities(tx);
if (_logger.IsTrace) _logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}");
tx.SenderAddress ??= state.recover._ecdsa.RecoverAddress(tx, state.useSignatureChainId);
state.recover.RecoverAuthorities(tx, state.releaseSpec);
if (state.recover._logger.IsTrace) state.recover._logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}");

return state;
});
}
else
Expand All @@ -128,32 +143,32 @@ public void RecoverData(Block block)
if (!ShouldRecoverSignatures(tx)) continue;

tx.SenderAddress ??= _ecdsa.RecoverAddress(tx, useSignatureChainId);
RecoverAuthorities(tx);
RecoverAuthorities(tx, releaseSpec);
if (_logger.IsTrace) _logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}");
}
}
}

void RecoverAuthorities(Transaction tx)
private void RecoverAuthorities(Transaction tx, IReleaseSpec releaseSpec)
{
if (!releaseSpec.IsAuthorizationListEnabled
|| !tx.HasAuthorizationList)
{
if (!releaseSpec.IsAuthorizationListEnabled
|| !tx.HasAuthorizationList)
{
return;
}
return;
}

if (tx.AuthorizationList.Length > 3)
if (tx.AuthorizationList.Length > 3)
{
Parallel.ForEach(tx.AuthorizationList.Where(t => t.Authority is null), (tuple) =>
{
Parallel.ForEach(tx.AuthorizationList.Where(t => t.Authority is null), (tuple) =>
{
tuple.Authority = _ecdsa.RecoverAddress(tuple);
});
}
else
tuple.Authority = _ecdsa.RecoverAddress(tuple);
});
}
else
{
foreach (AuthorizationTuple tuple in tx.AuthorizationList.AsSpan())
{
foreach (AuthorizationTuple tuple in tx.AuthorizationList.AsSpan())
{
tuple.Authority ??= _ecdsa.RecoverAddress(tuple);
}
tuple.Authority ??= _ecdsa.RecoverAddress(tuple);
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ public void Truncate(int newLength)
Count = newLength;
}

/// <summary>
/// Returns a reference to the element stored at <paramref name="index"/>, so callers can read or
/// modify it in place rather than working on a copy.
/// </summary>
/// <param name="index">Position of the element in the backing array.</param>
/// <returns>A <c>ref</c> to the slot in the backing array.</returns>
public ref T GetRef(int index)
{
// Validate the index before touching the array (presumably against Count — confirm in GuardIndex).
GuardIndex(index);
// NOTE(review): the returned reference aliases the internal array; it presumably must not be
// used after the list grows or is disposed — confirm against the rest of the class.
return ref _array[index];
}

public T this[int index]
{
get
Expand Down
Loading

0 comments on commit 4544a6c

Please sign in to comment.