diff --git a/src/Nethermind/Nethermind.Benchmark.Runner/Program.cs b/src/Nethermind/Nethermind.Benchmark.Runner/Program.cs index d4afb51ab17..1b6a025ef45 100644 --- a/src/Nethermind/Nethermind.Benchmark.Runner/Program.cs +++ b/src/Nethermind/Nethermind.Benchmark.Runner/Program.cs @@ -26,6 +26,7 @@ public DashboardConfig(params Job[] jobs) AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Descriptor); AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Statistics); AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Params); + AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Metrics); AddLogger(BenchmarkDotNet.Loggers.ConsoleLogger.Default); AddExporter(BenchmarkDotNet.Exporters.Json.JsonExporter.FullCompressed); AddDiagnoser(BenchmarkDotNet.Diagnosers.MemoryDiagnoser.Default); @@ -59,7 +60,7 @@ public static void Main(string[] args) { foreach (Assembly assembly in additionalJobAssemblies) { - BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core80)), args); + BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core90)), args); } foreach (Assembly assembly in simpleJobAssemblies) diff --git a/src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs b/src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs new file mode 100644 index 00000000000..b5a46721e6c --- /dev/null +++ b/src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs @@ -0,0 +1,59 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using BenchmarkDotNet.Attributes; +using Nethermind.Core.Threading; +using System.Threading; +using System.Threading.Tasks; + +namespace Nethermind.Benchmarks.Core; + +[HideColumns("Job", "RatioSD")] +public class ParallelBenchmark +{ + private int[] _times; + + [GlobalSetup] + public void Setup() + { + _times = new int[200]; + + for (int i = 0; i < _times.Length; i++) + { + _times[i] = i % 100; + } + } + + [Benchmark(Baseline = true)] + public void ParallelFor() + { + Parallel.For( + 0, + _times.Length, + (i) => Thread.Sleep(_times[i])); + } + + [Benchmark] + public void ParallelForEach() + { + Parallel.ForEach( + _times, + (time) => Thread.Sleep(time)); + } + + [Benchmark] + public void UnbalancedParallel() + { + ParallelUnbalancedWork.For( + 0, + _times.Length, + ParallelUnbalancedWork.DefaultOptions, + _times, + (i, value) => + { + Thread.Sleep(value[i]); + return value; + }, + (value) => { }); + } +} diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs index 1426aca1430..7ba634d102e 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs @@ -98,24 +98,21 @@ private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spe { if (spec.WithdrawalsEnabled && block.Withdrawals is not null) { - int progress = 0; - Parallel.For(0, block.Withdrawals.Length, parallelOptions, - _ => + ParallelUnbalancedWork.For(0, block.Withdrawals.Length, parallelOptions, (preWarmer: this, block, stateRoot), + static (i, state) => { - IReadOnlyTxProcessorSource env = _envPool.Get(); - int i = 0; + IReadOnlyTxProcessorSource env = state.preWarmer._envPool.Get(); try { - using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); - // Process withdrawals in sequential order, rather than partitioning 
scheme from Parallel.For - // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 - i = Interlocked.Increment(ref progress) - 1; - scope.WorldState.WarmUp(block.Withdrawals[i].Address); + using IReadOnlyTxProcessingScope scope = env.Build(state.stateRoot); + scope.WorldState.WarmUp(state.block.Withdrawals[i].Address); } finally { - _envPool.Return(env); + state.preWarmer._envPool.Return(env); } + + return state; }); } } @@ -135,24 +132,19 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp try { - int progress = 0; - Parallel.For(0, block.Transactions.Length, parallelOptions, _ => + ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, new(this, block, stateRoot, spec), static (i, state) => { - using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority(); - IReadOnlyTxProcessorSource env = _envPool.Get(); - SystemTransaction systemTransaction = _systemTransactionPool.Get(); + IReadOnlyTxProcessorSource env = state.PreWarmer._envPool.Get(); + SystemTransaction systemTransaction = state.PreWarmer._systemTransactionPool.Get(); Transaction? tx = null; try { - // Process transactions in sequential order, rather than partitioning scheme from Parallel.For - // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 - int i = Interlocked.Increment(ref progress) - 1; // If the transaction has already been processed or being processed, exit early - if (block.TransactionProcessed > i) return; + if (state.Block.TransactionProcessed > i) return state; - tx = block.Transactions[i]; + tx = state.Block.Transactions[i]; tx.CopyTo(systemTransaction); - using IReadOnlyTxProcessingScope scope = env.Build(stateRoot); + using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot); Address senderAddress = tx.SenderAddress!; if (!scope.WorldState.AccountExists(senderAddress)) @@ -163,7 +155,7 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp UInt256 nonceDelta = UInt256.Zero; for (int prev = 0; prev < i; prev++) { - if (senderAddress == block.Transactions[prev].SenderAddress) + if (senderAddress == state.Block.Transactions[prev].SenderAddress) { nonceDelta++; } @@ -174,12 +166,12 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp scope.WorldState.IncrementNonce(senderAddress, nonceDelta); } - if (spec.UseTxAccessLists) + if (state.Spec.UseTxAccessLists) { scope.WorldState.WarmUp(tx.AccessList); // eip-2930 } - TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance); - if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}"); + TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(state.Block.Header.Clone()), NullTxTracer.Instance); + if (state.PreWarmer._logger.IsTrace) state.PreWarmer._logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}"); } catch (Exception ex) when (ex is EvmException or OverflowException) { @@ -187,13 +179,15 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp } catch (Exception ex) { - if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex); + if (state.PreWarmer._logger.IsDebug) state.PreWarmer._logger.Error($"Error pre-warming cache {tx?.Hash}", ex); } finally { - _systemTransactionPool.Return(systemTransaction); - 
_envPool.Return(env); + state.PreWarmer._systemTransactionPool.Return(systemTransaction); + state.PreWarmer._envPool.Return(env); } + + return state; }); } catch (OperationCanceledException) @@ -273,21 +267,16 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block) } } - int progress = 0; - Parallel.For(0, block.Transactions.Length, parallelOptions, - _ => + ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, (preWarmer: PreWarmer, block, StateRoot), + static (i, state) => { - int i = 0; - // Process addresses in sequential order, rather than partitioning scheme from Parallel.For - // Interlocked.Increment returns the incremented value, so subtract 1 to start at 0 - i = Interlocked.Increment(ref progress) - 1; - Transaction tx = block.Transactions[i]; + Transaction tx = state.block.Transactions[i]; Address? sender = tx.SenderAddress; - var env = PreWarmer._envPool.Get(); + var env = state.preWarmer._envPool.Get(); try { - using IReadOnlyTxProcessingScope scope = env.Build(StateRoot); + using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot); if (sender is not null) { scope.WorldState.WarmUp(sender); @@ -300,8 +289,10 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block) } finally { - PreWarmer._envPool.Return(env); + state.preWarmer._envPool.Return(env); } + + return state; }); } catch (OperationCanceledException) @@ -316,4 +307,13 @@ private class ReadOnlyTxProcessingEnvPooledObjectPolicy(ReadOnlyTxProcessingEnvF public IReadOnlyTxProcessorSource Create() => envFactory.Create(); public bool Return(IReadOnlyTxProcessorSource obj) => true; } + + private struct BlockState(BlockCachePreWarmer preWarmer, Block block, Hash256 stateRoot, IReleaseSpec spec) + { + public BlockCachePreWarmer PreWarmer = preWarmer; + public Block Block = block; + public Hash256 StateRoot = stateRoot; + public IReleaseSpec Spec = spec; + } } + diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs index 154733f9903..063988403c3 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs @@ -19,6 +19,7 @@ using Nethermind.Core; using Nethermind.Core.Crypto; using Nethermind.Core.Specs; +using Nethermind.Core.Threading; using Nethermind.Crypto; using Nethermind.Evm; using Nethermind.Evm.Tracing; @@ -333,11 +334,15 @@ protected virtual TxReceipt[] ProcessBlock( [MethodImpl(MethodImplOptions.NoInlining)] private static void CalculateBlooms(TxReceipt[] receipts) { - int index = 0; - Parallel.For(0, receipts.Length, _ => + ParallelUnbalancedWork.For( + 0, + receipts.Length, + ParallelUnbalancedWork.DefaultOptions, + receipts, + static (i, receipts) => { - int i = Interlocked.Increment(ref index) - 1; receipts[i].CalculateBloom(); + return receipts; }); } diff --git a/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs b/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs index c748ecae850..ac780699cb9 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Nethermind.Core; using Nethermind.Core.Specs; +using Nethermind.Core.Threading; using Nethermind.Crypto; using Nethermind.Logging; using Nethermind.Serialization.Rlp; @@ -48,13 +49,20 @@ public void RecoverData(Block block) // so we assume 
the rest of txs in the block are already recovered return; - Parallel.For(0, txs.Length, i => + ParallelUnbalancedWork.For( + 0, + txs.Length, + ParallelUnbalancedWork.DefaultOptions, + txs, + static (i, txs) => { Transaction tx = txs[i]; if (!tx.IsHashCalculated) { tx.CalculateHashInternal(); } + + return txs; }); @@ -111,14 +119,21 @@ public void RecoverData(Block block) if (recoverFromEcdsa > 3) { // Recover ecdsa in Parallel - Parallel.For(0, txs.Length, i => + ParallelUnbalancedWork.For( + 0, + txs.Length, + ParallelUnbalancedWork.DefaultOptions, + (recover: this, txs, releaseSpec, useSignatureChainId), + static (i, state) => { - Transaction tx = txs[i]; - if (!ShouldRecoverSignatures(tx)) return; + Transaction tx = state.txs[i]; + if (!ShouldRecoverSignatures(tx)) return state; - tx.SenderAddress ??= _ecdsa.RecoverAddress(tx, useSignatureChainId); - RecoverAuthorities(tx); - if (_logger.IsTrace) _logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}"); + tx.SenderAddress ??= state.recover._ecdsa.RecoverAddress(tx, state.useSignatureChainId); + state.recover.RecoverAuthorities(tx, state.releaseSpec); + if (state.recover._logger.IsTrace) state.recover._logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}"); + + return state; }); } else @@ -128,32 +143,32 @@ public void RecoverData(Block block) if (!ShouldRecoverSignatures(tx)) continue; tx.SenderAddress ??= _ecdsa.RecoverAddress(tx, useSignatureChainId); - RecoverAuthorities(tx); + RecoverAuthorities(tx, releaseSpec); if (_logger.IsTrace) _logger.Trace($"Recovered {tx.SenderAddress} sender for {tx.Hash}"); } } + } - void RecoverAuthorities(Transaction tx) + private void RecoverAuthorities(Transaction tx, IReleaseSpec releaseSpec) + { + if (!releaseSpec.IsAuthorizationListEnabled + || !tx.HasAuthorizationList) { - if (!releaseSpec.IsAuthorizationListEnabled - || !tx.HasAuthorizationList) - { - return; - } + return; + } - if (tx.AuthorizationList.Length > 3) + if (tx.AuthorizationList.Length > 3) + { + Parallel.ForEach(tx.AuthorizationList.Where(t => t.Authority is null), (tuple) => { - Parallel.ForEach(tx.AuthorizationList.Where(t => t.Authority is null), (tuple) => - { - tuple.Authority = _ecdsa.RecoverAddress(tuple); - }); - } - else + tuple.Authority = _ecdsa.RecoverAddress(tuple); + }); + } + else + { + foreach (AuthorizationTuple tuple in tx.AuthorizationList.AsSpan()) { - foreach (AuthorizationTuple tuple in tx.AuthorizationList.AsSpan()) - { - tuple.Authority ??= _ecdsa.RecoverAddress(tuple); - } + tuple.Authority ??= _ecdsa.RecoverAddress(tuple); } } } diff --git a/src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs b/src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs index 574e574992b..7b734e34236 100644 --- a/src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs +++ b/src/Nethermind/Nethermind.Core/Collections/ArrayPoolList.cs @@ -281,6 +281,12 @@ public void Truncate(int newLength) Count = newLength; } + public ref T GetRef(int index) + { + GuardIndex(index); + return ref _array[index]; + } + public T this[int index] { get diff --git a/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs new file mode 100644 index 00000000000..c54d39cc255 --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs @@ -0,0 +1,332 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Runtime.InteropServices; 
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Nethermind.Core.Threading;
+
+/// <summary>
+/// Provides methods to execute parallel loops efficiently for unbalanced workloads.
+/// </summary>
+public class ParallelUnbalancedWork : IThreadPoolWorkItem
+{
+    public static readonly ParallelOptions DefaultOptions = new()
+    {
+        // default to the number of processors
+        MaxDegreeOfParallelism = Environment.ProcessorCount
+    };
+
+    private readonly Data _data;
+
+    /// <summary>
+    /// Executes a parallel for loop over a range of integers.
+    /// </summary>
+    /// <param name="fromInclusive">The inclusive lower bound of the range.</param>
+    /// <param name="toExclusive">The exclusive upper bound of the range.</param>
+    /// <param name="action">The delegate that is invoked once per iteration.</param>
+    public static void For(int fromInclusive, int toExclusive, Action<int> action)
+        => For(fromInclusive, toExclusive, DefaultOptions, action);
+
+    /// <summary>
+    /// Executes a parallel for loop over a range of integers, with the specified options.
+    /// </summary>
+    /// <param name="fromInclusive">The inclusive lower bound of the range.</param>
+    /// <param name="toExclusive">The exclusive upper bound of the range.</param>
+    /// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
+    /// <param name="action">The delegate that is invoked once per iteration.</param>
+    public static void For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> action)
+    {
+        int threads = parallelOptions.MaxDegreeOfParallelism > 0 ? parallelOptions.MaxDegreeOfParallelism : Environment.ProcessorCount;
+
+        Data data = new(threads, fromInclusive, toExclusive, action);
+
+        for (int i = 0; i < threads - 1; i++)
+        {
+            ThreadPool.UnsafeQueueUserWorkItem(new ParallelUnbalancedWork(data), preferLocal: false);
+        }
+
+        new ParallelUnbalancedWork(data).Execute();
+
+        // If there are still active threads, wait for them to complete
+        if (data.ActiveThreads > 0)
+        {
+            data.Event.Wait();
+        }
+    }
+
+    /// <summary>
+    /// Executes a parallel for loop over a range of integers, with thread-local data, initialization, and finalization functions.
+    /// </summary>
+    /// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
+    /// <param name="fromInclusive">The inclusive lower bound of the range.</param>
+    /// <param name="toExclusive">The exclusive upper bound of the range.</param>
+    /// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
+    /// <param name="init">The function to initialize the local data for each thread.</param>
+    /// <param name="action">The delegate that is invoked once per iteration.</param>
+    /// <param name="finally">The function to finalize the local data for each thread.</param>
+    public static void For<TLocal>(
+        int fromInclusive,
+        int toExclusive,
+        ParallelOptions parallelOptions,
+        Func<TLocal> init,
+        Func<int, TLocal, TLocal> action,
+        Action<TLocal> @finally)
+        => InitProcessor<TLocal>.For(fromInclusive, toExclusive, parallelOptions, init, default, action, @finally);
+
+    /// <summary>
+    /// Executes a parallel for loop over a range of integers, with thread-local data, initialization, and finalization functions.
+    /// </summary>
+    /// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
+    /// <param name="fromInclusive">The inclusive lower bound of the range.</param>
+    /// <param name="toExclusive">The exclusive upper bound of the range.</param>
+    /// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
+    /// <param name="value">The initial value of the local data for each thread.</param>
+    /// <param name="action">The delegate that is invoked once per iteration.</param>
+    /// <param name="finally">The function to finalize the local data for each thread.</param>
+    public static void For<TLocal>(
+        int fromInclusive,
+        int toExclusive,
+        ParallelOptions parallelOptions,
+        TLocal value,
+        Func<int, TLocal, TLocal> action,
+        Action<TLocal> @finally)
+        => InitProcessor<TLocal>.For(fromInclusive, toExclusive, parallelOptions, null, value, action, @finally);
+
+    /// <summary>
+    /// Executes a parallel for loop over a range of integers, with thread-local data.
+    /// </summary>
+    /// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
+    /// <param name="fromInclusive">The inclusive lower bound of the range.</param>
+    /// <param name="toExclusive">The exclusive upper bound of the range.</param>
+    /// <param name="state">The initial state of the thread-local data.</param>
+    /// <param name="action">The delegate that is invoked once per iteration.</param>
+    public static void For<TLocal>(int fromInclusive, int toExclusive, TLocal state, Func<int, TLocal, TLocal> action)
+        => For(fromInclusive, toExclusive, DefaultOptions, state, action);
+
+    /// <summary>
+    /// Executes a parallel for loop over a range of integers, with thread-local data and specified options.
+    /// </summary>
+    /// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
+    /// <param name="fromInclusive">The inclusive lower bound of the range.</param>
+    /// <param name="toExclusive">The exclusive upper bound of the range.</param>
+    /// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
+    /// <param name="state">The initial state of the thread-local data.</param>
+    /// <param name="action">The delegate that is invoked once per iteration.</param>
+    public static void For<TLocal>(
+        int fromInclusive,
+        int toExclusive,
+        ParallelOptions parallelOptions,
+        TLocal state,
+        Func<int, TLocal, TLocal> action)
+        => InitProcessor<TLocal>.For(fromInclusive, toExclusive, parallelOptions, null, state, action);
+
+    /// <summary>
+    /// Initializes a new instance of the <see cref="ParallelUnbalancedWork"/> class.
+    /// </summary>
+    /// <param name="data">The shared data for the parallel work.</param>
+    private ParallelUnbalancedWork(Data data)
+    {
+        _data = data;
+    }
+
+    /// <summary>
+    /// Executes the parallel work item.
+    /// </summary>
+    public void Execute()
+    {
+        int i = _data.Index.GetNext();
+        while (i < _data.ToExclusive)
+        {
+            _data.Action(i);
+            // Get the next index
+            i = _data.Index.GetNext();
+        }
+
+        // Signal that this thread has completed its work
+        _data.MarkThreadCompleted();
+    }
+
+    /// <summary>
+    /// Provides a thread-safe counter for sharing indices among threads.
+    /// </summary>
+    private class SharedCounter(int fromInclusive)
+    {
+        private PaddedValue _index = new(fromInclusive);
+
+        /// <summary>
+        /// Gets the next index in a thread-safe manner.
+        /// </summary>
+        /// <returns>The next index.</returns>
+        public int GetNext() => Interlocked.Increment(ref _index.Value) - 1;
+
+        [StructLayout(LayoutKind.Explicit, Size = 128)]
+        private struct PaddedValue(int value)
+        {
+            [FieldOffset(64)]
+            public int Value = value;
+        }
+    }
+
+    /// <summary>
+    /// Represents the base data shared among threads during parallel execution.
+    /// </summary>
+    private class BaseData(int threads, int fromInclusive, int toExclusive)
+    {
+        /// <summary>
+        /// Gets the shared counter for indices.
+        /// </summary>
+        public SharedCounter Index { get; } = new SharedCounter(fromInclusive);
+        public SemaphoreSlim Event { get; } = new(initialCount: 0);
+        private int _activeThreads = threads;
+
+        /// <summary>
+        /// Gets the exclusive upper bound of the range.
+        /// </summary>
+        public int ToExclusive => toExclusive;
+
+        /// <summary>
+        /// Gets the number of active threads.
+        /// </summary>
+        public int ActiveThreads => Volatile.Read(ref _activeThreads);
+
+        /// <summary>
+        /// Marks a thread as completed.
+        /// </summary>
+        /// <returns>The number of remaining active threads.</returns>
+        public int MarkThreadCompleted()
+        {
+            var remaining = Interlocked.Decrement(ref _activeThreads);
+
+            if (remaining == 0)
+            {
+                Event.Release();
+            }
+
+            return remaining;
+        }
+    }
+
+    /// <summary>
+    /// Represents the data shared among threads for the parallel action.
+    /// </summary>
+    private class Data(int threads, int fromInclusive, int toExclusive, Action<int> action) :
+        BaseData(threads, fromInclusive, toExclusive)
+    {
+        /// <summary>
+        /// Gets the action to be executed for each iteration.
+        /// </summary>
+        public Action<int> Action => action;
+    }
+
+    /// <summary>
+    /// Provides methods to execute parallel loops with thread-local data initialization and finalization.
+    /// </summary>
+    /// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
+    private class InitProcessor<TLocal> : IThreadPoolWorkItem
+    {
+        private readonly Data<TLocal> _data;
+
+        /// <summary>
+        /// Executes a parallel for loop over a range of integers, with thread-local data initialization and finalization.
+        /// </summary>
+        /// <param name="fromInclusive">The inclusive lower bound of the range.</param>
+        /// <param name="toExclusive">The exclusive upper bound of the range.</param>
+        /// <param name="parallelOptions">An object that configures the behavior of this operation.</param>
+        /// <param name="init">The function to initialize the local data for each thread.</param>
+        /// <param name="initValue">The initial value of the local data.</param>
+        /// <param name="action">The delegate that is invoked once per iteration.</param>
+        /// <param name="finally">The function to finalize the local data for each thread.</param>
+        public static void For(
+            int fromInclusive,
+            int toExclusive,
+            ParallelOptions parallelOptions,
+            Func<TLocal>? init,
+            TLocal? initValue,
+            Func<int, TLocal, TLocal> action,
+            Action<TLocal>? @finally = null)
+        {
+            // Determine the number of threads to use
+            var threads = parallelOptions.MaxDegreeOfParallelism > 0
+                ? parallelOptions.MaxDegreeOfParallelism
+                : Environment.ProcessorCount;
+
+            // Create shared data with thread-local initializers and finalizers
+            var data = new Data<TLocal>(threads, fromInclusive, toExclusive, action, init, initValue, @finally);
+
+            // Queue work items to the thread pool for all threads except the current one
+            for (int i = 0; i < threads - 1; i++)
+            {
+                ThreadPool.UnsafeQueueUserWorkItem(new InitProcessor<TLocal>(data), preferLocal: false);
+            }
+
+            // Execute work on the current thread
+            new InitProcessor<TLocal>(data).Execute();
+
+            // If there are still active threads, wait for them to complete
+            if (data.ActiveThreads > 0)
+            {
+                data.Event.Wait();
+            }
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="InitProcessor{TLocal}"/> class.
+        /// </summary>
+        /// <param name="data">The shared data for the parallel work.</param>
+        private InitProcessor(Data<TLocal> data) => _data = data;
+
+        /// <summary>
+        /// Executes the parallel work item with thread-local data.
+        /// </summary>
+        public void Execute()
+        {
+            TLocal? value = _data.Init();
+            int i = _data.Index.GetNext();
+            while (i < _data.ToExclusive)
+            {
+                value = _data.Action(i, value);
+                i = _data.Index.GetNext();
+            }
+
+            _data.Finally(value);
+
+            _data.MarkThreadCompleted();
+        }
+
+        /// <summary>
+        /// Represents the data shared among threads for the parallel action with thread-local data.
+        /// </summary>
+        /// <typeparam name="TValue">The type of the thread-local data.</typeparam>
+        private class Data<TValue>(int threads,
+            int fromInclusive,
+            int toExclusive,
+            Func<int, TValue, TValue> action,
+            Func<TValue>? init = null,
+            TValue? initValue = default,
+            Action<TValue>? @finally = null) : BaseData(threads, fromInclusive, toExclusive)
+        {
+            /// <summary>
+            /// Gets the action to be executed for each iteration.
+            /// </summary>
+            public Func<int, TValue, TValue> Action => action;
+
+            /// <summary>
+            /// Initializes the thread-local data.
+            /// </summary>
+            /// <returns>The initialized thread-local data.</returns>
+            public TValue Init() => initValue ?? (init is not null ? init.Invoke() : default)!;
+
+            /// <summary>
+            /// Finalizes the thread-local data.
+            /// </summary>
+            /// <param name="value">The thread-local data to finalize.</param>
+            public void Finally(TValue value)
+            {
+                @finally?.Invoke(value);
+            }
+        }
+    }
+}
diff --git a/src/Nethermind/Nethermind.Evm/TransactionProcessing/TransactionProcessor.cs b/src/Nethermind/Nethermind.Evm/TransactionProcessing/TransactionProcessor.cs
index e3ff782f422..de88ea69482 100644
--- a/src/Nethermind/Nethermind.Evm/TransactionProcessing/TransactionProcessor.cs
+++ b/src/Nethermind/Nethermind.Evm/TransactionProcessing/TransactionProcessor.cs
@@ -141,7 +141,7 @@ protected virtual TransactionResult Execute(Transaction tx, in BlockExecutionCon
             // commit - is for standard execute, we will commit thee state after execution
             // !commit - is for build up during block production, we won't commit state after each transaction to support rollbacks
             // we commit only after all block is constructed
-            bool commit = opts.HasFlag(ExecutionOptions.Commit) || !spec.IsEip658Enabled;
+            bool commit = opts.HasFlag(ExecutionOptions.Commit) || (!opts.HasFlag(ExecutionOptions.Warmup) && !spec.IsEip658Enabled);
 
             TransactionResult result;
             if (!(result = ValidateStatic(tx, header, spec, opts, out long intrinsicGas))) return result;
diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs
index 74f027bb80e..1c48629b742 100644
--- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs
+++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs
@@ -9,14 +9,13 @@
 using System.Text.Json;
 using System.Text.Json.Serialization;
 using System.Threading.Tasks;
-
 using Nethermind.Core;
+using Nethermind.Core.Threading;
 using Nethermind.JsonRpc.Exceptions;
 using Nethermind.JsonRpc.Modules;
 using Nethermind.Logging;
 using Nethermind.Serialization.Json;
 using Nethermind.State;
-
 using static Nethermind.JsonRpc.Modules.RpcModuleProvider;
 using static Nethermind.JsonRpc.Modules.RpcModuleProvider.ResolvedMethodInfo;
 
@@ -357,17 +356,24 @@ private void LogRequest(string methodName, JsonElement providedParameters, Expec
             }
             else if (providedParametersLength > parallelThreshold)
             {
-                Parallel.For(0, providedParametersLength, (int i) =>
+                ParallelUnbalancedWork.For(
+                    0,
+                    providedParametersLength,
+                    ParallelUnbalancedWork.DefaultOptions,
+                    (providedParameters, expectedParameters, executionParameters, hasMissing),
+                    static (i, state) =>
                 {
-                    JsonElement providedParameter = providedParameters[i];
-                    ExpectedParameter expectedParameter = expectedParameters[i];
+                    JsonElement providedParameter = state.providedParameters[i];
+                    ExpectedParameter expectedParameter = state.expectedParameters[i];
 
                    object?
parameter = DeserializeParameter(providedParameter, expectedParameter); - executionParameters[i] = parameter; - if (!hasMissing && ReferenceEquals(parameter, Type.Missing)) + state.executionParameters[i] = parameter; + if (!state.hasMissing && ReferenceEquals(parameter, Type.Missing)) { - hasMissing = true; + state.hasMissing = true; } + + return state; }); } diff --git a/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs b/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs index 9bbbf99282d..9c0a2961637 100644 --- a/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs +++ b/src/Nethermind/Nethermind.State/PersistentStorageProvider.cs @@ -14,6 +14,7 @@ using Nethermind.Core.Collections; using Nethermind.Core.Crypto; using Nethermind.Core.Extensions; +using Nethermind.Core.Threading; using Nethermind.Int256; using Nethermind.Logging; using Nethermind.State.Tracing; @@ -22,6 +23,7 @@ namespace Nethermind.State; using Nethermind.Core.Cpu; + /// /// Manages persistent storage allowing for snapshotting and restoring /// Persists data to ITrieStore @@ -266,19 +268,27 @@ void UpdateRootHashesSingleThread() void UpdateRootHashesMultiThread() { // We can recalculate the roots in parallel as they are all independent tries - Parallel.ForEach(_storages, RuntimeInformation.ParallelOptionsLogicalCores, kvp => + using var storages = _storages.ToPooledList(); + ParallelUnbalancedWork.For( + 0, + storages.Count, + RuntimeInformation.ParallelOptionsLogicalCores, + (storages, toUpdateRoots: _toUpdateRoots), + static (i, state) => { - if (!_toUpdateRoots.Contains(kvp.Key)) + ref var kvp = ref state.storages.GetRef(i); + if (!state.toUpdateRoots.Contains(kvp.Key)) { // Wasn't updated don't recalculate - return; + return state; } StorageTree storageTree = kvp.Value; storageTree.UpdateRootHash(canBeParallel: false); + return state; }); // Update the storage roots in the main thread non in parallel - foreach (KeyValuePair kvp in _storages) + foreach (ref var kvp in storages.AsSpan()) { if (!_toUpdateRoots.Contains(kvp.Key)) { @@ -288,7 +298,6 @@ void UpdateRootHashesMultiThread() // Update the storage root for the Account _stateProvider.UpdateStorageRoot(address: kvp.Key, kvp.Value.RootHash); } - } } diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs index 208d355a844..115fe36dfdb 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs @@ -3,11 +3,11 @@ using System; using System.Threading; -using System.Threading.Tasks; using Nethermind.Blockchain; using Nethermind.Core; using Nethermind.Core.Caching; using Nethermind.Core.Collections; +using Nethermind.Core.Threading; namespace Nethermind.Synchronization.FastBlocks { @@ -123,7 +123,7 @@ public bool TryGetInfosForBatch(int batchSize, Func blockExist, { bool hasNonNull = false; bool hasInserted = false; - Parallel.For(0, workingArray.Count, (i) => + ParallelUnbalancedWork.For(0, workingArray.Count, (i) => { if (workingArray[i] is not null) { diff --git a/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs b/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs index b32c4f95a59..0223672333f 100644 --- a/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs +++ b/src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs @@ -7,10 +7,10 @@ using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading; 
-using System.Threading.Tasks; using Nethermind.Core.Buffers; using Nethermind.Core.Cpu; using Nethermind.Core.Crypto; +using Nethermind.Core.Threading; using Nethermind.Serialization.Rlp; using Nethermind.Trie.Pruning; @@ -175,33 +175,33 @@ private static int GetChildrenRlpLengthForBranchParallel(ITrieNodeResolver tree, private static int GetChildrenRlpLengthForBranchNonRlpParallel(ITrieNodeResolver tree, TreePath rootPath, TrieNode item, ICappedArrayPool bufferPool) { int totalLength = 0; - Parallel.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, - () => 0, - (i, _, local) => + ParallelUnbalancedWork.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, + (local: 0, item, tree, bufferPool, rootPath), + static (i, state) => { - object? data = item._data[i]; + object? data = state.item._data[i]; if (ReferenceEquals(data, _nullNode) || data is null) { - local++; + state.local++; } else if (data is Hash256) { - local += Rlp.LengthOfKeccakRlp; + state.local += Rlp.LengthOfKeccakRlp; } else { - TreePath path = rootPath; + TreePath path = state.rootPath; path.AppendMut(i); TrieNode childNode = Unsafe.As(data); - childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool); - local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp; + childNode.ResolveKey(state.tree, ref path, isRoot: false, bufferPool: state.bufferPool); + state.local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp; } - return local; + return state; }, - local => + state => { - Interlocked.Add(ref totalLength, local); + Interlocked.Add(ref totalLength, state.local); }); return totalLength; @@ -236,40 +236,40 @@ private static int GetChildrenRlpLengthForBranchNonRlp(ITrieNodeResolver tree, r private static int GetChildrenRlpLengthForBranchRlpParallel(ITrieNodeResolver tree, TreePath rootPath, TrieNode item, ICappedArrayPool? bufferPool) { int totalLength = 0; - Parallel.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, - () => 0, - (i, _, local) => + ParallelUnbalancedWork.For(0, BranchesCount, RuntimeInformation.ParallelOptionsLogicalCores, + (local: 0, item, tree, bufferPool, rootPath), + static (i, state) => { - ValueRlpStream rlpStream = item.RlpStream; - item.SeekChild(ref rlpStream, i); - object? data = item._data[i]; + ValueRlpStream rlpStream = state.item.RlpStream; + state.item.SeekChild(ref rlpStream, i); + object? data = state.item._data[i]; if (data is null) { - local += rlpStream.PeekNextRlpLength(); + state.local += rlpStream.PeekNextRlpLength(); } else if (ReferenceEquals(data, _nullNode)) { - local++; + state.local++; } else if (data is Hash256) { - local += Rlp.LengthOfKeccakRlp; + state.local += Rlp.LengthOfKeccakRlp; } else { - TreePath path = rootPath; + TreePath path = state.rootPath; path.AppendMut(i); Debug.Assert(data is TrieNode, "Data is not TrieNode"); TrieNode childNode = Unsafe.As(data); - childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool); - local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp; + childNode.ResolveKey(state.tree, ref path, isRoot: false, bufferPool: state.bufferPool); + state.local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp; } - return local; + return state; }, - local => + state => { - Interlocked.Add(ref totalLength, local); + Interlocked.Add(ref totalLength, state.local); }); return totalLength;
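
Usage sketch for the new ParallelUnbalancedWork API (illustrative only; the program below and its names — delays, total, ParallelUnbalancedWorkExample — are assumptions for the example, not code from this change). It shows the plain overload and the state-carrying overload added in src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs.

// Minimal usage sketch, assuming the ParallelUnbalancedWork type introduced by this diff.
using System;
using System.Threading;
using Nethermind.Core.Threading;

internal static class ParallelUnbalancedWorkExample // hypothetical example program
{
    private static void Main()
    {
        // Unbalanced workload: later items sleep longer (same shape as ParallelBenchmark above).
        int[] delays = new int[200];
        for (int i = 0; i < delays.Length; i++) delays[i] = i % 100;

        // Simple overload: each worker repeatedly takes the next index from a shared counter,
        // so a long-running iteration does not stall a pre-assigned range of other iterations.
        ParallelUnbalancedWork.For(0, delays.Length, i => Thread.Sleep(delays[i]));

        // State-carrying overload: the initial TLocal value is handed to every worker,
        // threaded through that worker's iterations (no closure captures in the static lambda),
        // and passed to the per-thread @finally callback when the worker finishes.
        long total = 0;
        ParallelUnbalancedWork.For(
            0,
            delays.Length,
            ParallelUnbalancedWork.DefaultOptions,
            (delays, sum: 0L),
            static (i, state) => (state.delays, state.sum + state.delays[i]),
            state => Interlocked.Add(ref total, state.sum));

        Console.WriteLine(total); // 9900 = sum of the delay values
    }
}

Because every worker pulls its next index from a single shared counter, a slow iteration only delays the worker that is executing it; Parallel.For's static range partitioning can instead leave whole partitions queued behind one long item, which is the difference the ParallelBenchmark added by this patch measures.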