Skip to content

Commit

Permalink
Throttle Eth_ module queue (#5945)
Browse files Browse the repository at this point in the history
* Add loggers to bounded module

* next stage of experiments

* fix

* fix

* refactor exceptions & concurrency fixes

* cosmetic

* cosmetic cleanup

* rename config

* fix description

* Naming

---------

Co-authored-by: benaadams <[email protected]>
  • Loading branch information
MarekM25 and benaadams authored Jul 21, 2023
1 parent 9caae45 commit d56d659
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public Task InitRpcModules()
AccountAbstractionModuleFactory accountAbstractionModuleFactory = new(_userOperationPools, _entryPointContractAddresses.ToArray());
ILogManager logManager = _nethermindApi.LogManager ??
throw new ArgumentNullException(nameof(_nethermindApi.LogManager));
getFromApi.RpcModuleProvider!.RegisterBoundedByCpuCount(accountAbstractionModuleFactory, rpcConfig.Timeout);
getFromApi.RpcModuleProvider!.RegisterBoundedByCpuCount(accountAbstractionModuleFactory, rpcConfig.Timeout, _nethermindApi.LogManager);

ISubscriptionFactory? subscriptionFactory = _nethermindApi.SubscriptionFactory;
//Register custom UserOperation websocket subscription types in the SubscriptionFactory.
Expand Down
8 changes: 4 additions & 4 deletions src/Nethermind/Nethermind.Init/Steps/RegisterRpcModules.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public virtual async Task Execute(CancellationToken cancellationToken)
_api.GasPriceOracle,
_api.EthSyncingInfo);

rpcModuleProvider.RegisterBounded(ethModuleFactory, rpcConfig.EthModuleConcurrentInstances ?? Environment.ProcessorCount, rpcConfig.Timeout);
rpcModuleProvider.RegisterBounded(ethModuleFactory, rpcConfig.EthModuleConcurrentInstances ?? Environment.ProcessorCount, rpcConfig.Timeout, _api.LogManager, rpcConfig.RequestQueueLimit);

if (_api.DbProvider is null) throw new StepDependencyException(nameof(_api.DbProvider));
if (_api.BlockPreprocessor is null) throw new StepDependencyException(nameof(_api.BlockPreprocessor));
Expand All @@ -119,7 +119,7 @@ public virtual async Task Execute(CancellationToken cancellationToken)
if (_api.WitnessRepository is null) throw new StepDependencyException(nameof(_api.WitnessRepository));

ProofModuleFactory proofModuleFactory = new(_api.DbProvider, _api.BlockTree, _api.ReadOnlyTrieStore, _api.BlockPreprocessor, _api.ReceiptFinder, _api.SpecProvider, _api.LogManager);
rpcModuleProvider.RegisterBounded(proofModuleFactory, 2, rpcConfig.Timeout);
rpcModuleProvider.RegisterBounded(proofModuleFactory, 2, rpcConfig.Timeout, _api.LogManager);

DebugModuleFactory debugModuleFactory = new(
_api.DbProvider,
Expand All @@ -135,7 +135,7 @@ public virtual async Task Execute(CancellationToken cancellationToken)
_api.SpecProvider,
_api.SyncModeSelector,
_api.LogManager);
rpcModuleProvider.RegisterBoundedByCpuCount(debugModuleFactory, rpcConfig.Timeout);
rpcModuleProvider.RegisterBoundedByCpuCount(debugModuleFactory, rpcConfig.Timeout, _api.LogManager);

TraceModuleFactory traceModuleFactory = new(
_api.DbProvider,
Expand All @@ -149,7 +149,7 @@ public virtual async Task Execute(CancellationToken cancellationToken)
_api.PoSSwitcher,
_api.LogManager);

rpcModuleProvider.RegisterBoundedByCpuCount(traceModuleFactory, rpcConfig.Timeout);
rpcModuleProvider.RegisterBoundedByCpuCount(traceModuleFactory, rpcConfig.Timeout, _api.LogManager);

if (_api.EthereumEcdsa is null) throw new StepDependencyException(nameof(_api.EthereumEcdsa));
if (_api.Wallet is null) throw new StepDependencyException(nameof(_api.Wallet));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Nethermind.Db.Blooms;
using Nethermind.Facade;
using Nethermind.Facade.Eth;
using Nethermind.JsonRpc.Exceptions;
using Nethermind.JsonRpc.Modules.Eth.FeeHistory;
using Nethermind.JsonRpc.Modules.Eth.GasPrice;
using Nethermind.State;
Expand Down Expand Up @@ -60,7 +61,7 @@ public async Task Initialize()
Substitute.For<IReceiptStorage>(),
Substitute.For<IGasPriceOracle>(),
Substitute.For<IEthSyncingInfo>()),
1, 1000);
1, 1000, LimboLogs.Instance);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Task InitRpcModules()
if (apiRpcModuleProvider.GetPool(ModuleType.Trace) is IRpcModulePool<ITraceRpcModule> traceModulePool)
{
TraceStoreModuleFactory traceModuleFactory = new(traceModulePool.Factory, _db!, _api.BlockTree!, _api.ReceiptFinder!, _traceSerializer!, _logManager, _config.DeserializationParallelization);
apiRpcModuleProvider.RegisterBoundedByCpuCount(traceModuleFactory, _jsonRpcConfig.Timeout);
apiRpcModuleProvider.RegisterBoundedByCpuCount(traceModuleFactory, _jsonRpcConfig.Timeout, _logManager);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;

namespace Nethermind.JsonRpc.Exceptions;

public class LimitExceededException : Exception
{
public LimitExceededException(string message)
: base(message)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

using System;

namespace Nethermind.JsonRpc.Modules
namespace Nethermind.JsonRpc.Exceptions
{
public class ModuleRentalTimeoutException : TimeoutException
{
Expand Down
13 changes: 13 additions & 0 deletions src/Nethermind/Nethermind.JsonRpc/IJsonRpcConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ public interface IJsonRpcConfig : IConfig
DefaultValue = "20000")]
int Timeout { get; set; }

[ConfigItem(
Description = "The queued request limit for calls above the max concurrency amount for (" +
nameof(IEthRpcModule.eth_call) + ", " +
nameof(IEthRpcModule.eth_estimateGas) + ", " +
nameof(IEthRpcModule.eth_getLogs) + ", " +
nameof(IEthRpcModule.eth_newFilter) + ", " +
nameof(IEthRpcModule.eth_newBlockFilter) + ", " +
nameof(IEthRpcModule.eth_newPendingTransactionFilter) + ", " +
nameof(IEthRpcModule.eth_uninstallFilter) + "). " +
" If value is set to 0 limit won't be applied.",
DefaultValue = "500")]
int RequestQueueLimit { get; set; }

[ConfigItem(
Description = "Base file path for diagnostic JSON RPC recorder.",
DefaultValue = "\"logs/rpc.{counter}.txt\"")]
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.JsonRpc/JsonRpcConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class JsonRpcConfig : IJsonRpcConfig
public bool Enabled { get; set; }
public string Host { get; set; } = "127.0.0.1";
public int Timeout { get; set; } = 20000;
public int RequestQueueLimit { get; set; } = 500;
public string RpcRecorderBaseFilePath { get; set; } = "logs/rpc.{counter}.txt";

public RpcRecorderState RpcRecorderState { get; set; } = RpcRecorderState.None;
Expand Down
12 changes: 10 additions & 2 deletions src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Nethermind.Core;
using Nethermind.Core.Attributes;
using Nethermind.JsonRpc.Data;
using Nethermind.JsonRpc.Exceptions;
using Nethermind.JsonRpc.Modules;
using Nethermind.Logging;
using Nethermind.Serialization.Json;
Expand Down Expand Up @@ -75,8 +76,15 @@ public async Task<JsonRpcResponse> SendRequestAsync(JsonRpcRequest rpcRequest, J
}
catch (TargetInvocationException ex)
{
if (_logger.IsError) _logger.Error($"Error during method execution, request: {rpcRequest}", ex.InnerException);
return GetErrorResponse(rpcRequest.Method, ErrorCodes.InternalError, "Internal error", ex.InnerException?.ToString(), rpcRequest.Id);
if (_logger.IsError)
_logger.Error($"Error during method execution, request: {rpcRequest}", ex.InnerException);
return GetErrorResponse(rpcRequest.Method, ErrorCodes.InternalError, "Internal error",
ex.InnerException?.ToString(), rpcRequest.Id);
}
catch (LimitExceededException ex)
{
if (_logger.IsError) _logger.Error($"Error during method execution, request: {rpcRequest}", ex);
return GetErrorResponse(rpcRequest.Method, ErrorCodes.LimitExceeded, "Too many requests", ex.ToString(), rpcRequest.Id);
}
catch (ModuleRentalTimeoutException ex)
{
Expand Down
34 changes: 33 additions & 1 deletion src/Nethermind/Nethermind.JsonRpc/Modules/BoundedModulePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.JsonRpc.Exceptions;
using Nethermind.Logging;
using ILogger = Nethermind.Logging.ILogger;

namespace Nethermind.JsonRpc.Modules
{
Expand All @@ -15,9 +18,15 @@ public class BoundedModulePool<T> : IRpcModulePool<T> where T : IRpcModule
private readonly Task<T> _sharedAsTask;
private readonly ConcurrentQueue<T> _pool = new();
private readonly SemaphoreSlim _semaphore;
private readonly ILogger _logger;
private int _rpcQueuedCalls = 0;
private readonly int _requestQueueLimit = 0;
private bool RequestLimitEnabled => _requestQueueLimit > 0;

public BoundedModulePool(IRpcModuleFactory<T> factory, int exclusiveCapacity, int timeout)
public BoundedModulePool(IRpcModuleFactory<T> factory, int exclusiveCapacity, int timeout, ILogManager logManager, int requestQueueLimit = 0)
{
_requestQueueLimit = requestQueueLimit;
_logger = logManager.GetClassLogger();
_timeout = timeout;
Factory = factory;

Expand All @@ -35,15 +44,38 @@ public BoundedModulePool(IRpcModuleFactory<T> factory, int exclusiveCapacity, in

private async Task<T> SlowPath()
{
if (RequestLimitEnabled && _rpcQueuedCalls > _requestQueueLimit)
{
throw new LimitExceededException($"Unable to start new queued requests for {typeof(T).Name}. Too many queued requests. Queued calls {_rpcQueuedCalls}.");
}

IncrementRpcQueuedCalls();
if (_logger.IsTrace)
_logger.Trace($"{typeof(T).Name} Queued RPC requests {_rpcQueuedCalls}");

if (!await _semaphore.WaitAsync(_timeout))
{
DecrementRpcQueuedCalls();
throw new ModuleRentalTimeoutException($"Unable to rent an instance of {typeof(T).Name}. Too many concurrent requests.");
}

DecrementRpcQueuedCalls();
_pool.TryDequeue(out T result);
return result;
}

private void IncrementRpcQueuedCalls()
{
if (RequestLimitEnabled)
Interlocked.Increment(ref _rpcQueuedCalls);
}

private void DecrementRpcQueuedCalls()
{
if (RequestLimitEnabled)
Interlocked.Decrement(ref _rpcQueuedCalls);
}

public void ReturnModule(T module)
{
if (ReferenceEquals(module, _shared))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using Nethermind.Logging;

namespace Nethermind.JsonRpc.Modules
{
Expand All @@ -13,19 +14,22 @@ public static void RegisterBounded<T>(
this IRpcModuleProvider rpcModuleProvider,
ModuleFactoryBase<T> factory,
int maxCount,
int timeout)
int timeout,
ILogManager logManager,
int requestQueueLimit = 0)
where T : IRpcModule
{
rpcModuleProvider.Register(new BoundedModulePool<T>(factory, maxCount, timeout));
rpcModuleProvider.Register(new BoundedModulePool<T>(factory, maxCount, timeout, logManager, requestQueueLimit));
}

public static void RegisterBoundedByCpuCount<T>(
this IRpcModuleProvider rpcModuleProvider,
ModuleFactoryBase<T> factory,
int timeout)
int timeout,
ILogManager logManager)
where T : IRpcModule
{
RegisterBounded(rpcModuleProvider, factory, _cpuCount, timeout);
RegisterBounded(rpcModuleProvider, factory, _cpuCount, timeout, logManager);
}

public static void RegisterSingle<T>(
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Mev/MevPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public Task InitRpcModules()
getFromApi.SpecProvider!,
getFromApi.EngineSigner);

getFromApi.RpcModuleProvider!.RegisterBoundedByCpuCount(mevModuleFactory, rpcConfig.Timeout);
getFromApi.RpcModuleProvider!.RegisterBoundedByCpuCount(mevModuleFactory, rpcConfig.Timeout, getFromApi.LogManager);

if (_logger!.IsInfo) _logger.Info("Flashbots RPC plugin enabled");
}
Expand Down
7 changes: 4 additions & 3 deletions src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,16 @@ private static int GetStatusCode(JsonRpcResult result)
}
else
{
return ModuleTimeout(result.Response)
return IsResourceUnavailableError(result.Response)
? StatusCodes.Status503ServiceUnavailable
: StatusCodes.Status200OK;
}
}

private static bool ModuleTimeout(JsonRpcResponse? response)
private static bool IsResourceUnavailableError(JsonRpcResponse? response)
{
return response is JsonRpcErrorResponse { Error.Code: ErrorCodes.ModuleTimeout };
return response is JsonRpcErrorResponse { Error.Code: ErrorCodes.ModuleTimeout }
or JsonRpcErrorResponse { Error.Code: ErrorCodes.LimitExceeded };
}
}
}

0 comments on commit d56d659

Please sign in to comment.